gpt4 book ai didi

java - 如何对使用 session 窗口的 kafka 流应用程序进行单元测试

转载 作者:行者123 更新时间:2023-12-01 10:36:24 24 4
gpt4 key购买 nike

我正在使用 Kafka Stream 2.1

我正在尝试为聚合的流应用程序编写一些测试
一些事件通过它们的键(即通过相关 ID)使用具有 300 毫秒不活动间隙的 session 窗口。

这是由方法表示的聚合实现:

    private static final int INACTIVITY_GAP = 300;

public KStream<String, AggregatedCustomObject> aggregate(KStream<String, CustomObject> source) {

return source
// group by key (i.e by correlation ID)
.groupByKey(Grouped.with(Serdes.String(), new CustomSerde()))
// Define a session window with an inactivity gap of 300 ms
.windowedBy(SessionWindows.with(Duration.ofMillis(INACTIVITY_GAP)).grace(Duration.ofMillis(INACTIVITY_GAP)))
.aggregate(
// initializer
() -> new AggregatedCustomObject(),
// aggregates records in same session
(s, customObject, aggCustomObject) -> {
// ...
return aggCustomObject;
},
// merge sessions
(s, aggCustomObject1, aggCustomObject2) -> {
// ...
return aggCustomObject2;
},
Materialized.with(Serdes.String(), new AggCustomObjectSerde())
)
.suppress(Suppressed.untilWindowCloses(unbounded()))
.toStream()
.selectKey((stringWindowed, aggCustomObject) -> "someKey");
;
}

此流处理按预期工作。但对于单元测试,情况就不同了。

我的测试流配置如下所示:

        // ...

props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, myCustomObjectSerde.getClass());
// disable cache
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
// commit ASAP
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 0);


StreamsBuilder builder = new StreamsBuilder();
aggregate(builder.stream(INPUT_TOPIC), OUTPUT_TOPIC, new AggCustomObjectSerde())
.to(OUTPUT_TOPIC);

Topology topology = builder.build();
TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);
ConsumerRecordFactory<String, MyCustomObject> factory = new ConsumerRecordFactory<>(INPUT_TOPIC, new StringSerializer(), myCustomSerializer)

// ...


测试如下所示:

List<ConsumerRecord<byte[], byte[]>> records = myCustomMessages.stream()
.map(myCustomMessage -> factory.create(INPUT_TOPIC, myCustomMessage.correlationId, myCustomMessage))
.collect(Collectors.toList());
testDriver.pipeInput(records);

ProducerRecord<String, AggregatedCustomMessage> record = testDriver.readOutput(OUTPUT_TOPIC, new StringDeserializer(), myAggregatedCustomObjectSerde);

问题是, record始终为空。
我尝试了很多东西:
  • 循环读取,超时
  • 更改配置中的提交间隔,以便尽快提交结果
  • 紧随其后发送带有不同键的附加记录(以触发窗口关闭,如 KafkaStream 事件时间基于记录时间戳)
  • 调用advanceWallClockTime测试驱动方法

  • 好吧,没有什么帮助。有人可以告诉我我缺少什么,我应该如何测试基于 session 窗口的流应用程序?

    非常感谢

    最佳答案

    SessionWindows使用 event-time 而不是 wall-clock 。尝试正确设置记录的事件时间以模拟不活动间隙。就像是:

    testDriver.pipeInput(factory.create(INPUT_TOPIC, key1, record1, eventTimeMs));
    testDriver.pipeInput(factory.create(INPUT_TOPIC, key2, record2, eventTimeMs + inactivityGapMs));

    但首先,您需要自定义 TimestampExtractor像:
     public static class RecordTimestampExtractor implements TimestampExtractor {

    @Override
    public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
    return record.timestamp();
    }
    }

    必须像这样注册:
     streamProperties.setProperty(
    StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
    RecordTimestampExtractor.class.getName()
    );

    关于java - 如何对使用 session 窗口的 kafka 流应用程序进行单元测试,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57480927/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com