RisingWave offers several methods for data ingestion, each tailored to different use cases. This guide covers the main patterns to help you choose the best approach for your needs. For a detailed comparison of core objects like Source, Table, Materialized View, and Sink, see our guide on Source, Table, MV, and Sink.

Continuous streaming ingestion

What it is: Real-time, continuous data ingestion from streaming sources that automatically updates as new data arrives.

When to use: For real-time analytics, event-driven applications, live dashboards, and when you need immediate data freshness.

Example: Kafka streaming ingestion

-- Create a streaming table that continuously ingests from Kafka
CREATE TABLE user_events (
  user_id VARCHAR,
  event_type VARCHAR,
  timestamp TIMESTAMP,
  data JSONB
) WITH (
  connector = 'kafka',
  topic = 'user-events',
  properties.bootstrap.server = 'localhost:9092',
  scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

-- Create a materialized view for real-time analytics
CREATE MATERIALIZED VIEW user_activity_summary AS
SELECT
  user_id,
  COUNT(*) as event_count,
  MAX(timestamp) as last_activity
FROM user_events
GROUP BY user_id;
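
With both objects in place, queries against the materialized view return incrementally maintained results without recomputation. For example, a dashboard might run a query like the following (illustrative, not part of the original setup):
-- Query the continuously maintained summary; results reflect the latest ingested events
SELECT user_id, event_count, last_activity
FROM user_activity_summary
ORDER BY event_count DESC
LIMIT 10;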

Example: Database CDC (Change Data Capture)

-- Continuously ingest changes from PostgreSQL
CREATE TABLE orders_cdc (
  order_id INT,
  customer_id INT,
  amount DECIMAL,
  status VARCHAR,
  created_at TIMESTAMP
) WITH (
  connector = 'postgres-cdc',
  hostname = 'localhost',
  port = '5432',
  username = 'postgres',
  password = 'password',
  database.name = 'ecommerce',
  schema.name = 'public',
  table.name = 'orders'
);
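
Because the CDC table stays in sync with the upstream PostgreSQL table, you can layer streaming analytics directly on top of it. A minimal sketch (the view name and aggregation are illustrative):
-- Track order counts and totals per status, kept up to date as upstream rows change
CREATE MATERIALIZED VIEW order_status_summary AS
SELECT
  status,
  COUNT(*) AS order_count,
  SUM(amount) AS total_amount
FROM orders_cdc
GROUP BY status;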

Example: Message queues (MQTT, NATS, Pulsar)

-- Ingest from MQTT broker
CREATE TABLE iot_data (
  device_id VARCHAR,
  temperature DECIMAL,
  humidity DECIMAL,
  timestamp TIMESTAMP
) WITH (
  connector = 'mqtt',
  url = 'tcp://localhost:1883',
  topic = 'sensors/+/data',
  qos = 'at_least_once'
) FORMAT PLAIN ENCODE JSON;
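
Since the MQTT table is a streaming table, you can also maintain time-windowed aggregations over it. A minimal sketch using RisingWave's TUMBLE window function (the view name and one-minute window are illustrative):
-- Per-device averages over one-minute tumbling windows
CREATE MATERIALIZED VIEW device_temps_1m AS
SELECT
  device_id,
  window_start,
  AVG(temperature) AS avg_temperature,
  AVG(humidity) AS avg_humidity
FROM TUMBLE(iot_data, "timestamp", INTERVAL '1 MINUTE')  -- "timestamp" quoted to avoid keyword ambiguity
GROUP BY device_id, window_start;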

One-time batch ingestion

What it is: Loading data once from external sources like databases, data lakes, or files.

When to use: For initial data loads, historical data import, or when you need to load static datasets.

Example: Batch load from a database

The postgres-cdc connector can be used to perform a one-time snapshot of a PostgreSQL table by setting snapshot.mode to initial_only. The same approach applies to the CDC connectors for other databases, such as MySQL.
-- Create a table that takes a snapshot of a PostgreSQL table.
CREATE TABLE historical_sales (
  sale_id INT,
  product_id INT,
  quantity INT,
  sale_date DATE,
  amount DECIMAL
) WITH (
  connector = 'postgres-cdc',
  hostname = 'localhost',
  port = '5432',
  username = 'postgres',
  password = 'password',
  database.name = 'analytics',
  schema.name = 'public',
  table.name = 'sales_history',
  snapshot.mode = 'initial_only' -- Perform a snapshot and then stop.
);

Example: Load from cloud storage (S3, GCS, Azure)

-- Create a source from S3 files
CREATE SOURCE s3_logs (
  log_time timestamptz,
  user_id VARCHAR,
  action VARCHAR
) WITH (
  connector = 's3',
  s3.region_name = 'us-west-2',
  s3.bucket_name = 'my-logs-bucket',
  s3.path = 'logs/2024/'
) FORMAT PLAIN ENCODE CSV;

-- Create a table and load data once
CREATE TABLE processed_logs AS
SELECT * FROM s3_logs;
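
After the one-time load, a quick sanity check confirms that the rows landed in the new table (illustrative follow-up query):
-- Verify how many rows were loaded from S3
SELECT COUNT(*) AS row_count FROM processed_logs;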

Example: Load from a data lake (Iceberg)

-- Create a source from an existing Iceberg table
CREATE SOURCE iceberg_data WITH (
  connector = 'iceberg',
  warehouse.path = 's3://my-data-lake/warehouse',
  database.name = 'analytics',
  table.name = 'customer_data',
  catalog.type = 'glue'
);

-- Create a table and load data once
CREATE TABLE customers AS
SELECT * FROM iceberg_data;

Periodic ingestion with external orchestration

What it is: Scheduled, recurring ingestion driven by an external orchestration tool such as Cron or Airflow; RisingWave does not include a built-in scheduler.

When to use: For scheduled data updates, daily or hourly batch processing, or when you need precise control over ingestion timing.

Example: Incremental loading pattern

A common pattern is to use a control table to track the last load time and only ingest new data.
-- A control table to track the last load time for each source
CREATE TABLE load_control (
  table_name VARCHAR PRIMARY KEY,
  last_load_timestamp TIMESTAMP
);

-- Your incremental load script, to be run periodically by an external scheduler
INSERT INTO target_table
SELECT * FROM source_table
WHERE updated_at > COALESCE(
  (SELECT last_load_timestamp
   FROM load_control
   WHERE table_name = 'target_table'),
  '1970-01-01'  -- Fall back to the epoch when no watermark exists yet
);

-- Update the control table after each successful load
INSERT INTO load_control (table_name, last_load_timestamp)
VALUES ('target_table', NOW())
ON CONFLICT (table_name)
DO UPDATE SET last_load_timestamp = NOW();
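
If you prefer every target to have an explicit watermark row before the first scheduled run, you can seed the control table once instead of relying on the COALESCE fallback; the epoch timestamp below is just an illustrative starting point:
-- Optional: seed the control table so the target has an initial watermark
INSERT INTO load_control (table_name, last_load_timestamp)
VALUES ('target_table', '1970-01-01 00:00:00');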

Other ingestion methods

Direct data insertion

You can always insert data directly into a standard table using the INSERT statement.
-- Create a standard table
CREATE TABLE transactions (
  id INT,
  amount DECIMAL,
  timestamp TIMESTAMP
);

-- Manually insert data
INSERT INTO transactions VALUES
(1, 100.50, '2024-01-01 10:00:00'),
(2, 250.75, '2024-01-01 10:05:00');

Test data generation

For development and testing, you can use the built-in datagen connector to generate mock data streams.
-- Generate a stream of test data
CREATE TABLE test_users (
  id INT,
  name VARCHAR,
  email VARCHAR,
  created_at TIMESTAMP
) WITH (
  connector = 'datagen',
  fields.id.kind = 'sequence',
  fields.id.start = '1',
  fields.name.kind = 'random',
  fields.name.length = '10',
  fields.email.kind = 'random',
  fields.email.length = '15',
  datagen.rows.per.second = '100'
) FORMAT PLAIN ENCODE JSON;
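
Once the datagen table is running, rows accumulate at the configured rate; a quick query confirms that data is flowing (illustrative check):
-- Inspect a few generated rows
SELECT * FROM test_users LIMIT 5;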

Ingestion method support matrix

| Data Source          | Continuous Streaming | One-Time Batch | Periodic | Notes                                           |
|----------------------|----------------------|----------------|----------|-------------------------------------------------|
| Apache Kafka         | ✅                   | ❌             | ⚠️       | Streaming only; periodic via external tools     |
| Redpanda             | ✅                   | ❌             | ⚠️       | Streaming only; periodic via external tools     |
| Apache Pulsar        | ✅                   | ❌             | ⚠️       | Streaming only; periodic via external tools     |
| AWS Kinesis          | ✅                   | ❌             | ⚠️       | Streaming only; periodic via external tools     |
| Google Pub/Sub       | ✅                   | ❌             | ⚠️       | Streaming only; periodic via external tools     |
| NATS JetStream       | ✅                   | ❌             | ⚠️       | Streaming only; periodic via external tools     |
| MQTT                 | ✅                   | ❌             | ⚠️       | Streaming only; periodic via external tools     |
| PostgreSQL CDC       | ✅                   | ✅             | ⚠️       | CDC for streaming; direct connection for batch  |
| MySQL CDC            | ✅                   | ✅             | ⚠️       | CDC for streaming; direct connection for batch  |
| SQL Server CDC       | ✅                   | ✅             | ⚠️       | CDC for streaming; direct connection for batch  |
| MongoDB CDC          | ✅                   | ✅             | ⚠️       | CDC for streaming; direct connection for batch  |
| AWS S3               | ❌                   | ✅             | ⚠️       | Batch only; periodic via external tools         |
| Google Cloud Storage | ❌                   | ✅             | ⚠️       | Batch only; periodic via external tools         |
| Azure Blob           | ❌                   | ✅             | ⚠️       | Batch only; periodic via external tools         |
| Apache Iceberg       | ❌                   | ✅             | ⚠️       | Batch only; periodic via external tools         |
| Datagen              | ✅                   | ❌             | ❌       | Test data generation only                       |
| Direct INSERT        | ❌                   | ✅             | ⚠️       | Manual insertion; periodic via external tools   |

Legend:
  • ✅ Natively Supported: Built-in support for this ingestion method.
  • ❌ Not Supported: This ingestion method is not available for this source.
  • ⚠️ External Tools Required: Requires external orchestration tools (e.g., Cron, Airflow).

Best practices

Choose the right method

  • Streaming: Use for real-time requirements and continuous data flows.
  • Batch: Use for historical data, large one-time loads, or static datasets.
  • Periodic: Use for scheduled updates with external orchestration tools.

Performance considerations

  • Streaming ingestion offers the best real-time performance.
  • Batch loading is efficient for large datasets.
  • Use materialized views to pre-compute and store results for fast querying.

Data consistency

  • CDC provides high-fidelity replication of database changes.
  • For message queues, understand the delivery guarantees (e.g., at-least-once) of your system.
  • Use transactions for atomic operations when inserting data manually.
  • Monitor data quality and set up alerts.

Monitoring and operations

  • Monitor streaming lag for real-time sources to ensure data freshness.
  • Track batch job success and failure rates.
  • Set up alerts for data quality issues.
  • Use RisingWave’s system tables and dashboards for monitoring.