

In stream processing, watermarks are essential for event-time operations. A watermark is a marker that tracks the progress of event time, so that events can be processed within their corresponding time windows. It is an estimate of the maximum event time observed so far and acts as a threshold: events that arrive afterwards are expected to have a timestamp later than or equal to the current watermark. An event that arrives with a timestamp earlier than the current watermark is considered late and is not processed within its time window.

Let us walk through an example of how watermarks are generated and used in window computations. Suppose the following events arrive in this order, with these event-time timestamps.

Event     Timestamp
Event F   11:59:30 AM
Event G   12:00:00 PM
Event H   12:00:11 PM
Event I   11:59:50 AM

Consider a scenario where the watermark is set to the maximum event time observed so far minus 10 seconds. The following watermarks are then generated.

Event     Timestamp     Watermark
Event F   11:59:30 AM   11:59:20 AM
Event G   12:00:00 PM   11:59:50 AM
Event H   12:00:11 PM   12:00:01 PM
Event I   11:59:50 AM   12:00:01 PM
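The arithmetic behind these watermarks can be checked with a small simulation. This is only a sketch in Python, not RisingWave code: the helper names `at` and `generate_watermarks` are hypothetical, the calendar date is arbitrary, and Event I is taken to be 11:59:50 AM so that it arrives behind the watermark.

```python
from datetime import datetime, timedelta

DELAY = timedelta(seconds=10)  # watermark = max event time seen so far - 10 s


def at(hour, minute, second):
    """Build a timestamp on an arbitrary date (only the time of day matters)."""
    return datetime(2024, 1, 1, hour, minute, second)


def generate_watermarks(events, delay=DELAY):
    """Yield (name, event_time, watermark) for events given in arrival order."""
    max_seen = None
    for name, event_time in events:
        # The maximum only moves forward; an out-of-order event leaves it unchanged.
        if max_seen is None or event_time > max_seen:
            max_seen = event_time
        yield name, event_time, max_seen - delay


events = [
    ("F", at(11, 59, 30)),
    ("G", at(12, 0, 0)),
    ("H", at(12, 0, 11)),
    ("I", at(11, 59, 50)),  # arrives last, but carries an older timestamp
]

rows = list(generate_watermarks(events))
for name, event_time, watermark in rows:
    print(f"Event {name}  {event_time:%H:%M:%S}  {watermark:%H:%M:%S}")
```

Event I does not move the watermark, because its timestamp is below the maximum already observed.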

Now assume there is a window counting events for the hour ending at 12:00 PM. The window waits until a watermark of at least 12:00:00 PM has been generated before producing results. Events F and G are therefore on time and are included in the calculation. Events H and I are not included in the window ending at 12:00 PM: Event H has a timestamp after the end of the window, and Event I is considered late because its event-time timestamp is earlier than the current watermark.
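The window behavior described above can be sketched in the same way. This is an illustrative Python model under stated assumptions, not how RisingWave is implemented: `run_window` is a hypothetical helper, the window is assumed to start at 11:00 AM, and its 12:00:00 PM end is treated as inclusive so that Event G is counted as in the text (real engines often use half-open windows, in which case a boundary event falls into the next window).

```python
from datetime import datetime, timedelta

DELAY = timedelta(seconds=10)


def at(hour, minute, second):
    """Build a timestamp on an arbitrary date (only the time of day matters)."""
    return datetime(2024, 1, 1, hour, minute, second)


def run_window(events, window_start, window_end, delay=DELAY):
    """Count on-time events for one window; the window fires once the
    watermark reaches window_end. Returns (fired_count, counted, late, other)."""
    watermark = None
    fired_count = None
    counted, late, other = [], [], []
    for name, event_time in events:
        # An event older than the current watermark is late and is dropped.
        if watermark is not None and event_time < watermark:
            late.append(name)
            continue
        # Assumption: inclusive end boundary, to match the worked example.
        if fired_count is None and window_start <= event_time <= window_end:
            counted.append(name)
        else:
            other.append(name)  # belongs to a different window
        candidate = event_time - delay
        if watermark is None or candidate > watermark:
            watermark = candidate
        # Results are produced only once the watermark passes the window end.
        if fired_count is None and watermark >= window_end:
            fired_count = len(counted)
    return fired_count, counted, late, other


events = [
    ("F", at(11, 59, 30)),
    ("G", at(12, 0, 0)),
    ("H", at(12, 0, 11)),
    ("I", at(11, 59, 50)),
]

result = run_window(events, at(11, 0, 0), at(12, 0, 0))
print(result)
```

In this model the window fires when Event H pushes the watermark to 12:00:01 PM, and Event I, which arrives afterwards, is classified as late.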


Watermarks can be generated directly on sources.

The syntax of the WATERMARK clause in RisingWave is as follows:

WATERMARK FOR column_name AS expr

column_name is a column defined when creating the source, usually the event time column.

expr specifies the watermark generation strategy. It must return a value of type timestamp. The watermark is updated only when the returned value is greater than the current watermark.

For example, the watermark generation strategy can be specified as:

  • Maximum observed timestamp

    WATERMARK FOR time_col AS time_col
  • Maximum observed timestamp with a delay

    WATERMARK FOR time_col AS time_col - INTERVAL 'string' time_unit

    Supported time_unit values include: second, minute, hour, day, month, and year. For more details, see the interval data type under Overview of data types.
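The update rule and the two strategies above can be modeled with a few lines of Python. This is a sketch for intuition only: `WatermarkTracker` is a hypothetical class, not a RisingWave API, and the timestamps are made up for illustration.

```python
from datetime import datetime, timedelta


class WatermarkTracker:
    """Applies a strategy expression to each timestamp and keeps the
    watermark only if the result is greater than the current one."""

    def __init__(self, expr):
        self.expr = expr      # plays the role of expr in the WATERMARK clause
        self.current = None   # no watermark until the first row is observed

    def observe(self, timestamp):
        candidate = self.expr(timestamp)
        if self.current is None or candidate > self.current:
            self.current = candidate
        return self.current


# Strategy 1: maximum observed timestamp.
max_observed = WatermarkTracker(lambda ts: ts)
# Strategy 2: maximum observed timestamp with a 5-second delay.
with_delay = WatermarkTracker(lambda ts: ts - timedelta(seconds=5))

# Illustrative timestamps; the third one arrives out of order.
arrivals = [
    datetime(2024, 1, 1, 12, 0, 0),
    datetime(2024, 1, 1, 12, 0, 8),
    datetime(2024, 1, 1, 12, 0, 3),
]

no_delay_marks = [max_observed.observe(ts) for ts in arrivals]
delayed_marks = [with_delay.observe(ts) for ts in arrivals]
```

With the delay, the out-of-order row at 12:00:03 is not older than the watermark at that point (also 12:00:03), whereas without a delay it would already be behind a watermark of 12:00:08. This is the trade-off the delay controls: more tolerance for out-of-order data in exchange for later results.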


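The following statement creates a source whose watermark is the latest timestamp observed in order_time minus 5 seconds.

-- The source name s1 is a placeholder.
CREATE SOURCE s1 (
    product VARCHAR,
    order_time TIMESTAMP,
    -- Watermark trails the largest order_time seen so far by 5 seconds.
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
    connector = 'kafka',
    topic = 'test_topic',
    properties.bootstrap.server = 'message_queue:29092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;  -- encoding is an assumption; match your topic's data format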
