RisingWave offers several methods for data ingestion, each tailored to different use cases. This guide covers the main patterns to help you choose the best approach for your needs. For a detailed comparison of core objects like Source, Table, Materialized View, and Sink, see our guide on Source, Table, MV, and Sink.

Continuous streaming ingestion

What it is: Real-time, continuous data ingestion from streaming sources that automatically updates as new data arrives.
When to use: For real-time analytics, event-driven applications, live dashboards, and when you need immediate data freshness.

Example: Kafka streaming ingestion

-- Create a streaming table that continuously ingests from Kafka
CREATE TABLE user_events (
  user_id VARCHAR,
  event_type VARCHAR,
  timestamp TIMESTAMP,
  data JSONB
) WITH (
  connector = 'kafka',
  topic = 'user-events',
  properties.bootstrap.server = 'localhost:9092',
  scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

-- Create a materialized view for real-time analytics
CREATE MATERIALIZED VIEW user_activity_summary AS
SELECT
  user_id,
  COUNT(*) as event_count,
  MAX(timestamp) as last_activity
FROM user_events
GROUP BY user_id;
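
The materialized view is maintained incrementally as events arrive, and you can query it like a regular table at any time, so a dashboard can poll it directly:
-- Query the continuously maintained summary, e.g. from a dashboard
SELECT user_id, event_count, last_activity
FROM user_activity_summary
ORDER BY last_activity DESC
LIMIT 10;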

Example: Database CDC (Change Data Capture)

-- Continuously ingest changes from PostgreSQL
CREATE TABLE orders_cdc (
  order_id INT PRIMARY KEY, -- CDC tables require a primary key
  customer_id INT,
  amount DECIMAL,
  status VARCHAR,
  created_at TIMESTAMP
) WITH (
  connector = 'postgres-cdc',
  hostname = 'localhost',
  port = '5432',
  username = 'postgres',
  password = 'password',
  database.name = 'ecommerce',
  schema.name = 'public',
  table.name = 'orders'
);

Example: Message queues (MQTT, NATS, Pulsar)

-- Ingest from MQTT broker
CREATE TABLE iot_data (
  device_id VARCHAR,
  temperature DECIMAL,
  humidity DECIMAL,
  timestamp TIMESTAMP
) WITH (
  connector = 'mqtt',
  url = 'tcp://localhost:1883',
  topic = 'sensors/+/data',
  qos = 'at_least_once'
) FORMAT PLAIN ENCODE JSON;

One-time batch ingestion

What it is: Loading data once from external sources like databases, data lakes, or files.
When to use: For initial data loads, historical data import, or when you need to load static datasets.

Example: Batch load from a database

A CDC connector can perform a one-time load by setting snapshot.mode to initial_only: it takes a snapshot of the existing table and then stops, without subscribing to ongoing changes. The example below uses postgres-cdc; the same option works for other databases, such as MySQL, with the corresponding CDC connector.
-- Create a table that takes a snapshot of a PostgreSQL table.
CREATE TABLE historical_sales (
  sale_id INT PRIMARY KEY, -- CDC tables require a primary key
  product_id INT,
  quantity INT,
  sale_date DATE,
  amount DECIMAL
) WITH (
  connector = 'postgres-cdc',
  hostname = 'localhost',
  port = '5432',
  username = 'postgres',
  password = 'password',
  database.name = 'analytics',
  schema.name = 'public',
  table.name = 'sales_history',
  snapshot.mode = 'initial_only' -- Perform a snapshot and then stop.
);

Example: Load from cloud storage (S3, GCS, Azure)

-- Create a source from S3 files
CREATE SOURCE s3_logs (
  log_time timestamptz,
  user_id VARCHAR,
  action VARCHAR
) WITH (
  connector = 's3',
  s3.region_name = 'us-west-2',
  s3.bucket_name = 'my-logs-bucket',
  match_pattern = 'logs/2024/*'
) FORMAT PLAIN ENCODE CSV;

-- Create a table and load data once
CREATE TABLE processed_logs AS
SELECT * FROM s3_logs;

Example: Load from a data lake (Iceberg)

-- Create a source from an existing Iceberg table
CREATE SOURCE iceberg_data WITH (
  connector = 'iceberg',
  warehouse.path = 's3://my-data-lake/warehouse',
  database.name = 'analytics',
  table.name = 'customer_data',
  catalog.type = 'glue'
);

-- Create a table and load data once
CREATE TABLE customers AS
SELECT * FROM iceberg_data;

Periodic ingestion with external orchestration

What it is: Scheduled, recurring ingestion driven by an external orchestration tool such as Cron or Airflow. RisingWave doesn’t have a built-in scheduler, so the scheduling happens outside the database.
When to use: For scheduled data updates, daily/hourly batch processing, or when you need precise control over ingestion timing.

Example: Incremental loading pattern

A common pattern is to use a control table to track the last load time and only ingest new data.
-- A control table to track the last load time for each source
CREATE TABLE load_control (
  table_name VARCHAR PRIMARY KEY,
  last_load_timestamp TIMESTAMP
);

-- Your incremental load script, to be run periodically by an external scheduler
INSERT INTO target_table
SELECT * FROM source_table
-- COALESCE wraps the subquery so the first run (when no control row exists yet) still loads everything
WHERE updated_at > COALESCE(
  (SELECT last_load_timestamp
   FROM load_control
   WHERE table_name = 'target_table'),
  '1970-01-01'::TIMESTAMP
);

-- Update the control table after each successful load.
-- load_control has a primary key, so inserting a row with the same
-- table_name overwrites the previously stored timestamp.
INSERT INTO load_control (table_name, last_load_timestamp)
VALUES ('target_table', NOW());

Other ingestion methods

Direct data insertion

You can always insert data directly into a standard table using the INSERT statement.
-- Create a standard table
CREATE TABLE transactions (
  id INT,
  amount DECIMAL,
  timestamp TIMESTAMP
);

-- Manually insert data
INSERT INTO transactions VALUES
(1, 100.50, '2024-01-01 10:00:00'),
(2, 250.75, '2024-01-01 10:05:00');

Test data generation

For development and testing, you can use the built-in datagen connector to generate mock data streams.
-- Generate a stream of test data
CREATE TABLE test_users (
  id INT,
  name VARCHAR,
  email VARCHAR,
  created_at TIMESTAMP
) WITH (
  connector = 'datagen',
  fields.id.kind = 'sequence',
  fields.id.start = '1',
  fields.name.kind = 'random',
  fields.name.length = '10',
  fields.email.kind = 'random',
  fields.email.length = '15',
  datagen.rows.per.second = '100'
) FORMAT PLAIN ENCODE JSON;
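
The generator starts producing rows as soon as the table is created, so you can verify the output with a simple query:
-- Inspect a sample of the generated rows
SELECT * FROM test_users LIMIT 5;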

Ingestion method support matrix

| Data Source | Continuous Streaming | One-Time Batch | Periodic | Notes |
| --- | --- | --- | --- | --- |
| Apache Kafka | ✅ | ❌ | ⚠️ | Streaming only; periodic via external tools |
| Redpanda | ✅ | ❌ | ⚠️ | Streaming only; periodic via external tools |
| Apache Pulsar | ✅ | ❌ | ⚠️ | Streaming only; periodic via external tools |
| AWS Kinesis | ✅ | ❌ | ⚠️ | Streaming only; periodic via external tools |
| Google Pub/Sub | ✅ | ❌ | ⚠️ | Streaming only; periodic via external tools |
| NATS JetStream | ✅ | ❌ | ⚠️ | Streaming only; periodic via external tools |
| MQTT | ✅ | ❌ | ⚠️ | Streaming only; periodic via external tools |
| PostgreSQL CDC | ✅ | ✅ | ⚠️ | CDC for streaming; direct connection for batch |
| MySQL CDC | ✅ | ✅ | ⚠️ | CDC for streaming; direct connection for batch |
| SQL Server CDC | ✅ | ✅ | ⚠️ | CDC for streaming; direct connection for batch |
| MongoDB CDC | ✅ | ✅ | ⚠️ | CDC for streaming; direct connection for batch |
| AWS S3 | ❌ | ✅ | ⚠️ | Batch only; periodic via external tools |
| Google Cloud Storage | ❌ | ✅ | ⚠️ | Batch only; periodic via external tools |
| Azure Blob | ❌ | ✅ | ⚠️ | Batch only; periodic via external tools |
| Apache Iceberg | ❌ | ✅ | ⚠️ | Batch only; periodic via external tools |
| Datagen | ✅ | ❌ | ❌ | Test data generation only |
| Direct INSERT | ❌ | ✅ | ⚠️ | Manual insertion; periodic via external tools |

Legend:
  • ✅ Natively Supported: Built-in support for this ingestion method.
  • ❌ Not Supported: This ingestion method is not available for this source.
  • ⚠️ External Tools Required: Requires external orchestration tools (e.g., Cron, Airflow).

Best practices

Choose the right method

  • Streaming: Use for real-time requirements and continuous data flows.
  • Batch: Use for historical data, large one-time loads, or static datasets.
  • Periodic: Use for scheduled updates with external orchestration tools.

Performance considerations

  • Streaming ingestion offers the best real-time performance.
  • Batch loading is efficient for large datasets.
  • Use materialized views to pre-compute and store results for fast querying.

Data consistency

  • CDC provides high-fidelity replication of database changes.
  • For message queues, understand the delivery guarantees (e.g., at-least-once) of your system.
  • Use transactions for atomic operations when inserting data manually.
  • Monitor data quality and set up alerts.

Monitoring and operations

  • Monitor streaming lag for real-time sources to ensure data freshness.
  • Track batch job success and failure rates.
  • Set up alerts for data quality issues.
  • Use RisingWave’s system tables and dashboards for monitoring; an example query is shown below.
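
For example, the rw_catalog schema exposes system tables describing the sources and materialized views in your cluster (exact table and column names may vary between RisingWave versions):
-- List the sources and materialized views currently defined
SELECT * FROM rw_catalog.rw_sources;
SELECT * FROM rw_catalog.rw_materialized_views;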