Issues in Spark (plain Parquet tables):
- Cannot update/change data in place
- No schema enforcement
- No incremental (delta) load support
- Data can be lost or corrupted during an overwrite
Advantages of Delta Lake
- Delete/update records (upsert = update + insert)
- Schema Enforcement
- Time Travel Capabilities
Pre-Req:
- Spark 3.3.2
- pip3 install pyspark==3.3.2
import findspark
findspark.init('/Downloads/spark-3.3.2-bin-hadoop3/')  # path to the local Spark installation
pyspark --packages io.delta:delta-core_2.12:2.2.0 \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
or
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("test")\
.config("spark.jars.packages", "io.delta:delta-core_2.12:2.2.0")\
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")\
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")\
.getOrCreate()
spark.range(1,10).write.format('delta').save('test_table')
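To confirm the write, the table can be read back with the delta format (same path as above):
spark.read.format("delta").load("test_table").show()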
Schema Enforcement
from pyspark.sql.functions import col
path="/tmp/deltatest"
spark.range(1,5).coalesce(1).write.format("delta").mode("overwrite").save(path)
new_data = spark.range(5,10).withColumn("id",col("id").cast("String"))
# This append fails: Delta's schema enforcement rejects the type change (id is now string, the table expects long)
new_data.coalesce(1).write.format("delta").mode("append").save(path)
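If the schema change is intentional, the table can be rewritten with the new schema; a minimal sketch using Delta's overwriteSchema option (note this replaces the data rather than appending):
new_data.coalesce(1).write.format("delta") \
    .option("overwriteSchema", "true") \
    .mode("overwrite").save(path)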
Deletion and Update (deletes and updates are applied directly to the Delta table)
from delta.tables import *
delta_df = DeltaTable.forPath(spark, path)
delta_df.delete("id = 2")  # deletes matching rows; the change is committed immediately
delta_df.update(condition = "id = 5", set = { "id": "500" })
delta_df.toDF().show()
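delete and update also accept Column expressions instead of SQL strings; a small sketch (the condition values here are arbitrary):
from pyspark.sql.functions import expr
delta_df.delete(col("id") == 3)
delta_df.update(condition = col("id") == 7, set = { "id": expr("id + 1000") })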
Merge (source can be any DataFrame; target must be a Delta table)
deltaTable = DeltaTable.forPath(spark, path)
newData = spark.range(0, 20)
deltaTable.alias("oldData") \
.merge(newData.alias("newData"),"oldData.id = newData.id") \
.whenMatchedUpdate(set = { "id": col("newData.id") }) \
.whenNotMatchedInsert(values = { "id": col("newData.id") }) \
.execute()
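A matched row can also be deleted instead of updated via whenMatchedDelete, which takes an optional condition; a sketch (the condition is only for illustration):
deltaTable.alias("oldData") \
    .merge(newData.alias("newData"), "oldData.id = newData.id") \
    .whenMatchedDelete(condition = "newData.id > 15") \
    .whenNotMatchedInsert(values = { "id": col("newData.id") }) \
    .execute()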
Delta Watermark Logic (incremental load)
# sourceSubFolder, deltaColumn, srcDF2 and targetPath are assumed to be defined upstream.
deltaDF = readDelta(sourceSubFolder, "latest")  # readDelta is a user-defined helper (sketch below)
maxTS = deltaDF.selectExpr(f"max({deltaColumn})").first()[0]  # high-water mark already in the target
resultDF = srcDF2.filter(f"{deltaColumn} > '{maxTS}'")  # keep only rows newer than the watermark
if resultDF.count() == 0:
    exit(1)  # nothing new to load
resultDF.write.option("mergeSchema", "true").format("delta").mode("append").save(targetPath)
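readDelta is not part of the Delta API; a minimal sketch of such a helper, assuming it reads a Delta path at either the latest or a pinned version:
def readDelta(path, version="latest"):
    # Read a Delta table, optionally pinning a historical version (time travel).
    reader = spark.read.format("delta")
    if version != "latest":
        reader = reader.option("versionAsOf", version)
    return reader.load(path)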
Upsert
from pyspark.sql.functions import sha2, concat_ws

# Hash the key column(s) into a deterministic merge key.
df = spark.range(1, 5).withColumn("sha", sha2(concat_ws("_", *["id"]), 256))

if fullLoad:
    df.coalesce(1).write.format("delta").mode("overwrite").save(path)
else:
    deltaTable = DeltaTable.forPath(spark, path)
    deltaTable.alias("oldData") \
        .merge(df.alias("newData"), "oldData.sha = newData.sha") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()
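One way to set fullLoad is to check whether a Delta table already exists at the target path:
fullLoad = not DeltaTable.isDeltaTable(spark, path)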
Time Travel
spark.read.format("delta").option("versionAsOf", 1).load("/mnt/data/delta")  # read the table as of version 1
spark.read.format("delta").option("timestampAsOf", "2019-09-22 00:57:00").load("/tmp/stars")  # read the table as of a timestamp
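A table can also be rolled back to an earlier version; Delta's restore operation records the rollback as a new commit, so the history is preserved (a sketch, reusing the path above):
delta_tbl = DeltaTable.forPath(spark, "/mnt/data/delta")
delta_tbl.restoreToVersion(1)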
Utils
delta_df.history(1).show() #last operation
delta_df.vacuum() # recursively delete files and directories more than 7 days old
delta_df.vacuum(100) # vacuum files not required by versions more than 100 hours old
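Note: vacuum(100) requests a 100-hour retention, which is below the default 168-hour (7-day) safety threshold, so Delta blocks it unless the retention check is disabled first:
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
delta_df.vacuum(100)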