Showing posts with label synapse. Show all posts
Showing posts with label synapse. Show all posts

Tuesday, June 21, 2022

Azure Synapse

Azure Synapse 


Synapse

Note :

  • For JSON file make sure all new lines , blank spaces are removed.
  • Dataflow , Notebooks etc - names has to start with String.
  • blob = flat namespace , Gen2 = directories
  • Azure Active Directory (Entra ID) : Used to Create 1 Functional ID (secret ) called Serive Principle across all applications to communicate with each other
  • SP ( serivce Principle) : Managed by User ie., renew etc.,-
  • Managed identity (service principal) : managed by Azure

DATA TAB ( Lake Database , SQL Database , Linked )

  • Workspace : Based on Azure Serverless (Also Called OnDemand )
    • Lake Database: Spark to Query Data
    • SQL Database: Serverless to query Data
  • Linked

Workspace ( Source is Gen2/Blob Storage )

  • Source is Gen2/Blob Storage
  • This comes default with Synapse
  • Data is not in table format but processed as tables in Runtime
  1. Lake Database:
    • Only Default Gen2 Location which comes along with Synapse to Create Database
    • Uses Templates
    • You can create SQL like table , Add schema to Tables, Define Column Datatypes and Add Foreign to underlying Gen2 Storage
    • Can create above schema against CSV , Parq , json etc
  • Steps:
    • Open NoteBook
     %% sql
     Create Database DB;
     Use DB;
     create table T (id Int, name varchar);
     insert into T values(1,'A');
    
    • Open gen2 in Azure > create a folder "Customer" and upload csv file
    • Open synapse > Data > "+" > New Lake Database > Enter Name , linkedService ,above Folder path and Data Format > "+" to add table and Schema of Csv
    • Deploy (publish)
    • u shd be able to read with SQL / Notebooks
  1. SQL Database (Using SQL script )
    Operations :
    • read : OpenrowSet
    • write : CETAS , CET
create External table  T(c1 int) with(Datasource= xx , format='CSV', Bulk'path')  -- CET


create External table  T2 (c1,c2) as select * from T  ---CETAS ( Cannot alter External Table)

select * from openrowset(Datasource= xx , format='CSV', Bulk'path')

