Airflow - một số ghi chép

Một số ghi chép, tips & tricks của mình trong quá trình sử dụng Apache Airflow.

  • Viết các functions (tasks) luôn cho mọi kết quả giống nhau với các input giống nhau (stateless).

    • Tránh sử dụng global variables, random values, hardware timers.
  • Một số tính năng nên biết

    • depends_on_past sử dụng khi viết DAGs để chắc chắn mọi task instance trước đó đều success.
    • LatestOnlyOperator để skip một số bước phía sau nếu một số task bị trễ.
    • BranchPythonOperator cho phép rẽ nhánh workflow tùy vào điều kiện được định nghĩa.
  • Sử dụng airflow test <dag-id> <task-id> ... để test task instance trên local khi code.

  • Sử dụng Docker Compose để thiết lập môi trường local cho dễ.

  • Để test DAG với scheduler, hãy set schedule_interval=@once, chạy thử, để chạy lại thì chỉ cần clear DagRuns trên UI hoặc bằng lệnh airflow clear

  • Khi DAG đã được chạy, airflow chứa các task instance trong DB. Nếu bạn thay đổi start_date hoặc interval, scheduler có thể sẽ gặp lỗi. Nên đổi tên dag_id nếu muốn thay đổi start_date hoặc interval.

  • Sử dụng Bitshift thay vì set_upstream() and set_downstream() để code dễ nhìn hơn, ví dụ

    op1 >> op2
    # tương đương: op1.set_downstream(op2)
    
    op1 >> op2 >> op3 << op4
    # tương đương:
    #    op1.set_downstream(op2)
    #    op2.set_downstream(op3)
    #    op3.set_upstream(op4)
    
    op1 >> [op2, op3] >> op4
    # tương đương
    #    op1 >> op2
    #    op1 >> op3
    #    op2 >> op4
    #    op3 >> op4
    # hoặc tương đương
    #    op1.set_downstream([op2, op3])
    #    op2.set_downstream(op4)
    #    op3.set_downstream(op4)
    
  • Sử dụng Variables để lưu trữ params của DAGs (Admin -> Variables)

    from airflow.models import Variable
    foo = Variable.get("foo")
    bar = Variable.get("bar", deserialize_json=True)
    baz = Variable.get("baz", default_var=None)
    

    hoặc sử dụng variable trong jinja template:

    echo {{ var.value.<variable_name> }}
    
  • Sử dụng Slack để nhận thông báo lỗi

  • Sử dụng default arguments để tránh lặp lại các tham số

    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'params': { 'foo': 'baz' }
    }
    
    with DAG(dag_id='airflow', default_args=default_args):
        op1 = BigQueryOperator(task_id='query_1', sql='SELECT 1')
        op2 = BigQueryOperator(task_id='query_2', sql='SELECT 2')
        op1 >> op2
    
  • Lưu password, token trong Connections

    from airflow.hooks.base_hook import BaseHook
    aws_token = BaseHook.get_connection('aws_token').password
    
  • Có thể generate DAG một cách tự động, ví dụ

    def create_dag(id):
        dag = DAG(f'dag_job_{id}', default_args)
        op1 = BigQueryOperator(task_id='query_1', sql='SELECT 1', dag=dag)
        ...
        return dag
    
    for i in range(100):
        globals()[f'dag_job_{id}'] = create_dag(id)
    
Data EngineerAirflowDataData Engineer