1. Make Sure python3 is installed
2. pip show any_package_name #to get path of installed packages
3. UpGrade Packages
Method 1
pip3 install pip-review
pip-review --local --interactive
Method 2
pip freeze > requirements.txt
pip3 install -r requirements.txt --upgrade
4. default configuration path " ~/airflow/airflow.config"
min_file_process_interval = 60 # after how much seconds a new DAGs should be picked up from the filesystem
max_threads = 1 # This defines how many threads will run.
5. Default Dags path: #/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/airflow/example_dags
- pip3 install apache-airflow
- $airflow version
- $airflow initdb
- Open New Terminal Window
- $airflow webserver
- Open new Terminal Window
- $airflow scheduler
- In browser - localhost:8080
- By default all Dags are stored in example_dags (/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/airflow/example_dags)
- Automtically this file should appear in Web UI
- run the Task
ViewResult : Click on last Run > click on any Tasks (Graph View) >click on Download Log
DAG consists of Operators with unique Task id which is executed in sequence
DAG name has to be unique and this would appear in webconsole
Each DAG file has 1 DAG object , which is passed as argument to all Operators
DAG object has Name , arguments and schedule
Schedule arguement in DAG object should be of past date to trigger immediately
Operator : Used to execute different command / function for Bash , Python etc .,
Once operators are created , symbol ">>" is used to give the sequence of Execution
SubDAG : This is useful when you have repeating processes within a DAG or among other DAGs.
a ) lets you pass messages between tasks.It consists of
b) pusher - The pusher sends a message
c) puller -the puller receives the message.
Create new
summary : Create a 1 DAG object and pass it as argument to operators
args -> DAG object -> Operator
For a DAG to be executed, the start_date must be a time in the past,
import logging
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator
logger = logging.getLogger("airflow.task")
args = {'owner': 'Airflow', 'start_date': days_ago(2)}
dag = DAG(dag_id=dagname,default_args=args, schedule_interval='@once',tags=['python'])
subdag_id="subdagid"'This is an info message - {}'.format(dagname))
def print_kwargs(**op_kwargs):
print (op_kwargs.keys())
return 'Whatever you return gets printed in the logs'
def print_args(*op_args):
return 'Whatever you return gets printed in the logs'
def my_subdag(parent_dag_name, child_dag_name, args):
dag_subdag = DAG( dag_id="{}.{}".format(parent_dag_name,child_dag_name), default_args=args,schedule_interval="@daily",)
# DummyOperator(task_id=child_dag_name,default_args=args,dag=dag_subdag, )
BashOperator(task_id=child_dag_name, bash_command='echo "hi deepak"', dag=dag_subdag)
return dag_subdag
subdag=my_subdag(dagname, subdag_id, args)
dummy = DummyOperator(task_id='Dummy', dag=dag)
bash_pass = BashOperator(task_id='bash_pass', bash_command='echo 1', dag=dag)
bash_fail = BashOperator(task_id='bash_fail', bash_command='wrong command', dag=dag)
python_dictionary = PythonOperator(task_id='python_dictionary', python_callable=print_kwargs, op_kwargs={"1": "A"}, dag=dag)
python_list = PythonOperator(task_id='python_list', python_callable=print_args, op_args=['one', 'two', 'three'], dag=dag, )
# provide_context=True, #if True , use jinja Template
subdag_operator = SubDagOperator(task_id=subdag_id,subdag=subdag,dag=dag,)
dummy >> python_list >> python_dictionary >> bash_pass >> bash_fail >>subdag_operator
if __name__ == "__main__":
#Error 1:ImportError: cannot import name 'resolve_types'
- pip3 install attr
- pip3 install cattrs
#Error 2:unknown locale: UTF-8
- Append this to your ~/.bash_profile:
- export LC_ALL=en_US.UTF-8
- export LANG=en_US.UTF-8
- source ~/.bash_profile
#Error 3:' is stale error
- sudo lsof -i tcp:8080
- kill -9 PID#all PIDs
- rm -rf
