Data Quality Testing for Migrated Pipeline: From On-Premise to Databricks

Author: Nitesh Ranka
Date: Dec 18, 2024
Category: Data, Data quality, Databricks, ETL Migration

Introduction

 
Data pipelines are essential for turning raw data into actionable insights, and ensuring data quality becomes even more critical when migrating from on-premise infrastructure to a cloud platform like Databricks. Poor data quality can lead to wrong decisions, financial loss, and operational inefficiency.
This guide walks through the challenges and solutions for data quality testing during pipeline migration, showing how to validate your pipelines by implementing automated tests with Pytest. It also explains how to prepare data for testing, including when to use full datasets versus samples, and covers essential test cases for validating data consistency, schema accuracy, null handling, and more. The goal is to ensure that your pipelines deliver high-quality data post-migration.

Why Is Data Quality Testing Important?

Poor data quality can lead to incorrect insights, faulty decision-making, and lost revenue. Data migrations, especially from on-prem to cloud, introduce risks of:
  • Data loss: Missing records or incomplete migration.
  • Schema mismatches: Differences in data formats between on-prem and cloud.
  • Inconsistent records: Discrepancies between the source (on-prem) and target (Databricks) datasets.
  • Inaccurate data transformations: Errors caused by transformations applied in Databricks that don't mirror the on-prem environment.
There have been many cases where companies suffered large losses because of bad data quality. For example, in Q1 2022, Unity Technologies, renowned for its real-time 3D content platform, encountered a significant data quality issue that severely impacted its business. The company's Pinpointer tool suffered from inaccurate data ingestion from a large customer, resulting in a revenue loss of approximately $110 million. The incident not only affected Unity's financials but also led to a 37% drop in its shares, highlighting the critical importance of maintaining high data quality standards in business operations. There are many more such cases of significant losses caused by poor data quality.
By implementing continuous data validation during migration, you can mitigate these risks and ensure that your Databricks pipelines provide the same (or better) quality of data as your on-premise systems. This validation ensures consistency, accuracy, completeness, and validity.

Key Data Quality Dimensions to Consider

Before delving into the actual testing strategies, it's essential to identify the dimensions of data quality that need to be checked:
  • Completeness: All required data fields and records must be present.
  • Accuracy: Data must reflect correct values and be free from errors.
  • Consistency: Data should remain consistent across different systems and formats.
  • Validity: The data must conform to specified formats and ranges.
  • Uniqueness: There should be no duplicate records.
 

Preparing Data for Testing

Choosing Between Full and Sample Datasets

Below are the advantages and disadvantages of using a full dataset versus a sample dataset.
Full Dataset
  • When to use it: You want comprehensive testing under production-like conditions.
  • Advantages: Covers all edge cases and gives accurate performance metrics.
  • Disadvantages: Resource-intensive and time-consuming.
Sample Dataset
  • When to use it: You need fast feedback during development.
  • Advantages: Faster and less resource-heavy; lets you focus on targeted tests.
  • Disadvantages: May miss rare edge cases or performance bottlenecks.

Best Practices for Sampling

  • Random Sampling: Random sampling is a method where a random subset of the data is selected from the overall dataset. This method provides a quick and unbiased way to test the pipeline and validate data without having to process the entire dataset. It's particularly useful for general-purpose testing during the early stages of migration or development, where broad coverage is required but performance constraints are a concern.
    • Example (random sample of 1% of the data):
      -- Select a random 1% of the rows
      SELECT * FROM source_table WHERE RAND() <= 0.01;
 
  • Stratified Sampling: Stratified sampling involves dividing the data into distinct categories or "strata" and sampling from each category proportionally. This method ensures that all relevant categories of data are represented in the sample. For example, in telco data, categories may include customer regions, customer types (e.g., prepaid vs. postpaid), or types of usage (e.g., mobile data, voice, SMS).
    • Example (stratified sampling by customer_type):
      -- Stratified sample, selecting 1% from each customer type
      SELECT * FROM billing_data WHERE customer_type = 'prepaid' AND RAND() <= 0.01
      UNION ALL
      SELECT * FROM billing_data WHERE customer_type = 'postpaid' AND RAND() <= 0.01;
 
  • Edge Case Sampling: Edge case sampling focuses on manually selecting records that are known to cause issues or fall outside the normal distribution of the data. For example, in telco data, this might include customers with extraordinarily high data usage or records with missing or inconsistent values. This method is used for testing data quality and the handling of anomalies.
    • Example (edge case sampling for high download usage):
      -- Select records where download usage exceeds 100 GB
      SELECT * FROM usage_data WHERE dl_usage_gb > 100;
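These samples can also be taken directly on DataFrames in a Databricks notebook. Below is a minimal PySpark sketch, assuming the same source_table, billing_data table, and customer_type values used in the SQL examples above (spark is the session provided by the notebook):

# Random 1% sample of the table, taken on the DataFrame side
source_df = spark.read.table("source_table")
random_sample_df = source_df.sample(withReplacement=False, fraction=0.01, seed=42)

# Stratified 1% sample per customer_type, mirroring the stratified SQL example
billing_df = spark.read.table("billing_data")
stratified_sample_df = billing_df.sampleBy(
    "customer_type", fractions={"prepaid": 0.01, "postpaid": 0.01}, seed=42
)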
 

Data Required to Test Pipelines

Before we dive into the test cases, it’s important to understand the required data for validating data pipelines:
  • On-prem Input: The data produced by on-prem systems before any transformation.
  • On-prem Output: The final output produced by the on-prem pipeline after transformation.
  • Databricks Input: The data ingested by Databricks pipelines (usually the same as on-prem input but after ingestion into the cloud).
  • Databricks Output: The final output produced by the Databricks pipeline.
Typically, you'll be comparing on-prem output with the Databricks output. However, some tests might involve validating against the input to ensure consistency in ingestion.
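For orientation, here is a minimal sketch of how these four datasets might be loaded; the bucket paths and table names are placeholders, consistent with the config.json shown later:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Placeholder locations; substitute your own buckets and table names.
onprem_input_df = spark.read.parquet("s3://your-bucket/onpremise-source-data")   # on-prem input
onprem_output_df = spark.read.parquet("s3://your-bucket/onpremise-output-data")  # on-prem output
dbx_input_df = spark.read.table("your_ingested_source_table")                    # Databricks input
dbx_output_df = spark.read.table("your_delta_table")                             # Databricks output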

Setting Up Pytest for Spark and DataFrames

To enable automated testing, we will use Pytest along with Spark to compare the datasets.

Unit Test Best Practices with Pytest

To ensure effective unit testing, follow these best practices (a minimal example is sketched after the list):
  • Modular Tests: Each test case should be independent and reusable.
  • Fixtures: Use Pytest fixtures to set up and reuse common resources like Spark sessions and configurations.
  • Assertions: Provide clear and informative assertion messages.
  • Test Structure: Follow the Arrange-Act-Assert pattern for clarity.
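Here is a minimal sketch of these practices in a single test, assuming only that PySpark is installed locally (the real tests below reuse shared fixtures instead of creating their own Spark session):

from pyspark.sql import SparkSession

def test_null_count_example():
    # Arrange: build a small, self-contained Spark session and input DataFrame
    spark = SparkSession.builder.master("local[1]").appName("aaa-example").getOrCreate()
    df = spark.createDataFrame([(1, "a"), (2, None)], ["id", "value"])

    # Act: run the check under test (counting nulls in one column)
    null_count = df.filter(df["value"].isNull()).count()

    # Assert: use a clear message so failures are easy to diagnose
    assert null_count == 1, f"Expected exactly 1 null in 'value', found {null_count}"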

Configuration for Data Quality Parameters

Create a config.json file with the necessary configurations. This file should include paths to the S3 bucket and Delta tables, as well as other relevant configurations.
Example config.json:
{ "application_name": "DataValidationApp", "pipeline_configuration": { "dbx_output_configuration": { "target_table_name": "your_delta_table" }, "onprem_configuration": { "format": "parquet", "source_data_s3_path": "s3://your-bucket/onpremise-source-data", "output_data_s3_path": "s3://your-bucket/onpremise-output-data" }, "data_quality_configuration": { "null_checks": ["column1", "column2"], "range_checks": { "arpu": {"min": 100, "max": 5000}, "call_duration": {"min": 0, "max": 60} } }, "unique_checks": ["imei", "phone"], "statistical_checks": ['arpu', 'data_usage'], "aggregation_checks": {"call_duration": "sum", "arpu": "avg"}, "min_max_checks": ["call_duration", "arpu"], "aggregation_checks": { "sum(call_duration)": "sum", "avg(arpu)": "avg" }, "date_format_checks": ["date_col1", "date_col2"] } }
 

Spark Session Fixture

We will start by setting up a fixture that initializes a Spark session and loads the pipeline configuration. This fixture will be used across all test cases to ensure the Spark context is available.
import pytest

@pytest.fixture(scope="module")
def spark():
    config_path = "./config.json"
    # initialize_pipeline creates the Spark session and parses config.json (a sketch follows below)
    spark, pipeline_config = initialize_pipeline(config_path)
    yield spark, pipeline_config
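initialize_pipeline is referenced here but not defined in this guide. A minimal sketch of what it might look like, assuming it only creates the Spark session and parses config.json into nested namespaces so configuration values can be read as attributes:

import json
from types import SimpleNamespace

from pyspark.sql import SparkSession

def initialize_pipeline(config_path):
    # Parse config.json into nested namespaces (e.g. pipeline_config.onprem_configuration.format)
    with open(config_path) as f:
        config = json.load(f, object_hook=lambda d: SimpleNamespace(**d))
    spark = SparkSession.builder.appName(config.application_name).getOrCreate()
    return spark, config.pipeline_configuration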
 

DataFrame Fixture

The fixture will load the dataframes for comparison — one from the on-prem data (usually stored in S3 or another external storage) and the other from the Delta table in Databricks.
@pytest.fixture(scope="module")
def dataframes(spark):
    spark_session, pipeline_config = spark
    onprem_config = pipeline_config.onprem_configuration
    dbx_output_config = pipeline_config.dbx_output_configuration

    # Load the on-prem pipeline output from S3
    onprem_df = (
        spark_session.read.format(onprem_config.format)
        .option("recursiveFileLookup", "true")
        .option("inferSchema", "true")
        .load(onprem_config.output_data_s3_path)
    )

    # Load the Databricks pipeline output from the Delta table
    delta_df = spark_session.read.table(dbx_output_config.target_table_name)
    return onprem_df, delta_df
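Several of the tests below also look up data quality parameters such as key columns and value ranges. A small additional fixture (a sketch, assuming config.json sits next to the tests) exposes the raw configuration as a dictionary for those lookups:

import json

@pytest.fixture(scope="module")
def config():
    # Raw dictionary view of config.json for the data quality lookups used in the tests below
    with open("./config.json") as f:
        return json.load(f)["pipeline_configuration"]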
 

Data Quality Testing Approaches

Now that your data is in Databricks, here are the essential tests you need to run to ensure data quality:

Completeness Tests

These tests ensure that no records or essential data fields are missing after migration.
  • Row Count Matching
The purpose of this test is to verify that the row counts in the Databricks pipeline output match those in the on-premise output. By comparing row counts between the source and target datasets, this test checks for potential data loss or duplication during migration, ensuring that the migrated dataset maintains the same completeness as the original.
def test_row_count_matching(dataframes):
    onprem_df, delta_df = dataframes
    assert onprem_df.count() == delta_df.count(), \
        "Row count mismatch between on-premise and Databricks output"
  • Null Count Validation
This test aims to compare null counts in both the on-premise and Databricks datasets, ensuring that missing values are handled consistently across the migration. By validating that each column has the same number of nulls in both datasets, you can confirm that any missing or incomplete data points have been accurately transferred without introducing inconsistencies.
def test_null_counts(dataframes):
    onprem_df, delta_df = dataframes
    onprem_null_counts = {col: onprem_df.filter(onprem_df[col].isNull()).count() for col in onprem_df.columns}
    delta_null_counts = {col: delta_df.filter(delta_df[col].isNull()).count() for col in delta_df.columns}
    assert onprem_null_counts == delta_null_counts, \
        f"Null count mismatch: {onprem_null_counts} != {delta_null_counts}"

Consistency Tests

Consistency tests ensure that data values and structure remain consistent between the source and target after migration.
  • Data Consistency
The goal of this test is to ensure that the data between on-prem and Databricks is fully consistent by identifying any mismatches. By performing a difference operation between the two datasets, this test confirms that no discrepancies exist in the data values, providing assurance that the data content has been accurately migrated without alterations.
def test_data_consistency(dataframes):
    onprem_df, delta_df = dataframes
    # Symmetric difference: rows present in one output but not the other
    diff_df = onprem_df.subtract(delta_df).union(delta_df.subtract(onprem_df))
    assert diff_df.count() == 0, "Data inconsistency between on-premise and Databricks output"
  • Schema Validation
This test is used to validate that the schema, including column names and data types, in Databricks matches the schema of the on-premise output. By confirming that both schemas are identical, this test helps ensure that the structural definition of the data has been preserved in the migration, preventing schema-related issues in downstream analyses.
def test_schema_validation(dataframes):
    onprem_df, delta_df = dataframes
    onprem_schema = onprem_df.dtypes
    delta_schema = delta_df.dtypes
    assert onprem_schema == delta_schema, f"Schema mismatch: {onprem_schema} != {delta_schema}"

Uniqueness Tests

Uniqueness tests ensure that there are no duplicate records in key columns.
  • Unique Values Check
This test is designed to verify that key columns maintain unique values across the datasets. By ensuring that the count of distinct values in each key column matches between on-premise and Databricks, the test confirms that duplicate records have not been introduced during migration, preserving data quality.
def test_unique_values(dataframes, config):
    onprem_df, delta_df = dataframes
    key_columns = config["data_quality_configuration"]["unique_checks"]
    for column in key_columns:
        onprem_unique = onprem_df.select(column).distinct().count()
        delta_unique = delta_df.select(column).distinct().count()
        assert onprem_unique == delta_unique, f"Unique value mismatch for column {column}"
  • Duplicate Records
This test ensures that the number of duplicate records in key columns matches between the on-premise and Databricks outputs. By identifying duplicate records in each key column and comparing counts between both datasets, this test verifies that no additional duplicates were introduced, maintaining data uniqueness.
def test_duplicate_records(dataframes, config):
    onprem_df, delta_df = dataframes
    key_columns = config["data_quality_configuration"]["unique_checks"]
    onprem_duplicates = onprem_df.groupBy(key_columns).count().filter("count > 1").count()
    delta_duplicates = delta_df.groupBy(key_columns).count().filter("count > 1").count()
    assert onprem_duplicates == delta_duplicates, "Duplicate record count mismatch"

Validity Tests

Validity checks ensure that the data adheres to defined formats, ranges, and constraints.
  • Date Format Consistency
This test checks if the date formats for specified columns are consistent between the on-premise and Databricks datasets. By identifying distinct date formats in each specified column and comparing them, the test ensures that date format consistency is maintained across datasets, helping prevent formatting-related errors in downstream processes.
def test_date_format_consistency(dataframes, config):
    onprem_df, delta_df = dataframes
    date_columns = config["data_quality_configuration"]["date_format_checks"]
    for column in date_columns:
        # Sort the distinct values so the comparison does not depend on collection order
        onprem_date_values = sorted((row[0] for row in onprem_df.select(column).distinct().collect()), key=str)
        delta_date_values = sorted((row[0] for row in delta_df.select(column).distinct().collect()), key=str)
        assert onprem_date_values == delta_date_values, f"Date format consistency mismatch for column {column}"

  • Column Value Range
The purpose of this test is to ensure that the values in specified columns fall within the expected range in both dataframes. By defining acceptable minimum and maximum values for each column and validating that values stay within these ranges, this test helps to maintain data integrity by preventing the presence of outliers or erroneous values that fall outside the defined limits.
def test_column_value_range(dataframes, config):
    onprem_df, delta_df = dataframes
    for column, bounds in config["data_quality_configuration"]["range_checks"].items():
        min_val, max_val = bounds["min"], bounds["max"]
        onprem_range_valid = onprem_df.filter((onprem_df[column] < min_val) | (onprem_df[column] > max_val)).count() == 0
        delta_range_valid = delta_df.filter((delta_df[column] < min_val) | (delta_df[column] > max_val)).count() == 0
        assert onprem_range_valid and delta_range_valid, f"Column value range violation for column {column}"

Accuracy Tests

These tests ensure that transformed and processed data remains consistent with the source and meets expectations.
  • Aggregate Validation
This test validates that aggregate functions, such as sum and average, return consistent results across both datasets. By performing aggregation on specified columns and comparing results between on-premise and Databricks, the test ensures that calculations are accurately maintained through migration, preserving critical metrics.
def test_compare_aggregated_values(dataframes, config):
    onprem_df, delta_df = dataframes
    aggregation_checks = config["data_quality_configuration"]["aggregation_checks"]
    # Run the same aggregations ({column: function}) on both the on-prem and Databricks dataframes
    onprem_agg = onprem_df.agg(aggregation_checks).collect()[0]
    delta_agg = delta_df.agg(aggregation_checks).collect()[0]
    # Validate each aggregated value from the config
    for column, agg_function in aggregation_checks.items():
        agg_column = f"{agg_function}({column})"
        assert onprem_agg[agg_column] == delta_agg[agg_column], \
            f"{agg_function.capitalize()} of {column} mismatch between on-premise and Databricks output"

  • Min and Max Value Validation
The goal of this test is to ensure that the minimum and maximum values for critical columns match between on-prem and Databricks datasets. By checking min and max values across datasets, this test helps to verify that the data’s range remains consistent, highlighting potential discrepancies caused by data transformations.
def test_min_max_values(dataframes, config):
    onprem_df, delta_df = dataframes
    columns = config["data_quality_configuration"]["min_max_checks"]
    for column in columns:
        onprem_min = onprem_df.agg({column: "min"}).collect()[0][0]
        delta_min = delta_df.agg({column: "min"}).collect()[0][0]
        onprem_max = onprem_df.agg({column: "max"}).collect()[0][0]
        delta_max = delta_df.agg({column: "max"}).collect()[0][0]
        assert onprem_min == delta_min, f"Min value mismatch for {column}"
        assert onprem_max == delta_max, f"Max value mismatch for {column}"

  • Statistical Summary
This test compares the statistical summary (e.g., mean, standard deviation) of specified numerical columns across both dataframes. By calculating and comparing statistical summaries for each column, this test ensures consistency in data distributions, helping to identify any significant shifts or inconsistencies introduced during migration.
def test_statistical_summary(dataframes, config):
    onprem_df, delta_df = dataframes
    numerical_columns = config["data_quality_configuration"]["statistical_checks"]
    for col in numerical_columns:
        onprem_summary = onprem_df.describe([col]).collect()
        delta_summary = delta_df.describe([col]).collect()
        assert onprem_summary == delta_summary, f"Statistical summary mismatch in column {col}"

Conclusion

Migrating data pipelines from on-premise systems to cloud platforms like Databricks offers substantial benefits in scalability, performance, and efficiency. However, it also presents challenges, particularly in maintaining data quality throughout the migration. Ensuring that your pipelines continue to deliver reliable, accurate data is crucial for preserving the integrity of your analytics, machine learning, and decision-making processes.
To mitigate risks, it's vital to address the key data quality dimensions (completeness, accuracy, consistency, and validity) as part of your migration plan. This guide uses Pytest, but you can also leverage automated validation tools such as Great Expectations and Delta Live Tables, which can streamline the process and ensure thorough testing at every stage.
By combining strategic oversight with detailed, automated test cases, you can ensure that your data pipeline migration not only maintains but enhances data quality standards, setting the foundation for ongoing operational success in your cloud environment.
