
Everyday Data Engineering: Best Practices on Data Replication to Snowflake with Python

Databand
2021-01-22 11:25:08

Everyday Data Engineering: Databand’s blog series – A guide on common data engineering activities, written for data engineers by data engineers.


A big portion of what data engineers do is move around data from one database to another – also known as data replication and migration. Every data team has gone through a data replication project – whether it’s for the purpose of moving data to a database better suited for analytical querying, protecting an operational database from high analytical load, or doing a cloud migration process.

In these kinds of processes, it’s critical to know that data is being consistently and accurately moved. Otherwise, analytics will be broken, ML models will underperform, and engineers will spend all day troubleshooting. In this guide, we will demonstrate how to replicate data to Snowflake with Python using Snowflake best practices, while ensuring data integrity. The data replication process will take data from an on-premise PostgreSQL database to a Snowflake cloud database.

Setting up the Prerequisites 

We will be using SQLAlchemy to interact with the on-premise PostgreSQL database, Snowflake’s Python connector to interact with Snowflake, and Databand’s open source library (“DBND”) to track our data and check for data integrity. We will also be using Pandas to efficiently perform transformations.

You can install all the tools used in this guide with the following command:

pip install dbnd dbnd-snowflake SQLAlchemy snowflake-connector-python

If any dependencies are missing, you can find more detailed installation instructions in each tool’s documentation.

You will also need a PostgreSQL database, and a Snowflake account with privileges to create tables and stages. For information on account privileges, visit https://docs.snowflake.com/en/user-guide/security-access-control-privileges.html.

Connecting to the Databases

We will need two connections established: one to our PostgreSQL database (using a SQLAlchemy engine) and another to Snowflake (using the Snowflake Python Connector):

from sqlalchemy import create_engine
import pandas as pd
import snowflake.connector
from dbnd import log_dataframe, log_metric
from dbnd import log_snowflake_resource_usage, log_snowflake_table
import datetime

import credentials  # local module holding USER, PW, ACCOUNT (not shown)

def establish_postgres_connection():
    """uses sqlalchemy to establish connection to postgresql db"""
    pg_db = create_engine('postgresql+psycopg2://dbnd_postgres:[email protected]/dbnd_postgres')
    return pg_db

def establish_snowflake_connection():
    """uses Snowflake Python Connector to establish connection to snowflake,
    returns connection and connection string"""
    snowflake_connection = snowflake.connector.connect(
        user=credentials.USER,
        password=credentials.PW,
        account=credentials.ACCOUNT
    )
    connection_string = f'snowflake://{credentials.USER}:{credentials.PW}@{credentials.ACCOUNT}/?account={credentials.ACCOUNT}'
    return snowflake_connection, connection_string

The connection string is used by DBND to track Snowflake resource usage and tables. It should be in the format:

snowflake://<user>:<password>@<full account name>/?account=<full account name>

Here the full account name is your full Snowflake account name, including the region and cloud platform, if applicable. For example, if your account name is xyz12345 and your account is hosted in the US East (Ohio) region on AWS, then your full account name would be

xyz12345.us-east-2.aws
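As a sketch, the connection string format above can be assembled with a small helper. The helper name and the credential values here are placeholders for illustration only:

```python
def build_dbnd_snowflake_conn_string(user, password, full_account):
    """Build the connection-string format DBND expects, per the template above.
    full_account includes the region and cloud platform where applicable."""
    return f"snowflake://{user}:{password}@{full_account}/?account={full_account}"

conn = build_dbnd_snowflake_conn_string("alice", "s3cret", "xyz12345.us-east-2.aws")
```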

Exploring the Example Data

Before we continue with the guide on data replication, let’s take a quick look at the example data used in this guide. This will help you adapt the steps in this guide to better suit your workflows. 

The table columns are:

  • index
    – The index of the PostgreSQL table.
  • invoice_ID
    – Unique ID generated for each transaction.
  • transaction_amt
    – Amount exchanged in the transaction.
  • transaction_time
    – POSIX timestamp of the transaction.
  • transaction_fulfilled
    – Whether the transaction has been fulfilled.
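To make the schema concrete, here is a small sketch that generates rows in this shape. The value ranges and the make_sample_transactions helper are hypothetical, not part of the actual dataset used in this guide:

```python
import random
import time
import uuid

def make_sample_transactions(n):
    """Generate n rows matching the example schema (illustrative sample data)."""
    rows = []
    for i in range(n):
        rows.append({
            "index": i,
            "invoice_ID": str(uuid.uuid4()),
            "transaction_amt": round(random.uniform(1.0, 500.0), 2),
            "transaction_time": time.time() - random.randint(0, 86400),
            "transaction_fulfilled": random.choice([0, 1]),
        })
    return rows

sample = make_sample_transactions(3)
```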

Querying Data from PostgreSQL 

Next, we query the data from our on-premise PostgreSQL database. We can do this by creating a function that uses the COPY command to store a piece of our table (or the full table) in a local file. We will use this file for staging in a later section. For larger datasets, it is recommended to COPY the data incrementally instead.

After copying the data to a file, we will track our data in the form of a dataframe using DBND’s log_dataframe function. This dataframe will be used to perform a data integrity check at the end of our workflow. We will also rename the index column to pg_index to prevent any confusion later on. If the index column is no longer required for your application, you can drop it here or modify the SQL query to exclude it.

from os import path

def find_transactions(pg_connection, start_day=None):
    """finds transactions between start_day and the current time. If start_day
    is None, copies all transactions. Returns the data as a Pandas DataFrame."""
    staging_file = path.join(".", "tmp", "transactions.csv")
    if start_day:
        start = (datetime.datetime.now() - datetime.timedelta(days=start_day)).timestamp()
        query = f"COPY (SELECT * FROM transactions_data WHERE transaction_time >= {start}) TO '{staging_file}' WITH DELIMITER ',' CSV HEADER;"
    else:
        query = f"COPY transactions_data TO '{staging_file}' WITH DELIMITER ',' CSV HEADER;"
    # run the COPY on the PostgreSQL side to write the staging file
    pg_connection.execute(query)
    transaction_data = pd.read_csv(staging_file)
    # rename the index column to avoid confusion later on
    transaction_data.rename(columns={'index': 'pg_index'}, inplace=True)
    transaction_data.to_csv(staging_file, index=False)
    log_dataframe("PostgreSQL data", transaction_data, with_histograms=True)
    return transaction_data

Creating the Snowflake Tables

Before we stage the file to be loaded into a Snowflake table, we first have to create the tables we need. Snowflake’s architecture allows for row-level updates, making for easier delta data migration processes. The best way to do this is to load the extracted Postgres data into an intermediate table, then update or add rows in the final table as required.

Snowflake supports three types of tables: temporary, transient, and permanent. Temporary tables are session-based and exist only until the session in which they were created ends; at that point, the data in them is completely purged. Transient tables are more similar to permanent tables in that they persist past the session and must be explicitly dropped, but they differ in that there is no Fail-safe period for the data stored in them. Temporary and transient tables are useful for storing data that does not need to be maintained for extended periods of time. In this guide, our intermediate table will be a temporary table used to load data into our final, permanent table. To create these tables in Snowflake:

CREATE TABLE "TRANSACTIONS"."PUBLIC"."TRANSACTION_DATA" (
    "pg_index" INTEGER,
    "invoice_ID" STRING,
    "transaction_amt" DOUBLE,
    "transaction_time" DOUBLE,
    "transaction_fulfilled" INTEGER
);

CREATE TEMPORARY TABLE "TRANSACTIONS"."PUBLIC"."INTERMEDIATE_TABLE" (
    "pg_index" INTEGER,
    "invoice_ID" STRING,
    "transaction_amt" DOUBLE,
    "transaction_time" DOUBLE,
    "transaction_fulfilled" INTEGER
);

Snowflake supports a variety of data types; throughout this guide, we use simple ones.

Staging 

There are two types of stages – internal and external. In either case, a stage is a reference to a location where you can store files to load or unload from Snowflake tables. 

Internal Staging

An internal stage is a named location within Snowflake to which data files are uploaded (with the PUT command) before being loaded into a table. An internal stage has the advantage of being easier to set up, but it does not support automated continuous data ingestion. For a one-shot bulk data migration, it is often easier to create an internal stage.

After creating an internal stage, we will load its contents into an intermediate table – this will allow us to update our final table accordingly.

We will also be keeping track of the Query ID of each of our queries – this will later be used to discover the Snowflake resource usage (credits, etc.) and any performance bottlenecks of our queries. 

def migrate_from_internal_stage(sf_cursor):
    """creates and populates an internal stage, then copies file(s) into an
    intermediate table"""
    query_ids = []
    # first create the internal stage:
    stage_creation_query = """CREATE OR REPLACE TEMPORARY STAGE pg_internal_stage
        FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1);"""
    sf_cursor.execute(stage_creation_query)
    query_ids.append(sf_cursor.sfqid)
    # upload the local staging file to the internal stage
    sf_cursor.execute("PUT file://./tmp/internal_stage/transactions.csv @pg_internal_stage;")
    query_ids.append(sf_cursor.sfqid)
    # load the staged file into the intermediate table
    sf_cursor.execute('COPY INTO "TRANSACTIONS"."PUBLIC"."INTERMEDIATE_TABLE" FROM @pg_internal_stage;')
    query_ids.append(sf_cursor.sfqid)
    return query_ids

External Staging 

An external stage is a specified cloud storage location where data file(s) are stored so that they can be loaded into a table. At the time of writing, Snowflake supports the following cloud storage services:

  • Amazon S3 Buckets
  • Google Cloud Storage Buckets
  • Microsoft Azure Containers

In this guide, we are using an AWS S3 Bucket as our external stage. For the required parameters of your cloud storage service, visit Snowflake’s official documentation.

External stages require a few extra steps but are more versatile: they support continuous data ingestion that can be automated with Snowpipe.
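As a minimal sketch of what Snowpipe automation looks like, a pipe wraps a COPY INTO statement and, with AUTO_INGEST enabled, loads files as they land in the external stage. The pipe name here is hypothetical, and the cloud event notifications that actually trigger AUTO_INGEST are not shown – see Snowflake’s Snowpipe documentation for that setup:

```python
def build_create_pipe_sql(pipe_name, table_name, stage_name):
    """Build a CREATE PIPE statement wrapping a COPY INTO from a stage.
    AUTO_INGEST = TRUE makes Snowpipe load files on cloud storage events."""
    return (
        f"CREATE OR REPLACE PIPE {pipe_name} AUTO_INGEST = TRUE AS "
        f"COPY INTO {table_name} FROM @{stage_name} "
        "FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1);"
    )

pipe_sql = build_create_pipe_sql(
    "transactions_pipe",
    '"TRANSACTIONS"."PUBLIC"."INTERMEDIATE_TABLE"',
    "aws_external_stage",
)
# you would run this with an open cursor, e.g. sf_cursor.execute(pipe_sql)
```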

We can also create a permanent storage integration for our Snowflake database that allows Snowflake to read data from and write data to our AWS bucket. Note that recreating a storage integration (with CREATE OR REPLACE) breaks the association with any stages that reference the previous integration.

As with the internal staging process, we will also be keeping track of the Query ID of each of our queries – this will later be used to discover the Snowflake resource usage (credits, etc.) and any performance bottlenecks of our queries. 

def migrate_from_external_s3_stage(sf_cursor):
    """creates and populates an external stage, then copies files into an
    intermediate table"""
    query_ids = []
    # first create the storage integration on snowflake:
    sf_cursor.execute("USE ROLE ACCOUNTADMIN;")
    snowflake_storage_integration = """CREATE OR REPLACE STORAGE INTEGRATION s3_integration
        type = external_stage
        storage_provider = s3
        enabled = true
        storage_aws_role_arn = 'arn:aws:iam:::role/snowflake-role'
        storage_allowed_locations = ('s3:///');"""
    sf_cursor.execute(snowflake_storage_integration)
    query_ids.append(sf_cursor.sfqid)
    # next create the external stage:
    ex_stage_creation_query = """CREATE OR REPLACE STAGE aws_external_stage
        URL = 's3:///'
        STORAGE_INTEGRATION = s3_integration
        FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1);"""
    sf_cursor.execute(ex_stage_creation_query)
    query_ids.append(sf_cursor.sfqid)
    # copy from external stage to intermediate table
    sf_cursor.execute('COPY INTO "TRANSACTIONS"."PUBLIC"."INTERMEDIATE_TABLE" FROM @aws_external_stage;')
    query_ids.append(sf_cursor.sfqid)
    return query_ids

Tracking Our Data

Before we complete the data replication process by updating the final Snowflake table, we will use Pandas and Databand DBND to make sure our data was replicated successfully into the intermediate table. To do this, we can select all the data from our intermediate table, then compare the contents in the intermediate table with the contents extracted from our PostgreSQL database. If the contents of both tables match, DBND will log True; otherwise, DBND logs False. To keep things simple in this guide, we will allow unmatched data to pass onto the final table, but you can fail your pipeline here or report the discrepancy to an alerting system if you require your data to be completely identical.

sf_cursor.execute('SELECT * FROM "TRANSACTIONS"."PUBLIC"."INTERMEDIATE_TABLE"')
results = sf_cursor.fetchall()
query_ids.append(sf_cursor.sfqid)

snowflake_intermediate_data = pd.DataFrame(
    results,
    columns=["pg_index", "invoice_ID", "transaction_amt", "transaction_time", "transaction_fulfilled"]
)
log_metric("Data Identical", snowflake_intermediate_data.equals(transaction_data))
log_dataframe("Snowflake Data", snowflake_intermediate_data, with_histograms=True)

Updating the Final Table

Finally, all we need to do is update the final table. There are several ways to approach this:

  • Delete records that exist in both tables, then insert all records from the intermediate table into the final table.
  • Update rows in the final table if they exist in both tables; insert rows from the intermediate table if they don’t yet exist.
  • Use Snowflake’s MERGE command to insert new rows and update existing rows in the final table.
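For comparison, the first option (delete-then-insert) can be sketched as two statements executed with a Snowflake cursor. The table names follow this guide, and the join column is assumed to be "invoice_ID"; this is an illustration, not the workflow this guide uses:

```python
# fully qualified table names from the guide
FINAL = '"TRANSACTIONS"."PUBLIC"."TRANSACTION_DATA"'
INTERMEDIATE = '"TRANSACTIONS"."PUBLIC"."INTERMEDIATE_TABLE"'

# delete final-table rows that also exist in the intermediate table...
delete_sql = (
    f'DELETE FROM {FINAL} f USING {INTERMEDIATE} i '
    'WHERE f."invoice_ID" = i."invoice_ID";'
)
# ...then insert everything from the intermediate table
insert_sql = f'INSERT INTO {FINAL} SELECT * FROM {INTERMEDIATE};'
# sf_cursor.execute(delete_sql); sf_cursor.execute(insert_sql)
```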

In our Python workflow, we will be using the MERGE option. 

def merge_intermediate_table(sf_cursor):
    """Uses the MERGE command to merge the intermediate table into the final table"""
    query_ids = []
    snowflake_merge = """MERGE INTO "TRANSACTIONS"."PUBLIC"."TRANSACTION_DATA" final_table
        USING "TRANSACTIONS"."PUBLIC"."INTERMEDIATE_TABLE" i_t
            ON final_table."invoice_ID" = i_t."invoice_ID"
        WHEN MATCHED THEN UPDATE SET
            final_table."pg_index" = i_t."pg_index",
            final_table."invoice_ID" = i_t."invoice_ID",
            final_table."transaction_amt" = i_t."transaction_amt",
            final_table."transaction_time" = i_t."transaction_time",
            final_table."transaction_fulfilled" = i_t."transaction_fulfilled"
        WHEN NOT MATCHED THEN INSERT
            ("pg_index", "invoice_ID", "transaction_amt", "transaction_time", "transaction_fulfilled")
            VALUES (i_t."pg_index", i_t."invoice_ID", i_t."transaction_amt", i_t."transaction_time", i_t."transaction_fulfilled");"""
    sf_cursor.execute(snowflake_merge)
    query_ids.append(sf_cursor.sfqid)
    # clear the intermediate table
    sf_cursor.execute('DELETE FROM "TRANSACTIONS"."PUBLIC"."INTERMEDIATE_TABLE"')
    query_ids.append(sf_cursor.sfqid)
    return query_ids

The Full Workflow

Now that we have all the pieces set up, we simply have to connect them together. Here is an example of what a batch replication workflow would look like with an external stage:

pg_connection = establish_postgres_connection()
transaction_data = find_transactions(pg_connection)

database = "TRANSACTIONS"
sf_connection, sf_conn_string = establish_snowflake_connection()
sf_cursor = sf_connection.cursor()
sf_cursor.execute(f"USE DATABASE {database}")
session_id = sf_cursor.connection.session_id

query_ids = []
query_ids += migrate_from_external_s3_stage(sf_cursor)
query_ids += merge_intermediate_table(sf_cursor)

sf_cursor.execute('SELECT * FROM "TRANSACTIONS"."PUBLIC"."TRANSACTION_DATA"')
results = sf_cursor.fetchall()
query_ids.append(sf_cursor.sfqid)

snowflake_final_data = pd.DataFrame(
    results,
    columns=["pg_index", "invoice_ID", "transaction_amt", "transaction_time", "transaction_fulfilled"]
)
log_metric("Data Identical", snowflake_final_data.equals(transaction_data))
log_dataframe("Snowflake Data", snowflake_final_data, with_histograms=True)

log_snowflake_resource_usage(
    database=database,
    connection_string=sf_conn_string,
    query_ids=query_ids,
    session_id=int(session_id),
    raise_on_error=True,
    delay=1
)
log_snowflake_table(
    table_name="TRANSACTION_DATA",
    connection_string=sf_conn_string,
    database=database,
    schema="PUBLIC",
    with_preview=True,
    raise_on_error=False
)

sf_connection.close()

log_snowflake_resource_usage allows us to log the resources consumed throughout the whole process – including credits used, compilation time, bytes scanned, and more. log_snowflake_table allows us to log the full Snowflake table with schema information and previews.

Conclusion

This guide covered how to set up data replication using Snowflake best practices, along with good fundamentals for tracking data integrity. It used the example of PostgreSQL to Snowflake, but the same logic can be applied to other databases or tables with slight modifications to the queries. Likewise, the data integrity checks and tracking functionality would work similarly in other cloud databases like Google BigQuery and AWS Redshift.

What are the next steps? If you’re dealing with data at large scale, you may want to tune your approach to integrity checks – for example, collecting only schema information or specific metrics rather than matching entire tables. This has performance benefits but comes at the cost of full assurance of 1:1 data replication. You may also want to explore operational monitoring or alerting tools that let you know when the job itself or your integrity checks fail, which is where Databand can come in to help.
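As one hedged sketch of a lighter-weight check, you could compare a row count and an order-independent checksum over a key column on both sides of the replication instead of matching entire dataframes. The file path, key column, and table_fingerprint helper below are illustrative assumptions, not part of this guide’s workflow:

```python
import csv
import hashlib
import tempfile

def table_fingerprint(csv_path, key_column):
    """Return (row count, order-independent MD5 checksum) over one key column
    of a CSV export. Cheaper than full table comparison, but not 1:1 proof."""
    digest = hashlib.md5()
    with open(csv_path, newline="") as f:
        keys = sorted(row[key_column] for row in csv.DictReader(f))
    for key in keys:
        digest.update(key.encode("utf-8"))
    return len(keys), digest.hexdigest()

# illustrative usage with a throwaway file standing in for the staging CSV:
with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False, newline="") as tmp:
    tmp.write("invoice_ID,transaction_amt\nA-1,10.0\nA-2,20.0\n")
    demo_path = tmp.name

row_count, checksum = table_fingerprint(demo_path, "invoice_ID")
```

Running the same function against an export from the target table and comparing the two fingerprints gives a quick signal that row counts and keys line up.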

Happy engineering!

To learn more about how Databand can help with data replication and data integrity, sign up for a free trial or request a demo.
