
A library for streamlining your data operations

Databand’s open source library helps data teams track pipeline metadata and add dynamic automation to data processes, giving users full control over how metadata is collected and how automation is defined.

Data monitoring

Leverage off-the-shelf Python scripts for tracking your data lake and database tables. You can run the scripts from Airflow or any other orchestrator. They are built using Databand’s open source library, which makes it easy to report data quality and performance metrics to Databand’s monitoring system.

Automated Data Tracking

Out-of-the-box metrics for standard data shape tracking, such as schema changes and data completeness (see the log_dataframe sketch below)

Easily Configurable

Customize the scripts according to your most important data quality checks

Instant Setup

Minimal setup, easy to run as Airflow DAGs or simple cron jobs (see the DAG sketch below)

from dbnd import log_metric

def monitor_redshift(**op_kwarg):
    # table_shapes and the disk_* values are collected earlier in the script
    # (e.g. by querying Redshift system tables)
    log_metric("Max table column count", table_shapes["columns"].max())
    log_metric("Min table column count", table_shapes["columns"].min())
    log_metric("Mean table column count", round(table_shapes["columns"].mean(), 2))
    log_metric("Median table column count", table_shapes["columns"].median())
    log_metric("Disk capacity (GB)", disk_capacity)
    log_metric("Disk used (GB)", disk_used)
    log_metric("Disk free (GB)", disk_free)
    log_metric("Percent disk usage", round((disk_used / disk_capacity) * 100, 2))
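The scripts can be scheduled like any other Airflow task. Here is a minimal sketch of wrapping monitor_redshift in a daily DAG, assuming Airflow 2.x; the DAG id, start date, and schedule below are illustrative, not part of the library:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# Illustrative DAG that runs the monitoring script once a day
with DAG(
    dag_id="monitor_redshift",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    monitor_task = PythonOperator(
        task_id="monitor_redshift",
        python_callable=monitor_redshift,
    )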
from dbnd_snowflake import log_snowflake_resource_usage

log_snowflake_resource_usage(
    query_text,
    database="DATABASE",
    user="user",
    connection_string="snowflake://<user>:<password>@<account>/",
    session_id=123456,
)
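The snippets above report scalar metrics with log_metric. For the schema and completeness tracking mentioned under Automated Data Tracking, dbnd also provides log_dataframe, which reports a dataframe's shape, schema, and column statistics. A minimal sketch, assuming a pandas DataFrame; the file path and metric name are illustrative:

import pandas as pd

from dbnd import log_dataframe, log_metric

# Illustrative input; any pandas DataFrame works here
customers = pd.read_csv("s3://my-bucket/customers.csv")

# Reports shape, schema, and column-level statistics to Databand
log_dataframe("customers", customers, with_histograms=True)

# Completeness can also be reported explicitly as a custom metric
log_metric("customers null ratio", round(float(customers.isnull().mean().mean()), 4))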

Pipeline logging and tracking

Store metadata so you always stay on top of your pipeline and data health.

Custom Metrics

Define and report any custom metric about your data every time your pipeline runs (see the sketch below).

Data Profiles

Automatically generate data profiling and statistics on data files and tables.

Input / Output

Track workflow inputs and outputs, and the lineage of data across tasks and broader pipelines.

# SnowflakeOperator ships with the Airflow Snowflake provider
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from dbnd_snowflake.airflow_operators import LogSnowflakeResourceOperator

select_query = 'select * from "SNOWFLAKE_SAMPLE_DATA"."TPCDS_SF100TCL"."CUSTOMER" limit 1000'

# Airflow snowflake_operator
get_customers_task = SnowflakeOperator(
    sql=select_query,
    snowflake_conn_id="test_snowflake_conn",
    task_id="get_customers",
)

# Databand operator for Snowflake resource tracking
log_snowflake_resources_task = LogSnowflakeResourceOperator(
    query_text=select_query,
    snowflake_conn_id="airflow_snowflake_conn",
    warehouse=None,
    database=database,  # database, schema, and account are defined elsewhere in the DAG file
    schema=schema,
    account=account,
    task_id="log_snowflake_resources_task",
)
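Beyond the Snowflake operator above, custom metrics and input/output tracking can be reported from inside any function decorated with dbnd's task decorator. A minimal sketch; the function, file path, and metric name are illustrative, and data profiles use log_dataframe as shown earlier:

import pandas as pd

from dbnd import log_metric, task

@task
def prepare_customers(raw_path="customers.csv"):
    customers = pd.read_csv(raw_path)

    # Custom metric, reported on every run of the pipeline
    log_metric("rows ingested", len(customers))

    # The returned dataframe is tracked as the task's output,
    # which is what drives input/output and lineage tracking across tasks
    return customers.dropna()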

DataOps utilities

For data engineers working in complex environments with tools like Airflow, Kubernetes, and Spark, Databand provides tooling for dynamic DAG execution and centralized configuration management, making it easier to build, run, and deploy pipelines to production.

Dynamic Runs

Define different versions of pipelines to run based on changing data or parameters (see the CLI sketch below)

Central Configs

Abstract out configurations to compute environments and data locations

Decorator Definition

Easily define flows by annotating your code, without making big changes to your workflow

from airflow import DAG
from dbnd import pipeline, task

# store and data_repo are example project modules
@task
def buy_vegetables(veg_list):
    from store import veg_store

    return veg_store.purchase(veg_list)

@task
def cut(vegetables):
    chopped = []
    for veg in vegetables:
        chopped.append(veg.dice())
    return [x + "\n" for x in chopped]

@task
def add_dressing(chopped_vegetables, dressing, salt_amount="low"):
    for veg in chopped_vegetables:
        veg.season(salt_amount)
    return chopped_vegetables

@pipeline
def prepare_salad(vegetables_list=data_repo.vegetables, dressing="oil"):
    vegetables = buy_vegetables(vegetables_list)
    chopped = cut(vegetables)
    dressed = add_dressing(chopped, dressing)
    return dressed

with DAG(dag_id="prepare_salad") as dag:
    salad = prepare_salad()

"""
CLI:
airflow backfill -s 2020-06-01 -e 2020-06-02 prepare_salad
"""
The same pipeline can also be run directly with the dbnd CLI, without wrapping it in an Airflow DAG:

CLI:
dbnd run prepare_salad
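As a sketch of the Dynamic Runs item above, pipeline and task parameters can be overridden per run from the CLI. The values below are illustrative, and the --set syntax assumes dbnd's standard parameter-override flag:

# Run the same pipeline with a different dressing for this run only
dbnd run prepare_salad --set prepare_salad.dressing=vinaigrette

# Parameters of individual tasks can be overridden the same way
dbnd run prepare_salad --set add_dressing.salt_amount=high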

Contributions to the community

We’ve benefited greatly from the work of other developers and we want to share the love. These are some recent contributions we’ve made to the community.


Start a free trial or demo

Contact us for a free trial or to see a demo of the solution in action.