Airflow’s best kept secrets: How to track metadata with Airflow Cluster Policies & Task Callbacks

2021-09-07 10:05:13

The time that it takes to detect problems within your Airflow tasks is a major problem. There’s one big reason for that. You didn’t write every DAG. It’s hard to find a scalable way to make your platform maintainable when there are hundreds of pipelines written in logic you might not understand without proper documentation. If you don’t understand how the pipelines function, then you cannot find a way to track & alert on important data quality & pipeline issues. That’s a huge problem.

While this can sound like a hopeless situation, it’s not — it’s just a challenging one. Let’s simplify this challenge into the three moving pieces:

  1. Collect all the Operator objects
  2. Collect execution metadata
  3. Set up alerting & monitoring dashboard based on the aforementioned metadata

There are a lot of different ways you can tackle these problems. You can spin up your own solution using open source tools and code, or you can use a managed solution that will centralize your metadata, monitoring, and alerting for you.

But for this article, you’ll learn how to use Airflow’s cluster policies and Task callbacks to achieve 1 & 2, and you’ll have a way to monitor:

  • Tasks’ durations
  • Tasks’ status
  • The data quality of the dataset you tasks interacting with.

As for #3, we’ll tackle that in a different article.

Wrapping your operators with Airflow Cluster Policies

Where to start? Your first thought might be to create some wrapping Operators objects or decorators to add to the current Operators the teams are using, but implementing your solution to every single operator can take a lot of time for a large organization.

Luckily, Airflow provides a solution just for that – cluster policies let you manipulate the DAGs and Operators in different phases by adding a policy to airflow setup. Airflow exposes 3 policies, each is a function that airflow will load and call at different phases:

  • task_policy – Will be called for any Operator on load time.
  • dag_policy – Will be called for any DAG on load time.
  • task_instance_mutation_hook – Will be called for any TaskInstance, right before task execution.

For our case, any of the policies can be a great candidate for implementation so we will come back for that soon.

Collecting Event metadata with Airflow Task Callbacks

The second problem you need to solve is: how can you catch different events to report on?

There are two routes you can go to do this.

The first option is pretty straightforward; you can wrap the execute method of your tasks and add the new functionality relative to the original execute. Here’s an example of how you could do that:

from contextlib import contextmanager from airflow.models import BaseOperator from pendulum import now @contextmanager def time_task_duration(context): start_time = now() try: yield finally: period = now() - start_time print(f"task duration {period.in_words()}") def execute_wrapper(func, context_manager): def inner(context): with context_manager(context): return func(context) return inner def task_policy(task: BaseOperator): task.execute = execute_wrapper(task.execute, time_task_duration)

In the above example, we are using task policy to enforce wrapping all our Airflow Tasks with our time_task_duraiton logger. task_policy is the function airflow is looking for to wrap every Operator with, and task_time_duration is in charge of calculating the task’s duration. Implementing this will make it possible to track the task duration.

The other option is to use the Airflow task callbacks. I’m talking about callback functions that airflow calls for different events in the task instance run process.

A solution that doesn’t use the task callbacks, like the one above, might suffice for your needs, but I’d argue for many circumstances it’s not the best move. If you want to build a more maintainable, reliable, and scalable data platform, using the Airflow task callbacks is the ideal solution.

Why? It’s less intrusive to the user’s code. Manipulating the execute method (read as: user’s code) can affect the user’s logic and lead us to unexpected behavior. As a rule of thumb, as platform engineers, you should avoid that and do whatever you can to make sure the user code would execute as expected.

So — what is an Airflow task callback? Here are all the callbacks and what they do:

  • pre_execute – runs right before the execute method run.
  • post_execute – runs right after the execute method run and only if there was no error.
  • on_failure_callback – runs if there was an error in execute and the task failed
  • on_success_callback – runs if there was no error in the execute process and the task succeed.
  • on_retry_callback – runs if there was an error in execute and the task set to retry.
  • on_kill – runs if execute method timed-out, right before raising the timeout error or after getting a SIGTERM.

We will use some of them in the upcoming examples.

Tracking Task Duration

Has your DAG ever taken shorter than expected? Maybe it ran for two minutes when it usually runs for two hours? This example will explore how to solve that problem.

def pre_duration(context): context["task_instance"].task._tracking_start_time = now() def post_duration(context, result): task = context["task_instance"].task duration = now() - task._tracking_start_time print(f"task duration {duration.in_words()}") def task_policy(task: BaseOperator): task.pre_execute = pre_duration task.post_execute = post_duration

In this example, I created two functions – pre_duration and post_duration that work together logs the task’s run duration. Using the pre_execute and post_execute callbacks to catch the beginning and end of the task.

Now, you can add general functions wrap and add_policy to reduce the boilerplate of adding multiple policies.

