Syntax

ALTER SOURCE current_source_name
    alter_option;

The alter_option you use depends on the operation you want to perform on the source. The sections below describe every supported clause.

Clause

ADD COLUMN

ALTER SOURCE source_name
    ADD COLUMN col_name data_type;
Parameter or clause | Description
ADD COLUMN | This clause adds a column to the specified source.
col_name | The name of the new column you want to add to the source.
data_type | The data type of the newly added column. With the struct data type, you can create a nested table. Elements in a nested table need to be enclosed with angle brackets (<>).
-- Add a column named "v3" to a source named "src1"
ALTER SOURCE src1
    ADD COLUMN v3 int;
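
The data_type description above mentions struct columns, so a nested column could be added in the same way. This is a minimal sketch, assuming the source src1 from the previous example; the column name v4 and its fields a and b are hypothetical.

```sql
-- Hypothetical example: add a nested (struct) column named "v4" to "src1".
-- The field names "a" and "b" are placeholders chosen for illustration.
ALTER SOURCE src1
    ADD COLUMN v4 struct<a int, b varchar>;
```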

NOTE

  • To alter columns in a source created with a schema registry, see FORMAT and ENCODE options.
  • You cannot add a primary key column to a source or table in RisingWave. To modify the primary key of a source or table, you need to recreate that source or table.
  • You cannot remove a column from a source in RisingWave. To remove a column, you’ll need to drop the source and create it again.
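
Because a column cannot be removed in place, the last note amounts to a drop-and-recreate sequence. The following is only a rough sketch: the column list, topic name, broker address, and the JSON encoding are assumptions for illustration, so reuse the WITH and FORMAT ... ENCODE options from your original definition instead. Objects that depend on the source will likely need to be dealt with before the drop succeeds.

```sql
-- Drop the existing source (name taken from the earlier example).
DROP SOURCE src1;

-- Recreate it without the unwanted column.
-- Columns, topic, and broker address below are placeholders.
CREATE SOURCE src1 (
    v1 int,
    v2 varchar
) WITH (
    connector = 'kafka',
    topic = 'my_topic',
    properties.bootstrap.server = 'message_queue:29092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
```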

RENAME TO

ALTER SOURCE source_name
    RENAME TO new_source_name;
Parameter or clause | Description
RENAME TO | This clause changes the name of the source.
new_source_name | The new name of the source.
-- Change the name of a source named "src" to "src1"
ALTER SOURCE src
    RENAME TO src1;

OWNER TO

ALTER SOURCE current_source_name
    OWNER TO new_user;
Parameter or clause | Description
OWNER TO | This clause changes the owner of the source.
new_user | The new owner you want to assign to the source.
-- Change the owner of the source named "src" to user "user1"
ALTER SOURCE src OWNER TO user1;

SET SCHEMA

ALTER SOURCE current_source_name
    SET SCHEMA schema_name;
Parameter or clause | Description
SET SCHEMA | This clause moves the source to a different schema.
schema_name | The name of the schema to which the source will be moved.
-- Move the source named "test_source" to the schema named "test_schema"
ALTER SOURCE test_source SET SCHEMA test_schema;
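
The move presumably requires the target schema to exist already. A short sketch of the full sequence, assuming test_schema has not been created yet, with a listing at the end to confirm the result:

```sql
-- Create the target schema first if it does not exist yet.
CREATE SCHEMA IF NOT EXISTS test_schema;

-- Move the source, then list the sources in the target schema to confirm.
ALTER SOURCE test_source SET SCHEMA test_schema;
SHOW SOURCES FROM test_schema;
```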

FORMAT and ENCODE options

Currently, you can use ALTER SOURCE to refresh the schema that a source reads from a schema registry by specifying the FORMAT and ENCODE options again. For more details about these options, see FORMAT and ENCODE parameters.

ALTER SOURCE source_name FORMAT data_format ENCODE data_encode [ (
    message='message',
    schema.location='location', ...) ];

Here is an example. Let’s assume the original FORMAT and ENCODE options are as follows:

-- Create a source.
CREATE SOURCE src_user WITH (
    connector = 'kafka',
    topic = 'sr_pb_test',
    properties.bootstrap.server = 'message_queue:29092',
    scan.startup.mode = 'earliest'
)
FORMAT PLAIN ENCODE PROTOBUF(
    schema.registry = 'http://message_queue:8081',
    message = 'test.User');

Then you can refresh the schema with the following command, which keeps the same format and encoding but points to a different message:

ALTER SOURCE src_user FORMAT PLAIN ENCODE PROTOBUF(
    schema.registry = 'http://message_queue:8081',
    message = 'test.UserWithMoreFields'
);

NOTE

Currently, modifying data_format or data_encode is not supported. In addition, refreshing the schema of a source cannot drop columns or change column types.

You can also refresh the schema with the REFRESH SCHEMA clause.

REFRESH SCHEMA

Use this clause to refresh the schema of a source when its FORMAT and ENCODE options stay the same.

ALTER SOURCE source_name REFRESH SCHEMA;

For example, assume we have a source as follows:

-- Create a source.
CREATE SOURCE src_user WITH (
    connector = 'kafka',
    topic = 'sr_pb_test',
    properties.bootstrap.server = 'message_queue:29092',
    scan.startup.mode = 'earliest'
)
FORMAT PLAIN ENCODE PROTOBUF(
    schema.registry = 'http://message_queue:8081',
    message = 'test.User'
);

Then we can refresh its schema with the following statement:

-- Refresh the schema.
ALTER SOURCE src_user REFRESH SCHEMA;
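
To see what a refresh changed, one option is to compare the column list before and after. A minimal sketch, assuming the src_user source defined above and that new fields have been registered for test.User in the schema registry in the meantime:

```sql
-- Inspect the columns before the refresh.
DESCRIBE src_user;

-- Pull the latest schema from the schema registry.
ALTER SOURCE src_user REFRESH SCHEMA;

-- Inspect again; newly registered fields should now appear as columns.
DESCRIBE src_user;
```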

For more details about this example, see our test file.

SET SOURCE_RATE_LIMIT

ALTER SOURCE source_name
    SET SOURCE_RATE_LIMIT { TO | = } { default | rate_limit_number };

Use this statement to modify the rate limit of a source. For the specific value of SOURCE_RATE_LIMIT, refer to How to view runtime parameters.

Example
-- Alter the rate limit of a source to default
ALTER SOURCE kafka_source SET source_rate_limit TO default;
Example
-- Alter the rate limit of a source to 1000
ALTER SOURCE kafka_source SET source_rate_limit TO 1000;
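
The syntax above also accepts = in place of TO. A brief sketch of that form, reusing the kafka_source name from the examples; the value 500 is an arbitrary placeholder:

```sql
-- Equivalent to "SET source_rate_limit TO 500", written with "=".
ALTER SOURCE kafka_source SET source_rate_limit = 500;
```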