You can create and manage native Apache Iceberg tables directly in RisingWave. When you create a native Iceberg table, RisingWave handles its lifecycle (creation, schema, writes), while the underlying data is stored in the open Apache Iceberg format in an object store you configure. This approach allows you to persist data in an open format that is directly queryable by other tools, offering an alternative to RisingWave’s internal storage format. Creating native Iceberg tables provides several benefits:
  • Native management: RisingWave manages the table’s lifecycle. You can interact with it like any other RisingWave table (querying, inserting, and using it in materialized views).
  • Open storage format: Data is physically stored according to the Iceberg specification, ensuring compatibility with the broader Iceberg ecosystem.
  • Simplified pipelines: You don’t need a separate CREATE SINK step to export data into Iceberg format. Data ingested or computed can land directly in these Iceberg tables.
  • Interoperability: Tables you create are standard Iceberg tables and can be read by external Iceberg-compatible query engines (like Spark, Trino, Flink, and Dremio) using the same catalog and storage configuration.
This guide details how to create and use native Iceberg tables in RisingWave.

Setup and usage

1. Create an Iceberg connection

The Iceberg connection contains information about the catalog and object storage. For syntax and properties, see CREATE CONNECTION. The following examples show how to create an Iceberg connection using different catalog types.
These examples use S3 for object storage. You can also use Google Cloud Storage (GCS) or Azure Blob Storage by replacing the S3 parameters with the appropriate parameters for your chosen storage backend. See the Object storage guide for details.
For enhanced security, you can store credentials like access keys as secrets instead of providing them directly. If you wish to use this feature, see Manage secrets.
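For example, a minimal sketch of that workflow (the secret name is illustrative):
CREATE SECRET my_s3_secret WITH (backend = 'meta') AS 'your-secret-key';
-- Then reference it in a connection instead of a plaintext value:
-- s3.secret.key = secret my_s3_secret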
JDBC catalog
CREATE CONNECTION public.conn WITH (
    type = 'iceberg',
    warehouse.path = '<storage_path>',
    <object_storage_parameters>,
    catalog.type = 'jdbc',
    catalog.uri = 'jdbc:postgresql://127.0.0.1:8432/metadata',
    catalog.jdbc.user = 'postgres',
    catalog.jdbc.password = '123',
    catalog.name = 'dev'
);
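Hosted catalog
If you prefer not to run an external catalog service, RisingWave can host the Iceberg catalog itself via hosted_catalog = true (the same option used in the Commit intervals example later in this guide). A minimal sketch, reusing the placeholders from the JDBC example:
CREATE CONNECTION public.conn_hosted WITH (
    type = 'iceberg',
    warehouse.path = '<storage_path>',
    <object_storage_parameters>,
    hosted_catalog = true
);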
For details on catalog configuration parameters, see Catalog configuration.

2. Set connection as default (optional)

To simplify table creation, you can set a default connection for your session. This allows you to create Iceberg tables without specifying the connection each time.
SET iceberg_engine_connection = 'public.conn';
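To confirm the setting for the current session, the PostgreSQL-style SHOW command should work:
SHOW iceberg_engine_connection;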

3. Create an Iceberg table

Create a table using the Iceberg engine:
-- With default connection set
CREATE TABLE user_events (
    user_id INT,
    event_type VARCHAR,
    timestamp TIMESTAMPTZ,
    properties JSONB,
    PRIMARY KEY (user_id, timestamp)
) ENGINE = iceberg;

-- Or specify connection explicitly
CREATE TABLE user_events (
    user_id INT,
    event_type VARCHAR,
    timestamp TIMESTAMPTZ,
    properties JSONB,
    PRIMARY KEY (user_id, timestamp)
) ENGINE = iceberg
WITH (connection = 'public.conn');

4. Work with your table

Once created, Iceberg tables work like any other RisingWave table:
-- Insert data
INSERT INTO user_events VALUES 
(1, 'login', '2024-01-01 10:00:00', '{"ip": "192.168.1.1"}'),
(2, 'purchase', '2024-01-01 11:00:00', '{"amount": 99.99}');

-- Query data
SELECT * FROM user_events WHERE event_type = 'login';

-- Use in materialized views
CREATE MATERIALIZED VIEW user_login_count AS
SELECT user_id, COUNT(*) as login_count
FROM user_events 
WHERE event_type = 'login'
GROUP BY user_id;
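Because the table has a primary key, row-level DML should also work as it does on regular RisingWave tables; a brief sketch with illustrative values:
-- Update rows by key
UPDATE user_events SET event_type = 'signin' WHERE user_id = 1;

-- Delete rows
DELETE FROM user_events WHERE user_id = 2;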

Stream data into Iceberg tables

You can stream data from sources directly into Iceberg tables. Create a source, then route its rows into the table with a sink that uses the INTO clause:
-- Create a Kafka source with an explicit schema
CREATE SOURCE user_activity_stream (
    user_id INT,
    event_type VARCHAR,
    event_time TIMESTAMPTZ,
    properties JSONB
)
WITH (
    connector = 'kafka',
    topic = 'user_events',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

-- Create a sink that continuously writes the source's rows into the Iceberg table
CREATE SINK sink_into_iceberg INTO user_events AS
SELECT user_id, event_type, event_time, properties
FROM user_activity_stream;

Time travel

Query historical snapshots of your Iceberg tables:
-- Query a specific snapshot by timestamp
SELECT * FROM user_events FOR SYSTEM_TIME AS OF TIMESTAMPTZ '2024-01-01 12:00:00';

-- Query a specific snapshot by snapshot ID
SELECT * FROM user_events FOR SYSTEM_VERSION AS OF 1234567890;
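Snapshot IDs come from the table's Iceberg metadata. One way to list them is through Iceberg's standard snapshots metadata table in an external engine, assuming the Spark catalog setup shown in the External access section below:
-- In Spark SQL: list snapshot IDs and commit times
SELECT snapshot_id, committed_at
FROM iceberg_catalog.your_database.user_events.snapshots;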

External access

Tables created with the Iceberg engine are standard Iceberg tables that external tools can read directly:
Spark:
spark.sql("""
    SELECT * FROM iceberg_catalog.your_database.user_events
    WHERE event_type = 'purchase'
""")
Trino:
SELECT user_id, COUNT(*) as event_count
FROM iceberg.your_database.user_events
GROUP BY user_id;

Partition strategy

RisingWave’s Iceberg table engine supports table partitioning via the partition_by option, specified when you create a table. Partitioning organizes data for efficient storage and faster queries. The supported partition_by formats are:
  • 'column' — single column
  • 'column1,column2' — multiple columns
  • 'bucket(n, column), column2' — bucket partitioning
  • 'column1, truncate(n, column2)' — truncate partitioning
Example
CREATE TABLE t_partition1 (
    id INT PRIMARY KEY,
    name VARCHAR
)
WITH (
    commit_checkpoint_interval = 1,
    partition_by = 'id'
)
ENGINE = ICEBERG;
The partition key must be a prefix of the primary key. For example, partition_by = 'c2,c3' with PRIMARY KEY (c1, c2, c3) will fail.
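For instance, a sketch of bucket partitioning on a composite primary key (the table name and bucket count are illustrative):
CREATE TABLE t_partition2 (
    c1 INT,
    c2 INT,
    name VARCHAR,
    PRIMARY KEY (c1, c2)
)
WITH (
    commit_checkpoint_interval = 1,
    partition_by = 'bucket(16, c1), c2'
)
ENGINE = ICEBERG;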

Compaction for native Iceberg tables

Added in v2.5.0.
Premium feature: This is a premium feature. For a comprehensive overview of all premium features and their usage, see RisingWave premium features.
When you stream data into native Iceberg tables, the process can accumulate many small data files and delete files (files that record row-level deletes) over time, which slows down queries. To address this, RisingWave provides a native Iceberg compaction feature for these tables: it periodically merges small files and removes outdated snapshots. To configure compaction for a native Iceberg table, specify the following parameters in the WITH clause:
  • enable_compaction — Whether to enable Iceberg compaction (true/false).
  • compaction_interval_sec — Interval (in seconds) between two compaction runs. Defaults to 3600 seconds.
  • enable_snapshot_expiration — Whether to enable snapshot expiration. By default, snapshot expiration removes snapshots older than 5 days.
Example
CREATE TABLE t (
    id INT PRIMARY KEY, 
    name VARCHAR
) WITH (
    enable_compaction = true, 
    compaction_interval_sec = 3600, 
    enable_snapshot_expiration = true
) ENGINE = iceberg;
Iceberg compaction also requires a dedicated Iceberg compactor. Currently, please contact us via the RisingWave Slack workspace to allocate the necessary resources. We are working on a self-service feature that will let you allocate Iceberg compactors directly from the cloud portal.
RisingWave also supports compaction for Iceberg sinks. For details, see Compaction for Iceberg sink.

Use Amazon S3 Tables with native Iceberg tables

Amazon S3 Tables provides an AWS-native Iceberg catalog service. When using S3 Tables as the catalog for your native Iceberg tables, you get the benefit of automatic compaction.

Create S3 Tables connection

CREATE CONNECTION s3_tables_conn WITH (
    type = 'iceberg',
    warehouse.path = 'arn:aws:s3tables:us-east-1:123456789012:bucket/my-table-bucket',
    s3.region = 'us-east-1',
    catalog.type = 'rest',
    catalog.uri = 'https://s3tables.us-east-1.amazonaws.com/iceberg',
    catalog.rest.signing_region = 'us-east-1',
    catalog.rest.signing_name = 's3tables',
    catalog.rest.sigv4_enabled = 'true'
);

Create Iceberg table with S3 Tables

SET iceberg_engine_connection = 's3_tables_conn';

CREATE TABLE my_iceberg_table (
    id INT,
    name VARCHAR,
    age INT,
    city VARCHAR,
    PRIMARY KEY (id)
) ENGINE = iceberg;
For more details on S3 Tables configuration, see Object storage configuration.

Configuration options

Commit intervals

Control how frequently data is committed to the Iceberg table:
CREATE CONNECTION conn WITH (
    type = 'iceberg',
    warehouse.path = 's3://my-bucket/warehouse/',
    s3.access.key = 'your-key',
    s3.secret.key = 'your-secret',
    s3.region = 'us-west-2',
    hosted_catalog = true,
    commit_checkpoint_interval = 10  -- Commit every 10 checkpoints
);
The approximate time to commit is calculated as:
time = barrier_interval_ms × checkpoint_frequency × commit_checkpoint_interval
Where barrier_interval_ms and checkpoint_frequency are system parameters that define the base checkpointing rate.
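For example, assuming barrier_interval_ms = 1000 (1 second) and checkpoint_frequency = 10, setting commit_checkpoint_interval = 10 yields a commit roughly every 1000 ms × 10 × 10 = 100 seconds.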

Limitations

Current limitations of creating native Iceberg tables:
  • Limited DDL operations: Some schema changes may require recreating the table.
  • Single writer: Only RisingWave should write to tables created with this engine to ensure data consistency.

Best practices

  1. Use hosted catalog for simple setups: Start with hosted_catalog = true for quick development.
  2. Configure appropriate commit intervals: Balance between latency and file size.
  3. Consider S3 Tables for production: Automatic compaction and AWS-native management.
  4. Design proper partitioning: Plan your partition strategy for query performance.
  5. Monitor file sizes: Be aware of small file accumulation and plan a compaction strategy.

Next steps