gpt4 book ai didi

java - kafka KStream - 进行 n 秒计数的拓扑

转载 作者:太空宇宙 更新时间:2023-11-04 12:15:05 26 4
gpt4 key购买 nike

我有一个 JSON 对象流,我在其中键入一些值的哈希值。我希望以 n 秒(10?60?)间隔按键进行计数,并使用这些值进行一些模式分析。

我的拓扑:K->aggregateByKey(n 秒)->process()

process - init() 步骤中,我调用了ProcessorContent.schedule(60 * 1000L),希望能够调用.punctuate()。从这里我将循环遍历内部哈希中的值并采取相应的行动。

我看到值通过聚合步骤并点击 process() 函数,但 .punctuate() 永远不会被调用。

<小时/>

代码:

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> opxLines = kStreamBuilder.stream(TOPIC);

KStream<String, String> mapped = opxLines.map(new ReMapper());

KTable<Windowed<String>, String> ktRtDetail = mapped.aggregateByKey(
new AggregateInit(),
new OpxAggregate(),
TimeWindows.of("opx_aggregate", 60000));

ktRtDetail.toStream().process(new ProcessorSupplier<Windowed<String>, String>() {
@Override
public Processor<Windowed<String>, String> get() {
return new AggProcessor();
}
});

KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);

kafkaStreams.start();

AggregateInit() 返回 null。

我想我可以用一个简单的计时器来完成 .punctuate() 的等效操作,但我想知道为什么这段代码没有按照我希望的方式工作。

最佳答案

我认为这与kafka集群设置不当有关。将文件描述符计数更改为比默认值(1024 -> 65535)高得多的值后,这似乎符合规范。

关于java - kafka KStream - 进行 n 秒计数的拓扑,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39500383/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com