gpt4 book ai didi

java - 使用 KStream 从阈值中过滤掉值

转载 作者:行者123 更新时间:2023-11-30 07:46:21 27 4
gpt4 key购买 nike

我想在 Kafka 中使用 Java KStream 来过滤掉所有超过特定值的值。值以 JSON 格式交换,例如:

ConsumerRecord(topic=u'test', partition=0, offset=1109, timestamp=1528110096230L, timestamp_type=0, key=None, value='{"device":"Internal","sensor":"Phone Microphone","value":"72.1"}', checksum=None, serialized_key_size=-1, serialized_value_size=64)

我想过滤掉低于20.0的数值(上面的例子中,数值是72.1就可以了)

public class WordCountExample {

@SuppressWarnings("deprecation")
public static void main(String[] args) throws Exception{

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "Filter");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "andrewnetwork.ddns.net:9095");
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");


StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("test");

source = source
.filterNot((k,v) -> {
if(isParsableAsDouble(v) && Double.parseDouble(v) <= 50.0)
return true;
else return false;
});

source.to("mem");

过滤没有发生,我不知道为什么。有什么想法吗?

最佳答案

通过将 isParsableAsDouble(v) 放在 filterNot 中,您将过滤掉所有内容,因为 JSON 不可解析为 double 。我相信您误解了 Kafka 值和 JSON 中的 value 字段,后者不会自动提取。

您需要一个 JSON 反序列化器。 For example

    final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);


KStreamBuilder builder = new KStreamBuilder();
Properties props = new Properties();
// load props

KStream<Bytes, JsonNode> source = builder.stream(Serdes.BytesSerde(), jsonSerde, "test")
.filter((k, v) -> {
return v.get("value").asDouble() > 20.0;
});

关于java - 使用 KStream 从阈值中过滤掉值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50679113/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com