AIRFLOW
Pre-requisites:
1. Make sure Python 3 is installed
2. pip show any_package_name # to get the install path of a package
3. Upgrade packages
Method 1:
pip3 install pip-review
pip-review --local --interactive
Method 2:
pip freeze > requirements.txt
pip3 install -r requirements.txt --upgrade
4. Default configuration path: ~/airflow/airflow.cfg
min_file_process_interval = 60 # how often (in seconds) the scheduler scans the filesystem for new DAG files
max_threads = 1 # number of threads the scheduler runs
5. Default DAGs path: /Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/airflow/example_dags
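The two scheduler settings shown in step 4 live in the INI-format airflow.cfg file, under the [scheduler] section in Airflow 1.10. A minimal sketch of inspecting them with Python's standard configparser (the inline cfg text here stands in for the real file):

```python
import configparser

# Sketch: airflow.cfg is INI-format; this inline snippet mimics the
# [scheduler] section so the values can be read with configparser.
cfg_text = """
[scheduler]
min_file_process_interval = 60
max_threads = 1
"""

parser = configparser.ConfigParser()
parser.read_string(cfg_text)
interval = parser.getint('scheduler', 'min_file_process_interval')
threads = parser.getint('scheduler', 'max_threads')
print(interval, threads)  # 60 1
```

Against the real file you would call parser.read() with the path to ~/airflow/airflow.cfg instead of read_string().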
Install
- pip3 install apache-airflow
- $ airflow version
- $ airflow initdb
- Open a new terminal window
- $ airflow webserver
- Open another terminal window
- $ airflow scheduler
- In the browser, open localhost:8080
- By default, example DAGs are stored in example_dags (/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/airflow/example_dags)
- The DAG should automatically appear in the web UI
- Run the task
View result: click on the last run > click on any task (Graph View) > click on Download Log
Notes:
A DAG consists of Operators, each with a unique task_id, executed in sequence
The DAG name has to be unique; this is the name that appears in the web console
Each DAG file has one DAG object, which is passed as an argument to all Operators
A DAG object has a name, arguments and a schedule
The schedule argument (start_date) in the DAG object should be a past date for the DAG to trigger immediately
Operator: used to execute a command/function via Bash, Python etc.
Once operators are created, the ">>" symbol is used to give the sequence of execution
SubDAG: useful when you have repeating processes within a DAG or across DAGs.
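The ">>" chaining works because operators overload the right-shift operator to record downstream dependencies. A toy illustration of the idea (not Airflow's actual implementation):

```python
# Toy sketch of how ">>" builds a dependency chain.
class Task:
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # a >> b records b as downstream of a and returns b,
        # so chains like a >> b >> c read left to right
        self.downstream.append(other)
        return other

a, b, c = Task('a'), Task('b'), Task('c')
a >> b >> c
print([t.task_id for t in a.downstream])  # ['b']
print([t.task_id for t in b.downstream])  # ['c']
```

Returning the right-hand operand is what lets a single expression chain an arbitrary number of tasks.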
XCom ("cross-communication"):
a) lets you pass messages between tasks. It consists of:
b) pusher - the task that sends a message
c) puller - the task that receives the message
Ref: https://airflow.apache.org/docs/stable/concepts.html
https://big-data-demystified.ninja/2020/04/15/airflow-xcoms-example-airflow-demystified/
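A sketch of the pusher/puller pattern as PythonOperator callables (Airflow 1.10 style, where provide_context=True passes the task instance as kwargs['ti']; the task_ids 'pusher' and 'puller' are illustrative names, not from the script below):

```python
# Pusher/puller callables; in a DAG file they would be wired roughly as:
#   push = PythonOperator(task_id='pusher', python_callable=pusher,
#                         provide_context=True, dag=dag)
#   pull = PythonOperator(task_id='puller', python_callable=puller,
#                         provide_context=True, dag=dag)
#   push >> pull

def pusher(**context):
    # The task instance (ti) exposes xcom_push for sending a message
    context['ti'].xcom_push(key='message', value='hello from pusher')

def puller(**context):
    # xcom_pull fetches the value pushed by the 'pusher' task
    msg = context['ti'].xcom_pull(task_ids='pusher', key='message')
    print(msg)
    return msg
```

Note that a PythonOperator's return value is also pushed automatically as an XCom under the key 'return_value'.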
##################
Create a new SampleScript.py
Summary: create one DAG object and pass it as an argument to the operators
args -> DAG object -> Operator
For a DAG to be executed, the start_date must be a time in the past.
##################
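The days_ago(2) helper used in the script below produces such a past start_date. A rough plain-datetime sketch of what it computes (Airflow's version returns midnight UTC, n days back):

```python
from datetime import datetime, timedelta

# Sketch of airflow.utils.dates.days_ago(n): today's date at
# midnight (UTC in Airflow), minus n days.
def days_ago_sketch(n, now=None):
    now = now or datetime.utcnow()
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    return midnight - timedelta(days=n)

start = days_ago_sketch(2)
print(start < datetime.utcnow())  # True: a past timestamp, so the DAG can run
```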
import logging
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator
logger = logging.getLogger("airflow.task")
dagname='deepak_example_all_operator'
args = {'owner': 'Airflow', 'start_date': days_ago(2)}
dag = DAG(dag_id=dagname,default_args=args, schedule_interval='@once',tags=['python'])
subdag_id="subdagid"
logging.info('This is an info message - {}'.format(dagname))
def print_kwargs(**op_kwargs):
    print(op_kwargs.keys())
    return 'Whatever you return gets printed in the logs'

def print_args(*op_args):
    print(op_args)
    return 'Whatever you return gets printed in the logs'
def my_subdag(parent_dag_name, child_dag_name, args):
    dag_subdag = DAG(
        dag_id="{}.{}".format(parent_dag_name, child_dag_name),
        default_args=args,
        schedule_interval="@daily",
    )
    # DummyOperator(task_id=child_dag_name, default_args=args, dag=dag_subdag)
    BashOperator(task_id=child_dag_name, bash_command='echo "hi deepak"', dag=dag_subdag)
    return dag_subdag
subdag=my_subdag(dagname, subdag_id, args)
dummy = DummyOperator(task_id='Dummy', dag=dag)
bash_pass = BashOperator(task_id='bash_pass', bash_command='echo 1', dag=dag)
bash_fail = BashOperator(task_id='bash_fail', bash_command='wrong command', dag=dag)
python_dictionary = PythonOperator(task_id='python_dictionary', python_callable=print_kwargs, op_kwargs={"1": "A"}, dag=dag)
python_list = PythonOperator(task_id='python_list', python_callable=print_args, op_args=['one', 'two', 'three'], dag=dag, )
# provide_context=True,  # if True, Airflow passes the task context (incl. Jinja template variables) to the callable
subdag_operator = SubDagOperator(task_id=subdag_id,subdag=subdag,dag=dag,)
dummy >> python_list >> python_dictionary >> bash_pass >> bash_fail >> subdag_operator
if __name__ == "__main__":
    dag.cli()
##################
#Error 1: ImportError: cannot import name 'resolve_types'
- pip3 install --upgrade attrs
- pip3 install cattrs
#Error 2:unknown locale: UTF-8
- Append this to your ~/.bash_profile:
- export LC_ALL=en_US.UTF-8
- export LANG=en_US.UTF-8
- source ~/.bash_profile
#Error 3: "airflow-webserver.pid is stale" error
- sudo lsof -i tcp:8080
- kill -9 <PID> # kill all listed PIDs
- rm -rf airflow-webserver.pid
- https://stackoverflow.com/a/59086513