Skip to main content
This guide covers two connector options for sinking data to SQL Server:
  • JDBC connector (connector='jdbc') - Recommended for better performance
  • SQL Server native connector (connector='sqlserver')
You can test out this process on your own device by using the sqlserver_sink.slt demo in the e2e_test directory of the RisingWave repository.
PREMIUM FEATUREThis is a premium feature. For a comprehensive overview of all premium features and their usage, please see RisingWave premium features.

Prerequisites

Before sinking data from RisingWave to SQL Server, please ensure the following:
  • The SQL Server table you want to sink to is accessible from RisingWave.
  • You have an upstream materialized view or table in RisingWave that you can sink data from.
  • If you are running RisingWave locally from binaries and intend to use the JDBC sink connector, make sure you have JDK 11 or later versions installed in your environment.

Create a sink

RisingWave supports two connector types for SQL Server sinks. The JDBC connector is recommended for better performance.

Syntax

CREATE SINK [ IF NOT EXISTS ] sink_name
[FROM sink_from | AS select_query]
WITH (
   connector='jdbc' | 'sqlserver',
   connector_parameter = 'value', ...
);

Parameters (JDBC connector)

Parameter NamesDescription
connectorRequired. Sink connector type. Use jdbc for the JDBC connector.
jdbc.urlRequired. The JDBC URL of the destination database necessary for the driver to recognize and connect to the database.
table.nameRequired. The SQL Server table you want to sink to.
schema.nameOptional. The SQL Server schema name. If not specified, the default schema will be used.
typeRequired. Allowed values: append-only and upsert.
force_append_onlyOptional. If true, forces the sink to be append-only, even if it cannot be.
primary_keyConditional. The primary keys of the sink. Use ’,’ to delimit the primary key columns. Primary keys are required for upsert sinks.
jdbc.query.timeoutOptional. Specifies the timeout for the operations to downstream. If not set, the default is 60s.
jdbc.auto.commitOptional. Controls whether to automatically commit transactions for JDBC sink. If not set, the default is false.

Parameters (SQL Server native connector)

Parameter NamesDescription
connectorRequired. Sink connector type. Use sqlserver for the native SQL Server connector.
typeRequired. Allowed values: append-only and upsert.
force_append_onlyOptional. If true, forces the sink to be append-only, even if it cannot be.
primary_keyConditional. The primary keys of the sink. Use ’,’ to delimit the primary key columns. Primary keys are required for upsert sinks.
sqlserver.hostRequired. The SQL Server host.
sqlserver.portRequired. The SQL Server port.
sqlserver.userRequired. The user for SQL Server access.
sqlserver.passwordRequired. The password for SQL Server access.
sqlserver.databaseRequired. The SQL Server database you want to sink to.
sqlserver.tableRequired. The SQL Server table you want to sink to.

Examples

jdbc as connector
CREATE SINK s_many_data_type from t_many_data_type_rw WITH (
  connector = 'jdbc',
  type = 'upsert',
  jdbc.url='jdbc:sqlserver://sqlserver-server:1433;databaseName=SinkTest;user=SA;password=SomeTestOnly@SA;trustServerCertificate=true',
  primary_key = 'k1,k2',
  schema.name='test_schema',
  table.name = 't_many_data_type',
);
sqlserver as connector
CREATE SINK s_many_data_type from t_many_data_type_rw WITH (
  connector = 'sqlserver',
  type = 'upsert',
  sqlserver.host = 'sqlserver-server',
  sqlserver.port = 1433,
  sqlserver.user = 'SA',
  sqlserver.password = 'SomeTestOnly@SA',
  sqlserver.database = 'SinkTest',
  sqlserver.schema = 'test_schema',
  sqlserver.table = 't_many_data_type',
  primary_key = 'k1,k2',
);

Data type mapping

The following table shows the corresponding data types between RisingWave and SQL Server that should be specified when creating a sink. For details on native RisingWave data types, see Overview of data types.
SQL Server typeRisingWave type
bitboolean
smallintsmallint
intinteger
bigintbigint
float(24)real
float(53)double
decimaldecimal
datedate
nvarcharvarchar
timetime
datetime2time without time zone
biginttimestamp without time zone
No supportinterval
No supportstruct
No supportarray
varbinarybytea
No supportjsonb
No supportserial