Tuesday, December 19, 2023

Tenacity with Asyncio - Defensive Programming

from tenacity import (
    retry,
    stop_after_attempt,
    stop_after_delay,
    wait_fixed,
    retry_if_exception_type,
)
import httpx
import asyncio

# Retry on any exception, waiting 2s between attempts; give up after 3 seconds
# have elapsed or 5 attempts have been made, whichever comes first.
@retry(
    stop=(stop_after_delay(3) | stop_after_attempt(5)),
    wait=wait_fixed(2),
    retry=retry_if_exception_type(Exception),
)
async def fetch_with_retry():
    print("trying ...")
    async with httpx.AsyncClient() as client:
        resp = await client.get("https://www.youtube.com/aasd")
    if resp.status_code == 404:
        raise Exception("404 Not working")

asyncio.run(fetch_with_retry())
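A variation worth noting (not part of the original snippet): tenacity also provides wait_exponential for backoff instead of a fixed wait; the sketch below reuses the same httpx call.

from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=10))
async def fetch_with_backoff():
    async with httpx.AsyncClient() as client:
        resp = await client.get("https://www.youtube.com/aasd")
    resp.raise_for_status()  # raises httpx.HTTPStatusError on 4xx/5xx, triggering a retry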

Saturday, October 21, 2023

Python : Find Sum using Parallel Processing with a Common Aggregator

import concurrent.futures
import threading

total = 0
lock = threading.Lock()

def fn(i):
    global total
    # += on a shared global is not atomic across threads, so guard the aggregator
    with lock:
        total += i
    print(total, threading.get_native_id())

if __name__ == '__main__':
    with concurrent.futures.ThreadPoolExecutor(3) as ep:
        ep.map(fn, range(0, 100))
    print("final " + str(total))

Thursday, September 21, 2023

Pyspark : Compression

# %% [markdown]
# Test Sample Details
#
# Format : JSON
# Count = 97
# File Size = 2.8 MB
#
# %% [markdown]
# Compression Codec | Time to Compress | Time to Decompress | Benefits | Drawbacks | Compressed Size | Common Use Cases
# ----------------- | ---------------- | ------------------ | -------- | --------- | --------------- | ----------------
# None | Very Fast | Very Fast | No CPU overhead | No space savings | Same as original data | When no compression is required
# LZ4 | Fast | Fast | Low compression overhead | Not the highest compression ratio | Smaller than original data | Real-time, in-memory storage
# Gzip | Slower | Slower | High compression ratio | High CPU overhead | Smaller than original data | Archiving, storage, long-term backups
# Snappy | Fast | Fast | Low compression overhead | Not as compact as Gzip or Zstd | Smaller than original data | Real-time, columnar storage
# Zstd | Balanced | Balanced | High compression ratio | Higher CPU usage than LZ4/Snappy | Smaller than Gzip | Batch processing, data storage
#
#
#
# Result
#
# partition = Default
# None   (.parquet)        = 6.7 MB * 9 files
# lz4    (.lz4.parquet)    = 3.6 MB * 9 files
# gzip   (.gzip.parquet)   = 2.8 MB * 9 files (highest compression)
# snappy (.snappy.parquet) = 3.6 MB * 9 files
# zstd   (.zstd.parquet)   = 2.8 MB * 9 files
#
#
# Final Verdict :
#
# Performance / Speed Ranking :
# 1. None
# 2. LZ4
# 3. Snappy
# 4. Zstd
# 5. Gzip
#
# Compression Ranking :
# 1. Gzip
# 2. Zstd
# 3. Snappy
# 4. LZ4
# 5. None
# %%
from pyspark.sql import SparkSession
import os, shutil
# %%
spark=SparkSession.builder.appName("test").getOrCreate()
# %%
path="/Users/deepakjayaprakash/Downloads/test"
df =spark.read.option("header","true").json(path+"/input")
# %%
NoCompression=path+"/NoCompression"
if os.path.exists(NoCompression): shutil.rmtree(NoCompression)
# Default Parquet
df.write.options(header="True",compression="none").save(NoCompression)
# %%
lz4=path+"/lz4"
if os.path.exists(lz4): shutil.rmtree(lz4)
df.write.options(header="True",compression="lz4").save(lz4)
# %%
gzip=path+"/gzip"
if os.path.exists(gzip): shutil.rmtree(gzip)
df.write.options(header="True",compression="gzip").save(gzip)
# %%
snappy=path+"/snappy"
if os.path.exists(snappy): shutil.rmtree(snappy)
df.write.options(header="True",compression="snappy").save(snappy)
# %%
zstd=path+"/zstd"
if os.path.exists(zstd): shutil.rmtree(zstd)
df.write.options(header="True",compression="zstd").save(zstd)
# %% [markdown]
# Decompression
# %%
spark.read.options(header="true").parquet(NoCompression).count()
# %%
spark.read.options(header="true").parquet(lz4).count()
# %%
spark.read.options(header="true").parquet(gzip).count()
# %%
spark.read.options(header="true").parquet(snappy).count()
# %%
spark.read.options(header="true").parquet(zstd).count()
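# %% [markdown]
# Note (an addition, not in the original notebook): instead of passing the codec on every
# write, it can also be set once per session via spark.sql.parquet.compression.codec;
# the output sub-folder name below is just illustrative.
# %%
spark.conf.set("spark.sql.parquet.compression.codec", "zstd")
df.write.mode("overwrite").save(path + "/zstd_conf")  # uses the session-level codec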

Tuesday, September 5, 2023

PySpark Cache

# %%
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("performance").getOrCreate()
# %%
import string,random
# %%
rdd_student=spark.sparkContext.parallelize(zip(range(1,27),string.ascii_uppercase))
# %%
studentDF= spark.createDataFrame(rdd_student,["id","name"])
# %% [markdown]
# Without any Performance Enhancement
# %%
rddMaths=spark.sparkContext.parallelize(zip(range(1,27),[random.randint(1,100) for i in range(1,27)]))
mathsDF = spark.createDataFrame(rddMaths,["id","marks"])
studentDF.join(mathsDF,["id"],"leftouter").show()
# %% [markdown]
# With Checkpoint
# %%
rddScience= spark.sparkContext.parallelize(zip(range(1,27),[random.randint(1,100) for i in range(1,27)]))
scienceDF = spark.createDataFrame(rddScience,["id","marks"])
spark.sparkContext.setCheckpointDir("/tmp/spark-temp")
scienceDF=scienceDF.checkpoint()
studentDF.join(scienceDF,["id"],"leftouter").show()
# %% [markdown]
# With Cache
# %%
rddOther= spark.sparkContext.parallelize(zip(range(1,27),[random.randint(1,100) for i in range(1,27)]))
otherDF = spark.createDataFrame(rddOther,["id","marks"]).cache()
studentDF.join(otherDF,["id"],"leftouter").show()
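# %% [markdown]
# Quick check (a small addition, not part of the original gist): is_cached / storageLevel
# show whether the DataFrame is cached, and unpersist() releases it when the comparison is done.
# %%
print(otherDF.is_cached, otherDF.storageLevel)
otherDF.unpersist()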

Saturday, September 2, 2023

About Me


TriggerBlock is a cool tech blog that's all about helping you out with tutorials, problem-solving, hacks, and quick fixes for different coding languages and techie stuff like Java, Python, Scala, Kubernetes, and more. They've got plenty of articles on testing too. Check it out!

Thursday, March 16, 2023

Configure Pyspark



Pre-Req:

  • Install Python 3.9
  • Find the location of python ($ which python) and keep it handy
  • pip3 install ipython #optional
  • pip3 install pyspark
  • Download the Apache Spark zip > unzip it to a path

Steps:

  • Create file ~/.bash_profile

  • Add the below contents

      export PYSPARK_PYTHON=python3
      export PYTHONPATH="Python location"
      export PYSPARK_PATH="../spark-x.x.x-bin-hadoop3/bin/pyspark"
      alias pyspark=$PYSPARK_PATH
      export PATH=$PATH:$PYSPARK_PATH
    
      #optional
      alias ipython='python3 -m IPython' 
    

    Example :

      export PYSPARK_PYTHON=python3
      export PYTHONPATH="/Users/deepakjayaprakash/Library/Python/3.9/bin/python3"
      export PYSPARK_PATH="/Users/deepakjayaprakash/Downloads/spark-3.3.2-bin-hadoop3/bin/pyspark"
      alias pyspark=$PYSPARK_PATH
      export PATH=$PATH:$PYSPARK_PATH
    
      #optional
      alias ipython='python3 -m IPython' 
    
  • Save File

  • source ~/.bash_profile
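  • To verify the setup (a suggested check, not part of the original steps), open a new terminal and run:

      pyspark
      >>> spark.version          # should report the unzipped bundle, e.g. '3.3.2'
      >>> spark.range(5).count() # returns 5 if the session is working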


Thursday, February 16, 2023

Versioning

Versioning

  1. Calendar Versioning
  2. Semantic Versioning

Calendar Versioning

https://calver.org/

  • Ubuntu 16.04 = Ubuntu released April 2016 (YY.MM format)
  • Pycharm 2022.3.2

Semantic Versioning

https://semver.org/

MAJOR_VERSION.MINOR_VERSION.PATCH_VERSION Ex: 5.4.2 = Major version 5, Minor version 4, Patch version 2.

Patch version = bug fixes; Minor version = backward-compatible new functionality; Major version = breaking changes are introduced.

Notes:

  1. Initial development starts at version 0.1.0 (early development).
  2. Once the public API is stable, release version 1.0.0.

Pre-Release / Beta

  1. If a library is at version 2.8.9
  2. and there is a plan to release a beta for 3.0.0,
  3. the beta version release will be 3.0.0-beta.1
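
A quick way to sanity-check this ordering from Python (assuming the third-party packaging library is installed; it normalizes 3.0.0-beta.1 to 3.0.0b1):

      from packaging.version import Version

      # Pre-releases sort before the corresponding final release
      assert Version("2.8.9") < Version("3.0.0-beta.1") < Version("3.0.0")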

Wednesday, January 18, 2023

ETL : Delta Lake


Issues in Spark :

  • Cannot update/change data
  • No schema enforcement
  • No delta load
  • Data can get messed up on overwrite

Advantages of Delta Lake

  • Delete/update records (Upsert = Update and Insert)
  • Schema Enforcement
  • Time Travel Capabilities

Pre-Req:

  • spark = 3.3.2

  • pip3 install pyspark==3.3.2

      import findspark
      findspark.init('/Downloads/spark-3.3.2-bin-hadoop3/')
      
      pyspark --packages io.delta:delta-core_2.12:2.2.0 \
      --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
      --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
    

    or

      from pyspark.sql import SparkSession
    
      spark = SparkSession.builder.appName("test")\
      .config("spark.jars.packages", "io.delta:delta-core_2.12:2.2.0")\
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")\
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")\
      .getOrCreate()
    
      spark.range(1,10).write.format('delta').save('test_table')
    

Schema Enforcement

from pyspark.sql.functions import col
path="/tmp/deltatest"
spark.range(1,5).coalesce(1).write.format("delta").mode("overwrite").save(path)
new_data = spark.range(5,10).withColumn("id",col("id").cast("String"))

# Appending data with a changed schema raises an error (schema enforcement)
new_data.coalesce(1).write.format("delta").mode("append").save(path)
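
If the schema change is intentional, the write can be told to accept it; mergeSchema (for appends) and overwriteSchema (for overwrites) are standard Delta write options, shown here as a small addition:

new_data.coalesce(1).write.format("delta").mode("append") \
    .option("mergeSchema", "true").save(path)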

Deletion and Update (deletes and updates applied directly on the source Delta table)

from delta.tables import *
delta_df = DeltaTable.forPath(spark, path)
delta_df.delete("id = 2")  # auto refresh
delta_df.update(condition="id = 5", set={"id": "500"})
delta_df.toDF().show()  

Merge (source can be any DataFrame, target is a Delta table)

deltaTable = DeltaTable.forPath(spark, path)
newData = spark.range(0, 20)
deltaTable.alias("oldData") \
  .merge(newData.alias("newData"),"oldData.id = newData.id") \
  .whenMatchedUpdate(set = { "id": col("newData.id") }) \
  .whenNotMatchedInsert(values = { "id": col("newData.id") }) \
  .execute()	  

Delta - Watermark Logic

# readDelta, srcDF2 and deltaColumn come from the surrounding pipeline:
# deltaDF is the current target table, srcDF2 the incoming source, deltaColumn e.g. "UPDATE_TS"
deltaDF = readDelta(sourceSubFolder, "latest")
maxTS = deltaDF.selectExpr(f"max({deltaColumn})").first()[0]
resultDF = srcDF2.filter(f"{deltaColumn} > '{maxTS}'")
if resultDF.count() == 0: exit(1)
resultDF.write.option("mergeSchema", "true").format("delta").mode("append").save(targetPath)
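
readDelta is a helper from the author's pipeline and is not defined in the post; a minimal sketch of what it might look like, assuming "latest" simply means the current state of the table:

def readDelta(path, version="latest"):
    # Read a Delta table, optionally pinned to an older version (time travel)
    reader = spark.read.format("delta")
    if version != "latest":
        reader = reader.option("versionAsOf", version)
    return reader.load(path)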

Upsert

from pyspark.sql.functions import sha2, concat_ws

# fullLoad is a flag supplied by the caller: True = full overwrite, False = merge on the hash key
df = spark.range(1, 5).withColumn("sha", sha2(concat_ws("_", *["id"]), 256))
if fullLoad:
    df.coalesce(1).write.format("delta").mode("overwrite").save(path)
else:
    deltaTable = DeltaTable.forPath(spark, path)
    deltaTable.alias("oldData") \
        .merge(df.alias("newData"), "oldData.sha = newData.sha") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()

Time Travel

spark.read.format("delta").option("versionAsOf",1).load("/mnt/data/delta")
spark.read.format("delta").option("timestampAsOf", "2019-09-22 00:57:00").load("/tmp/stars")

Utils

delta_df.history(1).show()  # show the last operation
delta_df.vacuum()           # remove files no longer referenced, older than the default 7-day retention
delta_df.vacuum(100)        # vacuum files not required by versions more than 100 hours old