Wednesday, January 18, 2023

ETL : Delta Lake


Issues in plain Spark:

  • Cannot update/change existing data
  • No schema enforcement
  • No incremental (delta) load support
  • Data can be lost or corrupted during an overwrite (see the sketch below)
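To make the last two points concrete, here is a minimal sketch using plain Parquet (the path and data are hypothetical, and an existing spark session is assumed): an append with a mismatched "id" type is accepted silently, so the dataset ends up with mixed types.

from pyspark.sql.functions import col

parquet_path = "/tmp/parquettest"   # hypothetical path
spark.range(1, 5).write.format("parquet").mode("overwrite").save(parquet_path)
bad_data = spark.range(5, 10).withColumn("id", col("id").cast("string"))
bad_data.write.format("parquet").mode("append").save(parquet_path)   # no schema enforcement: the write succeeds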

Advantages of Delta Lake

  • Delete/update records (Upsert = Update + Insert)
  • Schema enforcement
  • Time travel capabilities

Pre-Req:

  • Spark 3.3.2

  • pip3 install pyspark==3.3.2

      import findspark
      findspark.init('/Downloads/spark-3.3.2-bin-hadoop3/')
      
      pyspark --packages io.delta:delta-core_2.12:2.2.0 \
      --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
      --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
    

    or

      from pyspark.sql import SparkSession
    
      spark = SparkSession.builder.appName("test")\
      .config("spark.jars.packages", "io.delta:delta-core_2.12:2.2.0")\
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")\
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")\
      .getOrCreate()
    
      spark.range(1,10).write.format('delta').save('test_table')
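
      # quick sanity check (assuming the same relative 'test_table' path written above):
      spark.read.format('delta').load('test_table').show()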
    

Schema Enforcement

from pyspark.sql.functions import col
path="/tmp/deltatest"
spark.range(1,5).coalesce(1).write.format("delta").mode("overwrite").save(path)
new_data = spark.range(5,10).withColumn("id",col("id").cast("String"))

# The schema mismatch (id changed from long to string) is rejected - schema enforcement in action
new_data.coalesce(1).write.format("delta").mode("append").save(path)
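If a schema change is intentional, Delta can evolve the table schema instead of rejecting the write. A minimal sketch (the extra load_date column is hypothetical); note that mergeSchema permits adding new columns, not arbitrary type changes like the string cast above:

from pyspark.sql.functions import current_date

evolved = spark.range(5,10).withColumn("load_date", current_date())
evolved.write.option("mergeSchema", "true").format("delta").mode("append").save(path)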

Deletion and Update (deletes and updates are applied directly on the Delta table)

from delta.tables import *
delta_df = DeltaTable.forPath(spark, path)
delta_df.delete("id=2")  # deletes rows matching the condition; no manual refresh needed
delta_df.update(condition = "id = 5", set = { "id": "500" })
delta_df.toDF().show()  
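The same delete and update can also be expressed with Column expressions instead of SQL strings, a small sketch using the standard pyspark.sql.functions helpers:

from pyspark.sql.functions import col, expr

delta_df.delete(col("id") == 3)
delta_df.update(condition=col("id") == 4, set={"id": expr("id * 100")})
delta_df.toDF().show()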

Merge (source can be any DataFrame; target must be a Delta table)

deltaTable = DeltaTable.forPath(spark, path)
newData = spark.range(0, 20)
deltaTable.alias("oldData") \
  .merge(newData.alias("newData"),"oldData.id = newData.id") \
  .whenMatchedUpdate(set = { "id": col("newData.id") }) \
  .whenNotMatchedInsert(values = { "id": col("newData.id") }) \
  .execute()	  
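The same merge can also be written as Spark SQL, a sketch assuming the source DataFrame is registered as a temp view and the target is addressed by its path:

newData.createOrReplaceTempView("newData")
spark.sql(f"""
  MERGE INTO delta.`{path}` AS oldData
  USING newData
  ON oldData.id = newData.id
  WHEN MATCHED THEN UPDATE SET id = newData.id
  WHEN NOT MATCHED THEN INSERT (id) VALUES (newData.id)
""")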

Delta Watermark Logic

# read the current state of the Delta target and get the latest watermark value
deltaDF = readDelta(sourceSubFolder, "latest")
maxTS = deltaDF.selectExpr(f"max({deltaColumn})").first()[f"max({deltaColumn})"]

# keep only source rows newer than the watermark; stop if there is nothing to load
resultDF = srcDF2.filter(f"{deltaColumn} > '{maxTS}'")
if resultDF.count() == 0:
    exit(1)

resultDF.write.option("mergeSchema", "true").format("delta").mode("append").save(targetPath)
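The readDelta helper above is not defined in the post; a possible implementation, assuming a hypothetical basePath root folder, could look like this:

def readDelta(subFolder, version="latest"):
    # read either the latest state or a specific version of a Delta folder
    reader = spark.read.format("delta")
    if version != "latest":
        reader = reader.option("versionAsOf", version)
    return reader.load(f"{basePath}/{subFolder}")   # basePath is a hypothetical root path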

Upsert

from pyspark.sql.functions import sha2, concat_ws

# hash the key columns into a single "sha" column so rows can be matched on one key
df = spark.range(1,5).withColumn("sha", sha2(concat_ws("_", *["id"]), 256))

if fullLoad:  # full load: replace the whole table
	df.coalesce(1).write.format("delta").mode("overwrite").save(path)
else:         # incremental load: upsert on the hash
	deltaTable = DeltaTable.forPath(spark, path)
	deltaTable.alias("oldData") \
	.merge(df.alias("newData"), "oldData.sha = newData.sha") \
	.whenMatchedUpdateAll()\
	.whenNotMatchedInsertAll()\
	.execute()

Time Travel

spark.read.format("delta").option("versionAsOf",1).load("/mnt/data/delta")
spark.read.format("delta").option("timestampAsOf", "2019-09-22 00:57:00").load("/tmp/stars")

Utils

delta_df.history(1).show()  # show the most recent operation on the table
delta_df.vacuum()           # remove files no longer referenced by the table and older than the default 7-day retention
delta_df.vacuum(100)        # remove unreferenced files older than 100 hours
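Note that vacuum refuses retention windows shorter than the default 168 hours (7 days) unless the safety check is disabled first; only do this if time travel beyond that window is not needed:

spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
delta_df.vacuum(100)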