
Building a Robust Customer Data Validation System with Databricks

This document outlines approaches, tools, and techniques for building a robust customer data validation system on Databricks. It compares library-based and direct (library-free) implementations for validating customer data against country-specific rules stored in a generic rules table. It also shows how to add validation through external APIs for phone numbers and geographic fields.

Overview of the Use Case

The primary goal is to validate customer data files containing fields like customer_code, trading_name, address, phone_number, and tax_data. Validation involves:

  1. Country-specific rules (stored in a table).
  2. External APIs for validating:
    • Phone Numbers: Ensure correct format and existence.
    • Geography Data: Validate city, state, and country mappings.
  3. Handling optional fields, which may still require validation for correctness.
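The code in this document reads the generic rules table through the columns iso_country_code, field_name, mandatory_ind and check_mask_exp. The example below is a minimal sketch of organizing those rows for per-country lookups. It indexes a small, hypothetical excerpt of the table into a {country: {field_name: rule}} dictionary. The sample rows and regexes are illustrative rather than real rules, and the name index_rules is also hypothetical. The US tax_data row is an optional field that still carries a format mask, which matches point 3 above.

```python
from collections import defaultdict

# Hypothetical excerpt of the generic rules table; values are illustrative only.
GENERIC_RULES = [
    {"iso_country_code": "US", "field_name": "phone_number",
     "mandatory_ind": "X", "check_mask_exp": r"\+1\d{10}"},
    {"iso_country_code": "US", "field_name": "tax_data",
     "mandatory_ind": "", "check_mask_exp": r"\d{2}-\d{7}"},
    {"iso_country_code": "DE", "field_name": "phone_number",
     "mandatory_ind": "X", "check_mask_exp": r"\+49\d{6,13}"},
]


def index_rules(rows):
    """Group rule rows into {country: {field_name: rule}} for constant-time lookups."""
    index = defaultdict(dict)
    for row in rows:
        index[row["iso_country_code"]][row["field_name"]] = row
    return dict(index)


rules_by_country = index_rules(GENERIC_RULES)
print(sorted(rules_by_country["US"]))  # ['phone_number', 'tax_data']
```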

Approaches to Implement Customer Validation

1. Validation Based on Country-Specific Rules

Library-Based Validation Approaches
  • Great Expectations: For batch validation with dynamic rule creation.
  • Cerberus: For lightweight, JSON-schema-like record validation.
  • Pydantic: For model-driven, strict validation with Python typing.
1.1. Great Expectations

Usage: Ideal for batch data validation and generating data quality reports.

  • Steps:
    1. Load the dataset and the generic rules table.
    2. Filter rules by country.
    3. Define expectations dynamically based on filtered rules.
  • Code Example:
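```python
import great_expectations as ge
import pandas as pd

# Note: ge.read_csv and the expect_* methods on the returned dataset belong to the
# legacy dataset API of older Great Expectations releases; GX 1.x uses a different,
# context-based API.
# On Databricks, pandas-based readers typically need the local FUSE path for DBFS
# mounts (e.g. /dbfs/mnt/datalake/...) rather than /mnt/datalake/...

# Load the data and rules
rules_df = pd.read_csv("/mnt/datalake/generic_country_rules.csv")
data_file = "/mnt/datalake/customer_data.csv"
df = ge.read_csv(data_file)

# Filter for country-specific rules
country_rules = rules_df[rules_df["iso_country_code"] == "US"]

# Apply expectations dynamically
for _, rule in country_rules.iterrows():
    if rule["field_name"] == "phone_number" and pd.notna(rule["check_mask_exp"]):
        df.expect_column_values_to_match_regex("phone_number", rule["check_mask_exp"])
    elif rule["field_name"] == "tax_data" and rule["mandatory_ind"] == "X":
        df.expect_column_values_to_be_between(
            "tax_data", min_value=rule["check_exact_lo"], max_value=rule["check_max_hi"]
        )

# Validate and get results
results = df.validate()
print(results)
```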
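The loop above hard-codes which expectation each field gets. One way to keep that mapping data-driven is to first translate every rule row into an (expectation name, keyword arguments) pair, and only then apply the pairs to the dataset. The sketch below assumes one particular mapping of rule columns to expectations:

- mandatory_ind becomes a not-null check.
- check_mask_exp becomes a regex check.
- check_exact_lo and check_max_hi become a between check.

The helper names are hypothetical. The expectation names and their column, regex, min_value and max_value arguments come from the legacy dataset API used above.

```python
def _present(value):
    """Treat None, empty strings and NaN (the only value not equal to itself) as missing."""
    return value is not None and value == value and value != ""


def rules_to_expectations(rules):
    """Translate rule rows into (expectation_name, kwargs) pairs (hypothetical helper)."""
    specs = []
    for rule in rules:
        column = rule["field_name"]
        if rule.get("mandatory_ind") == "X":
            specs.append(("expect_column_values_to_not_be_null", {"column": column}))
        if _present(rule.get("check_mask_exp")):
            specs.append(("expect_column_values_to_match_regex",
                          {"column": column, "regex": rule["check_mask_exp"]}))
        lo, hi = rule.get("check_exact_lo"), rule.get("check_max_hi")
        if _present(lo) or _present(hi):
            # A missing bound is passed as None, i.e. a one-sided range
            specs.append(("expect_column_values_to_be_between",
                          {"column": column,
                           "min_value": lo if _present(lo) else None,
                           "max_value": hi if _present(hi) else None}))
    return specs


specs = rules_to_expectations([
    {"field_name": "phone_number", "mandatory_ind": "X", "check_mask_exp": r"\+1\d{10}"},
])
print([name for name, _ in specs])
# ['expect_column_values_to_not_be_null', 'expect_column_values_to_match_regex']
```

Applying the specs is then a single loop such as `for name, kwargs in specs: getattr(df, name)(**kwargs)`. With this structure, a new rule type is supported by extending the translator rather than the validation loop.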
1.2. Cerberus

Usage: Best for validating individual records dynamically with JSON-like schemas.

  • Steps:
    1. Build a schema dynamically for each country.
    2. Validate each record individually.
  • Code Example:

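```python
from cerberus import Validator
import pandas as pd

# Load rules and data
rules_df = pd.read_csv("/mnt/datalake/generic_country_rules.csv")
country_rules = rules_df[rules_df["iso_country_code"] == "US"]

# Build schema: every field is a string, mandatory fields are required,
# and fields with a mask expression must match it
schema = {}
for _, rule in country_rules.iterrows():
    schema[rule["field_name"]] = {"type": "string"}
    if rule["mandatory_ind"] == "X":
        schema[rule["field_name"]]["required"] = True
    if pd.notna(rule["check_mask_exp"]):  # NaN is truthy, so test for it explicitly
        schema[rule["field_name"]]["regex"] = rule["check_mask_exp"]

# Validator; allow_unknown lets columns that have no rules pass through
v = Validator(schema, allow_unknown=True)

# Validate records; read as strings and drop empty cells so that
# missing values count as absent rather than as NaN floats
customer_data = pd.read_csv("/mnt/datalake/customer_data.csv", dtype=str)
for _, record in customer_data.iterrows():
    doc = {k: val for k, val in record.to_dict().items() if pd.notna(val)}
    if v.validate(doc):
        print("Valid Record:", record["customer_code"])
    else:
        print("Invalid Record:", record["customer_code"], v.errors)
```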
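The snippet above builds a single schema for "US". Step 1 asks for a schema per country. A minimal standard-library sketch builds every country's schema in one pass, so each record can be checked against its own country's rules. It assumes rule rows are plain dicts using the rules-table column names. The function name build_schemas is hypothetical. The schema keys (type, required, regex) are the same Cerberus rules used above.

```python
def build_schemas(rules):
    """Build one Cerberus-style schema dict per country from rule rows."""
    schemas = {}
    for rule in rules:
        country_schema = schemas.setdefault(rule["iso_country_code"], {})
        field_schema = {"type": "string"}
        if rule.get("mandatory_ind") == "X":
            field_schema["required"] = True
        mask = rule.get("check_mask_exp")
        if isinstance(mask, str) and mask:  # skips None, "" and NaN
            field_schema["regex"] = mask
        country_schema[rule["field_name"]] = field_schema
    return schemas


schemas = build_schemas([
    {"iso_country_code": "US", "field_name": "phone_number",
     "mandatory_ind": "X", "check_mask_exp": r"\+1\d{10}"},
    {"iso_country_code": "DE", "field_name": "tax_data",
     "mandatory_ind": "", "check_mask_exp": None},
])
print(schemas["DE"])  # {'tax_data': {'type': 'string'}}
```

With Cerberus installed, create one validator per country up front, for example `{c: Validator(s, allow_unknown=True) for c, s in schemas.items()}`. Then look up each record's validator by its country code, assumed here to be in a country_code column. Records whose country has no schema can be routed to the invalid output instead of raising a KeyError.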

1.3. Pydantic

Usage: Suitable for strict typing and model-based validation.

  • Steps:
    1. Create dynamic models using the rules table.
    2. Validate records using these models.
  • Code Example:

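```python
from typing import Optional

import pandas as pd
from pydantic import ValidationError, create_model

# Load rules and filter by country
rules_df = pd.read_csv("/mnt/datalake/generic_country_rules.csv")
country_rules = rules_df[rules_df["iso_country_code"] == "US"]

# Dynamic model: fields must be declared when the model class is created
# (setattr on an existing BaseModel subclass does not register new fields),
# so collect the definitions first and build the class with create_model
field_definitions = {}
for _, rule in country_rules.iterrows():
    field_name = rule["field_name"]
    if rule["mandatory_ind"] == "X":
        field_definitions[field_name] = (str, ...)  # required
    else:
        field_definitions[field_name] = (Optional[str], None)  # optional, defaults to None
Customer = create_model("Customer", **field_definitions)

# Validate records; read as strings and drop empty cells so that a missing
# mandatory field is reported as "Field required"
customer_data = pd.read_csv("/mnt/datalake/customer_data.csv", dtype=str)
for _, record in customer_data.iterrows():
    values = {k: v for k, v in record.to_dict().items() if pd.notna(v)}
    try:
        customer = Customer(**values)
        print("Valid Record:", customer.model_dump())  # Pydantic v2; use .dict() on v1
    except ValidationError as e:
        print("Invalid Record:", e.json())
```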

2. Direct Approach Without Libraries

  • Perform rule checks using Python constructs and integrate APIs for further validation.

Code Snippet (API Integration Example):

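```python
import re

import pandas as pd

# Existing rule checks as before.
# rules maps field_name -> rule row for one country (country_rules as filtered earlier)
rules = {rule["field_name"]: rule for _, rule in country_rules.iterrows()}


def validate_customer(record, rules):
    errors = []
    for field, rule in rules.items():
        value = record.get(field, "")
        if rule["mandatory_ind"] == "X" and not value:
            errors.append(f"{field} is mandatory")
        # Optional fields are format-checked only when a value is present;
        # fullmatch rejects values with trailing characters
        if value and pd.notna(rule["check_mask_exp"]) and not re.fullmatch(rule["check_mask_exp"], str(value)):
            errors.append(f"{field} does not match format")
    return errors


# Extended with API validation.
# validate_phone_api and validate_geo_api wrap the external phone and geography APIs;
# their implementation depends on the chosen provider.
customer_data = pd.read_csv("/mnt/datalake/customer_data.csv", dtype=str, keep_default_na=False)
for _, record in customer_data.iterrows():
    errors = validate_customer(record.to_dict(), rules)
    if not validate_phone_api(record["phone_number"], record["country_code"]):
        errors.append("Invalid Phone Number")
    if not validate_geo_api(record["city"], record["state"], record["country"]):
        errors.append("Invalid Geography")

    if errors:
        print(f"Invalid Record: {record['customer_code']}, Errors: {errors}")
    else:
        print(f"Valid Record: {record['customer_code']}")
```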
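Calling the phone and geography APIs once per record can dominate run time. The same city/state/country combination also tends to repeat across many customers. The sketch below assumes the geography check depends only on its three arguments. Under that assumption, it normalizes the inputs and memoizes lookups with functools.lru_cache, so each distinct combination reaches the API once. Here validate_geo_api is an illustrative version of the helper referenced above. fake_geo_lookup is a hypothetical stand-in for the real provider call.

```python
from functools import lru_cache

api_calls = 0  # counts simulated API round trips


def fake_geo_lookup(city, state, country):
    """Hypothetical stand-in for the external geography API."""
    global api_calls
    api_calls += 1
    known = {("Austin", "TX", "US"), ("Munich", "BY", "DE")}
    return (city, state, country) in known


@lru_cache(maxsize=100_000)
def _cached_geo_check(city, state, country):
    return fake_geo_lookup(city, state, country)


def validate_geo_api(city, state, country):
    """Normalize first so trivially different spellings share one cache entry."""
    return _cached_geo_check(city.strip().title(), state.strip().upper(), country.strip().upper())


records = [("Austin", "TX", "US"), ("austin ", "tx", "us"), ("Austin", "TX", "US"), ("Paris", "XX", "FR")]
results = [validate_geo_api(*r) for r in records]
print(results, api_calls)  # [True, True, True, False] 2
```

The cache lives in the Python process that runs the loop. It helps most when records are validated in batches on the same driver or executor.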

Key Takeaways

  1. Integrated Validation:
    • Combine rule-based and API-based validations for comprehensive checks.
    • Ensure accuracy for both format-driven and real-time validations.
  2. Extensibility:
    • Add new API validations or rules without significant rework.
    • Modularize API integration for reuse across different workflows.
  3. Error Reporting:
    • Segregate records with detailed error logs to aid debugging.
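The extensibility and error-reporting points can be combined in a small registry. Each check is a function that takes a record and returns a list of error messages. A new rule or API check is added by registering another function. The sketch below is one possible structure, not a prescribed design. The decorator name check and both sample checks are hypothetical, and a real API-backed check would call the provider instead of the simple format test shown.

```python
import re
from typing import Callable, Dict, List

Record = Dict[str, str]
CHECKS: List[Callable[[Record], List[str]]] = []


def check(func):
    """Register a validation function that returns a list of error messages."""
    CHECKS.append(func)
    return func


@check
def customer_code_present(record):
    return [] if record.get("customer_code") else ["customer_code is mandatory"]


@check
def phone_format(record):
    # Placeholder for an API-backed check: optional "+" followed by 7-15 digits.
    phone = record.get("phone_number", "")
    return [] if re.fullmatch(r"\+?\d{7,15}", phone) else ["Invalid Phone Number"]


def run_checks(record):
    """Run every registered check in registration order and collect all errors."""
    errors = []
    for func in CHECKS:
        errors.extend(func(record))
    return errors


print(run_checks({"customer_code": "C001", "phone_number": "+15551234567"}))  # []
print(run_checks({"customer_code": "", "phone_number": "abc"}))
# ['customer_code is mandatory', 'Invalid Phone Number']
```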

Next Steps

  1. Proof of Concept:
    • Implement country-specific validation using one of the suggested approaches.
    • Integrate phone and geography validation APIs in a modular format.
  2. Testing and Reporting:
    • Generate separate reports for valid and invalid records.
    • Write the outputs back to the Data Lake for further processing.
  3. Scalability:
    • Optimize the validation logic to handle large datasets efficiently.
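The reporting step can be sketched with the standard library. The code partitions records by whether they produced errors, then writes two CSV files, with an extra errors column in the invalid file. The file names, the errors column and the validate callback are assumptions, and the sketch expects all records to share the same columns. On Databricks, out_dir would be a Data Lake location, for example a /dbfs/mnt/... path for a mounted container. For large datasets, a Spark DataFrame write would be the natural replacement for this row-by-row loop.

```python
import csv
import os
import tempfile


def write_reports(records, validate, out_dir):
    """Split records into valid/invalid CSV reports; returns (n_valid, n_invalid)."""
    valid, invalid = [], []
    for record in records:
        errors = validate(record)
        if errors:
            invalid.append({**record, "errors": "; ".join(errors)})
        else:
            valid.append(record)
    for name, rows in (("valid_records.csv", valid), ("invalid_records.csv", invalid)):
        if not rows:
            continue  # skip empty reports
        with open(os.path.join(out_dir, name), "w", newline="") as fh:
            writer = csv.DictWriter(fh, fieldnames=list(rows[0]))
            writer.writeheader()
            writer.writerows(rows)
    return len(valid), len(invalid)


def requires_customer_code(record):
    """Hypothetical validator used only for this example."""
    return [] if record["customer_code"] else ["customer_code is mandatory"]


records = [
    {"customer_code": "C001", "phone_number": "+15551234567"},
    {"customer_code": "", "phone_number": "abc"},
]
with tempfile.TemporaryDirectory() as out_dir:
    counts = write_reports(records, requires_customer_code, out_dir)
    with open(os.path.join(out_dir, "invalid_records.csv"), newline="") as fh:
        invalid_rows = list(csv.DictReader(fh))

print(counts)  # (1, 1)
print(invalid_rows[0]["errors"])  # customer_code is mandatory
```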

Optimal Approach

Why Use Pydantic with a Custom Mapping Layer:

  1. Dynamic Field Mapping:
    • Use a configuration or mapping layer to align source fields with generic rule table fields.
    • Handles discrepancies in field names dynamically.
  2. Country-Specific Schema Models:
    • Define Pydantic models dynamically for each country using the rules from the generic table.
    • Enables country-specific validation for mandatory checks, tax formats, and field lengths.
  3. Record-Level Validation:
    • Since the dataset is of manageable size, Pydantic's record-level validation won't significantly impact performance.
  4. API Integration:
    • Easily incorporate phone and geography validation APIs at the record level during or after the schema validation process.
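The sketch below is a minimal version of the mapping layer. It assumes a per-source dictionary that maps source column names to the field_name values in the rules table. The source column names (cust_id, tel, vat_no) and the function name are hypothetical. Renaming happens before validation, so the country models only ever see rule-table field names. Unmapped columns pass through unchanged, and two source columns that collide on one target raise an error instead of silently overwriting each other.

```python
# Hypothetical mapping from one source system's column names to rule-table field names.
SOURCE_TO_RULE_FIELDS = {
    "cust_id": "customer_code",
    "tel": "phone_number",
    "vat_no": "tax_data",
}


def apply_mapping(record, mapping):
    """Rename mapped source fields to rule-table field names; keep other fields as-is."""
    mapped = {}
    for source_field, value in record.items():
        target = mapping.get(source_field, source_field)
        if target in mapped:
            raise ValueError(f"Two source fields map to {target!r}")
        mapped[target] = value
    return mapped


row = {"cust_id": "C001", "tel": "+15551234567", "city": "Austin"}
print(apply_mapping(row, SOURCE_TO_RULE_FIELDS))
# {'customer_code': 'C001', 'phone_number': '+15551234567', 'city': 'Austin'}
```

The mapped record can then be passed to the country's dynamically created Pydantic model, for example `Customer(**apply_mapping(row, SOURCE_TO_RULE_FIELDS))`.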