Intro to AIP-31
More and more data teams are relying on Airflow to run their pipelines. As the user base grows, Airflow is being pulled in many new and exciting directions. Airflow's growth, along with the growth of data engineering generally, is also forcing it to adapt to new types of scenarios. Some of these scenarios are newly complex, for example:
- Machine learning workflows
- CI/CD cycles
- Spark execution
Other scenarios are simpler, with data engineering teams that are looking for a lightweight, easy way to create their first pipelines.
Airflow is taking over everything from hardcore ML processes running on Spark or GPUs, to simple ETL scripts pulling marketing data from sources like Facebook.
How can we help Airflow evolve in a more demanding market, where it's being stretched in so many new directions? In our experience, one key and fundamental area to improve is how we write DAGs in Airflow.
Airflow’s current DAG composition
Airflow has a very flexible way to define pipelines, but Airflow's operator approach is not ideal for every scenario, especially for quickly creating complex pipelines with many chains of tasks. Why? Airflow has no explicit inter-operator communication (no easy way to pass messages between operators!), so building large pipelines requires a lot of hardcoded definitions of how those operators communicate.
The recommended approach in these cases is to use XCom.
XCom is a task communication mechanism in Airflow, and stands for "cross-communication". For more information, see the docs.
XCom is the preferred approach (over template-based file paths) for inter-operator communication in Airflow for a few reasons:
- Easier to debug — XCom values appear in UI!
- Data can be inserted into DB easily (e.g. model training metrics)
- Used natively by Airflow OSS operators to transfer data
However, when looking at the code itself, this solution is not intuitive for the average Pythonista. Constructing your own XCom hierarchy creates a lot of overhead and is prone to errors, from typos to keeping track of operator I/O hierarchies. Most of all, as the Zen of Python puts it: "Readability counts."
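To make that overhead concrete, here is a minimal sketch of the classic pattern. This is a toy model, not real Airflow code: a plain dict stands in for Airflow's XCom table, and the task ids, keys, and values are invented for illustration (in real Airflow the push and pull would go through the task instance's XCom methods):

```python
# Toy model: a dict stands in for Airflow's XCom table.
xcom_store = {}

def get_ip():
    # Producer: push the result under a hardcoded (task_id, key) pair.
    xcom_store[("get_ip", "return_value")] = {"origin": "10.0.0.1"}

def prepare_email():
    # Consumer: must know the producer's task id and key to pull the value.
    raw_json = xcom_store[("get_ip", "return_value")]
    external_ip = raw_json["origin"]
    xcom_store[("prepare_email", "subject")] = (
        f"Server connected from {external_ip}"
    )

# The link between the two tasks lives only in these string keys, so a
# typo in "get_ip" or "return_value" fails at runtime, not at parse time.
get_ip()
prepare_email()
print(xcom_store[("prepare_email", "subject")])
```

The string keys are exactly the "hardcoded definitions" mentioned above: the consumer has to mirror the producer's naming, and nothing checks that wiring until the pipeline actually runs.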
Introducing — AIP-31 Functional DAGs
AIP-31 introduces a new way to write DAGs in Airflow, using a more familiar syntax that's closer to the standard way of writing Python.
Voila! By writing our DAG tasks as function calls, we can connect operator I/O as we would in any Python script. "Functionally defining" DAGs gives the user direct access to an operator's input and output, so we get a more concise, readable way of defining our pipelines.
Note: These changes will be a part of Airflow 2.0
Let’s go over what’s changed:
Annotating a function with the @task decorator converts the function to a "PythonFunctionalOperator" that's created behind the scenes when Airflow prepares your DAG for execution. The multiple_outputs attribute marks that this function will return more than a single value.
The operator's output is automatically assigned an XCom value for the user to wire to the next operator. The get_ip.output attribute constructs a ready-to-use XComArg that represents the operator's output (what's returned in the function). When the next operator executes, the XCom value is resolved and the true value is set, passing the raw_json parameter to the prepare_email function.
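None of the following is Airflow's actual implementation; it is a toy model of the mechanics just described, with invented class names, to show how calling a decorated function can return a placeholder (an "XComArg") instead of a value, how that placeholder can be subscripted, and how it is resolved into the real value only when the downstream task runs:

```python
# Toy model of the AIP-31 mechanics (NOT Airflow's real implementation).

class XComArg:
    """Placeholder for a task's output; resolved only at 'execution' time."""
    def __init__(self, operator, key="return_value"):
        self.operator = operator  # the task node that produces this value
        self.key = key

    def __getitem__(self, key):
        # Lets callers write email_info["subject"] on a multi-output task.
        return XComArg(self.operator, key)


class TaskNode:
    """Stands in for the operator Airflow creates behind the scenes."""
    def __init__(self, fn, args):
        self.fn = fn
        self.args = args
        # Implicit dependency graph: any XComArg argument is an upstream task.
        self.upstream = [a.operator for a in args if isinstance(a, XComArg)]


def task(fn):
    # Calling the decorated function records a node and returns an XComArg
    # instead of running the function immediately.
    def factory(*args):
        return XComArg(TaskNode(fn, args))
    return factory


def resolve(value):
    """Mimic execution: run upstream tasks and substitute real values."""
    if not isinstance(value, XComArg):
        return value
    node = value.operator
    result = node.fn(*[resolve(a) for a in node.args])
    return result if value.key == "return_value" else result[value.key]


@task
def get_ip():
    return {"origin": "10.0.0.1"}  # stand-in for a real HTTP call


@task
def prepare_email(raw_json):
    external_ip = raw_json["origin"]
    return {"subject": f"Server connected from {external_ip}"}


email_info = prepare_email(get_ip())   # pure wiring; nothing executes yet
print(resolve(email_info["subject"]))  # -> Server connected from 10.0.0.1
```

Note how the dependency between get_ip and prepare_email is recorded as a side effect of the function call itself: no task ids, no string keys, no explicit upstream declarations.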
Decorated Function Return Value
The value returned by calling the decorated prepare_email function is itself an XCom argument that represents that operator's output, and can be subscripted. In our case, that's the email_info object. Its value is equal to operator.output. To reach the actual operator, use email_info.operator.
Dependency graph is now implicit — using this new functional syntax for our Airflow DAG, there is no need to explicitly define a separate dependency graph. When the code is executed, Airflow understands the dependency graph through the templated XCom arguments the user passes between operators, so you can omit the classic "set upstream/downstream" statements. This frees the user from having to explicitly keep track of task dependencies.
This new approach simplifies the DAG construction process. We effectively saved writing about 40% of the “surrounding code” — allowing the user to focus on writing business logic rather than orchestration code.
The future of Functional DAGs
AIP-31 offers an improvement for writing intuitive and readable DAGs. There are a few features that can definitely be taken further:
- Addition of the @dag decorator, which will decorate the 'pipeline' function that wires all operators and create a DAG object from it
- XCom backend — marshalling and serialization (e.g. automatically converting data frames to CSV when an XCom backend is in use). Without the ability to serialize XCom values across filesystems, we lose a lot of the value this feature provides.
- Being able to decorate tasks and call them without Airflow execution — allows much easier testing and converting of existing projects
- Leading the way for further Airflow development, such as Configurable Execution Context, a decorated @spark_task, and even data lineage features
- Databand.ai (Jonathan Shir): my next blog post will present how we approached and implemented these issues at Databand.ai
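The "call tasks without Airflow execution" idea from the list above is easy to sketch in plain Python: if the decorator keeps a handle on the undecorated function, unit tests can exercise the business logic with no scheduler involved. The decorator and its raw attribute below are toy stand-ins, not Airflow's API:

```python
# Toy decorator (not Airflow's API): wrap a function for orchestration,
# but keep the raw callable around for direct unit testing.
def task(fn):
    def factory(*args, **kwargs):
        # A real orchestrator would register a task here and defer
        # execution; we just tag the call for illustration.
        return ("deferred", fn.__name__, args, kwargs)
    factory.raw = fn  # escape hatch: the undecorated function
    return factory

@task
def prepare_email(raw_json):
    return f"Server connected from {raw_json['origin']}"

# Unit-test style: call the business logic directly, no scheduler needed.
print(prepare_email.raw({"origin": "10.0.0.1"}))
```

This separation is what makes converting existing projects easier: the same function can be imported and tested as ordinary Python, or handed to the orchestrator unchanged.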
Making our pipelines feel like any standard Python module helps Airflow cover more kinds of use cases: our graphs become more readable, more debuggable, and easier to scale from a development perspective.
It's simultaneously better for more complex graphs and for newly minted data engineers. For new data engineers, Functional DAGs make it easier to get started with Airflow because there's a smaller leap from the standard way of writing Python. For advanced cases, it's easier to scale into more complex graphs because it's less cumbersome for developers to extend or modify pipeline tasks.
We are excited to contribute these improvements to push Airflow forward, making it a stronger and more future-proofed orchestrator. We look forward to seeing your contributions!