Kafka Streams 实时处理

概述

Kafka Streams 是构建在 Kafka 生产者和消费者之上的客户端库,用于开发实时流处理应用程序1

与其他流处理框架(如 Apache Flink、Apache Spark Streaming)不同,Kafka Streams 以库的形式运行,无需独立的集群基础设施。

┌─────────────────────────────────────────────────────────────┐
│                  Kafka Streams 定位                           │
│                                                              │
│  ┌──────────────────────────────────────────────────┐       │
│  │              独立流处理框架                        │       │
│  │         Flink / Spark Streaming / Storm            │       │
│  │         (需独立集群运行)                          │       │
│  └──────────────────────────────────────────────────┘       │
│                                                              │
│  ┌──────────────────────────────────────────────────┐       │
│  │              嵌入式流处理库                        │       │
│  │              Kafka Streams                          │       │
│  │         (运行在你的应用中)                          │       │
│  └──────────────────────────────────────────────────┘       │
└─────────────────────────────────────────────────────────────┘

核心优势

  • 无需独立集群,降低运维复杂度
  • Exactly-Once 语义内置
  • 与 Kafka 深度集成
  • 支持交互式查询状态

核心概念

Stream 与 Table

┌─────────────────────────────────────────────────────────────┐
│                    KStream vs KTable                         │
│                                                              │
│  KStream ( unbounded sequence of events)                    │
│  ┌─────┬─────┬─────┬─────┬─────┬─────┐                   │
│  │ e1  │ e2  │ e3  │ e4  │ e5  │ e6  │  →  time         │
│  └─────┴─────┴─────┴─────┴─────┴─────┘                   │
│  每个事件都是新的 fact,不可变                             │
│                                                              │
│  KTable ( changelog stream / materialized view)            │
│  ┌─────┬─────┬─────┬─────┬─────┬─────┐                   │
│  │ e1' │     │ e3' │     │ e5' │     │  →  time         │
│  └─────┴─────┴─────┴─────┴─────┴─────┘                   │
│  同一 Key 的新值覆盖旧值,表示最新状态                      │
└─────────────────────────────────────────────────────────────┘

KStream

  • 表示无界的事件流
  • 每条记录都是新的 fact
  • 适合:点击事件、传感器读数、交易记录

KTable

  • 表示 changelog 流
  • 同一 Key 的新记录覆盖旧记录
  • 适合:用户配置、账户余额、库存状态

领域特定语言(DSL)

Kafka Streams 提供高级 DSL,简化常见操作:

// Java DSL 示例
StreamsBuilder builder = new StreamsBuilder();
 
// 1. 创建源 Stream
KStream<String, Order> orders = builder.stream("orders");
 
// 2. 过滤
KStream<String, Order> highValueOrders = orders
    .filter((key, order) -> order.getAmount() > 1000);
 
// 3. 聚合
KTable<String, Double> revenueByRegion = orders
    .groupBy((key, order) -> KeyValue.pair(order.getRegion(), order.getAmount()))
    .reduce(Double::sum);
 
// 4. 输出到 Sink
revenueByRegion.toStream().to("revenue-by-region");

架构原理

Topology(拓扑)

应用的处理逻辑被建模为有向无环图(DAG):

┌─────────────────────────────────────────────────────────────┐
│                    Kafka Streams Topology                     │
│                                                              │
│  Source Processor (orders)                                   │
│         │                                                    │
│         ▼                                                    │
│  ┌─────────────┐    ┌─────────────┐                         │
│  │   Filter    │───▶│    Map      │                         │
│  │ (amount>100)│    │ (extract)   │                         │
│  └─────────────┘    └──────┬──────┘                         │
│                            │                                 │
│                            ▼                                 │
│                     ┌─────────────┐                         │
│                     │  GroupBy    │                         │
│                     │  (region)   │                         │
│                     └──────┬──────┘                         │
│                            │                                 │
│                            ▼                                 │
│                     ┌─────────────┐                         │
│                     │  Aggregate  │                         │
│                     │  (sum)      │───▶ State Store         │
│                     └──────┬──────┘                         │
│                            │                                 │
│                            ▼                                 │
│                     Sink Processor (revenue)                │
└─────────────────────────────────────────────────────────────┘

