Version: Next

Aggregation Merge Engine

Overview

The Aggregation Merge Engine is designed for scenarios where users only care about aggregated results rather than individual records. For rows that share a primary key, it merges each incoming row into the stored row field by field, combining the stored value and the incoming value of each field with that field's aggregate function.

Each non-primary-key field can be assigned an aggregate function. The recommended way to do so depends on the client you are working with:

  • For Flink SQL or Spark SQL, use DDL and connector options ('fields.<field-name>.agg')
  • For Java clients, use the Schema API

If no function is specified for a field, it will use last_value_ignore_nulls aggregation as the default behavior.

This merge engine is useful for real-time aggregation scenarios such as:

  • Computing running totals and statistics
  • Maintaining counters and metrics
  • Tracking maximum/minimum values over time
  • Building real-time dashboards and analytics

Configuration

To enable the aggregation merge engine, set the following table property:

'table.merge-engine' = 'aggregation'

Specify the aggregate function for each non-primary-key field using connector options:

'fields.<field-name>.agg' = '<function-name>'

Usage Examples

Creating a Table with Aggregation

CREATE TABLE product_stats (
    product_id BIGINT,
    price DOUBLE,
    sales BIGINT,
    last_update_time TIMESTAMP(3),
    PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.price.agg' = 'max',
    'fields.sales.agg' = 'sum'
    -- last_update_time defaults to 'last_value_ignore_nulls'
);

Writing Data

-- Insert data - these will be aggregated
INSERT INTO product_stats VALUES
    (1, 23.0, 15, TIMESTAMP '2024-01-01 10:00:00'),
    (1, 30.2, 20, TIMESTAMP '2024-01-01 11:00:00'); -- Same primary key - triggers aggregation

Querying Results

SELECT * FROM product_stats;

Result after aggregation:

+------------+-------+-------+---------------------+
| product_id | price | sales | last_update_time    |
+------------+-------+-------+---------------------+
| 1          | 30.2  | 35    | 2024-01-01 11:00:00 |
+------------+-------+-------+---------------------+
  • product_id: 1
  • price: 30.2 (max of 23.0 and 30.2)
  • sales: 35 (sum of 15 and 20)
  • last_update_time: 2024-01-01 11:00:00 (last non-null value)
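To make the field-by-field merge concrete, here is a minimal in-memory sketch that reproduces the product_stats result above. ProductStats and ProductStatsTable are hypothetical classes, not part of the Fluss API, and the sketch assumes price and sales are never null so it can use primitive fields.

```java
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;

// Hypothetical value columns of one product_stats row (primary key kept as the map key).
final class ProductStats {
    final double price;
    final long sales;
    final LocalDateTime lastUpdateTime;

    ProductStats(double price, long sales, LocalDateTime lastUpdateTime) {
        this.price = price;
        this.sales = sales;
        this.lastUpdateTime = lastUpdateTime;
    }
}

// Hypothetical in-memory table that merges rows sharing a primary key.
final class ProductStatsTable {
    private final Map<Long, ProductStats> rows = new HashMap<>();

    void upsert(long productId, ProductStats incoming) {
        // The first row for a key is stored as-is; later rows are merged field by field.
        rows.merge(productId, incoming, (stored, in) -> new ProductStats(
                Math.max(stored.price, in.price),   // 'fields.price.agg' = 'max'
                stored.sales + in.sales,            // 'fields.sales.agg' = 'sum'
                in.lastUpdateTime != null           // default: last_value_ignore_nulls
                        ? in.lastUpdateTime
                        : stored.lastUpdateTime));
    }

    ProductStats get(long productId) {
        return rows.get(productId);
    }
}
```

Upserting the two rows from the INSERT statement for key 1 leaves price 30.2, sales 35 and the 11:00 timestamp, matching the table above.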

Supported Aggregate Functions

Fluss currently supports the following aggregate functions:

sum

Aggregates values by computing the sum across multiple rows.

  • Supported Data Types: TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DECIMAL
  • Behavior: Adds incoming values to the accumulator
  • Null Handling: Null values are ignored

Example:

CREATE TABLE test_sum (
    id BIGINT,
    amount DECIMAL(10, 2),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.amount.agg' = 'sum'
);

INSERT INTO test_sum VALUES
    (1, 100.50),
    (1, 200.75);

SELECT * FROM test_sum;
+------------+---------+
| id         | amount  |
+------------+---------+
| 1          | 301.25  |
+------------+---------+
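As a sketch of the sum semantics for a DECIMAL column, the hypothetical SumAgg helper below folds each incoming value into an accumulator and skips nulls. It uses java.math.BigDecimal so the result is exact, in the spirit of DECIMAL(10, 2); it illustrates the behavior and is not Fluss's internal aggregator.

```java
import java.math.BigDecimal;

// Hypothetical sketch of the 'sum' aggregator for a DECIMAL column.
final class SumAgg {
    private SumAgg() {}

    // acc is the stored value (null if nothing is stored yet); in is the incoming value.
    static BigDecimal merge(BigDecimal acc, BigDecimal in) {
        if (in == null) {
            return acc;               // null inputs are ignored
        }
        return acc == null ? in : acc.add(in);
    }
}
```

Folding the two inserted amounts, SumAgg.merge(SumAgg.merge(null, new BigDecimal("100.50")), new BigDecimal("200.75")) yields 301.25.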

product

Computes the product of values across multiple rows.

  • Supported Data Types: TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DECIMAL
  • Behavior: Multiplies incoming values with the accumulator
  • Null Handling: Null values are ignored

Example:

CREATE TABLE test_product (
    id BIGINT,
    discount_factor DOUBLE,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.discount_factor.agg' = 'product'
);

INSERT INTO test_product VALUES
    (1, 0.9),
    (1, 0.8);

SELECT * FROM test_product;
+------------+-----------------+
| id         | discount_factor |
+------------+-----------------+
| 1          | 0.72            |
+------------+-----------------+
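A corresponding sketch for product on a DOUBLE column follows, using a hypothetical ProductAgg helper. Because DOUBLE is binary floating point, a product such as 0.9 * 0.8 is not stored as exactly 0.72, so comparisons against the expected value are best made with a small tolerance.

```java
// Hypothetical sketch of the 'product' aggregator for a DOUBLE column.
final class ProductAgg {
    private ProductAgg() {}

    // acc is the stored value (null if nothing is stored yet); in is the incoming value.
    static Double merge(Double acc, Double in) {
        if (in == null) {
            return acc;               // null inputs are ignored
        }
        return acc == null ? in : acc * in;
    }
}
```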

max

Identifies and retains the maximum value.

  • Supported Data Types: CHAR, STRING, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ
  • Behavior: Keeps the larger of the accumulator and the incoming value
  • Null Handling: Null values are ignored

Example:

CREATE TABLE test_max (
    id BIGINT,
    temperature DOUBLE,
    reading_time TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.temperature.agg' = 'max',
    'fields.reading_time.agg' = 'max'
);

INSERT INTO test_max VALUES
    (1, 25.5, TIMESTAMP '2024-01-01 10:00:00'),
    (1, 28.3, TIMESTAMP '2024-01-01 11:00:00');

SELECT * FROM test_max;
+------------+-------------+---------------------+
| id         | temperature | reading_time        |
+------------+-------------+---------------------+
| 1          | 28.3        | 2024-01-01 11:00:00 |
+------------+-------------+---------------------+
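Since max applies to any ordered type (numbers, strings, dates, timestamps), it can be sketched once over Java's Comparable. MaxAgg is a hypothetical helper; this page does not specify how Fluss orders CHAR and STRING values, so the sketch simply relies on compareTo.

```java
// Hypothetical sketch of the 'max' aggregator over any Comparable type,
// e.g. Double, BigDecimal, String, LocalDate or LocalDateTime.
final class MaxAgg {
    private MaxAgg() {}

    static <T extends Comparable<? super T>> T merge(T acc, T in) {
        if (in == null) {
            return acc;               // null inputs are ignored
        }
        if (acc == null) {
            return in;                // first non-null value seeds the accumulator
        }
        return in.compareTo(acc) > 0 ? in : acc;
    }
}
```

The bound Comparable<? super T> lets types such as LocalDateTime, which is comparable through its ChronoLocalDateTime supertype, be used directly.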

min

Identifies and retains the minimum value.

  • Supported Data Types: CHAR, STRING, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ
  • Behavior: Keeps the smaller of the accumulator and the incoming value
  • Null Handling: Null values are ignored

Example:

CREATE TABLE test_min (
    id BIGINT,
    lowest_price DECIMAL(10, 2),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.lowest_price.agg' = 'min'
);

INSERT INTO test_min VALUES
    (1, 99.99),
    (1, 79.99),
    (1, 89.99);

SELECT * FROM test_min;
+------------+--------------+
| id         | lowest_price |
+------------+--------------+
| 1          | 79.99        |
+------------+--------------+
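min is the mirror image of max. The hypothetical MinAgg below also shows the write path as a fold: the values for one key are merged in arrival order, starting from an empty accumulator, which reproduces the 79.99 result above.

```java
import java.util.List;

// Hypothetical sketch of the 'min' aggregator, plus a fold over one key's values in arrival order.
final class MinAgg {
    private MinAgg() {}

    static <T extends Comparable<? super T>> T merge(T acc, T in) {
        if (in == null) {
            return acc;               // null inputs are ignored
        }
        if (acc == null) {
            return in;
        }
        return in.compareTo(acc) < 0 ? in : acc;
    }

    static <T extends Comparable<? super T>> T fold(List<T> values) {
        T acc = null;                 // nothing stored yet for this key
        for (T value : values) {
            acc = merge(acc, value);
        }
        return acc;
    }
}
```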

last_value

Replaces the previous value with the most recently received value.

  • Supported Data Types: All data types
  • Behavior: Always uses the latest incoming value
  • Null Handling: Null values will overwrite previous values

Example:

CREATE TABLE test_last_value (
    id BIGINT,
    status STRING,
    last_login TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.status.agg' = 'last_value',
    'fields.last_login.agg' = 'last_value'
);

INSERT INTO test_last_value VALUES
    (1, 'online', TIMESTAMP '2024-01-01 10:00:00'),
    (1, 'offline', TIMESTAMP '2024-01-01 11:00:00'),
    (1, null, TIMESTAMP '2024-01-01 12:00:00'); -- Null overwrites previous 'offline' value

SELECT * FROM test_last_value;
+------------+--------+---------------------+
| id         | status | last_login          |
+------------+--------+---------------------+
| 1          | NULL   | 2024-01-01 12:00:00 |
+------------+--------+---------------------+

Key behavior: Null values overwrite existing values, treating null as a valid value to be stored.
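In accumulator terms, last_value ignores the stored value entirely. The hypothetical LastValueAgg below makes that explicit: the incoming null wins, which is exactly what distinguishes it from last_value_ignore_nulls.

```java
import java.util.List;

// Hypothetical sketch of the 'last_value' aggregator.
final class LastValueAgg {
    private LastValueAgg() {}

    // The incoming value always replaces the stored value, even when it is null.
    static <T> T merge(T acc, T in) {
        return in;
    }

    // Merges one key's values in arrival order.
    static <T> T fold(List<T> values) {
        T acc = null;
        for (T value : values) {
            acc = merge(acc, value);
        }
        return acc;
    }
}
```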

last_value_ignore_nulls

Replaces the previous value with the latest non-null value. This is the default aggregate function when no function is specified.

  • Supported Data Types: All data types
  • Behavior: Uses the latest incoming value only if it's not null
  • Null Handling: Null values are ignored; the previous value is retained

Example:

CREATE TABLE test_last_value_ignore_nulls (
    id BIGINT,
    email STRING,
    phone STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.email.agg' = 'last_value_ignore_nulls',
    'fields.phone.agg' = 'last_value_ignore_nulls'
);

INSERT INTO test_last_value_ignore_nulls VALUES
    (1, 'user@example.com', '123-456'),
    (1, null, '789-012'), -- Null is ignored, email retains previous value
    (1, 'new@example.com', null);

SELECT * FROM test_last_value_ignore_nulls;
+------------+-----------------+---------+
| id         | email           | phone   |
+------------+-----------------+---------+
| 1          | new@example.com | 789-012 |
+------------+-----------------+---------+

Key behavior: Null values do not overwrite existing non-null values, making this function ideal for maintaining the most recent valid data.
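Because each column is merged independently, one write can update one column while leaving another untouched. The hypothetical LastNonNullAgg sketch below merges the email and phone values from the example row by row (primary key omitted) and arrives at the same result.

```java
import java.util.List;

// Hypothetical sketch of 'last_value_ignore_nulls' applied column by column.
final class LastNonNullAgg {
    private LastNonNullAgg() {}

    static <T> T merge(T acc, T in) {
        return in != null ? in : acc;   // a null input keeps the stored value
    }

    // Each row holds the value columns of one write, in a fixed column order.
    static Object[] mergeRows(List<Object[]> rows) {
        Object[] acc = null;
        for (Object[] row : rows) {
            if (acc == null) {
                acc = row.clone();      // the first row for the key is stored as-is
                continue;
            }
            for (int i = 0; i < row.length; i++) {
                acc[i] = merge(acc[i], row[i]);
            }
        }
        return acc;
    }
}
```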

first_value

Retrieves and retains the first value seen for a field.

  • Supported Data Types: All data types
  • Behavior: Keeps the first received value, ignores all subsequent values
  • Null Handling: Null values are retained if received first

Example:

CREATE TABLE test_first_value (
    id BIGINT,
    first_purchase_date DATE,
    first_product STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.first_purchase_date.agg' = 'first_value',
    'fields.first_product.agg' = 'first_value'
);

INSERT INTO test_first_value VALUES
    (1, DATE '2024-01-01', 'ProductA'),
    (1, DATE '2024-02-01', 'ProductB'); -- Ignored, first value retained

SELECT * FROM test_first_value;
+------------+---------------------+---------------+
| id         | first_purchase_date | first_product |
+------------+---------------------+---------------+
| 1          | 2024-01-01          | ProductA      |
+------------+---------------------+---------------+
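The subtle part of first_value is that a null received first is kept, so "nothing stored yet" and "stored null" must be distinguishable. One way to sketch that, using a hypothetical FirstValueAgg class, is an explicit flag next to the value.

```java
// Hypothetical sketch of the 'first_value' aggregator for one field of one key.
final class FirstValueAgg<T> {
    private boolean seen;   // true once any value (even null) has arrived
    private T value;

    void add(T in) {
        if (!seen) {
            value = in;     // only the very first value is kept, null included
            seen = true;
        }
    }

    T get() {
        return value;
    }
}
```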

first_value_ignore_nulls

Retrieves and retains the first non-null value seen for a field.

  • Supported Data Types: All data types
  • Behavior: Keeps the first received non-null value, ignores all subsequent values
  • Null Handling: Null values are ignored until a non-null value is received

Example:

CREATE TABLE test_first_value_ignore_nulls (
    id BIGINT,
    email STRING,
    verified_at TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.email.agg' = 'first_value_ignore_nulls',
    'fields.verified_at.agg' = 'first_value_ignore_nulls'
);

INSERT INTO test_first_value_ignore_nulls VALUES
    (1, null, null),
    (1, 'user@example.com', TIMESTAMP '2024-01-01 10:00:00'),
    (1, 'other@example.com', TIMESTAMP '2024-01-02 10:00:00'); -- Only the first non-null value is retained

SELECT * FROM test_first_value_ignore_nulls;
+------------+------------------+---------------------+
| id         | email            | verified_at         |
+------------+------------------+---------------------+
| 1          | user@example.com | 2024-01-01 10:00:00 |
+------------+------------------+---------------------+
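Unlike first_value, this function needs no extra flag: a null accumulator simply means no non-null value has arrived yet. A minimal sketch, with FirstNonNullAgg as a hypothetical name:

```java
import java.util.List;

// Hypothetical sketch of the 'first_value_ignore_nulls' aggregator.
final class FirstNonNullAgg {
    private FirstNonNullAgg() {}

    // Once a non-null value is stored it never changes; until then, take the incoming value.
    static <T> T merge(T acc, T in) {
        return acc != null ? acc : in;
    }

    // Merges one key's values in arrival order.
    static <T> T fold(List<T> values) {
        T acc = null;
        for (T value : values) {
            acc = merge(acc, value);
        }
        return acc;
    }
}
```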

listagg

Concatenates multiple string values into a single string with a delimiter.

  • Supported Data Types: STRING, CHAR
  • Behavior: Concatenates values using the specified delimiter
  • Null Handling: Null values are skipped
  • Delimiter: Configurable per field (default is a comma ,). In SQL, set it with the 'fields.<field-name>.delimiter' option, as in the example below.

Example:

CREATE TABLE test_listagg (
    id BIGINT,
    tags1 STRING,
    tags2 STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.tags1.agg' = 'listagg',
    'fields.tags2.agg' = 'listagg',
    'fields.tags2.delimiter' = ';' -- Specify delimiter inline
);

INSERT INTO test_listagg VALUES
    (1, 'developer', 'developer'),
    (1, 'java', 'java'),
    (1, 'flink', 'flink');

SELECT * FROM test_listagg;
+------------+----------------------+----------------------+
| id         | tags1                | tags2                |
+------------+----------------------+----------------------+
| 1          | developer,java,flink | developer;java;flink |
+------------+----------------------+----------------------+
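The listagg merge step can be sketched as string concatenation with a per-field delimiter. ListAgg is a hypothetical class that follows the behavior listed above (comma by default, nulls skipped); it does not deduplicate values, so in this sketch the stored string grows with every non-null write for the key.

```java
// Hypothetical sketch of the 'listagg' aggregator for a STRING column.
final class ListAgg {
    private final String delimiter;

    ListAgg() {
        this(",");                    // default delimiter
    }

    ListAgg(String delimiter) {
        this.delimiter = delimiter;
    }

    String merge(String acc, String in) {
        if (in == null) {
            return acc;               // null inputs are skipped
        }
        return acc == null ? in : acc + delimiter + in;
    }
}
```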

string_agg

Alias for listagg. Concatenates multiple string values into a single string with a delimiter.

  • Supported Data Types: STRING, CHAR
  • Behavior: Same as listagg: concatenates values using the specified delimiter
  • Null Handling: Null values are skipped
  • Delimiter: Configurable per field (default is a comma ,). In SQL, set it with the 'fields.<field-name>.delimiter' option, as in the example below.

Example:

CREATE TABLE test_string_agg (
    id BIGINT,
    tags1 STRING,
    tags2 STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.tags1.agg' = 'string_agg',
    'fields.tags2.agg' = 'string_agg',
    'fields.tags2.delimiter' = ';' -- Specify delimiter inline
);

INSERT INTO test_string_agg VALUES
    (1, 'developer', 'developer'),
    (1, 'java', 'java'),
    (1, 'flink', 'flink');

SELECT * FROM test_string_agg;
+------------+----------------------+----------------------+
| id         | tags1                | tags2                |
+------------+----------------------+----------------------+
| 1          | developer,java,flink | developer;java;flink |
+------------+----------------------+----------------------+
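Since string_agg is only an alias, one way to picture it is two names resolving to the same aggregator. AggRegistry below is a hypothetical sketch of such a name lookup, with the delimiter fixed to a comma for brevity; it is not Fluss's actual function registry.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical name-to-aggregator lookup in which 'string_agg' aliases 'listagg'.
final class AggRegistry {
    private static final Map<String, BinaryOperator<String>> BY_NAME = new HashMap<>();

    static {
        // Comma-delimited concatenation that skips null inputs.
        BinaryOperator<String> listagg = (acc, in) ->
                in == null ? acc : (acc == null ? in : acc + "," + in);
        BY_NAME.put("listagg", listagg);
        BY_NAME.put("string_agg", listagg);   // alias: same instance, same behavior
    }

    private AggRegistry() {}

    static BinaryOperator<String> lookup(String name) {
        BinaryOperator<String> agg = BY_NAME.get(name);
        if (agg == null) {
            throw new IllegalArgumentException("Unknown aggregate function: " + name);
        }
        return agg;
    }
}
```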

bool_and

Evaluates whether all boolean values in a set are true (logical AND).

  • Supported Data Types: BOOLEAN
  • Behavior: Returns true only if all values are true
  • Null Handling: Null values are ignored

Example:

CREATE TABLE test_bool_and (
    id BIGINT,
    has_all_permissions BOOLEAN,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.has_all_permissions.agg' = 'bool_and'
);

INSERT INTO test_bool_and VALUES
    (1, true),
    (1, true),
    (1, false);

SELECT * FROM test_bool_and;
+------------+---------------------+
| id         | has_all_permissions |
+------------+---------------------+
| 1          | false               |
+------------+---------------------+
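As an accumulator, bool_and behaves like this hypothetical BoolAndAgg sketch. Note that once the stored value is false, no later write can change it back to true.

```java
// Hypothetical sketch of the 'bool_and' aggregator for a BOOLEAN column.
final class BoolAndAgg {
    private BoolAndAgg() {}

    static Boolean merge(Boolean acc, Boolean in) {
        if (in == null) {
            return acc;               // null inputs are ignored
        }
        return acc == null ? in : (acc && in);
    }
}
```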

bool_or

Checks if at least one boolean value in a set is true (logical OR).

  • Supported Data Types: BOOLEAN
  • Behavior: Returns true if any value is true
  • Null Handling: Null values are ignored

Example:

CREATE TABLE test_bool_or (
    id BIGINT,
    has_any_alert BOOLEAN,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'table.merge-engine' = 'aggregation',
    'fields.has_any_alert.agg' = 'bool_or'
);

INSERT INTO test_bool_or VALUES
    (1, false),
    (1, false),
    (1, true);

SELECT * FROM test_bool_or;
+------------+---------------+
| id         | has_any_alert |
+------------+---------------+
| 1          | true          |
+------------+---------------+
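As an accumulator, bool_or keeps true once it has been seen. A minimal sketch, with BoolOrAgg as a hypothetical name:

```java
// Hypothetical sketch of the 'bool_or' aggregator for a BOOLEAN column.
final class BoolOrAgg {
    private BoolOrAgg() {}

    static Boolean merge(Boolean acc, Boolean in) {
        if (in == null) {
            return acc;               // null inputs are ignored
        }
        return acc == null ? in : (acc || in);
    }
}
```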

Delete Behavior

The aggregation merge engine provides limited support for delete operations. You can configure the behavior with the 'table.delete.behavior' option, for example when building a table descriptor with the Java client:

TableDescriptor descriptor = TableDescriptor.builder()
        .schema(schema)
        .property("table.merge-engine", "aggregation")
        .property("table.delete.behavior", "allow") // Enable delete operations
        .build();

Configuration options:

  • 'table.delete.behavior' = 'ignore' (default): Delete operations are silently ignored
  • 'table.delete.behavior' = 'disable': Delete operations are rejected with an error
  • 'table.delete.behavior' = 'allow': Delete operations remove data according to the update mode (see details below)
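The three settings differ only in what happens when a delete reaches the table. The sketch below models that decision for the full-update case against an in-memory map; the DeleteHandling class and its DeleteBehavior enum are hypothetical and do not correspond to Fluss classes.

```java
import java.util.Map;

// Hypothetical model of the three 'table.delete.behavior' settings for a full-update delete.
final class DeleteHandling {
    enum DeleteBehavior { IGNORE, DISABLE, ALLOW }

    private DeleteHandling() {}

    static <K, V> void delete(Map<K, V> table, K key, DeleteBehavior behavior) {
        switch (behavior) {
            case IGNORE:
                return;                          // 'ignore': the delete is dropped silently
            case DISABLE:                        // 'disable': the delete is rejected
                throw new UnsupportedOperationException(
                        "Delete is not allowed: 'table.delete.behavior' = 'disable'");
            case ALLOW:
                table.remove(key);               // 'allow' + full update: the whole record is removed
                return;
            default:
                throw new IllegalArgumentException("Unknown behavior: " + behavior);
        }
    }
}
```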

Delete Behavior with Different Update Modes

When 'table.delete.behavior' = 'allow', the actual delete behavior depends on whether you are using full update or partial update:

Full Update (Default Write Mode):

  • Delete operations remove the entire record from the table
  • All aggregated values for that primary key are permanently lost

Example:

// Full update mode (default)
UpsertWriter writer = table.newUpsert().createWriter();
writer.delete(primaryKeyRow); // Removes the entire record

Partial Update Mode:

  • Delete operations perform a partial delete on target columns only
  • Target columns (except primary key): Set to null
  • Non-target columns: Remain unchanged
  • Special case: If all non-target columns are null after the delete, the entire record is removed

Example:

// Partial update mode - only targeting specific columns
UpsertWriter partialWriter = table.newUpsert()
        .partialUpdate("id", "count1", "sum1") // Target columns
        .createWriter();

// Delete will:
// - Set count1 and sum1 to null
// - Keep count2 and sum2 unchanged (non-target columns)
// - Remove entire record only if count2 and sum2 are both null
partialWriter.delete(primaryKeyRow);
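The partial-update delete rules listed above can be sketched as a function over one stored row. PartialDelete is a hypothetical helper, and it assumes that primary-key columns are never nulled and are not counted when checking whether all non-target columns are null.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical model of a partial-update delete applied to one stored row.
final class PartialDelete {
    private PartialDelete() {}

    // Returns the updated row, or null if the whole record should be removed.
    static Map<String, Object> apply(Map<String, Object> row, Set<String> targetColumns, Set<String> primaryKey) {
        Map<String, Object> result = new HashMap<>(row);
        boolean allNonTargetNull = true;
        for (Map.Entry<String, Object> e : row.entrySet()) {
            String column = e.getKey();
            if (primaryKey.contains(column)) {
                continue;                        // primary-key columns are left alone
            }
            if (targetColumns.contains(column)) {
                result.put(column, null);        // target columns are set to null
            } else if (e.getValue() != null) {
                allNonTargetNull = false;        // a non-target column still holds data
            }
        }
        return allNonTargetNull ? null : result;
    }
}
```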
note

Current Limitation: The aggregation merge engine does not support retraction semantics (e.g., subtracting from a sum, reverting a max).

  • Full update mode: Delete operations can only remove the entire record
  • Partial update mode: Delete operations can only null out target columns, not retract aggregated values

Future versions may support fine-grained retraction by enhancing the protocol to carry row data with delete operations.

Limitations

Critical Limitations

When using the aggregation merge engine, be aware of the following critical limitations:

Exactly-Once Semantics

Fluss provides exactly-once guarantees when writing to an aggregation merge engine table with Flink. It relies on Flink's checkpointing mechanism: when a job fails and recovers, the Flink connector automatically performs an undo operation that rolls the table state back to what it was at the last successful checkpoint. This prevents over-counting and under-counting, so the aggregated data remains consistent and accurate.

However, when you use the Fluss client API directly (outside of Flink), exactly-once is not provided out of the box. In that case, you must implement your own recovery logic, similar to what the Flink connector does, by performing undo operations that reset the table state to a previous version.
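To illustrate what resetting the table state through undo operations can mean, here is a deliberately simplified, single-process sketch for a sum column: before a key is first modified after a checkpoint, its old value is remembered, and recovery restores those values so the writes can be replayed without double counting. UndoableSumTable is hypothetical, and this before-image technique is an assumption for illustration, not necessarily how the Flink connector implements undo.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical single-process illustration of checkpoint + undo for a 'sum' column.
final class UndoableSumTable {
    private final Map<String, Long> table = new HashMap<>();
    // Value of each key as of the last checkpoint (null = key did not exist),
    // captured the first time the key is written after that checkpoint.
    private final Map<String, Long> beforeImages = new HashMap<>();

    void write(String key, long delta) {
        if (!beforeImages.containsKey(key)) {
            beforeImages.put(key, table.get(key));
        }
        table.merge(key, delta, Long::sum);
    }

    void checkpoint() {
        beforeImages.clear();         // the current state becomes the new recovery point
    }

    void undoToLastCheckpoint() {
        for (Map.Entry<String, Long> e : beforeImages.entrySet()) {
            if (e.getValue() == null) {
                table.remove(e.getKey());
            } else {
                table.put(e.getKey(), e.getValue());
            }
        }
        beforeImages.clear();
    }

    Long get(String key) {
        return table.get(key);
    }
}
```

After undoToLastCheckpoint(), replaying the writes made since the checkpoint yields the same totals as a failure-free run.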

For details on the exactly-once implementation, see FIP-21: Aggregation Merge Engine.

See Also