How to Perform an Upsert in Firebolt

Introduction

Performing an upsert (update or insert) operation is crucial when you have incoming data that contains both new and existing records. This is common in scenarios where incremental data loads include new customers and updates to existing customer information. By the end of this article, you will know how to efficiently perform an upsert operation in Firebolt.

TL;DR

  • Create a control table to track the latest timestamp.

  • Create a staging table for updates.

  • Delete existing records that need updates.

  • Insert new and updated records.

  • Drop temporary tables.

Step-by-step guide

All the example SQL code uses the Ultra Fast Gaming data set. To familiarize yourself with this data set, visit this link: Ultra Fast Gaming Firebolt Sample Dataset.

NOTE: Any table loaded through this upsert process needs an external table over the source files, and the data must be loaded with INSERT rather than COPY FROM. The examples below use this external table definition:

CREATE EXTERNAL TABLE IF NOT EXISTS ex_players (
  PlayerID INTEGER,
  Nickname TEXT,
  Email TEXT,
  AgeCategory TEXT,
  Platforms ARRAY (TEXT NULL),
  RegisteredOn PGDATE,
  IsSubscribedToNewsletter BOOLEAN,
  InternalProbabilityToWin DOUBLE PRECISION) 
URL = 's3://firebolt-sample-datasets-public-us-east-1/gaming/parquet/players/'
OBJECT_PATTERN = '*'
TYPE = (PARQUET);

Step 1: Create a control table to track the latest timestamp

To identify which source data needs to be added, create a control table that holds the latest source_file_timestamp already loaded into the fact table (players).

Example SQL code:

CREATE TABLE IF NOT EXISTS control_maxdate AS (
SELECT MAX(source_file_timestamp) AS max_time
FROM players
);

This step ensures that only new records from the source data are considered for the upsert operation by keeping track of the latest timestamp.
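
Before moving on, it can help to look at the value the control table captured. The query below is a minimal sketch that only reads the control_maxdate table created above. Two caveats apply, both based on general SQL behavior rather than anything specific to this walkthrough: MAX over an empty table returns NULL, and a comparison against NULL matches no rows, so an empty players table would cause the later steps to load nothing. Also, because the table is created with IF NOT EXISTS, a copy left over from an earlier run would be reused as-is.

```sql
-- Inspect the cutoff that the following steps compare against.
-- A NULL result means players had no rows when the control table was built.
-- An unexpectedly old value may mean a leftover table from a previous run.
SELECT max_time
FROM control_maxdate;
```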

Step 2: Create a staging table for updates

Create a staging table to contain records from your source file that need to be updated in your destination fact table.

Example SQL code:

CREATE TABLE IF NOT EXISTS updates_table AS (
WITH external_table AS (
SELECT *,
  $SOURCE_FILE_NAME AS source_file_name_new,
  $SOURCE_FILE_TIMESTAMP AS source_file_timestamp_new
FROM ex_players
WHERE $SOURCE_FILE_TIMESTAMP > (SELECT max_time FROM control_maxdate)
AND playerid IN (SELECT DISTINCT playerid FROM players)
)
SELECT
e.* 
FROM players f
INNER JOIN external_table e
ON f.playerid = e.playerid
);

This staging table temporarily holds the rows that are updates. Rows in the base table that have the same playerid will need to be deleted.
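
A quick look at what was staged can catch surprises before anything is deleted. The sketch below assumes only the updates_table created in this step. If staged_rows is larger than staged_players, the source files contain more than one new version of the same player, and inserting all of them later would leave duplicates in players.

```sql
-- Compare the total number of staged rows with the number of distinct players.
-- Equal counts mean each staged player appears exactly once.
SELECT
  COUNT(*) AS staged_rows,
  COUNT(DISTINCT playerid) AS staged_players
FROM updates_table;
```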

Step 3: Delete existing records that have updates

Remove any records in the main table that also exist in the updates staging table to avoid duplication.

Example SQL code:

DELETE FROM players
WHERE playerid IN (SELECT playerid FROM updates_table);

This ensures that outdated records are removed, making space for updated ones.
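
As an optional sanity check, the query below is a small sketch that confirms the delete covered every staged player. It uses only the players and updates_table tables from the steps above, and the result should be 0 when run right after the DELETE.

```sql
-- Count rows in the main table that still match a staged player.
-- Anything other than 0 means the DELETE did not remove every outdated row.
SELECT COUNT(*) AS remaining_rows
FROM players
WHERE playerid IN (SELECT playerid FROM updates_table);
```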

Step 4: Insert new and updated records

Insert the records from the updates staging table into the main table, replacing the deleted records.

Example SQL code:

INSERT INTO players
SELECT
playerid,
nickname,
email,
agecategory,
platforms,
registeredon,
issubscribedtonewsletter,
internalprobabilitytowin,
source_file_name_new,
source_file_timestamp_new
FROM updates_table;

Then, insert any new records from your source file that do not already exist in your destination fact table.

Example SQL code:

INSERT INTO players
SELECT *,
$SOURCE_FILE_NAME,
$SOURCE_FILE_TIMESTAMP
FROM ex_players
WHERE $SOURCE_FILE_TIMESTAMP > (SELECT max_time FROM control_maxdate)
AND playerid NOT IN (SELECT playerid FROM players);
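
Once both inserts have run, a duplicate check is a simple way to confirm the upsert left one row per player. This is a sketch that assumes playerid is meant to be unique in players, which the article implies but does not state outright. An empty result means no player appears more than once.

```sql
-- List any player that appears more than once in the main table.
-- No rows returned means the upsert did not introduce duplicates.
SELECT
  playerid,
  COUNT(*) AS row_count
FROM players
GROUP BY playerid
HAVING COUNT(*) > 1;
```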

Step 5: Drop temporary tables

After the upsert operation is complete, drop the temporary control and staging tables.

Example SQL code:

DROP TABLE IF EXISTS control_maxdate;
DROP TABLE IF EXISTS updates_table;

Full Example Code

CREATE EXTERNAL TABLE IF NOT EXISTS ex_players (
  PlayerID INTEGER,
  Nickname TEXT,
  Email TEXT,
  AgeCategory TEXT,
  Platforms ARRAY (TEXT NULL),
  RegisteredOn PGDATE,
  IsSubscribedToNewsletter BOOLEAN,
  InternalProbabilityToWin DOUBLE PRECISION) 
URL = 's3://firebolt-sample-datasets-public-us-east-1/gaming/parquet/players/'
OBJECT_PATTERN = '*'
TYPE = (PARQUET);

CREATE TABLE IF NOT EXISTS control_maxdate AS (
SELECT MAX(source_file_timestamp) AS max_time
FROM players
);

CREATE TABLE IF NOT EXISTS updates_table AS (
WITH external_table AS (
SELECT *,
  $SOURCE_FILE_NAME AS source_file_name_new,
  $SOURCE_FILE_TIMESTAMP AS source_file_timestamp_new
FROM ex_players
WHERE $SOURCE_FILE_TIMESTAMP > (SELECT max_time FROM control_maxdate)
AND playerid IN (SELECT DISTINCT playerid FROM players)
)
SELECT
e.* 
FROM players f
INNER JOIN external_table e
ON f.playerid = e.playerid
);

DELETE FROM players
WHERE playerid IN (SELECT playerid FROM updates_table);

INSERT INTO players
SELECT
playerid,
nickname,
email,
agecategory,
platforms,
registeredon,
issubscribedtonewsletter,
internalprobabilitytowin,
source_file_name_new,
source_file_timestamp_new
FROM updates_table;

INSERT INTO players
SELECT *,
$SOURCE_FILE_NAME,
$SOURCE_FILE_TIMESTAMP
FROM ex_players
WHERE $SOURCE_FILE_TIMESTAMP > (SELECT max_time FROM control_maxdate)
AND playerid NOT IN (SELECT playerid FROM players);

DROP TABLE IF EXISTS control_maxdate;
DROP TABLE IF EXISTS updates_table;