Everyday Data Engineering: Monitoring Airflow with Prometheus, StatsD and Grafana

Databand
2021-04-02 15:53:26

Everyday Data Engineering: Databand’s blog series – A guide on common data engineering activities, written for data engineers by data engineers.

Data pipeline monitoring is vital for data engineering day-to-day operations. It must be done effectively if you want your consumers to trust your data. Without the right data observability and data monitoring systems set up, data can break easily.

Monitoring Airflow production pipelines can be especially painful. In order to debug health problems or find the root cause of failures, a data engineer needs to hop between the Apache Airflow UI, DAG logs, various monitoring tools, and Python code. To make this process smoother, we can use operational dashboards to get a bird’s-eye view of our system, clusters, and overall health.

We’ll need to configure a data observability dashboard. There are two routes you can take when looking for a data observability solution: an open-source solution or a managed one.

Airflow monitoring dashboards: open-source vs. managed service

There are advantages and disadvantages to both.

Open-source pros and cons

  • Pros
    • Lower initial cost—Your only cost for implementation is labor.
    • Community support—Contribution from the global community.
  • Cons
    • Higher long-term cost—Maintenance and troubleshooting can become difficult as your needs become more complex.
    • Usability can be difficult—As your data team grows, ease of use, scalability, and governance can become unmanageable.

Managed Service pros and cons

  • Pros
    • Greater usability—Better UI and automation can make your team more efficient.
    • Better support—You have a team of Solution Architects standing by to help.
  • Cons
    • Higher initial costs—The pricing model might not make sense for some organizations.
    • Less flexibility—Unless the managed service is built on open-source code, functionality can be limited.

In this guide, we’ll be exploring the best practices for going the open-source route to building an operational dashboard. This guide’s goal is to easily answer questions like:

  • Is our cluster alive?
  • How many DAGs do we have in the DagBag?
  • Which operators succeeded and which failed lately?
  • How many tasks are running right now?
  • How long did it take for the DAG to complete? (and so on….)

Building an open-source Airflow monitoring solution

For this guide, we will focus on monitoring and visualizing Airflow cluster metrics. These types of metrics are great indicators of cluster and infrastructure health and should be constantly monitored.

Airflow exposes metrics such as DagBag size, number of currently running tasks, and task duration for as long as the cluster is running. Airflow emits these metrics in the StatsD format, and we will use that in our solution (we’ll get into more details below). You can find a list of all the metrics exposed, along with descriptions, in the official Airflow documentation.

Open-source tools

Let’s see how we can use open-source tools like StatsD, Prometheus, and Grafana to build a monitoring solution for Airflow. By leveraging this trio, you can find out whether the scheduler is running, how many DAGs are currently in the DagBag, and spot most other critical problems with the cluster’s health.

1. StatsD

We’ll start with StatsD. StatsD is a widely used service for collecting and aggregating metrics from various sources. Airflow has built-in support for sending metrics to a StatsD server. Once configured, Airflow will push its metrics to the StatsD server, and we will be able to visualize them.
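Before wiring Airflow up, you can sanity-check that a StatsD server (or the StatsD Exporter described below) is listening with a few lines of Python. This is a hedged sketch assuming the statsd package from PyPI; the host, port, and metric names are placeholders that mirror the Airflow settings shown later in this guide.

import statsd

# Point the client at the same host/port Airflow will use (see airflow.cfg below).
client = statsd.StatsClient(host="localhost", port=8125, prefix="airflow")

client.incr("test_counter")          # a simple counter
client.timing("test_duration", 320)  # a timer value, in milliseconds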

2. Prometheus

Prometheus is a popular solution for storing metrics and alerting. Because it is typically used to collect metrics from other sources, such as RDBMSs and web servers, we will use Prometheus as the main storage for our metrics. Since Airflow doesn’t have a native Prometheus integration, we’ll use the Prometheus StatsD Exporter to collect metrics and transform them into a Prometheus-readable format. StatsD Exporter acts like a regular StatsD server, so Airflow won’t notice the difference.

3. Grafana

Grafana is our preferred metrics visualization tool. It has native Prometheus support and we will use it to set up our Airflow Cluster Monitoring Dashboard.

The basic architecture of our monitoring solution will look like this:

The Airflow cluster reports metrics to StatsD Exporter, which performs transformations and aggregations and passes them on to Prometheus. Grafana then queries Prometheus and displays everything in a gorgeous dashboard. In order for this to happen, we will need to set up all of those pieces.

First, we will configure Airflow, then StatsD Exporter and then Grafana. 

Let’s start!

Prometheus, StatsD Exporter and metrics mapping

This guide assumes you’re already familiar with Prometheus. If you aren’t yet, it has great documentation and is really easy to get started with, as Prometheus doesn’t require special configuration. StatsD Exporter will be used to receive metrics and provide them to Prometheus. Usually it doesn’t require much configuration, but because of the way Airflow sends metrics, we will need to re-map them.

By default, Airflow exposes a lot of metrics whose names are composed from DAG names. For convenience, these metrics should be properly mapped. By utilizing mapping we can then build Grafana dashboards with per-Airflow-instance and per-DAG views.

Let’s take the dag.<dag_id>.<task_id>.duration metric as an example. The raw metric name sent by Airflow will look like airflow_dag_sample_dag_dummy_task_duration. For every Airflow instance and every DAG, Airflow reports a separate duration metric for each task, producing a combinatorial explosion of metric names. For simple DAGs it’s not an issue, but as tasks add up, things get more complicated and you don’t want to deal with that in your Grafana configuration.

To solve this, StatsD Exporter provides a built-in relabeling configuration. There is great documentation with examples on the StatsD Exporter page.

Now let’s apply this to our DAG duration metric.

The relabel config will look like this:

mappings:
  - match: "*.dag.*.*.duration"
    match_metric_type: observer
    name: "af_agg_dag_task_duration"
    labels:
      airflow_id: "$1"
      dag_id: "$2"
      task_id: "$3"

We are extracting three labels from this metric:

  1. Airflow instance ID (which should be different across the instances)
  2. DAG ID
  3. Task ID

Prometheus will then pick up these labels, and we’ll be able to configure dashboards with instance/DAG/task selectors and provide observability at different levels of detail.

We will repeat this relabeling config for each metric exposed by Airflow. See the “Source Code” section at the end of the article for a complete example.

Airflow configuration

A couple of options should be added to airflow.cfg. Please note that Airflow will fail to start if the StatsD server is not available at startup time, so make sure you have a running StatsD Exporter instance.

The very basic config section in airflow.cfg will look like this:

[metrics]
statsd_on = True
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow

For more details, please refer to the Airflow metrics documentation.

Configuring Grafana Dashboards

Now that we have all our metrics properly mapped, we can proceed to creating the dashboards. We will have two dashboards—one for cluster overview and another for DAG metrics.

For the first dashboard we will have the Airflow instance selector:

Here you can see all the vital metrics, like scheduler heartbeat, DagBag size, queued/running task counts, currently running DAGs aggregated by task, and so on:

For the second dashboard we will have the DAG selector:

You can see DAG-related metrics: successful DAG run duration, failed DAG run duration, DAG run dependency check time, and DAG run schedule delay.

Source Code

The complete source code, including the StatsD mapping config and two Grafana dashboards (one for cluster overview and another for DAG stats), can be found in our GitHub project: https://github.com/databand-ai/airflow-dashboards/.

Conclusion

Airflow provides a decent out-of-the-box solution for monitoring DAGs, but it lacks accessibility and comprehensiveness. In this tutorial we configured Airflow, StatsD Exporter, and Grafana to get nice, useful dashboards.

Dashboards like these can save a lot of time when troubleshooting cluster health issues, such as executors going down or DAG parsing getting stuck on a heavyweight DB query. For more robust and convenient monitoring, alerts should also be configured, but that is outside the scope of this article.

Stay tuned for more guides!


Happy engineering! 🙂

– This guide was written by Vova Rozhkov, an experienced software engineer at Databand.

Databand.ai is a unified data observability platform built for data engineers. Databand.ai centralizes your pipeline metadata so you can get end-to-end observability into your data pipelines, identify the root cause of health issues quickly, and fix the problem fast. To learn more about Databand and how our platform helps data engineers with their data pipelines, request a demo!

Top 6 Airflow Features To Look Out For

Databand
2020-09-25 15:05:29

Airflow’s defining feature is the flexibility to define and execute all workflows as code. As an engineer, all of this opportunity for configuration is extremely powerful for making Airflow fit your needs, but it’s definitely a time-intensive investment to learn (and implement) every one of the Airflow features available.

If you are using Airflow today, it’s helpful to understand Airflow’s high level of configuration and the tools you have at your disposal. In this post, we’re outlining the top utilities that we found to be the most useful to help you decipher the great features Airflow has to offer.

A lot of engineers select Airflow because of its great web UI, but its core orchestration capabilities are likewise powerful, and tapping into more of those Airflow features will help you produce a more optimized infrastructure and higher engineering productivity.

 

Benefits of code as abstraction layer

Using only code for your data flows improves transparency and reproducibility of failures. When your workflows are automated with only code, your ELT failures are much easier to troubleshoot because no part of the broken process is trapped in a developer’s head.

As Airflow co-creator Maxime Beauchemin writes in Rise of the Data Engineer:

“Code allows for arbitrary levels of abstractions, allows for all logical operation in a familiar way, integrates well with source control, is easy to version and to collaborate on”

 

Challenges of code as abstraction layer

Running your data infrastructure as code through Airflow, in conjunction with a suite of cloud services, is a double-edged sword. Companies often need to build out minimally viable data products, such as integrating a new data source, while leveraging all available tools so that engineers can focus on business-domain problems.

 

1. Official Airflow Docker Image

 

Official Apache Airflow Docker Image

 

For a long time, Airflow developers had to either automate their own environments, use third-party Docker images, or go without any automation in their deployed work. In early 2019, the project released an official Dockerfile for the Airflow 1.10 series. This means that you can run your business’s Airflow code without having to document and automate the process of running Airflow on a server. Instead, you can pull the official Docker image with this command:

 

docker pull apache/airflow

 

And as long as you have the required dependencies, like Python and a supported SQL database, you can deploy your Airflow DAGs with minimal headache.

 

2. Hooks

 

Hooks are mostly used to connect to databases or external platforms in a safe way. You store your connection details in the Airflow metadata database, as usual, and Hooks form an abstraction layer between that database and your pipelines.

The main reason you would want to build these abstraction layers, or interfaces, to your platforms is security. Keeping authentication details out of your pipeline code is a security best practice, especially as the reach of your data pipelines grows wider.

The other major reason to leverage hooks is an increasingly complex business architecture spanning many services. If you aren’t opposed to looking through some code, Airflow provides many open-source hooks to popular tools such as Slack, GCP services, AWS services, Databricks, MongoDB, Postgres, and more.

 

 

An example of a custom hook that takes care of a highly repeatable task would be a Slack hook that sends custom data pipeline warning alerts to a Slack channel (see the sketch below). You can be proactive about pipeline execution and latency by building alerts on hooks that connect the data most important to the business. You can only start collecting data going forward, so the sooner you start generating a source of record for performance, the better.
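Here is a minimal sketch of such a custom hook, assuming Airflow 2.x import paths; the connection id, the way the webhook URL is assembled from connection fields, and the use of the requests library are illustrative assumptions, not a prescribed implementation.

import requests

from airflow.hooks.base import BaseHook


class SlackAlertHook(BaseHook):
    """Posts pipeline warnings to a Slack channel via an incoming webhook."""

    def __init__(self, slack_conn_id="slack_alerts"):
        super().__init__()
        self.slack_conn_id = slack_conn_id

    def send_alert(self, message: str) -> None:
        # Credentials stay in the Airflow metadata database, not in DAG code.
        conn = self.get_connection(self.slack_conn_id)
        webhook_url = f"{conn.host}{conn.password}"  # e.g. base URL + secret path
        requests.post(webhook_url, json={"text": message}, timeout=10)

Inside a task, you would instantiate SlackAlertHook() and call send_alert(...) whenever a latency threshold or data quality check is breached.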

 

3. Custom XCom Backends

 

XCom stands for cross-communication between Airflow tasks. When a task finishes and returns a value, an XCom can push that value so it is generally available to any other task that wants to pull it for its own execution. This becomes useful when you specifically need inter-task communication and want to avoid a global setting like Variables and Connections.

You can build your own XCom backends to change how your system serializes task results. This kind of customization is quite technical and introduces yet another parameter to keep track of in your DAGs. However, you can start to collect task metadata from these XCom backends, which builds observability into your infrastructure.
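As a concrete (and hedged) illustration, here is a minimal custom XCom backend assuming Airflow 2.x’s BaseXCom interface; the class name and the metadata it logs are placeholders.

import logging

from airflow.models.xcom import BaseXCom

log = logging.getLogger(__name__)


class MetadataXComBackend(BaseXCom):
    """Records lightweight metadata about every task result passing through XCom."""

    @staticmethod
    def serialize_value(value):
        # Capture simple metadata about the result before handing it to the default serializer.
        log.info("XCom payload type=%s repr_size=%s", type(value).__name__, len(repr(value)))
        return BaseXCom.serialize_value(value)

    @staticmethod
    def deserialize_value(result):
        return BaseXCom.deserialize_value(result)

You would point Airflow at a backend like this via the xcom_backend option in the [core] section of airflow.cfg.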

If you want deeper visibility into pipeline health and data quality, check out Databand. This custom metadata gathering is one of Databand’s coolest features. Databand uses one of several custom APIs to collect metadata from Airflow tasks at the moment of execution. Building out this kind of metadata gathering yourself is really time-intensive. It’s an investment that pays off, but the benefit of Databand is that this observability is built in and available instantly.

 

4. Variables and Connections

 

Airflow needs to be able to connect to data entities: databases, APIs, servers, etc. Variables and Connections are Airflow features that let you define these connections without hard-coding them into your workflows every time you need to reach an outside entity.

Variables and connections are a common way in which Airflow communicates with the outside world.

Variables and Connections are a bit like global parameters for your Airflow code base. This becomes especially important in light of Airflow using code as the abstraction layer of choice. By using Variables and Connections, you can more easily remove data silos and intellectual property (IP) silos by protecting sensitive data while also making it available for engineering use.
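A minimal sketch of how this looks in practice, assuming Airflow 2.x import paths; the variable key, connection id, and URI format are placeholders.

from airflow.hooks.base import BaseHook
from airflow.models import Variable


def build_warehouse_uri() -> str:
    # Non-sensitive, global settings live in Variables...
    env = Variable.get("deployment_env", default_var="dev")
    # ...while credentials stay in Connections, outside of DAG code.
    conn = BaseHook.get_connection("analytics_warehouse")
    return f"postgresql://{conn.login}:{conn.password}@{conn.host}:{conn.port}/{env}"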

 

 

Data Silos

Connecting data sources to a central data warehouse destination gives analysts transparency across all relevant data.

IP Silos

Transparent and reproducible workflows reduce human bottlenecks because data teams have complete access to the code that broke and the logs it produced. You don’t have to wait on the one person who knows the correct button to push; all stack traces, logs, and error codes are available to see.

 

5. Packaged DAGs

 

Packaged DAGs are a way to compress and bundle your Airflow project files into a single zip file. You can follow the steps below to zip Airflow files into a packaged DAG so that they can still be integrated into your data infrastructure’s code base.

 

virtualenv zip_dag
source zip_dag/bin/activate
mkdir zip_dag_contents
cd zip_dag_contents
pip install --install-option="--install-lib=$PWD" my_useful_package
cp ~/my_dag.py .
zip -r zip_dag.zip *

 

There are several reasons why you might use packaged DAGs, for example when you have many Airflow users in a business where development environments can be kept consistent. With packaged DAGs, versioning and collaboration become much easier, especially when DAGs grow complex and use multiple files. You don’t have to send or track project files individually, and you can promote a healthy, feature-based file structure.

 

6. Macros

 

Macros are particularly useful when running ELT jobs that span large amounts of time, for example backfills that need a parameterized date for every day of data that has to be moved or transformed. Using an execution date macro in your data pipeline allows you to break your pipeline’s memory and storage workloads into daily partitions. With an execution date macro, you can also more easily manage pulling data from a REST API by controlling the frequency and amount of data you attempt to pull.

 

 

Macros are used with Jinja templating, which allows you to inject parameters into the strings that form your architecture: organized S3 keys, backfilled data sources, or even real-time DAG or task instance metadata. Macros such as {{ prev_execution_date_success }} let you gather DAG metadata from previous runs. Thankfully, Databand has three custom APIs, one of which handles DAG metadata. Together, these let you measure the health of your data architecture, from real time to trends over time.
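A hedged sketch of a templated task using these macros, assuming Airflow 2.x and the BashOperator; the bucket layout and DAG name are made up for illustration.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="daily_partition_example",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    catchup=True,  # backfills render one run per day
) as dag:
    # {{ ds }} renders as the run's logical date (YYYY-MM-DD), so each run
    # touches only its own daily partition.
    load_partition = BashOperator(
        task_id="load_partition",
        bash_command=(
            "echo 'processing s3://my-bucket/events/date={{ ds }}/' && "
            "echo 'last successful run: {{ prev_execution_date_success }}'"
        ),
    )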

 

Conclusion

 

With a tool as extensible as Airflow, there also comes the challenge of knowing which features are available. More importantly, there is also the challenge of knowing which Airflow features meet your business needs.

There are features for every level of data infrastructure need, from just starting out to advanced observability. If your infrastructure is just getting off the ground and you want to think about the most important global parameters, then macros, Variables, and Connections are the Airflow features to look at first. If your business is looking to build more observability into your infrastructure, then custom XCom backends are probably a great Airflow feature to look into.

Databand excels at helping you understand the health of your infrastructure at every level: global Airflow, DAG, task, and user-facing. Instead of spending data engineering time on learning highly specific features, Databand allows data engineers to focus on business-specific problems.

To learn more about Databand‘s Monitor platform and Open Source Library and how it can help you make your data engineering job that much more efficient, Request a Demo here.

Streamline your Pipeline Code with Functional DAGs in Airflow 2.0

Databand
2020-07-13 14:58:22

Intro to AIP-31

 

AIP — Airflow Improvement Proposal

AIP-31 was developed collaboratively across Twitter (Gerard Casas Saez), Polidea (Tomasz Urbaszek), and Databand.ai (Jonathan Shir, Evgeny Shulman)

 

Growth of Airflow GitHub stars from 2016 to today

More and more data teams are relying on Airflow for running their pipelines. As the user base grows, Airflow is being pulled into a lot of new and exciting directions. Airflow’s growth, along with the growth of data engineering generally, is also forcing it to adapt to new types of scenarios. Some of these scenarios are newly complex, for example:

  • Machine learning workflows

Other scenarios are simpler, with data engineering teams that are looking for a lightweight, easy way to create their first pipelines.

Airflow is taking over everything from hardcore ML processes running on Spark or GPUs, to simple ETL scripts pulling marketing data from sources like Facebook.

 

Breakdown of Airflow use cases

How can we help Airflow evolve in a more demanding market, where it’s being stretched in so many new directions? In our experience with the solution, one key and fundamental area to improve on is how we write DAGs in Airflow.

 

Airflow’s current DAG composition

Airflow has a very flexible way to define pipelines, but Airflow’s operator approach is not ideal for all scenarios, especially for quickly creating complex pipelines with many chains of tasks. Why? Airflow does not have explicit inter-operator communication (no easy way to pass messages between operators!), so building large pipelines requires a lot of hardcoded definitions in how those operators communicate.

The recommended approach in these cases is to use XCom.

 

 

XCom is a task communication method in Airflow, and stands for “cross-communication”. For more information, see the docs.

XCom is the preferred approach (over template-based file paths) for inter-operator communication in Airflow for a few reasons:

  • Easier to debug — XCom values appear in UI!

However, when looking at the code itself, this solution is not intuitive for the average Pythonista. Constructing your own XCom hierarchy creates a lot of overhead and is prone to errors, from typos to keeping track of operator I/O hierarchies. But most of all, as the Zen of Python puts it: “Readability counts.”

 

Introducing — AIP-31 Functional DAGs

AIP-31 introduces a new way to write DAGs in Airflow, using a more familiar syntax that’s closer to the standard way of writing Python.

 

 

Voila! By writing our DAG tasks as function calls, we can connect operator I/O just as we would in any Python script. “Functionally defining” DAGs gives the user the necessary access to input and output directly from the operator, so we have a more concise, readable way of defining our pipelines.

Note: These changes will be a part of Airflow 2.0
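Since the original code screenshots don’t render here, below is a reconstructed sketch of the functional style that the next sections walk through, using the names mentioned in the text (get_ip, prepare_email, raw_json, email_info). It assumes Airflow 2.0’s released TaskFlow decorators plus the HTTP provider and email configuration; the connection id, endpoint, and addresses are placeholders rather than the exact code from the proposal.

from datetime import datetime
import json

from airflow import DAG
from airflow.decorators import task
from airflow.operators.email import EmailOperator
from airflow.providers.http.operators.http import SimpleHttpOperator

with DAG(dag_id="send_server_ip", start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:

    # A classic operator; its XCom result is exposed as get_ip.output (an XComArg).
    get_ip = SimpleHttpOperator(
        task_id="get_ip", http_conn_id="http_default", endpoint="get", method="GET"
    )

    @task(multiple_outputs=True)
    def prepare_email(raw_json: str) -> dict:
        external_ip = json.loads(raw_json)["origin"]
        return {
            "subject": f"Server connected from {external_ip}",
            "body": f"Your server is connected from the IP {external_ip}.",
        }

    # Calling the decorated function wires the dependency graph implicitly.
    email_info = prepare_email(get_ip.output)

    send_email = EmailOperator(
        task_id="send_email",
        to="example@example.com",
        subject=email_info["subject"],
        html_content=email_info["body"],
    )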

 

Let’s go over what’s changed:

 

@task decorator

 

 

Annotating a function with the @task decorator converts the function to a “PythonFunctionalOperator” that’s created behind the scenes when Airflow prepares your DAG for execution. The multiple_outputs attribute marks that this function will return more than a single value.

 

BaseOperator.output

 

 

The operator’s output is automatically assigned an XCom value for the user to wire to the next operator. The get_ip.output attribute constructs a ready-to-use XComArg that represents the operator’s output (what’s returned in the function). When the next operator executes, the XCom value is resolved, the true value is set, and the raw_json parameter is passed to the prepare_email function.

 

Decorated Function Return Value

 

 

The value returned by calling the decorated prepare_email function is itself an XCom argument that represents the operator’s output, and it can be subscripted; in our case, that is the email_info object. Its value is equal to operator.output. To reach the actual operator, use email_info.operator.

 

Dependency Graph

 

 

The dependency graph is now implicit. Using this new functional syntax for our Airflow DAG, there is no need to explicitly define a separate dependency graph. When the code is executed, Airflow will understand the dependency graph through the templated XCom arguments the user passes between operators, so you can omit the classic “set upstream/downstream” statements. This frees the user from having to explicitly keep track of task dependencies.

 

Cleaner code

This new approach simplifies the DAG construction process. We effectively saved writing about 40% of the “surrounding code” — allowing the user to focus on writing business logic rather than orchestration code.

 

Functional DAG’s future

AIP-31 offers an improvement for writing intuitive and readable DAGs. There are a few features that can definitely be taken further:

  1. Addition of the @dag decorator, which will decorate the ‘pipeline’ function that wires all of the operators together and create a DAG object from it (see the sketch below)
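For reference, here is a small sketch of how the @dag decorator eventually landed in Airflow 2.0; the DAG and task names are placeholders.

from datetime import datetime

from airflow.decorators import dag, task

@dag(start_date=datetime(2021, 1, 1), schedule_interval="@daily", catchup=False)
def my_pipeline():
    @task
    def extract() -> dict:
        return {"rows": 42}

    @task
    def report(payload: dict):
        print(f"extracted {payload['rows']} rows")

    # Wiring the function calls wires the DAG; no explicit upstream/downstream calls.
    report(extract())

# Calling the decorated pipeline function creates and registers the DAG object.
my_pipeline_dag = my_pipeline()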

 

Conclusion

Making our pipelines feel like any standard Python module helps Airflow cover more kinds of use cases, because DAGs become more readable, easier to debug, and easier to scale from a development perspective.

It’s simultaneously better for more complex graphs and for newly minted data engineers. For new data engineers, Functional DAGs makes it easier to get started with Airflow because there’s a smaller learning curve from the standard way of writing python. For advanced cases, it’s easier to scale into more complex graphs because it’s less cumbersome for developers to extend or modify pipeline tasks.

We are excited to contribute these improvements to push Airflow forward, making it a stronger and more future-proofed orchestrator. We look forward to seeing your contributions!

 

References

Databand Dataops Observability Platform for Pipelines

Advanced alerting on Airflow with Databand.ai in 3 steps

Databand
2020-05-05 08:13:00

Comprehensive monitoring in Airflow is hard to achieve, let alone advanced alerting. While Airflow has a great UI for monitoring high-level schedules and task statuses, there’s a lot of room for improvement when it comes to pipeline visibility. In fact, nearly half of respondents in the 2020 Airflow user survey reported that logging, monitoring, and alerting on Airflow are top challenges or areas for improvement. As power users and contributors ourselves, we’re intimately familiar with these challenges.

This is a big problem because Airflow is a standard in the data engineering toolkit. It’s become so important that huge organizations like Adobe, Walmart, Twitter, and more, have actively contributed to Airflow’s development. That’s because these organizations have invested record amounts in their data infrastructure and the creation of data products. 

That means the stakes are higher than ever for data engineers. They have the difficult task of discovering and fixing data quality problems before they reach their consumers.

A data engineer’s primary function is to create trust in their data. As a data engineer, your goal should be creating data architecture that fosters data production so accurate and efficient that your Slack and email address become forgotten by your data consumers. The only way you can do that is if you can see issues in your Airflow pipeline before they affect data delivery. You need to proactively solve problems so that your data SLAs aren’t at risk. In this article, we’ll show you how to implement advanced alerting in Airflow using Databand.

Monitoring for Airflow and 20+ other tools on Databand.ai

Databand.ai is a unified data observability platform that helps you identify and prioritize the pipeline issues that impact your data product the most.

The Databand Platform tracks pipeline custom metadata and provides alerting, trend analysis, and debugging tools so you can compare pipeline performance and quickly discover bottlenecks.

Databand.ai can tap into metadata like runtime statuses, logging info, and data metrics, and uses that info to build alerts on leading indicators of data health issues. You can proactively maintain the integrity of your data infrastructure and the quality of your data product with Databand. Databand.ai sends alerts to email, Slack, PagerDuty, and more, so your DataOps team can get ahead of any pipeline issues.

Early Warning Signals on Long-Running Processes

Airflow is often used for orchestrating long running processes that take hours, days, or longer to complete.

Let’s take this example pipeline (also called a “DAG”) of a three-step Python process:

What makes a simple pipeline like this a long running process? A lot of factors could be the culprit, including abnormally large volumes of “big” data, complex task logic, or database pooling causing waits in a queue.

Poor data quality and late delivery in critical pipelines spell trouble for your data product.  End-users like data scientists, analysts, and business users depend on pipelines finishing within a certain window of time.

Being able to receive an early warning signal of pipeline failures or delays is a huge advantage, especially when delays like these cost organizations money.

As a real-world example, let’s say our simple pipeline above was built by a stock trading company for analyzing market data. If the job starts at midday using prior day trading data and takes 4 hours to complete, it would need to finish around 4 pm to leave time for analysis and next-day strategy planning. If the job is running late, you want that notification as early in the process as possible.

Whereas an alert later in the process could mean losing an entire day of work, getting an alert at 1pm that tasks are delayed lets you get started on a resolution now. That early notice gives your team the ability to identify the cause of delay, fix the problem, and rerun the job before delivery is due.

Databand.ai can give you an early heads-up so you can intervene and fix the problem fast without losing time and money on wasted processing. With zero changes to your project’s existing code, you can use Databand to create robust alerting that gives you alerts on leading indicators of failures and delays.

Step 1 — Creating Leading Indicators of Problems

While you can use Databand’s production alerting system to notify on the duration of the overall pipeline, this alert is a lagging indicator of a problem. We want leading indicators — early warning signs.

A better approach is to use Databand.ai to alert on runtime properties of individual tasks in the pipeline. Databand.ai will monitor tasks as they run and send alerts if there are failures (or some other status change), if a task has an unusual number of retries, or if task duration exceeds some threshold. You can also receive an alert if a task does not start at the expected time. Setting these alerts on task runtime properties will give us insights on when data is at risk for late delivery much earlier than traditional monitoring methods.

Step 2 — Diving into the Data

So far, you’ve learned how to set run and task level alerts. With Databand helping you track those metrics, you’ll have a much stronger detection system for pipeline issues.

What are you missing? 

To properly support your data consumers, it’s not enough to know that pipelines will complete on time. You also need to know that the data you are delivering is up to quality standards. Can the end user actually work with it?

Luckily, you can use the Databand logging API to report metrics about your data sets.

Databand can automate this process, but for this example, you’ll be defining which custom metrics are going to be reported whenever your pipeline runs.

When Airflow runs the pipeline code, Databand’s logging API will report metrics to Databand’s tracking system, where we can alert on things like schema changes, data completeness rules, or other integrity checks.

In the above screenshot, the logging API is reporting the following metrics from the pipeline:

  • avg score: a custom KPI reported from the create_report task
  • number of columns: schema info about the data set being processed
  • removed duplicates: reported from the dedup_records task
  • replaced NaNs: this metric is reported from the first task in the pipeline, unit_imputation

Databand.ai can alert on any metrics reported to the system, and you are free to create as many as needed.
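For reference, here is a hedged sketch of how metrics like these might be reported from a task with the open-source dbnd library’s log_metric helper; the function and column handling are placeholders, and a pandas DataFrame is assumed.

from dbnd import log_metric


def dedup_records(df):
    rows_before = len(df)
    df = df.drop_duplicates()

    # Each call shows up as a trackable, alertable metric for this run.
    log_metric("removed duplicates", rows_before - len(df))
    log_metric("number of columns", len(df.columns))
    return df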

Step 3 — Using Metrics as Progress Checks

Now, it’s time to add data visibility into your alerting process.
Using Databand.ai’s advanced alerting and anomaly detection, you can use the same metrics tracking function for gaining insight into internal task progress. This is particularly useful when you have complex pipelines with lots of tasks and subprocesses, or a few tasks with complex logic inside them.

With conditional alerts, we can create intra-task status checks by tracking whether a metric fails to report within a certain timeframe while a task is running.

In our example, the first task, unit_imputation, reports the number of replaced NaNs in our data set. Looking at historical run trends, you can expect the overall task to complete within 1 hour. Based on where the metrics log sits in the code, the metric is usually reported about 30 minutes after the task starts. You can use this expected behavior to create a conditional alert that gives you great insight into what’s happening inside your process.

Alert Logic:

If duration of unit_imputation task is greater than 45 minutes AND replaced NaNs metric is missing, THEN trigger alert

First, let’s describe the logic behind this alert.
The NaNs metric should be reported about halfway into the task’s runtime, which typically takes 1 hour to complete. The alert adds some buffer to that: we want a notification if the task runs for 45 minutes without sending any metric. Missing the metric over this duration serves as an early warning that the pipeline is hung up on something that could lead to further delays downstream. Since the alert is set up on the first task of the pipeline, you have enough time to look into the issue and restart the process after a fix.

And what was required to set this up? Only a single use of the logging API. 

Databand’s alerting framework configures the rest in minutes.

Airflow alerting for staying proactive, not reactive

After following the provided logic, your alerting coverage includes:

  • Overall run durations
  • Task durations
  • Data quality metrics
  • Internal task progress

You’re now in a much better position to anticipate failures, delays, and problems in data quality before they affect your data product.
Databand.ai makes the process easy. Get your first alerts up and running with out-of-the-box metrics tracking, and get deeper performance insights with Databand.ai.

Start your 30-day free trial and get deeper visibility into your critical data pipelines.

Improving Performance of Apache Airflow Scheduler

Databand
2020-03-31 14:43:32

Apache Airflow is an open-source tool for creating and managing complex workflows. More recently, Airflow has been gaining a lot of traction and popularity among data scientists for managing machine learning workflows. With increasing usage, we can see user expectations increase too. Like all users, we also expect tools to be reliable, scalable and work out of the box. The Apache Airflow community is working on improving all of these aspects. The recent joint effort of Databand and Polidea has brought many performance improvements to the core of Airflow.

Why? Tackling Airflow Inefficiencies

Airflow is a big project with many features and moving parts. Airflow performance issues often start to appear when you need to run hundreds of workflows with dozens of tasks each. This is a major impediment to scaling Airflow for big use cases. The performance of the Airflow scheduler has been a challenge for users, with a lot of room for improvement. Why? Probably the main reason is the complexity and coupling of Airflow’s core internals. The codebase of the scheduler, with its DAG parsing subprocess, takes weeks to understand. Behind this is the complexity of the domain model: tightly coupled in-memory structures and database models that make it easy to lose track of what is what.


How? Our Approach

First of all, our work on Airflow performance was not the first. One of the most recent efforts was made by Ash Berlin-Taylor (Apache Airflow PMC). Ash focused on improving the performance of Python code execution. In our contributions, we focused on the time when the interpreter was idle. We were especially interested in the time the scheduler spends waiting for a response from the Airflow metadata database. To investigate this, we simply registered an event listener on the SQLAlchemy engine, which helped us measure the number of queries performed by a function or a process and, more importantly, the time those queries took.
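A hedged sketch of that measurement trick is below; the engine here is an in-memory placeholder, whereas in practice the listener would be attached to the engine Airflow configures in airflow.settings.

import time

from sqlalchemy import create_engine, event

engine = create_engine("sqlite://")  # placeholder; attach to Airflow's engine instead
query_stats = {"count": 0, "total_time": 0.0}


@event.listens_for(engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    context._query_start_time = time.monotonic()


@event.listens_for(engine, "after_cursor_execute")
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    query_stats["count"] += 1
    query_stats["total_time"] += time.monotonic() - context._query_start_time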

The numbers we got at first were unbelievable. For example, running the DagFileProcessor process on 200 DAGs with 10 tasks each, we observed 1801 queries! That number was definitely disproportionate to the setup. So we looked at the logs and found that some queries were executed many times. The next natural step was to find the places where they were issued and analyze whether something could be improved. Well, that was not hard.

The problems are especially visible in the pull request dealing with the DAG.is_paused attribute. This attribute specifies whether to create a new DAG run for the given DAG. The code for this attribute looked roughly like this; at first glance it doesn’t look like there is a problem.
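A rough reconstruction of the pattern (not the exact Airflow source): reading the property silently issues its own query against the metadata database.

from airflow import settings
from airflow.models import DagModel


class SomeDag:
    def __init__(self, dag_id):
        self.dag_id = dag_id

    @property
    def is_paused(self):
        # An innocent-looking attribute read: a full round trip to the DB every time.
        session = settings.Session()
        return (
            session.query(DagModel.is_paused)
            .filter(DagModel.dag_id == self.dag_id)
            .scalar()
        )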

However, it became a serious performance issue. When this attribute was used in the code, it was not obvious that reading it causes a query to the database. Developers are accustomed to attribute reads being light operations that don’t cause any problems. That is not true in this case.

The N+1 problem is a situation where data is processed in a loop and another database query is performed for each iteration of the loop. The situation can look like this:
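A hedged illustration of the N+1 pattern and its batched replacement, using the DagModel table as the text does; the helper functions are made up for illustration.

from airflow.models import DagModel


def count_paused_n_plus_one(session, dags):
    # One query per DAG in the loop: 200 DAGs means 200 round trips.
    return sum(
        1
        for dag in dags
        if session.query(DagModel.is_paused).filter(DagModel.dag_id == dag.dag_id).scalar()
    )


def count_paused_batched(session, dags):
    # The same answer with a single query.
    dag_ids = [dag.dag_id for dag in dags]
    return (
        session.query(DagModel)
        .filter(DagModel.dag_id.in_(dag_ids), DagModel.is_paused.is_(True))
        .count()
    )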

The DagBag snippet, which contained 200 DAGs, resulted in 200 queries that could be replaced by a single query to the database. It is worth remembering that this problem may also occur when updating objects. Even when we use an ORM, it is sometimes necessary to consider whether an update operation will be efficient, or whether we should rewrite the code, stop iterating in a loop, and execute one hand-written UPDATE query.

Airflow is an application that uses many processes to ensure high performance. Tracing whether a given query is required is therefore not easy, because following a value saved to one object and passed to another requires a deeper understanding of most of the code. In Airflow there are not only short-lived objects that die after handling a user request, as in classic web applications, but also objects that stay stateful for a long time.

Avoid regression

The last takeaway from our Airflow performance story is how to avoid regressions. Once we had improved the performance, we wanted to avoid any unnecessary or unfounded changes that have a negative impact. To achieve that, we created additional tests that perform an operation inside a context manager that counts the queries (the code is available on GitHub), for example:
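A hedged sketch of such a context manager, reusing the event-listener trick from earlier; the real helper used in Airflow’s test suite lives in the project repository and differs in detail.

from contextlib import contextmanager

from sqlalchemy import event


@contextmanager
def assert_queries_count(engine, expected_count):
    executed = []

    def counter(conn, cursor, statement, parameters, context, executemany):
        executed.append(statement)

    event.listen(engine, "after_cursor_execute", counter)
    try:
        yield
    finally:
        event.remove(engine, "after_cursor_execute", counter)
    assert len(executed) == expected_count, (
        f"expected {expected_count} queries, got {len(executed)}"
    )

A test would then wrap a call such as DagFileProcessor.process_file(...) in "with assert_queries_count(engine, 5):" and fail if a change reintroduces extra queries.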

Thanks to those tests, we will be able to catch changes that have a potential impact on Airflow performance. Such tests are critical in open-source projects like Apache Airflow because contributors come and go, and the knowledge is not always passed on.

The result

We did 7 PRs related to solving these problems. We have also prepared other changes that prevent performance regression or allow us to easily repeat our research on another piece of Airflow.

Finally, for a DAG file with the following characteristics:

  • 200 DAG objects in one file

When we test DagFileProcessor.process_file method, we obtain the following results:

Before (commit):

  • Count queries: 1801

After (commit):

  • Count queries: 5

Difference:

  • Count queries: -1,796 (-99.7%)

What does it mean? There are far fewer queries! This speeds up the whole scheduler and improves the overall performance of Airflow. In our conditions there was a 10x improvement in DAG parsing time, and the change puts far less load on the Airflow database. Of course, this doesn’t translate to a 10x speed-up in running tasks. The changes are currently only available in the master branch and will be part of Airflow 2.0.

The problem of N+1 queries is rather easy to spot and usually easy to fix. Some tools can help you spot it, but registering an event listener on your database engine is the simplest approach. Better yet, instead of fixing this problem, we should try to avoid it: all ORMs have batch operations that are easy to use, and we should use them whenever possible. The Airflow case shows that numerous small inefficiencies add up to big bottlenecks which, once removed, will increase the performance of your tool.

Authors

Kamil Breguła — Software Engineer at Polidea | Apache Airflow Committer
Tomek Urbaszek — Software Engineer at Polidea | Apache Airflow Committer
Evgeny Shulman — CTO at Databand