Monday, November 9, 2020

Airflow


Prerequisites:

1. Make sure Python 3 is installed
2. pip show <package_name>    # shows the install location of a package
3. Upgrade packages (two methods below)

Method 1

   pip3 install pip-review
   pip-review --local --interactive

Method 2

   pip3 freeze | cut -d'=' -f1 > requirements.txt    # strip the "==version" pins so --upgrade can actually move versions
   pip3 install -r requirements.txt --upgrade

4. Default configuration path: ~/airflow/airflow.cfg (see the excerpt after this list)
min_file_process_interval = 60  # how many seconds to wait before new DAGs are picked up from the filesystem
max_threads = 1  # how many scheduler threads will run
5. Default example DAGs path: /Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/airflow/example_dags
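
Both settings live in the [scheduler] section of airflow.cfg; a minimal excerpt (the values shown are illustrative, not defaults):

   [scheduler]
   # seconds to wait before re-parsing a DAG file for new DAGs/changes
   min_file_process_interval = 60
   # number of scheduler threads
   max_threads = 1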

Install
  •    pip3 install apache-airflow
  •    $ airflow version
  •    $ airflow initdb
  •    Open a new terminal window
  •    $ airflow webserver
  •    Open a new terminal window
  •    $ airflow scheduler
  •    In a browser, open localhost:8080
  •    By default the bundled example DAGs live in example_dags (/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/airflow/example_dags)
  •    The DAGs should automatically appear in the web UI (you can also verify this from the CLI, as shown after this list)
  •    Run the task
   View result: click on the last run > click on any task (Graph View) > click Download Log
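
To confirm the scheduler has picked up your DAGs, list them from a terminal (Airflow 1.x CLI; in Airflow 2.x this became "airflow dags list"):

   airflow list_dags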

Notes:

A DAG consists of operators, each with a unique task id, executed in sequence.
A DAG's name has to be unique; it is what appears in the web console.
Each DAG file has one DAG object, which is passed as an argument to all operators.
A DAG object has a name, default arguments, and a schedule.
The start_date argument of the DAG object should be a past date for the DAG to trigger immediately.

Operator: used to execute a command/function for Bash, Python, etc.
Once operators are created, the ">>" symbol is used to define the sequence of execution (see the sketch below).
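
A minimal sketch of this pattern - one DAG object passed to two operators chained with ">>" (the dag id and commands here are illustrative):

import logging
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

# one DAG object, passed to every operator in this file
dag = DAG(dag_id='minimal_example', start_date=days_ago(1), schedule_interval='@once')

first = BashOperator(task_id='first', bash_command='echo first', dag=dag)
second = BashOperator(task_id='second', bash_command='echo second', dag=dag)

first >> second  # 'first' runs before 'second'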
SubDAG: useful when you have repeating processes within a DAG or across DAGs.
XCom ("cross-communication"): lets you pass messages between tasks. It consists of:
 a) pusher - sends a message
 b) puller - receives the message
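
A minimal XCom sketch using the same Airflow 1.x API as the script below (the task ids and the key name are illustrative):

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

dag = DAG(dag_id='xcom_example', start_date=days_ago(1), schedule_interval='@once')

def pusher(**context):
    # push a value under an explicit key
    context['ti'].xcom_push(key='greeting', value='hello from pusher')

def puller(**context):
    # pull the value pushed by 'push_task'
    print(context['ti'].xcom_pull(task_ids='push_task', key='greeting'))

push_task = PythonOperator(task_id='push_task', python_callable=pusher, provide_context=True, dag=dag)
pull_task = PythonOperator(task_id='pull_task', python_callable=puller, provide_context=True, dag=dag)

push_task >> pull_task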
Ref: https://airflow.apache.org/docs/stable/concepts.html 
https://big-data-demystified.ninja/2020/04/15/airflow-xcoms-example-airflow-demystified/

##################
Create a new SampleScript.py
Summary: create one DAG object and pass it as an argument to the operators
args -> DAG object -> Operator
For the DAG to be executed, the start_date must be a time in the past.
##################
import logging
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator

logger = logging.getLogger("airflow.task")
dagname = 'deepak_example_all_operator'
args = {'owner': 'Airflow', 'start_date': days_ago(2)}
dag = DAG(dag_id=dagname, default_args=args, schedule_interval='@once', tags=['python'])
subdag_id = "subdagid"
logger.info('This is an info message - {}'.format(dagname))

def print_kwargs(**op_kwargs):
    # keyword arguments arrive via op_kwargs on the PythonOperator
    print(op_kwargs.keys())
    return 'Whatever you return gets printed in the logs'

def print_args(*op_args):
    # positional arguments arrive via op_args on the PythonOperator
    print(op_args)
    return 'Whatever you return gets printed in the logs'

def my_subdag(parent_dag_name, child_dag_name, args):
    # a subDAG's dag_id must follow the "<parent_dag_id>.<child_task_id>" convention
    dag_subdag = DAG(dag_id="{}.{}".format(parent_dag_name, child_dag_name), default_args=args, schedule_interval="@daily")
    # DummyOperator(task_id=child_dag_name, default_args=args, dag=dag_subdag)
    BashOperator(task_id=child_dag_name, bash_command='echo "hi deepak"', dag=dag_subdag)
    return dag_subdag

subdag=my_subdag(dagname, subdag_id, args)

dummy = DummyOperator(task_id='Dummy', dag=dag)
bash_pass = BashOperator(task_id='bash_pass', bash_command='echo 1', dag=dag)
bash_fail = BashOperator(task_id='bash_fail', bash_command='wrong command', dag=dag)  # fails on purpose, to show error logs
python_dictionary = PythonOperator(task_id='python_dictionary', python_callable=print_kwargs, op_kwargs={"1": "A"}, dag=dag)
python_list = PythonOperator(task_id='python_list', python_callable=print_args, op_args=['one', 'two', 'three'], dag=dag)
# provide_context=True  # if set on a PythonOperator, Airflow passes the task context (ti, execution_date, ...) to the callable

# the SubDagOperator's task_id must match the child_dag_name used in the subdag's dag_id
subdag_operator = SubDagOperator(task_id=subdag_id, subdag=subdag, dag=dag)

dummy >> python_list >> python_dictionary >> bash_pass >> bash_fail >> subdag_operator

if __name__ == "__main__":
    dag.cli()
##################
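
To smoke-test a single task from this script without starting the scheduler, the Airflow 1.x CLI can run it in isolation (the execution date below is illustrative):

   airflow test deepak_example_all_operator bash_pass 2020-11-09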

#Error 1: ImportError: cannot import name 'resolve_types'
  •    pip3 install --upgrade attrs    # the "attr" module is provided by the attrs package; resolve_types needs attrs >= 20.1.0
  •    pip3 install --upgrade cattrs

#Error 2: unknown locale: UTF-8
  •    Append these two lines to your ~/.bash_profile:
       export LC_ALL=en_US.UTF-8
       export LANG=en_US.UTF-8
  •    Then reload it: source ~/.bash_profile

#Error 3: 'airflow-webserver.pid' is stale
  • sudo lsof -i tcp:8080    # list the PIDs holding port 8080
  • kill -9 <PID>            # repeat for all listed PIDs
  • rm -rf airflow-webserver.pid
  • https://stackoverflow.com/a/59086513
