
Open and extensible DataOps management

A core part of our DataOps platform, Databand’s open source library enables you to track data quality information, monitor pipeline health, and automate complex data processes. We keep the library open source so you stay in control of how your data is tracked and can build custom extensions for any requirement.
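For example, tracking can be added to existing code with a decorator and a logging call. The sketch below assumes a pandas DataFrame loaded from a hypothetical orders.csv file; it uses the same @task decorator and log_metric call that appear in the examples further down this page.

import pandas as pd
from dbnd import task, log_metric

@task
def check_orders(path="orders.csv"):
    orders = pd.read_csv(path)
    # Report simple data quality metrics to Databand or your local logging system
    log_metric("row count", len(orders))
    log_metric("null customer ids", int(orders["customer_id"].isnull().sum()))
    return orders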

Data quality monitoring

Run health checks on your data lakes and database tables, such as S3, Snowflake, and Redshift. The checks are built with Databand’s open source library, which makes it easy to report data quality and performance metrics to Databand’s monitoring system or your local logging system.

Automated Data Tracking

Out-of-the-box metrics for standard data shape tracking, like schema changes and data completeness (see the sketch after this list)

Easily Configurable

Customize the scripts according to your most important data quality checks

Instant Setup

Set up in minutes and run as Airflow DAGs or simple cron jobs
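As a sketch of what shape tracking can look like, the snippet below logs a table’s schema and completeness on every run. It assumes a pandas DataFrame (customers) has already been loaded, for example from Snowflake or Redshift, and uses the library’s log_metric call.

from dbnd import log_metric

def track_table_shape(customers):
    # Schema: column names and count, so changes stand out run over run
    log_metric("columns", list(customers.columns))
    log_metric("column count", customers.shape[1])
    log_metric("row count", customers.shape[0])
    # Completeness: share of non-null values per column
    completeness = (1 - customers.isnull().mean()).round(3).to_dict()
    log_metric("completeness", completeness)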

from dbnd import log_metric

def monitor_redshift(**op_kwarg):
    # table_shapes and the disk_* values are collected from Redshift system
    # tables earlier in the script (not shown here)
    log_metric("Max table column count", table_shapes["columns"].max())
    log_metric("Min table column count", table_shapes["columns"].min())
    log_metric("Mean table column count", round(table_shapes["columns"].mean(), 2))
    log_metric("Median table column count", table_shapes["columns"].median())
    log_metric("Disk capacity (GB)", disk_capacity)
    log_metric("Disk used (GB)", disk_used)
    log_metric("Disk free (GB)", disk_free)
    log_metric("Percent Disk usage", round((disk_used / disk_capacity) * 100, 2))
from dbnd_snowflake import log_snowflake_resource_usage

log_snowflake_resource_usage(
    query_text,
    database="DATABASE",
    user="user",
    connection_string="snowflake://<user>:<password>@<account>/",
    session_id=123456,
)

Pipeline logging and metrics tracking

Integrate Databand into pipelines to report metrics about your data quality and job performance.

Custom Metrics

Define and report any custom metric about your data every time your pipeline runs (see the sketch after this list).

Data Profiles

Automatically generate data profiling and statistics on data files and tables.

Input / Output

Track workflow inputs and outputs and lineage of data across tasks and broader pipelines.
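A minimal sketch of custom metrics and data profiling inside a pipeline step: it assumes a pandas DataFrame loaded from a hypothetical customers.csv file with a lifetime_value column, and uses the library’s log_metric and log_dataframe calls (the with_histograms option is an assumption about the profiling settings).

import pandas as pd
from dbnd import task, log_metric, log_dataframe

@task
def score_customers(path="customers.csv"):
    customers = pd.read_csv(path)
    # Custom metric: any number you care about, reported on every run
    log_metric("avg lifetime value", round(customers["lifetime_value"].mean(), 2))
    # Data profile: schema and statistics for the whole table
    log_dataframe("customers", customers, with_histograms=True)
    return customers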

# SnowflakeOperator comes from Airflow's Snowflake provider
# (the import path depends on your Airflow version; Airflow 2 shown)
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from dbnd_snowflake.airflow_operators import LogSnowflakeResourceOperator, log_snowflake_resource_operator

select_query = 'select * from "SNOWFLAKE_SAMPLE_DATA"."TPCDS_SF100TCL"."CUSTOMER" limit 1000'

# Airflow snowflake_operator
get_customers_task = SnowflakeOperator(
    sql=select_query,
    snowflake_conn_id="test_snowflake_conn",
    task_id="get_customers",
)

# Databand operator for Snowflake resource tracking
# (database, schema and account are your Snowflake connection details)
log_snowflake_resources_task = LogSnowflakeResourceOperator(
    query_text=select_query,
    snowflake_conn_id="airflow_snowflake_conn",
    warehouse=None,
    database=database,
    schema=schema,
    account=account,
    task_id="log_snowflake_resources_task",
)
from dbnd_snowflake import log_snowflake_resource_usage

log_snowflake_resource_usage(
    query_text,
    database="DATABASE",
    user="user",
    connection_string="snowflake://<user>:<password>@<account>/",
    session_id=123456,
)

Automation tools

Centralized pipeline configuration management and a CLI that makes it easy to build, run, and deploy pipelines to production.

Central Config Management

Abstract the configuration of compute environments and data locations out of your pipeline code

Dynamic Runs

Define different versions of pipelines to run based on changing data or parameters

Decorator Definition

Easily define flows by annotating your code, without making big changes to your workflow

from airflow import DAG
from dbnd import pipeline, task

# data_repo (default dataset locations) and the store module are project code (not shown)

@task
def buy_vegetables(veg_list):
    from store import veg_store

    return veg_store.purchase(veg_list)

@task
def cut(vegetables):
    chopped = []
    for veg in vegetables:
        chopped.append(veg.dice())
    return [x + "\n" for x in chopped]

@task
def add_dressing(chopped_vegetables, dressing, salt_amount="low"):
    for veg in chopped_vegetables:
        veg.season(salt_amount)
    return chopped_vegetables

@pipeline
def prepare_salad(vegetables_list=data_repo.vegetables, dressing="oil"):
    vegetables = buy_vegetables(vegetables_list)
    chopped = cut(vegetables)
    dressed = add_dressing(chopped, dressing)
    return dressed

with DAG(dag_id="prepare_salad") as dag:
    salad = prepare_salad()

""" CLI:
airflow backfill -s 2020-06-01 -e 2020-06-02 prepare_salad
"""
from dbnd import pipeline, task

# data_repo (default dataset locations) and the store module are project code (not shown)

@task
def buy_vegetables(veg_list):
    from store import veg_store

    return veg_store.purchase(veg_list)

@task
def cut(vegetables):
    chopped = []
    for veg in vegetables:
        chopped.append(veg.dice())
    return [x + "\n" for x in chopped]

@task
def add_dressing(chopped_vegetables, dressing, salt_amount="low"):
    for veg in chopped_vegetables:
        veg.season(salt_amount)
    return chopped_vegetables

@pipeline
def prepare_salad(vegetables_list=data_repo.vegetables, dressing="oil"):
    vegetables = buy_vegetables(vegetables_list)
    chopped = cut(vegetables)
    dressed = add_dressing(chopped, dressing)
    return dressed

""" CLI:
dbnd run prepare_salad
"""
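Because pipeline parameters are ordinary function arguments, the same pipeline can be launched as a different version by overriding them at run time, for example dbnd run prepare_salad --set dressing=vinegar (assuming dbnd's --set parameter override syntax).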


Contributions to the community

We’ve benefited greatly from the work of other developers and we want to share the love. These are some recent contributions we’ve made to the community.

Find our open source projects on GitHub.

Start a free trial or demo

Contact us for a free trial or to see a demo of the solution in action.