Handling upserts via CDC and the LEDGERING approach

Problem Statement

How do you handle upserts (deletes, updates and inserts) when you have a CDC (change data capture) solution in place to capture all changes to source database rows and copy them over to S3?

This article describes one possible way to handle this, while focusing on

  • keeping a full audit trail of changes in the data warehouse

  • having an immutable table where data is only appended

  • using Firebolt's aggregating indexes to get performance at scale as the data grows

The approach we are taking is called ledgering. It is very similar to the ledgering process in accounting: every change is recorded as the difference between the previous state and the new state, so the final state is simply an aggregated view of all the changes across time.

Benefits of this approach: data immutability, query performance, and a full history of events through time.

Downsides: seemingly complex to implement and maintain (until you read through this article and see the code below).

Recommended Architecture

Main concept:

The main concept of ledgering is to treat changes to your transactional data as an append-only log of events.

This means that you never update data in your main transactional table - you only append rows whenever something changes in the source system. The change can be a deleted row, a completely new row or a change in an existing row (either an attribute change or an amount/measure change).
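
As a minimal illustration (using a simplified, hypothetical payments_ledger table; the full demo schema appears in the sample code at the end of this article), an update at the source never becomes an UPDATE in the warehouse - it becomes another appended row that records the difference:

 --hypothetical simplified ledger table: one row per change, never updated in place
 CREATE FACT TABLE IF NOT EXISTS payments_ledger (
     payment_id INT NOT NULL,   -- natural/business key of the transaction
     amount FLOAT NOT NULL,     -- delta relative to the previous state
     action TEXT NOT NULL       -- i = insert, u = update, d = delete
 ) PRIMARY INDEX payment_id;

 --the source inserts payment 3 for $9.99 ...
 INSERT INTO payments_ledger VALUES (3, 9.99, 'i');
 --... and later updates it to $8.50: instead of updating the row, we append the -$1.49 difference
 INSERT INTO payments_ledger VALUES (3, -1.49, 'u');
 --the current amount is simply SUM(amount) per payment_id, i.e. $8.50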

Data flow

Data flows into S3 in the form of files that are prepared with the CDC process, which extracts data from the source system when changes occur on specific entities/tables.

The external table defined in Firebolt is used to access those files on S3, both for the initial ingestion (historical backfill) and continuous/incremental ingestion.
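
In the sample code at the end of this article, that external table is defined like this (the bucket URL is a placeholder you would replace with your own):

 --external table pointing to the CDC files on S3
 CREATE EXTERNAL TABLE IF NOT EXISTS "DEMO_external_payments" (
     payment_id INT NOT NULL,
     customer_id INT NOT NULL,
     product_id INT NOT NULL,
     amount FLOAT NOT NULL,
     action TEXT NOT NULL
 )
 URL = 's3://your-bucket/'
 OBJECT_PATTERN = "*.csv.gz"
 TYPE = (CSV SKIP_HEADER_ROWS = 0)
 COMPRESSION = GZIP;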

A staging fact table is defined in Firebolt to act as a temporary table from which we will get the changes into the final table. We are using this instead of the external table for better performance.

The final (payments) fact table is used to hold all of the transactional data, including some metadata to know when each row was changed and why. There is a view on top of this table which, in combination with an aggregating index, serves as the source of truth about transactions. All downstream systems that want to use the transactions data need to query this view. The fact table underneath this view holds all of the deltas (changes) to payments, while the view represents data on an aggregated level and shows the latest status.
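
For reference, here is a simplified sketch of that view (the full definition, together with the matching aggregating indexes, is in the sample code at the end of this article):

 --the view aggregates all appended deltas and exposes only the latest state per payment
 CREATE VIEW IF NOT EXISTS DEMO_final_payments_view AS
     SELECT payment_id, customer_id, product_id, meta_hash_row,
            SUM(amount) AS total_amount,
            MAX(created_updated_at) AS updated_at
     FROM DEMO_final_payments
     GROUP BY payment_id, customer_id, product_id, meta_hash_row
     HAVING SUM(amount) > 0; --rows that net out to zero (e.g. deleted payments) disappear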

This is the detailed flow (a condensed SQL sketch of the merge classification follows this list), and a full SQL example is provided at the end of this article:

  1. Set up and initial ingestion

    1. Set up the DDL (create all tables, views, indexes)

      1. Note that for this demo we use a fact table instead of an external table and we mimic files on S3 by populating this fact table with a few insert statements

    2. Perform the initial ingestion (historical backfill)

      1. First we load the increment from the external table into the staging table

      2. Then we load from the staging table into the final fact table

        1. The assumption here is that the final fact table is empty; we are starting from scratch

        2. All rows are inserted with the action i (initial load)

        3. A hash value (using the CITY_HASH function) is calculated from all the attributes that are subject to change; we will use this hash together with the amount (or any other measure) to track changes

      3. Clean up the staging table

  2. Continuous ingestion

    1. Whenever there is a change in the source system, the CDC process will upload new data into the S3 bucket. Each row has an action which describes why this row was captured at the source - it could be a new row, a change to an existing row, or a deleted row

      1. We mimic this process by populating our external table with a few insert statements

      2. Then we kick off the merge process which includes the ledgering logic:

        1. Load the increment from the external table into the staging table

        2. Get the rows from the staging table that are newer than (do not yet exist in) the final table

        3. Left join these new rows to the final table based on the natural (business) key

        4. Determine which of the rows are new and insert those (action i - new row inserted)

          1. Also, if a row is updated at the source (u) and not found in the final table, it will be treated as an insert with a specific note

        5. Determine which of the rows are deleted and insert those with a reverse amount (action d - row deleted at source)

          1. If the deleted row is not found in the final table, it will be inserted with a 0 amount with a specific note

        6. Determine which of the rows have an amount change and insert those as a delta between the last (existing) amount and the new amount (action u - existing row updated)

        7. Determine which of the rows have an attribute change (the hash value is different between the existing row and the new row) and insert those as:

          1. Reverse amount of the original row (action d - deleted because of an attribute change)

          2. Insert the new row with the new attributes and amount (action i - inserted because of an attribute change)

      3. Clean up the staging table
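
Condensed into one query, the classification in step 2.2 looks roughly like the sketch below (simplified for readability; the complete statement, including the attribute-change handling and the inserted metadata, is in the sample code at the end of this article):

 --simplified sketch of how new rows are classified against the latest aggregated state
 WITH new AS (           --rows that arrived since the last merge
     SELECT * FROM DEMO_staging_payments
     WHERE SOURCE_FILE_TIMESTAMP > (SELECT MAX(SOURCE_FILE_TIMESTAMP) FROM DEMO_final_payments)
 ),
 existing AS (           --latest aggregated state of the affected payments
     SELECT * FROM DEMO_final_payments_view
     WHERE payment_id IN (SELECT DISTINCT payment_id FROM new)
 )
 SELECT new.payment_id,
        CASE
            WHEN existing.payment_id IS NULL THEN 'insert as new row'
            WHEN new.action = 'd' THEN 'append reverse amount (delete)'
            WHEN new.meta_hash_row <> existing.meta_hash_row THEN 'reverse old row + append new row (attribute change)'
            ELSE 'append amount delta (update)'
        END AS ledger_action
 FROM new
 LEFT OUTER JOIN existing ON new.payment_id = existing.payment_id;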

Small Example:

This is a small example of payments data flowing into Firebolt. SQL queries to simulate this are provided below. We’ll keep it simple:

Initially, you sell a Coca Cola bottle for $2.50, a Sprite bottle for $1.75, and a bottle of wine for $9.99.

This gets inserted into Firebolt as three rows in your payments table:

Payment ID | Product   | Amount | Action
1          | Coca Cola | $2.50  | I - new row
2          | Sprite    | $1.75  | I - new row
3          | Wine      | $9.99  | I - new row

Then you delete the Sprite bottle (it was returned to the store as it had a leak), you sell a can of beer for $2.00, you update the bottle of wine to $8.50 (there was a discount on the bottle that wasn't accounted for initially), and you change the Coca Cola bottle to a Pepsi (there was a mistake when entering this transaction in the system).

This gets inserted into Firebolt as five rows, like this:

Payment ID | Product   | Amount | Action
1          | Coca Cola | $2.50  | I - new row
2          | Sprite    | $1.75  | I - new row
3          | Wine      | $9.99  | I - new row
2          | Sprite    | -$1.75 | D - deleted at source
4          | Beer      | $2.00  | I - new row
3          | Wine      | -$1.49 | U - updated at source
1          | Coca Cola | -$2.50 | D - deleted because of attribute change
1          | Pepsi     | $2.50  | I - inserted because of attribute change

As you can see, your payments table acts as a ledger and keeps all events that happened over time. It is immutable, history is preserved, and you get a clear view of the lineage of your transactions.

To get the latest state of your table, you have a view on top of it that simply aggregates the rows (grouped by the Payment ID and Product attributes), sums the amount, and removes anything that nets to zero. So you would get this when you query the view:

Payment ID | Product | Amount
1          | Pepsi   | $2.50
3          | Wine    | $8.50
4          | Beer    | $2.00

That’s it - simple as that. Due to Firebolt’s aggregating index, the read performance of this view will be great, even as data volume grows.
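
The index behind this is the one defined in the sample code below; it mirrors the view's GROUP BY columns and aggregate functions, so Firebolt can answer queries against the view from pre-aggregated data instead of re-scanning every delta row:

 --aggregating index matching the grouping and aggregations of DEMO_final_payments_view
 CREATE AND GENERATE AGGREGATING INDEX DEMO_final_payments_agg_idx_max_val
 ON DEMO_final_payments(payment_id, customer_id, product_id, meta_hash_row, SUM(amount), MAX(created_updated_at));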

Sample code

The code sample below simulates an initial load followed by a few increments that contain new rows, deleted rows, and updates to existing rows.

Data:

You can find some sample data with a randomly generated dataset here (.csv.gz files):

You can test with the code snippet below. First create the DDL objects, then copy the initial load file to your bucket and load it in. Then copy increment 1 to the bucket and load it to see the changes that were made. You can do the same for increment 2, and so on.
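
After each load, you can verify the result with queries like the two below (they also appear at the end of the sample code; payment_id = 3 is just an example key):

 --full audit trail of changes for one payment
 SELECT * FROM DEMO_final_payments WHERE payment_id = 3 ORDER BY created_updated_at, action;

 --latest state of the same payment (what downstream consumers should query)
 SELECT * FROM DEMO_final_payments_view WHERE payment_id = 3;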

Code:

 --initial clean up
 DROP TABLE IF EXISTS "DEMO_external_payments";
 DROP TABLE IF EXISTS "DEMO_staging_payments" CASCADE;
 DROP TABLE IF EXISTS "DEMO_final_payments" CASCADE;

 --external table pointing to the data on S3
 CREATE EXTERNAL TABLE IF NOT EXISTS "DEMO_external_payments" (
     payment_id INT NOT NULL,
     customer_id INT NOT NULL,
     product_id INT NOT NULL,
     amount FLOAT NOT NULL,
     action TEXT NOT NULL
 )
 URL = 's3://your-bucket/'
 OBJECT_PATTERN = "*.csv.gz"
 TYPE = (CSV SKIP_HEADER_ROWS = 0)
 COMPRESSION = GZIP;

 --staging table to act as a buffer between the external table and the final table (can be cleaned up after each ingestion cycle)
 CREATE FACT TABLE IF NOT EXISTS "DEMO_staging_payments" (
     payment_id INT NOT NULL,
     customer_id INT NOT NULL,
     product_id INT NOT NULL,
     created_updated_at TIMESTAMP NOT NULL,
     amount FLOAT NOT NULL,
     action TEXT NOT NULL,
     meta_hash_row LONG NOT NULL,
     SOURCE_FILE_NAME TEXT NOT NULL,
     SOURCE_FILE_TIMESTAMP TIMESTAMP NOT NULL
 ) PRIMARY INDEX payment_id;

 --our main/final fact table, stores all transactions in the ledger format
 CREATE FACT TABLE IF NOT EXISTS "DEMO_final_payments" (
     payment_id INT NOT NULL,
     customer_id INT NOT NULL,
     product_id INT NOT NULL,
     created_updated_at TIMESTAMP NOT NULL,
     amount FLOAT NOT NULL,
     action TEXT NOT NULL,
     meta_notes TEXT NOT NULL,
     meta_hash_row LONG NOT NULL,
     SOURCE_FILE_NAME TEXT NOT NULL,
     SOURCE_FILE_TIMESTAMP TIMESTAMP NOT NULL
 ) PRIMARY INDEX payment_id;

 --aggregating indexes to speed up queries
 CREATE AND GENERATE AGGREGATING INDEX DEMO_staging_payments_agg_idx_max_timestamp ON DEMO_staging_payments(MAX(SOURCE_FILE_TIMESTAMP));
 CREATE AND GENERATE AGGREGATING INDEX DEMO_final_payments_agg_idx_max_timestamp ON DEMO_final_payments(MAX(SOURCE_FILE_TIMESTAMP));
 CREATE AND GENERATE AGGREGATING INDEX DEMO_final_payments_agg_idx_max_val ON DEMO_final_payments(payment_id, customer_id, product_id, meta_hash_row, SUM(amount), MAX(created_updated_at));
 CREATE AND GENERATE AGGREGATING INDEX DEMO_final_payments_agg_idx_max_aggregations ON DEMO_final_payments(customer_id, product_id, SUM(amount));

 --view that exposes the latest aggregated state of each payment
 CREATE VIEW IF NOT EXISTS DEMO_final_payments_view AS
     SELECT payment_id, customer_id, product_id, meta_hash_row, SUM(amount) AS total_amount, MAX(created_updated_at) AS updated_at
     FROM DEMO_final_payments
     GROUP BY payment_id, customer_id, product_id, meta_hash_row
     HAVING SUM(amount) > 0;

 --initial backfill into the staging table and into the final table, then clean up the staging table
 INSERT INTO DEMO_staging_payments (
     payment_id,
     customer_id,
     product_id,
     created_updated_at,
     amount,
     action,
     meta_hash_row,
     SOURCE_FILE_NAME,
     SOURCE_FILE_TIMESTAMP
 )
 SELECT
     payment_id,
     customer_id,
     product_id,
     NOW(),
     amount,
     action,
     CITY_HASH(customer_id, product_id),
     "SOURCE_FILE_NAME",
     "SOURCE_FILE_TIMESTAMP"
 FROM DEMO_external_payments
 WHERE "SOURCE_FILE_TIMESTAMP" > (SELECT MAX("SOURCE_FILE_TIMESTAMP") FROM DEMO_staging_payments);

 --optionally insert some dummy initial data (in the real process, you would use the INSERT ... SELECT from the external table above instead)
 --INSERT INTO DEMO_staging_payments VALUES(1, 1, 1, NOW(), 2.50, 'i', CITY_HASH(1, 1), 'initial data.csv', NOW());
 --INSERT INTO DEMO_staging_payments VALUES(2, 1, 2, NOW(), 1.75, 'i', CITY_HASH(1, 2), 'initial data.csv', NOW());
 --INSERT INTO DEMO_staging_payments VALUES(3, 1, 4, NOW(), 9.99, 'i', CITY_HASH(1, 4), 'initial data.csv', NOW());

 --load the initial backfill from the staging table into the final table
 INSERT INTO DEMO_final_payments (
     payment_id,
     customer_id,
     product_id,
     created_updated_at,
     amount,
     action,
     meta_notes,
     meta_hash_row,
     SOURCE_FILE_NAME,
     SOURCE_FILE_TIMESTAMP
 )
 SELECT
     payment_id,
     customer_id,
     product_id,
     NOW(),
     amount,
     action,
     'initial load',
     meta_hash_row,
     "SOURCE_FILE_NAME",
     "SOURCE_FILE_TIMESTAMP"
 FROM DEMO_staging_payments
 WHERE "SOURCE_FILE_TIMESTAMP" > (SELECT MAX("SOURCE_FILE_TIMESTAMP") FROM DEMO_final_payments);

 --now simulate CDC with another batch of data coming into the external table
 INSERT INTO DEMO_staging_payments (
     payment_id,
     customer_id,
     product_id,
     created_updated_at,
     amount,
     action,
     meta_hash_row,
     SOURCE_FILE_NAME,
     SOURCE_FILE_TIMESTAMP
 )
 SELECT
     payment_id,
     customer_id,
     product_id,
     NOW(),
     amount,
     action,
     CITY_HASH(customer_id, product_id),
     "SOURCE_FILE_NAME",
     "SOURCE_FILE_TIMESTAMP"
 FROM DEMO_external_payments
 WHERE "SOURCE_FILE_TIMESTAMP" > (SELECT MAX("SOURCE_FILE_TIMESTAMP") FROM DEMO_staging_payments);

 --optionally insert some dummy incremental/CDC data (in the real process, you would use the INSERT ... SELECT from the external table above instead)
 --INSERT INTO DEMO_staging_payments VALUES(2, 1, 2, NOW(), 1.75, 'd', CITY_HASH(1, 2), 'incremental data.csv', NOW());
 --INSERT INTO DEMO_staging_payments VALUES(4, 1, 5, NOW(), 2.00, 'i', CITY_HASH(1, 5), 'incremental data.csv', NOW());
 --INSERT INTO DEMO_staging_payments VALUES(3, 1, 4, NOW(), 8.50, 'u', CITY_HASH(1, 4), 'incremental data.csv', NOW());
 --INSERT INTO DEMO_staging_payments VALUES(1, 1, 3, NOW(), 2.50, 'i', CITY_HASH(1, 3), 'initial data.csv', NOW());

 --ledgering process
 INSERT INTO DEMO_final_payments (
     payment_id,
     customer_id,
     product_id,
     created_updated_at,
     amount,
     action,
     meta_notes,
     meta_hash_row,
     SOURCE_FILE_NAME,
     SOURCE_FILE_TIMESTAMP
 )
 WITH new AS ( --new data from the staging table
     SELECT * FROM DEMO_staging_payments
     WHERE SOURCE_FILE_TIMESTAMP > (SELECT MAX(SOURCE_FILE_TIMESTAMP) FROM DEMO_final_payments)
 ),
 existing AS ( --existing data, latest state for transactions in scope (existing ones that need update/delete)
     SELECT * FROM DEMO_final_payments_view
     WHERE payment_id IN (SELECT DISTINCT payment_id FROM new)
 )
 --get new data where there are changes to the amount (everything else the same), a new row is inserted or an existing row is deleted - insert the new row or insert the delta amount in case of delete/update
 SELECT new.payment_id
     , CASE
         WHEN new.action = 'd' AND new.meta_hash_row <> existing.meta_hash_row THEN existing.customer_id
         ELSE new.customer_id
       END AS customer_id
     , CASE
         WHEN new.action = 'd' AND new.meta_hash_row <> existing.meta_hash_row THEN existing.product_id
         ELSE new.product_id
       END AS product_id
     , new.created_updated_at
     , CASE
         WHEN new.action = 'd' AND existing.payment_id IS NULL THEN 0
         WHEN new.action = 'd' OR new.meta_hash_row <> existing.meta_hash_row THEN coalesce(- existing.total_amount, 0)
         ELSE new.amount - coalesce(existing.total_amount, 0)
       END AS amount
     , CASE
         WHEN new.meta_hash_row <> existing.meta_hash_row THEN 'd'
         WHEN new.action = 'u' AND existing.payment_id IS NULL THEN 'i'
         ELSE new.action
       END AS action
     , CASE
         WHEN new.action = 'd' AND existing.payment_id IS NULL THEN 'deleted existing row at source with amount 0 (deleted on source but not found)'
         WHEN new.action = 'd' AND new.meta_hash_row <> existing.meta_hash_row THEN 'deleted existing row at source with opposite amount and same attributes (deleted on source but not found)'
         WHEN new.action = 'd' THEN 'deleted existing row at source'
         WHEN new.action = 'i' THEN 'inserted new row at source'
         WHEN new.action = 'u' AND existing.payment_id IS NULL THEN 'inserted as new row (updated on source but not found)'
         WHEN new.action = 'u' THEN 'updated existing row at source'
       END AS notes
     , CASE
         WHEN new.action = 'd' AND new.meta_hash_row <> existing.meta_hash_row THEN existing.meta_hash_row
         ELSE new.meta_hash_row
       END AS meta_hash_row
     , new.SOURCE_FILE_NAME
     , new.SOURCE_FILE_TIMESTAMP
 FROM new LEFT OUTER JOIN existing ON new.payment_id = existing.payment_id
 WHERE
     new.amount != coalesce(existing.total_amount, 0) OR new.action in ('d', 'i')

 UNION ALL

 --get new data where there are changes to the fields (attributes) - delete the old record with the old attributes (negative amount)
 SELECT new.payment_id
     , existing.customer_id
     , existing.product_id
     , new.created_updated_at
     , - existing.total_amount AS amount
     , 'd' AS action
     , 'delete existing row - attribute change'
     , existing.meta_hash_row
     , new.SOURCE_FILE_NAME
     , new.SOURCE_FILE_TIMESTAMP
 FROM new LEFT OUTER JOIN existing ON new.payment_id = existing.payment_id
 WHERE
     new.action in ('u') AND new.meta_hash_row != existing.meta_hash_row

 UNION ALL

 --get new data where there are changes to the fields (attributes) - insert the new record (with the new attributes)
 SELECT new.payment_id
     , new.customer_id
     , new.product_id
     , new.created_updated_at
     , new.amount AS amount
     , 'i' AS action
     , 'insert new row - attribute change'
     , new.meta_hash_row
     , new.SOURCE_FILE_NAME
     , new.SOURCE_FILE_TIMESTAMP
 FROM new LEFT OUTER JOIN existing ON new.payment_id = existing.payment_id
 WHERE
     new.action in ('u') AND new.meta_hash_row != existing.meta_hash_row;

 --show a trail of changes
 SELECT * FROM DEMO_final_payments WHERE payment_id = 3 ORDER BY created_updated_at, action;

 --show latest value
 SELECT * FROM DEMO_final_payments_view WHERE payment_id = 3;

A few assumptions:

  • the CDC process captures the action, whether it’s an insert (i), delete (d) or update (u) to the existing rows

  • the files that the CDC process generates cannot contain multiple changes to the same transaction within the same file (this will not work properly in this sample implementation; it can be extended if needed by separating the incremental load into batches when comparing the staging and final fact table)

  • each transaction has a natural (primary / business) key that uniquely identifies a “business transaction”

  • for simplification, changes to dimensions are not part of this tutorial and could be handled as SCD (slowly changing dimension) if needed

  • it is assumed that the CDC process generates data frequently, which (usually) means that the data volume of the “deltas” is not large; this is important to note as we are using a fact-to-fact join between the staging and the final table to determine the changes that occurred since the last merge process happened

  • performance optimization could happen in a few places here - for example with materializing the CTE (using AS MATERIALIZED), doing more pruning on the existing data or having another aggregating index defined in the merge process, etc.

  • if you have multiple attributes that you need to track for changes, extend the CITY_HASH function to include all those attributes (this is done only during ingestion); see the sketch after this list

  • if you have multiple measures (like amount, quantity, etc.) that you need to track for changes, extend the comparison of the measures between the staging and final fact tables, and the corresponding view/aggregating index
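
As a rough sketch of those last two points (currency, payment_method and quantity are hypothetical column names used only for illustration; they are not part of the demo schema):

 --1) extend the hash during ingestion to cover every change-tracked attribute, e.g.:
 --       CITY_HASH(customer_id, product_id, currency, payment_method)
 --2) add the extra measure to the view (and to the matching aggregating index), e.g.:
 --       SUM(quantity) AS total_quantity   -- alongside SUM(amount)
 --3) compare every measure in the merge when deciding whether a row changed, e.g.:
 --       WHERE new.amount != coalesce(existing.total_amount, 0)
 --          OR new.quantity != coalesce(existing.total_quantity, 0)
 --          OR new.action IN ('d', 'i')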