Everyday Data Engineering: Monitoring Airflow with Prometheus, StatsD and Grafana

Databand
2021-04-02 15:53:26

Everyday Data Engineering: Databand’s blog series – A guide on common data engineering activities, written for data engineers by data engineers.

Data pipeline monitoring is vital for data engineering day-to-day operations. It must be done effectively if you want your consumers to trust your data. Without the right data observability and data monitoring systems set up, data can break easily.

Monitoring Airflow production pipelines can be especially painful. In order to debug health problems or find the root cause of failures, a data engineer needs to hop between the Apache Airflow UI, DAG logs, various monitoring tools, and Python code. To make this process smoother, we can use operational dashboards to get a bird’s-eye view of our system, clusters, and overall health.

We’ll need to configure a data observability dashboard. There are two routes you can take when looking for a data observability solution: an open-source solution or a managed one.

Airflow monitoring dashboards: open-source vs. managed service

There are advantages and disadvantages to both.

Open-source pros and cons

  • Pros
    • Lower initial cost—Your only cost for implementation is labor.
    • Community support—Contribution from the global community.
  • Cons
    • Higher long-term cost—Maintenance and troubleshooting can become difficult as your needs become more complex.
    • Usability can be difficult—As your data team grows, ease of use, scalability, and governance can become unmanageable.

Managed Service pros and cons

  • Pros
    • Greater usability—Better UI and automation can make your team more efficient.
    • Better support—You have a team of Solution Architects standing by to help.
  • Cons
    • Higher initial costs—The pricing model might not make sense for some organizations.
    • Less flexibility—Unless the managed service is built on open-source code, functionality can be limited.

In this guide, we’ll be exploring the best practices for going the open-source route to building an operational dashboard. This guide’s goal is to easily answer questions like:

  • Is our cluster alive?
  • How many DAGs do we have in the DagBag?
  • Which operators succeeded and which failed lately?
  • How many tasks are running right now?
  • How long did it take for a DAG to complete? (and so on)

Building an open-source Airflow monitoring solution

For this guide, we will focus on monitoring and visualizing Airflow cluster metrics. These types of metrics are great indicators of cluster and infrastructure health and should be constantly monitored.

Airflow exposes metrics such as DAG bag size, number of currently running tasks, and task duration at every moment the cluster is running. Airflow uses the StatsD format to expose these metrics, and we will use this in our solution (we’ll get into more details below). You can find a list of all the exposed metrics, along with descriptions, in the official Airflow documentation.
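To get a feel for the wire format, here is a small illustrative sketch (not part of the setup itself) that hand-sends a few StatsD-style datagrams of the kind Airflow emits. The metric names follow the Airflow docs and the airflow prefix configured later in this guide; the localhost:8125 address is an assumption matching the airflow.cfg example below.

import socket

# StatsD wire format: <name>:<value>|<type>, where c = counter, g = gauge, ms = timing
statsd_address = ("localhost", 8125)  # assumed StatsD / StatsD Exporter address
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

# roughly what Airflow emits for a scheduler heartbeat, the DagBag size and a task duration
sock.sendto(b"airflow.scheduler_heartbeat:1|c", statsd_address)
sock.sendto(b"airflow.dagbag_size:12|g", statsd_address)
sock.sendto(b"airflow.dag.sample_dag.dummy_task.duration:512|ms", statsd_address)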

Open-source tools

Let’s see how we can use open-source tools like StatsD, Prometheus, and Grafana to build a monitoring solution for Airflow. By leveraging this trio, you can see whether the scheduler is running, how many DAGs are currently in the DagBag, and most other critical signals of the cluster’s health.

1. StatsD

We’ll start with StatsD. StatsD is a widely used service for collecting and aggregating metrics from various sources. Airflow has built-in support for sending metrics into the StatsD server. Once configured, Airflow will then push metrics to the StatsD server and we will be able to visualize them.

2. Prometheus

Prometheus is a popular solution for storing metrics and alerting on them. It is typically used to collect metrics from sources like RDBMSes and web servers, and we will use it as the main storage for our metrics. Because Airflow doesn’t integrate with Prometheus directly, we’ll use Prometheus StatsD Exporter to collect the metrics and transform them into a Prometheus-readable format. StatsD Exporter behaves like a regular StatsD server, so Airflow won’t notice any difference between them.
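As a quick sanity check (a sketch, not part of the official setup), you can confirm that StatsD Exporter is receiving and translating metrics by reading its Prometheus endpoint. Port 9102 below is assumed to be the exporter’s default web port; adjust it to your deployment.

from urllib.request import urlopen

# StatsD Exporter serves the translated metrics in Prometheus text format (default web port assumed to be 9102)
with urlopen("http://localhost:9102/metrics") as response:
    metrics_text = response.read().decode("utf-8")

# print only the Airflow-related series
for line in metrics_text.splitlines():
    if line.startswith("airflow_") or line.startswith("af_agg_"):
        print(line)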

3. Grafana

Grafana is our preferred metrics visualization tool. It has native Prometheus support and we will use it to set up our Airflow Cluster Monitoring Dashboard.

The basic architecture of our monitoring solution will look like this:

The Airflow cluster reports metrics to StatsD Exporter, which performs transformations and aggregations and passes them on to Prometheus. Grafana then queries Prometheus and displays everything in a gorgeous dashboard. To make this happen, we will need to set up all of those pieces.

First, we will configure Airflow, then StatsD Exporter and then Grafana.

Let’s start!

Prometheus, StatsD Exporter and metrics mapping

This guide assumes you’re already familiar with Prometheus. If you aren’t yet, it has great documentation and is really easy to get started with, as Prometheus doesn’t require special configuration. StatsD Exporter will be used to receive the metrics and provide them to Prometheus. Usually it doesn’t require much configuration, but because of the way Airflow sends metrics, we will need to re-map them.

By default, Airflow exposes a lot of metrics whose names are composed from DAG and task names. For convenience, these metrics should be properly mapped. With mapping in place, we can build Grafana dashboards with per-Airflow-instance and per-DAG views.

Let’s take the dag.<dag_id>.<task_id>.duration metric as an example. The raw metric name sent by Airflow will look like airflow_dag_sample_dag_dummy_task_duration. For every Airflow instance and every DAG you run, Airflow reports a duration for each task, producing a combinatorial explosion of metrics. For simple DAGs this isn’t an issue, but as tasks add up, things get more complicated, and you don’t want to manage that complexity in the Grafana configuration.

To solve this, StatsD Exporter provides a built-in relabeling configuration. There is great documentation and examples of these on the StatsD Exporter page.

Now let’s apply this to our DAG duration metric.

The relabel config will look like this:

mappings:
  - match: "*.dag.*.*.duration"
    match_metric_type: observer
    name: "af_agg_dag_task_duration"
    labels:
      airflow_id: "$1"
      dag_id: "$2"
      task_id: "$3"

We are extracting three labels from this metric:

  1. Airflow instance ID (which should be different across the instances)
  2. DAG ID
  3. Task ID

Prometheus will then pick up these labels, and we’ll be able to configure dashboards with instance/DAG/task selectors, providing observability at different levels of detail.
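For example, a Grafana panel can filter on those labels with a PromQL expression such as af_agg_dag_task_duration{dag_id="sample_dag"}. Here is a rough sketch of the same query against the Prometheus HTTP API, assuming Prometheus runs on localhost:9090 and uses the mapped metric name from the config above:

import json
from urllib.parse import urlencode
from urllib.request import urlopen

# query the mapped duration metric for one DAG, across all Airflow instances and tasks
params = urlencode({"query": 'af_agg_dag_task_duration{dag_id="sample_dag"}'})
with urlopen(f"http://localhost:9090/api/v1/query?{params}") as response:
    result = json.load(response)

for series in result["data"]["result"]:
    print(series["metric"], series["value"])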

We repeat this relabeling configuration for each metric exposed by Airflow. See the “Source Code” section at the end of the article for a complete example.

Airflow configuration

A couple of options should be added to airflow.cfg. Please note that Airflow will fail to start if the StatsD server is not available at start-up time! Make sure you have an up-and-running StatsD Exporter instance.
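Note that, depending on how Airflow was installed, the StatsD client library itself may need to be added; it typically ships as an optional extra, installable with pip install 'apache-airflow[statsd]'.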

The very basic config section in airflow.cfg will look like this:

[metrics]
statsd_on = True
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow

For more details, please refer to the Airflow Metrics documentation.

Configuring Grafana Dashboards

Now that we have all our metrics properly mapped, we can proceed to creating the dashboards. We will have two dashboards: one for the cluster overview and another for DAG metrics.

The first dashboard has an Airflow instance selector and shows the vital cluster metrics: scheduler heartbeat, DagBag size, queued and running task counts, currently running DAGs aggregated by task, and so on.

The second dashboard has a DAG selector and shows DAG-level metrics: successful DAG run duration, failed DAG run duration, DAG run dependency-check time, and DAG run schedule delay.

Source Code

The complete source code, including the StatsD mapping config and both Grafana dashboards (one for the cluster overview and another for DAG stats), can be found in our GitHub project: https://github.com/databand-ai/airflow-dashboards/.

Conclusion

Airflow provides a decent out-of-the-box solution for monitoring DAGs, but it lacks accessibility and comprehensiveness. In this tutorial we configured Airflow, StatsD Exporter, and Grafana to get clear and useful dashboards.

Dashboards like these can save a lot of time when troubleshooting cluster health issues, such as executors being down or DAG parsing getting stuck on a heavyweight DB query. For more robust and convenient monitoring, alerts should also be configured, but that is out of scope for this article.

Stay tuned for more guides!

Happy engineering! 🙂

– This guide was written by Vova Rozhkov, an experienced software engineer at Databand.

Best Practices on Data Replication from PostgreSQL to Snowflake with Python

Databand
2021-01-22 11:25:08

A big portion of what data engineers do is move data from one database to another – also known as data replication and migration. Every data team has gone through a project like this, whether it’s to move data to a database better suited for analytical querying, to protect an operational database from a high analytical load, or to migrate to the cloud. One common example is replicating data from PostgreSQL to Snowflake.

In these kinds of processes, it’s critical to know that data is being moved consistently and accurately. Otherwise, analytics will break, ML models will underperform, and engineers will spend all day troubleshooting. In this guide, we will demonstrate how to replicate data to Snowflake with Python using Snowflake best practices, while ensuring data integrity. The data replication process will take data from an on-premise PostgreSQL database to a Snowflake cloud database.

Data replication process overview

PostgreSQL to Snowflake: Setting up the Prerequisites

We will be using SQLAlchemy to interact with the on-premise PostgreSQL database, Snowflake’s Python connector to interact with Snowflake, and Databand’s open source library (“DBND”) to track our data and check for data integrity. We will also be using Pandas to efficiently perform transformations.

You can install all the tools used in this guide with the following command:

pip install dbnd dbnd-snowflake SQLAlchemy snowflake-connector-python

If any dependencies are missing, you can find more detailed installation instructions in each tool’s documentation.

You will also need a PostgreSQL database, and a Snowflake account with privileges to create tables and stages. For information on account privileges, visit https://docs.snowflake.com/en/user-guide/security-access-control-privileges.html.

Connecting to PostgreSQL and Snowflake

We will need two connections: one to our PostgreSQL database (using a SQLAlchemy engine) and another to Snowflake (using the Snowflake Python Connector):

from sqlalchemy import create_engine
import pandas as pd
import snowflake.connector
from dbnd import log_dataframe, log_metric
from dbnd import log_snowflake_resource_usage, log_snowflake_table
import datetime
import credentials  # assumed local module holding the Snowflake USER, PW and ACCOUNT values

def establish_postgres_connection():
    """uses SQLAlchemy to establish a connection to the PostgreSQL db"""
    pg_db = create_engine('postgresql+psycopg2://dbnd_postgres:[email protected]/dbnd_postgres')
    return pg_db

def establish_snowflake_connection():
    """uses the Snowflake Python Connector to establish a connection to Snowflake,
    returns the connection and the connection string"""
    snowflake_connection = snowflake.connector.connect(
        user=credentials.USER,
        password=credentials.PW,
        account=f'{credentials.ACCOUNT}'
    )
    connection_string = f'snowflake://{credentials.USER}:{credentials.PW}@{credentials.ACCOUNT}/?account={credentials.ACCOUNT}'
    return snowflake_connection, connection_string

The connection string is used by DBND to track Snowflake resource usages and tables. The connection string should be in the format of:

snowflake://<user>:<password>@<full account name>/?account=<full account name>

Where the full account name is your full Snowflake account name, including the region and cloud platform if applicable. For example, if your account name is xyz12345 and it is hosted on AWS in US East (Ohio), then your full account name would be

xyz12345.us-east-2.aws

Exploring the Example Data

Before we continue with the guide on data replication, let’s take a quick look at the example data used in this guide. This will help you adapt the steps in this guide to better suit your workflows.

The table columns are:

  • index
    – The index of the PostgreSQL table.
  • invoice_ID
    – Unique ID generated for each transaction.
  • transaction_amt
    – Amount exchanged in the transaction.
  • transaction_time
    – POSIX timestamp of the transaction.
  • transaction_fulfilled
    – Whether the transaction has been fulfilled.
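To make the rest of the guide easier to follow and adapt, here is a small, purely illustrative snippet (synthetic values, not from the original dataset) that builds a Pandas DataFrame with the same schema; you can use it to dry-run the workflow:

import datetime
import uuid

import pandas as pd

# synthetic rows matching the schema above
now = datetime.datetime.now().timestamp()
sample_transactions = pd.DataFrame({
    "invoice_ID": [uuid.uuid4().hex for _ in range(3)],
    "transaction_amt": [19.99, 250.00, 74.50],
    "transaction_time": [now - 86400, now - 3600, now],  # POSIX timestamps
    "transaction_fulfilled": [1, 1, 0],
})
print(sample_transactions)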

Querying Data from PostgreSQL

Next, we query the data from our on-premise PostgreSQL database. We do this with a function that uses the COPY command to export part of our table, or the full table, into a local file. We will use this file for staging in a later section. For larger datasets, it is recommended to COPY the data incrementally instead.

Tracking Data with a Dataframe

After copying the data to a file, we will track our data in the form of a dataframe using DBND’s log_dataframe function. This dataframe will be used to perform a data integrity check at the end of our workflow.

Changing the Index Column

We will also change the index column to pg_index. This will prevent any confusion in the future. If the index column is no longer required for your application, you can drop that column here or modify the SQL query to not select the index column.

from os import path

def find_transactions(pg_connection, start_day=None):
    """finds transactions from start_day until the current time.
    If start_day is None, returns all transactions as a Pandas DataFrame."""
    staging_file = path.join(".", "tmp", "transactions.csv")
    if start_day:
        start = (datetime.datetime.now() - datetime.timedelta(days=start_day)).timestamp()
        query = f"COPY (SELECT * FROM transactions_data WHERE transaction_time >= {start}) TO '{staging_file}' WITH DELIMITER ',' CSV HEADER;"
    else:
        query = f"COPY transactions_data TO '{staging_file}' WITH DELIMITER ',' CSV HEADER;"
    # export the selected rows from PostgreSQL into the local staging file
    pg_connection.execute(query)
    transaction_data = pd.read_csv(staging_file)
    # rename the PostgreSQL index column to avoid confusion later on
    transaction_data.rename(columns={'index': 'pg_index'}, inplace=True)
    transaction_data.to_csv(staging_file, index=False)
    log_dataframe("PostgreSQL data", transaction_data, with_histograms=True)
    return transaction_data

Creating the Snowflake Tables

Before we stage the file to be loaded into a Snowflake table, we have to first create the tables we need. Snowflake’s architecture allows for row-level updates, making for easier delta data migration processes. The best way to do this is by loading the extracted Postgres data to an intermediate table, then updating or adding rows to the final table as required.

Snowflake provides support for three types of tables:

  • temporary tables
  • transient tables
  • permanent tables

Temporary Tables

Temporary tables are session-based and only exist until the session in which they are created has ended. When the session has ended, data on these temporary tables are completely purged.

Transient Tables

Transient tables are more similar to permanent tables in that they exist past the session expiration time and must be explicitly dropped. Transient tables differ in that there is no Fail-safe period available for the data stored within them. Temporary and transient tables are useful for storing data that does not need to be maintained for extended periods of time. In this guide, our intermediate table will be a temporary table used to load data into our final, permanent, table.

To create these tables in Snowflake:

CREATE TABLE "TRANSACTIONS"."PUBLIC"."TRANSACTION_DATA" (
    "pg_index" INTEGER,
    "invoice_ID" STRING,
    "transaction_amt" DOUBLE,
    "transaction_time" DOUBLE,
    "transaction_fulfilled" INTEGER
);

CREATE TEMPORARY TABLE "TRANSACTIONS"."PUBLIC"."INTERMEDIATE_TABLE" (
    "pg_index" INTEGER,
    "invoice_ID" STRING,
    "transaction_amt" DOUBLE,
    "transaction_time" DOUBLE,
    "transaction_fulfilled" INTEGER
);

Snowflake supports a variety of data types; throughout this guide we use simple ones.

Staging

There are two types of stages – internal and external. In either case, a stage is a reference to a location where you can store files to load or unload from Snowflake tables.

Internal Staging

An internal stage is a storage location managed by Snowflake where data file(s) are uploaded (for example with the PUT command) before being loaded into a table. An internal stage has the advantage of being easier to set up, but it does not support automated continuous data ingestion. For a one-shot bulk data migration, it is often easier to create an internal stage.

After creating an internal stage, we will load its contents into an intermediate table – this will allow us to update our final table accordingly.

We will also be keeping track of the Query ID of each of our queries – this will later be used to discover the Snowflake resource usage (credits, etc.) and any performance bottlenecks of our queries.

def migrate_from_internal_stage(sf_cursor):
    """creates and populates an internal stage, then copies file(s) into an intermediate table"""
    query_ids = []
    # first create the internal stage:
    stage_creation_query = """CREATE OR REPLACE TEMPORARY STAGE pg_internal_stage
        FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1);"""
    sf_cursor.execute(stage_creation_query)
    query_ids.append(sf_cursor.sfqid)
    sf_cursor.execute("PUT file://./tmp/internal_stage/transactions.csv @pg_internal_stage;")
    query_ids.append(sf_cursor.sfqid)
    sf_cursor.execute('COPY INTO "TRANSACTIONS"."PUBLIC"."INTERMEDIATE_TABLE" from @pg_internal_stage;')
    query_ids.append(sf_cursor.sfqid)
    return query_ids

External Staging

An external stage is a specified cloud storage location where data file(s) are stored so that they can be loaded into a table. At the time of writing, Snowflake supports the following cloud storage services:

  • Amazon S3 Buckets
  • Google Cloud Storage Buckets
  • Microsoft Azure Containers

In this guide, we are using an AWS S3 Bucket as our external stage. For the required parameters of your cloud storage service, visit Snowflake’s official documentation.

External stages require only a few extra steps but are more versatile. External stages support continuous data ingestion that can be automated with SnowPipe.

Permanent Storage Integration

We can also create a permanent storage integration for our Snowflake account that allows Snowflake to read data from and write data to our AWS bucket. Note that recreating a storage integration with CREATE OR REPLACE overwrites the previously associated storage links, so any stages that reference it must be re-associated.

Query ID

As with the internal staging process, we keep track of the Query ID of each of our queries – these will later be used to examine Snowflake resource usage (credits, etc.) and any performance bottlenecks in our queries.

def migrate_from_external_s3_stage(sf_cursor):
    """creates and populates an external stage, then copies files into an intermediate table"""
    query_ids = []
    # first create the storage integration on snowflake:
    sf_cursor.execute("USE ROLE ACCOUNTADMIN;")
    snowflake_storage_integration = """CREATE OR REPLACE STORAGE INTEGRATION s3_integration
        type = external_stage
        storage_provider = s3
        enabled = true
        storage_aws_role_arn = 'arn:aws:iam:::role/snowflake-role'
        storage_allowed_locations = ('s3:///');"""
    sf_cursor.execute(snowflake_storage_integration)
    query_ids.append(sf_cursor.sfqid)
    # next create the external stage:
    ex_stage_creation_query = """CREATE OR REPLACE STAGE aws_external_stage
        URL = 's3:///'
        STORAGE_INTEGRATION = s3_integration
        FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1);"""
    sf_cursor.execute(ex_stage_creation_query)
    query_ids.append(sf_cursor.sfqid)
    # copy from external stage to intermediate table
    sf_cursor.execute('COPY INTO "TRANSACTIONS"."PUBLIC"."INTERMEDIATE_TABLE" from @aws_external_stage')
    query_ids.append(sf_cursor.sfqid)
    return query_ids

Tracking Our Data

Before we complete the data replication process by updating the final Snowflake table, we will use Pandas and Databand DBND to make sure our data was replicated successfully into the intermediate table. To do this, we can select all the data from our intermediate table, then compare the contents in the intermediate table with the contents extracted from our PostgreSQL database.

If the contents of both tables match, DBND will log True; otherwise, DBND logs False. To keep things simple in this guide, we will allow unmatched data to pass onto the final table, but you can fail your pipeline here or report the discrepancy to an alerting system if you require your data to be completely identical.

# select everything from the intermediate table so we can compare it with the PostgreSQL extract
sf_cursor.execute('SELECT * FROM "TRANSACTIONS"."PUBLIC"."INTERMEDIATE_TABLE"')
results = sf_cursor.fetchall()
query_ids.append(sf_cursor.sfqid)
snowflake_intermediate_data = pd.DataFrame(
    results,
    columns=["pg_index", "invoice_ID", "transaction_amt", "transaction_time", "transaction_fulfilled"]
)
log_metric("Data Identical", snowflake_intermediate_data.equals(transaction_data))
log_dataframe("Snowflake Data", snowflake_intermediate_data, with_histograms=True)
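If you do need the data to be completely identical, a minimal way to hard-fail the run on a mismatch (reusing the variables above) could look like this:

# stop the pipeline instead of silently promoting mismatched data to the final table
if not snowflake_intermediate_data.equals(transaction_data):
    raise ValueError("Intermediate Snowflake table does not match the data extracted from PostgreSQL")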

Updating the Final Table

Finally, all we need to do is to update the final table. There are several methods of approaching this:

  • Delete records that exist in both tables, then insert all records from the intermediate table to the final table.
  • Update the rows in the final table if they exist in both tables, insert rows from the intermediate table to the final table if they don’t yet exist.
  • Use Snowflake’s Merge command to insert new rows and update existing rows on the final table.

In our Python workflow, we will be using the MERGE option.

def merge_intermediate_table(sf_cursor):
    """Uses the MERGE command to merge intermediate table and final table"""
    query_ids = []
    snowflake_merge = """MERGE INTO "TRANSACTIONS"."PUBLIC"."TRANSACTION_DATA" final_table
        USING "TRANSACTIONS"."PUBLIC"."INTERMEDIATE_TABLE" i_t
            ON final_table."invoice_ID" = i_t."invoice_ID"
        WHEN MATCHED THEN UPDATE SET
            final_table."pg_index" = i_t."pg_index",
            final_table."invoice_ID" = i_t."invoice_ID",
            final_table."transaction_amt" = i_t."transaction_amt",
            final_table."transaction_time" = i_t."transaction_time",
            final_table."transaction_fulfilled" = i_t."transaction_fulfilled"
        WHEN NOT MATCHED THEN INSERT
            ("pg_index", "invoice_ID", "transaction_amt", "transaction_time", "transaction_fulfilled")
            VALUES (i_t."pg_index", i_t."invoice_ID", i_t."transaction_amt", i_t."transaction_time", i_t."transaction_fulfilled");"""
    sf_cursor.execute(snowflake_merge)
    query_ids.append(sf_cursor.sfqid)
    # clear the intermediate table
    sf_cursor.execute('DELETE FROM "TRANSACTIONS"."PUBLIC"."INTERMEDIATE_TABLE"')
    query_ids.append(sf_cursor.sfqid)
    return query_ids
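For comparison, the first option from the list above (delete matching rows, then insert everything from the intermediate table) could be sketched roughly like this; it is an illustration using the same table names, not part of the original workflow:

def delete_then_insert(sf_cursor):
    """alternative to MERGE: remove rows that already exist in the final table,
    then insert everything from the intermediate table"""
    query_ids = []
    sf_cursor.execute("""DELETE FROM "TRANSACTIONS"."PUBLIC"."TRANSACTION_DATA"
        WHERE "invoice_ID" IN (SELECT "invoice_ID" FROM "TRANSACTIONS"."PUBLIC"."INTERMEDIATE_TABLE");""")
    query_ids.append(sf_cursor.sfqid)
    # both tables share the same column order, so a plain INSERT ... SELECT works here
    sf_cursor.execute("""INSERT INTO "TRANSACTIONS"."PUBLIC"."TRANSACTION_DATA"
        SELECT * FROM "TRANSACTIONS"."PUBLIC"."INTERMEDIATE_TABLE";""")
    query_ids.append(sf_cursor.sfqid)
    return query_ids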

The Full Workflow

Now that we have all the pieces set up, we simply have to connect them together. Here is an example of what a batch replication workflow would look like with an external stage:

pg_connection = establish_postgres_connection()
transaction_data = find_transactions(pg_connection)
database = "TRANSACTIONS"

sf_connection, sf_conn_string = establish_snowflake_connection()
sf_cursor = sf_connection.cursor()
sf_cursor.execute(f"USE DATABASE {database}")
session_id = sf_cursor.connection.session_id

query_ids = []
query_ids += migrate_from_external_s3_stage(sf_cursor)
query_ids += merge_intermediate_table(sf_cursor)

sf_cursor.execute('SELECT * FROM "TRANSACTIONS"."PUBLIC"."TRANSACTION_DATA"')
results = sf_cursor.fetchall()
query_ids.append(sf_cursor.sfqid)
snowflake_final_data = pd.DataFrame(
    results,
    columns=["pg_index", "invoice_ID", "transaction_amt", "transaction_time", "transaction_fulfilled"]
)
log_metric("Data Identical", snowflake_final_data.equals(transaction_data))
log_dataframe("Snowflake Data", snowflake_final_data, with_histograms=True)

log_snowflake_resource_usage(
    database=database,
    connection_string=sf_conn_string,
    query_ids=query_ids,
    session_id=int(session_id),
    raise_on_error=True,
    delay=1
)
log_snowflake_table(
    table_name="TRANSACTION_DATA",
    connection_string=sf_conn_string,
    database=database,
    schema="PUBLIC",
    with_preview=True,
    raise_on_error=False
)
sf_connection.close()

log_snowflake_resource_usage allows us to log the resources consumed throughout the whole process, including credits used, compilation time, bytes scanned, and more. log_snowflake_table allows us to log the full Snowflake table with schema information and a preview.

What are the next steps?

If you’re dealing with large scale, you may want to tune your approach to integrity checks – for example, collecting only schema information or a few targeted metrics rather than matching entire tables. This has performance benefits but comes at the cost of full assurance of 1:1 data replication. You may also want to explore operational monitoring or alerting tools that let you know when the job itself or your integrity checks fail, which is where Databand can come in to help.
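For example, a lighter-weight check might compare row counts and a simple aggregate instead of matching entire tables. Here is a sketch, reusing the names from the workflow above:

# compare cheap aggregates instead of full tables
row_count_pg = len(transaction_data)
amount_sum_pg = round(float(transaction_data["transaction_amt"].sum()), 2)

sf_cursor.execute('SELECT COUNT(*), SUM("transaction_amt") FROM "TRANSACTIONS"."PUBLIC"."TRANSACTION_DATA"')
row_count_sf, amount_sum_sf = sf_cursor.fetchone()

log_metric("Row count match", row_count_pg == row_count_sf)
log_metric("Amount sum match", amount_sum_pg == round(float(amount_sum_sf), 2))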

Conclusion

This guide covers how to set up data replication using best practices from Snowflake, along with good fundamentals for tracking data integrity. It uses the example of PostgreSQL to Snowflake, but the same logic can be applied to other databases or tables with slight modifications to the queries. Likewise, the data integrity checks and tracking functionality operate similarly in other cloud databases like Google BigQuery and AWS Redshift.

Happy engineering!
