Airflow Slack Channel Notifications for Failed or Successful DAG Tasks
The most common desired alerting mechanism when an Airflow task fails is slack notifications. This can be achieved simply with two different aspects or components of Airflow:
- Airflow Callbacks, and
- Airflow Slack provider package.
Callbacks
With Airflow Callback functions feature, one can run a particular Python function when a task fails, succeeds, is up for retry, right before execution or when the SLA is missed. Its usage is straightforward:
from airflow.decorators import dag
from airflow.operators.dummy_operator import DummyOperator
def task_failure_callback(context):
...
def task_success_callback(context):
...
@dag(
scheduler_interval=...,
on_failure_callback=task_failure_callback,
)
def dummy_dag():
failure_task = DummyOperator(task_id='failure_task')
success_task = DummyOperator(
task_id='success_task',
on_success_callback=task_success_callback
)
failure_task >> success_task
In the code above, we set the on_failure_callback
at the DAG level, i.e., it will be inherited by all the tasks but the on_success_callback
is only set on one task (success_task
).
Slack Provider
Now, all we need to do is use the Slack provider package to send notifications from our task callbacks. Firstly, you’ll need to do is install the provider package:
pip install apache-airflow-providers-slack
Secondly, create a Slack app with “Incoming Webhooks” functionality. You’ll end up getting a Webhook URL of this format:
https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX
Thirdly, create an Airflow connection from the UI or CLI using the Webhook URL. The connection object will be used by our code to connect to Slack’s API. Here’s a screenshot of what the UI fields should look like (with sample values):

In the Webhook Token
field above, put the T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX
part of your Webhook URL.
Finally, we use the SlackWebhookHook
hook from our callbacks to send a message via the webhook URL to the relevant channel you chose while creating the app:
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
# Send slack notification from our failure callback
def task_failure_callback(context):
slack_msg = f"""
:red_circle: Airflow Task Failed.
*Task*: {context.get('task_instance').task_id}
*Dag*: {context.get('task_instance').dag_id}
*Execution Time*: {context.get('execution_date')}
*Log Url*: {context.get('task_instance').log_url}
"""
slack_hook = SlackWebhookHook(slack_webhook_conn_id='slack_webhook')
slack_hook.send(text=slack_msg)
Note: The slack_webhook_conn_id
value comes from what you put in the UI (or CLI) above.
So whenever a task fails now, the task_failure_callback
function (passed to on_failure_callback
) will be called which will send slack alerts via the provider hook.