Airflow sensors
Apache Airflow sensors are a special kind of operator designed to wait for something to happen. When a sensor runs, it checks whether a certain condition is met before it is marked successful and lets its downstream tasks execute. When used properly, sensors can be a great tool for making your DAGs more event-driven.
In this guide, you’ll learn how sensors are used in Airflow, best practices for implementing sensors in production, and how to use deferrable versions of sensors.
For a video course on Airflow sensors, check out the Astronomer Academy.
Assumed knowledge
To get the most out of this guide, you should have an understanding of:
Sensor basics
Sensors are a type of operator that checks if a condition is met at a specific interval. If the condition is met, the task is marked successful and the DAG can move to downstream tasks. If the condition isn’t met, the sensor waits for another interval before checking again.
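The check-wait-check cycle described above can be sketched in plain Python. This is a simplified model for illustration only, not Airflow's implementation; the names `wait_for` and `SensorTimeout` are hypothetical, and the `sleep` and `clock` arguments exist only so the loop can be exercised without real waiting.

```python
import time
from typing import Callable


class SensorTimeout(Exception):
    """Raised when the condition is not met before the timeout (hypothetical name)."""


def wait_for(
    condition: Callable[[], bool],
    poke_interval: float,
    timeout: float,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> int:
    """Check `condition` every `poke_interval` seconds and return the number of checks made."""
    started = clock()
    checks = 0
    while True:
        checks += 1
        if condition():
            # Condition met: a real sensor task would now be marked successful.
            return checks
        if clock() - started >= timeout:
            # Condition still not met and the time budget is used up.
            raise SensorTimeout(f"condition not met within {timeout} seconds")
        # Condition not met yet: wait one interval before checking again.
        sleep(poke_interval)


# Usage: a condition that becomes true on the third check.
attempts = iter([False, False, True])
print(wait_for(lambda: next(attempts), poke_interval=0.01, timeout=1.0))  # prints 3
```

A real sensor adds retries, logging, and mode handling on top of this basic loop.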
All sensors inherit from the BaseSensorOperator and have the following parameters:
- mode: How the sensor operates. There are two modes:
  - poke: This is the default mode. When using poke, the sensor occupies a worker slot for the entire execution time and sleeps between pokes. This mode is best if you expect a short runtime for the sensor.
  - reschedule: When using this mode, if the criteria are not met, the sensor releases its worker slot and reschedules the next check for a later time. This mode is best if you expect a long runtime for the sensor, because it is less resource intensive and frees up workers for other tasks.
Different types of sensors have different implementation details.
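To make the difference between the two modes concrete, the following back-of-the-envelope sketch estimates how long a sensor holds a worker slot in each mode. It is a rough model under stated assumptions: the function name `slot_seconds` and the `check_duration` parameter are hypothetical, and scheduler overhead is ignored.

```python
def slot_seconds(
    mode: str, total_wait: float, poke_interval: float, check_duration: float
) -> float:
    """Estimate how many seconds a sensor occupies a worker slot (simplified model)."""
    if mode == "poke":
        # The slot is held for the whole wait, including the sleeps between checks.
        return total_wait
    if mode == "reschedule":
        # The slot is held only while a check runs and is released between checks.
        checks = int(total_wait // poke_interval) + 1
        return checks * check_duration
    raise ValueError(f"unknown mode: {mode!r}")


# Assumed numbers: a 1-hour wait, a check every 5 minutes, 2 seconds per check.
print(slot_seconds("poke", 3600, 300, 2))        # prints 3600
print(slot_seconds("reschedule", 3600, 300, 2))  # prints 26
```

Under these assumed numbers, the reschedule-mode sensor keeps the slot busy for seconds rather than the full hour, which is why it suits long waits.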
Commonly used sensors
Many Airflow provider packages contain sensors that wait for various criteria in different source systems. The following are some of the most commonly used sensors:
- @task.sensor decorator: Allows you to turn any Python function that returns a PokeReturnValue into an instance of the BaseSensorOperator class. This way of creating a sensor is useful when the condition involves complex logic or when you connect to a tool through an API that has no dedicated sensor.
- S3KeySensor: Waits for a key (file) to appear in an Amazon S3 bucket. This sensor is useful if you want your DAG to process files from Amazon S3 as they arrive.
- DateTimeSensor: Waits for a specified date and time. This sensor is useful if you want different tasks within the same DAG to run at different times.
- ExternalTaskSensor: Waits for an Airflow task to be completed. This sensor is useful if you want to implement cross-DAG dependencies in the same Airflow environment.
- HttpSensor: Waits for an API to be available. This sensor is useful if you want to ensure your API requests are successful.
- SqlSensor: Waits for data to be present in a SQL table. This sensor is useful if you want your DAG to process data as it arrives in your database.
To review the available Airflow sensors, go to the Astronomer Registry.
Example implementation
The following example DAG shows how you might use the SqlSensor. It is shown first with the TaskFlow API and then with traditional operator syntax:
```python
from airflow.decorators import task, dag
from airflow.providers.common.sql.sensors.sql import SqlSensor

from typing import Dict
from pendulum import datetime


def _success_criteria(record):
    # The sensor succeeds when this returns a truthy value
    return record


def _failure_criteria(record):
    # The sensor fails when this returns True
    return True if not record else False


@dag(
    description="DAG in charge of processing partner data",
    start_date=datetime(2021, 1, 1),
    schedule="@daily",
    catchup=False,
)
def partner():
    waiting_for_partner = SqlSensor(
        task_id="waiting_for_partner",
        conn_id="postgres",
        sql="sql/CHECK_PARTNER.sql",
        parameters={"name": "partner_a"},
        success=_success_criteria,
        failure=_failure_criteria,
        fail_on_empty=False,
        poke_interval=20,
        mode="reschedule",
        timeout=60 * 5,
    )

    @task
    def validation() -> Dict[str, str]:
        return {"partner_name": "partner_a", "partner_validation": True}

    @task
    def storing():
        print("storing")

    waiting_for_partner >> validation() >> storing()


partner()
```
```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.common.sql.sensors.sql import SqlSensor

from typing import Dict
from pendulum import datetime


def _success_criteria(record):
    # The sensor succeeds when this returns a truthy value
    return record


def _failure_criteria(record):
    # The sensor fails when this returns True
    return True if not record else False


with DAG(
    dag_id="partner",
    description="DAG in charge of processing partner data",
    start_date=datetime(2021, 1, 1),
    schedule="@daily",
    catchup=False,
):
    waiting_for_partner = SqlSensor(
        task_id="waiting_for_partner",
        conn_id="postgres",
        sql="sql/CHECK_PARTNER.sql",
        parameters={"name": "partner_a"},
        success=_success_criteria,
        failure=_failure_criteria,
        fail_on_empty=False,
        poke_interval=20,
        mode="reschedule",
        timeout=60 * 5,
    )

    def validation_function() -> Dict[str, str]:
        return {"partner_name": "partner_a", "partner_validation": True}

    validation = PythonOperator(
        task_id="validation", python_callable=validation_function
    )

    def storing_function():
        print("storing")

    storing = PythonOperator(task_id="storing", python_callable=storing_function)

    waiting_for_partner >> validation >> storing
```
This DAG waits for data to be available in a Postgres database before running validation and storing tasks. The SqlSensor runs a SQL query and is marked successful when that query returns data, specifically when the result is not in the set (0, '0', '', None). The SqlSensor task in the example DAG (waiting_for_partner) runs the CHECK_PARTNER.sql script every 20 seconds (the poke_interval) until data is returned. The mode is set to reschedule, meaning the task does not occupy a worker slot between the 20-second checks. The timeout is set to 5 minutes, and the task fails if the data doesn’t arrive within that time. When the SqlSensor criteria are met, the DAG moves to the downstream tasks. You can find the full code for this example in the webinar-sensors repo.
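The success rule and the timing in this example can be worked through in a few lines of plain Python. This is only an illustration of the rule as stated above, not the provider's source code, and it does not model the success and failure callables passed to the sensor; the names `EMPTY_RESULTS`, `query_returned_data`, and `max_intervals` are hypothetical.

```python
# Values that count as "no data yet", as listed in the text above.
EMPTY_RESULTS = (0, "0", "", None)


def query_returned_data(first_cell) -> bool:
    """Return True when the first cell of the query result counts as data."""
    return first_cell not in EMPTY_RESULTS


def max_intervals(timeout_seconds: int, poke_interval_seconds: int) -> int:
    """Return how many full poke intervals fit into the timeout."""
    return timeout_seconds // poke_interval_seconds


print(query_returned_data("partner_a"))  # prints True
print(query_returned_data("0"))          # prints False
print(max_intervals(60 * 5, 20))         # prints 15
```

With a 5-minute timeout and a 20-second poke_interval, the sensor has room for roughly 15 intervals before it times out.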
Sensor decorator / PythonSensor
If no sensor exists for your use case, you can create your own using either the @task.sensor decorator (added in Airflow 2.5) or the PythonSensor. The @task.sensor decorator turns a Python function that returns a PokeReturnValue into an instance of the BaseSensorOperator. The PythonSensor takes a python_callable that returns True or False.
The following DAG shows how to use either the sensor decorator or the PythonSensor to create the same custom sensor:
```python
"""
### Create a custom sensor using the @task.sensor decorator

This DAG showcases how to create a custom sensor using the @task.sensor decorator
to check the availability of an API.
"""

from airflow.decorators import dag, task
from pendulum import datetime
import requests

# importing the PokeReturnValue
from airflow.sensors.base import PokeReturnValue


@dag(start_date=datetime(2022, 12, 1), schedule="@daily", catchup=False)
def sensor_decorator():
    # supply inputs to the BaseSensorOperator parameters in the decorator
    @task.sensor(poke_interval=30, timeout=3600, mode="poke")
    def check_shibe_availability() -> PokeReturnValue:
        r = requests.get("http://shibe.online/api/shibes?count=1&urls=true")
        print(r.status_code)

        # set the condition to True if the API response was 200
        if r.status_code == 200:
            condition_met = True
            operator_return_value = r.json()
        else:
            condition_met = False
            operator_return_value = None
            print(f"Shibe URL returned the status code {r.status_code}")

        # the function has to return a PokeReturnValue
        # if is_done = True the sensor will exit successfully, if
        # is_done=False, the sensor will either poke or be rescheduled
        return PokeReturnValue(is_done=condition_met, xcom_value=operator_return_value)

    # print the URL to the picture
    @task
    def print_shibe_picture_url(url):
        print(url)

    print_shibe_picture_url(check_shibe_availability())


sensor_decorator()
```
Here, the @task.sensor decorates the check_shibe_availability() function, which checks if a given API returns a 200 status code. If the API returns a 200 status code, the sensor task is marked as successful. If any other status code is returned, the sensor pokes again after the poke_interval has passed.
The optional xcom_value parameter in PokeReturnValue defines what data is pushed to XCom once is_done=True. You can use the data that was pushed to XCom in any downstream task.
```python
"""
### Create a custom sensor using the PythonSensor

This DAG showcases how to create a custom sensor using the PythonSensor
to check the availability of an API.
"""

from airflow.decorators import dag, task
from pendulum import datetime
import requests
from airflow.sensors.python import PythonSensor


def check_shibe_availability_func(**context):
    r = requests.get("http://shibe.online/api/shibes?count=1&urls=true")
    print(r.status_code)

    # set the condition to True if the API response was 200
    if r.status_code == 200:
        operator_return_value = r.json()
        # pushing the link to the Shibe picture to XCom
        context["ti"].xcom_push(key="return_value", value=operator_return_value)
        return True
    else:
        print(f"Shibe URL returned the status code {r.status_code}")
        return False


@dag(
    start_date=datetime(2022, 12, 1),
    schedule=None,
    catchup=False,
    tags=["sensor"],
)
def pythonsensor_example():
    # turn any Python function into a sensor
    check_shibe_availability = PythonSensor(
        task_id="check_shibe_availability",
        poke_interval=10,
        timeout=3600,
        mode="reschedule",
        python_callable=check_shibe_availability_func,
    )

    # click the link in the logs for a cute picture :)
    @task
    def print_shibe_picture_url(url):
        print(url)

    print_shibe_picture_url(check_shibe_availability.output)


pythonsensor_example()
```
Here, the PythonSensor uses the check_shibe_availability_func to check if a given API returns a 200 status code. If the API returns a 200 status code, the API response is pushed to XCom and the function returns True, causing the sensor task to be marked as successful. If any other status code is returned, the check_shibe_availability_func returns False and the sensor pokes again after the poke_interval has passed.
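Because the callable only has to return True or False, its condition logic can be exercised without Airflow or a network connection. The sketch below is one possible way to do that, assuming you are free to structure the callable yourself; `make_availability_check` and `get_status` are hypothetical names, and the status lookup is injected so that a fake can stand in for the real HTTP request.

```python
from typing import Callable


def make_availability_check(get_status: Callable[[], int]) -> Callable[[], bool]:
    """Build a True/False callable from a function that returns an HTTP status code."""

    def check() -> bool:
        # Same rule as in the example above: only a 200 response counts as available.
        return get_status() == 200

    return check


# Usage with a fake status source: unavailable first, then available.
statuses = iter([503, 200])
check = make_availability_check(lambda: next(statuses))
print(check())  # prints False
print(check())  # prints True
```

In a DAG, the injected function would perform the real request, and the resulting `check` could be passed as the python_callable.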
Sensor best practices
When using sensors, keep the following in mind to avoid potential performance issues:
- Always define a meaningful timeout parameter for your sensor. The default for this parameter is seven days, which is a long time for your sensor to be running. When you implement a sensor, consider your use case and how long you expect the sensor to wait and then define the sensor’s timeout accurately.
- Whenever possible and especially for long-running sensors, use the reschedule mode so your sensor is not constantly occupying a worker slot. This helps avoid deadlocks in Airflow where sensors take all of the available worker slots.
- If your poke_interval is very short (less than about 5 minutes), use the poke mode. Using reschedule mode in this case can overload your scheduler.
- Define a meaningful poke_interval based on your use case. There is no need for a task to check a condition every 60 seconds (the default) if you know the total amount of wait time will be 30 minutes.
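The guidance on mode and poke_interval above can be turned into a small rule-of-thumb calculator. This is a sketch of the recommendations in this list, not an Airflow API; `suggest_mode`, `checks_during`, and the exact 5-minute threshold are assumptions made for illustration.

```python
import math

# Threshold taken from the "less than about 5 minutes" guidance above.
SHORT_INTERVAL_SECONDS = 5 * 60


def suggest_mode(poke_interval_seconds: float) -> str:
    """Suggest a sensor mode based on how often the condition is checked."""
    if poke_interval_seconds < SHORT_INTERVAL_SECONDS:
        return "poke"
    return "reschedule"


def checks_during(wait_seconds: float, poke_interval_seconds: float) -> int:
    """Return roughly how many checks happen while waiting `wait_seconds`."""
    return math.ceil(wait_seconds / poke_interval_seconds)


# A 30-minute wait checked every 60 seconds (the default) versus every 5 minutes.
print(checks_during(30 * 60, 60))   # prints 30
print(checks_during(30 * 60, 300))  # prints 6
print(suggest_mode(30))             # prints poke
print(suggest_mode(600))            # prints reschedule
```

For an expected 30-minute wait, moving from the default interval to 5 minutes cuts the number of checks from about 30 to about 6.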
Deferrable operators
Deferrable operators (sometimes referred to as asynchronous operators) were released with Airflow 2.2 and are designed to eliminate the problem of any operator or sensor taking up a full worker slot for the entire time they are running. Deferrable versions of many sensors exist in open source Airflow and in the Astronomer Providers package. Astronomer recommends using these in most cases to reduce resource costs.
For DAG authors, using deferrable sensors is no different from using regular sensors. All you need to do is run a triggerer process in Airflow and replace the names of all sensors in your DAG code with their deferrable counterparts. For more details, see Deferrable operators.
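The idea behind deferral, many waiting conditions sharing one process instead of each holding a worker slot, can be illustrated with a small asyncio sketch. This is a conceptual model only and does not use or reproduce Airflow's triggerer; `wait_until`, `run_all`, and `becomes_true_after` are hypothetical names.

```python
import asyncio
from typing import Callable, List


async def wait_until(condition: Callable[[], bool], poke_interval: float) -> int:
    """Check `condition` until it is met and return the number of checks made."""
    checks = 0
    while True:
        checks += 1
        if condition():
            return checks
        # Yield control while waiting so other waits can progress in the same process.
        await asyncio.sleep(poke_interval)


async def run_all(conditions: List[Callable[[], bool]], poke_interval: float) -> List[int]:
    """Run all waits concurrently in a single event loop."""
    return await asyncio.gather(*(wait_until(c, poke_interval) for c in conditions))


def becomes_true_after(n: int) -> Callable[[], bool]:
    """Build a condition that is met on its n-th check."""
    calls = {"count": 0}

    def condition() -> bool:
        calls["count"] += 1
        return calls["count"] >= n

    return condition


results = asyncio.run(
    run_all([becomes_true_after(n) for n in (1, 2, 3)], poke_interval=0.01)
)
print(results)  # prints [1, 2, 3]
```

All three waits complete inside one event loop, which mirrors why a single triggerer process can watch many deferred sensors at once.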