September 27, 2021 By Databand 5 min read

Spark is critical to the modern data stack. As such, it’s extremely important to have the right level of observability for your Spark environments. There are plenty of options for monitoring Spark—including SaaS programs that provide you with pre-configured dashboards for Spark and Spark SQL metrics. What if that’s not enough?

A typical Spark application setup, whether self-hosted or managed, includes operational dashboards for monitoring cluster health. These dashboards are very useful, but they only give an infrastructure overview, not metrics about the data itself. We can suspect that something is wrong with the app when CPU usage spikes or the cluster runs out of RAM, but that doesn't help when a source changes its schema or data coming from another department is broken. Most issues that engineers face are caused by data rather than by the underlying infrastructure, so they spend a lot of time reproducing issues or digging through files and buckets like detectives. This is where application-level monitoring can help.

Every situation calls for a different level of visibility, and data engineers need to have the ability to go a level deeper than execution metrics. Otherwise, you can spend a significant amount of time debugging data quality issues in Spark.

In this guide, you’ll learn how to get both low-level and high-level data observability for Spark. For the low level, you’ll use Spark’s internal mechanisms, namely the Spark Listener API and Query Execution Listeners. For the high level, you’ll learn how to use libraries to track data quality metrics.

After learning to do both, you’ll have the option to pick whichever one works best for the problem you’re trying to solve.

Low-level ways to monitor Apache Spark

Spark Listener

This is a long-established, battle-tested way of getting metrics; in fact, the Spark UI uses the very same mechanism to visualize metrics. The Spark Listener API allows developers to track events that Spark emits during application execution, typically application start/end, job start/end, stage start/end and so on. You can find the full list in the Spark JavaDoc. Spark Listeners are easy to configure and easy to use for grabbing metrics. When one of these events occurs, Spark calls the corresponding listener method and passes it metadata about the event, including execution time, records read/written, bytes read/written and more.

The most basic low-level data quality monitoring checks record counts and sizes. Imagine a job that runs daily and performs some transformation or analytics on incoming datasets. You can write a listener that checks how many records were read from the input and compares that number with the previous day’s result. When the difference is significant, we can assume something may be wrong with the data source.
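The comparison itself is simple arithmetic. Below is a minimal sketch, assuming the listener has already collected today’s and the previous run’s input record counts (for example by summing `stageInfo.taskMetrics().inputMetrics().recordsRead()` over completed stages) and that a relative change above some threshold counts as “significant”. The `RecordCountCheck` class and the 20% threshold used in the usage note are illustrative assumptions, not part of any Spark API.

```java
// Hypothetical helper: decides whether the current input record count deviates
// significantly from the previous run. The threshold is an assumption to tune.
final class RecordCountCheck {

    private RecordCountCheck() {}

    /**
     * Returns true when the relative change between the previous and the current
     * record count exceeds maxRelativeChange (0.2 means 20%).
     */
    static boolean isSignificantChange(long previousCount, long currentCount, double maxRelativeChange) {
        if (maxRelativeChange < 0) {
            throw new IllegalArgumentException("maxRelativeChange must be non-negative");
        }
        if (previousCount == 0) {
            // Empty baseline: any non-empty input counts as a change.
            return currentCount != 0;
        }
        double relativeChange = Math.abs(currentCount - previousCount) / (double) previousCount;
        return relativeChange > maxRelativeChange;
    }
}
```

In a listener you might call `isSignificantChange(yesterday, today, 0.2)` from `onApplicationEnd` and raise an alert when it returns `true`; a 10% dip passes, while a 50% drop is flagged.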

However, this approach requires building an in-house monitoring solution: metric values have to be stored somewhere, and alerting mechanisms have to be configured. When the application code changes, the metric keys change too, and you have to handle that properly.
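The storage side of that burden can be sketched in a few lines. The following is a minimal in-memory stand-in, assuming you key metrics by names you control (job, dataset, metric) rather than by Spark-generated identifiers such as stage IDs, so that keys survive code changes. `MetricHistory` and its methods are hypothetical; a real solution would persist values in a database or time-series store and would still need separate alerting.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical in-memory metric history, standing in for the persistent store
// an in-house monitoring solution would need.
final class MetricHistory {

    private final Map<String, List<Long>> valuesByKey = new HashMap<>();

    // Builds a key from names you control instead of Spark-generated identifiers
    // (such as stage IDs), which can shift when the application code changes.
    static String key(String jobName, String datasetName, String metricName) {
        return jobName + "/" + datasetName + "/" + metricName;
    }

    // Appends a new value for the key.
    void record(String key, long value) {
        valuesByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    // Most recently recorded value for the key, if any.
    Optional<Long> latest(String key) {
        List<Long> values = valuesByKey.get(key);
        if (values == null || values.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(values.get(values.size() - 1));
    }

    // Read-only view of all values recorded for the key, oldest first.
    List<Long> history(String key) {
        return Collections.unmodifiableList(valuesByKey.getOrDefault(key, Collections.emptyList()));
    }
}
```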

Still, even a simple Spark Listener can give you some insight into your data.

Here is an example of such a Spark Listener:

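import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerStageCompleted;
import org.apache.spark.scheduler.StageInfo;
import org.apache.spark.util.AccumulatorV2;
import scala.collection.Iterator;

public class SomeSparkListener extends SparkListener {

    /**
     * This very simple Spark listener prints the metrics collected for every stage.
     *
     * @param stageCompleted event emitted by Spark when a stage finishes
     */
    @Override
    public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
        StageInfo stageInfo = stageCompleted.stageInfo();
        Iterator<AccumulatorV2<?, ?>> it = stageInfo.taskMetrics().accumulators().iterator();
        while (it.hasNext()) {
            AccumulatorV2<?, ?> next = it.next();
            // Accumulator names are optional; skip unnamed ones instead of failing on get().
            if (next.name().isDefined()) {
                String key = next.name().get();
                Object value = next.value();
                System.out.printf("key: %s, value: %s%n", key, value);
            }
        }
    }
}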

You can add a Spark Listener to your application in several ways:

Add it programmatically:

SparkSession spark = SparkSession.builder().getOrCreate();
spark.sparkContext().addSparkListener(new SomeSparkListener());

Or pass it via spark-submit or the Spark cluster’s driver options:

spark-submit --conf "spark.extraListeners=ai.databand.SomeSparkListener"

Spark Query Execution Listener

This is another Spark monitoring mechanism provided out of the box. Instead of focusing on very low-level metrics, the Query Execution Listener lets developers subscribe to query completion events. It provides higher-level metadata about the executed query, such as its logical and physical plans and its execution metrics.

You can get metrics such as records read/written by a query, but this time aggregated for the whole query instead of for individual tasks, jobs or stages.

Very useful information, such as data location and schema, can also be extracted from the plans. You can extract and store the schema along with the dataframe dimensions, compare them with previous runs and trigger alerts when something goes wrong.
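A minimal sketch of the schema comparison, assuming each schema has already been flattened into a map of column name to type name (for example from each `StructField`’s `name()` and `dataType().simpleString()` in a `StructType`) and that the previous run’s map was stored somewhere. The `SchemaDiff` class is hypothetical and works on plain Java maps, so it runs without Spark.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical schema-drift check. Each schema is modeled as a map of
// column name -> type name; the result lists human-readable differences
// in the iteration order of the given maps.
final class SchemaDiff {

    private SchemaDiff() {}

    static List<String> diff(Map<String, String> previous, Map<String, String> current) {
        List<String> changes = new ArrayList<>();
        // Columns that disappeared or changed type since the previous run.
        for (Map.Entry<String, String> column : previous.entrySet()) {
            String name = column.getKey();
            if (!current.containsKey(name)) {
                changes.add("removed column: " + name);
            } else if (!Objects.equals(column.getValue(), current.get(name))) {
                changes.add("type changed: " + name + " " + column.getValue() + " -> " + current.get(name));
            }
        }
        // Columns that are new in the current run.
        for (String name : current.keySet()) {
            if (!previous.containsKey(name)) {
                changes.add("added column: " + name);
            }
        }
        return changes;
    }
}
```

An empty result means the schema matches the previous run; any entry is a candidate for an alert.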

However, extracting data from a plan can be complicated because you’re forced to use a low-level Spark API.

Also, the entire operational burden of implementing metric storage and alerting is still present. All you get from Spark is metadata; it’s the developer’s responsibility to make use of it.

Here is an example of a simple Query Execution Listener that prints the plan and metrics:

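import org.apache.spark.sql.execution.QueryExecution;
import org.apache.spark.sql.execution.metric.SQLMetric;
import org.apache.spark.sql.util.QueryExecutionListener;
import scala.Tuple2;
import scala.collection.Iterator;

public class ExampleQueryExecutionListener implements QueryExecutionListener {

    /**
     * Prints the physical plan and the query metrics.
     *
     * @param funcName   name of the action that triggered the query
     * @param qe         the query execution, with its logical and physical plans
     * @param durationNs query execution time in nanoseconds
     */
    @Override
    public void onSuccess(String funcName, QueryExecution qe, long durationNs) {
        System.out.println(qe.executedPlan().prettyJson());
        // metrics() holds the SQL metrics of the root physical operator only.
        Iterator<Tuple2<String, SQLMetric>> it = qe.executedPlan().metrics().iterator();
        while (it.hasNext()) {
            Tuple2<String, SQLMetric> next = it.next();
            System.out.printf("Key: %s, value: %s%n", next._1(), next._2().value());
        }
    }

    @Override
    public void onFailure(String funcName, QueryExecution qe, Exception exception) {
        // Failed queries are ignored in this example.
    }
}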

Query execution listeners can be added either programmatically or via configuration:

In application code:

SparkSession spark = SparkSession.builder().getOrCreate();
spark.listenerManager().register(new ExampleQueryExecutionListener());

Via spark-submit:

spark-submit --conf "spark.sql.queryExecutionListeners=ai.databand.ExampleQueryExecutionListener"

Implementing low-level monitoring can involve some serious heavy lifting. However, this “system” way of monitoring has a huge benefit: it adds very little computational overhead. Because the metadata is already emitted and recorded by Spark internals, collecting it doesn’t launch extra Spark jobs or noticeably slow down query execution.

Using listeners for monitoring lets you avoid touching any application code. This can be a huge benefit when you want to track data in existing and legacy applications but don’t have the budget to change them. Just write a listener, pass it via the Spark configuration and get a picture of your data.

High-level ways to monitor Apache Spark

Manual data quality checks

You can greatly increase your confidence in incoming data by validating it manually. Let’s say we expect a certain number of records in the input data source, and this number usually shouldn’t be lower than X. A very simple check compares the actual record count with that threshold and flags the input when the count falls short.
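A minimal sketch of such a check in Java. `MinimumRecordsCheck` is a hypothetical helper, and passing the count as a `LongSupplier` is a design choice for this sketch: in a Spark job you could pass `df::count` for a `Dataset<Row> df`, while the code itself stays free of Spark dependencies.

```java
import java.util.function.LongSupplier;

// Hypothetical manual check: fails fast when the input has fewer records than
// expected. In a Spark job the supplier would typically be df::count.
final class MinimumRecordsCheck {

    private MinimumRecordsCheck() {}

    // Returns the actual count, or throws when it is below minExpected.
    static long requireAtLeast(LongSupplier recordCount, long minExpected) {
        long actual = recordCount.getAsLong();
        if (actual < minExpected) {
            throw new IllegalStateException(
                    "Expected at least " + minExpected + " records but found " + actual);
        }
        return actual;
    }
}
```

For example, `MinimumRecordsCheck.requireAtLeast(df::count, 1_000_000L)` would stop the job before any transformation runs if the input is suspiciously small; the number is only a placeholder for X. Keep in mind that `count()` is a Spark action and runs a job of its own.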

The possibilities here are unlimited: we can compare counts, non-null value counts, inferred schemas and so on.
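Non-null counts follow the same pattern. A minimal sketch, assuming the total and non-null counts for a column are computed beforehand, for example with `df.count()` and `df.filter(col("c").isNotNull()).count()`, where `c` is a placeholder column name. `CompletenessCheck` is hypothetical, and the minimum ratio is an assumption you would choose per column.

```java
// Hypothetical completeness check: the share of non-null values in a column
// must not fall below a minimum ratio.
final class CompletenessCheck {

    private CompletenessCheck() {}

    // Fraction of non-null values; an empty dataset is treated as complete.
    static double nonNullRatio(long totalCount, long nonNullCount) {
        if (totalCount < 0 || nonNullCount < 0 || nonNullCount > totalCount) {
            throw new IllegalArgumentException("invalid counts");
        }
        return totalCount == 0 ? 1.0 : (double) nonNullCount / totalCount;
    }

    static boolean isComplete(long totalCount, long nonNullCount, double minRatio) {
        return nonNullRatio(totalCount, nonNullCount) >= minRatio;
    }
}
```

For instance, `CompletenessCheck.isComplete(total, nonNull, 0.9)` returns `false` when more than 10% of the values are null.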

Using data quality libraries

Since many quality checks are more or less trivial, such as ensuring your dataframe has the proper shape and contents, the community has developed convenient libraries for them. One of those libraries is Deequ, which provides a rich domain-specific language (DSL) covering most cases. It also offers advanced features, such as profiling columns (calculating min/max/mean/percentiles and histograms), detecting anomalies and much more.

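Consider the introductory example in the Deequ documentation: a single verification run chains checks on dataset size, column completeness, uniqueness, allowed values and non-negative values. That is a large set of checks wrapped in a concise, ready-to-use DSL.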
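To make the idea of a declarative check DSL concrete, here is a hypothetical plain-Java sketch that runs over an in-memory list instead of a Spark dataframe. The `CheckSuite` class and its methods are invented for illustration; the method names echo Deequ’s checks, but this is not Deequ’s API, and real Deequ checks are computed by Spark jobs over the dataframe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical mini-DSL: constraints are declared fluently, then evaluated
// together against an in-memory list of rows. NOT Deequ's API.
final class CheckSuite<T> {

    private final List<String> descriptions = new ArrayList<>();
    private final List<Predicate<List<T>>> constraints = new ArrayList<>();

    // Declares an arbitrary constraint on the whole dataset; returns this for chaining.
    CheckSuite<T> satisfies(String description, Predicate<List<T>> constraint) {
        descriptions.add(description);
        constraints.add(constraint);
        return this;
    }

    CheckSuite<T> hasSize(long expected) {
        return satisfies("size == " + expected, rows -> rows.size() == expected);
    }

    // The extracted column value must never be null.
    CheckSuite<T> isComplete(String columnName, Function<T, ?> column) {
        return satisfies(columnName + " has no nulls",
                rows -> rows.stream().map(column).allMatch(Objects::nonNull));
    }

    // The extracted column value must not repeat across rows.
    CheckSuite<T> isUnique(String columnName, Function<T, ?> column) {
        return satisfies(columnName + " is unique",
                rows -> rows.stream().map(column).distinct().count() == rows.size());
    }

    // Evaluates every constraint and returns the descriptions of the failed ones.
    List<String> run(List<T> rows) {
        List<String> failures = new ArrayList<>();
        for (int i = 0; i < constraints.size(); i++) {
            if (!constraints.get(i).test(rows)) {
                failures.add(descriptions.get(i));
            }
        }
        return failures;
    }
}
```

Declaring checks this way separates *what* must hold from *how* it is evaluated, which is what lets a library like Deequ batch the underlying computations for you.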

More importantly, Deequ can store check results and automatically compare them with previous runs. This is done through metrics repositories, and you can write your own repository implementation to integrate Deequ seamlessly into your existing monitoring infrastructure.

While high-level application quality checks are far more flexible than low-level approaches, they come with a big downside: performance penalties. Since every check triggers Spark operations, the overhead can be significant in some cases, especially on large datasets: each “count”, or “where” followed by a count, can lead to a full scan. Spark will do its best to optimize the execution plans internally, but you should consider these implications and make sure data profiling doesn’t hurt your performance.
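One common mitigation is to compute several metrics in a single pass instead of issuing a separate action per metric. In Spark that typically means one `agg(...)` call combining aggregate functions such as `count`, `min`, `max` and `avg` from `org.apache.spark.sql.functions`. The plain-Java sketch below only illustrates the single-pass idea over an in-memory iterable; the `ColumnProfile` class is hypothetical.

```java
// Hypothetical single-pass column profile: count, nulls, min, max and mean are
// accumulated in one loop over the data instead of one pass per metric.
final class ColumnProfile {

    long count;
    long nullCount;
    double min = Double.POSITIVE_INFINITY;
    double max = Double.NEGATIVE_INFINITY;
    private double sum;

    static ColumnProfile of(Iterable<Double> values) {
        ColumnProfile p = new ColumnProfile();
        for (Double v : values) {
            p.count++;
            if (v == null) {
                p.nullCount++;
                continue;
            }
            p.min = Math.min(p.min, v);
            p.max = Math.max(p.max, v);
            p.sum += v;
        }
        return p;
    }

    // Mean of the non-null values, or NaN when there are none.
    double mean() {
        long nonNull = count - nullCount;
        return nonNull == 0 ? Double.NaN : sum / nonNull;
    }
}
```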

Conclusion

We have reviewed several ways of monitoring data quality in Spark applications. The low-level approach uses Spark’s listener APIs and gives access to low-level metrics such as records read/written and logical/physical plans. It is useful for building trends, making sure a data pipeline produces proper results and getting an overview of existing applications without any code modifications. High-level approaches, such as checking data by hand or using data quality libraries, are much more convenient but have drawbacks such as performance penalties.

As in any real-world situation, there are always trade-offs, and each approach fits some scenarios better than others, depending on your application type. Use them wisely.

At IBM® Databand®, we use both approaches to provide a comprehensive set of options for tracking Spark applications. At our core we use Spark Listeners to build metric trends and data lineage, and we also provide a convenient Metrics Store for Deequ as well as the ability to track individual, manually calculated metrics.

Learn more about Databand’s continuous data observability platform and how it helps detect data incidents earlier, resolve them faster and deliver more trustworthy data to the business. If you’re ready to take a deeper look, book a demo today.
