Introduction:
Running an incremental ingest involves checking the destination table for files that are already loaded and excluding them from the INSERT or COPY INTO statement. Adding an aggregating index to the destination table can speed up this check. This article demonstrates how to create and use an aggregating index to optimize incremental ingestion. By the end of this article, you'll understand how to create and use an aggregating index with the games table for efficient incremental ingestion.
TL;DR:
- Create Games Table: Define the table that stores game data together with the name and timestamp of each source file.
- Create Aggregating Index: Create an index that precomputes the latest source_file_timestamp.
- Incremental Ingestion: Use the index to load only new files.
Step-by-Step Guide:
All the example SQL code uses the Ultra Fast Gaming data set.
Step 1: Create the Games Tables
Create the ext_games external table, which reads the source Parquet files, and the games table. The games table holds detailed game information, including the name and timestamp of the source files ingested.
Example SQL code:
-- External table: reads the Parquet source files directly from S3.
CREATE EXTERNAL TABLE IF NOT EXISTS ext_games (
    gameid INTEGER,
    title TEXT,
    abbreviation TEXT,
    series TEXT,
    version NUMERIC(10, 2),
    gamedescription TEXT,
    category TEXT,
    launchdate DATE,
    author TEXT,
    supportedplatforms ARRAY(TEXT NULL),
    gameconfiguration TEXT
)
URL = 's3://firebolt-sample-datasets-public-us-east-1/gaming/parquet/games/'
OBJECT_PATTERN = '*'
TYPE = (PARQUET);

-- Destination table: same columns, plus the source file name and timestamp.
CREATE TABLE IF NOT EXISTS games (
    gameid INTEGER,
    title TEXT,
    abbreviation TEXT,
    series TEXT,
    version NUMERIC(10, 2),
    gamedescription TEXT,
    category TEXT,
    launchdate DATE,
    author TEXT,
    supportedplatforms ARRAY(TEXT NULL),
    gameconfiguration TEXT,
    source_file_name TEXT,
    source_file_timestamp TIMESTAMPNTZ
) PRIMARY INDEX gameid, title;
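Before loading anything, it can help to preview what the external table exposes. The following is a minimal sketch, assuming the ext_games table above was created successfully and the engine can read the public sample bucket. It selects two of the columns defined above along with the same $SOURCE_FILE_NAME and $SOURCE_FILE_TIMESTAMP metadata columns that the ingestion step relies on:

```sql
-- Preview a few rows with the per-file metadata columns.
SELECT
    gameid,
    title,
    $SOURCE_FILE_NAME,
    $SOURCE_FILE_TIMESTAMP
FROM ext_games
LIMIT 5;
```

Every row read from the same file should carry the same file name and timestamp, which is what makes the timestamp usable as a load marker.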
Step 2: Create the Aggregating Index
Create an aggregating index that tracks the latest source_file_timestamp in games.
Example SQL code:
CREATE AGGREGATING INDEX ix_games_max_source_file_timestamp
ON games
(
    MAX(source_file_timestamp)
);
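The index is meant to serve queries that compute the same aggregate on the same table. As a rough sketch, the first statement below is the lookup that the ingestion step performs, and the second uses EXPLAIN to inspect the query plan. The exact plan output depends on the Firebolt version and engine, so none is shown here; the assumption is only that the plan gives some indication of whether the aggregating index is read instead of the base table.

```sql
-- The aggregate stored in ix_games_max_source_file_timestamp.
SELECT MAX(source_file_timestamp) FROM games;

-- Inspect the plan to check how the lookup is executed.
EXPLAIN
SELECT MAX(source_file_timestamp) FROM games;
```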
Step 3: Incremental Ingestion
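Load new gaming data into games, using the aggregating index to find the most recent file already loaded. After the initial load, the aggregating index will be used to quickly return the latest source_file_timestamp in games. This will be faster than scanning all the rows in the table to get it. The WHERE clause keeps only rows from files with a later timestamp. On the first run the table is empty, so COALESCE falls back to 1970-01-01 and every file is loaded.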
Example SQL code:
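INSERT INTO games
SELECT
    *,                        -- all ext_games columns, in the same order as in games
    $SOURCE_FILE_NAME,        -- fills source_file_name
    $SOURCE_FILE_TIMESTAMP    -- fills source_file_timestamp
FROM ext_games
-- Keep only files newer than the latest timestamp already in games.
WHERE $SOURCE_FILE_TIMESTAMP > (
    SELECT COALESCE(MAX(source_file_timestamp), '1970-01-01'::TIMESTAMPNTZ)
    FROM games
);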
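To check what a load picked up, one option is to count rows per source file. This is a sketch that uses only the source_file_name and source_file_timestamp columns defined on games; row_count is an alias chosen here for illustration.

```sql
-- One row per ingested file, oldest first.
SELECT
    source_file_name,
    source_file_timestamp,
    COUNT(*) AS row_count
FROM games
GROUP BY source_file_name, source_file_timestamp
ORDER BY source_file_timestamp;
```

If the INSERT is run again before any new files arrive, this result should stay the same, because no file has a timestamp later than the stored maximum.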
Full Example Code
-- Step 1: create the external table and the destination table.
CREATE EXTERNAL TABLE IF NOT EXISTS ext_games (
    gameid INTEGER,
    title TEXT,
    abbreviation TEXT,
    series TEXT,
    version NUMERIC(10, 2),
    gamedescription TEXT,
    category TEXT,
    launchdate DATE,
    author TEXT,
    supportedplatforms ARRAY(TEXT NULL),
    gameconfiguration TEXT
)
URL = 's3://firebolt-sample-datasets-public-us-east-1/gaming/parquet/games/'
OBJECT_PATTERN = '*'
TYPE = (PARQUET);

CREATE TABLE IF NOT EXISTS games (
    gameid INTEGER,
    title TEXT,
    abbreviation TEXT,
    series TEXT,
    version NUMERIC(10, 2),
    gamedescription TEXT,
    category TEXT,
    launchdate DATE,
    author TEXT,
    supportedplatforms ARRAY(TEXT NULL),
    gameconfiguration TEXT,
    source_file_name TEXT,
    source_file_timestamp TIMESTAMPNTZ
) PRIMARY INDEX gameid, title;

-- Step 2: create the aggregating index on the latest source file timestamp.
CREATE AGGREGATING INDEX ix_games_max_source_file_timestamp
ON games
(
    MAX(source_file_timestamp)
);

-- Step 3: load only files newer than the latest timestamp already in games.
INSERT INTO games
SELECT
    *,
    $SOURCE_FILE_NAME,
    $SOURCE_FILE_TIMESTAMP
FROM ext_games
WHERE $SOURCE_FILE_TIMESTAMP > (
    SELECT COALESCE(MAX(source_file_timestamp), '1970-01-01'::TIMESTAMPNTZ)
    FROM games
);