Airflow 2.0 - Taskflow API

Chú trọng vào việc đơn giản hóa và rõ ràng cách viết Airflow DAG, cách trao đổi thông tin giữa các tasks, Airflow 2.0 ra mắt Taskflow API cho phép viết đơn giản và gọn gàng hơn so với cách truyền thống, đặc biệt vào các pipelines sử dụng PythonOperators.

Sau đây là ví dụ khi sử dụng cách viết mới Taskflow API trong Airflow 2.0:

import urllib.request, json

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

default_args = { 'owner': '[email protected]' }

@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2))
def taskflow_api_etl():
  """
  ### TaskFlow API ETL Documentation
  This is document for this DAG
  """
  data_source_url = 'https://data.duyet.net/_/orders/123.json'

  @task
  def extract():
    """
    #### Extract task
    This is document for this task - extract().
    A simple Extract task to get data ready for the rest of the data
    pipeline. In this case, getting data from remote data source url.
    """
    src = urllib.request.urlopen(data_source_url)
    data = json.loads(url.read().decode())

    return data

  @task(multiple_outputs=True)
  def transform(data: dict):
    """
    ### Transform task
    """
    total_order_value = 0
    order_keys = data.keys()

    for value in data.values():
      total_order_value += value

    return {
      "total_order_value": total_order_value,
      "order_keys": order_keys
    }

  @task()
  def load(total_order_value: float, order_keys: list):
    print("Total order value is: %.2f" % total_order_value)
    print("List of order keys: %s" % order_keys)

  order_data = extract()
  order_summary = transform(order_data)
  load(order_summary["total_order_value"], order_summary["order_keys"])

dag = taskflow_api_etl()

1. DAG

Đây là cách viết mới bằng cách sử dụng các Python decorators của Taskflow API: @dag@task

Trong ví dụ trên, chúng ta sử dụng @dag decorator cho python function taskflow_api_etl, đây là DAG ID, phần mô tả nằm trong docblockr sẽ hiển thị trên Airflow webserver.

@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2))
def taskflow_api_etl():
  """
  ### TaskFlow API ETL Documentation
  This is document for this DAG
  """

DAG cũng hỗ trợ parameterize nếu bạn thêm tham số vào DAG function, tham số này sẽ được sử dụng khi trigger DAG manually. Xem thêm về Passing Parameters when triggering dags.

@dag
def example_dag(email: str = '[email protected]'):
  ...

2. Tasks

Trong pipeline ở trên, ta có 3 tasks python function, sử dụng @task decorator. Tên của function dùng để đặt tên cho task_id. Cách viết mới này chỉ cần sử dụng @task thay vì định nghĩa python function rồi bỏ vào PythonOperator.

@task
def extract():
  """
  #### Extract task
  This is document for this task - extract().
  A simple Extract task to get data ready for the rest of the data
  pipeline. In this case, getting data from remote data source url.
  """
  src = urllib.request.urlopen(data_source_url)
  data = json.loads(url.read().decode())

  return data

Outputs và inputs sẽ được gửi qua lại giữa các tasks sử dụng XCom. Output return từ function task, được sử dụng để làm input cho các tasks tiếp theo. Với cách này, input và output, mối quan hệ giữa các task sẽ tường minh hơn.

Sử dụng @task(multiple_outputs=True) để tách ra nhiều giá trị XCom nếu task task trả về một dictionaries, lists hoặc tuples.

Ví dụ:

@task(multiple_outputs=True)
def transform(data: dict):
  ...
  return {
    "total_order_value": total_order_value,
    "order_keys": order_keys
  }

output = transform(data)
print(output.total_order_value)
print(output.order_keys)

Nếu sử dụng typing Dict cho function return type thì multiple_outputs cũng tự động được set thành True.

@task
def identity_dict(x: int, y: int) -> Dict[str, int]:
    return {"x": x, "y": y}

Nếu gọi decorated function nhiều lần trong 1 DAG, decorated function sẽ tự động generate ra các task_id mới.

@dag()
def taskflow_api_etl():
  @task
  def extract(uid: str):
    # ...
    return data

  # This will generate an operator for each uid
  for uid in user_ids:
    extract(uid)

Ví dụ trên, DAG sẽ tạo ra các task ids sau: [extract, extract__1, extract__2, ...].

3. Context

Để truy cập vào context, bạn có thể sử dụng get_current_context.

from airflow.operators.python import task, get_current_context
@task
def my_task():
    context = get_current_context()
    ti = context["ti"]

Gọi method này ngoài execution context sẽ raise error.

4. Airflow decorators vs Operator

Taskflow API decorators có thể được sử dụng kết hợp với các Operator truyền thống, như ví dụ dưới đây:

# ...
with DAG('send_server_ip', default_args=default_args) as dag:
  get_ip = SimpleHttpOperator(task_id='get_ip', xcom_push=True)

  @task
  def prepare_email(raw_json: str) -> Dict[str, str]:
    external_ip = json.loads(raw_json)['origin']
    return {
      'subject':f'Server connected from {external_ip}',
      'body': f'External IP {external_ip}'
    }

  email_info = prepare_email(get_ip.output)
  send_email = EmailOperator(
      task_id='send_email',
      to='[email protected]',
      subject=email_info['subject'],
      html_content=email_info['body']
  )
# ...
@dag
def send_server_ip(default_args=default_args):
  get_ip = SimpleHttpOperator(task_id='get_ip', xcom_push=True)

  @task
  def prepare_email(raw_json: str) -> Dict[str, str]:
    external_ip = json.loads(raw_json)['origin']
    return {
      'subject':f'Server connected from {external_ip}',
      'body': f'External IP {external_ip}'
    }

  email_info = prepare_email(get_ip.output)
  send_email = EmailOperator(
      task_id='send_email',
      to='[email protected]',
      subject=email_info['subject'],
      html_content=email_info['body']
  )

DAG = send_server_ip()

Kết

Bây giờ bạn đã biết khi viết 1 DAG sử dụng Taskflow API trong Airflow 2.0 sẽ đơn giản và tường minh hơn như thế nào. Tham khảo thêm tại trang Concepts để xem các giải thích chi tiết về Airflow như DAGs, Tasks, Operators, ... cũng như Python task decorator.

Data EngineerAirflowData EngineerAirflow 2.0