gpt4 book ai didi

apache-kafka - Kafka比较一个键的连续值

转载 作者:行者123 更新时间:2023-12-04 19:30:28 24 4
gpt4 key购买 nike

我们正在构建一个应用程序来从传感器获取数据。数据被流式传输到 Kafka,消费者将从那里将其发布到不同的数据存储。每个数据点将具有表示传感器状态的多个属性。

在其中一个消费者中,我们希望仅当值发生更改时才将数据发布到数据存储区。例如如果有温度传感器每 10 秒轮询一次数据,我们预计会收到类似的数据

----------------------------------------------------------------------
Key Value
----------------------------------------------------------------------
Sensor1 {timestamp: "10-10-2019 10:20:30", temperature: 10}
Sensor1 {timestamp: "10-10-2019 10:20:40", temperature: 10}
Sensor1 {timestamp: "10-10-2019 10:20:50", temperature: 11}

在上述情况下,只应发布第一条记录和第三条记录。

为此,我们需要一些方法来比较键的当前值和具有相同键的先前值。我相信这应该可以通过 KTable 或 KStream 实现,但无法找到示例。

任何帮助都会很棒!

最佳答案

这里是一个如何使用 KStream#transformValues() 解决此问题的示例.

StreamsBuilder builder = new StreamsBuilder();
StoreBuilder<KeyValueStore<String, YourValueType>> keyValueStoreBuilder =
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName),
Serdes.String(),
YourValueTypeSerde());
builder.addStateStore(keyValueStoreBuilder);
stream = builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), YourValueTypeSerde()))
.transformValues(() -> new ValueTransformerWithKey<String, YourValueType, YourValueType>() {
private KeyValueStore<String, YourValueType> state;

@Override
public void init(final ProcessorContext context) {
state = (KeyValueStore<String, YourValueType>) context.getStateStore(stateStoreName);}

@Override
public YourValueType transform(final String key, final YourValueType value) {
YourValueType prevValue = state.get(key);
if (prevValue != null) {
if (prevValue.temperature() != value.temperature()) {
return prevValue;
}
} else {
state.put(key, value);
}
return null;
}

@Override
public void close() {}
}, stateStorName))
.to(OUTPUT_TOPIC);

您将记录与存储在状态存储中的先前记录进行比较。如果温度不同,则从状态存储返回记录并将当前记录存储在状态存储中。如果温度相等,则丢弃当前记录。

关于apache-kafka - Kafka比较一个键的连续值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58745670/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com