Task 与 Partition

Kafka Streams 的并行度基于 Kafka Topic Partition:

// 输入 Topic: orders (4 partitions)
// 最大并行度 = 4
 
Topology topology = builder.build();
 
// Streams 自动创建 Task
// Task 0 → Partition 0
// Task 1 → Partition 1
// Task 2 → Partition 2
// Task 3 → Partition 3

Task 的特性

  • Task 是 Kafka Streams 的基本并行单位
  • 每个 Task 处理一个 Partition
  • Task 数量在运行时固定(由 Partition 数决定)

线程模型

// 配置多个 Stream Thread
Properties props = new Properties();
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);  // 4 个线程
 
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
┌─────────────────────────────────────────────────────────────┐
│                    Kafka Streams 线程模型                      │
│                                                              │
│  ┌──────────────────────────────────────────────────┐         │
│  │           KafkaStreams 实例                       │         │
│  │                                                   │         │
│  │  ┌─────────┐  ┌─────────┐  ┌─────────┐        │         │
│  │  │ Thread 1│  │ Thread 2│  │ Thread 3│  ...   │         │
│  │  │ ┌─────┐ │  │ ┌─────┐ │  │ ┌─────┐ │        │         │
│  │  │ │Task0│ │  │ │Task1│ │  │ │Task2│ │        │         │
│  │  │ └─────┘ │  │ └─────┘ │  │ └─────┘ │        │         │
│  │  │ ┌─────┐ │  │ ┌─────┐ │  │ ┌─────┐ │        │         │
│  │  │ │Task3│ │  │ │Task4│ │  │ │Task5│ │        │         │
│  │  │ └─────┘ │  │ └─────┘ │  │ └─────┘ │        │         │
│  │  └─────────┘  └─────────┘  └─────────┘        │         │
│  └──────────────────────────────────────────────────┘         │
│                                                              │
│  每个 Task 独立处理,互不干扰                                │
└─────────────────────────────────────────────────────────────┘

状态管理

State Store

状态存储用于在流处理中维护状态:

// 创建状态存储
StoreBuilder<KeyValueStore<String, Double>> storeBuilder =
    Stores.keyValueStore(
        Stores.inMemoryKeyValueStore("revenue-store")
    );
 
// 或持久化存储(RocksDB)
StoreBuilder<KeyValueStore<String, Double>> persistentStore =
    Stores.keyValueStore(
        Stores.persistentKeyValueStore("revenue-store")
    );
 
// 注册状态存储
builder.addStateStore(persistentStore);

RocksDB 存储

Kafka Streams 默认使用 RocksDB 作为持久化状态后端:

// RocksDB 配置
StoreBuilder<KeyValueStore<String, Long>> countStore =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("click-counts"),
        Serdes.String(),
        Serdes.Long()
    )
    .withCachingEnabled(true)          // 开启缓存
    .withLoggingEnabled(true);        // 开启 changelog

RocksDB 特性

  • 嵌入式的 KV 存储
  • 本地持久化,性能优异
  • 支持大状态存储

交互式查询

Kafka Streams 支持从外部应用查询状态:

// 启用交互式查询
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "host:9092");
 
// 在处理器中使用状态存储
@ProcesssiInput
public class RegionalAggregator {
 
    private ProcessorContext context;
    private StateStore revenueStore;
 
    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        this.revenueStore = context.getStateStore("revenue-store");
    }
 
    @Override
    public void process(String region, Double amount) {
        Double current = revenueStore.get(region);
        revenueStore.put(region, current == null ? amount : current + amount);
    }
 
    // REST API 暴露状态
    @GetMapping("/revenue/{region}")
    public Double getRevenue(@PathVariable String region) {
        return (Double) revenueStore.get(region);
    }
}

