Ingest CDC data from MySQL
Change Data Capture (CDC) refers to the process of identifying and capturing data changes in a database, and then delivering the changes to a downstream service in real time.
RisingWave supports ingesting row-level data (INSERT
, UPDATE
, and DELETE
operations) from the changes of a MySQL database. The supported MySQL versions are 5.7 and 8.0.x.
You can ingest CDC data from MySQL in two ways:
-
Using the native MySQL CDC connector in RisingWave
With this connector, RisingWave can connect to MySQL databases directly to obtain data from the binlog without starting additional services.
-
Using a CDC tool and a message broker
You can use a CDC tool and then use the Kafka, Pulsar, or Kinesis connector to send the CDC data to RisingWave.
This topic describes how to ingest MySQL CDC data into RisingWave using the native MySQL CDC connector. Using an external CDC tool and a message broker is introduced in Create source via event streaming systems.
Set up MySQL
Before using the native MySQL CDC connector in RisingWave, you need to complete several configurations on MySQL.
- Self-hosted
- AWS RDS MySQL and Aurora (MySQL-Compatible)
To use the MySQL CDC features, we need to create a MySQL user account with appropriate privileges on all databases for which RisingWave will read from.
Create a user and grant privileges
- Create a MySQL user with the following query.
CREATE USER 'user'@'%' IDENTIFIED BY 'password';
- Grant the appropriate privileges to the user.
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user'@'%';
- Finalize the privileges.
FLUSH PRIVILEGES;
Enable the binlog
The binlog must be enabled for MySQL replication. The binary logs record transaction updates for replication tools to propagate changes.
- Check if the
log-bin
is already on.
SHOW VARIABLES LIKE 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | OFF |
+---------------+-------+
- If it is
OFF
, configure your MySQL server configuration file, my.cnf, with the following properties described below. Restart your MySQL server to let the configurations take effect.
server-id = 223344
log_bin = mysql-bin
binlog_format = ROW
binlog_row_image = FULL
expire_logs_days = 10
- Confirm your changes by checking the
log-bin
again.
SHOW VARIABLES LIKE 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON |
+---------------+-------+
See Setting up MySQL for more details.
The configuration process is different for AWS RDS MySQL or Aurora (MySQL-Compatible) compared to the self-hosted version. We will use a standard class AWS RDS MySQL instance without Multi-AZ deployment for illustration, but the process will be similar for Aurora.
-
Turn on binary logging and choose a non-zero value for the Retention period.
-
Create a parameter group for MySQL instances. We created a parameter group named MySQL-CDC for the instance that runs MySQL 5.7.x.
-
Click the MySQL-CDC parameter group to edit the values of binlog_format to ROW and binlog_row_image to full.
-
Modify your RDS instance and apply the modified parameter group to your database.
-
Click Continue and choose Apply immediately. Finally, click Modify DB instance to save the changes. Remember to reboot your MySQL instance.
-
Ensure your MySQL users can access the tables and replications.
Notes about running RisingWave from binaries
If you are running RisingWave locally from binaries and intend to use the native CDC source connectors or the JDBC sink connector, make sure that you have JDK 11 or later versions installed in your environment.
Create a table using the native CDC connector in RisingWave
To ensure all data changes are captured, you must create a table and specify primary keys. See the CREATE TABLE
command for more details.
Syntax
Syntax for creating a CDC source.
CREATE SOURCE [ IF NOT EXISTS ] source_name WITH (
connector='mysql-cdc',
<field>=<value>, ...
);
Syntax for creating a CDC table. Note that a primary key is required and must be consistent with the upstream table.
CREATE TABLE [ IF NOT EXISTS ] table_name (
column_name data_type PRIMARY KEY , ...
PRIMARY KEY ( column_name, ... )
)
[ INCLUDE timestamp AS column_name ]
WITH (
snapshot='true'
)
FROM source TABLE table_name;
Connector parameters
All the fields listed below are required. Note that the value of these parameters should be enclosed in single quotation marks.
Field | Notes |
---|---|
hostname | Hostname of the database. |
port | Port number of the database. |
username | Username of the database. |
password | Password of the database. |
database.name | Name of the database. Note that RisingWave cannot read data from a built-in MySQL database, such as mysql , sys , etc. |
table.name | Name of the table that you want to ingest data from. |
server.id | Required if creating a shared source. A numeric ID of the database client. It must be unique across all database processes that are running in the MySQL cluster. If not specified, RisingWave will generate a random ID. |
ssl.mode | Optional. The ssl.mode parameter determines the level of SSL/TLS encryption for secure communication with MySQL. It accepts three values: disabled , preferred , and required . The default value is disabled . When set to required , it enforces TLS for establishing a connection. |
transactional | Optional. Specify whether you want to enable transactions for the CDC table that you are about to create. By default, the value is 'true' for shared sources, and 'false' otherwise. This feature is also supported for shared CDC sources for multi-table transactions. For performance considerations, transactions involving changes to more than 4096 rows cannot be guaranteed. |
The following fields are used when creating a CDC table.
Field | Notes |
---|---|
snapshot | Optional. If false , CDC backfill will be disabled and only upstream events that have occurred after the creation of the table will be consumed. This option can only be applied for tables created from a shared source. |
snapshot.interval | Optional. Specifies the barrier interval for buffering upstream events. The default value is 1 . |
snapshot.batch_size | Optional. Specifies the batch size of a snapshot read query from the upstream table. The default value is 1000 . |
Regarding the INCLUDE timestamp AS column_name
clause, it allows you to ingest the upstream commit timestamp. For historical data, the commit timestamp will be set to 1970-01-01 00:00:00+00:00
. Here is an example:
CREATE TABLE mytable (v1 int PRIMARY KEY, v2 varchar)
INCLUDE timestamp AS commit_ts
FROM pg_source TABLE 'public.mytable';
SELECT * FROM t2 ORDER BY v1;
----RESULT
v1 | v2 | commit_ts
----+----+---------------------------
1 | aa | 1970-01-01 00:00:00+00:00
2 | bb | 1970-01-01 00:00:00+00:00
3 | cc | 2024-05-20 09:01:08+00:00
4 | dd | 2024-05-20 09:01:08+00:00
You can see the INCLUDE clause for more details.
Debezium parameters
Debezium v2.6 connector configuration properties can also be specified under the WITH
clause when creating a table or shared source. Add the prefix debezium.
to the connector property you want to include.
For instance, to skip unknown DDL statements, specify the schema.history.internal.skip.unparseable.ddl
parameter as debezium.schema.history.internal.skip.unparseable.ddl
.
CREATE SOURCE mysql_mydb WITH (
connector = 'mysql-cdc',
hostname = '127.0.0.1',
port = '8306',
username = 'root',
password = '123456',
database.name = 'mydb',
server.id = 5888,
debezium.schema.history.internal.skip.unparseable.ddl = 'true'
);
Data format
Data is in Debezium JSON format. Debezium is a log-based CDC tool that can capture row changes from various database management systems such as PostgreSQL, MySQL, and SQL Server and generate events with consistent structures in real time. The MySQL CDC connector in RisingWave supports JSON as the serialization format for Debezium data. The data format does not need to be specified when creating a table with mysql-cdc
as the source.
Metadata options
Below are the metadata columns available for MySQL CDC.
Field | Notes |
---|---|
database_name | Name of the database. |
schema_name | Name of the schema. |
table_name | Name of the table. |
For instance, the person table below contains columns for typical personal information. It also includes metadata fields (database_name
, schema_name
, table_name
) to provide contextual information about where the data resides within the MySQL database.
CREATE TABLE person (
id int,
name varchar,
email_address varchar,
credit_card varchar,
city varchar,
PRIMARY KEY (id)
) INCLUDE TIMESTAMP AS commit_ts
INCLUDE DATABASE_NAME as database_name
INCLUDE SCHEMA_NAME as schema_name
INCLUDE TABLE_NAME as table_name
FROM mysql_source TABLE 'public.person';
Examples
Connect to the upstream database by creating a CDC source using the CREATE SOURCE
command and MySQL CDC parameters. The data format is fixed as FORMAT PLAIN ENCODE JSON
so it does not need to be specified.
CREATE SOURCE mysql_mydb WITH (
connector = 'mysql-cdc',
hostname = '127.0.0.1',
port = '8306',
username = 'root',
password = '123456',
database.name = 'mydb',
server.id = 5888
);
With the source created, you can create multiple CDC tables that ingest data from different tables in the upstream database without needing to specify the database connection parameters again.
For instance, the following CDC table in RisingWave ingests data from table t1
in the database mydb
. When specifying the MySQL table name in the FROM
clause after the keyword TABLE
, the database name must also be specified.
CREATE TABLE t1_rw (
v1 int,
v2 int,
PRIMARY KEY(v1)
) FROM mysql_mydb TABLE 'mydb.t1';
You can also create another CDC table in RisingWave that ingests data from table t3
in the same database mydb
.
CREATE TABLE t3_rw (
v1 INTEGER,
v2 timestamptz,
PRIMARY KEY (v1)
) FROM mysql_mydb TABLE 'mydb.t3';
To check the progress of backfilling historical data, find the corresponding internal table using the SHOW INTERNAL TABLES
command and query from it. For instance, the following SQL query shows the progress of a CDC table named orders_rw
.
SELECT * FROM __internal_orders_rw_4002_streamcdcscan_5002;
-[ RECORD 1 ]-----+---------------------------------------------------------------
split_id | 5001
o_orderkey | 4024320
backfill_finished | f
row_count | 1006080
cdc_offset | {"MySql": {"filename": "binlog.000005", "position": 60946679}}
Data type mapping
The following table shows the corresponding data type in RisingWave that should be specified when creating a source. For details on native RisingWave data types, see Overview of data types.
RisingWave data types marked with an asterisk indicate that while there is no corresponding RisingWave data type, the ingested data can still be consumed as the listed type.
MySQL type | RisingWave type |
---|---|
BOOLEAN, BOOL | BOOLEAN |
BIT(1) | BOOLEAN* |
BIT(>1) | No support |
TINYINT | SMALLINT |
SMALLINT[(M)] | SMALLINT |
MEDIUMINT[(M)] | INTEGER |
INT, INTEGER[(M)] | INTEGER |
BIGINT[(M)] | BIGINT |
REAL[(M,D)] | REAL |
FLOAT[(P)] | REAL |
FLOAT(M,D) | DOUBLE PRECISION |
DOUBLE[(M,D)] | DOUBLE PRECISION |
CHAR[(M)] | CHARACTER VARYING |
VARCHAR[(M)] | CHARACTER VARYING |
BINARY[(M)] | BYTEA |
VARBINARY[(M)] | BYTEA |
TINYBLOB | BYTEA |
TINYTEXT | CHARACTER VARYING |
BLOB | BYTEA |
TEXT | CHARACTER VARYING |
MEDIUMBLOB | BYTEA |
MEDIUMTEXT | CHARACTER VARYING |
LONGBLOB | BYTEA |
LONGTEXT | BYTEA or CHARACTER VARYING |
JSON | JSONB |
ENUM | CHARACTER VARYING* |
SET | No support |
YEAR[(2|4)] | INTEGER |
TIMESTAMP[(M)] | TIMESTAMPTZ |
DATE | DATE |
TIME[(M)] | TIME |
DATETIME[(fsp)] Optional fractional seconds precision (fsp: 0-6). If omitted, the default precision is 0. | TIMESTAMP |
NUMERIC[(M[,D])] | NUMERIC |
DECIMAL[(M[,D])] | NUMERIC |
GEOMETRY, LINESTRING, POLYGON, MULTIPOINT, MULTILINESTRING, MULTIPOLYGON, GEOMETRYCOLLECTION | STRUCT |
Please be aware that the range of specific values varies among MySQL types and RisingWave types. Refer to the table below for detailed information.
MySQL type | RisingWave type | MySQL range | RisingWave range |
---|---|---|---|
TIME | TIME | -838:59:59.000000 to 838:59:59.000000 | 00:00:00 to 23:59:59 |
DATE | DATE | 1000-01-01 to 9999-12-31 | 0001-01-01 to 9999-12-31 |
DATETIME | TIMESTAMP | 1000-01-01 00:00:00.000000 to 9999-12-31 23:59:59.49999 | 1973-03-03 09:46:40 to 5138-11-16 09:46:40 |
TIMESTAMP | TIMESTAMPTZ | 1970-01-01 00:00:01.000000 to 2038-01-19 03:14:07.499999 | 0001-01-01 00:00:00 to 9999-12-31 23:59:59 |
Use dbt to ingest data from MySQL CDC
Here is an example of how to use dbt to ingest data from MySQL CDC. In this dbt example, source
and table_with_connector
models will be used. For more details about these two models, please refer to Use dbt for data transformations.
First, we create a source
model mysql_mydb.sql
.
{{ config(materialized='source') }}
CREATE SOURCE {{ this }} WITH (
connector = 'mysql-cdc',
hostname = '127.0.0.1',
port = '8306',
username = 'root',
password = '123456',
database.name = 'mydb',
server.id = 5888
);
And then we create a table_with_connector
model t1_rw.sql
.
{{ config(materialized='table_with_connector') }}
CREATE TABLE {{ this }} (
v1 int,
v2 int,
PRIMARY KEY(v1)
) FROM {{ ref('mysql_mydb') }} TABLE 'mydb.t1';
Automatically map upstream table schema
This feature is only available in the premium edition of RisingWave. The premium edition offers additional advanced features and capabilities beyond the free and community editions. If you have any questions about upgrading to the premium edition, please contact our sales team at sales@risingwave-labs.com.
This feature is in the public preview stage, meaning it's nearing the final product but is not yet fully stable. If you encounter any issues or have feedback, please contact us through our Slack channel. Your input is valuable in helping us improve the feature. For more information, see our Public preview feature list.
RisingWave supports automatically mapping the upstream table schema when creating a CDC table from a MySQL CDC source. Instead of defining columns individually, you can use *
when creating a table to ingest all columns from the source table. Note that *
cannot be used if other columns are specified in the table creation process.
Below is an example to create a table that ingests all columns from the upstream table from the MySQL database:
CREATE TABLE supplier (*) FROM mysql_source TABLE 'public.supplier';
And this it the output of DESCRIBE supplier;
Name | Type | Is Hidden | Description
-------------------+-------------------+-----------+-------------
s_suppkey | bigint | false |
s_name | character varying | false |
s_address | character varying | false |
s_nationkey | bigint | false |
s_phone | character varying | false |
s_acctbal | numeric | false |
s_comment | character varying | false |
primary key | s_suppkey | |
distribution key | s_suppkey | |
table description | supplier | |
(10 rows)
Automatically change schema
This feature is only available in the premium edition of RisingWave. The premium edition offers additional advanced features and capabilities beyond the free and community editions. If you have any questions about upgrading to the premium edition, please contact our sales team at sales@risingwave-labs.com.
This feature is in the public preview stage, meaning it's nearing the final product but is not yet fully stable. If you encounter any issues or have feedback, please contact us through our Slack channel. Your input is valuable in helping us improve the feature. For more information, see our Public preview feature list.
RisingWave supports auto schema changes in MySQL CDC. It ensures that your RisingWave pipeline stays synchronized with any schema changes in the source database, reducing the need for manual updates and preventing inconsistencies.
Currently, RisingWave supports the ALTER TABLE
command with the following operations, and we plan to add support for additional DDL operations in future releases.
-
ADD COLUMN [DEFAULT expr]
: Allows you to add a new column to an existing table. Only constant value expressions are supported for the default value. -
DROP COLUMN
: Allows you to remove an existing column from a table.
To enable this feature, set auto.schema.change = 'true'
in your MySQL CDC source configuration:
CREATE SOURCE mysql_source WITH (
connector = 'mysql-cdc',
hostname = 'localhost',
port = '3306',
username = 'root',
password = 'your_password',
database.name = 'mytest',
server.id = '5701',
auto.schema.change = 'true'
);
Create a RisingWave table from the MySQL source:
CREATE TABLE rw_customers (
id BIGINT,
modified TIMESTAMP,
custinfo JSONB,
PRIMARY KEY (id)
) FROM mysql_source TABLE 'mytest.customers';
Add columns to the MySQL table and observe the changes in RisingWave:
-- In MySQL:
USE mytest;
ALTER TABLE customers ADD COLUMN v1 VARCHAR(255);
ALTER TABLE customers ADD COLUMN v2 DOUBLE(5,2);
-- In RisingWave (after a brief pause):
DESCRIBE rw_customers;
And this it the output of DESCRIBE rw_customers;
Name | Type | Is Hidden | Description
----------------------------+--------------------------------+-----------+-------------
id | bigint | false | NULL
modified | timestamp without time zone | false | NULL
custinfo | jsonb | false | NULL
v1 | character varying | false | NULL
v2 | double precision | false | NULL
primary key id | NULL | NULL | NULL
distribution key id | NULL | NULL | NULL
table description rw_customers | NULL | NULL | NULL
Monitor the progress of direct CDC
To observe the progress of direct CDC for MySQL, use the following methods:
For historical data
Historical data needs to be backfilled into the table. You can check the internal state of the backfill executor as follows:
-
Create a table to backfill historical data:
CREATE TABLE t3 (id INTEGER, v1 TIMESTAMP WITH TIME ZONE, PRIMARY KEY(id)) FROM mysql_source TABLE 'mydb.t3';
-
List the internal tables to find the relevant backfill executor state:
SHOW INTERNAL TABLES;
Output:
Name
---------------------------------
__internal_t3_3_streamcdcscan_4
__internal_mysql_source_1_source_2
(2 rows) -
Check the internal state of the backfill executor:
SELECT * FROM __internal_t3_3_streamcdcscan_4;
Output:
split_id | id | backfill_finished | row_count | cdc_offset
----------+----+-------------------+-----------+--------------------------------------------------
3 | 5 | t | 4 | {"MySql": {"filename": "binlog.000067", "position": 39101}}
(1 row)
For real-time data
RisingWave stores source offset in the internal state table of source executor. You can check the current consumed offset by checking this table and comparing it with the upstream database's log offset.
To get the current binlog offset, run this SQL query on upstream MySQL (earlier than 8.4.0):
SHOW MASTER STATUS;
Then compare the above offset with source offset stored in the state table to determine the CDC progress.