Thursday, September 21, 2023

PySpark: Compression

# %% [markdown]
# Test Sample Details
#
# Format: JSON
# Record count = 97
# File size = 2.8 MB
#
# %% [markdown]
# Compression Codec | Time to Compress | Time to Decompress | Benefits | Drawbacks | Compressed Size | Common Use Cases
# ----------------- | ---------------- | ------------------ | -------- | --------- | --------------- | ----------------
# None | Very fast | Very fast | No CPU overhead | No size reduction | Unchanged (largest) | When storage is not a concern
# LZ4 | Fast | Fast | Low compression overhead | Not the highest compression ratio | Smaller than original | Real-time, in-memory storage
# Gzip | Slower | Slower | High compression ratio | High CPU overhead | Smaller than original | Archiving, storage, long-term backups
# Snappy | Fast | Fast | Low compression overhead | Not as compact as Gzip or Zstd | Smaller than original | Real-time, columnar storage
# Zstd | Balanced | Balanced | High compression ratio | Higher CPU usage than LZ4/Snappy | Often smaller than Gzip | Batch processing, data storage
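#
# Besides passing `compression` on each write (as done below), the codec can
# also be set once for the whole session through the Spark SQL property
# `spark.sql.parquet.compression.codec`. A minimal, self-contained sketch
# (the app name is arbitrary):
# %%
from pyspark.sql import SparkSession

_spark = SparkSession.builder.appName("codec-config").getOrCreate()
# Every Parquet write in this session now defaults to zstd unless an
# explicit compression option on the writer overrides it.
_spark.conf.set("spark.sql.parquet.compression.codec", "zstd")
# %% [markdown]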
#
#
#
# Result
#
# partition = default
# None (.parquet) = 6.7 MB * 9 files
# lz4 (.lz4.parquet) = 3.6 MB * 9 files
# gzip (.gz.parquet) = 2.8 MB * 9 files (highest compression)
# snappy (.snappy.parquet) = 3.6 MB * 9 files
# zstd (.zstd.parquet) = 2.8 MB * 9 files
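#
# The sizes above were read off the filesystem; a small helper reproduces the
# measurement (a sketch that just sums file sizes under each output directory
# written below):
# %%
import os

def dir_size_mb(path: str) -> float:
    """Total size of all files under `path`, in megabytes."""
    total = 0
    for root, _dirs, files in os.walk(path):
        for name in files:
            total += os.path.getsize(os.path.join(root, name))
    return total / (1024 * 1024)

# Example (after the write cells below have run):
# print(f"{dir_size_mb('/Users/deepakjayaprakash/Downloads/test/gzip'):.1f} MB")
# %% [markdown]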
#
#
# Final Verdict:
#
# Performance / Speed Ranking (fastest first; a rough timing sketch follows
# the write cells below):
# 1. None
# 2. LZ4
# 3. Snappy
# 4. Zstd
# 5. Gzip
#
# Compression Ranking (smallest output first):
# 1. Gzip
# 2. Zstd
# 3. Snappy
# 4. LZ4
# 5. None
# %%
from pyspark.sql import SparkSession
import os, shutil
# %%
spark = SparkSession.builder.appName("test").getOrCreate()
# %%
path="/Users/deepakjayaprakash/Downloads/test"
df =spark.read.option("header","true").json(path+"/input")
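# %% [markdown]
# Note: `spark.read.json` infers the schema with an extra pass over the data.
# For larger inputs, supplying an explicit schema skips that pass. A sketch
# with a hypothetical two-field schema (the real input's fields are not shown
# in this post):
# %%
from pyspark.sql.types import StructType, StructField, StringType, LongType

sample_schema = StructType([
    StructField("id", LongType()),      # hypothetical field
    StructField("name", StringType()),  # hypothetical field
])
# df = spark.read.schema(sample_schema).json(path + "/input")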
# %%
NoCompression = path + "/NoCompression"
if os.path.exists(NoCompression): shutil.rmtree(NoCompression)
# Parquet is Spark's default write format; header is a CSV-only option, so
# only the compression codec needs to be set
df.write.option("compression", "none").save(NoCompression)
# %%
lz4 = path + "/lz4"
if os.path.exists(lz4): shutil.rmtree(lz4)
df.write.option("compression", "lz4").save(lz4)
# %%
gzip = path + "/gzip"
if os.path.exists(gzip): shutil.rmtree(gzip)
df.write.option("compression", "gzip").save(gzip)
# %%
snappy = path + "/snappy"
if os.path.exists(snappy): shutil.rmtree(snappy)
df.write.option("compression", "snappy").save(snappy)
# %%
zstd = path + "/zstd"
if os.path.exists(zstd): shutil.rmtree(zstd)
df.write.option("compression", "zstd").save(zstd)
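# %% [markdown]
# To put rough numbers behind the speed ranking above, each write can be
# timed. A sketch using time.perf_counter; these are single runs on a warm
# session, so treat the numbers as indicative only (the bench_* directories
# are scratch output):
# %%
import time

def time_write(frame, out_path: str, codec: str) -> float:
    """Time one Parquet write with the given compression codec."""
    start = time.perf_counter()
    frame.write.mode("overwrite").option("compression", codec).parquet(out_path)
    return time.perf_counter() - start

for codec in ["none", "lz4", "gzip", "snappy", "zstd"]:
    print(codec, round(time_write(df, path + "/bench_" + codec, codec), 3))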
# %% [markdown]
# Decompression
# %%
spark.read.parquet(NoCompression).count()
# %%
spark.read.parquet(lz4).count()
# %%
spark.read.parquet(gzip).count()
# %%
spark.read.parquet(snappy).count()
# %%
spark.read.parquet(zstd).count()
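# %% [markdown]
# All five round-trips should return the same count as the source DataFrame
# (97 records here), since every codec is lossless. A quick assertion sketch
# over the directories written above:
# %%
expected = df.count()
for out in [NoCompression, lz4, gzip, snappy, zstd]:
    # Codecs change file size, never content: counts must match the source.
    assert spark.read.parquet(out).count() == expected, out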