窗口操作

窗口类型

┌─────────────────────────────────────────────────────────────┐
│                    窗口类型对比                               │
│                                                              │
│  Tumbling Window (固定大小,不重叠)                         │
│  ├───────┤├───────┤├───────┤                               │
│  0-5min │5-10min│10-15min│                                │
│                                                              │
│  Hopping Window (固定大小,可重叠)                          │
│  ├─────┤  ├─────┤  ├─────┤                                  │
│  0-5min   5-10min  10-15min                                 │
│  advance: 5min                                                │
│                                                              │
│  Sliding Window (基于记录对齐)                               │
│  ┌─────────────────────────────────┐                        │
│  │      ┌───┐                      │                        │
│  │      │   │ ← 窗口随数据滑动                             │
│  └──────┴───┴──────────────────────┘                        │
│                                                              │
│  Session Window (基于活动会话)                               │
│  ┌──┐    ┌────┐  ┌──┐                                       │
│  │  │    │    │  │  │  ← 用户会话                          │
│  └──┘    └────┘  └──┘                                       │
└─────────────────────────────────────────────────────────────┘

时间语义

时间类型说明场景
Event Time事件发生时间业务分析
Processing Time处理时间监控告警
Ingestion Time消息入 Kafka 时间审计日志
// 配置时间语义
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, 
    WallclockTimestampExtractor.class.getName());
 
// 或使用事件时间
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
    "my.custom.TimestampExtractor");

窗口聚合示例

// 5分钟滚动窗口计算营收
KStream<String, Order> orders = builder.stream("orders");
 
KTable<Windowed<String>, Double> revenuePerRegion = orders
    .groupBy((key, order) -> order.getRegion())
    .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1)))
    .reduce(
        Double::sum,
        Materialized.<String, Double, WindowStore<bytes, byte[]>>as("revenue-per-region")
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.Double())
    );
 
// 输出窗口结果
revenuePerRegion.toStream()
    .map((windowedRegion, revenue) -> 
        KeyValue.pair(windowedRegion.key(), 
            String.format("%s: $%.2f", windowedRegion.key(), revenue)))
    .to("revenue-output");

容错机制

Exactly-Once 语义

Kafka Streams 通过 Kafka 事务实现 Exactly-Once:

// 启用 Exactly-Once
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
    StreamsConfig.EXACTLY_ONCE_V2);
 
// 等同于配置
// - enable.idempotence = true
// - acks = all
// - transactional.id = ${application.id}-${thread.id}

Changelog Topic

状态变更被持久化到 Changelog Topic:

┌─────────────────────────────────────────────────────────────┐
│                    状态恢复机制                               │
│                                                              │
│  正常处理:                                                   │
│  ┌────────┐      ┌────────────┐      ┌────────────┐        │
│  │ Input  │ ───▶ │  Processor │ ───▶ │ State Store│       │
│  └────────┘      └────────────┘      └─────┬──────┘        │
│                                           │                 │
│                                           ▼                 │
│                                    ┌────────────┐           │
│                                    │ Changelog │           │
│                                    └────────────┘           │
│                                                              │
│  故障恢复:                                                   │
│  ┌────────────┐      ┌────────────┐      ┌────────────┐    │
│  │ Changelog  │ ───▶ │  Rebuilt   │ ───▶ │State Store │    │
│  └────────────┘      └────────────┘      └────────────┘    │
│  Processor 从 Changelog 重放状态                             │
└─────────────────────────────────────────────────────────────┘

故障恢复流程

// 故障检测与自动恢复
// 1. Task 失败 → Kafka Streams 检测
// 2. 重新分配 Partition 给其他实例
// 3. 新实例从 Changelog 重建状态
// 4. 继续处理
 
// 配置恢复参数
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);  // 定期提交
props.put(StreamsConfig.REPALL_TIMEOUT_MS_CONFIG, 60000); // 恢复超时