... def wrap(func, wrapper): def inner(*args, **kwargs): wrapper(*args, **kwargs) return func(*args, **kwargs) return inner def add_policy(task, pre, post): task.pre_execute = wrap(task.pre_execute, pre) task.post_execute = wrap(task.post_execute, post) return task def task_policy(task: BaseOperator): task = add_policy(task, pre_duration, post_duration)

Putting this code under $AIRFLOW_HOME/config/ so Airflow will find your policy and load it.

After running your DAG, the output in the log file should look like this:

[2021-08-08 16:06:57,432] {} INFO - task duration 28 seconds

Tracking Task Status

When a task fails once that doesn’t necessarily mean there’s a big problem. So, it’s very common for DevOps or DataOps to configure their Airflow environments to rerun the task when that happens. The problem with that: you need to know whether that task has crossed the acceptable threshold of retries so you can start fixing the problem quickly if required.

So, here’s how you can use the same logic to collect metadata on Task Status:

def track_status(task): def report_failed(context): task_instance = context["task_instance"].task error = context["exception"] logger.error( f"task {task_instance.task_id} failed with {error}. " f"task state {task_instance.state}. " f"report to {task_instance.owner}." ) task.on_failure_callback = wrap(task.on_failure_callback, report_failed) task.on_retry_callback = wrap(task.on_retry_callback, report_failed) return task def task_policy(task: BaseOperator): task = add_policy(task, pre_duration, post_duration) task = track_status(task)

Tracking Data Health in Airflow using your new tools

As most Airflow users likely know, the scariest thing that can happen is your Airflow UI showing all green, but then the data wasn’t delivered in the form you were expecting. That means the data itself is breaking — not the pipeline — and Airflow doesn’t have a straightforward way to track that.

There are a lot of blind spots when it comes to data quality. You need visibility on anomalies in reading volumes from previous DAGs, data providers can change their API without notifying you — causing a problematic schema change, and you need to validate data health within your dataset (like the ratio of null values being too high).

The Airflow Cluster Policies and Callback can help you track data quality metrics like those.

In the following example, let’s assume you want to start tracking the effect of the Operators on the data they are using – reading or writing

Here’s some information you would like to know about the data:

  • The operation type – read or write.
  • Operation timestamp – to track how up-to-date our data.
  • The number of rows and columns affected by the operations

Given the metadata you are looking for, there are a lot of ways you can do that. For simplicity’s sake, I’m going to assume that our DAG is following this pattern:

  • The return value is the path of the output file of an Operator
  • We use Xcom to sync the return value
  • An Operator that takes input value is passing the Xcom to an argument named “path”

Here is what the DAG would look like to give you some context:

dag = DAG( dag_id="ingest_data", default_args=DEFAULT_ARGS, ) with dag: save_daily_data = PythonOperator( task_id="save_daily_data", python_callable=save_raw_daily_data, do_xcom_push=True, ) manipulate_data_task = PythonOperator( task_id="manipulate_data", python_callable=manipulate_data, op_kwargs={"path": "{{ ti.xcom_pull(task_ids='save_daily_data') }}"}, ) save_daily_data >> manipulate_data_task

The details about the python_callables are not important, the only thing you’ll need to keep in mind is that when the function has written data to an output file, the file path is the return value.

Now, let’s take a look at these tracking functions that log the shape of the dataframes:

def check_input(context): operator = context["task_instance"].task if "path" in operator.op_kwargs: path = operator.op_kwargs["path"] try: df = pd.read_csv(path) except: logger.warnning(f"there is no csv at {path}" else: f"reading dataframe from {path} with shape {df.shape} at {now()}" ) def check_output(context, result): try: df = pd.read_csv(result) except: logger.warnning(f"there is no csv at {path}" else: f"writing dataframe from {path} with shape {df.shape} at {now()}" )

In this example, check_input and check_output try to load dataframe from csv, and log the shape which include columns and rows count

And adding those functions to our policy:

def task_policy(task: BaseOperator): task = add_policy(task, pre_duration, post_duration) task = track_status(task) task = add_policy(task, check_input, check_output)

Airflow Metadata tracking is ready!

To sum this all up:

  1. You learned about “Airflow Cluster Policy” and how can we use it to track every Operator in our system.
  2. Learned about task’s callbacks, when they are executing and how to use them to collect execution data from our Operators

Now, it’s up to you to implement your own tracking system. This will be a future article, but I’ll leave you with some ideas for data that you might want to collect in the meantime:

  1. Extract information about queries from SnowflakeOperator,PostgresOperator, or any SQL Operator with sqlparse .
  2. Stats about each column – Number of nulls, percentile, histograms, and so on. Use pydeequ or great expectation to validate your Operator’s output
  3. Track the schema of the data you interacting with **** – so you can make sure to know when its changing.
  4. Collect system metrics – memory, cpu usages. Try filprofiler to extract memory usage.
  5. Track the [template_fields](<>) values at runtime

Let us know about other tracking capabilities that you might be interested in!

An insider’s guide to data pipeline incident management

Read next blog