This guide explains how to connect RisingWave to existing Iceberg tables for batch data ingestion as part of the Bring Your Own Iceberg approach. Use this when you have Iceberg tables created and managed by external systems (like Spark, Flink, or batch ETL jobs) and want to stream that data into RisingWave for real-time processing.

When to use Iceberg sources

Choose Iceberg sources when:

  • Existing data lake: You have Iceberg tables populated by other systems that you want to ingest into RisingWave.
  • Lambda/Kappa architecture: You want to combine batch-processed data (in Iceberg) with real-time streams.
  • Multi-engine integration: Different systems write to Iceberg tables and RisingWave needs to process that data.
  • Historical data ingestion: You need to ingest large amounts of historical data stored in Iceberg format.

Prerequisites

  • An existing Apache Iceberg table managed by external systems.
  • Access credentials for the underlying storage system (e.g., S3 access key and secret key).
  • Network connectivity between RisingWave and your storage system.
  • Knowledge of your Iceberg catalog type and configuration.

Basic connection example

The following example shows how to connect to an Iceberg table stored on S3 using AWS Glue as the catalog:

CREATE SOURCE my_iceberg_source
WITH (
    connector = 'iceberg',
    warehouse.path = 's3://my-data-lake/warehouse',
    database.name = 'analytics',
    table.name = 'user_events',
    catalog.type = 'glue',
    catalog.name = 'my_glue_catalog',
    s3.access.key = 'your-access-key',
    s3.secret.key = 'your-secret-key',
    s3.region = 'us-west-2'
);

Replace the placeholders with your actual values.

RisingWave automatically derives column names and data types from the Iceberg table metadata. Use the DESCRIBE statement to view the schema:

DESCRIBE my_iceberg_source;
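
A hypothetical result for the user_events table from the example above (the exact output columns depend on your RisingWave version):

      Name       |           Type
-----------------+---------------------------
 user_id         | bigint
 event_type      | character varying
 event_timestamp | timestamp with time zone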

Configuration examples

AWS Glue catalog

For tables managed by AWS Glue Data Catalog:

CREATE SOURCE glue_iceberg_source
WITH (
    connector = 'iceberg',
    warehouse.path = 's3://my-data-lake/warehouse',
    database.name = 'my_database',
    table.name = 'my_table',
    catalog.type = 'glue',
    catalog.name = 'my_catalog',
    s3.access.key = 'your-access-key',
    s3.secret.key = 'your-secret-key',
    s3.region = 'us-west-2'
);

REST catalog

For tables managed by a REST catalog service:

CREATE SOURCE rest_iceberg_source
WITH (
    connector = 'iceberg',
    warehouse.path = 's3://my-data-lake/warehouse',
    database.name = 'my_database',
    table.name = 'my_table',
    catalog.type = 'rest',
    catalog.uri = 'http://rest-catalog:8181',
    catalog.name = 'production',
    s3.access.key = 'your-access-key',
    s3.secret.key = 'your-secret-key',
    s3.region = 'us-west-2'
);

JDBC catalog

For tables managed by a JDBC catalog (PostgreSQL/MySQL):

CREATE SOURCE jdbc_iceberg_source
WITH (
    connector = 'iceberg',
    warehouse.path = 's3://my-data-lake/warehouse',
    database.name = 'my_database',
    table.name = 'my_table',
    catalog.type = 'jdbc',
    catalog.uri = 'jdbc:postgresql://postgres:5432/iceberg_catalog',
    catalog.jdbc.user = 'catalog_user',
    catalog.jdbc.password = 'catalog_password',
    catalog.name = 'production',
    s3.access.key = 'your-access-key',
    s3.secret.key = 'your-secret-key',
    s3.region = 'us-west-2'
);

Storage catalog

For tables using direct filesystem metadata:

CREATE SOURCE storage_iceberg_source
WITH (
    connector = 'iceberg',
    warehouse.path = 's3://my-data-lake/warehouse',
    database.name = 'my_database',
    table.name = 'my_table',
    catalog.type = 'storage',
    s3.access.key = 'your-access-key',
    s3.secret.key = 'your-secret-key',
    s3.region = 'us-west-2'
);

Query data

Once the source is created, you can query its data directly:

-- Basic query
SELECT * FROM my_iceberg_source
WHERE event_date >= '2024-01-01'
LIMIT 100;

-- Aggregations
SELECT event_type, COUNT(*) as event_count
FROM my_iceberg_source
WHERE event_date >= '2024-01-01'
GROUP BY event_type;

Create streaming jobs

Streaming ingestion is supported for append-only Iceberg tables. If you created the source before RisingWave v2.3, you might need to recreate it to enable streaming functionality.

You can create materialized views that continuously process data from the Iceberg source:

-- Create a materialized view for real-time aggregation
CREATE MATERIALIZED VIEW user_event_summary AS
SELECT 
    user_id,
    event_type,
    COUNT(*) as event_count,
    MAX(event_timestamp) as last_event_time
FROM my_iceberg_source
GROUP BY user_id, event_type;

-- Create a materialized view that combines Iceberg data with real-time streams
CREATE MATERIALIZED VIEW enriched_events AS
SELECT 
    i.user_id,
    i.event_type,
    i.event_timestamp,
    u.user_name,
    u.user_tier
FROM my_iceberg_source i
JOIN user_profiles u ON i.user_id = u.user_id;
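
Materialized views can be queried like regular tables and stay up to date as new data is committed to the Iceberg table. For example, using the columns defined in user_event_summary above:

SELECT user_id, event_count, last_event_time
FROM user_event_summary
WHERE event_count > 100
ORDER BY last_event_time DESC;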

Time travel

Query historical snapshots of your Iceberg tables:

-- Query data as it existed at a specific timestamp
SELECT * FROM my_iceberg_source 
FOR SYSTEM_TIME AS OF TIMESTAMPTZ '2024-01-01 12:00:00'
WHERE user_id = 123;

-- Query a specific snapshot by ID
SELECT COUNT(*) FROM my_iceberg_source
FOR SYSTEM_VERSION AS OF 1234567890;

System tables

Access Iceberg metadata through system tables:

-- View table snapshots
SELECT * FROM my_iceberg_source$snapshots;

-- View table files
SELECT * FROM my_iceberg_source$files;

-- View table history
SELECT * FROM my_iceberg_source$history;

-- View table manifests
SELECT * FROM my_iceberg_source$manifests;

Configuration parameters

Required parameters

Parameter     | Description                      | Example
------------- | -------------------------------- | --------------
connector     | Must be 'iceberg'                | 'iceberg'
database.name | Iceberg database/namespace name  | 'analytics'
table.name    | Iceberg table name               | 'user_events'

Optional parameters

Parameter                  | Description                 | Default
-------------------------- | --------------------------- | -------
commit_checkpoint_interval | Commit every N checkpoints  | 60
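
For example, to commit less frequently than the default, set the interval when creating the source. This is a sketch based on the Glue example above; credentials are omitted for brevity:

CREATE SOURCE tuned_iceberg_source
WITH (
    connector = 'iceberg',
    warehouse.path = 's3://my-data-lake/warehouse',
    database.name = 'analytics',
    table.name = 'user_events',
    catalog.type = 'glue',
    catalog.name = 'my_glue_catalog',
    commit_checkpoint_interval = 120
);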

Storage and catalog configuration

For detailed options for each supported object store and catalog service, refer to the dedicated storage and catalog configuration guides.

Integration patterns

Lambda architecture pattern

Combine batch and streaming data processing:

-- Batch layer: Historical data from Iceberg
CREATE SOURCE historical_events
WITH (
    connector = 'iceberg',
    warehouse.path = 's3://data-lake/warehouse',
    database.name = 'batch',
    table.name = 'processed_events',
    catalog.type = 'glue'
);

-- Speed layer: Real-time data from Kafka
CREATE SOURCE realtime_events (
    user_id BIGINT,
    event_type VARCHAR,
    event_timestamp TIMESTAMPTZ
)
WITH (
    connector = 'kafka',
    topic = 'live_events',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

-- Serving layer: Combined view
CREATE MATERIALIZED VIEW unified_events AS
SELECT user_id, event_type, event_timestamp, 'batch' as source
FROM historical_events
UNION ALL
SELECT user_id, event_type, event_timestamp, 'realtime' as source  
FROM realtime_events;
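
Downstream queries can then treat the combined view as a single table, for example to compare batch and real-time volumes:

SELECT source, COUNT(*) AS events
FROM unified_events
GROUP BY source;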

Multi-engine data lake

Connect to tables managed by multiple systems:

-- Read from tables written by Spark
CREATE SOURCE spark_aggregates
WITH (
    connector = 'iceberg',
    warehouse.path = 's3://data-lake/spark',
    database.name = 'analytics',
    table.name = 'daily_summaries',
    catalog.type = 'glue'
);

-- Process and enhance the data
CREATE MATERIALIZED VIEW enhanced_summaries AS
SELECT 
    date,
    metric_type,
    metric_value,
    metric_value * 1.1 as adjusted_value
FROM spark_aggregates;

Best practices

  1. Monitor checkpoint intervals: Adjust commit_checkpoint_interval based on your latency requirements.
  2. Use time travel for debugging: Leverage historical snapshots to troubleshoot data issues.
  3. Combine with real-time sources: Create comprehensive views that merge batch and stream data.
  4. Optimize query patterns: Structure your materialized views to match your query patterns (see the sketch after this list).
  5. Handle schema evolution: Be prepared for schema changes in upstream Iceberg tables.
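
As an illustration of point 4, if dashboards mostly ask for daily counts per event type, pre-aggregate exactly that shape. This sketch assumes the event_timestamp column from the earlier examples and uses RisingWave's TUMBLE time-window function:

CREATE MATERIALIZED VIEW daily_event_counts AS
SELECT
    event_type,
    window_start AS event_day,
    COUNT(*) AS event_count
FROM TUMBLE(my_iceberg_source, event_timestamp, INTERVAL '1 day')
GROUP BY event_type, window_start;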

Limitations

  • Append-only streaming: Only append-only Iceberg tables support streaming ingestion.
  • Schema changes: Major schema changes may require recreating the source.
  • Catalog permissions: Ensure RisingWave has read access to your catalog and storage.

Troubleshooting

Connection issues

  • Verify catalog configuration and connectivity.
  • Check storage permissions and network access.
  • Ensure credentials are correct.

Schema issues

  • Use DESCRIBE to verify the derived schema.
  • Check for unsupported data types.
  • Verify table exists in the specified database.

Performance issues

  • Monitor checkpoint intervals and adjust if needed.
  • Consider partitioning in your source tables.
  • Review query patterns and create appropriate indexes (see the sketch after this list).
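
For example, if lookups by user_id are frequent, an index on a materialized view can serve them efficiently. A sketch using the user_event_summary view defined earlier:

CREATE INDEX idx_user_event_summary_user_id
ON user_event_summary (user_id);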
