Tuesday, September 5, 2023

PySpark Cache

A quick comparison of the same DataFrame join run three ways: with no performance enhancement, with checkpoint(), and with cache().

# %%
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("performance").getOrCreate()
# %%
import random
import string
# %%
# 26 students: ids 1-26 paired with the letters A-Z
rdd_student = spark.sparkContext.parallelize(zip(range(1, 27), string.ascii_uppercase))
# %%
studentDF = spark.createDataFrame(rdd_student, ["id", "name"])
# %% [markdown]
# Without any Performance Enhancement
# %%
rddMaths=spark.sparkContext.parallelize(zip(range(1,27),[random.randint(1,100) for i in range(1,27)]))
mathsDF = spark.createDataFrame(rddMaths,["id","marks"])
studentDF.join(mathsDF,["id"],"leftouter").show()
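# %% [markdown]
# A quick sanity check (my addition, not in the original gist): explain() prints
# the physical plan, so you can compare what the optimizer produces for each of
# the three variants below.
# %%
studentDF.join(mathsDF, ["id"], "leftouter").explain()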
# %% [markdown]
# With Checkpoint
# %%
rddScience= spark.sparkContext.parallelize(zip(range(1,27),[random.randint(1,100) for i in range(1,27)]))
scienceDF = spark.createDataFrame(rddScience,["id","marks"])
spark.sparkContext.setCheckpointDir("/tmp/spark-temp")
scienceDF=scienceDF.checkpoint()
studentDF.join(scienceDF,["id"],"leftouter").show()
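# %% [markdown]
# Side note (a sketch, not in the original gist): if you don't want to configure
# a checkpoint directory, localCheckpoint() truncates the lineage the same way
# but stores the data on the executors, which is faster but not fault-tolerant.
# The name scienceLocalDF is just for illustration.
# %%
scienceLocalDF = spark.createDataFrame(rddScience, ["id", "marks"]).localCheckpoint()
studentDF.join(scienceLocalDF, ["id"], "leftouter").show()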
# %% [markdown]
# With Cache
# %%
rddOther= spark.sparkContext.parallelize(zip(range(1,27),[random.randint(1,100) for i in range(1,27)]))
otherDF = spark.createDataFrame(rddOther,["id","marks"]).cache()
studentDF.join(otherDF,["id"],"leftouter").show()
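# %% [markdown]
# To verify the cache and release it afterwards (my addition; storageLevel and
# unpersist() are standard DataFrame APIs):
# %%
print(otherDF.storageLevel)  # MEMORY_AND_DISK once cache() has been applied
otherDF.unpersist()          # drop the cached data when you're done with it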
