Airflow Slack Channel Notifications for Failed or Successful DAG Tasks

The most commonly requested alerting mechanism for failed Airflow tasks is a Slack notification. Two Airflow components make this straightforward:

  1. Airflow Callbacks, and
  2. Airflow Slack provider package.

Callbacks

Airflow's callback feature lets you run a Python function when a task fails, succeeds, is up for retry, is about to execute, or misses its SLA. Its usage is straightforward:

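from airflow.decorators import dag
# Airflow 2 import path; from Airflow 2.3 onwards EmptyOperator replaces DummyOperator.
from airflow.operators.dummy import DummyOperator

def task_failure_callback(context):
    ...

def task_success_callback(context):
    ...

@dag(
    schedule_interval=...,
    # Entries in default_args are passed to every task in the DAG.
    default_args={'on_failure_callback': task_failure_callback},
)
def dummy_dag():
    failure_task = DummyOperator(task_id='failure_task')

    # Task-level callback: applies to this task only.
    success_task = DummyOperator(
        task_id='success_task',
        on_success_callback=task_success_callback
    )

    failure_task >> success_task

# Calling the decorated function is what registers the DAG.
dummy_dag()

In the code above, on_failure_callback is set through default_args, so every task in the DAG inherits it, whereas on_success_callback is set on only one task (success_task). Note that passing on_failure_callback directly to the DAG would instead fire once when the DAG run as a whole fails, not once per failed task.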
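
To make the callback contract more concrete, here is a minimal sketch of what a callback can read from the context dictionary it receives. The summarize_context helper and the stand-in fake_context are hypothetical names used only for illustration; in a real run Airflow builds the context and supplies the actual task instance.

```python
from types import SimpleNamespace


def summarize_context(context):
    """Return "dag_id.task_id" for the task described by a callback context.

    Hypothetical helper: it reads only task_instance.task_id and
    task_instance.dag_id, the same fields used later in this post.
    """
    ti = context.get("task_instance")
    if ti is None:
        return "unknown task"
    return f"{ti.dag_id}.{ti.task_id}"


def task_failure_callback(context):
    # A callback takes a single argument: the context dictionary.
    print(f"Task failed: {summarize_context(context)}")


# Stand-in for the context Airflow would build (an assumption for this sketch).
fake_context = {
    "task_instance": SimpleNamespace(task_id="failure_task", dag_id="dummy_dag")
}
task_failure_callback(fake_context)
```

Keeping the formatting logic in a small pure function like this makes it easy to check outside of Airflow before wiring it into a DAG.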

Slack Provider

Now all we need to do is use the Slack provider package to send notifications from our task callbacks. First, install the provider package:

pip install apache-airflow-providers-slack

Secondly, create a Slack app with the “Incoming Webhooks” feature enabled. You’ll get a Webhook URL in this format:

https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX

Thirdly, create an Airflow connection from the UI or CLI using the Webhook URL. The connection object will be used by our code to connect to Slack’s API. Here’s a screenshot of what the UI fields should look like (with sample values):

In the connection's Webhook Token field, enter the T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX part of your Webhook URL, i.e., everything after /services/.
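
If you would rather not copy the token part by hand, the split can be sketched in a few lines of Python. The webhook_token_from_url function is a hypothetical helper, not part of Airflow or the Slack provider, and it assumes the URL has the /services/ shape shown above.

```python
from urllib.parse import urlparse


def webhook_token_from_url(webhook_url):
    """Return the part of a Slack webhook URL that follows '/services/'."""
    path = urlparse(webhook_url).path
    prefix = "/services/"
    if not path.startswith(prefix):
        raise ValueError("not a Slack incoming-webhook URL of the expected shape")
    return path[len(prefix):]


# Placeholder URL from this post, not a real webhook.
url = "https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX"
print(webhook_token_from_url(url))
```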

Finally, use SlackWebhookHook in the callbacks to send a message through the webhook to the channel you chose when creating the app:

from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook

# Send slack notification from our failure callback
def task_failure_callback(context):
    slack_msg = f"""
    :red_circle: Airflow Task Failed.
    *Task*: {context.get('task_instance').task_id}
    *Dag*: {context.get('task_instance').dag_id}
    *Execution Time*: {context.get('execution_date')}
    *Log Url*: {context.get('task_instance').log_url}
    """

    slack_hook = SlackWebhookHook(slack_webhook_conn_id='slack_webhook')
    slack_hook.send(text=slack_msg)

Note: The slack_webhook_conn_id value is the connection ID you entered in the UI (or CLI) above.

Now, whenever a task fails, Airflow calls task_failure_callback (the function passed to on_failure_callback), which sends a Slack alert through the provider hook.
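
The same pattern extends to success notifications. The following is one possible sketch, not the only way to structure it: make_callback and build_slack_message are hypothetical helpers, the :white_check_mark: emoji is an arbitrary choice, and the connection ID 'slack_webhook' is assumed to match the one created earlier. The sender is injectable so the message formatting can be exercised without Airflow or Slack.

```python
def build_slack_message(status, emoji, context):
    """Format a Slack message for a task that ended with the given status."""
    ti = context["task_instance"]
    return "\n".join([
        f"{emoji} Airflow Task {status}.",
        f"*Task*: {ti.task_id}",
        f"*Dag*: {ti.dag_id}",
        f"*Log Url*: {ti.log_url}",
    ])


def _send_via_webhook(text):
    # Imported lazily so this module loads even where the provider is absent.
    from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook

    # Assumes a connection with ID 'slack_webhook', as set up above.
    SlackWebhookHook(slack_webhook_conn_id="slack_webhook").send(text=text)


def make_callback(status, emoji, send=_send_via_webhook):
    """Build a callback that reports `status` using the `send` function."""
    def callback(context):
        send(build_slack_message(status, emoji, context))
    return callback


task_failure_callback = make_callback("Failed", ":red_circle:")
task_success_callback = make_callback("Succeeded", ":white_check_mark:")
```

These two callbacks can then be passed to on_failure_callback and on_success_callback in the same way as before.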
