-- nightly aggregation: source documents → ledger WITH staged AS ( SELECT document_id, source_ref, SUM(line_amount) AS total, DATE_TRUNC(invoice_date, MONTH) AS period FROM warehouse.staging_invoices WHERE ingestion_status = 'PARSED' AND invoice_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 90 DAY) GROUP BY 1, 2, invoice_date ) SELECT s.period, s.source_ref, COUNT(DISTINCT s.document_id) AS doc_count, ROUND(SUM(s.total), 2) AS gross_amount FROM staged s LEFT JOIN warehouse.dim_sources d ON d.source_ref = s.source_ref WHERE d.is_active = TRUE GROUP BY 1, 2 ORDER BY 1 DESC, gross_amount DESC; -- delta vs prior period for review queue SELECT curr.source_ref, curr.gross_amount, curr.gross_amount - prev.gross_amount AS delta FROM monthly_summary curr LEFT JOIN monthly_summary prev ON prev.source_ref = curr.source_ref AND prev.period = DATE_SUB(curr.period, INTERVAL 1 MONTH); -- nightly aggregation: source documents → ledger WITH staged AS ( SELECT document_id, source_ref, SUM(line_amount) AS total, DATE_TRUNC(invoice_date, MONTH) AS period FROM warehouse.staging_invoices WHERE ingestion_status = 'PARSED' AND invoice_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 90 DAY) GROUP BY 1, 2, invoice_date ) SELECT s.period, s.source_ref, COUNT(DISTINCT s.document_id) AS doc_count, ROUND(SUM(s.total), 2) AS gross_amount FROM staged s LEFT JOIN warehouse.dim_sources d ON d.source_ref = s.source_ref WHERE d.is_active = TRUE GROUP BY 1, 2 ORDER BY 1 DESC, gross_amount DESC; -- delta vs prior period for review queue SELECT curr.source_ref, curr.gross_amount, curr.gross_amount - prev.gross_amount AS delta FROM monthly_summary curr LEFT JOIN monthly_summary prev ON prev.source_ref = curr.source_ref AND prev.period = DATE_SUB(curr.period, INTERVAL 1 MONTH); -- nightly aggregation: source documents → ledger WITH staged AS ( SELECT document_id, source_ref, SUM(line_amount) AS total, DATE_TRUNC(invoice_date, MONTH) AS period FROM warehouse.staging_invoices WHERE ingestion_status = 'PARSED' AND invoice_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 90 DAY) GROUP BY 1, 2, invoice_date ) SELECT s.period, s.source_ref, COUNT(DISTINCT s.document_id) AS doc_count, ROUND(SUM(s.total), 2) AS gross_amount FROM staged s LEFT JOIN warehouse.dim_sources d ON d.source_ref = s.source_ref WHERE d.is_active = TRUE GROUP BY 1, 2 ORDER BY 1 DESC, gross_amount DESC; -- delta vs prior period for review queue SELECT curr.source_ref, curr.gross_amount, curr.gross_amount - prev.gross_amount AS delta FROM monthly_summary curr LEFT JOIN monthly_summary prev ON prev.source_ref = curr.source_ref AND prev.period = DATE_SUB(curr.period, INTERVAL 1 MONTH);
-- nightly aggregation: source documents → ledger WITH staged AS ( SELECT document_id, source_ref, SUM(line_amount) AS total, DATE_TRUNC(invoice_date, MONTH) AS period FROM warehouse.staging_invoices WHERE ingestion_status = 'PARSED' AND invoice_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 90 DAY) GROUP BY 1, 2, invoice_date ) SELECT s.period, s.source_ref, COUNT(DISTINCT s.document_id) AS doc_count, ROUND(SUM(s.total), 2) AS gross_amount FROM staged s LEFT JOIN warehouse.dim_sources d ON d.source_ref = s.source_ref WHERE d.is_active = TRUE GROUP BY 1, 2 ORDER BY 1 DESC, gross_amount DESC; -- delta vs prior period for review queue SELECT curr.source_ref, curr.gross_amount, curr.gross_amount - prev.gross_amount AS delta FROM monthly_summary curr LEFT JOIN monthly_summary prev ON prev.source_ref = curr.source_ref AND prev.period = DATE_SUB(curr.period, INTERVAL 1 MONTH); -- nightly aggregation: source documents → ledger WITH staged AS ( SELECT document_id, source_ref, SUM(line_amount) AS total, DATE_TRUNC(invoice_date, MONTH) AS period FROM warehouse.staging_invoices WHERE ingestion_status = 'PARSED' AND invoice_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 90 DAY) GROUP BY 1, 2, invoice_date ) SELECT s.period, s.source_ref, COUNT(DISTINCT s.document_id) AS doc_count, ROUND(SUM(s.total), 2) AS gross_amount FROM staged s LEFT JOIN warehouse.dim_sources d ON d.source_ref = s.source_ref WHERE d.is_active = TRUE GROUP BY 1, 2 ORDER BY 1 DESC, gross_amount DESC; -- delta vs prior period for review queue SELECT curr.source_ref, curr.gross_amount, curr.gross_amount - prev.gross_amount AS delta FROM monthly_summary curr LEFT JOIN monthly_summary prev ON prev.source_ref = curr.source_ref AND prev.period = DATE_SUB(curr.period, INTERVAL 1 MONTH); -- nightly aggregation: source documents → ledger WITH staged AS ( SELECT document_id, source_ref, SUM(line_amount) AS total, DATE_TRUNC(invoice_date, MONTH) AS period FROM warehouse.staging_invoices WHERE ingestion_status = 'PARSED' AND invoice_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 90 DAY) GROUP BY 1, 2, invoice_date ) SELECT s.period, s.source_ref, COUNT(DISTINCT s.document_id) AS doc_count, ROUND(SUM(s.total), 2) AS gross_amount FROM staged s LEFT JOIN warehouse.dim_sources d ON d.source_ref = s.source_ref WHERE d.is_active = TRUE GROUP BY 1, 2 ORDER BY 1 DESC, gross_amount DESC; -- delta vs prior period for review queue SELECT curr.source_ref, curr.gross_amount, curr.gross_amount - prev.gross_amount AS delta FROM monthly_summary curr LEFT JOIN monthly_summary prev ON prev.source_ref = curr.source_ref AND prev.period = DATE_SUB(curr.period, INTERVAL 1 MONTH);
import pandas as pd from dataclasses import dataclass def reconcile_documents( parsed: list[dict], ledger: pd.DataFrame, ) -> pd.DataFrame: """Match parsed source documents against the existing ledger.""" df = pd.DataFrame(parsed) df["unique_key"] = ( df["source_ref"].astype(str).str.strip() + "::" + df["doc_number"].astype(str).str.strip() ) merged = df.merge( ledger[["unique_key", "ledger_id", "status"]], on="unique_key", how="left", indicator=True, ) new_rows = ( merged[merged["_merge"] == "left_only"] .drop(columns=["_merge"]) .assign(ingestion_status="PENDING_REVIEW") .reset_index(drop=True) ) return new_rows def apply_credit_offsets(rows, credit_table): # match credits to source by unique_key # adjust gross_amount in place for idx, r in rows.iterrows(): match = credit_table.get(r["unique_key"]) if match: rows.at[idx, "gross_amount"] -= match.amount return rows import pandas as pd from dataclasses import dataclass def reconcile_documents( parsed: list[dict], ledger: pd.DataFrame, ) -> pd.DataFrame: """Match parsed source documents against the existing ledger.""" df = pd.DataFrame(parsed) df["unique_key"] = ( df["source_ref"].astype(str).str.strip() + "::" + df["doc_number"].astype(str).str.strip() ) merged = df.merge( ledger[["unique_key", "ledger_id", "status"]], on="unique_key", how="left", indicator=True, ) new_rows = ( merged[merged["_merge"] == "left_only"] .drop(columns=["_merge"]) .assign(ingestion_status="PENDING_REVIEW") .reset_index(drop=True) ) return new_rows def apply_credit_offsets(rows, credit_table): # match credits to source by unique_key # adjust gross_amount in place for idx, r in rows.iterrows(): match = credit_table.get(r["unique_key"]) if match: rows.at[idx, "gross_amount"] -= match.amount return rows import pandas as pd from dataclasses import dataclass def reconcile_documents( parsed: list[dict], ledger: pd.DataFrame, ) -> pd.DataFrame: """Match parsed source documents against the existing ledger.""" df = pd.DataFrame(parsed) df["unique_key"] = ( df["source_ref"].astype(str).str.strip() + "::" + df["doc_number"].astype(str).str.strip() ) merged = df.merge( ledger[["unique_key", "ledger_id", "status"]], on="unique_key", how="left", indicator=True, ) new_rows = ( merged[merged["_merge"] == "left_only"] .drop(columns=["_merge"]) .assign(ingestion_status="PENDING_REVIEW") .reset_index(drop=True) ) return new_rows def apply_credit_offsets(rows, credit_table): # match credits to source by unique_key # adjust gross_amount in place for idx, r in rows.iterrows(): match = credit_table.get(r["unique_key"]) if match: rows.at[idx, "gross_amount"] -= match.amount return rows
import pandas as pd from dataclasses import dataclass def reconcile_documents( parsed: list[dict], ledger: pd.DataFrame, ) -> pd.DataFrame: """Match parsed source documents against the existing ledger.""" df = pd.DataFrame(parsed) df["unique_key"] = ( df["source_ref"].astype(str).str.strip() + "::" + df["doc_number"].astype(str).str.strip() ) merged = df.merge( ledger[["unique_key", "ledger_id", "status"]], on="unique_key", how="left", indicator=True, ) new_rows = ( merged[merged["_merge"] == "left_only"] .drop(columns=["_merge"]) .assign(ingestion_status="PENDING_REVIEW") .reset_index(drop=True) ) return new_rows def apply_credit_offsets(rows, credit_table): # match credits to source by unique_key # adjust gross_amount in place for idx, r in rows.iterrows(): match = credit_table.get(r["unique_key"]) if match: rows.at[idx, "gross_amount"] -= match.amount return rows import pandas as pd from dataclasses import dataclass def reconcile_documents( parsed: list[dict], ledger: pd.DataFrame, ) -> pd.DataFrame: """Match parsed source documents against the existing ledger.""" df = pd.DataFrame(parsed) df["unique_key"] = ( df["source_ref"].astype(str).str.strip() + "::" + df["doc_number"].astype(str).str.strip() ) merged = df.merge( ledger[["unique_key", "ledger_id", "status"]], on="unique_key", how="left", indicator=True, ) new_rows = ( merged[merged["_merge"] == "left_only"] .drop(columns=["_merge"]) .assign(ingestion_status="PENDING_REVIEW") .reset_index(drop=True) ) return new_rows def apply_credit_offsets(rows, credit_table): # match credits to source by unique_key # adjust gross_amount in place for idx, r in rows.iterrows(): match = credit_table.get(r["unique_key"]) if match: rows.at[idx, "gross_amount"] -= match.amount return rows import pandas as pd from dataclasses import dataclass def reconcile_documents( parsed: list[dict], ledger: pd.DataFrame, ) -> pd.DataFrame: """Match parsed source documents against the existing ledger.""" df = pd.DataFrame(parsed) df["unique_key"] = ( df["source_ref"].astype(str).str.strip() + "::" + df["doc_number"].astype(str).str.strip() ) merged = df.merge( ledger[["unique_key", "ledger_id", "status"]], on="unique_key", how="left", indicator=True, ) new_rows = ( merged[merged["_merge"] == "left_only"] .drop(columns=["_merge"]) .assign(ingestion_status="PENDING_REVIEW") .reset_index(drop=True) ) return new_rows def apply_credit_offsets(rows, credit_table): # match credits to source by unique_key # adjust gross_amount in place for idx, r in rows.iterrows(): match = credit_table.get(r["unique_key"]) if match: rows.at[idx, "gross_amount"] -= match.amount return rows
<!-- weekly operational digest :: generated --> <section class="report" data-period="W42"> <header class="report__head"> <h2 class="report__title"> Operational Summary </h2> <span class="report__meta"> 2026 — Week 42 </span> </header> <table class="report__grid"> <thead> <tr> <th>Period</th> <th>Documents</th> <th>Reconciled</th> <th>Δ</th> </tr> </thead> <tbody> <tr> <td>2026-W41</td> <td>184</td> <td>181</td> <td>+3</td> </tr> <tr> <td>2026-W42</td> <td>207</td> <td>207</td> <td>0</td> </tr> </tbody> </table> </section> <!-- weekly operational digest :: generated --> <section class="report" data-period="W42"> <header class="report__head"> <h2 class="report__title"> Operational Summary </h2> <span class="report__meta"> 2026 — Week 42 </span> </header> <table class="report__grid"> <thead> <tr> <th>Period</th> <th>Documents</th> <th>Reconciled</th> <th>Δ</th> </tr> </thead> <tbody> <tr> <td>2026-W41</td> <td>184</td> <td>181</td> <td>+3</td> </tr> <tr> <td>2026-W42</td> <td>207</td> <td>207</td> <td>0</td> </tr> </tbody> </table> </section> <!-- weekly operational digest :: generated --> <section class="report" data-period="W42"> <header class="report__head"> <h2 class="report__title"> Operational Summary </h2> <span class="report__meta"> 2026 — Week 42 </span> </header> <table class="report__grid"> <thead> <tr> <th>Period</th> <th>Documents</th> <th>Reconciled</th> <th>Δ</th> </tr> </thead> <tbody> <tr> <td>2026-W41</td> <td>184</td> <td>181</td> <td>+3</td> </tr> <tr> <td>2026-W42</td> <td>207</td> <td>207</td> <td>0</td> </tr> </tbody> </table> </section>
<!-- weekly operational digest :: generated --> <section class="report" data-period="W42"> <header class="report__head"> <h2 class="report__title"> Operational Summary </h2> <span class="report__meta"> 2026 — Week 42 </span> </header> <table class="report__grid"> <thead> <tr> <th>Period</th> <th>Documents</th> <th>Reconciled</th> <th>Δ</th> </tr> </thead> <tbody> <tr> <td>2026-W41</td> <td>184</td> <td>181</td> <td>+3</td> </tr> <tr> <td>2026-W42</td> <td>207</td> <td>207</td> <td>0</td> </tr> </tbody> </table> </section> <!-- weekly operational digest :: generated --> <section class="report" data-period="W42"> <header class="report__head"> <h2 class="report__title"> Operational Summary </h2> <span class="report__meta"> 2026 — Week 42 </span> </header> <table class="report__grid"> <thead> <tr> <th>Period</th> <th>Documents</th> <th>Reconciled</th> <th>Δ</th> </tr> </thead> <tbody> <tr> <td>2026-W41</td> <td>184</td> <td>181</td> <td>+3</td> </tr> <tr> <td>2026-W42</td> <td>207</td> <td>207</td> <td>0</td> </tr> </tbody> </table> </section> <!-- weekly operational digest :: generated --> <section class="report" data-period="W42"> <header class="report__head"> <h2 class="report__title"> Operational Summary </h2> <span class="report__meta"> 2026 — Week 42 </span> </header> <table class="report__grid"> <thead> <tr> <th>Period</th> <th>Documents</th> <th>Reconciled</th> <th>Δ</th> </tr> </thead> <tbody> <tr> <td>2026-W41</td> <td>184</td> <td>181</td> <td>+3</td> </tr> <tr> <td>2026-W42</td> <td>207</td> <td>207</td> <td>0</td> </tr> </tbody> </table> </section>
-- staging dedupe + integrity check CREATE OR REPLACE TABLE warehouse.documents_clean AS SELECT * FROM ( SELECT *, ROW_NUMBER() OVER ( PARTITION BY unique_key ORDER BY ingested_at DESC ) AS rn FROM warehouse.documents_raw ) WHERE rn = 1 AND document_id IS NOT NULL AND source_ref IS NOT NULL; -- assertion: no duplicate keys downstream SELECT unique_key, COUNT(*) AS hits FROM warehouse.documents_clean GROUP BY 1 HAVING hits > 1; -- expected: 0 rows -- review queue feed SELECT document_id, source_ref, ingestion_status, flagged_at FROM warehouse.documents_clean WHERE ingestion_status IN ( 'PENDING_REVIEW', 'AMOUNT_MISMATCH', 'MISSING_REF' ) ORDER BY flagged_at DESC LIMIT 200; -- staging dedupe + integrity check CREATE OR REPLACE TABLE warehouse.documents_clean AS SELECT * FROM ( SELECT *, ROW_NUMBER() OVER ( PARTITION BY unique_key ORDER BY ingested_at DESC ) AS rn FROM warehouse.documents_raw ) WHERE rn = 1 AND document_id IS NOT NULL AND source_ref IS NOT NULL; -- assertion: no duplicate keys downstream SELECT unique_key, COUNT(*) AS hits FROM warehouse.documents_clean GROUP BY 1 HAVING hits > 1; -- expected: 0 rows -- review queue feed SELECT document_id, source_ref, ingestion_status, flagged_at FROM warehouse.documents_clean WHERE ingestion_status IN ( 'PENDING_REVIEW', 'AMOUNT_MISMATCH', 'MISSING_REF' ) ORDER BY flagged_at DESC LIMIT 200; -- staging dedupe + integrity check CREATE OR REPLACE TABLE warehouse.documents_clean AS SELECT * FROM ( SELECT *, ROW_NUMBER() OVER ( PARTITION BY unique_key ORDER BY ingested_at DESC ) AS rn FROM warehouse.documents_raw ) WHERE rn = 1 AND document_id IS NOT NULL AND source_ref IS NOT NULL; -- assertion: no duplicate keys downstream SELECT unique_key, COUNT(*) AS hits FROM warehouse.documents_clean GROUP BY 1 HAVING hits > 1; -- expected: 0 rows -- review queue feed SELECT document_id, source_ref, ingestion_status, flagged_at FROM warehouse.documents_clean WHERE ingestion_status IN ( 'PENDING_REVIEW', 'AMOUNT_MISMATCH', 'MISSING_REF' ) ORDER BY flagged_at DESC LIMIT 200;
-- staging dedupe + integrity check CREATE OR REPLACE TABLE warehouse.documents_clean AS SELECT * FROM ( SELECT *, ROW_NUMBER() OVER ( PARTITION BY unique_key ORDER BY ingested_at DESC ) AS rn FROM warehouse.documents_raw ) WHERE rn = 1 AND document_id IS NOT NULL AND source_ref IS NOT NULL; -- assertion: no duplicate keys downstream SELECT unique_key, COUNT(*) AS hits FROM warehouse.documents_clean GROUP BY 1 HAVING hits > 1; -- expected: 0 rows -- review queue feed SELECT document_id, source_ref, ingestion_status, flagged_at FROM warehouse.documents_clean WHERE ingestion_status IN ( 'PENDING_REVIEW', 'AMOUNT_MISMATCH', 'MISSING_REF' ) ORDER BY flagged_at DESC LIMIT 200; -- staging dedupe + integrity check CREATE OR REPLACE TABLE warehouse.documents_clean AS SELECT * FROM ( SELECT *, ROW_NUMBER() OVER ( PARTITION BY unique_key ORDER BY ingested_at DESC ) AS rn FROM warehouse.documents_raw ) WHERE rn = 1 AND document_id IS NOT NULL AND source_ref IS NOT NULL; -- assertion: no duplicate keys downstream SELECT unique_key, COUNT(*) AS hits FROM warehouse.documents_clean GROUP BY 1 HAVING hits > 1; -- expected: 0 rows -- review queue feed SELECT document_id, source_ref, ingestion_status, flagged_at FROM warehouse.documents_clean WHERE ingestion_status IN ( 'PENDING_REVIEW', 'AMOUNT_MISMATCH', 'MISSING_REF' ) ORDER BY flagged_at DESC LIMIT 200; -- staging dedupe + integrity check CREATE OR REPLACE TABLE warehouse.documents_clean AS SELECT * FROM ( SELECT *, ROW_NUMBER() OVER ( PARTITION BY unique_key ORDER BY ingested_at DESC ) AS rn FROM warehouse.documents_raw ) WHERE rn = 1 AND document_id IS NOT NULL AND source_ref IS NOT NULL; -- assertion: no duplicate keys downstream SELECT unique_key, COUNT(*) AS hits FROM warehouse.documents_clean GROUP BY 1 HAVING hits > 1; -- expected: 0 rows -- review queue feed SELECT document_id, source_ref, ingestion_status, flagged_at FROM warehouse.documents_clean WHERE ingestion_status IN ( 'PENDING_REVIEW', 'AMOUNT_MISMATCH', 'MISSING_REF' ) ORDER BY flagged_at DESC LIMIT 200;
CNS Logical.
Data Automation Architects.