TriggerBlock is a cool tech blog that's all about helping you out with
tutorials, problem-solving, hacks, and quick fixes for different coding
languages and techie stuff like Java, Python, Scala, Kubernetes, and more.
They've got plenty of articles on testing too. Check it out!
delta_df.history(1).show()  # last operation (delta_df is a DeltaTable, not a DataFrame)
delta_df.vacuum()           # recursively delete files no longer needed and older than the default 7-day retention
delta_df.vacuum(100)        # vacuum files not required by versions more than 100 hours old
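A minimal sketch showing where delta_df comes from (assuming the delta-spark package and a placeholder table path):

```python
# history/vacuum are DeltaTable operations - sketch with a placeholder path
from delta.tables import DeltaTable

delta_df = DeltaTable.forPath(spark, "abfss://container@account.dfs.core.windows.net/delta/table")
delta_df.history(1).show()   # last operation only
delta_df.vacuum()            # remove files outside the default 7-day retention
delta_df.vacuum(100)         # remove files not required by versions newer than 100 hours
```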
For JSON files, make sure all newlines and blank spaces are removed.
Dataflow, Notebook, etc. names have to start with a letter.
Blob = flat namespace; Gen2 = hierarchical namespace (directories)
Azure Active Directory (Entra ID): used to create one functional ID (with a secret) called a Service Principal, so applications can authenticate and communicate with each other
SP (Service Principal): managed by the user, i.e. the user renews the secret etc.
Managed Identity (a type of service principal): managed by Azure
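A small sketch of the difference in code, assuming the azure-identity and azure-storage-blob packages (tenant/client IDs and the account URL are placeholders):

```python
# Service Principal vs Managed Identity - sketch, all IDs/URLs are placeholders
from azure.identity import ClientSecretCredential, ManagedIdentityCredential
from azure.storage.blob import BlobServiceClient

# Service Principal: the application/user owns and renews the secret
sp_cred = ClientSecretCredential(tenant_id="<tenant-id>", client_id="<client-id>", client_secret="<secret>")

# Managed Identity: Azure owns the credential, no secret in code
mi_cred = ManagedIdentityCredential()

client = BlobServiceClient(account_url="https://<account>.blob.core.windows.net", credential=sp_cred)
```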
DATA TAB ( Lake Database , SQL Database , Linked )
Workspace : Based on Azure Serverless (Also Called OnDemand )
Lake Database: Spark to Query Data
SQL Database: Serverless to query Data
Linked
Workspace ( Source is Gen2/Blob Storage )
Source is Gen2/Blob Storage
This comes by default with Synapse
Data is not in table format but is processed as tables at runtime
Lake Database:
Only the default Gen2 location that comes with Synapse can be used to create the database
Uses templates
You can create SQL-like tables, add schemas to tables, define column datatypes and add foreign keys over the underlying Gen2 storage
The above schema can be created against CSV, Parquet, JSON, etc.
Steps:
Open NoteBook
%%sql
CREATE DATABASE DB;
USE DB;
CREATE TABLE T (id INT, name STRING);
INSERT INTO T VALUES (1, 'A');
Open gen2 in Azure > create a folder "Customer" and upload csv file
Open synapse > Data > "+" > New Lake Database > Enter Name , linkedService ,above Folder path and Data Format > "+" to add table and Schema of Csv
Deploy (publish)
You should be able to read the table with SQL scripts / Notebooks
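For example, from a Synapse notebook (assuming the database DB and table T created in the steps above):

```python
# read the lake database table from a Synapse notebook - names DB/T come from the steps above
df = spark.sql("SELECT * FROM DB.T")
df.show()
```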
SQL Database (Using SQL script )
Operations :
read : OpenrowSet
write : CETAS , CET
CREATE EXTERNAL TABLE T (c1 int) WITH (LOCATION='path', DATA_SOURCE=xx, FILE_FORMAT=csvFormat) -- CET
CREATE EXTERNAL TABLE T2 WITH (LOCATION='path2', DATA_SOURCE=xx, FILE_FORMAT=csvFormat) AS SELECT * FROM T -- CETAS (cannot ALTER an external table)
SELECT * FROM OPENROWSET(BULK 'path', DATA_SOURCE='xx', FORMAT='CSV') AS rows
Example:
Open a SQL script or connect to Serverless using DBeaver (the serverless endpoint is on the Synapse overview page in the Azure portal)
run "CREATE DATABASE db"
Go to SQL Script > Create new Script
run "USE db"
CREATE EXTERNAL DATA SOURCE ParentFolder WITH (LOCATION='https://path.dfs.core.windows.net');
CREATE EXTERNAL FILE FORMAT myFormat WITH (FORMAT_TYPE=DELTA);
SELECT * FROM OPENROWSET(BULK '/Country', DATA_SOURCE='ParentFolder', FORMAT='DELTA') AS D
The DB should now be visible along with the External Data Source and File Formats
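The same query can be run from Python; a rough sketch assuming pyodbc and the Microsoft ODBC driver are installed (the server name and authentication mode are placeholders):

```python
# query the serverless (on-demand) SQL endpoint from Python - sketch, connection details are placeholders
import pyodbc

conn = pyodbc.connect(
    "Driver={ODBC Driver 18 for SQL Server};"
    "Server=<workspace>-ondemand.sql.azuresynapse.net;"
    "Database=db;"
    "Authentication=ActiveDirectoryInteractive;UID=<user@domain>;"
)
sql = """
SELECT TOP 10 *
FROM OPENROWSET(BULK '/Country', DATA_SOURCE = 'ParentFolder', FORMAT = 'DELTA') AS D
"""
for row in conn.cursor().execute(sql):
    print(row)
```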
1. Select from Table
IF NOT EXISTS (SELECT 1 FROM sys.databases WHERE name = 'Test')
BEGIN
CREATE DATABASE Test;
CREATE EXTERNAL DATA SOURCE ParentFolder WITH (TYPE = HADOOP, LOCATION='https://path.dfs.core.windows.net/temp/');
CREATE EXTERNAL FILE FORMAT PARQUET WITH (FORMAT_TYPE=PARQUET);
END
GO
USE Test;
SELECT * FROM OPENROWSET(BULK 'subFolder/1.parquet', FORMAT='PARQUET', DATA_SOURCE='ParentFolder') AS rows
Note: JSON, CSV, etc. are also supported.
2. Create new Table from Existing Table in Blob (destination = CSV , PARQUET formats )
CREATE EXTERNAL TABLE T
WITH (LOCATION='subFolder/',DATA_SOURCE=ParentFolder,FILE_FORMAT=myFormat)
AS
Select * from OPENROWSET (LOCATION='subFolder2/',DATA_SOURCE=ParentFolder,FILE_FORMAT=myFormat) as rows --src
GO;
DROP EXTERNAL DATA SOURCE DS;
DROP EXTERNAL FILE FORMAT myFormat;
3. Create View (CVAS)
CREATE VIEW V AS SELECT * FROM OPENROWSET(BULK 'https://blob.parquet', FORMAT='PARQUET') AS rows;
select * from v ;
4. Call SQL Script in Pipeline
1. To use in pipeline
- Add a Web Activity in the pipeline
- Call the current Synapse workspace: GET https://myworkspace.dev.azuresynapse.net/sqlScripts/{sqlScriptName}?api-version=2020-12-01
- Drill down to the required SQL script
5.Cosmos
SELECT * FROM OPENROWSET('CosmosDB','Account=my-cosmos-db;Database=my-db;Key=k',[my-container]) AS T
Linked
When you create a linked service and publish it, the icon should appear here.
Only connected services like Containers (Gen2 / Blob), Files, Queues, Tables, Cosmos
Dedicated SQL (Compute Power + Storage Space) -
Relational DB
Uses distributions to store data; data is not stored in one table, i.e. Control Node = Compute Node 1 + Compute Node 2 + ...
Greater than 1TB
Max 60 Nodes
DW5000c = 5000/500 = 10 nodes
Unique constraints are not supported (i.e. duplicates need to be removed by other logic)
SQL Database (OnDemand / Serverless) vs Dedicated SQL (Relational DB)
                                   Dedicated       Serverless
1. Select                          Yes             Yes
2. Insert/Update/Delete/Merge      Yes             No
3. CTAS                            Yes             No
4. CETAS                           No              Yes
5. Cost                            High            Low
6. Strategy (hash/round robin)     Yes             No
7. Provisioned                     Separately      By default
8. Volume of data                  High            Low
9. Data stored in                  Relational DB   Blob/Gen2
External Tables and Views
Both are just pointers to a location in the data lake / blob storage (Parquet, JSON, etc.)
                     External Tables   Views
1. Define Schema     Yes               No
2. Indexing          Yes               No
3. Performance       Fast              Slow
Multiple Inputs:
- Source: spark.read
- New Branch: Clone Data into new branch
- Join : Normal join (Use exists for left outer join )
- Conditional Split: Split records based on condition
- Exists: Compare two tables (inner join or left outer join)
- Union : Union /Append Files
- LookUp:
- Left Outerjoin with 2 tables
- Verify Data between 2 tables
Schema Modifier (x9)
- DerivedColumn: WithColumn , (Create Hierarchy for complex JSON)
- Select: Select (prune)/ WithColumnRenamed / Remove Columns
- Aggregate: GroupBY / All functions ( max ,sum , etc.,)
(User can give values to Agg and GroupBY in Same Activity to apply group level filter)
- Surrogate Key: Add unique primary Key
- Pivot : Create pivot table
- Unpivot: Uncreate pivot table
- Window:
- Without order (max(col), min(col), etc.): Eg max('col).over(Window.partitionBy('groupby_col)) - see the PySpark sketch after this list
- With order (rank, dense_rank): Eg rank().over(Window.partitionBy('groupby_col).orderBy('sortCol))
- Rank:
- Used single: the entire DataFrame is ranked (Dense = serially with no gaps)
- Row Column: name of the new column
- Row Condition: existing column - Asc / Desc
- External Call: rest api
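The Window transformation corresponds to Spark window functions; a rough PySpark sketch of the two cases above (the column names groupby_col / col are made up for illustration):

```python
# rough PySpark equivalent of the Window transformation (illustrative column names)
from pyspark.sql import SparkSession, functions as F, Window

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("A", 10), ("A", 30), ("B", 20)], ["groupby_col", "col"])

w_unordered = Window.partitionBy("groupby_col")                              # without order: max / min
w_ordered = Window.partitionBy("groupby_col").orderBy(F.col("col").desc())   # with order: rank / dense_rank

df = (df.withColumn("max_col", F.max("col").over(w_unordered))
        .withColumn("rnk", F.rank().over(w_ordered)))
df.show()
```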
Formatters
- Flatten: Json with Array ONLY Eg : {"A" : [1,2,3]}
- Parse: String > Json
- Stringify: Get particular sub-json from main json eg json > Table Cols
Row modifier
- Filter: Filter / Where
- Sort: sort
- Alter Row (DB only) : Update DB, With+When , Drop , Fill , parameterized
Delete (on real time only)
- Assert: Assert + Derived Column ( isError ('col_name working'))
Flowlet
Destination(Sink)
- spark.write
- Cached Sink (temp loc): Access "sink_name#outputs()[RowNo].ColumnName"
- Cached Sink - Convert Data to StringDatatype to avoid error - "undefined" is not valid JSON Eg : toString(Col1)
- cached Lookup (Dict): Sink> Settings> Add Key Column.Access "sink_name#outputs(columnname)" , "sink1#outputs()[1].MAX_UPDATE_TS"
Misc
- WildCard:Normal Regex
- $$: All Columns
- $parameter: Access parameter created within same dataflow activity
- String Interpolation : "folder/file-{$n}.csv" #n is a parameter
- ByName('city'): Any column whose name is city
stream represents the name associated with each stream, or transformation in your flow
position is the ordinal position of columns in your data flow
origin is the transformation where a column originated or was last updated
trim any col which is a String(Pattern):
- Derived Col > Add > Col Pattern > type == 'string',$$ -> trim($$)
- Auto mapping on
Remove special character from all columns
Add Select
Add Mapping > Rule based Mapping
true() , regexReplace($$,'[^0-9a-zA-Z]','')
or true() , regexReplace('Column','[^0-9a-zA-Z]','')
String Column to Date
Derived Column
toDate(byName('Date'),'MM/dd/yyyy')
Metadata of Column inside Dataflow
Derived Col > col ,Columns()
Flatten > unroll by col > col
Calculate Column Pattern (apply logic to data in multiple columns)
Derived Column > Columns > Add > Column Pattern
type =='string', $$+'_clean', trim($$,'')
Max : Agg > max(col)
Dataflow Json
1. Table to Json
- Read Source (import projection)
- Derived Col > Create Structure
- Save to Json Sink
2. Handling json :
- Parse
- Flatten
- Stringify
- Derived Cols : Create Complex Structure
3. Stringify
- get particular sub-json from main json eg json > Col to Table
- DataFlow >REST APi Source (url="https://api.first.org/data/v1/countries" , Anonymous Request) or https://restcountries.com/v3.1/name/united
- Source > Projection > Import projection
- Add Stringify > Cols > newcol , Expr > Under Input Schema > select the structure to flatten
- Add a Select statement > filter the new_col
- Select new column
- Sink
4. Parse (JSON in single Column in a table) :
val j = List(("IN",1),("US",2)).toDF("country","medals")
.withColumn("j", to_json(struct('country,'medals)))
+-------+------+--------------------+
|country|medals| j|
+-------+------+--------------------+
| IN| 1|{"country":"IN","...|
- Make sure the Source > projection > Import Schema
- Add Parse
- Document Form = Single Document
- Columns > new col , j , (country as string, medals as integer)
5. Flatten :
- Flattens Json with Array ONLY
- If json has array {"A" : [1,2,3]}
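A rough PySpark sketch of what Flatten does to that sample document (explode is the closest Spark equivalent):

```python
# flatten {"A": [1,2,3]} into one row per array element - rough PySpark equivalent of Flatten
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.read.json(spark.sparkContext.parallelize(['{"A": [1, 2, 3]}']))
df.select(F.explode("A").alias("A")).show()
```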
Pipeline
Linked Service : Service to Connect to different DataSources (External And Internal)
Activity : Copy , Dataflow , Notebooks ,SQL etc.,
Private Endpoint : Endpoint with in Private Virtual network
Integration Runtime : Compute which executes the Dataflow
Azure IR : in cloud-to-cloud scenarios.
Self hosted IR : to connect to on-premises data sources or services.
SQL Server IR : to connect to SQL Server databases in a private network.
Can run activities in parallel
Throttling under Integration Runtime 'AutoResolveIntegrationRuntime' : There is a limit on simultaneous pipeline runs per integration runtime. You need to split the pipelines across multiple integration runtimes.
Triggers : used to start the pipeline
Manual
Tumbling Window :
Runs continuously after finishing
Delays and concurrency can be set
Auto Retry in case of Fail
Event : storage Events etc.,
Scheduled Triggers
Note:
Pipeline and Synapse Live uses functions from ARM templates .
If you have a linked service, "{{workspace}}-parameters.yaml" will replace its values with the values in this YAML, following the structure defined in "template-parameters-definition.json"
The above function will deploy Synapse artifacts to the given environment using templates
To change properties of artifacts, a custom parameter template {{workspace}}-parameters.yaml can be used. To override the default parameter template, a custom parameter template named template-parameters-definition.json should be placed in the root folder of the Git branch.
Points to Remember in Synapse - Git
Collaboration Branch = Master Branch.
Feature Branch = Individual Branch created by users.
Root Folder = root folder in your Azure Repos collaboration branch.
Pull Request =Request merge from Feature Branch > Master Branch.
commit all = Save All Changes to Git.
Publish Should only be allowed from master Branch.
Code is merged from Master Branch to Synapse Services. (Git Actions is configured to run pull)
ARM template gets created.
Once master gets published , the code is available in Synapse-live and workspace_publish branch get created.
ARM Template = Azure Resource Management Template.
Note : ARM templates from workspace_publish branch is not used.
Questions
NoteBooks Questions :
- What is the use of temp tables? To reference data across languages
- How to reference another Notebook? `%run /<path>/Notebook1 { "parameterInt": 1 }`
- How to return values from a Notebook?
from notebookutils import mssparkutils
mssparkutils.notebook.exit("hi")
- To pass external parameters from pipelines to a Notebook?
- Create a variable in the Notebook, Eg: input=""
- Convert it into a parameter cell, i.e. hover over the cell > click on "..." > Toggle parameter cell
- Create a new pipeline
- Add the notebook into the pipeline
- In the pipeline, select the Notebook > Settings > add the notebook > Base parameters > "input" - String
- Click outside of the Notebook > Variables > "input" - String
- Click on the Notebook in the pipeline > Base parameters > "input" , @variables('input')
- How to Read a CSV from Gen2 ?
df= spark.read.option('header','true').option('delimiter',',').csv('abfss://1.csv')
- What are Magic Commands ?
- Line Magic(%) - same line only Eg: ```python y = 2 if x < 3 else 4```
- Cell Magic(%%) - entire cell Eg:
%%timeit
if x < 3:y=2
else:y=4
- How is Spark session configuration done with a magic command?
%%configure
{"driverMemory":"28g","driverCores":4,"executorMemory":"28g","executorCores":4
,"jars":["abfs[s]://blob/myjar1.jar"]}
- How to Reference unpublished notebook ? Check box option on Notebook Settings
- How to do Python logging in a Notebook?
import logging
defaultLogger = logging.getLogger('default')
defaultLogger.debug("default debug message")
- File operations? mssparkutils.fs.ls('path') # head, append, ls, cp, mv, mkdirs, rm
- read parq ? ```spark.read.parquet('abfss://parquet@deltaformatdemostorage.dfs.core.windows.net/employees')```
Dataflow Questions
1. StringInterpolation (parameter inside string)?
"folder/file-{$n}.csv" #n is a parameter
"My age is :{toInteger(col)}"
2. Parameters Wildcard Dataflow ?
-dataflow add parameter > add "path"
-Source > Source Options > Wildcard paths > 'folder/'+$path+'*.csv'
3. Send whole Col itself as parameter?
-Dataflow > Parameter > data , string, "test" # this is data
-Dataflow > Parameter > varcol , double, colname # Dynamic join
4. schema Drift (Source) ?
Enable when column changes in a Schema
5. BroadCast Join (Join) ?
When the whole table can be accommodated in memory
6. DataFlow Assert ?
toInteger($col_name) == 1000
7. How to "select max(revenue),* from T"? (Self Join)
- Src > Projection > Import Projection
- New branch > Aggregate > Add : maxcolumn ,max(revenue)
- Inner Join (main branch) when revenue = max(revenue)
8. Select max(column) from table
- Src > Projection > Import Projection
- New branch > Aggregate > Add : countcolumn ,count(col_name)
9. Find Surrogate Key
[Ref](https://www.youtube.com/watch?v=9U-0VPU2ZPU)
- Lookup (does left outer join of different source )
- Lookup + Derived Column(isMatch) + Conditional Split (isMatch=True/false)+ others
- Lookup : the result of each row is stored in new column
- Derived Column =isMatch()
- Agg = first
10. Cast all Cols to Integer ?
- Select > Regex > 1==1 , $$ , toInteger($$)
11. LAD (Late Arriving Dimension)
- Cached lookup - https://www.youtube.com/watch?v=HqCn42FaRJs
- Add a Cache type Sink named "cachedata"
- To call the cache: cachedata#outputs()[1].columnname
12. Multiple Sinks? Dataflow >Settings > Set order
13. Accesing sink output in Pipeline? @string(activity("name").output.runStatus.output.sink_name.value[0].colname)
14. Canonical Model (Flexible Schema)?
Derived Col>RuleBased> locate('colname',lower('name')) !=0 -> colname
15. Selection of group of Columns from Schema? Select >type!='string' && name != 'id',$$+'_notstring'
16. Dynamic Values / Parameterization[ref](https://www.youtube.com/watch?v=CMOPPie9bXM)
- if Default parameter is not set , the user will be forced to enter the parameter value at the start
- Only parameters declared/created can be used
- Any parameter created on :
- Dataset will be available on Dataflow (in Debug Settings)
- Create at Dataset ,Dataflow will be available in pipeline by clicking on the Dataflow object
17. Dataflow Parameterisation ?
- Creation : Parameters >dataflow_name,String,defaultvalue = Blank
- Access: $dataflow_name
- Sending Value:
1. Add to a Dataflow
2. Debug Settings > Parameters > Dataset Paramters >dataset_name as value
18. Passing parameters from Dataset >DataFlow> Pipeline
- If u already have set parameter in Dataset and used in Dataflow.
- U can use by : pipeline > Select Dataflow> Settings > u should be able to see "dataset_name" under
19. Row to Columns (unpivot) : Unpivot > Ungroup > Unpivot Column
20. Assert
- Expects true,Expects false,Expects Unique,Expects exists(col to col matching)
- description :toString(col_name) +'working'
- filter : col_name == 1000
- Assert + Derived Column =( isError ('col_name working'))
- Assert + Split = (hasError())
21. External Call
- loop the rest api for the given input -https://www.youtube.com/watch?v=dIMfbwX8r0A)
- source (List)
- Add a derived Column to construct a resource using List
- add external call
- linked Service : GET , Anonymous , the Url will be overrided by below
- method : GET ,previousActivity
- Relative Url : Endpoint
- Row Relative url : resource (constructed above)
22. Create Hierarchy
- Add Source (input a csv (flat file) )
- Add Derived Column > Exp Builder > Create Column > Add SubColumn Window
- Over (Group Column) : c2
- sort : sort column (Window Column is arranged : rank , dense_rank ,row_number etc.,)
- range :
- Window : max(col), denserank(),rank()
23. Create variable inside an Activity in Dataflow:
- Derived col > locals
or
- Cache > WriteTo Activity > use in another activity
- Dynamic Expression > Create variable > var1 , trim(right(col,6))
- Accessing it: ":var1"
24. Join a schema-drift table (schema-less table) with a table that has a schema
- Src1 (schema drifted, allow schema drift, infer drifted column types) > Derived Col: col_tojoin, toInteger(byName('col_tojoin'))
- Src2 (schema table)
- Join: src1, src2 on col_tojoin
- Sink (allow schema drift turned ON)
25. remove spaces from all col names? replace($$,' ')
26. Add schema to Schema less src? Src > Data Preview > Map Drifted Column
27. Specific Partition Files?
- Dataset : Target = container base path only
- Src > Source Options >
- Wild card = top_folder/*/*/*.parquet
- Partition root path = top_folder
- Column to store file name = filtercolumn
28. Saving data into a specific location based on input
- Create a column in a transformation which contains the target
- Derived col > month , split(col,"-")[2]
- Sink > Settings > File name option (default : generic name)
- Sink > Optimize > Set Partitioning
- Sink > Optimize > Key > Unique value per partition > Key Col = month
29. Pipeline Trigger:
Schedule Pipeline
cron Job pipeline etc.,
30. Save partition files with a name:
- Sink > Settings > File name option > Pattern = movies[n].parquet
- Optimize > Set Partitioning, Partition Type = Round Robin, 10
31. Create Complex Datatype (subcols , keyvalue ,array):
1. Derived Col
- all subcols to Main Cols
- access > Derived Col > col.subcol
2. New Col > ['key1' -> col1 , 'key2' -> col2]
3. split(Name, ' ')
32. Coalesce and FileName
- Sink > Settings > Pattern / Single File > movies[n].csv
- Sink > Optimize > Set Partitioning > Hash > nos = 5 , col = primarykey
33. Partitioning? Sink >Optimize > Set Partitioning > Key > col = "year"
34. Calculate Fuzzy:
- 2 sources
- join > soundex(name) == soundex(name2)
35. Deduplication / Distinct Rows
- Method 1
- Aggregate > GroupBy > sha2(256,columns()) , mycols
- Aggregate > Aggregate >Each Col matches= true() , $$ , first($$)
- Method 2
- Aggregate > GroupBy > sha2(256,col1,col2,soundex(col2))
- Aggregate > Aggregate > Each Col matches= true() , $$ , first($$)
- Method 3
- https://docs.microsoft.com/en-us/azure/data-factory/data-flow-script
- https://www.youtube.com/watch?v=QOi26ETtPTw
36. Delta Loading using SQl Change Tracking Technology
- https://www.youtube.com/watch?v=IN-4v0e7UIs
- https://docs.microsoft.com/en-us/azure/data-factory/tutorial-incremental-copy-change-tracking-feature-portal
37. Distinct
- GroupBY (Aggregate) > Group By target_column
- Aggregates > Column pattern > name != 'target col', $$ , first($$)
(select all columns except the target column)
38. Row Count on a Table
Aggregate > GroupBY (blank) , Aggregates = colname , count(1)
39. Routing / Saving data to different targets in the same DF:
- Conditional Split
- Using a parameter
- New column at run time
40. Start Surrogate Key from the next value (https://www.youtube.com/watch?v=tc283k8CWh8)
Method 1:
- Dim > Agg > groupBy blank, Agg = max(SurrogateKey)
- Src > Add Surrogate Key from one > Cross join with Dim Agg > key + max(SurrogateKey)
Method 2:
- Dim > Agg > groupBy blank, Agg = max(SurrogateKey) > Sink the max(value)
- Src > Add Surrogate Key from one > Derived Column > surrogateKey + sink
41. Group players into each array in each row by team name (collect_list)
- GroupBY (Aggregate ) > Groupby Team name > Agg : collect(player)
42. Datadriven Framework:
- Create a parameterised dataflow (ie., Dataflow which needs parameters)
- Create a pipeline : Lookup > Foreach > above Dataflow
- Lookup = csv file
- Foreach: Settings >Sequential , @activity('Lookup1').output.value
- Inside Foreach >Parameter= pipeline Expression ,@item().columnNames
- Run Debug
- If error: check the pipeline screen > on the Output tab > click on "see error on failure" to debug
43. WARM Cluster Dataflow (to track performance)?
- Abstract the starting of the cluster with a dummy Dataflow
i.e., add a dummy dataflow at the beginning of the target dataflow
- Example: src > filter (1==2) > sink
44. SCD Type -1
- Src > Alter Row > Sink
- Alter Row > Upsert if
45. Create Datalake
- Sink
- Sink Type > Inline > Select Inline Dataset Type > Delta
46. Replace Special Characters
- Select > true() ,regexReplace($$,'[^a-zA-Z]','')
47. Hashing
sha2(256,columns())
or
parameter = [c1,c2]
sha2(256,parameter)
48. How to left outer join? Use exists
49. Remove Column
- Select Activity
- source : name != 'Row_Number' && name != 'null' && left(name,2) != '_c'
- and this for the output column name: $$
50: Cache Sink ("undefined" is not valid JSON)
- Make sure the data you are trying to save is converted to the correct data type
- Eg : toString(Col1)
Exercises:
Find Max Row
Find Distinct Rows
Group by Country
Parameters (to save data into a file in blob, with the name given as a parameter in the pipeline)
Creating Variables
Regex / WildCard
Save file as abcd_timestamp.parquet
variable local inside Derived Column only
Create Datadriven Framework ( Data is coming from csv /json into dataflow)
Send function as parameter into Dataflow
PipeLine Questions
Pipeline Expression should start with ? "@"
Parameters in pipeline?
constant
Create = Pipeline Window > Parameters > New "param"
Access= @pipeline().parameters.param
Access output of any activity in Pipeline ? @activity('name').output:
Lookup :returns the result of executing a query or stored procedure.
Script: can only execute Data Manipulation Language (DML- INSERT, UPDATE, DELETE and SELECT) and Data Definition Language (DDL - CREATE, ALTER and DROP)
Which Activity to Access Folder Structure in gen 2 ? getMetadata
Run SQL scripts (Develop > SQL Script) are disconnected
Option 1 : Add Script Activity and Directly add the SQL Code
Option 2 (Call the Current Synapse using REST to get all artifacts ) :
OLTP (OnLine Transaction Processing) : is used for managing current day to day data information.
ACID
Atomicity(entire transaction happens at once or nothing happens)
Consistency
Isolation
Durability (persisted)
OLAP (OnLine Analytical Processing): is used to maintain the past history of data and mainly used for data analysis, it can also be referred to as warehouse.
DataLake
large amounts of Data
structured, semi-structured, and unstructured data.
original/ raw format.
Any Formats - JSON, BSON, CSV, TSV, Avro, ORC, and Parquet etc.,
cost-effective way
No Transformation
Delta / Watermarking (like a water level stick in a dam or a river)
Timestamp-based Approach: the source should have a timestamp column
a. Current TimeStamp
If current ts = 24-07-2024-01:01:01:0001
Next Run Select * from T where t > 24-07-2024-01:01:01:0001
b. Source Time Stamp Based
Save the max timestamp from the source column after each run.
In subsequent runs: Select * from T where ts > max(saved_value)
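A minimal PySpark sketch of the source-timestamp approach (the source path, the column name ts, and the stored watermark value are assumptions; the watermark would normally live in a control table):

```python
# watermark-based delta load - sketch only, paths and column names are placeholders
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
last_watermark = "2024-07-24 01:01:01"                        # max(ts) saved after the previous run

src = spark.read.format("delta").load("/path/to/source")
delta_rows = src.filter(F.col("ts") > F.lit(last_watermark))  # only the new/changed rows

new_watermark = delta_rows.agg(F.max("ts")).first()[0]        # persist this value for the next run
```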
Hash-based Approach: (join src and dest on the key and compare row hashes)
SELECT src.* FROM src JOIN dest ON src.id = dest.id
WHERE hashbytes('sha256', concat_ws('', src.c1, src.c2))
   != hashbytes('sha256', concat_ws('', dest.c1, dest.c2))
Primary Key-based Approach: Comparing the primary keys Src and Target
Change Tracking Technology : MS SQL and SQL server
Change Data Capture (CDC) - Azure Synapse and DataFactory only:
One Checkbox to handle delta
Native to Azure
Azure to Blob, Azure Gen2 , Cosmos , SQL DB , SAP , PostGres
Reverting to Delta is Difficult
ALTER DATABASE DB
SET CHANGE_TRACKING = ON
Datawarehouse (Summary, Types, NF, Facts and Dim,Star vs SnowFlake, Key Types, LAD, Handle LAD)
structured information from various srcs.
Data is transformed
has schema
Use RDBMS
Used for reporting
Facts and Dim
Data is Normalised
Datamart
similar to a data warehouse.
Caters to a specific business unit, Eg: Finance Dept
Database Normalization
Used to reduce redundancy from the database table.
1NF(atomicity):
Each attribute holds a single value.
Name,Age,Dept <- BEFORE
Melvin,32,(Marketing, Sales)
Name,Age,Dept <- AFTER
Melvin,32,Marketing
Melvin,32,Sales
2NF (composite / two-part natural key):
After 1NF
No partial dependencies (no column depends on only part of the natural key)
Subject |Teacher ID|Teacher Age <- BEFORE
Teacher ID|Teacher Age <- AFTER
Subject |Teacher ID
3NF (no non-key column depends on another non-key column)
After 2NF
If A -> B and B -> C, then A -> C (transitive dependency)
Primary Key: EmpID
Non-key attributes: Name, State, Country, ZIP
EmpID | Name | State | Country | ZIP
ZIP depends on EmpID, and State and Country depend on ZIP, so split into:
EmpID, NAME, ZIP
ZIP, STATE, Country
Dimensions (Object)
Eg: a seller, a customer, a date, a price , Product
Contains Primary keys , Surrogate Keys
Dim = Person Details , Fact = Transaction done by the Person
Facts (Action)
Eg: a sale, a transaction, an access
contains foreign keys from Dimension
Grain:
Lowest level of measurement in the Fact table.
Dimension Modeling
Star Scheme :
Fact table uses Foreign Key from Dim Tables
Schema looks like a star , which 1 fact table connected to multiple dimension table
Snow Flake Scheme
Is an extension of star Scheme
multidimensional model
dimension tables is further sub-dimensioned
6 types of Dimensions :
Role Play Dim : Eg- "Date" dim can be used for "Date of Sale", "Date of Delivery","DOB"
Junk dimension : Table with unrelated attributes to avoid large number of foreign keys in the fact table.
Degenerate dimensions: Dimension attributes stored as part of fact table and not in a separate dimension table. Eg: "transaction number"
Stacked dimension : two or more dimensions are combined
Slowly Changing Dimension : dimension that would undergo changes over time
Slowly Growing Dimension : growth of records/elements in the dimension.
Conformed Dimension (Source of Truth ): is shared across multiple data mart or subject area
Reference Dimension: joined indirectly to the fact table through a key in another dimension table.
Static Dimension: not extracted from the original data source, but created within the context of the data warehouse.
Types of Slowly Changing Dimension
Type 0 : No changes are entertained
INSERT INTO dim
SELECT * FROM src
WHERE NOT EXISTS (SELECT * FROM dim WHERE dim.id = src.id)
Type 1 : Direct Update Old Data (Upsert Original Data)
SK,ID,Name,Cellphone. <- Before
100,1,ABC,1234567890
SK,ID,Name,Cellphone <- After
100,1,ABC,9876543210
UPDATE dim
SET dim.cellphone = src.cellphone FROM dim JOIN src ON dim.id = src.id;
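The same Type 1 upsert can be expressed as a Delta Lake merge; a hedged sketch (the table and column names dim / src / id / cellphone are assumptions):

```python
# SCD Type 1 as a Delta Lake MERGE (upsert) - sketch, names are assumptions
from delta.tables import DeltaTable

dim = DeltaTable.forName(spark, "dim")
src = spark.table("src")

(dim.alias("d")
    .merge(src.alias("s"), "d.id = s.id")
    .whenMatchedUpdate(set={"cellphone": "s.cellphone"})   # overwrite the old value, no history kept
    .whenNotMatchedInsertAll()
    .execute())
```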
Type 2 : Flag Old Data as 0 , Insert new Data with 1(Flag)
History Table SK,ID,Name,Cellphone,CRT_DT
100,1,ABC,1234567890, 2016-06-01
100,1,ABC,9876543210, 2016-06-10
Type 6 :Combination of type 1, 2 and 3
New Row (Like type 2)
Each Active rows can be indicated with a boolean flag / start and end date.
using additional attributes or columns within the same record instead of creating new records
Eg: a new table record is added as in SCD type 2. Overwrite the old information with the new data as in type 1. And preserve history in a historical_record as in type 3.
Fetch all records from EZ with timestamp > max(timestamp) in CZ
Delete the matching records in CZ, i.e. where the same natural keys exist in the fetched data
maxTS = Select max(TS) from Target
delTarget = srcDF.filter(f"srcTS > '{maxTS}'").withColumn("key", concat_ws(",", "k1", "k2")).select("key").collect()
DELETE FROM target WHERE CONCAT(c1, c2) IN (delTarget);
Re-insert the fetched records
Performance :
Concat and hash the natural keys while saving, for fast deletion. Eg: ```hash(concat_ws(',', "k1", "k2"))```
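A PySpark sketch of this delete-and-reinsert pattern with a hashed natural key (the EZ/CZ paths, the column names ts / k1 / k2, and the use of sha2 are assumptions):

```python
# delete-and-reinsert delta pattern with a hashed natural key - sketch only
from delta.tables import DeltaTable
from pyspark.sql import functions as F

target = DeltaTable.forPath(spark, "/path/cz")                        # curated zone (CZ)
max_ts = target.toDF().agg(F.max("ts")).first()[0]

src = spark.read.format("delta").load("/path/ez")                     # enriched zone (EZ)
delta_rows = (src.filter(F.col("ts") > F.lit(max_ts))
                 .withColumn("key", F.sha2(F.concat_ws(",", "k1", "k2"), 256)))

keys = [r.key for r in delta_rows.select("key").distinct().collect()]
target.delete(F.sha2(F.concat_ws(",", "k1", "k2"), 256).isin(keys))   # drop rows being replaced
delta_rows.drop("key").write.format("delta").mode("append").save("/path/cz")
```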
Indexes are used to query data faster, speed up sort operations, and enforce unique constraints.
In a DB table, each row has a rowid / sequence number that identifies the row,
e.g. a table = a list of pairs (rowid, row).
An index is created as a separate table which holds the opposite relationship: (row, rowid).
SQLite uses a B-tree (balanced tree), i.e. the number of index rows equals the number of table rows.
Mac:
$brew install sqlite3
>sqlite3
>CREATE TABLE contacts (
first_name text NOT NULL,
last_name text NOT NULL,
email text NOT NULL
);
>CREATE UNIQUE INDEX idx_contacts_email ON contacts (email);
>INSERT INTO contacts (first_name, last_name, email) VALUES('John','Doe','john.doe@sqlitetutorial.net');
>INSERT INTO contacts (first_name, last_name, email) VALUES('Johny','Doe','john.doe@sqlitetutorial.net');
SQLite issued an error message indicating that the unique index has been violated.
Note: email should be unique across rows
>INSERT INTO contacts (first_name, last_name, email)
VALUES('David','Brown','david.brown@sqlitetutorial.net'),
('Lisa','Smith','lisa.smith@sqlitetutorial.net');
>EXPLAIN QUERY PLAN
SELECT * FROM contacts WHERE email = 'lisa.smith@sqlitetutorial.net';
>.quit
Building wheel for pymssql (PEP 517): finished with status 'error'
ERROR: Failed building wheel for pymssql
Failed to build pymssql
ERROR: Could not build wheels for pymssql which use PEP 517 and cannot be installed directly
Exception information:
pip._internal.exceptions.InstallationError: Could not build wheels for pymssql which use PEP 517 and cannot be installed directly
From the new page opened, copy the "Create an SSH key pair" command: "ssh-keygen -m PEM -t rsa -b 4096"
Run "ssh-keygen -m PEM -t rsa -b 4096" in the terminal
Press Enter at all the prompts
Testing Connection
In the previous page, enter the path of the id_rsa.pub, Eg: "~/.ssh/id_rsa.pub" or (if you have added the file in a new folder) "~/.ssh/vm_linux/pushpa/id_rsa.pub"
Click on "test your connection"
"Your unsaved edits will be discarded", click Yes
Click on Test Connection
This should pass
Connection:
Settings > Connect > SSH
Copy the run example, i.e. ssh -i <private key path> pushpaxxxxx@1xx.xx.1xx.xx
Replace the private key path with the id_rsa that was created
Make sure you give the Azure account password when asked
git clone https://github.com/Azure-Samples/html-docs-hello-world.git
cd html-docs-hello-world
az webapp up --location westeurope --name <app_name> --html
go to the app URL: http://<app_name>.azurewebsites.net
Update and redeploy the app
az webapp up --location westeurope --name <app_name> --html
Clean up
az group delete --name appsvc_rg_Windows_westeurope
trait A
class B extends A
class C extends B

object LowerBoundGeneric extends App {
  class Test[A >: B](val x: A)          // can be of type A or B, not C
  val temp = new B()                    // new C() = Fail
  val test: Test[B] = new Test[B](temp)
}

object CovariantGeneric extends App {
  class Test2[+A] {
    def run[B >: A](element: B) = print("working")
  }
  val temp2 = new C()
  new Test2[B]().run(temp2)
}
Apply
// whereby the compiler converts f(a) into f.apply(a)
object Applytest extends App {
  class Foo(x: Int) { def apply(y: Int) = x + y }
  val f = new Foo(3)
  println(f(4))   // returns 7
}
Partial Function
/* A function is f: X -> Y. A partial function does not force f to map every element of X
   to an element of Y, i.e. several sub-partial-functions handle different elements of the same data set.
   new PartialFunction[Input, Output]: if "isDefinedAt" is true then "apply" is executed; orElse, andThen */
object Partialtest extends App {
  val sample = 1 to 5
  val isEven = new PartialFunction[Int, String] {
    def apply(x: Int) = x + " is even"
    def isDefinedAt(x: Int) = x != 0 && x % 2 == 0
  }
  val isOdd: PartialFunction[Int, String] = { case x if x % 2 == 1 => x + " is odd" }
  val evenNumbers = sample map (isEven orElse isOdd)
  print(evenNumbers)
}
Companion Object
/* A companion object and its class can access each other's private members (fields and methods).
   They have the same name and live in the same file. */
object CompanionTest extends App {
  class Person { var name = "" }
  object Person {
    def apply(name: String): Person = {
      var p = new Person()
      p.name = name
      p
    }
  }
  print(Person("Fred Flinstone").name)   // Person.apply("Fred Flinstone")
}
Future
/* Anything inside Future {} runs in a different thread.
   The application's main thread doesn't stop for the Future to complete.
   The result of a Future is always a Try type: Success or Failure.
   To make the main thread wait, scala.concurrent.Await.result(future, 15.seconds) is used.
   isCompleted, value, map, collect */
object FutureTest extends App {
  import scala.concurrent.Future
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.util.{Failure, Success}

  val f1: Future[Int] = Future { Thread.sleep(1000); 21 + 21 }
  while (!f1.isCompleted) { println("future operation completed ?? - " + f1.isCompleted) }
  println(f1.value)
  val f2: Future[Int] = f1.map(i => i + 1)
  f2.onComplete {
    case Success(value) => println(s"Got the callback, value = $value")
    case Failure(e) => e.printStackTrace
  }
}
Implicit
object ImplicitTest extends App {
  case class Person(name: String) { def greet = println(s"Hi, my name is $name") }
  implicit def fromStringToPerson(name: String) = Person(name)
  "Peter".greet
}
Copy the api-key and url under: Language Translator > Service Credentials
Replace api-key and url (more REST calls: Language Translator > Getting Started)
curl -X POST --user "apikey:{apikey}" \
  --header "Content-Type: text/plain" \
  --data "Language Translator translates text from one language to another" \
  "{url}/v3/identify?version=2018-05-01"
open Ibmcloud Shell from the ibmcloud tool bar
Run the new Command
Cloudant Database
Login to IBMCloud
Goto Catalog
Select and Create a Cloudant Instance
Open the Cloudant instance provisioned from Resource List > Services and Software > Cloudant
Click on Manage > Launch Dashboard
Create Database > test >Click on Create
Open test DB > Design Document > New Doc > add a new json key-value
eg:
{
"_id": "ce9575de70477c932e222bf5b6bd7fea",
"name": "deepak"
}
Click on Create Document
Lets fetch this document from API
Under the Cloudant page > Service Credentials > Create New Role > Manager > Add
Open the new Service Credentials created, note down the apikey and url
Open ibmcli from ibmcloud tool bar
(https://cloud.ibm.com/docs/account?topic=account-iamtoken_from_apikey&interface=api)
$curl -X POST 'https://iam.cloud.ibm.com/identity/token' -H
'Content-Type: application/x-www-form-urlencoded' -d
'grant_type=urn:ibm:params:oauth:grant-type:apikey&apikey=<MY_APIKEY>'
Copy the Token generated
Run below commands
API_BEARER_TOKEN=<paste token here>
curl -H "Authorization: Bearer $API_BEARER_TOKEN" -X GET "{url}/test/{_id from cloudant}"
Other APIs:
curl -H "Authorization: Bearer $API_BEARER_TOKEN" -X PUT "{url}/{db}" #Create DB
curl -H "Authorization: Bearer $API_BEARER_TOKEN" -X PUT "{url}/{db}/{doc_id}" #Create Document
curl -H "Authorization: Bearer $API_BEARER_TOKEN" -X GET "{url}/test/{_id from cloudant}" #Read Document
You can create a proxy link for the "https://eu-gb.functions.cloud.ibm.com/api/v1/namespaces/j.thepac%40gmail.com_dev/actions/hello-world/helloworld?blocking=true" link by creating an API Gateway and providing the above URL.
Simple ETL from COS to DB2
Pre- Req:
DB2:
Make sure u have created a DB2 instance in IBMCLoud
Create a table in DB2 (do not insert any records)
CREATE TABLE table_name (col1 int, col2 varchar(255)); -- successfully created
In Db2 Ui > Data icon > Tables
Click on the scheme
check if the table is created
Test it
Syntax : Select * from scheme.table;
Example:Select * from DXC02390.table_name;
note down the Scheme name and table name
Click on about icon in DB2 UI
Note down from "<crn ..........::>"
COS:
Create a Cloud Object Storage (COS) instance in IBM Cloud
Create a Bucket
Add a parquet file with a schema similar to the table created above (use Apache Spark to create the file locally, then drag and drop)
Run the below command to copy the data from COS to DB2
Syntax :
SELECT * FROM <Object SQL URL> STORED AS PARQUET
INTO crn:xxxxxxx:/scheme.table PARALLELISM 2
Example:
SELECT * FROM
cos://jp-tok/cloud-object-storage-7d-cos-standard-gsi/test2Cols.parquet
STORED AS PARQUET INTO crn:v1:bluemix:public:dashdb-for-transactions:eu-gb:a/e31b7085afca4ab8b6ac9b1077cd8af9:9257e5bc-49f0-43a1-b776-f7a0ff41b2b6::/DXC02390.MONOREPO_POC
PARALLELISM 2
Copy ETL using REST
Pre-Req: Simple ETL from COS to DB2
curl -X POST 'https://iam.cloud.ibm.com/identity/token' \
-d '{"statement":"SELECT * FROM
cos://jp-tok/cloud-object-storage-7d-cos-standard-gsi/test2Cols.parquet
STORED AS PARQUET INTO
cos://jp-tok/cloud-object-storage-7d-cos-standard-gsi/test2Cols_result"
}'
Run Spark Job on COS Data
login to IBMCLOUD
Goto Catalog > Search for Watson Studio
Agree to terms and conditions> Click on Create
Click On next >Next > click Create Watson Studio
Click on Projects > New Project >Empty Project
Add to Project > Notebook
Select Runtime > python (least configuration)
!pip -q install ibmcloudsql
import ibmcloudsql
cloud_api_key = "Create api key from Manage"
sql_crn = "crn of SQL Query Instance"
sql_cos_endpoint = "cosendpoint of bucket/result_prefix"
query = "right click on the COS parq file and click on SQL Query"
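The snippet above only defines the inputs; a hedged completion, assuming ibmcloudsql exposes SQLQuery(...) and run_sql() as in its published examples:

```python
# hedged completion - SQLQuery/run_sql are assumed from the ibmcloudsql package
sqlClient = ibmcloudsql.SQLQuery(cloud_api_key, sql_crn, sql_cos_endpoint)
result_df = sqlClient.run_sql(query)   # runs the SQL against COS and returns the result set
print(result_df.head())
```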
1. Make sure bazel is installed on your computer
2. Create a new folder as the project
3. cd inside the project folder
4. Create a new "WORKSPACE" file
5. Create a python/Program folder
6. cd to Program
7. Create a new BUILD file:
   package(default_visibility = ["//visibility:public"])
   py_binary(
       name = 'hello',      # any name
       main = 'hello.py',   # reference path eg: parentfolder.file
       srcs = ['hello.py'], # filename
   )
8. $ echo "print('hi')" > hello.py
9. Make sure you are in the folder containing the BUILD file
10. $ bazel run hello
Bazel has default setting for Python and Java ie., u can start with empty WORKSPACE and run python/java source files .
skylib_version = "0.8.0"
http_archive(
    name = "bazel_skylib",
    type = "tar.gz",
    url = "https://github.com/bazelbuild/bazel-skylib/releases/download/{}/bazel-skylib.{}.tar.gz".format(skylib_version, skylib_version),
    sha256 = "2ef429f5d7ce7111263289644d233707dba35e39696377ebab8b0bc701f7818e",
)
Load
rules_scala : like scala_binary,scala_test etc., to use in BUILD file
scala_config : Config scala version
scala_register_toolchain : for using the jar file built from one language as input to another
scala repositories : to download default libraries for scala
Set maven as third party repo
IntelliJ Setup
1. Make sure IntelliJ has the bazel plugin installed
2. Import the above project as a bazel project
3. Create new
4. Next (if you already have a .ijwb/ folder created, make sure it is deleted)
5. Done
Common Commands :
bazel build target #target can be name of build or //path of package:target
In this project WORKSPACE is empty because Native rules ship with the Bazel binary and do not require a load statement. Native rules are available globally in BUILD files.
But for scala ,python etc u need to include load statements in workspace and use them in Build files
Steps:
Open link https://github.com/bazelbuild
select repos u need for creating ur project
Example if u want to add "bazel-skylib" (Provides functions , file paths, and data types in build file)
If you're just doing personal projects where nobody else will use the code, then you can make any name .
Don't make up something that starts with com. or net. or other top-level domain though, because that would imply that you own the domain name (ie. using com.john as your package name just because your name happens to be John is not a good idea).
The domain-name-backwards convention is there to prevent name collisions. Two different companies with the same product name will have different namespaces so everything works fine.
import com.google.gson.{Gson, JsonParser}

val json = """{"hello": "world", "age": 42}"""
val parser: JsonParser = new JsonParser()
val res = parser.parse(json).getAsJsonObject()
println(res.get("hello"))   // world

// read from file
// val path = "/path/file.json"
// val lines = scala.io.Source.fromFile(path, "utf-8").getLines.mkString
Code 2 (Requires Lift Web - Maven):
import net.liftweb.json._
import net.liftweb.json.Serialization.write

case class Address(city: String, country: String)
case class Person(name: String, address: Address)

implicit val formats: Formats = DefaultFormats
print(write(Person("Sam", Address("NY", "US"))))
- Cluster: A collection of nodes(Computer) represent a cluster.
- Pod:
- Runs Container Image
- unique ip address
- Pods are the smallest unit in kubernetes
- private, isolated network.
- Container Image : code
- kubectl: Command creates a proxy ,forwarding communications into the cluster using API.
LoadBalancer
- load balancer functionality
- Only Used in Cloud Services ( AWS, GCP ,Azure ..)
- Each service needs new LoadBalancer and new IP address.
Ingress (Similar to port-forward / Simple Service )
- Ingress is an object that allows access to your Kubernetes services from outside
- Needs ingress controller running.
- Ingress controller ( will not run by default )
- Ingress Resource
kubectl get pods --all-namespaces | grep ingress
kubectl get service --all-namespaces | grep ingress
kubectl get Ingress ingress-name
kubectl delete ingress ingress-name
apiVersion: v1
kind: Pod
# Pod / Job
metadata:
name: hi
spec:
containers:
- name: hi
image: ubuntu
command: ['sh', '-c', 'echo "Hello, Kubernetes!" && sleep 3600']
# imagePullPolicy: Never
kubectl create -f hi.yml
kubectl exec -it hi -- /bin/bash
Activities
Activity : Pull local Docker image into minikube
Create a file with name Dockerfile
Add below lines :
FROM alpine:3.4
RUN apk update
RUN apk add vim
RUN apk add curl
open new terminal
minikube start
eval $(minikube docker-env)
docker build -t foo:1.0 .
docker images #Check if foo is created
kubectl run foo -it --image=foo:1.0
- $cat /proc/version
- $exit
kubectl get pods
kubectl get deployments
kubectl delete deployment foo
Activity :Create "Hello world" python program push and pull from remote Docker
Pre-Req:
FROM python:3.7
RUN mkdir /app
WORKDIR /app
ADD . /app/
EXPOSE 5000
CMD ["python","-u", "/app/main.py"]
Steps:
- cd into apps
- sudo docker images #empty table
- sudo docker build -t any_name:v1 . # note there is '.' at the end
- sudo docker images
- sudo docker run -p 4000:80 any_name
- sudo docker images #note down the id /name, usually it is latest
- sudo docker login
- sudo docker tag TAG_id usn/repo_name:TAG_NAME_of_image
#docker tag 3a4677d31cde usn/test_repo:latest
- sudo docker push usn/repo_name:TAG_NAME_of_image
#docker push usn/repo:latest
- kubectl apply -f deployment.yaml #pull image from docker hub & create pod
- kubectl logs -f foo #hi
- kubectl get pods #shows all pods
- kubectl delete pod pod_name #to delete pod
#Status = CrashLoopBackOff. Because we just have one print statement, the application exits after printing "hi"; the pod keeps trying to restart the container and has done so multiple times
Activity : Send arguments from different terminal
kubectl attach redis_container -i
Activity :Forward ports from Pods to your local machine
SubQuery : Simple subquery doesn't use values from the outer query and is being calculated only once:
SELECT id, first_name FROM student_details WHERE id IN (SELECT student_id FROM student_subjects
WHERE subject= 'Science');
CoRelated Subquery - Query to find all employees whose salary is above average for their department:
SELECT employee_number, name FROM employees emp
WHERE salary > (SELECT AVG(salary) FROM employees WHERE department = emp.department);
Hold Raspberry Pi Pico on-board BOOTSEL button (Button on the board)
Plug it into USB (or pulling down the RUN/Reset pin to ground)
It will appear as a USB disk drive
you can copy paste the firmware onto drive
the drive will change for Circuit python and no reaction for Micro Python
Micro Python :
Install and open thonny IDE
IDE should automatically detect pico
Shell
from machine import Pin
led = Pin("LED", Pin.OUT)
led.value(1)
led.value(0)
Code - save as main.py in "MicroPython" device
from machine import Pin
import time
led = Pin("LED", Pin.OUT) # Pin(25, Pin.OUT)
for i in range(1, 5):
print(i)
led.value(1)
time.sleep(1) # Use 1 second instead of 10 seconds for better visibility
led.value(0)
time.sleep(1)
Circuit Python :
Configure the Mu Editor so that code can be seen running in real time, i.e. as soon as the code is saved, the result is reflected on the LEDs directly.
sudo apt-get update
sudo apt-get -f upgrade
apt install libfuse2
Download and Install Mu Editor
Run
Copy paste below program into code.py
Save the file in the device
Open Mu Editor
Should automatically recognise PICO and Opens code.py
Blink Program
import board
import time
from digitalio import DigitalInOut, Direction,Pull
led = DigitalInOut(board.LED)
led.direction = Direction.OUTPUT
#Connect LED between Pin1 ie GP0 and Pin 2
op = DigitalInOut(board.GP0)
op.direction = Direction.OUTPUT
while 1:
if led.value==0: led.value= True
elif led.value==1:led.value = False
time.sleep(0.5)
if op.value==0: op.value= True
elif op.value==1:op.value = False
time.sleep(0.5)
Input Switch
import time
import board
import digitalio
button = digitalio.DigitalInOut(board.GP0)
button.switch_to_input(pull=digitalio.Pull.UP )
while True:
print(button.value)
time.sleep(0.5)
1. Connect the SEEEDuino XIAO to the PC using a Type-C cable
2. Quickly short the RST pins with a cable, 2 times
3. Once done successfully, the Arduino drive appears
4. Go to the website -
5. Download the latest .UF2 file
6. Copy and paste it inside the drive
7. Now the drive will be converted to CIRCUITPY
8. Create a file code.py
9. Copy paste the below code into code.py (same for all CircuitPython ICs)

import time
import board
from digitalio import DigitalInOut, Direction

led = DigitalInOut(board.D13)  # D13 is a built-in LED
# A1 - A10 can be used as well if you use a separate LED and a 100 - 400 ohm resistor (refer below for calculations)
led.direction = Direction.OUTPUT

while True:
    led.value = True
    time.sleep(1)
    led.value = False
    time.sleep(1)

10. Save the file
11. The LED should start blinking
A simple LED circuit consists of a LED and resistor. The
resistor is used to limit the current that is being drawn and is called a
current limiting resistor. Without the
resistor the LED would run at too high of a voltage, resulting in too
much current being drawn which in turn would instantly burn the LED, and
likely also the GPIO port.
To calculate the resistor value we need to examine the
specifications of the LED. Specifically we need to find the forward
voltage (VF) and the forward current (IF).
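A worked example of the calculation using Ohm's law, R = (V_supply - VF) / IF (the 3.3 V supply, 2.0 V forward voltage and 10 mA forward current are example values, not from a specific datasheet):

```python
# current-limiting resistor: R = (V_supply - V_forward) / I_forward (example values)
v_supply = 3.3     # GPIO pin voltage (V)
v_forward = 2.0    # LED forward voltage VF (V)
i_forward = 0.010  # LED forward current IF (A) = 10 mA

r = (v_supply - v_forward) / i_forward
print(f"Use at least {r:.0f} ohms")  # 130 ohms -> round up to a standard value such as 150 ohms
```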