Try our SQL generator! We’ve built an interactive tool to help you generate SQL statements for creating Iceberg tables. Check it out at https://sql.risingwave.com/table/iceberg-table.
You can create and manage Apache Iceberg tables directly in RisingWave. When you create an internal Iceberg table (that is, a RisingWave-managed Iceberg table), RisingWave handles its lifecycle, while the underlying data is stored in the open Apache Iceberg format in an object store you configure.

Create an internal Iceberg table

Creating and using an internal Iceberg table is a two-step process: first, you define the storage and catalog details in a CONNECTION object, and then you create the table itself.

Step 1: Create an Iceberg connection

An Iceberg CONNECTION defines where the table data is stored (the object storage backend) and which catalog manages the table’s metadata. You must specify the type and warehouse.path parameters, along with the parameters required by your catalog and object store. To use the JDBC-based built-in catalog, set hosted_catalog to true. You can also set the optional commit_checkpoint_interval parameter to control how often RisingWave commits changes to Iceberg: the default of 60 checkpoints corresponds to a commit roughly every 60 seconds under the default configuration. Tune this value to balance data freshness against commit overhead.
For S3 credentials (applies to all catalogs):
  • If enable_config_load = false: you must provide s3.access.key and s3.secret.key (you may also set s3.iam_role_arn).
  • If enable_config_load = true: don’t provide s3.access.key/s3.secret.key (you may set s3.iam_role_arn, or rely on the role already available in your environment/config).
See Object storage configuration.
For more details on the available catalog options, see Iceberg catalog configuration.
CREATE CONNECTION my_iceberg_conn WITH (
    type = 'iceberg',
    warehouse.path = 's3://my-bucket/warehouse/',
    s3.region = 'us-west-2',
    s3.access.key = 'your-key',
    s3.secret.key = 'your-secret',
    hosted_catalog = true
);
For more details, see Built-in catalog.
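If your environment already supplies AWS credentials (with enable_config_load = true, as described above), you can omit the static keys. A minimal sketch, assuming the role available to your deployment can access the bucket (the connection name is illustrative):
CREATE CONNECTION my_iceberg_conn_iam WITH (
    type = 'iceberg',
    warehouse.path = 's3://my-bucket/warehouse/',
    s3.region = 'us-west-2',
    -- Load credentials from the environment instead of static keys
    enable_config_load = true,
    hosted_catalog = true
);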

Step 2: Create an internal Iceberg table

Create an internal Iceberg table using the ENGINE = iceberg clause. RisingWave needs to know which Iceberg CONNECTION to use, since the connection carries both the object storage and catalog settings. Choose one of the options below.
-- Option 1: Set a default connection for the session
SET iceberg_engine_connection = 'public.my_iceberg_conn';

CREATE TABLE user_events (
    user_id INT,
    event_type VARCHAR,
    timestamp TIMESTAMPTZ,
    PRIMARY KEY (user_id, timestamp)
) ENGINE = iceberg;
-- Option 2: Specify the connection explicitly
CREATE TABLE user_events (
    user_id INT,
    event_type VARCHAR,
    timestamp TIMESTAMPTZ,
    PRIMARY KEY (user_id, timestamp)
) ENGINE = iceberg
  WITH (connection = 'public.my_iceberg_conn');
You can also define a partition strategy in the WITH clause to optimize query performance.
CREATE TABLE partitioned_events (
    user_id INT,
    event_type VARCHAR,
    event_date DATE,
    PRIMARY KEY (event_date, user_id)
) WITH (
    partition_by = 'event_date'
) ENGINE = iceberg;
Supported partitioning strategies include by column, by multiple columns, and by applying transforms like bucket(n, column) or truncate(n, column). The partition key must be a prefix of the primary key.
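For example, here is a sketch (table and column names are illustrative) that combines a plain partition column with a bucket transform while keeping the partition columns a prefix of the primary key:
CREATE TABLE events_by_day (
    user_id INT,
    event_type VARCHAR,
    event_date DATE,
    PRIMARY KEY (event_date, user_id, event_type)
) WITH (
    -- Partition by event_date, then hash user_id into 8 buckets
    partition_by = 'event_date,bucket(8,user_id)'
) ENGINE = iceberg;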

Work with internal tables

Once created, you can work with an internal Iceberg table using familiar SQL (insert, query, materialized views). One important difference: new writes become queryable only after an Iceberg commit. By default, Iceberg commits happen about every 60 seconds (controlled by commit_checkpoint_interval).

Ingest data

You can ingest data using standard INSERT statements or by streaming data from a source using CREATE SINK ... INTO.
-- Manual inserts
INSERT INTO user_events VALUES (1, 'login', '2024-01-01 10:00:00Z');

-- Stream data from a Kafka source into the same table
CREATE SOURCE user_events_src (
  user_id INT,
  event_type VARCHAR,
  timestamp TIMESTAMPTZ
) WITH (
  connector = 'kafka',
  topic = 'user_events',
  properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

CREATE SINK to_user_events INTO user_events AS
SELECT user_id, event_type, timestamp
FROM user_events_src;

Query data

Query the table directly with SELECT or use it as a source for a materialized view.
-- Ad hoc query
-- Note: you may need to wait for the next Iceberg commit (default ~60s) to see recent writes.
SELECT * FROM user_events WHERE event_type = 'login';

-- Create a materialized view
CREATE MATERIALIZED VIEW user_login_count AS
SELECT user_id, COUNT(*) AS login_count
FROM user_events 
WHERE event_type = 'login'
GROUP BY user_id;

Time travel

Time travel queries work on committed Iceberg snapshots. Make sure at least one Iceberg commit has happened before using these queries.
-- Query a snapshot by timestamp
SELECT * FROM user_events FOR SYSTEM_TIME AS OF TIMESTAMPTZ '2024-01-01 12:00:00Z';

-- Query a snapshot by ID
SELECT * FROM user_events FOR SYSTEM_VERSION AS OF 1234567890;
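To pick a snapshot ID, you need the table’s snapshot metadata. Depending on your RisingWave version, committed snapshots may be listed in the rw_iceberg_snapshots system view; this is an assumption to verify against your version, and if the view is unavailable you can read the snapshot list from the Iceberg metadata files with an external tool.
-- Hypothetical: list committed snapshots and their IDs, if your version exposes this view
SELECT * FROM rw_iceberg_snapshots;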

Partition strategy

RisingWave’s Iceberg table engine supports table partitioning through the partition_by option. Partitioning organizes data for efficient storage and query performance. You can partition by one or more columns, separated by commas, and optionally apply a transform function to each column. Supported transforms include identity, truncate(n), bucket(n), year, month, day, hour, and void. For more details on Iceberg partitioning, see Partition transforms.
CREATE TABLE t_partition (
    v1 INT,
    v2 INT,
    v3 TIMESTAMP,
    v4 TIMESTAMP,
    PRIMARY KEY (v1, v2, v3, v4)
)
WITH (
    -- `commit_checkpoint_interval` counts checkpoints between Iceberg commits.
    -- The default of 60 means a commit roughly every 60 seconds under the default
    -- configuration; 1 commits on every checkpoint for faster visibility.
    commit_checkpoint_interval = 1,
    partition_by = 'truncate(4,v2),bucket(5,v1)'
)
ENGINE = iceberg;

Table maintenance

To maintain good performance and manage storage costs, internal Iceberg tables require periodic maintenance, including compaction and snapshot expiration. RisingWave provides both automatic and manual maintenance options. For complete details, see the Iceberg table maintenance guide.
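For instance, automatic maintenance can be configured when the table is created. A sketch under assumed option names (enable_compaction, compaction_interval_sec, enable_snapshot_expiration); confirm them against the maintenance guide for your version:
CREATE TABLE maintained_events (
    user_id INT,
    event_type VARCHAR,
    PRIMARY KEY (user_id)
) WITH (
    -- Assumed option names; see the Iceberg table maintenance guide.
    enable_compaction = true,
    compaction_interval_sec = 3600,
    enable_snapshot_expiration = true
) ENGINE = iceberg;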

External access

Because internal tables are standard Iceberg tables, external query engines such as Spark and Trino can read them using the same catalog and storage configuration. For example, in Spark:
# Assumes a SparkSession whose catalog configuration matches the connection above
spark.sql("SELECT * FROM iceberg_catalog.your_database.user_events")

Limitations

  • Advanced schema evolution operations are not yet supported.
  • To ensure data consistency, only RisingWave should write to internal Iceberg tables.