Processor API

对于 DSL 不支持的场景,使用 Processor API:

// 自定义 Processor
public class RegionalAggregatorProcessor implements Processor<String, Order, String, Double> {
 
    private ProcessorContext context;
    private StateStore stateStore;
 
    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        this.stateStore = context.getStateStore("revenue-store");
        this.context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, 
            this::punctuate);
    }
 
    @Override
    public void process(String key, Order value) {
        String region = value.getRegion();
        Double current = (Double) stateStore.get(region);
        Double sum = current == null ? value.getAmount() : current + value.getAmount();
        stateStore.put(region, sum);
    }
 
    @Override
    public void punctuate(long timestamp) {
        // 定期输出当前状态
        KeyValueIterator<String, Double> it = stateStore.all();
        while (it.hasNext()) {
            KeyValue<String, Double> pair = it.next();
            context.forward(pair.key, pair.value);
        }
    }
 
    @Override
    public void close() {
        stateStore.close();
    }
}

结合 DSL 与 Processor API

// 使用 DSL 过滤,用 Processor 聚合
builder.stream("orders")
    .filter((key, order) -> order.getAmount() > 100)
    .process(() -> new RegionalAggregatorProcessor(), "revenue-store")
    .to("revenue-output");

测试

TopologyTestDriver

// 使用 TopologyTestDriver 测试
TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);
 
TestInputTopic<String, Order> inputTopic = 
    testDriver.createInputTopic("orders", 
        new StringSerde().serializer(), 
        new JsonSerde<>(Order.class).serializer());
 
TestOutputTopic<String, Double> outputTopic = 
    testDriver.createOutputTopic("revenue-per-region",
        new StringSerde().deserializer(),
        new DoubleSerde().deserializer());
 
// 发送测试数据
inputTopic.pipeInput("us", new Order("us", 100.0));
inputTopic.pipeInput("eu", new Order("eu", 50.0));
inputTopic.pipeInput("us", new Order("us", 150.0));
 
// 验证结果
Map<String, Double> results = outputTopic.readKeyValuesToMap();
assertEquals(250.0, results.get("us"));
assertEquals(50.0, results.get("eu"));

生产最佳实践

资源规划

// 合理配置线程数和实例数
// 规则: 实例数 × 线程数 ≈ Topic Partition 数
 
int partitionCount = 12;  // 输入 Topic Partition 数
int threadsPerInstance = 4;
int instancesNeeded = partitionCount / threadsPerInstance;  // 3
 
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, threadsPerInstance);

状态存储优化

// 大状态优化策略
StoreBuilder<KeyValueStore<String, Session>> sessionStore =
    Stores.keyValueStoreBuilder(
        Stores.persistentSessionStore("user-sessions", Duration.ofHours(2)),
        Serdes.String(),
        new SessionSerde<>(JsonSerde.class, JsonSerde.class)
    )
    .withCachingEnabled(true)           // 缓存减少 IO
    .withLoggingEnabled(true)            // changelog 持久化
    .withLoggingConfig(new LoggingConfig(Collections.singletonMap(
        "min.cleanable.dirty.ratio", "0.01"  // 降低清理频率
    )));

异常处理

// 配置错误处理
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
    LogAndSkipExceptionHandler.class.getName());
 
// 或自定义处理器
public class DeadLetterQueueExceptionHandler 
    implements DeserializationExceptionHandler<Integer, String> {
 
    @Override
    public DeserializationHandlerResponse handle(
            final ProcessorContext context,
            final DeserializationException<Integer, String> exception) {
        
        // 发送到 Dead Letter Topic
        context.forward(exception.topic() + "-dlq", 
            new org.apache.kafka.streams.processor.FailuresException(exception));
        return DeserializationHandlerResponse.CONTINUE;
    }
}

相关主题


参考资料

Footnotes

  1. Kafka Streams Architecture - Apache Kafka