Azure Synapse
Note:
- For JSON files, make sure all new lines and blank spaces are removed.
- Names of Dataflows, Notebooks, etc. have to start with a letter (a string, not a number).
- Blob storage = flat namespace; ADLS Gen2 = hierarchical namespace (directories).
- Azure Active Directory (Entra ID): used to create one functional ID with a secret, called a Service Principal, so applications can communicate with each other.
- Service Principal (SP): managed by the user (i.e., the user renews the secret, etc.).
- Managed Identity (a type of service principal): managed by Azure.
- Workspace: based on the Azure serverless SQL pool (also called On-Demand).
- Lake Database: uses Spark to query data.
- SQL Database: uses the serverless SQL pool to query data.
- Linked:
  - Source is Gen2/Blob storage.
  - Comes by default with Synapse.
  - Data is not stored in table format but is processed as tables at runtime.
- Lake Database:
  - Uses only the default Gen2 location that comes with Synapse to create the database.
  - Uses templates.
  - You can create SQL-like tables, add schemas to tables, define column data types, and add foreign keys on top of the underlying Gen2 storage.
  - The schema can be created against CSV, Parquet, JSON, etc.
- Steps:
  - Open a notebook and run:
    %%sql
    CREATE DATABASE DB;
    USE DB;
    CREATE TABLE T (id INT, name STRING);
    INSERT INTO T VALUES (1, 'A');
  - Open Gen2 in the Azure portal > create a folder "Customer" and upload a CSV file.
  - Open Synapse > Data > "+" > New Lake Database > enter the name, linked service, the folder path above, and the data format > "+" to add a table and the schema of the CSV.
  - Deploy (publish).
  - You should be able to read the data with SQL / notebooks.
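A minimal notebook sketch of the last step, assuming the database `DB` and table `T` created above already exist (lake database tables are registered in the shared metastore, so Spark SQL sees them directly):

```python
from pyspark.sql import SparkSession

# In a Synapse notebook the `spark` session already exists; this line just makes the sketch self-contained.
spark = SparkSession.builder.getOrCreate()

# Query the lake database table created in the steps above.
df = spark.sql("SELECT * FROM DB.T")
df.show()
```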
- SQL Database (using SQL scripts)
  - Operations:
    - Read: OPENROWSET
    - Write: CET (CREATE EXTERNAL TABLE), CETAS (CREATE EXTERNAL TABLE AS SELECT)
  -- CET
  CREATE EXTERNAL TABLE T (c1 INT) WITH (DATA_SOURCE = xx, FILE_FORMAT = csvFormat, LOCATION = 'path');
  -- CETAS (external tables cannot be altered)
  CREATE EXTERNAL TABLE T2 WITH (DATA_SOURCE = xx, FILE_FORMAT = csvFormat, LOCATION = 'path2') AS SELECT * FROM T;
  -- Read
  SELECT * FROM OPENROWSET(BULK 'path', DATA_SOURCE = 'xx', FORMAT = 'CSV') AS rows;
Example:
- Open a SQL script, or connect to serverless using DBeaver (the serverless endpoint is on the Synapse overview page in the Azure portal).
- Run "CREATE DATABASE db".
- Go to SQL Script > create a new script.
- Run "USE db", then:
  CREATE EXTERNAL DATA SOURCE ParentFolder WITH (LOCATION = 'https://path.dfs.core.windows.net');
  CREATE EXTERNAL FILE FORMAT myFormat WITH (FORMAT_TYPE = DELTA);
  SELECT * FROM OPENROWSET(BULK '/Country', DATA_SOURCE = 'ParentFolder', FORMAT = 'DELTA') AS D;
- The DB should now be visible along with the external data source and file formats.
IF NOT EXISTS (SELECT 1 FROM sys.databases WHERE name = 'Test')
BEGIN
    CREATE DATABASE Test;
    CREATE EXTERNAL DATA SOURCE ParentFolder WITH (TYPE = HADOOP, LOCATION = 'https://path.dfs.core.windows.net/temp/');
    CREATE EXTERNAL FILE FORMAT parquetFormat WITH (FORMAT_TYPE = PARQUET);
END
GO
USE Test;
SELECT * FROM OPENROWSET(BULK 'subFolder/1.csv', FORMAT = 'PARQUET', DATA_SOURCE = 'ParentFolder') AS rows;
Note: JSON, CSV, etc. are also supported.
CREATE EXTERNAL TABLE T
WITH (LOCATION = 'subFolder/', DATA_SOURCE = ParentFolder, FILE_FORMAT = myFormat)
AS
SELECT * FROM OPENROWSET(BULK 'subFolder2/', DATA_SOURCE = 'ParentFolder', FORMAT = 'PARQUET') AS rows; -- src
GO
DROP EXTERNAL DATA SOURCE DS;
DROP EXTERNAL FILE FORMAT myFormat;
CREATE VIEW V AS SELECT * FROM OPENROWSET(BULK 'https://blob.parquet', FORMAT = 'parquet') AS rows;
SELECT * FROM V;
1. To use in a pipeline:
   - Add a Web activity in the pipeline.
   - Call the current Synapse workspace: GET https://myworkspace.dev.azuresynapse.net/sqlScripts/{sqlScriptName}?api-version=2020-12-01
   - Drill down to the required SQL script in the response.
SELECT * FROM OPENROWSET('CosmosDB','Account=my-cosmos-db;Database=my-db;Key=k',[my-container]) AS T
- When you create a linked service and publish it, the icon should appear here.
- Only connected services appear: containers (Gen2/Blob), Files, Queues, Tables, Cosmos DB.
- Relational DB.
- Uses distributions to store data; data is not stored in one table location, i.e., Control node = Compute node 1 + Compute node 2 + ...
- Meant for data greater than 1 TB.
- Max 60 nodes.
- DW5000c = 5000/500 = 10 nodes (see the worked example after this list).
- Unique constraints are not supported (i.e., duplicates need to be removed by other logic).
- Can create tables in the SQL DB.
- High cost.
- Main concepts: DW = Node1 (Dist1 = partition1, p2, ...) + Node2 (Dist2 = partition1, p2, ...) + ... + Node 60
- Data within each distribution is divided into partitions based on a column, e.g., OrderDate.
- Distribution (how data is spread among nodes):
  - Hash:
    - Data is distributed based on the hash result.
    - For large tables.
    - Better performance for joins that use the hash column.
  - Round Robin:
    - For small tables.
    - Even distribution.
    - Bad performance for joins since there is no key column.
  - Replicate: clones the data to all nodes.
- Index:
  - Non-clustered / rowstore index: for NVARCHAR(MAX), VARCHAR(MAX), binary columns.
  - Clustered columnstore: recommended.
- Partition: data is further divided across partitions.
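A small worked example of the node math above (the 60 distributions per pool and the DWU-to-node ratio come from the notes; the per-node count is just the division):

```python
# Worked example: compute nodes and distributions per node for a dedicated SQL pool.
# A pool always has 60 distributions; DW5000c gives 5000/500 = 10 compute nodes.
dwu = 5000
nodes = dwu // 500              # 10 compute nodes
dist_per_node = 60 // nodes     # 60 distributions spread over 10 nodes = 6 per node
print(nodes, dist_per_node)     # 10 6
```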
- Creating tables
  - Normal:
    CREATE TABLE t (c INT)
    WITH (DISTRIBUTION = HASH(c), CLUSTERED COLUMNSTORE INDEX, PARTITION (c RANGE RIGHT FOR VALUES (1, 100)));
  - COPY INTO:
    COPY INTO T FROM 'https://xxx/*.parquet' WITH (FILE_TYPE = 'PARQUET', MAXERRORS = 0, IDENTITY_INSERT = 'OFF');
  - CTAS:
    CREATE TABLE T WITH (DISTRIBUTION = ROUND_ROBIN) AS SELECT * FROM T2;
- Recommended:
  - Insert/staging tables: Round Robin
  - Fact: clustered columnstore + hash distribution
  - Dim: replicated
Notebooks
df.createOrReplaceTempView("T")
%%sql
select * from T
Delta Catalog Tables
df.write.format("delta").saveAsTable("MyManagedTable")
df.write.format("delta").option("path", "/mydata").saveAsTable("MyExternalTable")
spark.sql("CREATE TABLE MyExternalTable USING DELTA LOCATION '/mydata'")
%%sql
CREATE TABLE MyExternalTable
USING DELTA
LOCATION '/mydata'
Delta
SELECT * FROM OPENROWSET( BULK 'https://mystore.dfs.core.windows.net/folder/',FORMAT = 'DELTA' ) AS T
Connection String from Linked Service
from pyspark.sql import SparkSession
sc = SparkSession.builder.getOrCreate()
token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
accountKey = token_library.getConnectionStringAsMap("<LINKED SERVICE NAME>").get("<KEY NAME>")
print(accountKey)
Read and Write Cosmos
df= spark.read.format("cosmos.olap")\
.option("spark.synapse.linkedService", "nm")\
.option("spark.cosmos.container", "cont").load()
df.write.format("cosmos.oltp")\
.option("spark.synapse.linkedService", "my_linked_service")\
.option("spark.cosmos.container", "my-container")\
.mode('append')\
.save()
Dedicated vs Serverless

| Feature | Dedicated | Serverless |
| --- | --- | --- |
| SELECT | Yes | Yes |
| INSERT / UPDATE / DELETE / MERGE | Yes | No |
| CTAS | Yes | No |
| CETAS | No | Yes |
| Cost | High | Low |
| Distribution strategy (Hash / Round Robin) | Yes | No |
| Provisioning | Provisioned separately | Available by default |
| Volume of data | High | Low |
| Data stored in | Relational DB | Blob / Gen2 |
External Tables and Views
Both are just pointers to a location in the data lake / blob storage (Parquet, JSON, etc.).

| Feature | External Table | View |
| --- | --- | --- |
| Define schema | Yes | No |
| Indexing | Yes | No |
| Performance | Fast | Slow |
def readServerlessTableViews(token):
    df = (spark.read
          .format("com.microsoft.sqlserver.jdbc.spark")
          .option("url", "jdbc:sqlserver://xxx-syn-01-ondemand.sql.azuresynapse.net:1433;databaseName=db;")
          .option("dbtable", "myschema.viewName")
          .option("accessToken", token)
          .load())
    return df

token = mssparkutils.credentials.getToken("DW")
df = readServerlessTableViews(token)
- Linked Service: service to connect to different data sources (external and internal).
- Integration Runtime: compute which executes the Dataflow.
- Source (Linked Service) > Operation > Sink (Linked Service)
- All parameters created in a Dataset can be accessed in the Dataflow under Debug Settings.
- The same parameters can be used at pipeline level on the Dataflow activity.
  - Dataflow > New Dataset > Parameters > New > test, string
  - Access: dataset().test
  - Example: @concat('/', dataset().test, '/', substring(utcNow(), 0, 10), '.parquet')
- Note: a Dataflow can create a DB table on the fly.
- CDC (Change Data Capture)
  - Handles delta records (all new records, including newly added duplicate records).
  - Available for SQL, Azure Dedicated, Gen2.
  - Checkpoint key: a unique marker to identify the last successfully processed record.
  - Create a Dataflow and enable CDC.
  - Add the Dataflow to pipelines.
  - In the pipeline > select the Dataflow > Settings > Checkpoint key.
  - Disadvantage: if delta is turned ON and the user wants to perform a full load in between, the user has to change the flow and publish. [ref](https://learn.microsoft.com/en-us/answers/questions/838041/how-to-reset-the-change-data-capture-information-()
  - For the same idea implemented manually with a watermark, see the notebook sketch below.
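A minimal notebook sketch of what the checkpoint key tracks, done manually with a stored watermark. This is not the built-in CDC implementation; the storage paths, container/account names, and the `update_ts` column are assumptions:

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

WATERMARK_PATH = "abfss://container@account.dfs.core.windows.net/_checkpoints/last_watermark"  # assumed path

# Read the previously saved watermark (fall back to the epoch on the first run).
try:
    last_ts = spark.read.text(WATERMARK_PATH).collect()[0][0]
except Exception:
    last_ts = "1970-01-01 00:00:00"

# Only pick up rows changed since the last successful run.
src = spark.read.parquet("abfss://container@account.dfs.core.windows.net/source/")
delta = src.filter(F.col("update_ts") > F.lit(last_ts))
delta.write.mode("append").parquet("abfss://container@account.dfs.core.windows.net/target/")

# Persist the new high-water mark for the next run.
new_ts = delta.agg(F.max("update_ts")).collect()[0][0]
if new_ts is not None:
    spark.createDataFrame([(str(new_ts),)], ["value"]).coalesce(1) \
         .write.mode("overwrite").text(WATERMARK_PATH)
```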
- Uses the Spark engine.
- Src > Transform > Sink
- Any number of sources.
- Different activities (a small Spark sketch of a few of these follows this list):
  - Multiple inputs:
    - Source: spark.read
    - New Branch: clone data into a new branch
    - Join: normal join (use Exists for a left outer join)
    - Conditional Split: split records based on a condition
    - Exists: compare two tables (inner join or left outer join)
    - Union: union / append files
    - Lookup:
      - Left outer join of 2 tables
      - Verify data between 2 tables
  - Schema modifiers (x9):
    - Derived Column: withColumn (create a hierarchy for complex JSON)
    - Select: select (prune) / withColumnRenamed / remove columns
    - Aggregate: groupBy / all functions (max, sum, etc.); values can be given to Aggregates and Group By in the same activity to apply a group-level filter
    - Surrogate Key: add a unique primary key
    - Pivot: create a pivot table
    - Unpivot: undo a pivot table
    - Window:
      - Without order (max(col), min(col), etc.): e.g. max('col).over(Window.partition('groupby_col))
      - With order (rank, denseRank): e.g. rank.over(Window.partition('groupby_col).orderBy('sortCol))
    - Rank:
      - Used alone: the entire dataframe is ranked (Dense = serial with no gaps)
      - Row Column: name of the new column
      - Row Condition: existing column, Asc / Desc
    - External Call: REST API
  - Formatters:
    - Flatten: JSON with an array ONLY, e.g. {"A": [1,2,3]}
    - Parse: string > JSON
    - Stringify: get a particular sub-JSON from the main JSON, e.g. JSON > table columns
  - Row modifiers:
    - Filter: filter / where
    - Sort: sort
    - Alter Row (DB only): update DB, With+When, drop, fill, parameterized delete (on real time only)
    - Assert: Assert + Derived Column (isError('col_name working'))
  - Flowlet
  - Destination (Sink):
    - spark.write
    - Cached sink (temp location): access via "sink_name#outputs()[RowNo].ColumnName"
    - Cached sink: convert data to a string data type to avoid the error '"undefined" is not valid JSON', e.g. toString(Col1)
    - Cached lookup (dict): Sink > Settings > Add Key Column; access via "sink_name#outputs(columnname)", "sink1#outputs()[1].MAX_UPDATE_TS"
  - Misc:
    - Wildcard: normal regex
    - $$: all columns
    - $parameter: access a parameter created within the same dataflow
    - String interpolation: "folder/file-{$n}.csv"  # n is a parameter
    - byName('city'): any column whose name is city
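Since mapping data flows run on the Spark engine, several of the activities above correspond directly to Spark operations. A minimal sketch with placeholder data (the `orders`/`countries` tables and column names are assumptions, not part of the notes):

```python
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
orders = spark.createDataFrame([(1, "IN", 100), (2, "IN", 250), (3, "US", 80)],
                               ["id", "country", "revenue"])
countries = spark.createDataFrame([("IN", "India"), ("US", "United States")],
                                  ["country", "name"])

# Join activity ~ a normal join
joined = orders.join(countries, "country", "inner")

# Conditional Split ~ filters over the same input
high = joined.filter(F.col("revenue") >= 100)
low = joined.filter(F.col("revenue") < 100)

# Aggregate activity ~ groupBy + aggregation functions
totals = joined.groupBy("country").agg(F.sum("revenue").alias("total_revenue"))

# Window activity ~ rank over a partition
w = Window.partitionBy("country").orderBy(F.col("revenue").desc())
ranked = joined.withColumn("rank", F.rank().over(w))

ranked.show()
```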
- Dataflow script: used to copy, paste, share, and reuse dataflow logic, and for find/replace.
- https://docs.microsoft.com/en-us/azure/data-factory/data-flow-script#script-snippets
Derived Columns:
- $$ (this): all columns
- $0: current column name
- name: e.g. name != "abc"
- type: e.g. type != 'int'
- stream: the name associated with each stream or transformation in your flow
- position: the ordinal position of columns in your data flow
- origin: the transformation where a column originated or was last updated
- Trim any column which is a string (pattern):
  - Derived Col > Add > Column Pattern > type == 'string', $$ -> trim($$)
- Auto mapping on.
- Remove special characters from all columns:
  - Add Select
  - Add Mapping > Rule-based mapping
  - true(), regexReplace($$,'[^0-9a-zA-Z]','')
  - or true(), regexReplace('Column','[^0-9a-zA-Z]','')
- String column to date:
  - Derived Column
  - toDate(byName('Date'),'MM/dd/yyyy')
- Metadata of a column inside a Dataflow:
  - Derived Col > col, columns()
  - Flatten > unroll by col > col
- Calculate column pattern (apply logic to data in multiple columns):
  - Derived Column > Columns > Add > Column Pattern
  - type == 'string', $$ + '_clean', trim($$,'')
- Max: Aggregate > max(col)
1. Table to JSON
   - Read the source (import projection)
   - Derived Col > create the structure
   - Save to a JSON sink
2. Handling JSON:
   - Parse
   - Flatten
   - Stringify
   - Derived Cols: create a complex structure
3. Stringify
   - Gets a particular sub-JSON from the main JSON, e.g. JSON > column to table
   - DataFlow > REST API source (url = "https://api.first.org/data/v1/countries", anonymous request) or https://restcountries.com/v3.1/name/united
   - Source > Projection > Import projection
   - Add Stringify > Columns > newcol, expr > under Input Schema > select the structure to flatten
   - Add a Select statement > filter the new_col
   - Select the new column
   - Sink
4. Parse (JSON in a single column of a table):
   val j = List(("IN",1),("US",2)).toDF("country","medals")
             .withColumn("j", to_json(struct('country,'medals)))
   +-------+------+--------------------+
   |country|medals|                   j|
   +-------+------+--------------------+
   |     IN|     1|{"country":"IN","...|
   - Make sure Source > Projection > Import Schema is done
   - Add Parse
   - Document form = Single document
   - Columns > new col, j, (country as string, medals as integer)
   - (A PySpark equivalent is sketched after this list.)
5. Flatten:
   - Flattens JSON with an array ONLY
   - e.g. if the JSON has an array: {"A": [1,2,3]}
   - (A PySpark equivalent using explode is sketched below.)
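A minimal PySpark sketch of what the Parse step (item 4) does, assuming the JSON string sits in a column `j`; the sample data mirrors the Scala snippet above:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.getOrCreate()

# Same shape as the Scala example: the JSON string lives in column "j".
df = spark.createDataFrame(
    [("IN", 1, '{"country":"IN","medals":1}'),
     ("US", 2, '{"country":"US","medals":2}')],
    ["country", "medals", "j"])

# "(country as string, medals as integer)" from the Parse activity becomes an explicit schema.
schema = StructType([StructField("country", StringType()),
                     StructField("medals", IntegerType())])

parsed = df.withColumn("parsed", from_json(col("j"), schema)) \
           .select(col("parsed.country").alias("country2"),
                   col("parsed.medals").alias("medals2"))
parsed.show()
```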
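And a minimal PySpark sketch of the Flatten step (item 5), using `explode` on the array column from the `{"A": [1,2,3]}` example:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode

spark = SparkSession.builder.getOrCreate()

# One row holding the array [1, 2, 3] in column "A".
df = spark.createDataFrame([([1, 2, 3],)], ["A"])

# explode() produces one output row per array element, which is what Flatten does.
df.select(explode("A").alias("A")).show()
```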
- Linked Service: service to connect to different data sources (external and internal).
- Activity: Copy, Dataflow, Notebooks, SQL, etc.
- Private Endpoint: an endpoint within a private virtual network.
- Integration Runtime: compute which executes the activities.
  - Azure IR: for cloud-to-cloud scenarios.
  - Self-hosted IR: to connect to on-premises data sources or services (e.g., SQL Server databases in a private network).
  - Azure-SSIS IR: to run SSIS packages.
- Can run activities in parallel.
- Throttling under Integration Runtime 'AutoResolveIntegrationRuntime': there is a limit on simultaneous pipelines in an integration runtime; split the pipelines across multiple runtimes.
- Triggers: used to start the pipeline.
  - Manual
  - Tumbling window:
    - Runs on fixed, contiguous intervals, starting the next window after the previous one finishes.
    - Delays and concurrency can be set.
    - Auto retry in case of failure.
  - Event: storage events, etc.
  - Scheduled triggers
- Note:
  - Pipelines and Synapse Live use functions from ARM templates.
  - Hence, syncing the master branch with the ARM template updates the ARM template.
1. Create a variable.
2. Set the variable in the pipeline.
3. Check the contents of the variable after the run.

1. Use Wait statements.
2. Set Variable.
- Generate the token the first time.
- Run two branches in parallel:
  - Until loop (until flag is true): Wait, then keep generating a new token.
  - Until loop (until flag is true): Copy activity (uses the above token); on fail: set flag = true; on pass: set flag = true.
- Web Activity > URL: exampleWorkspace.dev.azuresynapse.net/sqlScripts/exampleSqlScript?api-version=2020-12-01, Method: GET, Auth: Managed Identity, Resource: https://dev.azuresynapse.net
- Data sync between sources (SQL Server, Azure SQL Database) and targets (Dataverse/data lakes, Azure SQL Dedicated, and Cosmos DB).
- Src > Synapse > Target
Cosmos DB (OLTP):
  - NoSQL.
  - The Cosmos DB API can be selected depending on the source (SQL, Mongo, etc.).
  - Fast reads and writes.
  - Auto-scales.
  - Uses an API.
  - Expensive.
  - To enable HTAP (on demand):
    - Cosmos home page: turn on the analytical container in Cosmos.
    - In Synapse: home page > Integration > enable Synapse Link.
  - Synapse reads Cosmos: SELECT * FROM OPENROWSET(...) WITH (...) AS T CROSS APPLY OPENJSON(C) WITH (...)
- SQL Server:
  - Use managed identity.
  - Azure Synapse Link must be enabled.
  - Add IPs to the firewall.
  - Create a linked service + linked connection in Synapse (Pipelines > "+").
  - Configure blob storage for intermediate results.
- Publish branch: ARM templates are stored and updated here.
- Collaboration branch = master branch.
- Feature branch = individual branches created by users.
- Root folder = the root folder in your Azure Repos collaboration branch.
- Pull request = request to merge from a feature branch into the master branch.
- Commit all = save all changes to Git.
- Publish should only be allowed from the master branch.
- Code is merged from the master branch to the Synapse service (Git Actions is configured to run the pull).
- Publish > the ARM template gets created.
- Once master gets published, the code is available in Synapse Live and the workspace_publish branch gets created.
- ARM template = Azure Resource Manager template. Note: the ARM templates from the workspace_publish branch are not used.
Whenever we click the Publish button (master branch), Synapse creates two JSON files:
- TemplateForWorkspace:
  - All work (pipelines, notebooks, dataflows, etc.) in one single file.
  - This file does not contain parameters.
  - It contains only the logic.
- TemplateParametersForWorkspace: contains only the parameters.
- Move TemplateForWorkspace to the higher environment as-is.
- Replace the values in TemplateParametersForWorkspace with values appropriate to the higher environment.
- When code is merged from the master branch to the Synapse service,
- this triggers the Git Actions step (Azure/Synapse-workspace-deployment@V1.7.0) configured in CI/CD:

  uses: Azure/Synapse-workspace-deployment@V1.7.0
  with:
    TargetWorkspaceName: ${{ env.SN_WORKSPACE }}   # Dev [Live]
    ArtifactsFolder: ${{ github.workspace }}
    environment: Azure Public
    clientId: ${{ env.CLIENT_ID }}
    clientSecret: ${{ env.CLIENT_SECRET }}
    tenantId: ${{ env.TENANT_ID }}
    resourceGroup: ${{ env.RESOURCEGROUP }}
    subscriptionId: ${{ env.SUBSCRIPTIONID }}
    DeleteArtifactsNotInTemplate: true
    operation: validateDeploy
- Two ARM templates (Azure Resource Manager templates) get created:
  - TemplateParametersForWorkspace.json
  - TemplateForWorkspace.json
  - Note: the ARM templates from the workspace_publish branch are not used.
- Users need to add two custom files at the root:
  - {{ synapse_workspace }}-parameteres.yaml: the value of each property, e.g. for a linked service:
    - myKeyVault_properties_typeProperties_baseUrl: https://myAzure.vault.azure.net/
  - template-parameter-definition.json:
    - Synapse reads this file and uses its configuration to decide which properties get parameterized.
    - If no file is found, the default template is used.
    - The file consists of [trigger, pipeline, linked service, dataset, integration runtime, and data flow] sections.
    - "=" means keep the current value; "-" means don't keep the default.
    - Syntax: <action>:<name>:<stype>
      "Microsoft.Synapse/workspaces/linkedServices": { "*": { "properties": { "typeProperties": { "baseUrl": "=" } } } }
    - If you have a linked service, the "parameteres.yaml" file will replace its values with the values in that YAML, following the structure defined in "template-parameter-definition.json".
- The above action deploys the Synapse artifacts to the given environment using the templates.
- You can get this file from the link.
- To change artifact properties, a custom parameter template can be used: {{ workspace }}-parameteres.yaml. To override the default parameter template, a custom parameter template named template-parameters-definition.json should be placed in the root folder of the Git branch.
- What is the use of temp tables? To reference data across languages.
- How to reference another notebook? %run /<path>/Notebook1 { "parameterInt": 1 }
- How to return values from a notebook?
  from notebookutils import mssparkutils
  mssparkutils.notebook.exit("hi")
- How to pass external parameters from pipelines to a notebook? (see the sketch after this Q&A list)
  - Create a variable in the notebook, e.g. input = ""
  - Convert the cell into a parameters cell: hover over the cell > click "..." > Toggle parameter cell.
  - Create a new pipeline.
  - Add the notebook to the pipeline.
  - In the pipeline, select the notebook > Settings > Base parameters > add "input" - String.
  - Click outside of the notebook > Variables > "input" - String.
  - Click on the notebook in the pipeline > Base parameters > "input", @variables('input')
- How to Read a CSV from Gen2 ?
df= spark.read.option('header','true').option('delimiter',',').csv('abfss://1.csv')
- What are magic commands?
  - Line magic (%) - applies to that line only, e.g.: ```python y = 2 if x < 3 else 4```
  - Cell magic (%%) - applies to the entire cell, e.g.:
    %%timeit
    if x < 3: y = 2
    else: y = 4
- How is Spark session configuration done with a magic command?
  %%configure
  {"driverMemory":"28g","driverCores":4,"executorMemory":"28g","executorCores":4,
   "jars":["abfs[s]://blob/myjar1.jar"]}
- How to reference an unpublished notebook? Check the box in Notebook settings.
- How to do Python logging in a notebook?
  import logging
  defaultLogger = logging.getLogger('default')
  defaultLogger.debug("default debug message")
- File operations? mssparkutils.fs.ls('path')  # head, append, ls, cp, mv, mkdirs, rm
- Read Parquet? ```spark.read.parquet('abfss://parquet@deltaformatdemostorage.dfs.core.windows.net/employees')```
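A minimal sketch of the notebook side of the pipeline-to-notebook parameter flow described above (the variable name `input` matches the example; the returned string is just illustrative):

```python
# Parameters cell: use "Toggle parameter cell" on this cell in the Synapse notebook UI.
# The pipeline's base parameter "input" (set to @variables('input')) overrides this default at run time.
input = ""

# ... rest of the notebook logic ...

# Return a value to the pipeline; the pipeline reads it via
# @activity('notebookname').output.status.Output.result.exitValue
from notebookutils import mssparkutils
mssparkutils.notebook.exit(f"received: {input}")
```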
1. String interpolation (parameter inside a string)?
   "folder/file-{$n}.csv"      # n is a parameter
   "My age is: {toInteger(col)}"
2. Parameters and wildcards in a Dataflow?
   - Dataflow > add parameter > add "path"
   - Source > Source Options > Wildcard paths > 'folder/' + $path + '*.csv'
3. Send a whole column itself as a parameter?
   - Dataflow > Parameter > data, string, "test"        # this is data
   - Dataflow > Parameter > varcol, double, colname     # dynamic join
4. Schema drift (Source)?
   Enable when columns change in the schema.
5. Broadcast join (Join)?
   Use when the whole table can be accommodated in memory.
6. Dataflow Assert?
   toInteger($col_name) == 1000
7. How to "select max(revenue),* from T"? (Self Join)
- Src > Projection > Import Projection
- New branch > Aggregate > Add : maxcolumn ,max(revenue)
- Inner Join (main branch) when revenue = max(revenue)
8. Select max(column) from table
- Src > Projection > Import Projection
- New branch > Aggregate > Add : countcolumn ,count(col_name)
9. Find Surrogate Key
[Ref](https://www.youtube.com/watch?v=9U-0VPU2ZPU)
- Lookup (does left outer join of different source )
- Lookup + Derived Column(isMatch) + Conditional Split (isMatch=True/false)+ others
- Lookup : the result of each row is stored in new column
- Derived Column =isMatch()
- Agg = first
10. Cast all Cols to Integer ?
- Select > Regex > 1==1 , $$ , toInteger($$)
11. LAD
    - Cached lookup - https://www.youtube.com/watch?v=HqCn42FaRJs
    - Add a Cache-type sink named "cachedata"
    - To call the cache: cachedata#outputs()[1].columnname
12. Multiple sinks? Dataflow > Settings > Set order
13. Accessing sink output in a pipeline? @string(activity('name').output.runStatus.output.sink_name.value[0].colname)
14. Canonical model (flexible schema)?
    Derived Col > Rule-based > locate('colname', lower(name)) != 0 -> colname
15. Selection of a group of columns from the schema? Select > type != 'string' && name != 'id', $$ + '_notstring'
16. Dynamic values / parameterization [ref](https://www.youtube.com/watch?v=CMOPPie9bXM)
    - If a default parameter value is not set, the user will be forced to enter the parameter value at the start.
    - Only parameters that are declared/created can be used.
    - Any parameter created on:
      - a Dataset will be available in the Dataflow (in Debug Settings);
      - a Dataset or Dataflow will be available in the pipeline by clicking on the Dataflow activity.
17. Dataflow parameterization?
    - Creation: Parameters > dataflow_name, String, default value = blank
    - Access: $dataflow_name
    - Sending a value:
      1. Add the Dataflow to a pipeline
      2. Debug Settings > Parameters > Dataset parameters > dataset_name as value
18. Passing parameters from Dataset > Dataflow > Pipeline
    - If you have already set a parameter on the Dataset and used it in the Dataflow,
    - you can use it via: pipeline > select the Dataflow > Settings > you should be able to see "dataset_name" under the parameters.
19. Rows to columns (unpivot): Unpivot > Ungroup > Unpivot column
20. Assert
    - Expects true, Expects false, Expects unique, Expects exists (column-to-column matching)
    - Description: toString(col_name) + ' working'
    - Filter: col_name == 1000
    - Assert + Derived Column = isError('col_name working')
    - Assert + Split = hasError()
21. External Call
    - Loop a REST API over a given input - https://www.youtube.com/watch?v=dIMfbwX8r0A
    - Source (list)
    - Add a Derived Column to construct a resource using the list
    - Add the External Call
      - Linked service: GET, Anonymous; the URL will be overridden by the settings below
      - Method: GET, previousActivity
      - Relative URL: endpoint
      - Row relative URL: resource (constructed above)
22. Create hierarchy
    - Add a source (input a CSV / flat file)
    - Add Derived Column > Expression Builder > Create Column > Add subcolumn Window
      - Over (group column): c2
      - Sort: sort column (the window column is arranged by rank, dense_rank, row_number, etc.)
      - Range:
      - Window: max(col), denseRank(), rank()
23. Create a variable inside an activity in a Dataflow:
    - Derived Col > Locals, or
    - Cache > write in one activity > use in another activity
    - Dynamic expression > create variable > var1, trim(right(col, 6))
    - Accessing it: :var1
24. Join a schema-drifted (schema-less) table with a table that has a schema
    - Src1 (schema drifted; allow schema drift, infer drifted column types) > Derived Col: col_tojoin, toInteger(byName('col_tojoin'))
    - Src2 (schema table)
    - Join: src1, src2 on col_tojoin
    - Sink (allow schema drift turned ON)
25. Remove spaces from all column names? replace($$, ' ')
26. Add a schema to a schema-less source? Src > Data Preview > Map Drifted Column
27. Specific partition files?
    - Dataset: target = container base path only
    - Src > Source Options >
      - Wildcard = top_folder/*/*/*.parquet
      - Partition root path = top_folder
      - Column to store file name = filtercolumn
28. Saving data into a specific location based on input
    - Create a column in a transformation which contains the target
    - Derived Col > month, split(col, "-")[2]
    - Sink > Settings > File name option (default: generic name)
    - Sink > Optimize > Set partitioning
    - Sink > Optimize > Key > Unique value per partition > Key column = month
29. Pipeline triggers:
    - Scheduled pipeline
    - Cron-job pipeline, etc.
30. Save partition files with a name:
    - Sink > Settings > File name option > Pattern = movies[n].parquet
    - Optimize > Set partitioning, Partition type = Round Robin, 10
31. Create a complex data type (subcolumns, key-value, array):
    1. Derived Col
       - all subcolumns to main columns
       - access: Derived Col > col.subcol
    2. New Col > ['key1' -> col1, 'key2' -> col2]
    3. split(Name, ' ')
32. Coalesce and file name
    - Sink > Settings > Pattern / Single file > movies[n].csv
    - Sink > Optimize > Set partitioning > Hash > number = 5, col = primarykey
33. Partitioning? Sink > Optimize > Set partitioning > Key > col = "year"
34. Calculate fuzzy matches:
    - 2 sources
    - Join > soundex(name) == soundex(name2)
35. Deduplication / distinct rows (a PySpark equivalent is sketched after this list)
    - Method 1
      - Aggregate > Group by > sha2(256, columns()), mycols
      - Aggregate > Aggregates > each column matches = true(), $$, first($$)
    - Method 2
      - Aggregate > Group by > sha2(256, col1, col2, soundex(col2))
      - Aggregate > Aggregates > each column matches = true(), $$, first($$)
    - Method 3
      - https://docs.microsoft.com/en-us/azure/data-factory/data-flow-script
      - https://www.youtube.com/watch?v=QOi26ETtPTw
36. Delta loading using SQL Change Tracking technology
    - https://www.youtube.com/watch?v=IN-4v0e7UIs
    - https://docs.microsoft.com/en-us/azure/data-factory/tutorial-incremental-copy-change-tracking-feature-portal
37. Distinct
    - Group by (Aggregate) > group by target_column
    - Aggregates > column pattern > name != 'target_col', $$, first($$)
      (select all columns except the target column)
38. Row count on a table
    Aggregate > Group by (blank), Aggregates = colname, count(1)
39. Routing / saving data to different targets in the same Dataflow:
    - Conditional Split
    - Using a parameter
    - A new column created at run time
40. Start the surrogate key from the next value (https://www.youtube.com/watch?v=tc283k8CWh8)
    Method 1:
    - Dim > Aggregate > group by blank, Aggregate = max(SurrogateKey)
    - Src > Add Surrogate Key from one > cross join with the Dim aggregate > key + max(SurrogateKey)
    Method 2:
    - Dim > Aggregate > group by blank, Aggregate = max(SurrogateKey) > sink the max value
    - Src > Add Surrogate Key from one > Derived Column > surrogateKey + sink
41. Group players into an array per row by team name (collect_list)
    - Group by (Aggregate) > group by team name > Aggregate: collect(player)
42. Data-driven framework:
    - Create a parameterized Dataflow (i.e., a Dataflow which needs parameters)
    - Create a pipeline: Lookup > ForEach > the above Dataflow
    - Lookup = CSV file
    - ForEach: Settings > Sequential, @activity('Lookup1').output.value
    - Inside ForEach > parameter = pipeline expression, @item().columnNames
    - Run debug
    - If there is an error: click the pipeline screen > Output tab > click "see error on failure" to debug
43. Warm cluster Dataflow (to track performance)?
    - Abstract the cluster start-up with a dummy Dataflow,
      i.e., add a dummy Dataflow at the beginning of the target Dataflow.
    - Example: src > filter (1==2) > sink
44. SCD Type 1
    - Src > Alter Row > Sink
    - Alter Row > Upsert if
45. Create a data lake
    - Sink
    - Sink type > Inline > select inline dataset type > Delta
46. Replace special characters
    - Select > true(), regexReplace($$,'[^a-zA-Z]','')
47. Hashing
    sha2(256, columns())
    or
    parameter = [c1, c2]
    sha2(256, parameter)
48. How to do a left outer join? Use Exists.
49. Remove columns
    - Select activity
    - Source: name != 'Row_Number' && name != 'null' && left(name,2) != '_c'
    - And this for the output column name: $$
50. Cache sink ("undefined" is not valid JSON)
    - Make sure the data you are trying to save is converted to the correct data type,
      e.g. toString(Col1)
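For comparison, a minimal PySpark sketch of a few of the recipes above (37 distinct, 38 row count, 41 collect_list, 47 hashing); the table and column names (team, player, c1, c2) are placeholders, not from the notes:

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("IN", "A", 1, "x"), ("IN", "B", 2, "y"), ("US", "C", 3, "z"), ("US", "C", 3, "z")],
    ["team", "player", "c1", "c2"])

# 37. Distinct rows (the dataflow pattern hashes all columns and keeps first())
distinct_df = df.dropDuplicates()

# 38. Row count on a table
row_count = df.count()

# 41. Group players into an array per team (collect_list)
teams = df.groupBy("team").agg(F.collect_list("player").alias("players"))

# 47. Hashing a set of columns
hashed = df.withColumn("row_hash", F.sha2(F.concat_ws("||", "c1", "c2"), 256))

teams.show()
hashed.show()
print(row_count, distinct_df.count())
```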
- Find max row
- Find distinct rows
- Group by country
- Parameters (to save data into a file in blob storage with the name given as a parameter in the pipeline)
- Creating variables
- Regex / wildcard
- Save a file as abcd_timestamp.parquet
- Variable local to a Derived Column only
- Create a data-driven framework (data comes from CSV/JSON into the Dataflow)
- Send a function as a parameter into a Dataflow
- Pipeline expressions should start with? "@"
- Parameters in a pipeline?
  - Constant
  - Create = pipeline window > Parameters > New "param"
  - Access = @pipeline().parameters.param
- Access the output of any activity in the pipeline? @activity('name').output
- Variables in a pipeline?
  - Can be modified
  - Create = pipeline window > Variables > New "var"
  - Access = @variables('var')
- Access notebook return data in the pipeline?
  - @activity('notebookname').output.status.Output.result.exitValue
- How to get the count of records ingested after ETL? @activity('name').output
- How to do logging (pipeline)?
  - Capture the JSON from one activity in another
  - Dataflow (log) = create a simple CSV with data "1" (dummy source)
  - Dataflow > Dataflow (log) > parameter = @activity('name').output.runStatus.metrics
  - Logging functionality is directly available in the Copy activity
- How to do dynamic run-time resource allocation (pipeline)?
  - Pipeline > Dataflow > Settings > Core count = @if(greater(activity('Name').output.size, 400000), 128, 8)
- Use of Lookup?
  - Input = any data source
  - Output = a single value / array
  - Can run queries / stored procedures
  - SELECT * FROM @{pipeline().parameters.T}
  - Pipeline Lookup example:
    - Pipeline > Lookup > "SELECT col1 FROM table"
    - ForEach > @activity('lookup').output.value
    - Add a Dataflow inside the ForEach
    - Src (the source dataset is parameterized) > Settings > @item().col1
- Lookup vs Script activity?
  - Lookup: returns the result of executing a query or stored procedure.
  - Script: can only execute Data Manipulation Language (DML - INSERT, UPDATE, DELETE, SELECT) and Data Definition Language (DDL - CREATE, ALTER, DROP).
- Which activity to access the folder structure in Gen2? GetMetadata
- SQL scripts (Develop > SQL Script) are disconnected; to run them:
  - Option 1: add a Script activity and directly add the SQL code.
  - Option 2 (call the current Synapse workspace using REST to get all artifacts; see the notebook sketch below):
    - Save a SQL script in Develop.
    - URL: https://.dev.azuresynapse.net
    - Resource: https://dev.azuresynapse.net/
    - GET, system-assigned managed identity
    - Then navigate the response to get the SQL.
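A minimal notebook sketch of Option 2 above, fetching a saved SQL script via the Synapse REST API. The workspace/script names follow the pattern in the notes; the token audience key and the JSON drill-down path are assumptions:

```python
import requests
from notebookutils import mssparkutils

# Audience key for the Synapse dev endpoint is an assumption; adjust if your environment differs.
token = mssparkutils.credentials.getToken("Synapse")

url = "https://myworkspace.dev.azuresynapse.net/sqlScripts/mySqlScript?api-version=2020-12-01"
resp = requests.get(url, headers={"Authorization": f"Bearer {token}"})
resp.raise_for_status()

# Drill down to the SQL text; the exact property path is an assumption about the response shape.
sql_text = resp.json()["properties"]["content"]["query"]
print(sql_text)
```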