gpt4 book ai didi

java - Kafka Streams App - 计数和总和

转载 作者:行者123 更新时间:2023-11-30 07:42:18 25 4
gpt4 key购买 nike

我正在尝试从 KGroupedStream 创建一个 KTable 来存储每个键的值的总和。

 final StreamsBuilder builder = new StreamsBuilder();
final KTable<String, Long> sum = builder.stream("streams-plaintext-input")
.groupByKey()
.aggregate(new Initializer<Long>() {
@Override
public Long apply() {
return Long.MIN_VALUE;
}
}, new Aggregator<String, Long, Long>() {
@Override
public Long apply(final String key, final Long value,final Long aggregate) {
aggregate += value;
return aggregate;
}
}, Materialized.<String, Long, KeyValueStore<Byte, byte[]>>as("counts-store"));

但是我得到了错误:

The method aggregate(Initializer<VR>, Aggregator<? super Object,? super Object,VR>, Materialized<Object,VR,KeyValueStore<Bytes,byte[]>>) in the type KGroupedStream<Object,Object> is not applicable for the arguments (new Initializer<Long>(){}, new Aggregator<String,Long,Long>(){}, Materialized<String,Long,KeyValueStore<Byte,byte[]>>)

我见过的所有示例都将 Serde 作为第三个参数传递,但我已经尝试过这个并得到一个非常相似的错误(我认为这可能来自旧版本,因为它与当前版本的签名不匹配实现?):

final StreamsBuilder builder = new StreamsBuilder();
final KTable<String, Long> sum = builder.stream("streams-plaintext-input")
.groupByKey()
.aggregate(new Initializer<Long>() {
@Override
public Long apply() {
return Long.MIN_VALUE;
}
}, new Aggregator<String, Long, Long>() {
@Override
public Long apply(final String key, final Long value,final Long aggregate) {
aggregate += value;
return aggregate;
}
}, Serdes.Long());

错误:

The method aggregate(Initializer<VR>, Aggregator<? super Object,? super Object,VR>, Materialized<Object,VR,KeyValueStore<Bytes,byte[]>>) in the type KGroupedStream<Object,Object> is not applicable for the arguments (new Initializer<Long>(){}, new Aggregator<String,Long,Long>(){}, Serde<Long>)

我做错了什么?

使用卡夫卡版本:2.1.0

最佳答案

您的代码中几乎没有问题:

  1. 对于 Materialized.as相反 java.lang.Byte你应该通过 org.apache.kafka.common.utils.Bytes
  2. 你不应该修改 final变量:aggregate += value;
  3. 您必须将键和值的类型添加到 StreamsBuilder::stream调用 ( builder.<String, Long>stream("streams-plaintext-input") )

修改后应该大致如下所示:

KTable<String, Long> sum = builder.<String, Long>stream("streams-plaintext-input")
.groupByKey()
.aggregate(new Initializer<Long>() {
@Override
public Long apply() {
return Long.MIN_VALUE;
}
}, new Aggregator<String, Long, Long>() {
@Override
public Long apply(final String key, final Long value,final Long aggregate) {
return aggregate + value;
}
}, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));

关于java - Kafka Streams App - 计数和总和,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54929497/

25 4 0