Tuesday, June 21, 2022

Azure Synapse

Azure Synapse 


Synapse

Note :

  • For JSON file make sure all new lines , blank spaces are removed.
  • Dataflow , Notebooks etc - names has to start with String.
  • blob = flat namespace , Gen2 = directories
  • Azure Active Directory (Entra ID) : Used to Create 1 Functional ID (secret ) called Serive Principle across all applications to communicate with each other
  • SP ( serivce Principle) : Managed by User ie., renew etc.,-
  • Managed identity (service principal) : managed by Azure

DATA TAB ( Lake Database , SQL Database , Linked )

  • Workspace : Based on Azure Serverless (Also Called OnDemand )
    • Lake Database: Spark to Query Data
    • SQL Database: Serverless to query Data
  • Linked

Workspace ( Source is Gen2/Blob Storage )

  • Source is Gen2/Blob Storage
  • This comes default with Synapse
  • Data is not in table format but processed as tables in Runtime
  1. Lake Database:
    • Only Default Gen2 Location which comes along with Synapse to Create Database
    • Uses Templates
    • You can create SQL like table , Add schema to Tables, Define Column Datatypes and Add Foreign to underlying Gen2 Storage
    • Can create above schema against CSV , Parq , json etc
  • Steps:
    • Open NoteBook
     %% sql
     Create Database DB;
     Use DB;
     create table T (id Int, name varchar);
     insert into T values(1,'A');
    
    • Open gen2 in Azure > create a folder "Customer" and upload csv file
    • Open synapse > Data > "+" > New Lake Database > Enter Name , linkedService ,above Folder path and Data Format > "+" to add table and Schema of Csv
    • Deploy (publish)
    • u shd be able to read with SQL / Notebooks
  1. SQL Database (Using SQL script )
    Operations :
    • read : OpenrowSet
    • write : CETAS , CET
create External table  T(c1 int) with(Datasource= xx , format='CSV', Bulk'path')  -- CET


create External table  T2 (c1,c2) as select * from T  ---CETAS ( Cannot alter External Table)

select * from openrowset(Datasource= xx , format='CSV', Bulk'path')

