In RisingWave, the Iceberg table engine allows you to create and manage tables directly within the system, while storing their underlying data in the Apache Iceberg format on external object storage. This offers an alternative way to persist data compared to RisingWave’s default row-based internal storage format (which also typically uses object storage).

Using the Iceberg table engine provides several benefits:

  • Native management: RisingWave manages the table’s lifecycle (creation, schema, writes). You interact with it like any other RisingWave table (querying, inserting, using in materialized views).
  • Iceberg format storage: Data is physically stored according to the Iceberg specification, using a configured Iceberg catalog and object storage path. This ensures compatibility with the Iceberg ecosystem.
  • Simplified pipelines: If Iceberg is your desired end format, you don’t need a separate CREATE SINK step to export data out of RisingWave. Data that is ingested or computed can land directly in these Iceberg tables.
  • Interoperability: Tables created with the Iceberg engine are standard Iceberg tables and can be read by external Iceberg-compatible query engines (like Spark, Trino, Flink, Dremio) using the same catalog and storage configuration.

This guide details how to set up and use the Iceberg table engine.

Setup and usage

1. Create an Iceberg connection

An Iceberg connection holds the catalog and object storage configuration that your Iceberg tables will use. For syntax and properties, see CREATE CONNECTION.

The following examples show how to create an Iceberg connection using different catalog types.

These examples use S3 for object storage. You can also use Google Cloud Storage (GCS) or Azure Blob Storage by replacing the S3 parameters with those for your chosen storage backend. See Object storage configuration for details.
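For reference, when S3 is the storage backend, the <object_storage_parameters> placeholder in the examples below typically expands to settings like the following (the values are placeholders for your own bucket and credentials):

s3.region = 'us-east-1',
s3.access.key = '<access_key>',
s3.secret.key = '<secret_key>',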

For enhanced security, you can store credentials like access keys as secrets instead of providing them directly. If you wish to use this feature, see Manage secrets.
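For example, a minimal sketch of that pattern (the secret name and backend here are illustrative; see Manage secrets for the exact syntax and options):

-- Store the key once as a secret
CREATE SECRET iceberg_s3_secret WITH (backend = 'meta') AS '<secret_key>';

-- Then reference it in the connection instead of a plaintext value,
-- e.g. s3.secret.key = SECRET iceberg_s3_secret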
JDBC catalog
CREATE CONNECTION public.conn WITH (
    type = 'iceberg',
    warehouse.path = '<storage_path>',
    <object_storage_parameters>,
    catalog.type = 'jdbc',
    catalog.uri = 'jdbc:postgresql://127.0.0.1:8432/metadata',
    catalog.jdbc.user = 'postgres',
    catalog.jdbc.password = '123',
    catalog.name = 'dev'
);
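If you would rather not run an external catalog at all, RisingWave can also host the Iceberg catalog metadata itself through the hosted_catalog option (the same option used in the configuration examples later in this guide). A minimal sketch:

Hosted catalog
CREATE CONNECTION public.conn_hosted WITH (
    type = 'iceberg',
    warehouse.path = '<storage_path>',
    <object_storage_parameters>,
    hosted_catalog = true
);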

For details on catalog configuration parameters, see Catalog configuration.

2. Set connection as default (optional)

To simplify table creation, you can set a default connection for the Iceberg engine:

SET rw_iceberg_connection = 'public.conn';

When a default connection is set, you can create Iceberg tables without specifying a connection.

3. Create an Iceberg table

Create a table using the Iceberg engine:

-- With default connection set
CREATE TABLE user_events (
    user_id INT,
    event_type VARCHAR,
    event_time TIMESTAMPTZ,
    properties JSONB,
    PRIMARY KEY (user_id, event_time)
) ENGINE = iceberg;

-- Or specify connection explicitly
CREATE TABLE user_events (
    user_id INT,
    event_type VARCHAR,
    event_time TIMESTAMPTZ,
    properties JSONB,
    PRIMARY KEY (user_id, event_time)
) ENGINE = iceberg
WITH (connection = 'public.conn');

4. Work with your table

Once created, Iceberg tables work like any other RisingWave table:

-- Insert data
INSERT INTO user_events VALUES 
(1, 'login', '2024-01-01 10:00:00', '{"ip": "192.168.1.1"}'),
(2, 'purchase', '2024-01-01 11:00:00', '{"amount": 99.99}');

-- Query data
SELECT * FROM user_events WHERE event_type = 'login';

-- Use in materialized views
CREATE MATERIALIZED VIEW user_login_count AS
SELECT user_id, COUNT(*) as login_count
FROM user_events 
WHERE event_type = 'login'
GROUP BY user_id;

Stream data into Iceberg tables

You can stream data directly from sources into Iceberg tables:

-- Create a Kafka source
CREATE SOURCE user_activity_stream (
    user_id INT,
    event_type VARCHAR,
    event_time TIMESTAMPTZ,
    properties JSONB
) WITH (
    connector = 'kafka',
    topic = 'user_events',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

-- Create a sink that streams the source into the Iceberg table
CREATE SINK insert_into_iceberg INTO user_events AS
SELECT user_id, event_type, event_time, properties
FROM user_activity_stream;

Time travel

Query historical snapshots of your Iceberg tables:

-- Query a specific snapshot by timestamp
SELECT * FROM user_events FOR SYSTEM_TIME AS OF TIMESTAMPTZ '2024-01-01 12:00:00';

-- Query a specific snapshot by snapshot ID
SELECT * FROM user_events FOR SYSTEM_VERSION AS OF 1234567890;

External access

Tables created with the Iceberg engine are standard Iceberg tables that can be accessed by external tools:

Spark:

spark.sql("""
    SELECT * FROM iceberg_catalog.your_database.user_events
    WHERE event_type = 'purchase'
""")

Trino:

SELECT user_id, COUNT(*) as event_count
FROM iceberg.your_database.user_events
GROUP BY user_id;

Use Amazon S3 Tables with the Iceberg table engine

Amazon S3 Tables provides an AWS-native Iceberg catalog service. When you use S3 Tables as the catalog for tables created with the Iceberg engine, AWS automatically compacts the underlying data files for you.

Create S3 Tables connection

CREATE CONNECTION s3_tables_conn WITH (
    type = 'iceberg',
    warehouse.path = 'arn:aws:s3tables:us-east-1:<account-id>:bucket/my-table-bucket',
    s3.region = 'us-east-1',
    catalog.type = 'rest',
    catalog.uri = 'https://s3tables.us-east-1.amazonaws.com/iceberg',
    catalog.rest.signing_region = 'us-east-1',
    catalog.rest.signing_name = 's3tables',
    catalog.rest.sigv4_enabled = 'true'
);

Create Iceberg table with S3 Tables

SET rw_iceberg_connection = 's3_tables_conn';

CREATE TABLE my_iceberg_table (
    id INT,
    name VARCHAR,
    age INT,
    city VARCHAR,
    PRIMARY KEY (id)
) ENGINE = iceberg;

For more details on S3 Tables configuration, see Object storage configuration.

Configuration options

Commit intervals

Control how frequently data is committed to the Iceberg table:

CREATE CONNECTION conn WITH (
    type = 'iceberg',
    warehouse.path = 's3://my-bucket/warehouse/',
    s3.access.key = 'your-key',
    s3.secret.key = 'your-secret',
    s3.region = 'us-west-2',
    hosted_catalog = true,
    commit_checkpoint_interval = 10  -- Commit every 10 checkpoints
);

The approximate time between commits, in milliseconds, is:

time = barrier_interval_ms × checkpoint_frequency × commit_checkpoint_interval

Where barrier_interval_ms and checkpoint_frequency are system parameters that define the base checkpointing rate.
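For example, with barrier_interval_ms = 1000 (1 second), checkpoint_frequency = 10, and commit_checkpoint_interval = 10, a new Iceberg commit happens roughly every 1000 × 10 × 10 = 100,000 ms, or about every 100 seconds.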

Limitations

Current limitations of the Iceberg table engine:

  • No automatic compaction: Tables don’t automatically compact small files (though S3 Tables provides this)
  • Limited DDL operations: Some schema changes may require recreating the table
  • Single writer: Only RisingWave should write to tables created with this engine

Best practices

  1. Use hosted catalog for simple setups: Start with hosted_catalog = true for quick development.
  2. Configure appropriate commit intervals: Balance between latency and file size.
  3. Consider S3 Tables for production: Automatic compaction and AWS-native management.
  4. Design proper partitioning: Plan your partition strategy for query performance.
  5. Monitor file sizes: Be aware of small file accumulation and plan compaction strategy.

Next steps