# Monitor Kafka consumer lag

> Monitor the consumption progress and lag of Kafka sources in RisingWave.

This topic explains how to monitor Kafka source consumption progress and lag in RisingWave. Monitoring helps you identify performance bottlenecks and track data freshness.

## Overview

RisingWave provides two system catalogs to monitor Kafka consumer lag:

* **`rw_kafka_source_metrics`** (table): Raw Kafka metrics for each source partition
* **`rw_kafka_job_lag`** (view): Computed lag per job, source, fragment, and partition

## System catalog: `rw_kafka_source_metrics`

The `rw_kafka_source_metrics` table exposes raw Kafka metrics for each source partition.

### Schema

```sql
DESCRIBE rw_catalog.rw_kafka_source_metrics;
```

| Column           | Type    | Description                                                     |
| ---------------- | ------- | --------------------------------------------------------------- |
| `source_id`      | int     | ID of the Kafka source                                          |
| `partition_id`   | varchar | Kafka partition ID                                              |
| `high_watermark` | bigint  | Kafka high watermark of the partition, i.e. the offset the next produced message will receive (nullable) |
| `latest_offset`  | bigint  | Latest offset processed by RisingWave (nullable)                |

### Example usage

View all Kafka source metrics:

```sql
SELECT * FROM rw_catalog.rw_kafka_source_metrics;
```

Check metrics for a specific source:

```sql
SELECT 
    partition_id, 
    high_watermark, 
    latest_offset
FROM rw_catalog.rw_kafka_source_metrics
WHERE source_id = 1001; -- Replace with your source ID
```
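The metrics table identifies sources only by numeric ID. The following is a minimal sketch that looks a source up by name and derives a per-partition lag from the raw offsets. It assumes that `rw_catalog.rw_sources` exposes `id` and `name` columns and that partition IDs are numeric strings. It applies the same formula that `rw_kafka_job_lag` uses. `my_kafka_source` is a placeholder name.

```sql
-- Per-partition lag for one source, looked up by name.
-- Assumes rw_catalog.rw_sources exposes `id` and `name`;
-- 'my_kafka_source' is a placeholder for your source name.
SELECT
    s.name AS source_name,
    m.partition_id,
    m.high_watermark,
    m.latest_offset,
    -- Same formula as rw_kafka_job_lag; NULL while either offset is missing.
    CASE
        WHEN m.high_watermark IS NULL OR m.latest_offset IS NULL THEN NULL
        ELSE greatest(m.high_watermark - m.latest_offset - 1, 0)
    END AS lag
FROM rw_catalog.rw_kafka_source_metrics AS m
JOIN rw_catalog.rw_sources AS s ON s.id = m.source_id
WHERE s.name = 'my_kafka_source'
-- partition_id is varchar; cast so that partition 10 sorts after partition 2.
ORDER BY m.partition_id::int;
```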

## System catalog: `rw_kafka_job_lag`

The `rw_kafka_job_lag` view provides a summary of Kafka consumption lag, designed to help diagnose consumption issues across materialized views and sinks.

### Schema

```sql
DESCRIBE rw_catalog.rw_kafka_job_lag;
```

| Column            | Type    | Description                                                                                                                                                    |
| ----------------- | ------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `job_id`          | int     | ID of the streaming job (materialized view or sink)                                                                                                            |
| `source_id`       | int     | ID of the Kafka source                                                                                                                                         |
| `fragment_id`     | int     | ID of the fragment processing this partition                                                                                                                   |
| `partition_id`    | varchar | Kafka partition ID                                                                                                                                             |
| `lag_phase`       | varchar | Phase of consumption: `BACKFILL` (processing historical data) or `LIVE` (processing real-time data)                                                            |
| `high_watermark`  | bigint  | Kafka high watermark of the partition, i.e. the offset the next produced message will receive (nullable)                                                      |
| `consumer_offset` | bigint  | Current consumer offset. During `BACKFILL`, this is the backfill progress offset; during `LIVE`, this is the latest offset processed by RisingWave (nullable) |
| `lag`             | bigint  | Number of unprocessed messages, calculated as `greatest(high_watermark - consumer_offset - 1, 0)` (nullable)                                                   |
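To see how the formula behaves, the standalone query below evaluates it on a few hypothetical offset pairs. The values are illustrative, not real metrics. A Kafka high watermark is the offset that the next message will receive, so a partition whose last processed offset is `high_watermark - 1` has no lag.

```sql
-- Evaluate the documented lag formula on hypothetical offsets.
SELECT
    high_watermark,
    consumer_offset,
    greatest(high_watermark - consumer_offset - 1, 0) AS lag
FROM (VALUES
    (100, 89),  -- offsets 90..99 are unprocessed: lag = 10
    (100, 99),  -- last available message processed: lag = 0
    (100, 120)  -- negative difference is clamped to 0 by greatest()
) AS t(high_watermark, consumer_offset);
```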

### Lag phase

The `lag_phase` column indicates the current phase of Kafka consumption:

* **`BACKFILL`**: The consumer is processing historical data during initial job creation. In this phase, the `consumer_offset` reflects the backfill progress offset.
* **`LIVE`**: The backfill has completed (or no backfill state exists) and the consumer is processing real-time streaming data. In this phase, the `consumer_offset` reflects the latest offset reported by the source reader.
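A job may report some partitions in `BACKFILL` while others are already `LIVE`. The following is a sketch that summarizes, per job, how many partitions are in each phase. It uses only the columns documented above.

```sql
-- Count partitions in each phase per job. A job with
-- backfilling_partitions = 0 has no partitions left in BACKFILL.
SELECT
    job_id,
    SUM(CASE WHEN lag_phase = 'BACKFILL' THEN 1 ELSE 0 END) AS backfilling_partitions,
    SUM(CASE WHEN lag_phase = 'LIVE' THEN 1 ELSE 0 END) AS live_partitions
FROM rw_catalog.rw_kafka_job_lag
GROUP BY job_id
ORDER BY backfilling_partitions DESC, job_id;
```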

### Example usage

Find partitions whose lag exceeds 1,000 messages:

```sql
SELECT 
    job_id, 
    partition_id, 
    lag_phase, 
    lag 
FROM rw_catalog.rw_kafka_job_lag 
WHERE lag > 1000 
ORDER BY lag DESC;
```

Monitor backfill progress for a specific job:

```sql
SELECT 
    source_id,
    fragment_id,
    partition_id,
    lag_phase,
    consumer_offset,
    high_watermark,
    lag
FROM rw_catalog.rw_kafka_job_lag
WHERE job_id = 1001 -- Replace with your job ID
ORDER BY partition_id;
```
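These queries filter by numeric `job_id`. The following is a sketch that attaches job names to the lag rows so you can filter by name instead. It assumes that `rw_catalog.rw_materialized_views` and `rw_catalog.rw_sinks` both expose `id` and `name` columns. `my_mv` is a placeholder name.

```sql
-- Attach job names to lag rows. Assumes rw_materialized_views and
-- rw_sinks both expose `id` and `name`; 'my_mv' is a placeholder.
WITH jobs AS (
    SELECT id, name, 'materialized view' AS job_type FROM rw_catalog.rw_materialized_views
    UNION ALL
    SELECT id, name, 'sink' AS job_type FROM rw_catalog.rw_sinks
)
SELECT
    j.name,
    j.job_type,
    l.partition_id,
    l.lag_phase,
    l.lag
FROM rw_catalog.rw_kafka_job_lag AS l
JOIN jobs AS j ON j.id = l.job_id
WHERE j.name = 'my_mv'
ORDER BY l.partition_id;
```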

View partition count, average lag, and maximum lag per job and phase:

```sql
SELECT 
    job_id,
    lag_phase,
    COUNT(*) as partition_count,
    AVG(lag) as avg_lag,
    MAX(lag) as max_lag
FROM rw_catalog.rw_kafka_job_lag
WHERE lag IS NOT NULL
GROUP BY job_id, lag_phase
ORDER BY avg_lag DESC;
```

## Best practices

* **Monitor during initial creation**: When creating a new materialized view or sink from a Kafka source, monitor the backfill progress to ensure it completes successfully.
* **Track lag trends**: Regularly query `rw_kafka_job_lag` to identify partitions that consistently show high lag, which may indicate performance issues.
* **Alert on high lag**: Set up alerts that fire when lag exceeds a threshold acceptable for your use case.
* **Check lag phase transitions**: Monitor when jobs transition from `BACKFILL` to `LIVE` phase to understand when real-time processing begins.
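For alerting, one possible approach is to have your monitoring system run a query periodically and fire when it returns rows. The sketch below considers only `LIVE` partitions, because lag is expected to be high while a new job is still backfilling. The threshold of 100,000 messages is an arbitrary placeholder.

```sql
-- Jobs whose total LIVE lag across all partitions exceeds a threshold.
-- 100000 is a placeholder; pick a value that matches your freshness target.
SELECT
    job_id,
    SUM(lag) AS total_lag,          -- SUM ignores NULL lag values
    MAX(lag) AS max_partition_lag
FROM rw_catalog.rw_kafka_job_lag
WHERE lag_phase = 'LIVE'
GROUP BY job_id
HAVING SUM(lag) > 100000
ORDER BY total_lag DESC;
```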

## Troubleshooting high lag

If you observe high consumer lag:

1. **Check resource utilization**: High lag may indicate that compute nodes are under-provisioned or overloaded.
2. **Review parallelism settings**: Consider [adjusting the parallelism](/sql/commands/sql-alter-fragment) of your streaming jobs to increase processing capacity.
3. **Inspect Kafka broker health**: Verify that Kafka brokers are operating normally and not experiencing performance issues.
4. **Examine query complexity**: Complex transformations in materialized views may slow down consumption.
5. **Check for NULL values**: If `high_watermark` or `consumer_offset` is NULL, the metrics may not be available yet. This can happen shortly after a source is created, or if the Prometheus endpoint is not configured.
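Two of these checks can be started from `rw_kafka_job_lag` alone. The sketch below first ranks fragments by their maximum partition lag, which indicates which `fragment_id` is the likeliest candidate when adjusting parallelism. It then lists partitions whose offsets are still NULL.

```sql
-- 1. Fragments ranked by their most-lagging partition.
SELECT
    job_id,
    fragment_id,
    COUNT(*) AS partition_count,
    MAX(lag) AS max_lag
FROM rw_catalog.rw_kafka_job_lag
WHERE lag IS NOT NULL
GROUP BY job_id, fragment_id
ORDER BY max_lag DESC;

-- 2. Partitions for which metrics are not yet available.
SELECT job_id, source_id, partition_id, lag_phase, high_watermark, consumer_offset
FROM rw_catalog.rw_kafka_job_lag
WHERE high_watermark IS NULL OR consumer_offset IS NULL;
```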
