Thursday, May 26, 2022

ETL

Big Data:

  • OLTP (Online Transaction Processing)
  • OLAP (Online Analytical Processing)

Data processing systems

  • OLTP (OnLine Transaction Processing): used for managing current, day-to-day transactional data.
    • ACID
      • Atomicity (the entire transaction happens at once, or nothing happens)
      • Consistency (the database moves from one valid state to another)
      • Isolation (concurrent transactions do not interfere with each other)
      • Durability (committed data is persisted)
  • OLAP (OnLine Analytical Processing): maintains the past history of data and is mainly used for data analysis; it can also be referred to as a warehouse.

DataLake

  • large amounts of Data
  • structured, semi-structured, and unstructured data.
  • Stored in original / raw format
  • Any format - JSON, BSON, CSV, TSV, Avro, ORC, Parquet, etc.
  • Cost-effective storage
  • No Transformation

Delta / Watermarking (like the water-level marker in a dam or a river)

  1. Timestamp-based Approach: the source should have a timestamp column (see the sketch after this list)

    a. Current Timestamp Based

    • If current ts = 24-07-2024-01:01:01:0001
    • Next run: Select * from T where ts > '24-07-2024-01:01:01:0001'

    b. Source Timestamp Based

    • Save the max timestamp from the source column after each run.
    • In subsequent runs: Select * from T where ts > max(saved_value)
  2. Hash-based Approach: hash the comparable columns on both sides and pick rows where the hashes differ

 select src.*
 from src join dest on src.id = dest.id
 where hashbytes('sha256', concat_ws('', src.c1, src.c2))
    <> hashbytes('sha256', concat_ws('', dest.c1, dest.c2))
  3. Primary Key-based Approach: compare the primary keys of Source and Target to find new / deleted rows

  4. Change Tracking Technology: native to Microsoft SQL Server

  5. Change Data Capture (CDC) - Azure Synapse and Data Factory only:

    • One checkbox to handle delta
    • Native to Azure
    • Sources: Azure Blob, Azure Data Lake Gen2, Cosmos DB, SQL DB, SAP, PostgreSQL
    • Reverting back to a hand-rolled delta/watermark approach later is difficult

    -- enabling Change Tracking (4) at the database level:
    ALTER DATABASE DB
    SET CHANGE_TRACKING = ON
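
A minimal PySpark sketch of the source-timestamp watermark approach (1b) above; the control path /control/watermark, the source table src, and the UPDATE_TS column are illustrative assumptions:

from pyspark.sql.functions import col, max

# read the watermark persisted by the previous run
last_ts = spark.read.parquet("/control/watermark").collect()[0]["last_ts"]

# pull only the rows that changed since then
deltaDF = spark.table("src").filter(col("UPDATE_TS") > last_ts)

# load the delta, then persist the new max timestamp for the next run
# (skip this step when deltaDF is empty)
deltaDF.select(max("UPDATE_TS").alias("last_ts"))\
    .write.mode("overwrite").parquet("/control/watermark")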


Data Warehouse (Summary, Types, NF, Facts and Dims, Star vs Snowflake, Key Types, LAD, Handling LAD)

  • Structured information from various sources
  • Data is transformed
  • has schema
  • Use RDBMS
  • Used for reporting
  • Facts and Dim
  • Data is Normalised

Datamart

  • similar to a data warehouse.
  • Caters to a specific business unit, e.g. the Finance dept

Database Normalization

Used to reduce redundancy in database tables.

1NF (atomicity):

Each attribute holds a single value.

Name,Age,Dept <- BEFORE  
Melvin,32,(Marketing, Sales)

Name,Age,Dept <- AFTER  
Melvin,32,Marketing  
Melvin,32,Sales  

2NF (composite natural key):

  • After 1NF

  • No partial dependencies (no column may depend on only a part of the composite key)

      Subject |Teacher ID|Teacher Age  <- BEFORE
    
      Teacher ID|Teacher Age  <- AFTER
      Subject |Teacher ID
    

3NF (no non-key column may depend on another non-key column)

  • After 2NF

  • If A -> B and B -> C, then A -> C is a transitive dependency, which 3NF removes

  • Primary Key: EmpID

  • Non-key attributes: Name, State, Country, ZIP

    EmpID| Name| State| Country| ZIP
    
  • EmpID -> ZIP, and ZIP -> State, Country (State and Country depend on EmpID only transitively, via ZIP)

    EmpID,Name,ZIP
    ZIP,State,Country

Dimensions (Object)

  • Eg: a seller, a customer, a date, a price , Product
  • Contains Primary keys , Surrogate Keys
  • Dim = Person Details , Fact = Transaction done by the Person

Facts (Action)

  • Eg: a sale, a transaction, an access
  • contains foreign keys from Dimension

Grain:

  • Lowest level of measurement in the Fact table.

Dimension Modeling

Star Schema:

  • The fact table uses foreign keys from the dimension tables
  • The schema looks like a star, with one fact table connected to multiple dimension tables

Snowflake Schema

  • An extension of the star schema
  • Multidimensional model
  • Dimension tables are further normalized into sub-dimensions

Types of Dimensions:

  • Role Play Dim : Eg- "Date" dim can be used for "Date of Sale", "Date of Delivery","DOB"
  • Junk dimension : Table with unrelated attributes to avoid large number of foreign keys in the fact table.
  • Degenerate dimensions: Dimension attributes stored as part of fact table and not in a separate dimension table. Eg: "transaction number"
  • Stacked dimension : two or more dimensions are combined
  • Slowly Changing Dimension : dimension that would undergo changes over time
  • Slowly Growing Dimension : growth of records/elements in the dimension.
  • Conformed Dimension (source of truth): shared across multiple data marts or subject areas
  • Reference Dimension: joined to the fact table indirectly, through a key in another dimension table
  • Static Dimension: not extracted from the original data source, but created within the context of the data warehouse

Types of Slowly Changing Dimension

Type 0 : No changes are entertained (only brand-new records are inserted)

INSERT INTO dim
SELECT * FROM src
WHERE NOT EXISTS (SELECT * FROM dim WHERE dim.id = src.id)

Type 1 : Direct Update Old Data (Upsert Original Data)

SK,ID,Name,Cellphone. <- Before
100,1,ABC,1234567890

SK,ID,Name,Cellphone <- After
100,1,ABC,9876543210

UPDATE dim
SET addr = src.addr FROM src WHERE dim.id = src.id;

Type 2 : Flag Old Data as 0 , Insert new Data with 1(Flag)

SK,ID,Name,Cellphone,FROM_DT,TO_DT,flag
100,1,ABC,1234567890,2016-06-01,2016-06-10,0
100,1,ABC,9876543210,2016-06-10,NULL,1

-- insert the new version (from_dt = today, to_dt = NULL, flag = 1)
INSERT INTO dim
SELECT src.*, CURRENT_DATE, NULL, 1 FROM src
JOIN dim ON src.id = dim.id AND dim.flag = 1 AND src.addr <> dim.addr;

-- then expire the previous version
UPDATE dim SET flag = 0, to_dt = CURRENT_DATE
FROM src WHERE dim.id = src.id AND dim.flag = 1 AND dim.addr <> src.addr;

Type 3 : Keeps history by adding a new column

SK,ID,Name,old_number,new_number
100,1,ABC,1234567890,9876543210
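
A minimal Delta Lake MERGE sketch of the Type 3 update; the table names dim / src and the incoming column cellphone are assumptions:

# push the current value into old_number, then store the incoming value in new_number
spark.sql("""
    MERGE INTO dim
    USING src
    ON dim.id = src.id AND dim.new_number <> src.cellphone
    WHEN MATCHED THEN UPDATE SET
        old_number = dim.new_number,
        new_number = src.cellphone
""")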

Type 4 : Uses separate history table

SK,ID,Name,Cellphone
100,1,ABC,9876543210

History Table
SK,ID,Name,Cellphone,CRT_DT
100,1,ABC,1234567890, 2016-06-01
100,1,ABC,9876543210, 2016-06-10
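
A hedged Delta Lake SQL sketch of Type 4 maintenance; dim, dim_history and the staging view src are assumed names:

# append the incoming version to the history table, then update the current table in place
spark.sql("""
    INSERT INTO dim_history
    SELECT d.sk, d.id, d.name, s.cellphone, current_date() AS crt_dt
    FROM dim d JOIN src s ON d.id = s.id AND d.cellphone <> s.cellphone
""")
spark.sql("""
    MERGE INTO dim USING src ON dim.id = src.id
    WHEN MATCHED AND dim.cellphone <> src.cellphone THEN UPDATE SET cellphone = src.cellphone
""")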

Type 6 : Combination of Types 1, 2 and 3

  • A new row is added (like Type 2)
  • Active rows are indicated with a boolean flag and/or start and end dates
  • Additional attributes/columns within the same record keep prior values instead of creating new records (like Type 3)

Eg: a new record is added as in SCD Type 2, the current information is overwritten with the new data as in Type 1, and history is preserved in extra columns as in Type 3.

ID,Name,new_number,old_number,EFF_FR_DT,EFF_TO_DT,flag
1,ABC,9876543210,2345678901,2015-01-01,2016-01-20,0
1,ABC,9876543210,1234567890,2016-01-21,NULL,1

Surrogate Key vs Natural Key vs Primary Key :

Natural Key

  • Appears Naturally Eg: Registration Number ,Country Code
  • One or more columns may need to be combined to make it unique
  • May or may not have a starting value
  • Can be Null

Surrogate Key

  • Surrogate Key is artificially generated and uniquely identifies each row in the dimension table
  • Commonly 1 Column and starts with Integer "1"
  • unique
  • Local only to the Data Warehouse / Data Lake
  • Created using Natural Key
  • Cannot be Null
  • Skey example:

from pyspark.sql.functions import sha2, concat_ws, col, monotonically_increasing_id

df\
.withColumn("sha2", sha2(concat_ws("_", "nat_key1", "nat_key2"), 256))\
.distinct()\
.orderBy(col("sha2").asc())\
.withColumn("skey", monotonically_increasing_id())
  • Use "ReIDL" logic, to maintain Skey and NaturalKey relation even during full load . Primary Key
  • 1 Column
  • May or may not be an Integer Column
  • Generated in RDBMS Database
  • Not Null

Foreign Key

  • Field that references a PRIMARY KEY / SURROGATE KEY
  • Used in 2NF Normalization Logic

Create Skey in Dim

  • withColumn ('pk' ,monotonically_increasing_id())
  • create table T (id int Identity (1,1) )
  • Add 2 extra rows (see the sketch after this list):
    • -9 as 'NULL': when joined with the fact (after replacing NULLs in the fact with the string 'NULL'), all null values resolve to -9
    • -1 as NULL (orphan record): when a fact key has no match in the dimension, the record is an orphan and gets -1
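
A minimal PySpark sketch of the above; srcDim and nat_key are assumed names:

from pyspark.sql.functions import monotonically_increasing_id

# surrogate keys start above the reserved members -1 / -9
dim = srcDim.select("nat_key").distinct()\
    .withColumn("skey", monotonically_increasing_id() + 1)

# reserved members: -9 for source NULLs (the string 'NULL'), -1 for orphan fact records
special = spark.createDataFrame([("NULL", -9), (None, -1)], ["nat_key", "skey"])
dim = dim.unionByName(special)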

Create Fact Relation

Note : -1 = Orphan Records , -9 for Null

  • Replace NULL with 'NULL' in the joining column
  • Left outer join with (Select Skey and NaturalKey from Dim)
  • Replace null Skeys with -1 [you can also use an anti join to isolate the orphans]
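
A hedged PySpark sketch of these three steps, assuming a fact DataFrame factDF carrying the natural key column nat_key and the dim built above:

from pyspark.sql.functions import coalesce, col, lit

fact = factDF.na.fill({"nat_key": "NULL"})\
    .join(dim.select("nat_key", "skey"), on="nat_key", how="left_outer")\
    .withColumn("skey", coalesce(col("skey"), lit(-1)))\
    .drop("nat_key")

# null natural keys now resolve to the -9 member; unmatched (orphan) keys get -1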

Late Arriving Dimension / Early Arriving Fact (LAD)

In data warehouse ETL, dimensions are processed and loaded before facts.

Eg: On an e-commerce website, customers can place an order as a guest, so the fact record gets created without a foreign key from the Customer dimension.

Design Approaches for LAD

Never Process Fact : do not process the fact record until the dimension record is available.

id,Name
1,Sam
2,Jack

CustomerID,Item
1,Candy
3,Bread <- X
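
A one-line PySpark sketch of this rule (factDF / dimDF are assumed names):

# left_semi keeps only the fact rows that have a matching dimension row
processable = factDF.join(dimDF, factDF.CustomerID == dimDF.id, "left_semi")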

Park and Retry : insert the unmatched transactional record into a landing / separate fact table and retry it in the next batch process (sketch after the example).

id,Name
1,Sam
2,Jack

CustomerID,Item
1,Candy

CustomerID,Item <-Landing Table
3,Bread
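
A hedged PySpark sketch of park-and-retry; the paths and the Delta format are assumptions:

# rows with a matching dimension row go to the fact; the rest are parked for the next batch
matched = factDF.join(dimDF, factDF.CustomerID == dimDF.id, "left_semi")
parked = factDF.join(dimDF, factDF.CustomerID == dimDF.id, "left_anti")

matched.write.format("delta").mode("append").save("factPath")
parked.write.format("delta").mode("append").save("factLandingPath")

# the next batch unions the parked rows with the newly arrived source rows and retries the lookup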

Inferred Flag

from pyspark.sql.functions import lit

# pull the new source rows and the current dimension
srcDF = spark.sql(f"select * from src where ts > '{delta_ts}'")
dimDF = spark.read.format("parquet").option("path", "dimDFPath").load()

# insert placeholder (inferred) dimension rows for ids not yet present in the dimension
# (the remaining dim columns carry N/A placeholder values at this point)
srcDF.selectExpr("id as sID").distinct()\
    .join(dimDF, dimDF.id == srcDF.sID, "left")\
    .filter("id is null")\
    .selectExpr("sID as id")\
    .withColumn("inferredFlag", lit(True))\
    .write\
    .format("parquet")\
    .mode("append")\
    .option("path", "dimDFPath")\
    .save()

# re-read the dimension (now containing the inferred rows) and attach surrogate keys to the fact
dimDF = spark.read.format("parquet").option("path", "dimDFPath").load()
srcDF.join(
        dimDF.select("id", "sKey"),
        srcDF.id == dimDF.id,
        "left").drop("id")\
    .write.format("parquet").save("factPath")
  • When the actual dimension record becomes available, the columns marked as N/A are updated with the newly received data and inferredFlag is set to False

Design for ETL

ETL with 3 zones: Raw (Data Lake), Enriched (Delta Lake) and Curated (Data Warehouse)

  1. Raw zone

    • Data Lake
    • Blob Storage Eg: Azure Gen2 , IBM cloud Object Storage
    • Data is moved from source
    • Every load goes into a new timestamp folder
    • Independent (does not depend on the last load type / data)
    • Immutable
    • Delta Load / Full Load
    • Duplicates can exist (e.g. if the source is Type 1 or records are deleted; Parquet files do not allow deleting / updating existing records)
  2. Enriched Zone

    • Delta Lake format (allows delete, update)
    • Blob Storage Eg: Azure Gen2 , IBM cloud Object Storage
    • Data moved from Raw Zone to Enriched Zone
    • Duplicates removed using Upsert Operation
    • Transformations are done here
    • Fact and Dim
  3. Curated Zone

    • Data Moved from Enriched to Here
    • DataWarehouse
    • SQL Database
    • Fact and Dim

Steps :

Step 1. Get all records from Source

from pyspark.sql.functions import max

maxDestTS = destDF.select(max("UPDATE_TS").alias("max")).collect()[0]["max"]
newDF = srcDF.filter(f"UPDATE_TS > '{maxDestTS}'")

Step 2. When Source = SCD Type 2 or soft delete, i.e. (Flag = 'D'), keep only the latest record per id:

  • Method 1:

latestIDDF = newDF.groupBy("id").agg(max("UPDATE_TS").alias("UPDATE_TS"))
latestDF = newDF.join(latestIDDF, ["id", "UPDATE_TS"], "inner")

  • Method 2:

from pyspark.sql.window import Window

latestDF = newDF.filter("Flag <> 'D'")\
    .withColumn("r", max("UPDATE_TS").over(Window.partitionBy("id")))\
    .filter("r == UPDATE_TS")

Step 3. Upsert (update old records, insert new records) into Delta Lake

	from delta.tables import DeltaTable

	deltaDF = DeltaTable.forPath(spark, "/path")
	deltaDF.alias("target")\
	.merge(latestDF.alias("src"), "src.id = target.id")\
	.whenMatchedUpdateAll()\
	.whenNotMatchedInsertAll()\
	.execute()

Step 4. EZ to DB

Method 1 : Full Overwrite

df.write.format("jdbc").mode("overwrite").options(url="","usn"= ..).save()

Method 2 :

  1. Fetch all records from EZ with timestamp > max(timestamp) in CZ

  2. Delete those records in CZ where the same natural keys exist in the fetched data

     # maxTS = SELECT max(TS) FROM target   (run against the CZ database)
     delKeys = srcDF.filter(f"srcTS > '{maxTS}'")\
         .withColumn("key", concat_ws(",", "k1", "k2"))\
         .select("key").collect()
     # DELETE FROM target WHERE CONCAT_WS(',', c1, c2) IN (<keys from delKeys>)   (run against the CZ database)
    
    
  3. ReInsert fetched Records

Performance :

  • Concatenate the natural keys and store a hash of them for fast deletion. Eg: hash(concat_ws(',', "k1", "k2"))

Monday, May 16, 2022

Build Jar : Add multiple class files in the Jar using Bazel

 Add multiple class files in the Jar using Bazel 


#---------------------------
# Bazel build parent.jar
# This would add Parent.scala, Child1.scala, Child2.scala to the jar file created
#---------------------------
load("@io_bazel_rules_scala//scala:scala.bzl", "scala_binary", "scala_library")

package(default_visibility = ["//visibility:public"])

scala_library(
    name = "parent",
    srcs = [
        "Parent.scala",
        "//src/main/child1:Child1.scala",
        "//src/main/child2:Child2.scala",
    ],
    deps = [
        "//src/main/child1",
        "//src/main/child2",
    ],
)

Thursday, May 12, 2022

Azure Synapse - Working with Custom jar Scala (Bazel)

 Working with Custom jar in Azure 

There are 2 kinds of jars:

  1. Big Fat Jar or Uber Jar (contains the main class and all dependencies bundled together)
  2. Build Dependency Jar/ Library to be used in Synapse 

Big Fat Jar or uber Jar:


Build Dependency Jar/ Library

  • Does not Contain main class
  • Does not contain dependencies
  • Note: download external dependencies from Maven and upload them separately (refer to the link below)
  • $ bazel build build_name.jar

Once you have the jars created based on your need, use this link to upload them to Azure: https://docs.microsoft.com/en-us/azure/synapse-analytics/spark/apache-spark-manage-scala-packages

Friday, May 6, 2022

Azure Synapse : Passing and receiving Values to and from Notebooks

 Passing and receiving Values to and from Notebooks
