> ## Documentation Index
> Fetch the complete documentation index at: https://docs.risingwave.com/llms.txt
> Use this file to discover all available pages before exploring further.

# Ingest data from Iceberg tables

> Ingest data from external Apache Iceberg tables in RisingWave for real-time analytics.

This guide shows how to read data from external Apache Iceberg tables in RisingWave. Use it when your Iceberg tables are managed by external systems and you want to treat them as a streaming source for real-time processing.

You can ingest data from Iceberg tables using two approaches:

1. **[Continuous ingestion (default)](#continuous-ingestion-with-create-source)**: Create an Iceberg source with the `CREATE SOURCE` statement for continuous, streaming ingestion of append-only data.
2. **[Periodic full reload](#periodic-full-reload-with-create-table)**: Create an Iceberg table with `refresh_mode = 'FULL_RELOAD'` for scheduled full table refreshes. Note that you must use `CREATE TABLE` (not `CREATE SOURCE`), and data will only be loaded after you trigger a refresh—either manually or via the configured schedule.

After the source or table is created, you can run ad hoc queries against it or maintain materialized views for continuous analytics.

## Prerequisites

* An existing Apache Iceberg table managed by external systems.
* Access credentials for the underlying storage system (e.g., S3 access key and secret key).
* Network connectivity between RisingWave and your storage system.
* Knowledge of your Iceberg catalog type and configuration.

## Continuous ingestion with CREATE SOURCE

### Limitations

* **PRIMARY KEY is not supported.** Iceberg `CREATE SOURCE` is for append-only, streaming ingestion. RisingWave internally uses a hidden, system-generated `_row_id` column to uniquely identify each record, which is incompatible with user-defined primary keys. If you need to ingest a mutable Iceberg table (with updates and deletes), use [`CREATE TABLE` with `refresh_mode = 'FULL_RELOAD'`](#periodic-full-reload-with-create-table) instead — note that `FULL_RELOAD` is periodic snapshot-style ingestion, not continuous streaming.
* **Column definitions are not allowed for Iceberg sources.** RisingWave automatically infers the schema from the Iceberg table metadata. Specifying column definitions in the `CREATE SOURCE` statement will result in an error. Use `CREATE SOURCE <name> WITH (...)` rather than `CREATE SOURCE <name> (<columns>) WITH (...)`.

### Basic connection example

The following example creates a source for a table in S3 using AWS Glue as the catalog:

```sql theme={null}
CREATE SOURCE my_iceberg_source
WITH (
    connector = 'iceberg',
    warehouse.path = 's3://my-data-lake/warehouse',
    database.name = 'analytics',
    table.name = 'user_events',
    catalog.type = 'glue',
    catalog.name = 'my_glue_catalog',
    s3.access.key = 'your-access-key',
    s3.secret.key = 'your-secret-key',
    s3.region = 'us-west-2'
);
```

When you read from an external Iceberg table, RisingWave automatically derives column names and data types from the Iceberg table metadata. Use the [DESCRIBE](/sql/commands/sql-describe) statement to view the schema:

```sql theme={null}
DESCRIBE my_iceberg_source;
```

### Parameters

| Parameter                    | Description                                                                                                           | Example         |
| :--------------------------- | :-------------------------------------------------------------------------------------------------------------------- | :-------------- |
| `connector`                  | Required. For Iceberg sources, it must be `'iceberg'`                                                                 | `'iceberg'`     |
| `database.name`              | Required. The Iceberg database/namespace name.                                                                        | `'analytics'`   |
| `table.name`                 | Required. The Iceberg table name.                                                                                     | `'user_events'` |
| `commit_checkpoint_interval` | Optional. Determines the Iceberg commit frequency. Default: `60` (about **60 seconds** in the default configuration). | `60`            |

You also need to specify catalog and object storage parameters in the `CREATE SOURCE` statement. Because these parameters are shared across all Iceberg objects—sources, sinks, and internal Iceberg tables—they are documented separately.

* **Object storage**: [Object storage configuration](/iceberg/object-storage)
* **Catalogs**: [Catalog configuration](/iceberg/catalogs)

For details on how data types are mapped between RisingWave and Iceberg, see the [Data type mapping guide](/iceberg/data-types).

### Source example

For a REST catalog:

```sql theme={null}
CREATE SOURCE rest_iceberg_source
WITH (
    connector = 'iceberg',
    warehouse.path = 's3://my-data-lake/warehouse',
    database.name = 'my_database',
    table.name = 'my_table',
    catalog.type = 'rest',
    catalog.uri = 'http://rest-catalog:8181',
    catalog.name = 'production',
    s3.access.key = 'your-access-key',
    s3.secret.key = 'your-secret-key',
    s3.region = 'us-west-2'
);
```

## Periodic full reload with CREATE TABLE

<Note>
  Added in v2.7.0. It is currently in **[technical preview](/changelog/product-lifecycle#product-release-lifecycle)** stage.
</Note>

For batch-style workloads where you need to periodically reload an entire Iceberg table, you can create a table with `refresh_mode = 'FULL_RELOAD'`. This mode is useful when:

* The external Iceberg table supports mutable data (updates and deletes).
* You need a point-in-time snapshot of the entire table.
* You want to apply Iceberg deletes (PositionDeletes and EqualityDeletes) for accurate query results.
* Periodic full reloads fit your use case better than continuous streaming.

### Create a refreshable table

```sql theme={null}
CREATE TABLE iceberg_batch_table (
    PRIMARY KEY (id)
) WITH (
    connector = 'iceberg',
    catalog.type = 'storage',
    warehouse.path = 's3://my-data-lake/warehouse',
    database.name = 'public',
    table.name = 'my_iceberg_table',
    s3.access.key = 'your-access-key',
    s3.secret.key = 'your-secret-key',
    s3.region = 'us-west-2',
    refresh_mode = 'FULL_RELOAD',        -- Required for periodic refresh
    refresh_interval_sec = '60'          -- Reload every 60 seconds
);
```

<Note>
  Do not define column types in the `CREATE TABLE` statement. RisingWave automatically infers the schema from the Iceberg table. Only specify a `PRIMARY KEY` constraint if the corresponding column exists in the Iceberg table schema.
</Note>

### Parameters

| Parameter              | Description                                                             | Required | Example         |
| :--------------------- | :---------------------------------------------------------------------- | :------- | :-------------- |
| `refresh_mode`         | Must be set to `'FULL_RELOAD'` to enable periodic refresh functionality | Yes      | `'FULL_RELOAD'` |
| `refresh_interval_sec` | Interval in seconds between automatic refresh operations                | No       | `'60'`          |

RisingWave checks all refreshable tables at a configurable interval (default: 60 seconds, configured by `stream_refresh_scheduler_interval_sec` in the RisingWave configuration file). Setting a `refresh_interval_sec` value lower than this scheduler interval may result in refresh triggers not occurring at the expected frequency.

<Note>
  If you omit `refresh_interval_sec`, the table will only refresh when you manually execute `REFRESH TABLE`, giving you complete control over when data is loaded.
</Note>

### Manual refresh

You can manually trigger a refresh at any time using the `REFRESH TABLE` command:

```sql theme={null}
REFRESH TABLE iceberg_batch_table;
```

### Monitor delete files

You can verify discovered delete files via the `rw_iceberg_files` system catalog:

```sql theme={null}
SELECT * FROM rw_iceberg_files 
WHERE source_name = '__iceberg_source_iceberg_batch_table';
```

This query shows all data and delete files associated with the table. The `content` column indicates the file type:

* `Data`: Regular data files
* `PositionDeletes`: Position-based delete files
* `EqualityDeletes`: Equality-based delete files

### Monitor refresh status

Query the `rw_catalog.rw_refresh_table_state` system catalog to monitor refresh operations:

```sql theme={null}
SELECT table_id, current_status, last_trigger_time, last_success_time, trigger_interval_secs
FROM rw_catalog.rw_refresh_table_state;
```

The `current_status` field shows the current state of the refresh job:

* `IDLE`: No refresh operation is currently in progress
* `REFRESHING`: A refresh operation is in progress

For more details, see [REFRESH TABLE](/sql/commands/sql-refresh-table).

## What's next?

### Query data

Once created, you can query data from the Iceberg source:

```sql theme={null}
-- Basic query
SELECT * FROM my_iceberg_source
WHERE event_date >= '2024-01-01'
LIMIT 100;

-- Aggregations
SELECT event_type, COUNT(*) as event_count
FROM my_iceberg_source
WHERE event_date >= '2024-01-01'
GROUP BY event_type;
```

### Query historical snapshots of your Iceberg data

Query historical snapshots of your Iceberg tables:

```sql theme={null}
-- Query data as it existed at a specific timestamp
SELECT * FROM my_iceberg_source 
FOR SYSTEM_TIME AS OF TIMESTAMPTZ '2024-01-01 12:00:00'
WHERE user_id = 123;

-- Query a specific snapshot by ID
SELECT COUNT(*) FROM my_iceberg_source
FOR SYSTEM_VERSION AS OF 1234567890;
```

### Continuous data processing jobs

You can create materialized views that continuously process data from the Iceberg source:

```sql theme={null}
-- Create a materialized view for real-time aggregation
CREATE MATERIALIZED VIEW user_event_summary AS
SELECT 
    user_id,
    event_type,
    COUNT(*) as event_count,
    MAX(event_timestamp) as last_event_time
FROM my_iceberg_source
GROUP BY user_id, event_type;

-- Create a table that combines Iceberg data with real-time streams
CREATE MATERIALIZED VIEW enriched_events AS
SELECT 
    i.user_id,
    i.event_type,
    i.event_timestamp,
    u.user_name,
    u.user_tier
FROM my_iceberg_source i
JOIN user_profiles u ON i.user_id = u.user_id;
```

### Inspect Iceberg metadata through system tables

Use system tables to inspect Iceberg metadata:

```sql theme={null}
-- View table snapshots
SELECT * FROM my_iceberg_source$snapshots;

-- View table files
SELECT * FROM my_iceberg_source$files;

-- View table history
SELECT * FROM my_iceberg_source$history;

-- View table manifests
SELECT * FROM my_iceberg_source$manifests;
```
