Gửi Slack Alerts trên Airflow

Slack là một công cụ khá phổ biến trong các Team, slack giúp tập hợp mọi thông tin về Slack (như Jira alert, ETL pipelines, CI/CD status, deployments, ...) một cách thống nhất và dễ dàng theo dõi. Bài viết này mình hướng dẫn gửi mọi báo lỗi của Airflow đến Slack.

1. Slack Incoming Webhooks và Airflow Connection

Truy cập Slack App Directory tìm Incoming Webhooks: https://<workspace>.slack.com/apps/A0F7XDUAZ-incoming-webhooks

Ở mục Post to Channel chọn Channel, sau đó bấm Add Incoming Webhooks integration

Sau đó bạn sẽ nhận được 1 URL có dạng: https://hooks.slack.com/services/T00000000/B0000000/hssA66nupi72KAFy9ttv5fr2

Vào Airflow > Admin > Connections để thêm một connection mới

  • Conn Id: Slack
  • Conn Type: HTTP
  • Host: https://hooks.slack.com/services
  • Password: /T00000000/B0000000/hssA66nupi72KAFy9ttv5fr2

2. Slack alert Utils

Tạo file utils chứa function alert, ví dụ: /dags/utils/slack_alert.py

from airflow.hooks.base_hook import BaseHook
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator

SLACK_CONN_ID = 'slack'


def task_fail_slack_alert(context):
    slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password
    slack_msg = """
Task Failed.
*DAG*: {dag_id}
*Task*: {task}
*Dag*: {dag}
*Execution Time*: {exec_date}
*Log Url*: {log_url}
    """.format(
        dag_id=context.get('dag').dag_id,
        task=context.get('task_instance').task_id,
        dag=context.get('task_instance').dag_id,
        ti=context.get('task_instance'),
        exec_date=context.get('execution_date'),
        log_url=context.get('task_instance').log_url,
    )
    failed_alert = SlackWebhookOperator(
        task_id='slack_alert',
        http_conn_id=SLACK_CONN_ID,
        webhook_token=slack_webhook_token,
        message=slack_msg,
        username='airflow')
    return failed_alert.execute(context=context)

3. Config Slack alert cho từng DAG

Với mỗi DAG muốn alert, ta thêm thuộc tính on_failure_callback cho mỗi DAG. Ví dụ như dưới dây:

example_dag.py

from airflow import DAG
...
from utils.slack_alert import task_fail_slack_alert


default_args = {
    **params['default_args'],
    'owner': DAG_OWNER,
    'on_failure_callback': task_fail_slack_alert,
    ...
}

dag = DAG('dag_id', default_args=default_args)

...

Kết quả:

Tham khảo

Chúc các bạn thành công.

Data EngineerAirflowDataData Engineer