You can create and manage Apache Iceberg tables directly in RisingWave. When you create an internal Iceberg table, RisingWave handles its lifecycle, while the underlying data is stored in the open Apache Iceberg format in an object store you configure. This approach allows you to persist data in an open format that is directly queryable by other tools and provides several benefits:
  • Fully managed: Interact with the table like any other RisingWave table for querying, inserting, and building materialized views.
  • Open storage: Data is stored in the standard Iceberg format, ensuring compatibility with external query engines like Spark, Trino, Flink, and Dremio.
  • Simplified pipelines: Ingest data from a stream directly into an Iceberg table without a separate CREATE SINK step to a different system.

Create an internal Iceberg table

Creating and using an internal Iceberg table is a two-step process: first, you define the storage and catalog details in a CONNECTION object, and then you create the table itself.

Step 1: Create a connection

An Iceberg CONNECTION contains the catalog and object storage configuration. The following examples show how to create a connection for different catalog types.
These examples use S3 for object storage. For details on configuring other backends like GCS or Azure Blob Storage, see the Object storage guide. For enhanced security, you can store credentials as secrets using CREATE SECRET.
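For example, a minimal sketch of storing the S3 keys as secrets (the secret names here are placeholders):
CREATE SECRET my_s3_access_key WITH (backend = 'meta') AS 'your-key';
CREATE SECRET my_s3_secret_key WITH (backend = 'meta') AS 'your-secret';
A connection can then reference them as s3.access.key = secret my_s3_access_key and s3.secret.key = secret my_s3_secret_key instead of plaintext values.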
RisingWave supports several catalog types: the hosted catalog (the default), JDBC, AWS Glue, REST, and S3 Tables. For the simplest setup, use RisingWave's built-in JDBC-backed hosted catalog, which requires no external dependencies:
CREATE CONNECTION my_iceberg_conn WITH (
    type = 'iceberg',
    warehouse.path = 's3://my-bucket/warehouse/',
    s3.region = 'us-west-2',
    s3.access.key = 'your-key',
    s3.secret.key = 'your-secret',
    hosted_catalog = true
);
For more details, see Hosted Iceberg catalog.
For complete details on catalog parameters, see Catalog configuration.
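As a sketch of a non-hosted setup, a REST catalog connection replaces hosted_catalog with catalog.type and a catalog URI; the endpoint below is a placeholder, and the exact parameter set for each catalog type is listed in Catalog configuration:
CREATE CONNECTION my_rest_conn WITH (
    type = 'iceberg',
    warehouse.path = 's3://my-bucket/warehouse/',
    s3.region = 'us-west-2',
    s3.access.key = 'your-key',
    s3.secret.key = 'your-secret',
    catalog.type = 'rest',
    catalog.uri = 'http://rest-catalog:8181'
);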

Step 2: Create the table

Create a table using the ENGINE = iceberg clause and associate it with your connection. To simplify creation, you can set a default connection for your session.
-- Option 1: Set a default connection for the session
SET iceberg_engine_connection = 'my_iceberg_conn';

CREATE TABLE user_events (
    user_id INT,
    event_type VARCHAR,
    timestamp TIMESTAMPTZ,
    PRIMARY KEY (user_id, timestamp)
) ENGINE = iceberg;

-- Option 2: Specify the connection explicitly
CREATE TABLE user_events (
    user_id INT,
    event_type VARCHAR,
    timestamp TIMESTAMPTZ,
    PRIMARY KEY (user_id, timestamp)
) ENGINE = iceberg
  WITH (connection = 'my_iceberg_conn');
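To verify the result, you can inspect the table's definition, including the engine and connection it was created with:
SHOW CREATE TABLE user_events;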

Work with internal tables

Once created, an internal Iceberg table behaves like any other table in RisingWave.

Writing data

You can write data using standard INSERT statements or by streaming data from a source using CREATE SINK ... INTO.
-- Insert data directly
INSERT INTO user_events VALUES (1, 'login', '2024-01-01 10:00:00Z');

-- Stream data from a Kafka source into the table
CREATE SINK insert_into_iceberg INTO user_events AS
SELECT user_id, event_type, event_time AS "timestamp"
FROM user_activity_stream;
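The sink above assumes a source named user_activity_stream already exists. A minimal sketch of such a Kafka source (the topic and broker address are placeholders):
CREATE SOURCE user_activity_stream (
    user_id INT,
    event_type VARCHAR,
    event_time TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'user-activity',
    properties.bootstrap.server = 'broker:9092'
) FORMAT PLAIN ENCODE JSON;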

Querying data

Query the table directly with SELECT or use it as a source for a materialized view.
-- Ad hoc query
SELECT * FROM user_events WHERE event_type = 'login';

-- Create a materialized view
CREATE MATERIALIZED VIEW user_login_count AS
SELECT user_id, COUNT(*) as login_count
FROM user_events 
WHERE event_type = 'login'
GROUP BY user_id;

Time travel

Query historical snapshots of the table using FOR SYSTEM_TIME AS OF or FOR SYSTEM_VERSION AS OF.
-- Query a snapshot by timestamp
SELECT * FROM user_events FOR SYSTEM_TIME AS OF TIMESTAMPTZ '2024-01-01 12:00:00Z';

-- Query a snapshot by ID
SELECT * FROM user_events FOR SYSTEM_VERSION AS OF 1234567890;

Advanced configuration

Partitioning

Define a partition strategy in the WITH clause to optimize query performance.
CREATE TABLE partitioned_events (
    user_id INT,
    event_type VARCHAR,
    event_date DATE,
    PRIMARY KEY (event_date, user_id)
) WITH (
    partition_by = 'event_date'
) ENGINE = iceberg;
Supported partitioning strategies include by column, by multiple columns, and by applying transforms like bucket(n, column) or truncate(n, column). The partition key must be a prefix of the primary key.
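For example, a sketch combining a plain column with a bucket transform (the bucket count of 16 is arbitrary); note that event_date and user_id together form the primary key, satisfying the prefix rule:
CREATE TABLE bucketed_events (
    user_id INT,
    event_type VARCHAR,
    event_date DATE,
    PRIMARY KEY (event_date, user_id)
) WITH (
    partition_by = 'event_date,bucket(16, user_id)'
) ENGINE = iceberg;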

Commit interval

Control how frequently data is committed to the Iceberg table by setting commit_checkpoint_interval in the CONNECTION. The commit frequency equals the checkpoint interval multiplied by this value, so with the default one-second checkpoint interval, a value of 10 commits roughly every 10 seconds.
CREATE CONNECTION my_iceberg_conn WITH (
    type = 'iceberg',
    warehouse.path = 's3://my-bucket/warehouse/',
    -- ... other properties
    hosted_catalog = true,
    commit_checkpoint_interval = 10  -- Commit every 10 checkpoints
);

Table maintenance

To maintain good performance and manage storage costs, internal Iceberg tables require periodic maintenance, including compaction and snapshot expiration. RisingWave provides both automatic and manual maintenance options. For complete details, see the Iceberg table maintenance guide.
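As an illustrative sketch only, automatic maintenance is typically enabled through table options at creation time; treat the option names below as assumptions and check the maintenance guide for the authoritative list:
-- Option names are assumptions; see the Iceberg table maintenance guide
CREATE TABLE maintained_events (
    id INT PRIMARY KEY,
    payload VARCHAR
) WITH (
    enable_compaction = true,           -- assumed: automatic file compaction
    compaction_interval_sec = 3600,     -- assumed: how often compaction runs
    enable_snapshot_expiration = true   -- assumed: expire old snapshots
) ENGINE = iceberg;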

External access

Because internal tables are standard Iceberg tables, they can be read by external query engines like Spark or Trino using the same catalog and storage configuration. For example, in Spark:
spark.sql("SELECT * FROM iceberg_catalog.your_database.user_events")
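Likewise, once a Trino catalog is configured against the same Iceberg catalog and warehouse (the catalog and schema names below are placeholders), the table is queryable with plain SQL:
SELECT * FROM iceberg_catalog.your_database.user_events;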

Limitations

  • Limited DDL: Advanced schema evolution operations are not yet supported.
  • Single writer: To ensure data consistency, only RisingWave should write to internal Iceberg tables.