BigData:
- OLTP (Online Transaction Processing)
- OLAP (Online Analytical Processing)
- OLTP (OnLine Transaction Processing): used for managing current, day-to-day transactional data.
- ACID
- Atomicity (the entire transaction happens at once, or nothing happens)
- Consistency (the database moves from one valid state to another)
- Isolation (concurrent transactions do not interfere with each other)
- Durability (committed data is persisted)
- OLAP (OnLine Analytical Processing): maintains the historical data and is mainly used for data analysis; it can also be referred to as a data warehouse.
Data Lake:
- Stores large amounts of data
- Structured, semi-structured, and unstructured data
- Kept in its original / raw format
- Any format - JSON, BSON, CSV, TSV, Avro, ORC, Parquet, etc.
- Cost-effective storage
- No transformation applied
-
Timestamp-based Approach: the source should have a timestamp column.
a. Current Timestamp Based
- If the current timestamp = 24-07-2024-01:01:01:0001
- Next run: SELECT * FROM T WHERE ts > '24-07-2024-01:01:01:0001'
b. Source Timestamp Based (see the sketch below)
- Save the max timestamp from the source column after each run.
- In subsequent runs:
SELECT * FROM T WHERE ts > max(saved_value)
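A minimal PySpark sketch of the source-timestamp variant, assuming a JDBC source table T with a ts column and a small parquet "bookmark" holding the last saved max timestamp (paths, connection details and column names are illustrative):
from pyspark.sql import SparkSession
from pyspark.sql.functions import max as max_

spark = SparkSession.builder.getOrCreate()

# high-water mark saved by the previous run (illustrative bookmark location)
last_ts = spark.read.parquet("/bookmarks/T").collect()[0]["max_ts"]

# pull only the rows that are newer than the saved max timestamp
delta_df = (spark.read.format("jdbc")
            .option("url", "jdbc:sqlserver://host;databaseName=db")          # illustrative
            .option("dbtable", f"(SELECT * FROM T WHERE ts > '{last_ts}') q")
            .option("user", "usr").option("password", "pwd")
            .load())

if delta_df.head(1):  # only advance the bookmark when new rows actually arrived
    delta_df.select(max_("ts").alias("max_ts")) \
            .write.mode("overwrite").parquet("/bookmarks/T")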
-
Hash-based Approach: compare a hash of the tracked columns between source and destination (joined on the key, assumed here to be id); rows whose hashes differ have changed.
SELECT src.*
FROM src JOIN dest ON src.id = dest.id
WHERE HASHBYTES('SHA2_256', CONCAT_WS('', src.c1, src.c2))
   <> HASHBYTES('SHA2_256', CONCAT_WS('', dest.c1, dest.c2))
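The same idea as a hedged PySpark sketch, assuming src and dest are DataFrames sharing a key column id and compared columns c1, c2 (all names illustrative):
from pyspark.sql.functions import sha2, concat_ws, col

src_h  = src.withColumn("h", sha2(concat_ws("|", "c1", "c2"), 256))
dest_h = dest.withColumn("h", sha2(concat_ws("|", "c1", "c2"), 256))

# rows present on both sides whose column hash differs = changed rows
changed = src_h.alias("s")\
    .join(dest_h.alias("d"), col("s.id") == col("d.id"), "inner")\
    .filter(col("s.h") != col("d.h"))\
    .select("s.*")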
-
Primary Key-based Approach: compare the primary keys of Source and Target to find inserted / deleted rows (see the sketch below).
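A hedged PySpark sketch of the key comparison, assuming src and dest DataFrames with primary key column id (names illustrative):
# keys present in the source but not in the target = new rows to insert
inserts = src.join(dest, ["id"], "left_anti")

# keys present in the target but no longer in the source = deleted rows
deletes = dest.join(src, ["id"], "left_anti")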
-
Change Tracking Technology: native to Microsoft SQL Server (incl. Azure SQL)
-
Change Data Capture (CDC) - Azure Synapse and Data Factory only:
- One checkbox to handle delta
- Native to Azure
- Supported stores: Azure Blob Storage, ADLS Gen2, Cosmos DB, SQL DB, SAP, PostgreSQL
- Reverting to Delta is difficult
-- the statement below enables Change Tracking (see above); CDC itself is enabled with EXEC sys.sp_cdc_enable_db
ALTER DATABASE DB
SET CHANGE_TRACKING = ON
(CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON)
Data Warehouse:
- Structured information from various sources
- Data is transformed
- Has a schema
- Uses an RDBMS
- Used for reporting
- Facts and Dimensions
- Data is normalised
Data Mart:
- Similar to a data warehouse
- Caters to a specific business unit, Eg: Finance Dept
Normalization: used to reduce redundancy in database tables.
1NF: each attribute holds a single value.
Name,Age,Dept <- BEFORE
Melvin,32,(Marketing, Sales)
Name,Age,Dept <- AFTER
Melvin,32,Marketing
Melvin,32,Sales
-
2NF
- After 1NF
- No partial dependencies (every non-key column must depend on the whole key, not just part of it)
Subject | Teacher ID | Teacher Age <- BEFORE
Teacher ID | Teacher Age <- AFTER (table 1)
Subject | Teacher ID <- AFTER (table 2)
3NF (no non-key columns depending on other non-key columns)
- After 2NF
- If A -> B and B -> C, then A -> C is a transitive dependency and must be removed
Example:
Primary Key: EmpID
Non-key attributes: Name, State, Country, ZIP
EmpID | Name | State | Country | ZIP <- BEFORE
ZIP depends on EmpID, and State and Country depend on ZIP (transitive)
After 3NF, split into:
EmpID, Name, ZIP
ZIP, State, Country
Dimension Table:
- Eg: a seller, a customer, a date, a price, a product
- Contains primary keys and surrogate keys
- Dim = person details, Fact = transactions done by the person
Fact Table:
- Eg: a sale, a transaction, an access
- Contains foreign keys from the dimension tables
- Grain = the lowest level of measurement in the fact table
- The fact table uses foreign keys from the dimension tables
Star Schema:
- The schema looks like a star, with one fact table connected to multiple dimension tables
Snowflake Schema:
- An extension of the star schema
- Multidimensional model
- Dimension tables are further sub-dimensioned (normalised into additional tables)
- Role-Playing Dimension: Eg - a "Date" dim can be used for "Date of Sale", "Date of Delivery", "DOB"
- Junk Dimension: a table of unrelated attributes, used to avoid a large number of foreign keys in the fact table
- Degenerate Dimension: dimension attributes stored as part of the fact table and not in a separate dimension table. Eg: "transaction number"
- Stacked Dimension: two or more dimensions combined into one
- Slowly Changing Dimension: a dimension that undergoes changes over time
- Slowly Growing Dimension: a dimension whose number of records/elements grows over time
- Conformed Dimension (source of truth): shared across multiple data marts or subject areas
- Reference Dimension: joined indirectly to the fact table through a key in another dimension table
- Static Dimension: not extracted from the original data source, but created within the context of the data warehouse
Type 0 : No changes are entertained; only brand-new members are inserted
INSERT INTO dim
SELECT * FROM src
WHERE NOT EXISTS (SELECT * FROM dim WHERE dim.id = src.id)
Type 1 : Directly update (overwrite) the old data - an upsert of the original data
SK,ID,Name,Cellphone <- Before
100,1,ABC,1234567890
SK,ID,Name,Cellphone <- After
100,1,ABC,9876543210
UPDATE dim
SET dim.addr = src.addr
FROM dim JOIN src ON dim.id = src.id
WHERE dim.addr <> src.addr;
Type 2 : Flag the old row as 0 and insert the new row with flag 1 (keeps full history)
SK,ID,Name,Cellphone,FROM_DT,TO_DT,flag
100,1,ABC,1234567890,2016-06-01,2016-06-10,0
100,1,ABC,9876543210,2016-06-10,NULL,1
-- insert the new version of changed members (the old row's TO_DT / flag must also be closed)
INSERT INTO dim
SELECT src.*
FROM src JOIN dim
  ON src.id = dim.id AND src.addr <> dim.addr;
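A minimal PySpark / Delta Lake sketch of both Type 2 steps (closing the old row, then appending the new version), assuming src is a deduplicated DataFrame of the latest source rows and the dimension lives as a Delta table at /dim with columns id, addr, FROM_DT, TO_DT, flag; surrogate-key assignment is omitted for brevity and all names are illustrative:
from delta.tables import DeltaTable
from pyspark.sql.functions import current_date, lit

dim = DeltaTable.forPath(spark, "/dim")

# step 1: close the currently active version of any changed member
dim.alias("d")\
   .merge(src.alias("s"), "d.id = s.id AND d.flag = 1 AND d.addr <> s.addr")\
   .whenMatchedUpdate(set={"TO_DT": "current_date()", "flag": "0"})\
   .execute()

# step 2: append a new open version for members that no longer have an active row
# (changed members closed above + brand-new members)
src.join(dim.toDF().filter("flag = 1"), ["id"], "left_anti")\
   .withColumn("FROM_DT", current_date())\
   .withColumn("TO_DT", lit(None).cast("date"))\
   .withColumn("flag", lit(1))\
   .write.format("delta").mode("append").save("/dim")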
Type 3 : Keeps limited history by adding a new column (old value / new value)
SK,ID,Name,old_number,new_number
100,1,ABC,1234567890,9876543210
Type 4 : Uses a separate history table
Current Table
SK,ID,Name,Cellphone
100,1,ABC,9876543210
History Table
SK,ID,Name,Cellphone,CRT_DT
100,1,ABC,1234567890,2016-06-01
100,1,ABC,9876543210,2016-06-10
Type 6 : Combination of types 1, 2 and 3
- A new row is added (like type 2)
- Active rows are indicated with a boolean flag / start and end dates
- History is also kept in additional columns within the same record (like type 3) instead of only in new records
Eg: a new record is added as in SCD type 2, the old information is overwritten with the new data as in type 1, and the prior value is preserved in a historical column as in type 3.
ID,Name,new_number,old_number,EFF_FR_DT,EFF_TO_DT,flag
1,ABC,9876543210,2345678901,2015-01-01,2016-01-20,0
1,ABC,9876543210,1234567890,2016-01-21,NULL,1
Natural Key
- Appears naturally in the data, Eg: Registration Number, Country Code
- One or more columns may need to be combined to make it unique
- May or may not have a starting value
- Can be NULL
Surrogate Key
- Artificially generated; uniquely identifies each row in the dimension table
- Commonly a single integer column starting at 1
- Unique
- Local to the data warehouse / data lake only
- Created using the natural key
- Cannot be NULL
- Skey generation in PySpark:
from pyspark.sql.functions import sha2, concat_ws, col, monotonically_increasing_id

df\
  .withColumn("sha2", sha2(concat_ws("_", "nat_key1", "nat_key2"), 256))\
  .distinct()\
  .orderBy(col("sha2").asc())\
  .withColumn("skey", monotonically_increasing_id())
- Use "ReIDL" logic, to maintain Skey and NaturalKey relation even during full load . Primary Key
- 1 Column
- May or may not be an Integer Column
- Generated in RDBMS Database
- Not Null
Foreign Key
- Field that references a PRIMARY KEY / SURROGATE KEY
- Used in 2NF Normalization Logic
- withColumn('pk', monotonically_increasing_id())
- CREATE TABLE T (id INT IDENTITY(1,1))
- Add 2 extra rows to the dimension:
- -9 as 'NULL': when joined with the fact (after replacing NULLs in the fact with 'NULL'), all null values get surrogate key -9
- -1 as NULL (orphan record): when joined with the fact (NULL), orphan records get surrogate key -1
Note: -1 = orphan records, -9 = nulls
Fact load steps (see the sketch below):
- Replace NULL with 'NULL' in the joining column
- Left outer join with (SELECT Skey, NaturalKey FROM Dim)
- Replace null Skey with -1 [you can also find orphans with an anti join]
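A hedged PySpark sketch of these three steps, assuming factDF with a string join column cust_id and dimDF exposing natKey / skey (all column names illustrative):
from pyspark.sql.functions import coalesce, lit, col

dim_keys = dimDF.select("natKey", "skey")

fact_with_skey = (
    factDF
    # 1. replace NULL in the joining column with the literal 'NULL' (matches the -9 member)
    .withColumn("cust_id", coalesce(col("cust_id"), lit("NULL")))
    # 2. left outer join to the (natural key, surrogate key) pairs from the dimension
    .join(dim_keys, col("cust_id") == col("natKey"), "left")
    # 3. orphan records with no match in the dimension get surrogate key -1
    .withColumn("skey", coalesce(col("skey"), lit(-1)))
    .drop("natKey")
)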
In data warehouse ETL, dimensions are processed and loaded before facts.
Eg: in an e-commerce website, customers can place an order as a guest, so the fact record gets created without a foreign key from the Customer table. Options to handle this:
Never Process Fact: never process the fact record until the dimension record is available.
Dim (Customer):
id,Name
1,Sam
2,Jack
Fact (Order):
CustomerID,Item
1,Candy
3,Bread <- rejected (customer 3 is not in the dimension yet)
Park and Retry: insert the unmatched transactional records into a landing / staging fact table and retry them in the next batch run (see the sketch below).
Dim (Customer):
id,Name
1,Sam
2,Jack
Fact (Order):
CustomerID,Item
1,Candy
Landing Table:
CustomerID,Item
3,Bread
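A minimal PySpark sketch of park-and-retry, assuming the incoming fact rows are in factDF, the customer dimension in dimDF, and illustrative output paths; re-reading the landing table on the next run (and clearing rows that finally match) follows the same pattern and is omitted:
# rows in this batch with a matching customer go straight into the fact table
matched = factDF.join(dimDF.select("id"), factDF.CustomerID == dimDF.id, "left_semi")
matched.write.format("parquet").mode("append").save("/fact")

# unmatched rows are parked in the landing table and retried with the next batch
unmatched = factDF.join(dimDF.select("id"), factDF.CustomerID == dimDF.id, "left_anti")
unmatched.write.format("parquet").mode("append").save("/landing/fact")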
Inferred Flag: insert a placeholder (inferred) dimension member so the fact can still be loaded, and mark it with an inferredFlag.
from pyspark.sql.functions import col, lit

srcDF = spark.sql(f"SELECT * FROM src WHERE ts > '{delta_ts}'")
dimDF = spark.read.format("parquet").option("path", "dimDFPath").load()

# insert placeholder (inferred) members for ids seen in the source but missing from the dim
srcDF.selectExpr("id AS sID").distinct()\
    .join(dimDF, col("sID") == dimDF.id, "left")\
    .filter("id IS NULL")\
    .withColumn("inferredFlag", lit(True))\
    .write\
    .format("parquet")\
    .mode("append")\
    .option("path", "dimDFPath")\
    .save()

# load the fact, picking up the surrogate key from the dimension
srcDF.join(dimDF.select("id", "sKey"), srcDF.id == dimDF.id, "left")\
    .drop("id")\
    .write.format("parquet").mode("append").save("factPath")
- When the actual dimension record becomes available, the columns marked as N/A are updated with the newly received data and inferredFlag is set to False
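A hedged Delta-merge sketch of that update, assuming the dimension is stored as a Delta table at dimDFPath and the late-arriving attributes come in a DataFrame dimSrcDF (the table format, DataFrame and column names are illustrative; the pipeline above used plain parquet):
from delta.tables import DeltaTable

dim = DeltaTable.forPath(spark, "dimDFPath")

# overwrite the placeholder columns of inferred members and clear the flag
dim.alias("d")\
   .merge(dimSrcDF.alias("s"), "d.id = s.id AND d.inferredFlag = true")\
   .whenMatchedUpdate(set={
       "name": "s.name",              # illustrative attribute column
       "inferredFlag": "false"
   })\
   .execute()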
ETL with 3 zones: Raw (Data Lake), Enriched (Delta Lake) and Curated (Data Warehouse)
-
Raw zone
- Data Lake
- Blob storage, Eg: Azure Data Lake Storage Gen2, IBM Cloud Object Storage
- Data is moved from the source as-is
- Every load lands in a new timestamp folder (see the sketch below)
- Independent (does not depend on the last load's type / data)
- Immutable
- Delta load / full load
- Duplicates can exist (Eg: if the source is Type 1 or records are deleted; Parquet files do not allow deleting / updating existing records)
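A small sketch of the "new timestamp folder per load" convention, assuming the extracted batch is in srcDF and an illustrative /raw/orders landing path:
from datetime import datetime

# each run lands in its own immutable, timestamped folder under the raw zone
run_folder = datetime.now().strftime("%Y%m%d_%H%M%S")
srcDF.write.format("parquet").mode("errorifexists").save(f"/raw/orders/{run_folder}")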
-
Enriched Zone
- Delta Lake format (allows delete, update)
- Blob storage, Eg: Azure Data Lake Storage Gen2, IBM Cloud Object Storage
- Data moved from Raw Zone to Enriched Zone
- Duplicates removed using Upsert Operation
- Transformations are done here
- Fact and Dim
-
Curated Zone
- Data moved here from the Enriched zone
- DataWarehouse
- SQL Database
- Fact and Dim
Step 1. Get all records from the source newer than the destination's max timestamp
from pyspark.sql.functions import max as max_
maxDestTS = destDF.select(max_("UPDATE_TS").alias("max")).collect()[0]["max"]
newDF = srcDF.filter(f"UPDATE_TS > '{maxDestTS}'")
Step 2. When the source is SCD type 2 or uses soft deletes (Flag = 'D'), keep only the latest version per id:
- Method 1:
latestIDDF = newDF.groupBy("id").agg(max_("UPDATE_TS").alias("UPDATE_TS"))
latestDF = newDF.join(latestIDDF, ["id", "UPDATE_TS"], "inner")
- Method 2:
from pyspark.sql.window import Window
latestDF = newDF.filter("Flag <> 'D'")\
    .withColumn("r", max_("UPDATE_TS").over(Window.partitionBy("id")))\
    .filter("r == UPDATE_TS")
Step 3. Upsert (update old records, insert new records) into the Delta Lake
from delta.tables import DeltaTable
deltaDF = DeltaTable.forPath(spark, "/path")
deltaDF.alias("target")\
    .merge(latestDF.alias("src"), "src.id = target.id")\
    .whenMatchedUpdateAll()\
    .whenNotMatchedInsertAll()\
    .execute()
Step 4. EZ to DB
df.write.format("jdbc").mode("overwrite").options(url="<jdbc_url>", dbtable="<table>", user="<user>", password="<password>").save()
-
Fetch all records from the EZ with timestamp > max(timestamp) in the CZ
-
Delete rows in the CZ whose natural keys appear in the fetched data:
maxTS = <SELECT max(TS) FROM target>
delKeys = srcDF.filter(f"srcTS > '{maxTS}'")\
    .withColumn("key", concat_ws(",", "k1", "k2"))\
    .select("key").collect()
DELETE FROM target WHERE CONCAT_WS(',', c1, c2) IN (<delKeys>);
-
Re-insert the fetched records
- Concatenate and hash the natural keys while saving, for fast deletion later. Eg: hash(concat_ws(',', 'k1', 'k2')) (see the sketch below)
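A hedged sketch of that hashed-key delete, assuming the curated table keeps a key_hash column computed the same way at load time (table and column names illustrative):
from pyspark.sql.functions import sha2, concat_ws

# same hash that was stored alongside the curated rows at load time
hashed = srcDF.withColumn("key_hash", sha2(concat_ws(",", "k1", "k2"), 256))
keys = [r["key_hash"] for r in hashed.select("key_hash").distinct().collect()]

if keys:
    in_list = ",".join(f"'{k}'" for k in keys)
    # assumes the curated table is visible to Spark SQL (e.g. a Delta table);
    # for a JDBC warehouse the same DELETE would be issued through the DB client
    spark.sql(f"DELETE FROM curated_target WHERE key_hash IN ({in_list})")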