# DMS to Snowflake End-to-End Pipeline

This document reflects the current implementation in:

- `infra/server_infra/dms_pipeline.py`
- `infra/server_infra/lambda_pii_cleaner/pii_cleaner.py`
- `infra/server_infra/__main__.py`
## Purpose

Replicate selected MongoDB collections to a RAW S3 bucket, project allowlisted fields into clean NDJSON, and optionally notify Snowpipe via S3-to-SQS events.
## High-Level Flow

### Processing Sequence (Implemented)

- DMS reads MongoDB and writes compressed CSV to the RAW S3 target:
  - `bucket_folder: mongodb-data`
  - `data_format: csv`
  - `compression_type: GZIP`
  - `timestamp_column_name: dms_replicated_at`
  - `add_column_name: true`
  - `migration_type: full-load-and-cdc`
- DMS table mappings include only collections present in `EXPECTED_FIELDS_FOR_ANALYTICS` keys.
- The RAW bucket notification invokes the `pii_cleaner` Lambda for `s3:ObjectCreated:*` with prefix `mongodb-data/legion/`.
- Lambda validation and routing:
  - Skips objects not in `RAW_BUCKET` (if set), not under `RAW_PREFIX`, or already under `CLEAN_PREFIX`.
  - Infers the collection from the key pattern `RAW_PREFIX/<db>/<collection>/...`.
- Lambda projection:
  - Looks up the per-collection mask from `EXPECTED_FIELDS_FOR_ANALYTICS`.
  - Adds passthrough fields automatically: `oid__id`, `dms_replicated_at`.
  - For nested masks, parses JSON cells and recursively projects nested objects/arrays.
- Lambda output:
  - Converts each input row to one NDJSON line.
  - Uploads gzipped output with `ContentType=application/x-ndjson` and `ContentEncoding=gzip`.
  - The destination key rewrites the RAW prefix to the CLEAN prefix and normalizes the suffix to `.json.gz`.
- Optional Snowpipe notification:
  - Configured only when both `create_shared_bucket` and `snowpipe-sqs-arn` are set.
  - The event filter prefix is `<environment>-<region>/clean/`.
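The validation-and-routing step above can be sketched roughly as follows. This is a minimal illustration only; the prefix values and the function name are assumptions, not the actual `pii_cleaner` implementation:

```python
# Minimal sketch of the Lambda's validation/routing step.
# RAW_PREFIX / CLEAN_PREFIX values and the function name are illustrative
# assumptions, not the actual pii_cleaner implementation.
from typing import Optional, Tuple

RAW_PREFIX = "mongodb-data/"
CLEAN_PREFIX = "clean/"

def route_key(key: str) -> Optional[Tuple[str, str]]:
    """Return (db, collection) for a RAW object key, or None to skip it."""
    if key.startswith(CLEAN_PREFIX):    # already-cleaned output: skip
        return None
    if not key.startswith(RAW_PREFIX):  # outside the RAW prefix: skip
        return None
    # Expected key pattern: RAW_PREFIX/<db>/<collection>/<file>
    parts = key[len(RAW_PREFIX):].split("/")
    if len(parts) < 3:
        return None
    return parts[0], parts[1]
```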
## PII Cleaner

File: `infra/server_infra/lambda_pii_cleaner/pii_cleaner.py`

Simple flow:

- Get the RAW CSV file from S3.
- Read rows from the file.
- Keep only allowlisted fields (`EXPECTED_FIELDS_FOR_ANALYTICS`) plus `oid__id` and `dms_replicated_at`.
- Write cleaned rows as NDJSON (`.json.gz`) to the CLEAN prefix.
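A rough, self-contained sketch of that flow, operating on in-memory bytes rather than S3 (the mask and field names here are hypothetical; the real handler uses the per-collection mask from `EXPECTED_FIELDS_FOR_ANALYTICS`):

```python
import csv
import gzip
import io
import json

# Hypothetical top-level allowlist; the real one comes from
# EXPECTED_FIELDS_FOR_ANALYTICS in dms_pipeline.py.
MASK = {"customer_id": True, "status": True}
PASSTHROUGH = ("oid__id", "dms_replicated_at")

def clean_csv_to_ndjson_gz(raw_csv_gz: bytes) -> bytes:
    """Project allowlisted columns from a gzipped CSV into gzipped NDJSON."""
    text = gzip.decompress(raw_csv_gz).decode("utf-8")
    lines = []
    for row in csv.DictReader(io.StringIO(text)):
        out = {
            k: v for k, v in row.items()
            # keep allowlisted and passthrough fields; drop empty/NULL cells
            if (MASK.get(k) is True or k in PASSTHROUGH)
            and v not in (None, "", "NULL")
        }
        lines.append(json.dumps(out))
    return gzip.compress(("\n".join(lines) + "\n").encode("utf-8"))
```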
If you need to change behavior:
- Fields/collections: `infra/server_infra/dms_pipeline.py` (`EXPECTED_FIELDS_FOR_ANALYTICS`).
- Cleaner logic: `infra/server_infra/lambda_pii_cleaner/pii_cleaner.py`.
- Tests: `infra/server_infra/lambda_pii_cleaner/test_pii_cleaner.py`.
## Source of Truth

`EXPECTED_FIELDS_FOR_ANALYTICS` drives all of the following:

- DMS collection inclusion.
- Cleaner field and nested-field allowlisting.
- Automatic passthrough fields for every collection: `oid__id`, `dms_replicated_at`.
This doc intentionally does not list concrete collections/fields to avoid drift.
Use `infra/server_infra/dms_pipeline.py` as the current source.
## How to Add Fields and Collections

All changes start in `EXPECTED_FIELDS_FOR_ANALYTICS` in `infra/server_infra/dms_pipeline.py`.

- Add fields to an existing collection:
  - Update that collection's mask in `EXPECTED_FIELDS_FOR_ANALYTICS` (top-level or nested mask).
  - Deploy the infra stack in every region where `deploy_dms=True`.
  - New files will include the new fields after the updated Lambda is deployed.
  - Example (`audit`, before -> after):
```python
# before
"audit": {
    "customer_id": True,
    "array_nodes": {
        "data": {
            "label": True,
        },
    },
}

# after
"audit": {
    "customer_id": True,
    "status": True,  # new top-level field
    "array_nodes": {
        "data": {
            "label": True,
            "step_id": True,  # new nested field
        },
    },
}
```
- Result object shape (one cleaned NDJSON line):

```json
{
  "customer_id": "cust_123",
  "status": "open",
  "array_nodes": [
    {
      "data": {
        "label": "Start",
        "step_id": "step_1"
      }
    }
  ],
  "oid__id": "67bd3e9f1",
  "dms_replicated_at": "2026-02-22T10:15:00Z"
}
```
- Notes:
  - Only allowlisted fields are kept.
  - For arrays, use the exact column name as it appears in the RAW CSV headers; the nested mask is applied to each element.
  - The MongoDB -> DMS export names this field `array_nodes`, so use `array_nodes` in `EXPECTED_FIELDS_FOR_ANALYTICS` (not `nodes`).
  - `oid__id` and `dms_replicated_at` are added automatically.
  - Missing or `NULL` fields are omitted from the output object.
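The nested-mask behavior described in the notes can be illustrated with a small recursive projection. This is a sketch of the idea, not the actual cleaner code:

```python
def project(value, mask):
    """Keep only keys allowed by mask; recurse into dicts and lists.

    A mask value of True keeps the field as-is; a dict mask recurses
    into nested objects, and list elements are each projected.
    """
    if isinstance(value, list):
        return [project(item, mask) for item in value]
    if isinstance(value, dict):
        out = {}
        for key, sub in mask.items():
            if key not in value:
                continue  # missing fields are simply omitted
            out[key] = value[key] if sub is True else project(value[key], sub)
        return out
    return value
```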
- Add a new collection:
  - Add a new top-level key in `EXPECTED_FIELDS_FOR_ANALYTICS` with its allowlisted fields.
  - Deploy the infra stack in every region where `deploy_dms=True` (table mappings are generated from this dict).
  - Start the DMS replication task in each deployed region after deployment.
  - Simple example (add a new `cases` collection):
"cases": {
"id": True,
"customer_id": True,
}
- Manual start is required because the task is configured with `start_replication_task=False`.
- If the database name is not `legion`:
  - Update the RAW S3-to-Lambda notification prefix (currently hardcoded as `mongodb-data/legion/`) so new collection files trigger the cleaner.
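Because table mappings are generated from the dict, adding the key is what includes the collection in replication. A hedged sketch of how such mappings can be derived, using the DMS selection-rule JSON shape (the function name and rule naming are illustrative; the real generation lives in `dms_pipeline.py`):

```python
import json

def build_table_mappings(expected_fields: dict, database: str = "legion") -> str:
    """Generate DMS selection rules including only the allowlisted collections.

    Illustrative sketch: one "include" selection rule per top-level key in
    EXPECTED_FIELDS_FOR_ANALYTICS, scoped to the source database.
    """
    rules = []
    for i, collection in enumerate(sorted(expected_fields), start=1):
        rules.append({
            "rule-type": "selection",
            "rule-id": str(i),
            "rule-name": f"include-{collection}",
            "object-locator": {"schema-name": database, "table-name": collection},
            "rule-action": "include",
        })
    return json.dumps({"rules": rules})
```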
## Bucket and Region Behavior

- RAW bucket name pattern: `legion-analytics-<region>-<environment><stamp_suffix>`.
- CLEAN bucket name is fixed: `legion-clean-analytics`.
- CLEAN prefix is `<environment>-<region>/clean/`.
- Shared CLEAN bucket creation is gated on `region == us-west-2 && environment == production`, but the bucket is created using the `us-east-1` provider.
- Non-primary stacks reference the existing CLEAN bucket name and do not create bucket-level resources there.
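The naming patterns above can be expressed as small helpers (illustrative only, not the stack code):

```python
def raw_bucket_name(region: str, environment: str, stamp_suffix: str = "") -> str:
    # Pattern: legion-analytics-<region>-<environment><stamp_suffix>
    return f"legion-analytics-{region}-{environment}{stamp_suffix}"

def clean_prefix(region: str, environment: str) -> str:
    # Pattern: <environment>-<region>/clean/
    return f"{environment}-{region}/clean/"
```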
## Lifecycle and Retention

- The RAW bucket has:
  - A general transition rule to `STANDARD_IA` at 30 days.
  - An explicit expiration rule for `mongodb-data/` at 7 days.
- The CLEAN bucket lifecycle expiration (7 days) is created only in the primary stack and only for that stack's `<environment>-<region>/clean/` prefix.
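For orientation, the RAW-bucket rules above can be written in the shape S3 lifecycle configurations take (a sketch in the `put_bucket_lifecycle_configuration` dict format; the actual rules are defined in the infra stack, not here):

```python
# Sketch of the RAW bucket's lifecycle rules in S3 lifecycle-configuration
# shape. Rule IDs are illustrative assumptions.
RAW_LIFECYCLE = {
    "Rules": [
        {   # general transition to infrequent access after 30 days
            "ID": "transition-to-ia",
            "Status": "Enabled",
            "Filter": {"Prefix": ""},
            "Transitions": [{"Days": 30, "StorageClass": "STANDARD_IA"}],
        },
        {   # expire raw DMS output after 7 days
            "ID": "expire-mongodb-data",
            "Status": "Enabled",
            "Filter": {"Prefix": "mongodb-data/"},
            "Expiration": {"Days": 7},
        },
    ]
}
```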
## Deployment and Config Gates

- The DMS pipeline is created only when `deploy_dms=True`.
- `dms-mongodb-database` defaults to `legion`.
- `dms-instance-class` defaults to `dms.t3.micro` in `__main__.py`.
- `dms-storage-gb` defaults to `20`.
- Snowpipe SQS integration requires `snowpipe-sqs-arn` and primary-stack bucket creation.
## Important Implementation Notes

- The DMS MongoDB source does not support include-only column filtering; included collections replicate full rows to RAW.
- PII minimization and allowlisting happen in Lambda before files land in CLEAN.
- The replication task is created with `start_replication_task=False`; starting it is a separate operational step.
- The RAW S3-to-Lambda notification prefix is hardcoded to `mongodb-data/legion/`. If `dms-mongodb-database` changes from `legion`, this filter must be updated.
## Basic Operational Checks

- RAW files are arriving under `mongodb-data/`.
- Lambda logs show processed row counts and uploaded destination keys.
- CLEAN objects are `.json.gz` NDJSON under `<environment>-<region>/clean/`.
- An S3-to-SQS bucket notification exists when Snowpipe integration is enabled.