DMS to Snowflake End-to-End Pipeline

This document reflects the current implementation in:

  • infra/server_infra/dms_pipeline.py
  • infra/server_infra/lambda_pii_cleaner/pii_cleaner.py
  • infra/server_infra/__main__.py

Purpose

Replicate selected MongoDB collections to a RAW S3 bucket, project allowlisted fields into clean NDJSON, and optionally notify Snowpipe via S3-to-SQS bucket notification events.

High-Level Flow

Processing Sequence (Implemented)

  1. DMS reads MongoDB and writes compressed CSV to RAW S3 target:
    • bucket_folder: mongodb-data
    • data_format: csv
    • compression_type: GZIP
    • timestamp_column_name: dms_replicated_at
    • add_column_name: true
    • migration_type: full-load-and-cdc
  2. DMS table mappings include only collections present in EXPECTED_FIELDS_FOR_ANALYTICS keys.
  3. RAW bucket notification invokes pii_cleaner Lambda for s3:ObjectCreated:* with prefix mongodb-data/legion/.
  4. Lambda validation and routing:
    • Skips objects not in RAW_BUCKET (if set), not under RAW_PREFIX, or already under CLEAN_PREFIX.
    • Infers collection from key pattern RAW_PREFIX/<db>/<collection>/....
  5. Lambda projection:
    • Looks up per-collection mask from EXPECTED_FIELDS_FOR_ANALYTICS.
    • Adds passthrough fields automatically: oid__id, dms_replicated_at.
    • For nested masks, parses JSON cells and recursively projects nested objects/arrays.
  6. Lambda output:
    • Converts each input row to one NDJSON line.
    • Uploads gzip output with ContentType=application/x-ndjson and ContentEncoding=gzip.
    • Destination key rewrites RAW prefix to CLEAN prefix and normalizes suffix to .json.gz.
  7. Optional Snowpipe notification:
    • Configured only when both create_shared_bucket and snowpipe-sqs-arn are set.
    • Event filter prefix is <environment>-<region>/clean/.
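The routing and key-rewrite rules in steps 4 and 6 can be sketched as pure string logic. A minimal sketch, assuming simplified prefixes (the real CLEAN prefix also carries `<environment>-<region>`, and the helper names here are illustrative, not from the source):

```python
# Illustrative sketch of the Lambda's key routing and rewrite (steps 4 and 6).
# RAW_PREFIX/CLEAN_PREFIX values and helper names are assumptions.
RAW_PREFIX = "mongodb-data/"
CLEAN_PREFIX = "clean/"

def infer_collection(key: str):
    """Return the collection from RAW_PREFIX/<db>/<collection>/..., else None."""
    if not key.startswith(RAW_PREFIX) or key.startswith(CLEAN_PREFIX):
        return None
    parts = key[len(RAW_PREFIX):].split("/")
    return parts[1] if len(parts) >= 3 else None

def clean_key(key: str) -> str:
    """Rewrite the RAW prefix to the CLEAN prefix; normalize the suffix to .json.gz."""
    base = key[len(RAW_PREFIX):]
    for suffix in (".csv.gz", ".csv", ".gz"):
        if base.endswith(suffix):
            base = base[: -len(suffix)]
            break
    return CLEAN_PREFIX + base + ".json.gz"

print(infer_collection("mongodb-data/legion/audit/LOAD001.csv.gz"))  # audit
print(clean_key("mongodb-data/legion/audit/LOAD001.csv.gz"))
```

Objects that fail `infer_collection` (wrong prefix, or already under the clean prefix) are skipped, which is what prevents the Lambda from re-processing its own output.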

PII Cleaner

File: infra/server_infra/lambda_pii_cleaner/pii_cleaner.py

Simple flow:

  1. Get RAW CSV file from S3.
  2. Read rows from the file.
  3. Keep only allowlisted fields (EXPECTED_FIELDS_FOR_ANALYTICS) plus oid__id and dms_replicated_at.
  4. Write cleaned rows as NDJSON (.json.gz) to the CLEAN prefix.
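A minimal, self-contained sketch of that four-step data path, with S3 I/O left out. The mask and field names here are illustrative; the real mask lives in EXPECTED_FIELDS_FOR_ANALYTICS:

```python
import csv
import gzip
import io
import json

# Illustrative allowlist; the real one is EXPECTED_FIELDS_FOR_ANALYTICS.
MASK = {"customer_id": True}
PASSTHROUGH = ("oid__id", "dms_replicated_at")

def clean_csv_gz(raw_bytes: bytes) -> bytes:
    """Project allowlisted fields from a gzipped CSV into gzipped NDJSON."""
    text = gzip.decompress(raw_bytes).decode("utf-8")
    out_lines = []
    for row in csv.DictReader(io.StringIO(text)):
        # Keep allowlisted + passthrough fields; drop missing/empty values.
        keep = {k: v for k, v in row.items()
                if (MASK.get(k) or k in PASSTHROUGH) and v not in (None, "")}
        out_lines.append(json.dumps(keep))  # one NDJSON line per input row
    return gzip.compress(("\n".join(out_lines) + "\n").encode("utf-8"))

raw = gzip.compress(b"customer_id,secret,oid__id\ncust_1,shh,abc\n")
print(gzip.decompress(clean_csv_gz(raw)).decode())
```

In the deployed Lambda the input comes from `GetObject` on the RAW bucket and the output goes to `PutObject` under the CLEAN prefix with `ContentType=application/x-ndjson` and `ContentEncoding=gzip`.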

If you need to change behavior:

  1. Fields/collections: infra/server_infra/dms_pipeline.py (EXPECTED_FIELDS_FOR_ANALYTICS).
  2. Cleaner logic: infra/server_infra/lambda_pii_cleaner/pii_cleaner.py.
  3. Tests: infra/server_infra/lambda_pii_cleaner/test_pii_cleaner.py.

Source of Truth

EXPECTED_FIELDS_FOR_ANALYTICS drives all of the following:

  • DMS collection inclusion.
  • Cleaner field and nested-field allowlisting.
  • Automatic passthrough fields for every collection: oid__id, dms_replicated_at.

This doc intentionally does not list concrete collections/fields to avoid drift.
Use infra/server_infra/dms_pipeline.py as the current source.
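Because the dict keys drive DMS collection inclusion, the table mappings can be generated directly from them. A sketch using the standard DMS table-mapping JSON shape (the generation code in dms_pipeline.py may differ in detail):

```python
import json

# Illustrative masks; the real dict lives in infra/server_infra/dms_pipeline.py.
EXPECTED_FIELDS_FOR_ANALYTICS = {"audit": {"customer_id": True},
                                 "cases": {"id": True}}

def table_mappings(db: str, masks: dict) -> str:
    """Build DMS selection rules that include exactly the mask's collections."""
    rules = [
        {
            "rule-type": "selection",
            "rule-id": str(i),
            "rule-name": str(i),
            "object-locator": {"schema-name": db, "table-name": collection},
            "rule-action": "include",
        }
        for i, collection in enumerate(sorted(masks), start=1)
    ]
    return json.dumps({"rules": rules})

print(table_mappings("legion", EXPECTED_FIELDS_FOR_ANALYTICS))
```

Generating the mappings this way is what keeps the DMS side and the cleaner side from drifting apart: adding a key to the dict both includes the collection in replication and allowlists its fields.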

How to Add Fields and Collections

All changes start in EXPECTED_FIELDS_FOR_ANALYTICS in infra/server_infra/dms_pipeline.py.

  1. Add fields to an existing collection:
    • Update that collection mask in EXPECTED_FIELDS_FOR_ANALYTICS (top-level or nested mask).
    • Deploy the infra stack in every region where deploy_dms=True.
    • New files will include the new fields after the updated Lambda is deployed.
    • Example (audit mask, before → after):

```python
# before
"audit": {
    "customer_id": True,
    "array_nodes": {
        "data": {
            "label": True,
        },
    },
}

# after
"audit": {
    "customer_id": True,
    "status": True,  # new top-level field
    "array_nodes": {
        "data": {
            "label": True,
            "step_id": True,  # new nested field
        },
    },
}
```
  • Result object shape (one cleaned NDJSON line):

```json
{
  "customer_id": "cust_123",
  "status": "open",
  "array_nodes": [
    {
      "data": {
        "label": "Start",
        "step_id": "step_1"
      }
    }
  ],
  "oid__id": "67bd3e9f1",
  "dms_replicated_at": "2026-02-22T10:15:00Z"
}
```
  • Notes:
    • Only allowlisted fields are kept.
    • For arrays, use the exact column name as it appears in the RAW CSV headers; the nested mask is applied to each element.
    • The MongoDB → DMS export names this field array_nodes, so use array_nodes in EXPECTED_FIELDS_FOR_ANALYTICS (not nodes).
    • oid__id and dms_replicated_at are added automatically.
    • Missing or NULL fields are omitted from the output object.
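The nested-mask projection described in the notes above can be sketched recursively. The function name is hypothetical, and the fallback for a scalar value under a dict mask is an assumption:

```python
import json

def project(value, mask):
    """Recursively keep only fields allowed by the mask (illustrative sketch)."""
    if mask is True:
        return value  # leaf: field is allowlisted as-is
    if isinstance(value, list):
        return [project(item, mask) for item in value]  # mask each array element
    if isinstance(value, dict):
        # Keep only keys present in both the mask and the value.
        return {k: project(value[k], sub) for k, sub in mask.items() if k in value}
    return value  # scalar under a dict mask: assumed passthrough

MASK = {"customer_id": True,
        "array_nodes": {"data": {"label": True}}}

# Nested CSV cells arrive as JSON text and are parsed before projection.
row = {"customer_id": "cust_123",
       "array_nodes": json.loads('[{"data": {"label": "Start", "secret": "x"}}]')}
print(project(row, MASK))
```

Fields absent from the mask (`secret` here) are dropped, and fields absent from the row are simply omitted, matching the notes above.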
  2. Add a new collection:
    • Add a new top-level key in EXPECTED_FIELDS_FOR_ANALYTICS with its allowlisted fields.
    • Deploy the infra stack in every region where deploy_dms=True (table mappings are generated from this dict).
    • Start the DMS replication task in each deployed region after deployment.
    • Simple example (adding a new cases collection):

```python
"cases": {
    "id": True,
    "customer_id": True,
}
```

    • Manual start is required because the task is configured with start_replication_task=False.
  3. If the database name is not legion:
    • Update the RAW S3 to Lambda notification prefix (currently hardcoded as mongodb-data/legion/) so that new collection files trigger the cleaner.
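Since the prefix is just the RAW folder plus the database name, one way to avoid the manual update would be to derive it from the configured database. A hypothetical helper (not present in the source):

```python
# Hypothetical: derive the bucket-notification prefix from the configured
# dms-mongodb-database value instead of hardcoding "mongodb-data/legion/".
def raw_notification_prefix(database: str) -> str:
    return f"mongodb-data/{database}/"

print(raw_notification_prefix("legion"))  # mongodb-data/legion/
```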

Bucket and Region Behavior

  • RAW bucket name pattern: legion-analytics-<region>-<environment><stamp_suffix>.
  • CLEAN bucket name is fixed: legion-clean-analytics.
  • CLEAN prefix is <environment>-<region>/clean/.
  • Shared CLEAN bucket creation is gated by region == us-west-2 && environment == production, but is created using the us-east-1 provider.
  • Non-primary stacks reference the existing CLEAN bucket name and do not create bucket-level resources there.
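The naming rules above are plain string assembly; a sketch (stamp_suffix handling is illustrative):

```python
# Sketch of the naming conventions described above; stamp_suffix is whatever
# the stack appends (assumed to be a plain string, possibly empty).
def raw_bucket_name(region: str, environment: str, stamp_suffix: str = "") -> str:
    return f"legion-analytics-{region}-{environment}{stamp_suffix}"

def clean_prefix(region: str, environment: str) -> str:
    return f"{environment}-{region}/clean/"

print(raw_bucket_name("us-west-2", "production"))  # legion-analytics-us-west-2-production
print(clean_prefix("us-west-2", "production"))     # production-us-west-2/clean/
```

Note the order flips between the two: the RAW bucket name is region-then-environment, while the CLEAN prefix is environment-then-region.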

Lifecycle and Retention

  • RAW bucket has:
    • A general transition rule at 30 days to STANDARD_IA.
    • An explicit expiration rule for mongodb-data/ at 7 days.
  • CLEAN bucket lifecycle expiration (7 days) is created only in the primary stack and only for that stack's <environment>-<region>/clean/ prefix.

Deployment and Config Gates

  • DMS pipeline is created only when deploy_dms=True.
  • dms-mongodb-database defaults to legion.
  • dms-instance-class defaults to dms.t3.micro in __main__.py.
  • dms-storage-gb defaults to 20.
  • Snowpipe SQS integration requires snowpipe-sqs-arn and primary-stack bucket creation.

Important Implementation Notes

  • The DMS MongoDB source does not support include-only column filtering; included collections replicate full rows to RAW.
  • PII minimization and allowlisting happen in Lambda before files land in CLEAN.
  • Replication task is created with start_replication_task=False; starting it is a separate operational step.
  • RAW S3 to Lambda notification prefix is hardcoded to mongodb-data/legion/. If dms-mongodb-database changes from legion, this filter must be updated.

Basic Operational Checks

  • RAW files are arriving under mongodb-data/.
  • Lambda logs show processed row counts and uploaded destination keys.
  • CLEAN objects are .json.gz NDJSON under <environment>-<region>/clean/.
  • S3 to SQS bucket notification exists when Snowpipe integration is enabled.