Example:

  • Open SQL script or Connect to Serverless using Dbeaver (Serverless endpoint in Azuer Portal > Synapse Overview Page)

  • run "create database db"

  • Go to SQL Script > Create new Script

  • run " use db"

      CREATE EXTERNAL DATA SOURCE ParentFolder WITH (LOCATION='https://path.dfs.core.windows.net');  
         CREATE EXTERNAL FILE FORMAT myFormat WITH (FORMAT_TYPE=DELTA);  
      select * from openrowset(bulk '/Country'' ,format= DELTA) as D 
    
  • This DB should be visible along with External Source and File Formats

1. Select from Table
IF NOT EXISTS (SELECT 1 FROM sys.databases WHERE name = 'Test')
		BEGIN
    		CREATE DATABASE Test;
		CREATE EXTERNAL DATA SOURCE ParentFolder WITH (TYPE = HADOOP, LOCATION='https://path.dfs.core.windows.net/temp/');
		CREATE EXTERNAL FILE FORMAT PARQUET WITH (FORMAT_TYPE=PARQUET);
	End
Go;
use Test;
select * from openrowset(bulk 'subFOlder/1.csv, format='PARQUET' ,DATA_SOURCE = 'ParentFolder') as rows

Note: Support JSON , CSV etc.,

2. Create new Table from Existing Table in Blob (destination = CSV , PARQUET formats )
CREATE EXTERNAL TABLE T 
WITH (LOCATION='subFolder/',DATA_SOURCE=ParentFolder,FILE_FORMAT=myFormat) 
AS 
Select * from OPENROWSET (LOCATION='subFolder2/',DATA_SOURCE=ParentFolder,FILE_FORMAT=myFormat)  as rows --src
GO;

DROP EXTERNAL DATA SOURCE DS;
DROP EXTERNAL FILE FORMAT myFormat;
3. Create View (CVAS)
 CREATE VIEW V AS (select * from openrowset(bulk 'https://blob.parquet',format = 'parquet') 
 select * from v ;
4. Call SQL Script in Pipeline
1. To use in pipeline
- **Add Web Activity in Pipelin
- **Call the current Synape Workspace -GET https**://myworkspace.dev.azuresynapse.net./sqlScripts/{sqlScriptName}?api-version=2020-12-01
- **Drill down to Required SQL script
5.Cosmos
SELECT * FROM OPENROWSET('CosmosDB','Account=my-cosmos-db;Database=my-db;Key=k',[my-container]) AS T

Linked

  • When u create link service and publish it . The icon should appear here.
  • Only Connected Services like Containers (Gen2 / Blob) ,Files ,Queues,Tables ,Cosmos

Dedicated SQL (Compute Power + Storage Space) -

  1. Relational DB
  2. Uses Distribution to Store Data and. Data not stored in 1 Table ie., Control Node= Compute Node 1 + Compute Node 2 .....
  3. Greater than 1TB
  4. Max 60 Nodes
  5. DW5000c = 5000/500 = 10 nodes
  6. Unique Contraints not supported (ie., duplicates need to be removed by other logic)
  7. Can Create Table in SQL DB
  8. High Cost
  9. Main Concepts: (DW = Node1 (Dist1 =partition1 ,p2..)+ Node2 (Dist2 =partition1 ,p2..) + .... + Node 60
    • Data within each distribution is divided into partitions based on column eg: OrderDate.
    • Distibution (Distributed among nodes) :
      • Hash :
        • Data is distibute based on hash result
        • Large Tables
        • Better performance for joins uses hash col
      • Round Robin:
        • Small Table
        • Even Distribution
        • Bad performance for joins as no key col
      • Replicate : Clone data in all Nodes
    • Index
      • Non Clusterd Column : For Nvarchar(max) , varchar(max) ,Binary
      • Clusterd ColumnStore : Recommended
    • Partition : Data is distributed across many partition types
  10. Creating Tables
    • Normal create table t(i int) with (distribution= Hash(C), ColumnStore Index , Partition =C )
    • COPY into : COPY INTO T FROM "https"//xxx/*.parq WITH (FILE_TYPE = 'PARQUET' , MAXERRORS = 0, IDENTITY_INSERT = 'OFF' );
    • CTAS : Create table T With() As select * from T2
  11. Recommeded :
    • Insert : RoundRobin
    • Fact : Cluster Column Store + Hash Distribution
    • Dim : Replicated

NoteBooks link

df.createOrReplacetemView("T")
%%sql
select * from T

Delta Catalog Tables

df.write.format("delta").saveAsTable("MyManagedTable")
df.write.format("delta").option("path", "/mydata").saveAsTable("MyExternalTable")
spark.sql("CREATE TABLE MyExternalTable USING DELTA LOCATION '/mydata'")
%%sql

CREATE TABLE MyExternalTable
USING DELTA
LOCATION '/mydata'

Delta

SELECT * FROM OPENROWSET( BULK  'https://mystore.dfs.core.windows.net/folder/',FORMAT = 'DELTA' ) AS T

Connection String from Linked Service

from pyspark.sql import SparkSession
sc = SparkSession.builder.getOrCreate()
token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
accountKey = token_library.getConnectionStringAsMap("<LINKED SERVICE NAME>").get("<KEY NAME>")
print(accountKey)

Read and Write Cosmos

df= spark.read.format("cosmos.olap")\
.option("spark.synapse.linkedService", "nm")\
.option("spark.cosmos.container", "cont").load()

df.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "my_linked_service")\
    .option("spark.cosmos.container", "my-container")\
    .mode('append')\
    .save()

SQL Database (OnDemand) vs Dedicated SQL ( Relation DB)

                                Dedicated   Serverless
1. Select                       yes             Yes
2. insert,update,delete,merge   yes             No
3. CTAS                         yes             No
4. CETAS                        No             yes
5. Cost                         high            low
6. Strategy(hash/RoundRobin.)   yes             No
7. Provisioned                  Seperatly       by Default
8. Volume of Data               High            low
9. Data Stored                  Relational DB   Blog/Gen2

External Table and Views
Both are just pointers to a location in the datalake / Blob (Parquet, JSON, Blob Storage)

                    Exteranl Tables Vs Views
1. Define Schema    Yes                 No
2. Indexing         Yes                 No
3. Performance      Fast                Slow

READ VIEW from NoteBooks

token=accessToken = mssparkutils.credentials.getToken("key")
df=readTableViews("DW")
def readServerlessTableViews(token):
    df = (spark.read \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .option("url", "jdbc:sqlserver://xxx-syn-01-ondemand.sql.azuresynapse.net:1433;databaseName=db;") \
    .option("dbtable", "myschema.viewName") \
    .option("accessToken", token) \
    .load())
    return df

Dataflow

DataFlow Ref

  • Linked Service : Service to Connect to different DataSources (External And Internal)
  • Integrated Runtime : Compute which executes the DataFlow
  • Source(Linked Service) > Operation > Sink(Linked Service)

DataSet ( Connection to Source)

  • All parameters created in Dataset can be accessed in Dataflow at Debug Settings
  • The same parameters can be used at pipeline level on the Dataflow added
    • DataFlow > New DataSet > Parameters >New > test,string
    • Access : dataset().test
    • example: @concat('/',dataset().test,'/',substring(utcNow(),0,10 ),'.parquet') Note:
  • Dataflow can create DB table on fly.
  • CDC (Change Data Capture)
    • To handle delta records ( all new records including newly added duplicate records)
    • Available for SQL , Azure Dedicated , Gen2
    • Checkpoint Key (Unique marker to identlify last succesfully processed record)
      • Create a Dataflow and Enable CDC
      • Add Dataflow to pipleines
      • In pipleine >Select Dataflow> Settings > Checkpointkey
    • Disadv: If Delta set to ON on and if user wants to perform full load inbetween . User has to make changes to the flow and publish . [ref](https://learn.microsoft.com/en-us/answers/questions/838041/how-to-reset-the-change-data-capture-information-()

Data Flow Activities:

  • Uses Spark Engine

  • Src > Transform > Sink

  • Any number for Sources

  • Different Activities :

      Multiple Inputs:
      	- Source: spark.read
      	- New Branch: Clone  Data into new branch
      	- Join : Normal join (Use exists for left outer join )
      	- Conditional Split: Split records based on condition
      	- Exists: Compare two tables (innner join or left outer join)
      	- Union : Union /Append Files
      	- LookUp:  
      		- Left Outerjoin with 2 tables
      		- Verify Data between 2 tables 
      		
      Schema Modifier (x9)
      	 - DerivedColumn:  WithColumn , (Create Heirarchy for complex JSON)
      	 - Select:         Select (prune)/ WithColumnRenamed / Remove Columns
      	 - Aggregate:      GroupBY / All functions ( max ,sum , etc.,)
      	(User can give values to Agg and GroupBY in Same Activity to apply group level filter)
      	 - Surrogate Key:  Add unique primary Key
      	 - Pivot : Create pivot table
      	 - Unpivot: Uncreate pivot table
      	 - Window: 
      		- Without order(max(col),min(col) etc., ):Eg max('col).Over(Window.Partition('groupby_col))
      		- With Order(rank ,denserank :Eg rank.Over(Window.Partition('groupby_col).orderBy('sortCol))
      	  - Rank: 
      		- Used Single : Entire Dataframe is ranked ( Dense = serially with no gaps)
      		- Row Column : Name of new Column
      		- Row Condition : Existing Col - Asc / Desc
      	  - External Call: rest api
      	  
      Formatters
      	- Flatten: Json with Array ONLY Eg : {"A" : [1,2,3]}
      	- Parse: String > Json
      	- Stringify: Get particular sub-json from main json eg json > Table Cols
      
      Row modifier
      	- Filter:  Filter / Where 
      	- Sort: sort
      	- Alter Row (DB only) : Update DB, With+When , Drop , Fill , parameteried 
      	
      Delete  (on real time only)
      	- Assert: Assert + Derived Column ( isError ('col_name working'))
      
      Flowlet
      
      Destination(Sink)          
       	- spark.write
      	- Cached Sink (temp loc): Access "sink_name#outputs()[RowNo].ColumnName"  
      	- Cached Sink - Convert Data to StringDatatype  to avoid error - "undefined" is not valid JSON Eg : toString(Col1)
      	- cached Lookup (Dict): Sink> Settings> Add Key Column.Access "sink_name#outputs(columnname)" , "sink1#outputs()[1].MAX_UPDATE_TS"
      
      Misc
      	- WildCard:Normal Regex
      	- $$: All Columns
      	- $parameter: Access parameter created within same dataflow activity
      	- String Interpolaton : "folder/file-{$n}.csv" #n is a parameter
      	- ByName('city'): Any column whose name is city
    

Dataflow Script Representation :

Dataflow Transformations

Derived Columns :

  • $$ (this) : All Cols

  • $0 current column name

  • name Eg: name!="abc"

  • type Eg: type != int

  • stream represents the name associated with each stream, or transformation in your flow

  • position is the ordinal position of columns in your data flow

  • origin is the transformation where a column originated or was last updated

    1. trim any col which is a String(Pattern): - Derived Col > Add > Col Pattern > type == 'string',$$ -> trim($$) - Auto mapping on
    2. Remove special character from all columns
      • Add Select
      • Add Mapping > Rule based Mapping
      • true() , regexReplace($$,'[^0-9a-zA-Z]','')
      • or true() , regexReplace('Column','[^0-9a-zA-Z]','')
    3. String Column to Date
      • Derived COlumn
      • toDate(byName('Date'),'mm/dd/yyyy')
    4. Metadata of Column inside Dataflow
      • Derived Col > col ,Columns()
      • Flatten > unroll by col > col
    5. Calculate Column Pattern (apply logic for data in mutliple columns)
      • Derived Column >Columns >Add >Column PAttern type =='string', $$+'_clean', trim($$,'')
    6. Max : Agg > max(col)

Dataflow Json

1. Table to Json 
   - Read Source (import projection)
   - Derived Col > Create Structure 
   - Save to Json Sink
2. Handling json :
		- Parse
		- Flatten
		- Stringyfy
		- Derived Cols : Create Complex Structure
3. Stringify 
	- get particular sub-json from main json eg json > Col to Table 
	- DataFlow >REST APi Source (url="https://api.first.org/data/v1/countries" , Anonymous Request) or        https://restcountries.com/v3.1/name/united
	- Source > Projection > Import projection
	- Add Stringyfy>Cols>newcol , Expr >Under Input Schema> Select Structure to Flatten
	- Add Select Statment > filter the new_col
	- Select new column 
	- Sink
4. Parse (JSON in single Column in a table) :
	val j = List(("IN",1),("US",2)).toDF("country","medals")
	.withColumn("j", to_json(struct('country,'medals)))
        +-------+------+--------------------+
        |country|medals|                   j|
        +-------+------+--------------------+
        |     IN|     1|{"country":"IN","...|
	- Make sure the Source > projection > Import Schema
	- Add Parse
		- Document Form = Single Document
		- Columns > new col , j , (country as string,		medals as integer)
5. Flatten :
	 - Flattens  Json with Array ONLY
	 - If json has array {"A" : [1,2,3]}

Pipeline

  • Linked Service : Service to Connect to different DataSources (External And Internal)
  • Activity : Copy , Dataflow , Notebooks ,SQL etc.,
  • Private Endpoint : Endpoint with in Private Virtual network
  • Integrated Runtime : Compute which executes the DataFlow
    • Azure IR : in cloud-to-cloud scenarios.
    • Self hosted IR : to connect to on-premises data sources or services.
    • SQL Server IR : to connect to SQL Server databases in a private network.
  • Can run activities in parallel
  • Throttling under Integration Runtime 'AutoResolveIntegrationRuntime' : There is a limit of simultaneous pipelines in an integration runtime. You need to split the pipeline to run into multiple runtimes.
  • Triggers : used to start the pipeline
    • Manual
    • Tumbling Window :
      • Run Continously after finishing
      • Delays and concurrency can be set
      • Auto Retry in case of Fail
    • Event : storage Events etc.,
    • Scheduled Triggers Note:
  1. Pipeline and Synapse Live uses functions from ARM templates .
  2. Hence sync master branch with ARM template update ARM template

Activity 1 : Print statement alternate :

1. create a variable
2. Set the variable in the pipeline
3. Check the contents of variable after the run 

Activity 2 : Protoype pipeline

1. Use wait statements 
2. Set Variable 

Activity 3 : Token Refresh logic

  1. generate token 1st time
  2. Parallel Connection to
    1. until loop(flag is true)
      1. Wait
      2. Keep generating new token
    2. Copy Activity(uses above token)
      1. fail : Set flag = true
      2. Pass : Set flag = true

Activity 4: Run SQL Script saved in Datatab

  1. Web Activity > Url :exampleWorkspace.dev.azuresynapse.net/sqlScripts/exampleSqlScript?api-version=2020-12-01, Method: GET , Auth: Managed Identity , Resrc : https://devsynapse .net

HTAP (Hybrid Transactional and Analytical Processing Patterns)

  • Data Sync bw Src (SQL server , Azure SQL database )and Target - Dataverse(Datalakes) , Azure SQL Dedicated and Cosmos targets

  • Src > Synapse > Target

  • Cosmos DB (OLTP):

    • No SQL
    • Type of Cosmos DB can be selected depending on Source (SQL ,Mongo etc., )
    • Fast Read and Write
    • Auto Scales
    • Uses API
    • Expensive
    • To Enable HTAP (On Demand - video8):
      • Cosmose Homepage : To turn on Analytical Contianer in Cosmos
      • In Synapse: Homepage > Integration > Enable Synapse link
    • Synapse read cosmos: Select * from openrowset() with()as T CROSS APPLY OPENJSON(C) with()
  • SQL server

    • Use managed Identnity
    • Azure Synapse Link enabled
    • Add ips to Firewall
    • Create linked Service + Linked CONNECTION in Synapse(Piplnes > "+")
    • Consfiure Blob Storage for intermiitend result

Deployment Strategy [GIT]

Summary

  • Publish branch : ARM templates are stored and updated
  • Collaboration Branch = Master Branch.
  • Feature Branch = Individual Branch created by users.
  • Root Folder = root folder in your Azure Repos collaboration branch.
  • Pull Request =Request merge from Feature Branch > Master Branch.
  • commit all = Save All Changes to Git.
  • Publish Should only be allowed from master Branch.
  • Code is merged from Master Branch to Synapse Services. (Git Actions is configured to run pull)
  • Publich > ARM template gets created.
  • Once master gets published , the code is available in Synapse-live and workspace_publish branch get created.
  • ARM Template = Azure Resource Management Template. Note : ARM templates from workspace_publish branch is not used.

Whenever we click on button publish (master branch) . Synapse creates 2 jsons

  1. TemplateForWorkspace :
    • All work (pipelines, Notebooks, DataFlows etc., ) in 1 single file
    • This file does not contain parameters
    • It contains logics only
  2. TemplateParametersForWorkspace : Contains only Parameters

For Successful Deployment we need to :

  1. Move TemplateForWorkspace to higher Environment as is
  2. Replace "TemplateParametersForWorkspace" with values as per the higher Env

Git (parameter replace for all artifacts)

  1. When Code is merged from Master Branch to Synapse Services.

  2. This will trigger function in GitActions ( Azure/Synapse-workspace-deployment@V1.7.0 ) - which is configured in CICD

     	uses:  Azure/Synapse-workspace-deployment@V1.7.0
     	with:
     		TargetWorkspaceName:  ${{ env.SN_WORKSPACE }}  # Dev [Live]
     		ArtifactsFolder:  ${{ github.workspace }}
     		environment:  Azure Public
     		clientId:  ${{ env.CLIENT_ID }}
     		clientSecret:  ${{ env.CLIENT_SECRET }}
     		tenantId:  ${{ env.TENANT_ID }}
     		resourceGroup:  ${{ env.RESOURCEGROUP }}
     		subscriptionId:  ${{ env.SUBSCRIPTIONID }}
     		DeleteArtifactsNotInTemplate:  true
     		operation:  validateDeploy
    
  3. Two ARM template (Azure Resource Management Template) gets created.

    • TemplateParametersForWorkspace.json
    • TemplateForWorkspace.json
    • Note: ARM templates from workspace_publish branch is not used.
  4. Users Need to add 2 custom files at the root:

    • {{ synapse_workspace }}-parameteres.yaml : Value of Each Property Eg: linkedservice

    • template-parameter-definition.json :

    • Synapse will read this file and use its configuration to generate which properties get parameterized.

    • If no file is found, the default template is used.

    • file consists of [trigger, pipeline, linked service, dataset, integration runtime, and data flow]

    • "=" is current value ,"-" is don't keep the default .

    • Syntax : <action>:<name>:<stype>

           "Microsoft.Synapse/workspaces/linkedServices": {
               "*": {
                   "properties": {
                       "typeProperties": {
                           "baseUrl": "="
                       }
                   }
               }
           }
      

    If u have linked service , "parameteres.yaml" will replace the values with values in this yml and in the structure as "template-parameter-definition.json"

  5. Above Function will Deploy Synapse artifacts using templates to given env

  6. You can get this file from the link.

To Change artifacts of properties , A custom parameter template can be used {{ workspace }}-parameteres.yaml. To override the default parameter template, a custom parameter template named template-parameters-definition.json should be placed in the root folder of the Git branch.

Points to Remember in Synapse - Git

  • Collaboration Branch = Master Branch.
  • Feature Branch = Individual Branch created by users.
  • Root Folder = root folder in your Azure Repos collaboration branch.
  • Pull Request =Request merge from Feature Branch > Master Branch.
  • commit all = Save All Changes to Git.
  • Publish Should only be allowed from master Branch.
  • Code is merged from Master Branch to Synapse Services. (Git Actions is configured to run pull)
  • ARM template gets created.
  • Once master gets published , the code is available in Synapse-live and workspace_publish branch get created.
  • ARM Template = Azure Resource Management Template. Note : ARM templates from workspace_publish branch is not used.

Questions

NoteBooks Questions :

- Whats is the Use of temp tables ?  to reference data across languages
- How to reference other Notebook  ? `%run /<path>/Notebook1 { "parameterInt": 1}
- How Return Values from NotePad ?
	from notebookutils import mssparkutils		
	mssparkutils.notebook.exit("hi")
- To pass external parameters from pipelines to Notebook ?
  - Create a variable in Notebook Eg:input=""
  - Convert it into parameters ie., hover over >> ck on "..." > Toggle parameters
  - Create a new Pipeline
  - Add the notebook into pipeline
  - In Pipeline ,select Notebook > settings >Add notebook> baseparamters>"input"- string 
  - Click outside of Noteboook > variables > "input" - string
  - click on Notebook in pipeline > baseparamters > "input"  ,@variables('input')
- How to Read a CSV from Gen2 ? 
	df= spark.read.option('header','true').option('delimiter',',').csv('abfss://1.csv')	
- What are Magic Commands ?
	- Line Magic(%) - same line only Eg: ```python y = 2 if x < 3 else 4```
	- Cell Magic(%%) - entie cell Eg:
			%%timeit
			if x < 3:y=2
			else:y=4
	- How is Spark session configuration done magic command ?
     %%configure 
		{"driverMemory":"28g","driverCores":4,"executorMemory":"28g","executorCores":4
			,"jars":["abfs[s]://blob/myjar1.jar"]}
- How to Reference unpublished notebook ? Check box option on Notebook Settings
- How Python logging in Notebook ? 
	import logging
	defaultLogger = logging.getLogger('default') 
	defaultLogger.debug("default debug message")

- File operations ?mssparkutils.fs.ls('path') #head,append,ls,cp,mv.mkdirs,rm
- read parq ? ```spark.read.parquet('abfss://parquet@deltaformatdemostorage.dfs.core.windows.net/employees')```

Dataflow Questions

1. StringInterpolation (parameter inside string)? 
	"folder/file-{$n}.csv" #n is a parameter
	"My age is :{toInteger(col)}"
2. Parameters Wildcard Dataflow ?
	-dataflow add parameter > add "path"
	-Source > Source Options > Wildcard paths > 'folder/'+$path+'*.csv'
3. Send whole Col itself as parameter?
	-Dataflow > Parameter > data , string, "test"       # this is data
	-Dataflow > Parameter > varcol , double, colname    # Dynamic join
4. schema Drift (Source) ?
	 Enable when column changes in a Schema
5. BroadCast Join (Join) ?
	When whole Table can be accomodated in memory 
6. DataFlow Assert ?
	toInteger($col_name) == 1000
7. How to "select max(revenue),* from T"?  (Self Join)  
	- Src > Projection > Import Projection
	- New branch > Aggregate  > Add : maxcolumn ,max(revenue)
	-  Inner Join (main branch) when revenue = max(revenue)
8. Select max(column) from table
	- Src > Projection > Import Projection
	- New branch > Aggregate  > Add : countcolumn ,count(col_name)
9. Find Surrogate Key
	[Ref](https://www.youtube.com/watch?v=9U-0VPU2ZPU)
	- Lookup (does left outer join of different source )
	- Lookup + Derived Column(isMatch) + Conditional Split (isMatch=True/false)+ others
	- Lookup : the result of each row is stored in new column
	- Derived Column =isMatch() 
	- Agg = first
10. Cast all Cols to Integer ?
	- Select > Regex > 1==1 , $$ , toInteger($$)
11. LAD
	- Cached lookup -https://www.youtube.com/watch?v=HqCn42FaRJs)
	- Add Cache type Sink as "cachedata"
	- to call the cache : cachedata#outputs()[1].columnname
12. Multiple Sinks? Dataflow >Settings > Set order
13. Accesing sink output in Pipeline? @string(activity("name").output.runStatus.output.sink_name.value[0].colname)
14. Canonical Model (Flexible Schema)?
	Derived Col>RuleBased> locate('colname',lower('name')) !=0 -> colname
15. Selection of group of Columns from Schema? Select >type!='string' && name != 'id',$$+'_notstring'
16. Dynamic Values / Parameterization[ref](https://www.youtube.com/watch?v=CMOPPie9bXM)
	- if Default parameter is not set , the user will be forced to enter the parameter value at the start
	- Only parameters declared/created can be used 
	- Any parameter created on : 
		- Dataset will be available on Dataflow (in Debug Settings)
		- Create at Dataset ,Dataflow will be available in pipeline by clicking on the Dataflow object
17. Dataflow Parameterisation ?
	- Creation : Parameters >dataflow_name,String,defaultvalue = Blank
	- Access: $dataflow_name
	- Sending Value: 
		1. Add to a Dataflow
		2. Debug Settings > Parameters > Dataset Paramters >dataset_name as value
18. Passing parameters from Dataset >DataFlow>  Pipeline
	- If u already have set parameter in Dataset and used in Dataflow. 
	- U can use by : pipeline > Select Dataflow> Settings > u should be able to see "dataset_name" under 
19. Row to Columns (unpivot) : Unpivot > Ungroup > Unpivot Colum
20. Assert 
	- Expects true,Expects false,Expects Unique,Expects exists(col to col matching)
	- description :toString(col_name) +'working'
	- filter : col_name == 1000
	- Assert + Derived Column =( isError ('col_name working'))
	- Assert + Split =  (hasError())
21. External Call 
	 - loop the rest api for the given input -https://www.youtube.com/watch?v=dIMfbwX8r0A)
	 - source (List)
	 - Add a derived Column to construct a resource using List
	 - add external call 
		- linked Service : GET , Anonymous , the Url will be overrided by below
		- method : GET ,previousActivity
		- Relative Url : Endpoint
		- Row Relative url : resource (constructed above)
22. Create Heirarachy
	- Add Source (input a csv (flat file) )
	- Add Derived Column  > Exp Builder > Create Column > Add SubColumn Window
	- Over (Group Column) : c2
	- sort : sort column  (Window Column is arranged : rank , dense_rank ,row_number etc.,)
	- range : 
	- Window : max(col), denserank(),rank()
23. Create variable inside an Activity in Dataflow:
	- Derived col > locals
	or
	- Cache > WriteTo Activity > use in another activity
	- Dynamic Expression > Create varible > var1 ,trim(right(col,6))
	- Accessing it ":var1"
24. Join Scheme drift Table (schema less table ) with table with schema
   - Src1 (schema Drifted , allow schema drift , infer drifted col) > Derived Col: col_tojoin, toInteger(byName('col_tojoin'))
   - Src2 (Scheme Table)
   - join : src1 ,src2 on col_tojoin
   - Sink (allow schema drift turned ON)
25. remove spaces from all col names? replace($$,' ')
26. Add schema to Schema less src? Src > Data Preview > Map Drifted Column
27. Specific Partition Files?
  - Dataset : Target = Container base path only
  - Src > Src Options > 
	  - Wild Card = top_folder/""/""/*.parquet
	  - Partition root path = top_folder
	  - column to store file name = filtercolumn
28. Saving Data into specific location based on input
	- Create a Column in transformation which contains Target
	     - Derived col > month , split(col,"-")/[2]
	- Sink > Settings > Filename Option (default : generic name)
	- Sink > Optimize > Set Partioning
	- Sink > Optimize > Key > UNique value per partition > Key Col = month
29. Pipeline Trigger:
		Schedule Pipeline
		cron Job pipeline etc.,
30. Save Partition files with Name:
		- Sink > Settings > File name option > Pattern = movies[n].parquet
		- Optimize >Set Partioning , Partion TYpe =Round Robin ,10         
31. Create Complex Datatype (subcols , keyvalue ,array):
		1. Derived Col
		   - all subcols to Main Cols
		   - access > Derived Col >  col.subcol
		2. New Col > ['key1' -> col1 , 'key2' -> col2]
		3. split(Name, ' ')
32. Coalesce and FileName
		- Sink >Setting > Pattern / Single Fle > movies[n].csv
		- Sink >Optimize > Set Partitioning > Hash > nos=5 , col =primarykey
33. Partitioning? Sink >Optimize > Set Partitioning > Key > col = "year"
34. Calculate Fuzzy:
	- 2 sources
	- join > soundex(name) == soundex(name2)

35. Deduplication / Distinct Rows  	
   - Method 1
       - Aggregate > GroupBy > sha2(256,columns()) , mycols
       - Aggregate > Aggregate >Each Col matches= true() , $$ , first($$)
   - Method 2
       - Aggregate > GroupBy > sha2(256,col1,col2,soundex(col2))
       - Aggregate > Aggregate > Each Col matches= true() , $$ , first($$)
   - Method 3
       - https://docs.microsoft.com/en-us/azure/data-factory/data-flow-script
       - https://www.youtube.com/watch?v=QOi26ETtPTw
36. Delta Loading using SQl Change Tracking Technology
	- https://www.youtube.com/watch?v=IN-4v0e7UIs
	- https://docs.microsoft.com/en-us/azure/data-factory/tutorial-incremental-copy-change-tracking-feature-portal
37. Distinct
   - GroupBY (Aggregate) > Group By target_column 
   - Aggregates > Column pattern>name != 'target col',$$ ,first(ss)
   (select all columns except target column)
38. Row Count on a Table
	Aggregate > GroupBY (blank) , Aggregates = colname , count(1)
39. Routing /Saving Data to Different Target in Same DF:
        - Contitional Split
        - Using Parameter
        - New Column in Run Time
40. Start SurrogateKey from next Value (https://www.youtube.com/watch?v=tc283k8CWh8)
	Method 1:
	- Dim > Agg> groupBy blank ,Agg=max(SurroagteKey) 
	- Src > Add Surrogate Key from One > Cross join with Dim Agg > key+max(SurroagteKey) 
	Method 2:
	- Dim > Agg> groupBy blank ,Agg=max(SurroagteKey)  > Sink the Max(value)
	- Src > Add Surrogate Key from One > Derived Column > surrogateKey + sink

41. Group players into each array in each row by team name (collect_list)
   - GroupBY (Aggregate ) > Groupby Team name > Agg : collect(player)
42. Datadriven Framework:
	- Create a parameterised dataflow  (ie., Dataflow which needs parameters)
	- Create a pipeline : Lookup > Foreach > above Dataflow
	- Lookup = csv file
	- Foreach: Settings >Sequential , @activity('Lookup1').output.value
	- Inside Foreach >Parameter= pipeline Expression ,@item().columnNames
	- Run Debug
	- If error: ck Pipeline screen > On output tab >  ck on see error on failure to debug
43. WARM Cluster Dataflow (to track performance)?
	-Abstract the starting of Cluster with dummy Dataflow
        ie., add a dummy dataflow at the beginning of target dataflow
	-Example: src > filter (1==2) > sink
44. SCD Type -1
	- Src > Alter Row > Sink
	- Alter Row > Upsert if
45. Create Datalake
	- Sink
	- Sink Type > Inline > Select Inline Dataset Type >  Delta
46. Replace Special Characters
	- Select > true() ,regexReplace($$,'[^a-zA-Z]','')
47. Hashing
	
	sha2(256,columns())
	or
	parameter = [c1,c2]
	sha2(256,parameter)
48. How to left outer join? Use exists
49. Remove Column 
	- Select Activity  
	- source :   name != 'Row_Number' && name != 'null' && left(name,2) != '_c'
	- and this for the output column name: $$
50: Cache Sink ("undefined" is not valid JSON)
	- Make sure the Data ur trying to save is converted to correct Data Type
	- Eg : toString(Col1)

Excercises:

  1. Find Max Row
  2. Find Distinct Rows
  3. Group by Country
  4. Parameters (To save data into file is blog with name given as paramter in pipeline)
  5. Creating Variables
  6. Regex / WildCard
  7. Save file as abcd_timestamp.parquet
  8. variable local inside Derived Column only
  9. Create Datadriven Framework ( Data is coming from csv /json into dataflow)
  10. Send function as parameter into Dataflow

PipeLine Questions

  1. Pipeline Expression should start with ? "@"
  2. Parameters in pipeline?
    • constant
    • Create = Pipeline Window > Parameters > New "param"
    • Access= @pipeline().parameters.param
  3. Access output of any activityin Pipeline ? @activity('name').output:
  4. Variables in pipeline?
    • modified
    • Create = Pipeline Window > Variabes > New "var"
    • Access= @variables('var')
  5. Access notebook return data in pipeline?
    • @activity('notebookname').output.status.Output.result.exitValue
    • How to get count of records Ingested after ETL ? @activity('name').output
  6. How Logging (Pipeline)?
    • capture the json from one activity into another
    • Dataflow (log) = Create simple csv with data "1" (dummy source)
    • Dataflow > Dataflow (log)> parameter =@activity('name').output.runStatus.metrics
    • Logging functionality directly available in Copy job
  7. How Dynamic Run time Resource Allocation (pipeline) ?
    • Pipeline > Dataflow > Settings > Core Count = @if( greater(activity('Name').output.size , 400000) ,128)
  8. Use of Lookup ??
    • Input= any data sources

    • output = single value /array

    • run Queries / SPROC

    • SELECT * FROM @{pipeline().parameters.T}

        Pipeline Lookup Example:
        	- pipeline>lookup > "Select col1 from table"
        	- Foreach > @activity('lookup').output.value
        	- Add Dataflow inside Foreach
        	- src (src dataset is parameterized)> Settings > @item().col1
      
  9. Lookup vs Script Acivity ?
    • Lookup :returns the result of executing a query or stored procedure.
    • Script: can only execute Data Manipulation Language (DML- INSERT, UPDATE, DELETE and SELECT) and Data Definition Language (DDL - CREATE, ALTER and DROP)
  10. Which Activity to Access Folder Structure in gen 2 ? getMetadata
  11. Run SQL scripts (Develop > SQL Script)are Disconnected
    • Option 1 : Add Script Activity and Directly add the SQL Code
    • Option 2 (Call the Current Synapse using REST to get all artifacts ) :
      • Save a SQL Script in Develop
      • url : https://.dev.azuresynapse.net
      • Resource : https://dev.azuresynapse.net/
      • GET , System-Managed-Identtity
      • Then Navigate in the Response Result to ge tSQL
view raw Synapse.md hosted with ❤ by GitHub

Thursday, May 26, 2022

ETL


ETL

BigData:

  • OLTP (online Transactional processing)
  • OLAP(online analytical processing)

Data processing systems

  • OLTP (OnLine Transaction Processing) : is used for managing current day to day data information.
    • ACID
      • Atomicity(entire transaction happens at once or nothing happens)
      • Consistency
      • Isolation
      • Durability (persisted)
  • OLAP (OnLine Analytical Processing): is used to maintain the past history of data and mainly used for data analysis, it can also be referred to as warehouse.

DataLake

  • large amounts of Data
  • structured, semi-structured, and unstructured data.
  • original/ raw format.
  • Any Formats - JSON, BSON, CSV, TSV, Avro, ORC, and Parquet etc.,
  • cost-effective way
  • No Transformation

Delta / WaterMarking ( water level stick in a dam or a river,)

  1. Timestamp-based Approach: Source shd have timestamp column a. Current TimeStamp

    • If current ts = 24-07-2024-01:01:01:0001
    • Next Run Select * from T where t > 24-07-2024-01:01:01:0001

    b. Source Time Stamp Based

    • Save the max timestamp from source column after each run .
    • in subsequent runs Select * from T where ts > max(saved_value)
  2. Hash-based Approach:

 hashbytes('sha256',concat_ws('',src.c1,src.c2)) as h1,
 hashbytes('sha256',concat_ws('',dest.c1,dest.c2)) as h2
from src,dest where h1!=h2
  1. Primary Key-based Approach: Comparing the primary keys Src and Target

  2. Change Tracking Technology : MS SQL and SQL server

  3. Change Data Capture (CDC) - Azure Synapse and DataFactory only:

    • One Checkbox to handle delta
    • Native to Azure
    • Azure to Blob, Azure Gen2 , Cosmos , SQL DB , SAP , PostGres
    • Reverting to Delta is Difficult

    ALTER DATABASE DB
    SET CHANGE_TRACKING = ON


Datawarehouse (Summary, Types, NF, Facts and Dim,Star vs SnowFlake, Key Types, LAD, Handle LAD)

  • structured information from various srcs.
  • Data is transformed
  • has schema
  • Use RDBMS
  • Used for reporting
  • Facts and Dim
  • Data is Normalised

Datamart

  • similar to a data warehouse.
  • Cateres for specific business unit Eg: Finance Dept

Database Normalization

Used to reduce redundancy from the database table.

1NF(atomicity):

Each attribute holds a single value.

Name,Age,Dept <- BEFORE  
Melvin,32,(Marketing, Sales)

Name,Age,Dept <- AFTER  
Melvin,32,Marketing  
Melvin,32,Sales  

2NF (2 natural keys ):

  • After 1NF

  • No Partial Dependencies.(All cols not depending on Natural Key)

      Subject |Teacher ID|Teacher Age  <- BEFORE
    
      Teacher ID|Teacher Age  <- AFTER
      Subject |Teacher ID
    

3NF (non key cols depending on other non key cols)

  • After 2NF

  • A -> B and B -> C , A-> C

  • Primary Key: EmpID

  • Non-key attributes: Name, State, Country, ZIP

    EmpID| Name| State| Country| ZIP
    
  • ZIP -- >ID , State and Country --> ZIP

    ID,NAME,ZIP
    ZIP,STATE,Country

Dimensions (Object)

  • Eg: a seller, a customer, a date, a price , Product
  • Contains Primary keys , Surrogate Keys
  • Dim = Person Details , Fact = Transaction done by the Person

Facts (Action)

  • Eg: a sale, a transaction, an access
  • contains foreign keys from Dimension

Grain:

  • Lowest level of measurement in the Fact table.

Dimension Modeling

Star Scheme :

  • Fact table uses Foreign Key from Dim Tables
  • Schema looks like a star , which 1 fact table connected to multiple dimension table

Snow Flake Scheme

  • Is an extension of star Scheme
  • multidimensional model
  • dimension tables is further sub-dimensioned

6 types of Dimensions :

  • Role Play Dim : Eg- "Date" dim can be used for "Date of Sale", "Date of Delivery","DOB"
  • Junk dimension : Table with unrelated attributes to avoid large number of foreign keys in the fact table.
  • Degenerate dimensions: Dimension attributes stored as part of fact table and not in a separate dimension table. Eg: "transaction number"
  • Stacked dimension : two or more dimensions are combined
  • Slowly Changing Dimension : dimension that would undergo changes over time
  • Slowly Growing Dimension : growth of records/elements in the dimension.
  • Conformed Dimension (Source of Truth ): is shared across multiple data mart or subject area
  • Reference Dimension: Used to joined indirectly to the fact table through a key in another dimension table.
  • Static Dimension :It not extracted from the original data source, but are created within the context of the data warehouse.

Types of Slowly Changing Dimension

Type 0 : No changes are entertained

INSERT INTO dim
SELECT * FROM src
WHERE NOT EXISTS (SELECT * FROM dim WHERE dim.id = src.id)

Type 1 : Direct Update Old Data (Upsert Original Data)

SK,ID,Name,Cellphone. <- Before
100,1,ABC,1234567890

SK,ID,Name,Cellphone <- After
100,1,ABC,9876543210

UPDATE dim
SET addr = src.addr FROM src WHERE dim.addr = src.addr;

Type 2 : Flag Old Data as 0 , Insert new Data with 1(Flag)

SK,ID,Name,Cellphone,FROM_DT,TO_DT,flag
100,1,ABC,1234567890,2016-06-01,2016-06-10,0
100,1,ABC,9876543210,2016-06-10,NULL,1

INSERT INTO dim
(SELECT * FROM src
JOIN dim
ON src.id = dim.id AND src.addr <> dim.addr)
Type 3 : Keeps history by adding new Column

SK,ID,Name,old_number,new_number
100,1,ABC,1234567890,9876543210

Type 4 : Uses separate history table

SK,ID,Name,Cellphone
100,1,ABC,9876543210

History Table
SK,ID,Name,Cellphone,CRT_DT
100,1,ABC,1234567890, 2016-06-01
100,1,ABC,9876543210, 2016-06-10

Type 6 :Combination of type 1, 2 and 3

  • New Row (Like type 2)
  • Each Active rows can be indicated with a boolean flag / start and end date.
  • using additional attributes or columns within the same record instead of creating new records

Eg: a new table record is added as in SCD type 2. Overwrite the old information with the new data as in type 1. And preserve history in a historical_record as in type 3.

**ID,Name,new_number,old_number,EFF_FR_DT,EFF_TO_DT,flag
**1,ABC,9876543210,2345678901, 2015-01-01,2016-01-20,0
1,ABC,9876543210,1234567890, 2016-01-21,Null,1

Surrogate Key vs Natural Key vs Primary Key :

Natural Key

  • Appears Naturally Eg: Registration Number ,Country Code
  • 1 or more columns needs to combined to make it unique
  • May or may not have a starting value
  • Can be Null

Surrogate Key

  • Surrogate Key is artificially generated and uniquely identifies each row in the dimension table
  • Commonly 1 Column and starts with Integer "1"
  • unique
  • local only to DataWarehouse , DataLake
  • Created using Natural Key
  • Cannot be Null
  • Skey
df\
.withColumn("sha2",sha2(concat("nat_key1","_","nat_key2"),256))\
.distinct()\
.orderBy(col("sha2").asc())\
.withColumn("skey",monotonically_increasing_id)
  • Use "ReIDL" logic, to maintain Skey and NaturalKey relation even during full load . Primary Key
  • 1 Column
  • May or may not be an Integer Column
  • Generated in RDBMS Database
  • Not Null

Foreign Key

  • Filed that references PRIMARY KEY / SURROGATE KEY
  • Used in 2NF Normalization Logic

Create Skey in Dim

  • withColumn ('pk' ,monotonically_increasing_id())
  • create table T (id int Identity (1,1) )
  • Add 2 extra Row :
    • -9 as 'NULL' : when joined with fact (after replacing NUll in fact with 'NULL') ,all null values will have -9
    • -1 as NULL (Orphan Record) : When joined with fact (NULL) , orphan record will have -1

Create Fact Relation

Note : -1 = Orphan Records , -9 for Null

  • Replace Null with 'NULL' for joining column
  • Left outer Join with (Select Skey and NaturalKey from DIm)
  • Replace null with -1 [U can also do anti join]

Late Arriving Dimension / Early Arriving Fact (LAD)

In data warehouse ETL dimensions processed and loaded before Facts

Eg : In ECommerce Website Customers can place an order as a guest.So the Fact record gets created without Foreign Key from Customer Table.

Design Approaches for LAD

Never Process Fact : never process fact record until dimension record is available.

id,Name
1,Sam
2,Jack

CustomerID,Item
1,Candy
3,Bread <- X

Park and Retry : insert the unmatched transactional record into a landing / another Fact table and retry in next batch process.

id,Name
1,Sam
2,Jack

CustomerID,Item
1,Candy

CustomerID,Item <-Landing Table
3,Bread

Inferred Flag

srcDf = select * from src where ts > delta_ts
dimDF = spark.read.option("path","dimDFPath").load()

srcDF.selectExpr("id as sID").distinct()\
.join(dimDF,dimDF.id=srcDF.sID,"left")\
.filter("id is null")
.withColumn("inferredFlag" , lit(True))
.write
.format("parquet")
.option("path","dimDFPath")
.save()

srcDF.join(
        dimDF.select("id","sKey")
        ,srcDF.id=dimDF.id
        ,"left").drop("id").save("factPath")
  • When actual dimension record available, columns marked as N/A will be updated with new data received and InferredFlag will be updated as False

Design for ETL

ETL with 3 Zones : Raw(Data Lake) , Enriched (Delta Lake )and Curated (Data Warehouse)

  1. Raw zone

    • Data Lake
    • Blob Storage Eg: Azure Gen2 , IBM cloud Object Storage
    • Data is moved from source
    • Every Load in new timestamp folder
    • InDependent ( does not depend on last load type / Data)
    • Immutable
    • Delta Load / Full Load
    • Duplicates can Exists ( Eg: If Source is Type 1 and records are deleted . Parquet files does not allow deletion / updation of existing records )
  2. Enriched Zone

    • Delta Lake Type (allows delete , update)
    • Blob Storage Eg: Azure Gen2 , IBM cloud Object Storage
    • Data moved from Raw Zone to Enriched Zone
    • Duplicates removed using Upsert Operation
    • Transformations are done here
    • Fact and Dim
  3. Curated Zone

    • Data Moved from Enriched to Here
    • DataWarehouse
    • SQL Database
    • Fact and Dim

Steps :

Step 1. Get all records from Source

maxDestTS= destDF.select(max("UPDATE_TS").alias("max")).collect()["maxDestTS"]
newDF=srcDF.select(f"UPDATE_TS>{maxDestTS}")

Step 2. When Source = SCD type 2 or Soft Delete ie.,(Flag = D) :

  • Method 1:
latestIDDF=newDF.groupBy("id").agg(max("UPDATE_TS"))
latestDF= newDF.join(latestIDDF,["id"],"inner_join")
  • Method 2:
latestDF= newDF.filter("Flag <> 'D'").withColumn("r",max("UPDATE_TS").over(Window.partitionBy("id"))\
.filter("r == UPDATE_TS")

Step 3. Upsert (Update old records , insert New Records) to Delta Lake

	deltaDF=DeltaTable.forPath(spark,"/path")
	deltaDF.alias("target")\
	.merge(latestDF.alias("src"), "src.id="target.id" ) \
	.whenMatchedInsertAll()\
	.whenNotMatchedUpdateAll()\
	.execute()

Step 4. EZ to DB

Method 1 : Full Overwrite

df.write.format("jdbc").mode("overwrite").options(url="","usn"= ..).save()

Method 2 :

  1. Fetch all records from EZ > max(timestamp) from CZ

  2. Delete all the above records in CZ ,if same naturalKeys exists in Fetched Data

     maxTS= Select max(TS) from Target
     delTarget = srcDF.filter("srcTS > {maxTS}").withColumn("key",concat_ws(",","k1","k2)).select("key").collect()
     DELETE FROM target WHERE CONCAT(c1, c2) IN (delTarget);
    
  3. ReInsert fetched Records

Performance :

  • Concat and create hash for NaturalKeys while saving for fast deletion. Eg:```hash(concat_ws_(',',"k1","k2"))
view raw ETL.md hosted with ❤ by GitHub

Monday, May 16, 2022

Build Jar : Add multiple class files in the Jar using Bazel

 Add multiple class files in the Jar using Bazel 


#---------------------------
#Bazel build parent.jar
#This would add Parent.scala , Child1.scala , Child2.scala in the jar file creatd
#---------------------------
load("@io_bazel_rules_scala//scala:scala.bzl", "scala_binary","scala_library")
package(default_visibility = ["//visibility:public"])
scala_library(
name = "parent",
srcs =
["Parent.scala",
"//src/main/child1:Child1.scala",
"//src/main/child2:Child2.scala",
],
deps = [
"//src/main/child1",
"//src/main/child2",
],
)
view raw BUILD hosted with ❤ by GitHub

Thursday, May 12, 2022

Azure Synapse - Working with Custom jar Scala (Bazel)

 Working with Custom jar in Azure 

There is 2 Jars :

  1. Big Fat Jar or uber Jar (Contains main class  )
  2. Build Dependency Jar/ Library to be used in Synapse 

Big Fat Jar or uber Jar:


Build Dependency Jar/ Library

  • Does not Contain main class
  • Does not contain dependencies
  • Note : Download external dependencies from Maven and upload it separately (refer bewlo link)
  • $ bazel build build_name.jar

Once u have jars Created based on ure need , use link - https://docs.microsoft.com/en-us/azure/synapse-analytics/spark/apache-spark-manage-scala-packages to upload to Azure 

Friday, May 6, 2022

Azure Synapse : Passing and receiving Values to and from Notebooks

 Passing and receiving Values to and from Notebooks

Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
view raw notebook.ipynb hosted with ❤ by GitHub

Friday, April 22, 2022

SQL : Index

Index

  • Indexes to query data faster, speed up sort operation, and enforce unique constraints.
  • A DB table each row has rowid and sequence number to identify row
  • Eg :table = list of pairs (rowid, row) )
  • Index is created on a seperate table which has opposite relationship: (row, rowid)
  • SQLite uses B-tree ie., balanced-tree ie., Actual table rows = Index table rows

Mac:

$brew install sqlite3
>sqlite3

>CREATE TABLE contacts (
first_name text NOT NULL,
last_name text NOT NULL,
email text NOT NULL
);
>CREATE UNIQUE INDEX idx_contacts_email ON contacts (email);
>INSERT INTO contacts (first_name, last_name, email) VALUES('John','Doe','john.doe@sqlitetutorial.net');
>INSERT INTO contacts (first_name, last_name, email) VALUES('Johny','Doe','john.doe@sqlitetutorial.net');

SQLite issued an error message indicating that the unique index has been violated. Note :email shd be unique across

>INSERT INTO contacts (first_name, last_name, email)
VALUES('David','Brown','david.brown@sqlitetutorial.net'),
    ('Lisa','Smith','lisa.smith@sqlitetutorial.net');
    
 >EXPLAIN QUERY PLAN 
 SELECT * FROM contacts WHERE email = 'lisa.smith@sqlitetutorial.net';
 >.quit
view raw indexing.md hosted with ❤ by GitHub

Saturday, March 12, 2022

Scala : HTTP Client using Sttp

 HTTP Client using Sttp


HttpClient in spark-shell REPL

  • open link https://mvnrepository.com/artifact/com.softwaremill.sttp.client3/core

  • open compatible scalalink

  • click on Maven tab

  • copy groupId:artifactId:version ie., com.softwaremill.sttp.client3:core_2.12:3.7.4

  • create packagename as below

  • Open terminal and run below commands :

    $spark-shell --packages com.softwaremill.sttp.client3:core_2.12:3.7.4
    
    scala> import sttp.client3._
    import sttp.client3._
    
    scala> implicit val backend = HttpURLConnectionBackend()
    backent: sttp.client.SttpBackend[sttp.client.Identity,Nothing,sttp.client.NothingT] = sttp.client.FollowRedirectsBackend@50bac431
    
    scala> val r = basicRequest.get(uri"https://pokeapi.co/api/v2/pokemon/ditto").send(backend)
    r: sttp.client.Identity[sttp.client.Response[Either[String,String]]] = Response(Right({"ab....
    
    scala> println(r)
    Response(Right({"abilities":[{"ability":{"name":"limber
    
// Ref -https://sttp.softwaremill.com/en/latest/examples.html
/*
libraryDependencies += "com.softwaremill.sttp.client3" %% "core" % "3.5.1"
Bazel Dependencies :
com.softwaremill.sttp.client3.core
com.softwaremill.sttp.client3.model
com.softwaremill.sttp.client3.shared
*/
import sttp.client3._
object SttpClient extends App{
val url= "http://0.0.0.0:9999/test/1"
val headers:Map[String,String]=Map("Content-type"->"application/json")
val payload:String="""{"name":"deepsk"}"""
post(url,headers,payload)
get(url,headers)
def get(url:String,headers:Map[String,String]):Unit={
val backend = HttpURLConnectionBackend()
val response = basicRequest.headers(headers) .get(uri"${url}").send(backend)
println(response.body)
}
def post(url:String,headers:Map[String,String],payload:String):Unit={
val backend = HttpURLConnectionBackend()
val response = basicRequest.body(payload).headers(headers) .post(uri"${url}").send(backend)
println(response.body)
}
}

Thursday, January 20, 2022

Coding Principles

 Coding Principles

Principles

  • Loose Coupling
  • High Cohesion
  • Change is Local
  • It is Easy to Remove

Smells

  • Rigidity ( A -> B -> C . something hardcoded in C )
  • Fragility
  • Immobility
  • Viscosity of Design (Taking a shortcut and introducing technical debt requires less effort than doing it right.)
  • Viscosity of Environment (skipping rigorous testing)
  • Needless Complexity
  • Needless Repetition
  • Opacity

Class Design

  • Single Responsibility Principle (SRP)
  • Open Closed Principle (OCP)
  • Liskov Substitution Principle (LSP)
  • Dependency Inversion Principle (DIP)
  • Interface Segregation Principle (ISP)
  • Classes Should be Small

General

  • Follow Standard Conventions
  • Keep It Simple Stupid
  • Boy Scout Rule
  • Avoid Multiple Languages in One Source File

Design

  • Keep Configurable Data at High Levels
  • Don’t Be Arbitrary (Have a reason for the way you structure your code)
  • Be Precise

Dependencies

  • Make Logical Dependencies Physical
  • Singletons / Service Locator
  • Base Classes Depending On Their Derivatives

Naming

  • Choose Descriptive / Unambiguous Names (Names have to reflect the entire functionality)
  • Standard Nomenclature Where Possible
  • no Encodings in Names (No prefixes, no type/scope information)

Understandability

  • maintain Consistency
  • Use Explanatory Variables
  • Prefer Dedicated Value Objects to Primitive Types (path type , instead of String , enclosing class)
  • Poorly Written Comment
  • Obscured Intent (Too dense algorithms that lose all expressiveness.)
  • Obvious Behaviour Is Unimplemented
  • Hidden Logical Dependency

Methods

  • Methods Should Do One Thing
  • Methods Should perform what is described by the name of the function.
  • Avoid Method with Too Many Input Arguments
  • Avoid Method with Too Many output Arguments (return object instead)
  • Avoid Selector / Flag Arguments
  • Avoid Inappropriate Static

Source Code Structure

  • Variables and methods should be defined close to where they are used.
  • Local variables should be declared just above their first usage ,depending on scope
  • Nesting (should be more specific)
  • Structure Code into Namespaces by Feature
  • same feature together. Eg :A feature may use another feature; a business feature may use a core feature like logging

Useless Stuff

  • Avoid Dead Comment, Code
  • Avoid Clutter
  • Inappropriate Information

Maintainability Killers

  • Avoid Duplication
  • Magic Numbers / Strings (Replace Magic Numbers and Strings with named constants)

Exception Handling

  • Catch Specific Exceptions
  • Catch Where You Can React in a Meaningful Way
  • Use Exceptions instead of Return Codes or null
  • Fail Fast
  • Avoid Using Exceptions for Control Flow
  • Avoid Swallowing Exceptions

Sunday, January 9, 2022

Azure Django-postgres Application

Azure Django-postgres Application


postgres-Django

Ref:Tutorial

Azure App Service

Creat APP

    git clone https://github.com/Azure-Samples/djangoapp
    cd djangoapp
    az webapp up \
    --resource-group DjangoPostgres-tutorial-rg \
    --location centralus \
    --plan DjangoPostgres-tutorial-plan \
    --sku F1 \
    --name djangopostgresapp

Add extension to db extension to CLI

    az extension add --name db-up   #install db-up (ref- https://gist.github.com/j-thepac/b0ab1196585cf8b9a8478e4a3c8b6aae)

Create a new Postgres instance

    az postgres up \
    --resource-group DjangoPostgres \
    --location centralus \
    --sku-name B_Gen5_1 \
    --server-name djangopostgresserver \
    --database-name pollsdb \
    --admin-user sqlusr \
    --admin-password Azure@123 \
    --ssl-enforcement Enabled

Note down DB details from console

Connect App and DB

    az webapp config appsettings set \
    --name djangopostgresapp \
    --settings DBHOST="djangopostgresserver" DBUSER="sqlusr" DBPASS="Azure@123" DBNAME="pollsdb" 

DB Migration

    az webapp ssh #open ssh 
    python manage.py migrate
    python manage.py createsuperuser # usn=root/password Pollsdb1
    #skip email 
    exit

Test :

    az webapp browse

open https://djangopostgresapp.azurewebsites.net/ (http://.azurewebsites.net/admin)

Update python file and redeploy

    Make any changes to python file
    az webapp up
    az webapp log tail

group delete

    az group delete --name DjangoPostgres --no-wait
    az group delete --name DjangoPostgres-tutorial-rg --no-wait

Saturday, January 8, 2022

Azure Errors and Solution

 Azure Errors and Solution


az extension add --name db-up --debug

Error:

Building wheel for pymssql (PEP 517): finished with status 'error'
  ERROR: Failed building wheel for pymssql
Failed to build pymssql
ERROR: Could not build wheels for pymssql which use PEP 517 and cannot be installed directly
Exception information:

pip._internal.exceptions.InstallationError: Could not build wheels for pymssql which use PEP 517 and cannot be installed directly

Environnment :

  • Mac Monetary
  • Python 3.6

Ref

Stackoverflow Github

Solution

brew install freetds openssl
export LDFLAGS="-L/usr/local/opt/freetds/lib -L/usr/local/opt/openssl@3/lib"
export CFLAGS="-I/usr/local/opt/freetds/include"
export CPPFLAGS="-I/usr/local/opt/openssl@3/include"
az extension add --name db-up --debug
view raw azdb-up.md hosted with ❤ by GitHub

Azure Linux VM

Azure Linux VM Connection

pre-Req: 

  1. For Mac Users
  2. Make sure linux machine is created

Steps:

  1. Open Linux Machine Panel
  2. Go to Settings > Connect > SSH
  3. Click on other Clients
  4. From the new page opened , Copy Create an SSH key Pair "ssh-keygen -m PEM -t rsa -b 4096"
  5. Run "ssh-keygen -m PEM -t rsa -b 4096" in Terminal
  6. Click Enter all the times

Testing Connection

  1. In the previous page , enter the path of the id_rsa.pub Eg : "~/.ssh/id_rsa.pub" or "(if u have added the file in a new folder ) ~/.ssh/vm_linux/pushpa/id_rsa.pub " 
  2. click on "test your connection"
  3. Your unsaved edits will be discarded , ck Yes
  4. Ck on Test Connection 
  5. this shd pass

Connection:

  1. Settings > Connect > SSH 
  2. copy the run example ie ., ssh -i <private key path> pushpaxxxxx@1xx.xx.1xx.xx
  3. Replace the privae key path to ids_rsa which was created
  4. Make sure u give the azure account password when asked

Install Python3

  1. sudo su
  2. <azure password>
  3. yum install python3
  4. pip3 install requests

Monday, January 3, 2022

Azure Static WebPage

 Azure Static WebPage


Azure Static page:

Azure App Service

git clone https://github.com/Azure-Samples/html-docs-hello-world.git
cd html-docs-hello-world
az webapp up --location westeurope --name <app_name> --html
go to the app URL: http://<app_name>.azurewebsites.net

Update and redeploy the app

az webapp up --location westeurope --name <app_name> --html

Clean up

az group delete --name appsvc_rg_Windows_westeurope

Reference

https://docs.microsoft.com/en-us/azure/app-service/quickstart-html

Azure API

Azure API


Create Webapp using Flask

Azure App Service

Pre-Req

  • Make Sure u have setup account created for Azure
  • You have enabled Free Subscription
  • You have created a webapp instance manullay example "flaskserverpushparanimptest2"

Setup Env

$brew update && brew install azure-cli
$az upgrade
$az version
$az login

Code Flask Api

$git clone https://github.com/Azure-Samples/python-docs-hello-world
$cd python-docs-hello-world
$python3 -m venv .venv
$source .venv/bin/activate
$pip install -r requirements.txt
$flask run

Test locally

test localy in browser : http://localhost:5000/ ctrl+C to Quit

Deploy to Azure

$az webapp up --sku F1 --name flaskserverpushparanimptest2 #sku is pricing F1 is free

Test Azure Web

open http://flaskserverpushparanimptest2.azurewebsites.net

Add new enhancement for your file and run below code

    $az webapp up
view raw AzureFlask.md hosted with ❤ by GitHub

Azure Cosmos DB

 Azure Cosmos DB 


"""
https://docs.microsoft.com/en-us/azure/cosmos-db/sql/create-sql-api-python
https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/cosmos/azure-cosmos/samples/examples.py
Pre-Req:
pip install --pre azure-cosmos
Have a Azure Account with Free Subscribption Enabled
Make sure Cosmosa DB is created along with DB and Container
#Azure portal > Azure Cosmos > ck on Existing Cosmos Db > Settings > Keys
to get Primary key and endpoint
"""
from azure.cosmos import exceptions, CosmosClient, PartitionKey
import uuid
import json
endpoint = "GET KEYS FROM : Azure portal > Azure Cosmos > ck on Existing Cosmos Db > Settings > Keys"
key = 'GET KEYS FROM : Azure portal > Azure Cosmos > ck on Existing Cosmos Db > Settings > Keys'
jsondata= {"id": "3","category": "hobby","name": "TV","description": "Pick up TV"}
dbname="ToDoDatabase"
containerid="ToDoList"
query="SELECT * FROM ITEMS"
class CosmosDb():
def __init__(self,endpoint,key,dbname,containerid):
self.endpoint=endpoint
self.key=key
self.client = CosmosClient(endpoint, key)
self.database = self.client.get_database_client(database=dbname) #create_database_if_not_exists
self.container = self.database.get_container_client(containerid) #create_container_if_not_exists
def create_item(self,jsondata):
self.container.create_item(body=jsondata)
def list_containers(self,database_name):
database = self.client.get_database_client(database_name)
return database.list_containers()
def modify_item(self,item_name,partition,key,new_value):
item = self.container.read_item(item_name, partition_key=partition)
item[key] = new_value
updated_item = container.upsert_item(item)
def query_items(self,query:str):
return self.container.query_items(query=query,enable_cross_partition_query=True)
cosmos=CosmosDb(endpoint,key,dbname,containerid)
# res=cosmos.list_containers(dbname)
res=cosmos.query_items("SELECT * FROM ITEMS")
for i in res : print(json.dumps(i,indent=True))
view raw cosmosapi.py hosted with ❤ by GitHub