Change Data Capture (CDC) refers to the process of identifying and capturing data changes in a database, and then delivering the changes to a downstream service in real time.
RisingWave supports ingesting row-level changes (INSERT, UPDATE, and DELETE operations) from a MySQL database. The supported MySQL versions are 5.7 and 8.0.x.
You can ingest CDC data from MySQL in two ways:
Field | Notes |
---|---|
hostname | Hostname of the database. |
port | Port number of the database. |
username | Username of the database. |
password | Password of the database. |
database.name | Name of the database. Note that RisingWave cannot read data from a built-in MySQL database, such as mysql or sys. |
table.name | Name of the table that you want to ingest data from. |
server.id | Required if creating a shared source. A numeric ID of the database client. It must be unique across all database processes that are running in the MySQL cluster. If not specified, RisingWave will generate a random ID. |
auto.schema.change | Optional. Specifies whether to replicate MySQL table schema changes. Set auto.schema.change = 'true' to enable it. |
ssl.mode | Optional. The ssl.mode parameter determines the level of SSL/TLS encryption for secure communication with MySQL. Accepted values are disabled, preferred, and required. The default value is disabled. When set to required, it enforces TLS for establishing a connection. |
transactional | Optional. Specifies whether to enable transactions for the CDC table that you are about to create. By default, the value is 'true' for shared sources, and 'false' otherwise. This feature is also supported for shared CDC sources for multi-table transactions. For performance reasons, transactional guarantees do not apply to transactions that change more than 4096 rows. |
Field | Notes |
---|---|
snapshot | Optional. If false, CDC backfill will be disabled and only upstream events that have occurred after the creation of the table will be consumed. This option can only be applied for tables created from a shared source. |
snapshot.interval | Optional. Specifies the barrier interval for buffering upstream events. The default value is 1. |
snapshot.batch_size | Optional. Specifies the batch size of a snapshot read query from the upstream table. The default value is 1000. |
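
As a rough illustration of how these options might be passed, the sketch below creates a CDC table that skips backfill. The shared source name mysql_mydb, the table name t1_rw, and the columns are placeholders, and the placement of the WITH clause before FROM is an assumption that should be checked against the CREATE TABLE syntax of your RisingWave version.

```sql
-- Hypothetical CDC table: backfill is disabled, so only events that occur
-- after the table is created are consumed.
CREATE TABLE t1_rw (
    v1 int,
    v2 int,
    PRIMARY KEY (v1)
)
WITH (
    snapshot = 'false'            -- only valid for tables created from a shared source
)
FROM mysql_mydb TABLE 'mydb.t1';  -- mysql_mydb is an assumed, already-created shared source
```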
When you create a table with the INCLUDE timestamp AS column_name clause, you can ingest the upstream commit timestamp. For historical data, the commit timestamp is set to 1970-01-01 00:00:00+00:00. Here is an example:
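
A minimal sketch of such a table follows. The source name mysql_mydb, the upstream table mydb.person, its columns, and the column name commit_ts are all illustrative assumptions.

```sql
-- Hypothetical CDC table that exposes the upstream commit timestamp as commit_ts.
CREATE TABLE person (
    id int,
    name varchar,
    PRIMARY KEY (id)
)
INCLUDE timestamp AS commit_ts
FROM mysql_mydb TABLE 'mydb.person';

-- Rows loaded during backfill are expected to show 1970-01-01 00:00:00+00:00 here.
SELECT id, name, commit_ts FROM person;
```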
You can also specify Debezium connector configuration properties in the WITH clause when creating a table or shared source. Add the prefix debezium. to the connector property you want to include.
For instance, to skip unknown DDL statements, specify the schema.history.internal.skip.unparseable.ddl parameter as debezium.schema.history.internal.skip.unparseable.ddl.
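
The prefixing rule can be sketched as follows. The connection values are placeholders, and passing 'true' as the property value is an assumption based on the Debezium property being a boolean.

```sql
-- Hypothetical shared source that forwards one Debezium property.
CREATE SOURCE mysql_mydb WITH (
    connector = 'mysql-cdc',
    hostname = '127.0.0.1',       -- placeholder connection values
    port = '3306',
    username = 'root',
    password = '<password>',
    database.name = 'mydb',
    server.id = 5888,
    -- Debezium property name with the debezium. prefix added
    debezium.schema.history.internal.skip.unparseable.ddl = 'true'
);
```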
The following metadata columns are available when you use mysql-cdc as the source.
Field | Notes |
---|---|
database_name | Name of the database. |
table_name | Name of the table. |
You can include these metadata columns (database_name, table_name) to provide contextual information about where the data resides within the MySQL database.
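
One way this might look is sketched below. It assumes the metadata fields are requested with the same INCLUDE ... AS form used for the commit timestamp. The source name mysql_mydb, the upstream table, and the columns are placeholders.

```sql
-- Hypothetical CDC table that records where each row came from.
CREATE TABLE person (
    id int,
    name varchar,
    PRIMARY KEY (id)
)
INCLUDE database_name AS database_name   -- assumed INCLUDE form for the metadata field
INCLUDE table_name AS table_name
FROM mysql_mydb TABLE 'mydb.person';
```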
The data format is fixed as FORMAT PLAIN ENCODE JSON, so it does not need to be specified.
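
A shared source built from the connection parameters above might be declared as in the sketch below. All values are placeholders, and no FORMAT ... ENCODE ... clause is written.

```sql
-- Hypothetical shared source; connection values are placeholders.
CREATE SOURCE mysql_mydb WITH (
    connector = 'mysql-cdc',
    hostname = '127.0.0.1',
    port = '3306',
    username = 'root',
    password = '<password>',
    database.name = 'mydb',
    server.id = 5888              -- must be unique within the MySQL cluster
);
```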
For instance, a CDC table in RisingWave can ingest data from table t1 in the database mydb. When specifying the MySQL table name in the FROM clause after the keyword TABLE, the database name must also be specified.
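
A sketch of such a table, assuming a shared source named mysql_mydb already exists and that t1 has two integer columns (the column names are illustrative):

```sql
-- Hypothetical CDC table reading from mydb.t1; note the database-qualified name.
CREATE TABLE t1_rw (
    v1 int,
    v2 int,
    PRIMARY KEY (v1)
) FROM mysql_mydb TABLE 'mydb.t1';
```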
Similarly, another CDC table can ingest data from table t3 in the same database mydb.
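
Under the same assumptions (an existing shared source mysql_mydb, illustrative column names), a second table can reuse the source without repeating the connection parameters:

```sql
-- Hypothetical second CDC table on the same shared source.
CREATE TABLE t3_rw (
    v1 int,
    v2 varchar,
    PRIMARY KEY (v1)
) FROM mysql_mydb TABLE 'mydb.t3';
```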
orders_rw.
MySQL type | RisingWave type |
---|---|
BOOLEAN, BOOL | BOOLEAN |
BIT(1) | BOOLEAN* |
BIT(>1) | Not supported |
TINYINT | SMALLINT |
SMALLINT[(M)] | SMALLINT |
MEDIUMINT[(M)] | INTEGER |
INT, INTEGER[(M)] | INTEGER |
BIGINT[(M)] | BIGINT |
REAL[(M,D)] | REAL |
FLOAT[(P)] | REAL |
FLOAT(M,D) | DOUBLE PRECISION |
DOUBLE[(M,D)] | DOUBLE PRECISION |
CHAR[(M)] | CHARACTER VARYING |
VARCHAR[(M)] | CHARACTER VARYING |
BINARY[(M)] | BYTEA |
VARBINARY[(M)] | BYTEA |
TINYBLOB | BYTEA |
TINYTEXT | CHARACTER VARYING |
BLOB | BYTEA |
TEXT | CHARACTER VARYING |
MEDIUMBLOB | BYTEA |
MEDIUMTEXT | CHARACTER VARYING |
LONGBLOB | BYTEA |
LONGTEXT | BYTEA or CHARACTER VARYING |
JSON | JSONB |
ENUM | CHARACTER VARYING* |
SET | Not supported |
YEAR[(2\|4)] | INTEGER |
TIMESTAMP[(M)] | TIMESTAMPTZ |
DATE | DATE |
TIME[(M)] | TIME |
DATETIME[(fsp)] (optional fractional seconds precision, fsp: 0-6; defaults to 0 if omitted) | TIMESTAMP |
NUMERIC[(M[,D])] | NUMERIC |
DECIMAL[(M[,D])] | NUMERIC |
GEOMETRY, LINESTRING, POLYGON, MULTIPOINT, MULTILINESTRING, MULTIPOLYGON, GEOMETRYCOLLECTION | Not supported |
MySQL type | RisingWave type | MySQL range | RisingWave range |
---|---|---|---|
TIME | TIME | -838:59:59.000000 to 838:59:59.000000 | 00:00:00 to 23:59:59 |
DATE | DATE | 1000-01-01 to 9999-12-31 | 0001-01-01 to 9999-12-31 |
DATETIME | TIMESTAMP | 1000-01-01 00:00:00.000000 to 9999-12-31 23:59:59.499999 | 1973-03-03 09:46:40 to 5138-11-16 09:46:40 |
TIMESTAMP | TIMESTAMPTZ | 1970-01-01 00:00:01.000000 to 2038-01-19 03:14:07.499999 | 0001-01-01 00:00:00 to 9999-12-31 23:59:59 |
In this dbt example, the source and table_with_connector models are used. For more details about these two models, see Use dbt for data transformations.
First, we create a source model mysql_mydb.sql.
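
A possible shape for mysql_mydb.sql is sketched below. It assumes the dbt adapter exposes a materialization named source (matching the model type named above) and that the model body is the full CREATE SOURCE statement. Connection values are placeholders.

```sql
-- mysql_mydb.sql: hypothetical dbt model materialized as a RisingWave source.
{{ config(materialized='source') }}
CREATE SOURCE {{ this }} WITH (
    connector = 'mysql-cdc',
    hostname = '127.0.0.1',
    port = '3306',
    username = 'root',
    password = '<password>',
    database.name = 'mydb',
    server.id = 5888
)
```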
Next, we create a table_with_connector model t1_rw.sql.
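
A sketch of t1_rw.sql under the same assumptions: the materialization name mirrors the model type named above, ref('mysql_mydb') points at the source model, and the columns are illustrative.

```sql
-- t1_rw.sql: hypothetical dbt model materialized as a CDC table.
{{ config(materialized='table_with_connector') }}
CREATE TABLE {{ this }} (
    v1 int,
    v2 int,
    PRIMARY KEY (v1)
) FROM {{ ref('mysql_mydb') }} TABLE 'mydb.t1'
```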
Instead of defining columns individually, you can use * when creating a table to ingest all columns from the source table. Note that * cannot be used if other columns are specified in the table creation process.
Below is an example that creates a table ingesting all columns from an upstream table in the MySQL database:
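
A minimal sketch, assuming a shared source named mysql_mydb and an upstream table mydb.supplier (both names are placeholders):

```sql
-- Hypothetical CDC table whose columns are all taken from the upstream table.
CREATE TABLE supplier (*) FROM mysql_mydb TABLE 'mydb.supplier';
```

The resulting column list can then be inspected with a DESCRIBE statement.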
DESCRIBE supplier;
RisingWave currently supports replicating the upstream ALTER TABLE command with the following operations, and we plan to add support for additional DDL operations in future releases.

- ADD COLUMN [DEFAULT expr]: Allows you to add a new column to an existing table. Only constant value expressions are supported for the default value.
- DROP COLUMN: Allows you to remove an existing column from a table.

To enable this feature, set auto.schema.change = 'true' in your MySQL CDC source configuration:
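
The configuration could look like the sketch below. The source name mysql_auto_source, the connection values, and the columns of rw_customers are illustrative assumptions.

```sql
-- Hypothetical shared source with schema change replication enabled.
CREATE SOURCE mysql_auto_source WITH (
    connector = 'mysql-cdc',
    hostname = '127.0.0.1',
    port = '3306',
    username = 'root',
    password = '<password>',
    database.name = 'mydb',
    server.id = 5889,
    auto.schema.change = 'true'
);

-- Hypothetical CDC table on that source. A supported upstream ALTER TABLE
-- (ADD COLUMN or DROP COLUMN) on mydb.customers should then be reflected here.
CREATE TABLE rw_customers (
    id bigint,
    name varchar,
    PRIMARY KEY (id)
) FROM mysql_auto_source TABLE 'mydb.customers';
```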
DESCRIBE rw_customers;
Here, next_id is not a column from the source MySQL table. Instead, it is a generated column that RisingWave computes dynamically while ingesting data. The value of next_id for each row is always equal to id + 1:
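
A sketch of such a definition, assuming a shared source named mysql_mydb and an upstream table mydb.person with columns id and name (all placeholders):

```sql
-- Hypothetical CDC table with a generated column computed from id.
CREATE TABLE person (
    id int,
    name varchar,
    next_id int AS id + 1,   -- not present upstream; computed during ingestion
    PRIMARY KEY (id)
) FROM mysql_mydb TABLE 'mydb.person';
```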
Here, name, an upstream column, is placed after the generated column next_id:
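
Under the same assumptions as before (placeholder source, table, and column names), the reordered definition might read:

```sql
-- Hypothetical CDC table where the generated column precedes an upstream column.
CREATE TABLE person (
    id int,
    next_id int AS id + 1,   -- generated column
    name varchar,            -- upstream column declared after it
    PRIMARY KEY (id)
) FROM mysql_mydb TABLE 'mydb.person';
```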