Example:

  • Open SQL script or Connect to Serverless using Dbeaver (Serverless endpoint in Azuer Portal > Synapse Overview Page)

  • run "create database db"

  • Go to SQL Script > Create new Script

  • run " use db"

      CREATE EXTERNAL DATA SOURCE ParentFolder WITH (LOCATION='https://path.dfs.core.windows.net');  
         CREATE EXTERNAL FILE FORMAT myFormat WITH (FORMAT_TYPE=DELTA);  
      select * from openrowset(bulk '/Country'' ,format= DELTA) as D 
    
  • This DB should be visible along with External Source and File Formats

1. Select from Table
IF NOT EXISTS (SELECT 1 FROM sys.databases WHERE name = 'Test')
		BEGIN
    		CREATE DATABASE Test;
		CREATE EXTERNAL DATA SOURCE ParentFolder WITH (TYPE = HADOOP, LOCATION='https://path.dfs.core.windows.net/temp/');
		CREATE EXTERNAL FILE FORMAT PARQUET WITH (FORMAT_TYPE=PARQUET);
	End
Go;
use Test;
select * from openrowset(bulk 'subFOlder/1.csv, format='PARQUET' ,DATA_SOURCE = 'ParentFolder') as rows

Note: Support JSON , CSV etc.,

2. Create new Table from Existing Table in Blob (destination = CSV , PARQUET formats )
CREATE EXTERNAL TABLE T 
WITH (LOCATION='subFolder/',DATA_SOURCE=ParentFolder,FILE_FORMAT=myFormat) 
AS 
Select * from OPENROWSET (LOCATION='subFolder2/',DATA_SOURCE=ParentFolder,FILE_FORMAT=myFormat)  as rows --src
GO;

DROP EXTERNAL DATA SOURCE DS;
DROP EXTERNAL FILE FORMAT myFormat;
3. Create View (CVAS)
 CREATE VIEW V AS (select * from openrowset(bulk 'https://blob.parquet',format = 'parquet') 
 select * from v ;
4. Call SQL Script in Pipeline
1. To use in pipeline
- **Add Web Activity in Pipelin
- **Call the current Synape Workspace -GET https**://myworkspace.dev.azuresynapse.net./sqlScripts/{sqlScriptName}?api-version=2020-12-01
- **Drill down to Required SQL script
5.Cosmos
SELECT * FROM OPENROWSET('CosmosDB','Account=my-cosmos-db;Database=my-db;Key=k',[my-container]) AS T

Linked

  • When u create link service and publish it . The icon should appear here.
  • Only Connected Services like Containers (Gen2 / Blob) ,Files ,Queues,Tables ,Cosmos

Dedicated SQL (Compute Power + Storage Space) -

  1. Relational DB
  2. Uses Distribution to Store Data and. Data not stored in 1 Table ie., Control Node= Compute Node 1 + Compute Node 2 .....
  3. Greater than 1TB
  4. Max 60 Nodes
  5. DW5000c = 5000/500 = 10 nodes
  6. Unique Contraints not supported (ie., duplicates need to be removed by other logic)
  7. Can Create Table in SQL DB
  8. High Cost
  9. Main Concepts: (DW = Node1 (Dist1 =partition1 ,p2..)+ Node2 (Dist2 =partition1 ,p2..) + .... + Node 60
    • Data within each distribution is divided into partitions based on column eg: OrderDate.
    • Distibution (Distributed among nodes) :
      • Hash :
        • Data is distibute based on hash result
        • Large Tables
        • Better performance for joins uses hash col
      • Round Robin:
        • Small Table
        • Even Distribution
        • Bad performance for joins as no key col
      • Replicate : Clone data in all Nodes
    • Index
      • Non Clusterd Column : For Nvarchar(max) , varchar(max) ,Binary
      • Clusterd ColumnStore : Recommended
    • Partition : Data is distributed across many partition types
  10. Creating Tables
    • Normal create table t(i int) with (distribution= Hash(C), ColumnStore Index , Partition =C )
    • COPY into : COPY INTO T FROM "https"//xxx/*.parq WITH (FILE_TYPE = 'PARQUET' , MAXERRORS = 0, IDENTITY_INSERT = 'OFF' );
    • CTAS : Create table T With() As select * from T2
  11. Recommeded :
    • Insert : RoundRobin
    • Fact : Cluster Column Store + Hash Distribution
    • Dim : Replicated

NoteBooks link

df.createOrReplacetemView("T")
%%sql
select * from T

Delta Catalog Tables

df.write.format("delta").saveAsTable("MyManagedTable")
df.write.format("delta").option("path", "/mydata").saveAsTable("MyExternalTable")
spark.sql("CREATE TABLE MyExternalTable USING DELTA LOCATION '/mydata'")
%%sql

CREATE TABLE MyExternalTable
USING DELTA
LOCATION '/mydata'

Delta

SELECT * FROM OPENROWSET( BULK  'https://mystore.dfs.core.windows.net/folder/',FORMAT = 'DELTA' ) AS T

Connection String from Linked Service

from pyspark.sql import SparkSession
sc = SparkSession.builder.getOrCreate()
token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
accountKey = token_library.getConnectionStringAsMap("<LINKED SERVICE NAME>").get("<KEY NAME>")
print(accountKey)

Read and Write Cosmos

df= spark.read.format("cosmos.olap")\
.option("spark.synapse.linkedService", "nm")\
.option("spark.cosmos.container", "cont").load()

df.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "my_linked_service")\
    .option("spark.cosmos.container", "my-container")\
    .mode('append')\
    .save()

SQL Database (OnDemand) vs Dedicated SQL ( Relation DB)

                                Dedicated   Serverless
1. Select                       yes             Yes
2. insert,update,delete,merge   yes             No
3. CTAS                         yes             No
4. CETAS                        No             yes
5. Cost                         high            low
6. Strategy(hash/RoundRobin.)   yes             No
7. Provisioned                  Seperatly       by Default
8. Volume of Data               High            low
9. Data Stored                  Relational DB   Blog/Gen2

External Table and Views
Both are just pointers to a location in the datalake / Blob (Parquet, JSON, Blob Storage)

                    Exteranl Tables Vs Views
1. Define Schema    Yes                 No
2. Indexing         Yes                 No
3. Performance      Fast                Slow

READ VIEW from NoteBooks

token=accessToken = mssparkutils.credentials.getToken("key")
df=readTableViews("DW")
def readServerlessTableViews(token):
    df = (spark.read \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .option("url", "jdbc:sqlserver://xxx-syn-01-ondemand.sql.azuresynapse.net:1433;databaseName=db;") \
    .option("dbtable", "myschema.viewName") \
    .option("accessToken", token) \
    .load())
    return df

Dataflow

DataFlow Ref

  • Linked Service : Service to Connect to different DataSources (External And Internal)
  • Integrated Runtime : Compute which executes the DataFlow
  • Source(Linked Service) > Operation > Sink(Linked Service)

DataSet ( Connection to Source)

  • All parameters created in Dataset can be accessed in Dataflow at Debug Settings
  • The same parameters can be used at pipeline level on the Dataflow added
    • DataFlow > New DataSet > Parameters >New > test,string
    • Access : dataset().test
    • example: @concat('/',dataset().test,'/',substring(utcNow(),0,10 ),'.parquet') Note:
  • Dataflow can create DB table on fly.
  • CDC (Change Data Capture)
    • To handle delta records ( all new records including newly added duplicate records)
    • Available for SQL , Azure Dedicated , Gen2
    • Checkpoint Key (Unique marker to identlify last succesfully processed record)
      • Create a Dataflow and Enable CDC
      • Add Dataflow to pipleines
      • In pipleine >Select Dataflow> Settings > Checkpointkey
    • Disadv: If Delta set to ON on and if user wants to perform full load inbetween . User has to make changes to the flow and publish . [ref](https://learn.microsoft.com/en-us/answers/questions/838041/how-to-reset-the-change-data-capture-information-()

Data Flow Activities:

  • Uses Spark Engine

  • Src > Transform > Sink

  • Any number for Sources

  • Different Activities :

      Multiple Inputs:
      	- Source: spark.read
      	- New Branch: Clone  Data into new branch
      	- Join : Normal join (Use exists for left outer join )
      	- Conditional Split: Split records based on condition
      	- Exists: Compare two tables (innner join or left outer join)
      	- Union : Union /Append Files
      	- LookUp:  
      		- Left Outerjoin with 2 tables
      		- Verify Data between 2 tables 
      		
      Schema Modifier (x9)
      	 - DerivedColumn:  WithColumn , (Create Heirarchy for complex JSON)
      	 - Select:         Select (prune)/ WithColumnRenamed / Remove Columns
      	 - Aggregate:      GroupBY / All functions ( max ,sum , etc.,)
      	(User can give values to Agg and GroupBY in Same Activity to apply group level filter)
      	 - Surrogate Key:  Add unique primary Key
      	 - Pivot : Create pivot table
      	 - Unpivot: Uncreate pivot table
      	 - Window: 
      		- Without order(max(col),min(col) etc., ):Eg max('col).Over(Window.Partition('groupby_col))
      		- With Order(rank ,denserank :Eg rank.Over(Window.Partition('groupby_col).orderBy('sortCol))
      	  - Rank: 
      		- Used Single : Entire Dataframe is ranked ( Dense = serially with no gaps)
      		- Row Column : Name of new Column
      		- Row Condition : Existing Col - Asc / Desc
      	  - External Call: rest api
      	  
      Formatters
      	- Flatten: Json with Array ONLY Eg : {"A" : [1,2,3]}
      	- Parse: String > Json
      	- Stringify: Get particular sub-json from main json eg json > Table Cols
      
      Row modifier
      	- Filter:  Filter / Where 
      	- Sort: sort
      	- Alter Row (DB only) : Update DB, With+When , Drop , Fill , parameteried 
      	
      Delete  (on real time only)
      	- Assert: Assert + Derived Column ( isError ('col_name working'))
      
      Flowlet
      
      Destination(Sink)          
       	- spark.write
      	- Cached Sink (temp loc): Access "sink_name#outputs()[RowNo].ColumnName"  
      	- Cached Sink - Convert Data to StringDatatype  to avoid error - "undefined" is not valid JSON Eg : toString(Col1)
      	- cached Lookup (Dict): Sink> Settings> Add Key Column.Access "sink_name#outputs(columnname)" , "sink1#outputs()[1].MAX_UPDATE_TS"
      
      Misc
      	- WildCard:Normal Regex
      	- $$: All Columns
      	- $parameter: Access parameter created within same dataflow activity
      	- String Interpolaton : "folder/file-{$n}.csv" #n is a parameter
      	- ByName('city'): Any column whose name is city
    

Dataflow Script Representation :

Dataflow Transformations

Derived Columns :

  • $$ (this) : All Cols

  • $0 current column name

  • name Eg: name!="abc"

  • type Eg: type != int

  • stream represents the name associated with each stream, or transformation in your flow

  • position is the ordinal position of columns in your data flow

  • origin is the transformation where a column originated or was last updated

    1. trim any col which is a String(Pattern): - Derived Col > Add > Col Pattern > type == 'string',$$ -> trim($$) - Auto mapping on
    2. Remove special character from all columns
      • Add Select
      • Add Mapping > Rule based Mapping
      • true() , regexReplace($$,'[^0-9a-zA-Z]','')
      • or true() , regexReplace('Column','[^0-9a-zA-Z]','')
    3. String Column to Date
      • Derived COlumn
      • toDate(byName('Date'),'mm/dd/yyyy')
    4. Metadata of Column inside Dataflow
      • Derived Col > col ,Columns()
      • Flatten > unroll by col > col
    5. Calculate Column Pattern (apply logic for data in mutliple columns)
      • Derived Column >Columns >Add >Column PAttern type =='string', $$+'_clean', trim($$,'')
    6. Max : Agg > max(col)

Dataflow Json

1. Table to Json 
   - Read Source (import projection)
   - Derived Col > Create Structure 
   - Save to Json Sink
2. Handling json :
		- Parse
		- Flatten
		- Stringyfy
		- Derived Cols : Create Complex Structure
3. Stringify 
	- get particular sub-json from main json eg json > Col to Table 
	- DataFlow >REST APi Source (url="https://api.first.org/data/v1/countries" , Anonymous Request) or        https://restcountries.com/v3.1/name/united
	- Source > Projection > Import projection
	- Add Stringyfy>Cols>newcol , Expr >Under Input Schema> Select Structure to Flatten
	- Add Select Statment > filter the new_col
	- Select new column 
	- Sink
4. Parse (JSON in single Column in a table) :
	val j = List(("IN",1),("US",2)).toDF("country","medals")
	.withColumn("j", to_json(struct('country,'medals)))
        +-------+------+--------------------+
        |country|medals|                   j|
        +-------+------+--------------------+
        |     IN|     1|{"country":"IN","...|
	- Make sure the Source > projection > Import Schema
	- Add Parse
		- Document Form = Single Document
		- Columns > new col , j , (country as string,		medals as integer)
5. Flatten :
	 - Flattens  Json with Array ONLY
	 - If json has array {"A" : [1,2,3]}

Pipeline

  • Linked Service : Service to Connect to different DataSources (External And Internal)
  • Activity : Copy , Dataflow , Notebooks ,SQL etc.,
  • Private Endpoint : Endpoint with in Private Virtual network
  • Integrated Runtime : Compute which executes the DataFlow
    • Azure IR : in cloud-to-cloud scenarios.
    • Self hosted IR : to connect to on-premises data sources or services.
    • SQL Server IR : to connect to SQL Server databases in a private network.
  • Can run activities in parallel
  • Throttling under Integration Runtime 'AutoResolveIntegrationRuntime' : There is a limit of simultaneous pipelines in an integration runtime. You need to split the pipeline to run into multiple runtimes.
  • Triggers : used to start the pipeline
    • Manual
    • Tumbling Window :
      • Run Continously after finishing
      • Delays and concurrency can be set
      • Auto Retry in case of Fail
    • Event : storage Events etc.,
    • Scheduled Triggers Note:
  1. Pipeline and Synapse Live uses functions from ARM templates .
  2. Hence sync master branch with ARM template update ARM template

Activity 1 : Print statement alternate :

1. create a variable
2. Set the variable in the pipeline
3. Check the contents of variable after the run 

Activity 2 : Protoype pipeline

1. Use wait statements 
2. Set Variable 

Activity 3 : Token Refresh logic

  1. generate token 1st time
  2. Parallel Connection to
    1. until loop(flag is true)
      1. Wait
      2. Keep generating new token
    2. Copy Activity(uses above token)
      1. fail : Set flag = true
      2. Pass : Set flag = true

Activity 4: Run SQL Script saved in Datatab

  1. Web Activity > Url :exampleWorkspace.dev.azuresynapse.net/sqlScripts/exampleSqlScript?api-version=2020-12-01, Method: GET , Auth: Managed Identity , Resrc : https://devsynapse .net

HTAP (Hybrid Transactional and Analytical Processing Patterns)

  • Data Sync bw Src (SQL server , Azure SQL database )and Target - Dataverse(Datalakes) , Azure SQL Dedicated and Cosmos targets

  • Src > Synapse > Target

  • Cosmos DB (OLTP):

    • No SQL
    • Type of Cosmos DB can be selected depending on Source (SQL ,Mongo etc., )
    • Fast Read and Write
    • Auto Scales
    • Uses API
    • Expensive
    • To Enable HTAP (On Demand - video8):
      • Cosmose Homepage : To turn on Analytical Contianer in Cosmos
      • In Synapse: Homepage > Integration > Enable Synapse link
    • Synapse read cosmos: Select * from openrowset() with()as T CROSS APPLY OPENJSON(C) with()
  • SQL server

    • Use managed Identnity
    • Azure Synapse Link enabled
    • Add ips to Firewall
    • Create linked Service + Linked CONNECTION in Synapse(Piplnes > "+")
    • Consfiure Blob Storage for intermiitend result

Deployment Strategy [GIT]

Summary

  • Publish branch : ARM templates are stored and updated
  • Collaboration Branch = Master Branch.
  • Feature Branch = Individual Branch created by users.
  • Root Folder = root folder in your Azure Repos collaboration branch.
  • Pull Request =Request merge from Feature Branch > Master Branch.
  • commit all = Save All Changes to Git.
  • Publish Should only be allowed from master Branch.
  • Code is merged from Master Branch to Synapse Services. (Git Actions is configured to run pull)
  • Publich > ARM template gets created.
  • Once master gets published , the code is available in Synapse-live and workspace_publish branch get created.
  • ARM Template = Azure Resource Management Template. Note : ARM templates from workspace_publish branch is not used.

Whenever we click on button publish (master branch) . Synapse creates 2 jsons

  1. TemplateForWorkspace :
    • All work (pipelines, Notebooks, DataFlows etc., ) in 1 single file
    • This file does not contain parameters
    • It contains logics only
  2. TemplateParametersForWorkspace : Contains only Parameters

For Successful Deployment we need to :

  1. Move TemplateForWorkspace to higher Environment as is
  2. Replace "TemplateParametersForWorkspace" with values as per the higher Env

Git (parameter replace for all artifacts)

  1. When Code is merged from Master Branch to Synapse Services.

  2. This will trigger function in GitActions ( Azure/Synapse-workspace-deployment@V1.7.0 ) - which is configured in CICD

     	uses:  Azure/Synapse-workspace-deployment@V1.7.0
     	with:
     		TargetWorkspaceName:  ${{ env.SN_WORKSPACE }}  # Dev [Live]
     		ArtifactsFolder:  ${{ github.workspace }}
     		environment:  Azure Public
     		clientId:  ${{ env.CLIENT_ID }}
     		clientSecret:  ${{ env.CLIENT_SECRET }}
     		tenantId:  ${{ env.TENANT_ID }}
     		resourceGroup:  ${{ env.RESOURCEGROUP }}
     		subscriptionId:  ${{ env.SUBSCRIPTIONID }}
     		DeleteArtifactsNotInTemplate:  true
     		operation:  validateDeploy
    
  3. Two ARM template (Azure Resource Management Template) gets created.

    • TemplateParametersForWorkspace.json
    • TemplateForWorkspace.json
    • Note: ARM templates from workspace_publish branch is not used.
  4. Users Need to add 2 custom files at the root:

    • {{ synapse_workspace }}-parameteres.yaml : Value of Each Property Eg: linkedservice

    • template-parameter-definition.json :

    • Synapse will read this file and use its configuration to generate which properties get parameterized.

    • If no file is found, the default template is used.

    • file consists of [trigger, pipeline, linked service, dataset, integration runtime, and data flow]

    • "=" is current value ,"-" is don't keep the default .

    • Syntax : <action>:<name>:<stype>

           "Microsoft.Synapse/workspaces/linkedServices": {
               "*": {
                   "properties": {
                       "typeProperties": {
                           "baseUrl": "="
                       }
                   }
               }
           }
      

    If u have linked service , "parameteres.yaml" will replace the values with values in this yml and in the structure as "template-parameter-definition.json"

  5. Above Function will Deploy Synapse artifacts using templates to given env

  6. You can get this file from the link.

To Change artifacts of properties , A custom parameter template can be used {{ workspace }}-parameteres.yaml. To override the default parameter template, a custom parameter template named template-parameters-definition.json should be placed in the root folder of the Git branch.

Points to Remember in Synapse - Git

  • Collaboration Branch = Master Branch.
  • Feature Branch = Individual Branch created by users.
  • Root Folder = root folder in your Azure Repos collaboration branch.
  • Pull Request =Request merge from Feature Branch > Master Branch.
  • commit all = Save All Changes to Git.
  • Publish Should only be allowed from master Branch.
  • Code is merged from Master Branch to Synapse Services. (Git Actions is configured to run pull)
  • ARM template gets created.
  • Once master gets published , the code is available in Synapse-live and workspace_publish branch get created.
  • ARM Template = Azure Resource Management Template. Note : ARM templates from workspace_publish branch is not used.

Questions

NoteBooks Questions :

- Whats is the Use of temp tables ?  to reference data across languages
- How to reference other Notebook  ? `%run /<path>/Notebook1 { "parameterInt": 1}
- How Return Values from NotePad ?
	from notebookutils import mssparkutils		
	mssparkutils.notebook.exit("hi")
- To pass external parameters from pipelines to Notebook ?
  - Create a variable in Notebook Eg:input=""
  - Convert it into parameters ie., hover over >> ck on "..." > Toggle parameters
  - Create a new Pipeline
  - Add the notebook into pipeline
  - In Pipeline ,select Notebook > settings >Add notebook> baseparamters>"input"- string 
  - Click outside of Noteboook > variables > "input" - string
  - click on Notebook in pipeline > baseparamters > "input"  ,@variables('input')
- How to Read a CSV from Gen2 ? 
	df= spark.read.option('header','true').option('delimiter',',').csv('abfss://1.csv')	
- What are Magic Commands ?
	- Line Magic(%) - same line only Eg: ```python y = 2 if x < 3 else 4```
	- Cell Magic(%%) - entie cell Eg:
			%%timeit
			if x < 3:y=2
			else:y=4
	- How is Spark session configuration done magic command ?
     %%configure 
		{"driverMemory":"28g","driverCores":4,"executorMemory":"28g","executorCores":4
			,"jars":["abfs[s]://blob/myjar1.jar"]}
- How to Reference unpublished notebook ? Check box option on Notebook Settings
- How Python logging in Notebook ? 
	import logging
	defaultLogger = logging.getLogger('default') 
	defaultLogger.debug("default debug message")

- File operations ?mssparkutils.fs.ls('path') #head,append,ls,cp,mv.mkdirs,rm
- read parq ? ```spark.read.parquet('abfss://parquet@deltaformatdemostorage.dfs.core.windows.net/employees')```

Dataflow Questions

1. StringInterpolation (parameter inside string)? 
	"folder/file-{$n}.csv" #n is a parameter
	"My age is :{toInteger(col)}"
2. Parameters Wildcard Dataflow ?
	-dataflow add parameter > add "path"
	-Source > Source Options > Wildcard paths > 'folder/'+$path+'*.csv'
3. Send whole Col itself as parameter?
	-Dataflow > Parameter > data , string, "test"       # this is data
	-Dataflow > Parameter > varcol , double, colname    # Dynamic join
4. schema Drift (Source) ?
	 Enable when column changes in a Schema
5. BroadCast Join (Join) ?
	When whole Table can be accomodated in memory 
6. DataFlow Assert ?
	toInteger($col_name) == 1000
7. How to "select max(revenue),* from T"?  (Self Join)  
	- Src > Projection > Import Projection
	- New branch > Aggregate  > Add : maxcolumn ,max(revenue)
	-  Inner Join (main branch) when revenue = max(revenue)
8. Select max(column) from table
	- Src > Projection > Import Projection
	- New branch > Aggregate  > Add : countcolumn ,count(col_name)
9. Find Surrogate Key
	[Ref](https://www.youtube.com/watch?v=9U-0VPU2ZPU)
	- Lookup (does left outer join of different source )
	- Lookup + Derived Column(isMatch) + Conditional Split (isMatch=True/false)+ others
	- Lookup : the result of each row is stored in new column
	- Derived Column =isMatch() 
	- Agg = first
10. Cast all Cols to Integer ?
	- Select > Regex > 1==1 , $$ , toInteger($$)
11. LAD
	- Cached lookup -https://www.youtube.com/watch?v=HqCn42FaRJs)
	- Add Cache type Sink as "cachedata"
	- to call the cache : cachedata#outputs()[1].columnname
12. Multiple Sinks? Dataflow >Settings > Set order
13. Accesing sink output in Pipeline? @string(activity("name").output.runStatus.output.sink_name.value[0].colname)
14. Canonical Model (Flexible Schema)?
	Derived Col>RuleBased> locate('colname',lower('name')) !=0 -> colname
15. Selection of group of Columns from Schema? Select >type!='string' && name != 'id',$$+'_notstring'
16. Dynamic Values / Parameterization[ref](https://www.youtube.com/watch?v=CMOPPie9bXM)
	- if Default parameter is not set , the user will be forced to enter the parameter value at the start
	- Only parameters declared/created can be used 
	- Any parameter created on : 
		- Dataset will be available on Dataflow (in Debug Settings)
		- Create at Dataset ,Dataflow will be available in pipeline by clicking on the Dataflow object
17. Dataflow Parameterisation ?
	- Creation : Parameters >dataflow_name,String,defaultvalue = Blank
	- Access: $dataflow_name
	- Sending Value: 
		1. Add to a Dataflow
		2. Debug Settings > Parameters > Dataset Paramters >dataset_name as value
18. Passing parameters from Dataset >DataFlow>  Pipeline
	- If u already have set parameter in Dataset and used in Dataflow. 
	- U can use by : pipeline > Select Dataflow> Settings > u should be able to see "dataset_name" under 
19. Row to Columns (unpivot) : Unpivot > Ungroup > Unpivot Colum
20. Assert 
	- Expects true,Expects false,Expects Unique,Expects exists(col to col matching)
	- description :toString(col_name) +'working'
	- filter : col_name == 1000
	- Assert + Derived Column =( isError ('col_name working'))
	- Assert + Split =  (hasError())
21. External Call 
	 - loop the rest api for the given input -https://www.youtube.com/watch?v=dIMfbwX8r0A)
	 - source (List)
	 - Add a derived Column to construct a resource using List
	 - add external call 
		- linked Service : GET , Anonymous , the Url will be overrided by below
		- method : GET ,previousActivity
		- Relative Url : Endpoint
		- Row Relative url : resource (constructed above)
22. Create Heirarachy
	- Add Source (input a csv (flat file) )
	- Add Derived Column  > Exp Builder > Create Column > Add SubColumn Window
	- Over (Group Column) : c2
	- sort : sort column  (Window Column is arranged : rank , dense_rank ,row_number etc.,)
	- range : 
	- Window : max(col), denserank(),rank()
23. Create variable inside an Activity in Dataflow:
	- Derived col > locals
	or
	- Cache > WriteTo Activity > use in another activity
	- Dynamic Expression > Create varible > var1 ,trim(right(col,6))
	- Accessing it ":var1"
24. Join Scheme drift Table (schema less table ) with table with schema
   - Src1 (schema Drifted , allow schema drift , infer drifted col) > Derived Col: col_tojoin, toInteger(byName('col_tojoin'))
   - Src2 (Scheme Table)
   - join : src1 ,src2 on col_tojoin
   - Sink (allow schema drift turned ON)
25. remove spaces from all col names? replace($$,' ')
26. Add schema to Schema less src? Src > Data Preview > Map Drifted Column
27. Specific Partition Files?
  - Dataset : Target = Container base path only
  - Src > Src Options > 
	  - Wild Card = top_folder/""/""/*.parquet
	  - Partition root path = top_folder
	  - column to store file name = filtercolumn
28. Saving Data into specific location based on input
	- Create a Column in transformation which contains Target
	     - Derived col > month , split(col,"-")/[2]
	- Sink > Settings > Filename Option (default : generic name)
	- Sink > Optimize > Set Partioning
	- Sink > Optimize > Key > UNique value per partition > Key Col = month
29. Pipeline Trigger:
		Schedule Pipeline
		cron Job pipeline etc.,
30. Save Partition files with Name:
		- Sink > Settings > File name option > Pattern = movies[n].parquet
		- Optimize >Set Partioning , Partion TYpe =Round Robin ,10         
31. Create Complex Datatype (subcols , keyvalue ,array):
		1. Derived Col
		   - all subcols to Main Cols
		   - access > Derived Col >  col.subcol
		2. New Col > ['key1' -> col1 , 'key2' -> col2]
		3. split(Name, ' ')
32. Coalesce and FileName
		- Sink >Setting > Pattern / Single Fle > movies[n].csv
		- Sink >Optimize > Set Partitioning > Hash > nos=5 , col =primarykey
33. Partitioning? Sink >Optimize > Set Partitioning > Key > col = "year"
34. Calculate Fuzzy:
	- 2 sources
	- join > soundex(name) == soundex(name2)

35. Deduplication / Distinct Rows  	
   - Method 1
       - Aggregate > GroupBy > sha2(256,columns()) , mycols
       - Aggregate > Aggregate >Each Col matches= true() , $$ , first($$)
   - Method 2
       - Aggregate > GroupBy > sha2(256,col1,col2,soundex(col2))
       - Aggregate > Aggregate > Each Col matches= true() , $$ , first($$)
   - Method 3
       - https://docs.microsoft.com/en-us/azure/data-factory/data-flow-script
       - https://www.youtube.com/watch?v=QOi26ETtPTw
36. Delta Loading using SQl Change Tracking Technology
	- https://www.youtube.com/watch?v=IN-4v0e7UIs
	- https://docs.microsoft.com/en-us/azure/data-factory/tutorial-incremental-copy-change-tracking-feature-portal
37. Distinct
   - GroupBY (Aggregate) > Group By target_column 
   - Aggregates > Column pattern>name != 'target col',$$ ,first(ss)
   (select all columns except target column)
38. Row Count on a Table
	Aggregate > GroupBY (blank) , Aggregates = colname , count(1)
39. Routing /Saving Data to Different Target in Same DF:
        - Contitional Split
        - Using Parameter
        - New Column in Run Time
40. Start SurrogateKey from next Value (https://www.youtube.com/watch?v=tc283k8CWh8)
	Method 1:
	- Dim > Agg> groupBy blank ,Agg=max(SurroagteKey) 
	- Src > Add Surrogate Key from One > Cross join with Dim Agg > key+max(SurroagteKey) 
	Method 2:
	- Dim > Agg> groupBy blank ,Agg=max(SurroagteKey)  > Sink the Max(value)
	- Src > Add Surrogate Key from One > Derived Column > surrogateKey + sink

41. Group players into each array in each row by team name (collect_list)
   - GroupBY (Aggregate ) > Groupby Team name > Agg : collect(player)
42. Datadriven Framework:
	- Create a parameterised dataflow  (ie., Dataflow which needs parameters)
	- Create a pipeline : Lookup > Foreach > above Dataflow
	- Lookup = csv file
	- Foreach: Settings >Sequential , @activity('Lookup1').output.value
	- Inside Foreach >Parameter= pipeline Expression ,@item().columnNames
	- Run Debug
	- If error: ck Pipeline screen > On output tab >  ck on see error on failure to debug
43. WARM Cluster Dataflow (to track performance)?
	-Abstract the starting of Cluster with dummy Dataflow
        ie., add a dummy dataflow at the beginning of target dataflow
	-Example: src > filter (1==2) > sink
44. SCD Type -1
	- Src > Alter Row > Sink
	- Alter Row > Upsert if
45. Create Datalake
	- Sink
	- Sink Type > Inline > Select Inline Dataset Type >  Delta
46. Replace Special Characters
	- Select > true() ,regexReplace($$,'[^a-zA-Z]','')
47. Hashing
	
	sha2(256,columns())
	or
	parameter = [c1,c2]
	sha2(256,parameter)
48. How to left outer join? Use exists
49. Remove Column 
	- Select Activity  
	- source :   name != 'Row_Number' && name != 'null' && left(name,2) != '_c'
	- and this for the output column name: $$
50: Cache Sink ("undefined" is not valid JSON)
	- Make sure the Data ur trying to save is converted to correct Data Type
	- Eg : toString(Col1)

Excercises:

  1. Find Max Row
  2. Find Distinct Rows
  3. Group by Country
  4. Parameters (To save data into file is blog with name given as paramter in pipeline)
  5. Creating Variables
  6. Regex / WildCard
  7. Save file as abcd_timestamp.parquet
  8. variable local inside Derived Column only
  9. Create Datadriven Framework ( Data is coming from csv /json into dataflow)
  10. Send function as parameter into Dataflow

PipeLine Questions

  1. Pipeline Expression should start with ? "@"
  2. Parameters in pipeline?
    • constant
    • Create = Pipeline Window > Parameters > New "param"
    • Access= @pipeline().parameters.param
  3. Access output of any activityin Pipeline ? @activity('name').output:
  4. Variables in pipeline?
    • modified
    • Create = Pipeline Window > Variabes > New "var"
    • Access= @variables('var')
  5. Access notebook return data in pipeline?
    • @activity('notebookname').output.status.Output.result.exitValue
    • How to get count of records Ingested after ETL ? @activity('name').output
  6. How Logging (Pipeline)?
    • capture the json from one activity into another
    • Dataflow (log) = Create simple csv with data "1" (dummy source)
    • Dataflow > Dataflow (log)> parameter =@activity('name').output.runStatus.metrics
    • Logging functionality directly available in Copy job
  7. How Dynamic Run time Resource Allocation (pipeline) ?
    • Pipeline > Dataflow > Settings > Core Count = @if( greater(activity('Name').output.size , 400000) ,128)
  8. Use of Lookup ??
    • Input= any data sources

    • output = single value /array

    • run Queries / SPROC

    • SELECT * FROM @{pipeline().parameters.T}

        Pipeline Lookup Example:
        	- pipeline>lookup > "Select col1 from table"
        	- Foreach > @activity('lookup').output.value
        	- Add Dataflow inside Foreach
        	- src (src dataset is parameterized)> Settings > @item().col1
      
  9. Lookup vs Script Acivity ?
    • Lookup :returns the result of executing a query or stored procedure.
    • Script: can only execute Data Manipulation Language (DML- INSERT, UPDATE, DELETE and SELECT) and Data Definition Language (DDL - CREATE, ALTER and DROP)
  10. Which Activity to Access Folder Structure in gen 2 ? getMetadata
  11. Run SQL scripts (Develop > SQL Script)are Disconnected
    • Option 1 : Add Script Activity and Directly add the SQL Code
    • Option 2 (Call the Current Synapse using REST to get all artifacts ) :
      • Save a SQL Script in Develop
      • url : https://.dev.azuresynapse.net
      • Resource : https://dev.azuresynapse.net/
      • GET , System-Managed-Identtity
      • Then Navigate in the Response Result to ge tSQL
view raw Synapse.md hosted with ❤ by GitHub