This guide is for users who are already familiar with Apache Iceberg and have an existing data lake with tables managed by an external catalog like AWS Glue, a JDBC catalog, or a REST service. It provides concise, practical examples of how to connect RisingWave to your existing tables to perform streaming reads and writes.

Prerequisites

Before you begin, ensure you have the following:
  • An existing Iceberg table.
  • A configured object store (e.g., AWS S3).
  • A configured external Iceberg catalog (e.g., AWS Glue).
  • The necessary access credentials for your storage and catalog.
  • A running RisingWave cluster. For details about installing and starting RisingWave, see RisingWave quickstart.

Read from an existing Iceberg table

To read data from an existing Iceberg table, use the CREATE SOURCE command. This creates a connection to your table so you can ingest its data for stream processing in RisingWave. Run the following statement in psql or any client connected to RisingWave. Example with AWS Glue Catalog:
CREATE SOURCE my_iceberg_source
WITH (
    connector = 'iceberg',
    warehouse.path = 's3://my-data-lake/warehouse',
    database.name = 'analytics',
    table.name = 'user_events',
    catalog.type = 'glue',
    catalog.name = 'my_glue_catalog',
    s3.access.key = 'your-access-key',
    s3.secret.key = 'your-secret-key',
    s3.region = 'us-west-2'
);
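If your tables are registered in a JDBC catalog instead, only the catalog properties change. Below is a minimal sketch assuming a PostgreSQL-backed JDBC catalog; the catalog host, database name, and credentials are placeholders to replace with your own:
CREATE SOURCE my_iceberg_source_jdbc
WITH (
    connector = 'iceberg',
    warehouse.path = 's3://my-data-lake/warehouse',
    database.name = 'analytics',
    table.name = 'user_events',
    catalog.type = 'jdbc',
    catalog.name = 'my_jdbc_catalog',
    catalog.uri = 'jdbc:postgresql://catalog-db:5432/iceberg', -- placeholder host and database
    catalog.jdbc.user = 'your-db-user',
    catalog.jdbc.password = 'your-db-password',
    s3.access.key = 'your-access-key',
    s3.secret.key = 'your-secret-key',
    s3.region = 'us-west-2'
);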
Once the source is created, you can query it like any other table in RisingWave:
SELECT * FROM my_iceberg_source LIMIT 10;
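Ad-hoc analytical queries work as well. For example, a daily event count, assuming the table has an event_time timestamp column (hypothetical here):
SELECT date_trunc('day', event_time) AS event_day, count(*) AS event_count
FROM my_iceberg_source
GROUP BY date_trunc('day', event_time)
ORDER BY event_day;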

Write to an existing Iceberg table

To write data from RisingWave to an existing Iceberg table, use the CREATE SINK command. This is useful for exporting the results of a materialized view or a streaming pipeline into your data lake. Example with a REST catalog:
CREATE SINK my_iceberg_sink 
FROM my_materialized_view
WITH (
    connector = 'iceberg',
    type = 'upsert',
    primary_key = 'user_id',
    warehouse.path = 's3://my-data-lake/warehouse',
    database.name = 'analytics_results',
    table.name = 'processed_events',
    catalog.type = 'rest',
    catalog.uri = 'http://my-catalog-server:8181',
    s3.access.key = 'your-access-key',
    s3.secret.key = 'your-secret-key'
);
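For context, my_materialized_view can be any materialized view whose output includes the primary_key column. A minimal sketch, assuming a hypothetical upstream source named user_actions:
CREATE MATERIALIZED VIEW my_materialized_view AS
SELECT
    user_id,
    count(*) AS action_count,
    max(action_time) AS last_action_at
FROM user_actions
GROUP BY user_id;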
Because type is set to 'upsert', the sink inserts new rows and updates existing ones, matching on the primary_key.
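If the target table should only receive inserts, you can create an append-only sink instead. The sketch below assumes the same REST catalog settings and a hypothetical target table; setting force_append_only = 'true' asks RisingWave to drop updates and deletes from the changelog rather than reject the sink:
CREATE SINK my_iceberg_append_sink
FROM my_materialized_view
WITH (
    connector = 'iceberg',
    type = 'append-only',
    force_append_only = 'true',
    warehouse.path = 's3://my-data-lake/warehouse',
    database.name = 'analytics_results',
    table.name = 'processed_events_log', -- hypothetical append-only target table
    catalog.type = 'rest',
    catalog.uri = 'http://my-catalog-server:8181',
    s3.access.key = 'your-access-key',
    s3.secret.key = 'your-secret-key'
);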

Next steps