Thursday, May 26, 2022

ETL



Data processing systems

  • OLTP (OnLine Transaction Processing) : used for managing current, day-to-day transactional data.
    • ACID
      • Atomicity (the entire transaction happens at once, or nothing happens)
      • Consistency (data is in a valid state before and after the transaction)
      • Isolation (concurrent transactions do not interfere with each other)
      • Durability (committed data is persisted)
  • OLAP (OnLine Analytical Processing): used to maintain the historical data and mainly used for data analysis; commonly referred to as a data warehouse.

DataLake

  • Stores large amounts of data
  • Structured, semi-structured, and unstructured data
  • Kept in original / raw format
  • Any format - JSON, BSON, CSV, TSV, Avro, ORC, Parquet, etc.
  • Cost-effective storage
  • No transformation applied

Delta / Watermarking (like a water-level marker in a dam or a river)

  1. Timestamp-based Approach: the source should have a timestamp column (a PySpark sketch of approach 1b follows at the end of this list).

     a. Current Timestamp Based

    • If the current run's timestamp = 24-07-2024-01:01:01:0001
    • Next run: Select * from T where ts > '24-07-2024-01:01:01:0001'

     b. Source Timestamp Based

    • Save the max timestamp from the source column after each run.
    • In subsequent runs: Select * from T where ts > max(saved_value)
  2. Hash-based Approach: hash the comparison columns on both sides and pick rows whose hashes differ.

    SELECT src.*
    FROM src
    JOIN dest ON src.id = dest.id   -- join on the natural key
    WHERE HASHBYTES('SHA2_256', CONCAT_WS('', src.c1, src.c2))
       <> HASHBYTES('SHA2_256', CONCAT_WS('', dest.c1, dest.c2))
  3. Primary Key-based Approach: compare the primary keys of source and target to detect new / deleted records.

  4. Change Tracking Technology: Microsoft SQL Server feature.

  5. Change Data Capture (CDC) - Azure Synapse and Data Factory only:

    • One checkbox to handle delta
    • Native to Azure
    • Sources: Azure Blob, ADLS Gen2, Cosmos DB, SQL DB, SAP, PostgreSQL
    • Reverting back to a manual delta approach later is difficult

    ALTER DATABASE DB
    SET CHANGE_TRACKING = ON

    ALTER TABLE dbo.T ENABLE CHANGE_TRACKING
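
A minimal PySpark sketch of the source-timestamp approach (1b above), assuming a source table T with an UPDATE_TS column and a small parquet file at /watermark/T that stores the last saved maximum timestamp (both names are illustrative):

from pyspark.sql.functions import max as max_

# Last saved high-water mark, kept as a tiny parquet file
watermark = spark.read.parquet("/watermark/T").collect()[0]["max_ts"]

# Pull only the rows that changed after the previous run
deltaDF = spark.table("T").filter(f"UPDATE_TS > '{watermark}'")

# Persist the new high-water mark for the next run (skip when there are no new rows)
if deltaDF.head(1):
    deltaDF.select(max_("UPDATE_TS").alias("max_ts")) \
        .write.mode("overwrite").parquet("/watermark/T")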


Datawarehouse (Summary, Types, NF, Facts and Dim, Star vs Snowflake, Key Types, LAD, Handle LAD)

  • Structured information from various sources
  • Data is transformed
  • Has a schema
  • Uses an RDBMS
  • Used for reporting
  • Facts and Dims
  • Data is normalised

Datamart

  • Similar to a data warehouse.
  • Caters to a specific business unit, e.g. the Finance department

Database Normalization

Used to reduce redundancy in database tables.

1NF(atomicity):

Each attribute holds a single value.

Name,Age,Dept <- BEFORE  
Melvin,32,(Marketing, Sales)

Name,Age,Dept <- AFTER  
Melvin,32,Marketing  
Melvin,32,Sales  

2NF (no partial dependency on the composite natural key):

  • After 1NF

  • No partial dependencies (no non-key column depends on only part of the composite key)

      Subject |Teacher ID|Teacher Age  <- BEFORE

      Teacher ID|Teacher Age  <- AFTER
      Subject |Teacher ID


3NF (no transitive dependencies: non-key columns depending on other non-key columns)

  • After 2NF

  • If A -> B and B -> C, then A -> C is a transitive dependency and must be removed

  • Primary Key: EmpID

  • Non-key attributes: Name, State, Country, ZIP

    EmpID| Name| State| Country| ZIP

  • ZIP depends on EmpID; State and Country depend on ZIP, so split into:

    EmpID,NAME,ZIP
    ZIP,STATE,Country

Dimensions (Object)

  • Eg: a seller, a customer, a date, a price , Product
  • Contains Primary keys , Surrogate Keys
  • Dim = Person Details , Fact = Transaction done by the Person

Facts (Action)

  • Eg: a sale, a transaction, an access
  • contains foreign keys from Dimension

Grain:

  • Lowest level of measurement in the Fact table.

Dimension Modeling

Star Schema:

  • The fact table uses foreign keys from the dimension tables
  • The schema looks like a star: one fact table connected to multiple dimension tables

Snowflake Schema

  • An extension of the star schema
  • Multidimensional model
  • Dimension tables are further normalised into sub-dimensions

Types of Dimensions:

  • Role-Playing Dim : e.g. a "Date" dim can be used for "Date of Sale", "Date of Delivery", "DOB"
  • Junk dimension : table of unrelated attributes, used to avoid a large number of foreign keys in the fact table.
  • Degenerate dimension: a dimension attribute stored as part of the fact table and not in a separate dimension table. Eg: "transaction number"
  • Stacked dimension : two or more dimensions combined into one
  • Slowly Changing Dimension : a dimension that undergoes changes over time
  • Slowly Growing Dimension : a dimension whose record count grows over time
  • Conformed Dimension (source of truth): shared across multiple data marts or subject areas
  • Reference Dimension: joined indirectly to the fact table through a key in another dimension table
  • Static Dimension : not extracted from the original data source, but created within the context of the data warehouse

Types of Slowly Changing Dimension

Type 0 : No changes are entertained

INSERT INTO dim
SELECT * FROM src
WHERE NOT EXISTS (SELECT * FROM dim WHERE dim.id = src.id)

Type 1 : Direct Update Old Data (Upsert Original Data)

SK,ID,Name,Cellphone. <- Before
100,1,ABC,1234567890

SK,ID,Name,Cellphone <- After
100,1,ABC,9876543210

UPDATE dim
SET addr = src.addr FROM src WHERE dim.id = src.id AND dim.addr <> src.addr;

Type 2 : Flag Old Data as 0 , Insert new Data with 1(Flag)

SK,ID,Name,Cellphone,FROM_DT,TO_DT,flag
100,1,ABC,1234567890,2016-06-01,2016-06-10,0
100,1,ABC,9876543210,2016-06-10,NULL,1

INSERT INTO dim
(SELECT src.* FROM src
JOIN dim
ON src.id = dim.id AND src.addr <> dim.addr)
-- the existing row must also be expired (flag = 0, TO_DT closed); see the Delta sketch at the end of this section

Type 3 : Keeps history by adding a new column

SK,ID,Name,old_number,new_number
100,1,ABC,1234567890,9876543210

Type 4 : Uses separate history table

SK,ID,Name,Cellphone
100,1,ABC,9876543210

History Table
SK,ID,Name,Cellphone,CRT_DT
100,1,ABC,1234567890, 2016-06-01
100,1,ABC,9876543210, 2016-06-10

Type 6 : Combination of Types 1, 2 and 3

  • New row added (like Type 2)
  • The active row is indicated with a boolean flag / start and end dates
  • History is also kept in additional columns within the same record, instead of only in new records (like Type 3)

Eg: a new table record is added as in SCD Type 2, the old information is overwritten with the new data as in Type 1, and history is preserved in additional columns as in Type 3.

ID,Name,new_number,old_number,EFF_FR_DT,EFF_TO_DT,flag
1,ABC,9876543210,2345678901, 2015-01-01,2016-01-20,0
1,ABC,9876543210,1234567890, 2016-01-21,Null,1
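
For reference, a hedged sketch of Type 2 on Delta Lake, assuming a customer dimension stored at /dim/customer with columns id, cellphone, FROM_DT, TO_DT, flag (surrogate-key generation omitted; all names are illustrative):

from delta.tables import DeltaTable
from pyspark.sql.functions import current_date, lit

dim = DeltaTable.forPath(spark, "/dim/customer")

# 1. Expire the active version of every row whose tracked column changed
dim.alias("d").merge(
        srcDF.alias("s"),
        "d.id = s.id AND d.flag = 1 AND d.cellphone <> s.cellphone") \
   .whenMatchedUpdate(set={"flag": lit(0), "TO_DT": current_date()}) \
   .execute()

# 2. Append a fresh active version for changed and brand-new ids
newRows = srcDF.join(dim.toDF().filter("flag = 1").select("id"), "id", "left_anti") \
    .withColumn("FROM_DT", current_date()) \
    .withColumn("TO_DT", lit(None).cast("date")) \
    .withColumn("flag", lit(1))
newRows.write.format("delta").mode("append").save("/dim/customer")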

Surrogate Key vs Natural Key vs Primary Key :

Natural Key

  • Occurs naturally in the data, e.g. Registration Number, Country Code
  • One or more columns may need to be combined to make it unique
  • May or may not have a starting value
  • Can be NULL

Surrogate Key

  • A surrogate key is artificially generated and uniquely identifies each row in the dimension table
  • Commonly a single column starting from integer 1
  • Unique
  • Local to the data warehouse / data lake only
  • Created from the natural key
  • Cannot be NULL
  • Skey generation:

from pyspark.sql.functions import sha2, concat_ws, col, monotonically_increasing_id

df\
.withColumn("sha2", sha2(concat_ws("_", "nat_key1", "nat_key2"), 256))\
.distinct()\
.orderBy(col("sha2").asc())\
.withColumn("skey", monotonically_increasing_id())
  • Use "ReIDL" logic to maintain the Skey-NaturalKey relation even during a full load.

Primary Key

  • 1 Column
  • May or may not be an Integer Column
  • Generated in RDBMS Database
  • Not Null

Foreign Key

  • Field that references a PRIMARY KEY / SURROGATE KEY
  • Used in 2NF Normalization Logic

Create Skey in Dim

  • withColumn('pk', monotonically_increasing_id())
  • create table T (id int Identity(1,1))
  • Add 2 extra rows:
    • -9 as 'NULL' : after replacing NULLs in the fact join column with the string 'NULL', all null fact values map to skey -9
    • -1 as NULL (orphan record) : fact records with no matching dimension record get skey -1
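
A small PySpark sketch of the two reserved rows (srcDimDF and natural_key are illustrative names):

from pyspark.sql.functions import monotonically_increasing_id

# Surrogate keys for genuine dimension members
dimDF = srcDimDF.select("natural_key").distinct() \
    .withColumn("skey", monotonically_increasing_id() + 1)

# Two reserved members: -9 catches NULL natural keys, -1 marks orphan fact records
specialRows = spark.createDataFrame(
    [(-9, "NULL"), (-1, None)], ["skey", "natural_key"])

dimDF = dimDF.select("skey", "natural_key").unionByName(specialRows)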

Create Fact Relation

Note: -1 = orphan records, -9 = NULLs

  • Replace NULL with 'NULL' for the joining column
  • Left outer join with (Select Skey and NaturalKey from Dim)
  • Replace null Skey with -1 [you can also use an anti join]
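
A sketch of the fact-side join (factDF, dimDF and natural_key are illustrative names):

from pyspark.sql.functions import coalesce, col, lit

# Replace NULL natural keys so they resolve to the -9 dimension member
factPrep = factDF.na.fill({"natural_key": "NULL"})

# Left join to the dim; unmatched (orphan) records fall back to skey -1
factOut = factPrep.join(dimDF.select("natural_key", "skey"), "natural_key", "left") \
    .withColumn("skey", coalesce(col("skey"), lit(-1))) \
    .drop("natural_key")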

Late Arriving Dimension / Early Arriving Fact (LAD)

In data warehouse ETL, dimensions are processed and loaded before facts.

Eg: in an e-commerce website, customers can place an order as a guest, so the fact record gets created without a foreign key from the Customer dimension.

Design Approaches for LAD

Never Process Fact : never process a fact record until its dimension record is available.

id,Name
1,Sam
2,Jack

CustomerID,Item
1,Candy
3,Bread <- X

Park and Retry : insert the unmatched transactional records into a landing / secondary fact table and retry them in the next batch run.

id,Name
1,Sam
2,Jack

CustomerID,Item
1,Candy

CustomerID,Item <-Landing Table
3,Bread
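
A sketch of the park-and-retry split (paths and column names are illustrative):

# Dimension natural keys, renamed to match the fact column
dimKeys = dimDF.selectExpr("id as CustomerID")

# Facts whose customer already exists go through; the rest are parked
matched = factDF.join(dimKeys, "CustomerID", "left_semi")
unmatched = factDF.join(dimKeys, "CustomerID", "left_anti")

matched.write.format("delta").mode("append").save("/fact/sales")
unmatched.write.format("delta").mode("append").save("/landing/sales_retry")

# The next batch unions /landing/sales_retry with the new arrivals and retries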

Inferred Flag

from pyspark.sql.functions import lit

srcDF = spark.sql(f"select * from src where ts > '{delta_ts}'")
dimDF = spark.read.format("parquet").option("path","dimDFPath").load()

# Insert placeholder (inferred) dimension rows for source keys missing in the dim
newKeys = srcDF.selectExpr("id as sID").distinct()
newKeys.join(dimDF, dimDF.id == newKeys.sID, "left")\
    .filter("id is null")\
    .selectExpr("sID as id")\
    .withColumn("inferredFlag", lit(True))\
    .write\
    .format("parquet")\
    .mode("append")\
    .option("path","dimDFPath")\
    .save()

# Re-read the dim so the inferred rows are visible, then resolve surrogate keys
dimDF = spark.read.format("parquet").option("path","dimDFPath").load()
srcDF.join(
        dimDF.select("id","sKey")
        , srcDF.id == dimDF.id
        , "left").drop("id")\
    .write.format("parquet").save("factPath")

  • When the actual dimension record becomes available, the columns marked as N/A are updated with the newly received data and inferredFlag is set to False.

Design for ETL

ETL with 3 zones: Raw (Data Lake), Enriched (Delta Lake) and Curated (Data Warehouse)

  1. Raw Zone

    • Data Lake
    • Blob storage, e.g. Azure Gen2, IBM Cloud Object Storage
    • Data is moved as-is from the source
    • Every load lands in a new timestamped folder (see the sketch after the zone list)
    • Independent (does not depend on the last load type / data)
    • Immutable
    • Delta load / full load
    • Duplicates can exist (e.g. if the source is Type 1 or records are deleted upstream; Parquet files do not allow deleting / updating existing records)
  2. Enriched Zone

    • Delta Lake type (allows delete and update)
    • Blob storage, e.g. Azure Gen2, IBM Cloud Object Storage
    • Data is moved from the Raw Zone to the Enriched Zone
    • Duplicates removed using an upsert operation
    • Transformations are done here
    • Fact and Dim
  3. Curated Zone

    • Data is moved here from the Enriched Zone
    • Data Warehouse
    • SQL database
    • Fact and Dim
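
A small sketch of a raw-zone load into a per-run timestamped folder (the path is illustrative):

from datetime import datetime

# Each extract lands in its own immutable, timestamped folder
runFolder = datetime.now().strftime("%Y%m%d_%H%M%S")
srcDF.write.format("parquet").save(f"/raw/sales/{runFolder}")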

Steps :

Step 1. Get all new records from the source

from pyspark.sql.functions import max

maxDestTS = destDF.select(max("UPDATE_TS").alias("max")).collect()[0]["max"]
newDF = srcDF.filter(f"UPDATE_TS > '{maxDestTS}'")

Step 2. When Source = SCD Type 2 or Soft Delete, i.e. (Flag = 'D'):

  • Method 1:

latestIDDF = newDF.groupBy("id").agg(max("UPDATE_TS").alias("UPDATE_TS"))
latestDF = newDF.join(latestIDDF, ["id", "UPDATE_TS"], "inner")

  • Method 2:

from pyspark.sql.window import Window

latestDF = newDF.filter("Flag <> 'D'")\
    .withColumn("r", max("UPDATE_TS").over(Window.partitionBy("id")))\
    .filter("r == UPDATE_TS")

Step 3. Upsert (update old records, insert new records) to the Delta Lake

	from delta.tables import DeltaTable

	deltaDF = DeltaTable.forPath(spark, "/path")
	deltaDF.alias("target")\
	.merge(latestDF.alias("src"), "src.id = target.id") \
	.whenMatchedUpdateAll()\
	.whenNotMatchedInsertAll()\
	.execute()

Step 4. EZ to DB

Method 1 : Full Overwrite

df.write.format("jdbc").mode("overwrite")\
  .option("url", "...").option("dbtable", "...").option("user", "...").option("password", "...")\
  .save()

Method 2 :

  1. Fetch all records from EZ with TS > max(timestamp) from CZ

  2. Delete those records in CZ if the same natural keys exist in the fetched data

     maxTS = spark.sql("select max(TS) from target").collect()[0][0]
     delTarget = srcDF.filter(f"srcTS > '{maxTS}'")\
         .withColumn("key", concat_ws(",", "k1", "k2")).select("key").collect()
     -- then, on the database side:
     DELETE FROM target WHERE CONCAT_WS(',', c1, c2) IN (<delTarget values>);

  3. Re-insert the fetched records

Performance :

  • Concatenate and hash the natural keys while saving, for fast deletion. Eg: hash(concat_ws(',', "k1", "k2"))
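
A sketch of the hashed-key column (ezDF, k1, k2 are illustrative names):

from pyspark.sql.functions import hash, concat_ws

# One hashed key column written alongside the data, so the delete in step 2
# can filter on a single column:  DELETE FROM target WHERE nk_hash IN (...)
czDF = ezDF.withColumn("nk_hash", hash(concat_ws(",", "k1", "k2")))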
