Syntax

CREATE SINK [ IF NOT EXISTS ] sink_name
[FROM sink_from | AS select_query]
WITH (
   connector='google_pubsub',
   connector_parameter = 'value', ...
)
FORMAT data_format ENCODE data_encode [ (
    key = 'value'
) ]
[KEY ENCODE key_encode [(...)]]
;

Basic parameter

ParameterDescription
pubsub.project_idRequired. The Pub/Sub Project ID.
pubsub.topicRequired. The Pub/Sub topic to publish messages.
pubsub.endpointRequired. The Pub/Sub endpoint URL.
pubsub.emulator_hostOptional. The Pub/Sub emulator, see Testing locally with Pub/Sub emulator.
pubsub.credentialsOptional. A JSON string containing the service account credentials for authorization,see Create credentials for a service account. The provided account credential must have the pubsub.publisher role.

FORMAT and ENCODE option

These options should be set in FORMAT data_format ENCODE data_encode (key = 'value'), instead of the WITH clause.

FieldNote
data_formatData format. Allowed format: PLAIN.
data_encodeData encode. Supported encode: JSON.
force_append_onlyRequired by default and must be true, which forces the sink to be PLAIN (also known as append-only).
key_encodeOptional. When specified, the key encode can only be TEXT, and the primary key should be one and only one of the following types: varchar, bool, smallint, int, and bigint; When absent, both key and value will use the same setting of ENCODE data_encode ( ... ).

Example

You can test the function locally before you deploying it. See guide on how to Test locally with the Pub/Sub emulator.

Below is the example of how you can configure the docker-compose.yaml file:

services:
  pubsub-emulator:
    image: google/cloud-sdk:latest
    command: >
      gcloud beta emulators pubsub start --project=demo --host-port=0.0.0.0:8900
    ports:
      - "8900:8900"

Below is the example of how to create a sink:

CREATE TABLE IF NOT EXISTS personnel (id integer, name varchar);

CREATE SINK pubusb_sink
FROM
  personnel
WITH
(
    connector = 'google_pubsub',
    pubsub.endpoint = 'localhost:8900',
    pubsub.emulator_host = 'localhost:8900', -- local emulator
    pubsub.project_id = 'demo',
    pubsub.topic = 'test',
) FORMAT PLAIN ENCODE JSON (
    force_append_only='true',
);