This guide shows how to read data from external Apache Iceberg tables in RisingWave. Use it when your Iceberg tables are managed by external systems and you want to treat them as a streaming source for real-time processing. You can ingest data from Iceberg tables using two approaches:
  1. Continuous ingestion (default): Create an Iceberg source with the CREATE SOURCE statement for continuous, streaming ingestion of append-only data.
  2. Periodic full reload: Create an Iceberg table with refresh_mode = 'FULL_RELOAD' for scheduled full table refreshes. Note that you must use CREATE TABLE (not CREATE SOURCE), and data is only loaded after you trigger a refresh, either manually or via the configured schedule.
After the source or table is created, you can run ad hoc queries against it or maintain materialized views for continuous analytics.

Prerequisites

  • An existing Apache Iceberg table managed by external systems.
  • Access credentials for the underlying storage system (e.g., S3 access key and secret key).
  • Network connectivity between RisingWave and your storage system.
  • Knowledge of your Iceberg catalog type and configuration.

Continuous ingestion with CREATE SOURCE

Basic connection example

The following example creates a source for a table in S3 using AWS Glue as the catalog:
CREATE SOURCE my_iceberg_source
WITH (
    connector = 'iceberg',
    warehouse.path = 's3://my-data-lake/warehouse',
    database.name = 'analytics',
    table.name = 'user_events',
    catalog.type = 'glue',
    catalog.name = 'my_glue_catalog',
    s3.access.key = 'your-access-key',
    s3.secret.key = 'your-secret-key',
    s3.region = 'us-west-2'
);
When you read from an external Iceberg table, RisingWave automatically derives column names and data types from the Iceberg table metadata. Use the DESCRIBE statement to view the schema:
DESCRIBE my_iceberg_source;

Parameters

| Parameter | Description | Example |
| --- | --- | --- |
| `connector` | Required. Must be `'iceberg'` for Iceberg sources. | `'iceberg'` |
| `database.name` | Required. The Iceberg database/namespace name. | `'analytics'` |
| `table.name` | Required. The Iceberg table name. | `'user_events'` |
| `commit_checkpoint_interval` | Optional. Determines the commit frequency: RisingWave commits every N checkpoints. | `60` |
You also need to specify catalog and object storage parameters in the CREATE SOURCE statement. Because these parameters are shared across all Iceberg objects—sources, sinks, and internal Iceberg tables—they are documented separately. For details on how data types are mapped between RisingWave and Iceberg, see the Data type mapping guide.
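As a sketch, the basic Glue example from above could be extended with an explicit commit interval. The catalog and storage values are placeholders, and this assumes `commit_checkpoint_interval` is accepted in the `WITH` clause as described in the table:

```sql
CREATE SOURCE my_iceberg_source_tuned
WITH (
    connector = 'iceberg',
    warehouse.path = 's3://my-data-lake/warehouse',
    database.name = 'analytics',
    table.name = 'user_events',
    catalog.type = 'glue',
    catalog.name = 'my_glue_catalog',
    s3.access.key = 'your-access-key',
    s3.secret.key = 'your-secret-key',
    s3.region = 'us-west-2',
    -- Commit once every 10 checkpoints rather than the default
    commit_checkpoint_interval = 10
);
```

A lower interval commits more frequently at the cost of more commit overhead; a higher interval batches more data per commit.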

Source example

For a REST catalog:
CREATE SOURCE rest_iceberg_source
WITH (
    connector = 'iceberg',
    warehouse.path = 's3://my-data-lake/warehouse',
    database.name = 'my_database',
    table.name = 'my_table',
    catalog.type = 'rest',
    catalog.uri = 'http://rest-catalog:8181',
    catalog.name = 'production',
    s3.access.key = 'your-access-key',
    s3.secret.key = 'your-secret-key',
    s3.region = 'us-west-2'
);

Periodic full reload with CREATE TABLE

Added in v2.7.0. This feature is currently in technical preview.
For batch-style workloads where you need to periodically reload an entire Iceberg table, you can create a table with refresh_mode = 'FULL_RELOAD'. This mode is useful when:
  • The external Iceberg table supports mutable data (updates and deletes).
  • You need a point-in-time snapshot of the entire table.
  • Periodic full reloads fit your use case better than continuous streaming.

Create a refreshable table

CREATE TABLE iceberg_batch_table (
    id int primary key,
    name varchar
) WITH (
    connector = 'iceberg',
    catalog.type = 'storage',
    warehouse.path = 's3://my-data-lake/warehouse',
    database.name = 'public',
    table.name = 'my_iceberg_table',
    s3.access.key = 'your-access-key',
    s3.secret.key = 'your-secret-key',
    s3.region = 'us-west-2',
    refresh_mode = 'FULL_RELOAD',        -- Required for periodic refresh
    refresh_interval_sec = '60'          -- Reload every 60 seconds
);

Parameters

| Parameter | Description | Required | Example |
| --- | --- | --- | --- |
| `refresh_mode` | Must be set to `'FULL_RELOAD'` to enable periodic refresh. | Yes | `'FULL_RELOAD'` |
| `refresh_interval_sec` | Interval in seconds between automatic refresh operations. | No | `'60'` |
RisingWave checks all refreshable tables at a configurable interval (default: 60 seconds, configured by stream_refresh_scheduler_interval_sec in the RisingWave configuration file). Setting a refresh_interval_sec value lower than this scheduler interval may result in refresh triggers not occurring at the expected frequency.
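If your tables need to refresh more often than the scheduler checks, you can lower the scheduler interval itself in the RisingWave configuration file. A minimal sketch, assuming a TOML-style configuration; the section name (`[meta]` here) is an assumption and may differ in your deployment:

```toml
# risingwave.toml (sketch; confirm the correct section for your deployment)
[meta]
# Check refreshable tables every 30 seconds instead of the default 60
stream_refresh_scheduler_interval_sec = 30
```

With this setting, a table declaring `refresh_interval_sec = '30'` can actually be triggered at that cadence.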

Manual refresh

You can manually trigger a refresh at any time using the REFRESH TABLE command:
REFRESH TABLE iceberg_batch_table;

Monitor refresh status

Query the rw_catalog.rw_refresh_table_state system catalog to monitor refresh operations:
SELECT table_id, current_status, last_trigger_time, last_success_time, trigger_interval_secs
FROM rw_catalog.rw_refresh_table_state;
The current_status field shows the current state of the refresh job:
  • IDLE: No refresh operation is currently in progress
  • REFRESHING: A refresh operation is in progress
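For example, to list only the tables with a refresh currently in flight (this assumes `current_status` stores the literal strings shown above):

```sql
-- Find tables whose refresh is still running
SELECT table_id, last_trigger_time
FROM rw_catalog.rw_refresh_table_state
WHERE current_status = 'REFRESHING';
```

An empty result means all refreshable tables are idle.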
For more details, see REFRESH TABLE.

What’s next?

Query data

Once created, you can query data from the Iceberg source:
-- Basic query
SELECT * FROM my_iceberg_source
WHERE event_date >= '2024-01-01'
LIMIT 100;

-- Aggregations
SELECT event_type, COUNT(*) as event_count
FROM my_iceberg_source
WHERE event_date >= '2024-01-01'
GROUP BY event_type;

Query historical snapshots of your Iceberg data

RisingWave supports Iceberg time travel, so you can query past versions of the table:
-- Query data as it existed at a specific timestamp
SELECT * FROM my_iceberg_source 
FOR SYSTEM_TIME AS OF TIMESTAMPTZ '2024-01-01 12:00:00'
WHERE user_id = 123;

-- Query a specific snapshot by ID
SELECT COUNT(*) FROM my_iceberg_source
FOR SYSTEM_VERSION AS OF 1234567890;

Continuous data processing jobs

You can create materialized views that continuously process data from the Iceberg source:
-- Create a materialized view for real-time aggregation
CREATE MATERIALIZED VIEW user_event_summary AS
SELECT 
    user_id,
    event_type,
    COUNT(*) as event_count,
    MAX(event_timestamp) as last_event_time
FROM my_iceberg_source
GROUP BY user_id, event_type;

-- Create a materialized view that joins Iceberg data with real-time streams
CREATE MATERIALIZED VIEW enriched_events AS
SELECT 
    i.user_id,
    i.event_type,
    i.event_timestamp,
    u.user_name,
    u.user_tier
FROM my_iceberg_source i
JOIN user_profiles u ON i.user_id = u.user_id;

Inspect Iceberg metadata through system tables

Use system tables to inspect Iceberg metadata:
-- View table snapshots
SELECT * FROM my_iceberg_source$snapshots;

-- View table files
SELECT * FROM my_iceberg_source$files;

-- View table history
SELECT * FROM my_iceberg_source$history;

-- View table manifests
SELECT * FROM my_iceberg_source$manifests;