gpt4 book ai didi

java - 如何使用 null 键对 kstream 执行操作?

转载 作者:行者123 更新时间:2023-11-30 02:10:07 25 4
gpt4 key购买 nike

我有一个像这样的kafka流对象:

[KSTREAM-SOURCE-0000000000]: null, {"id": 1, "name": "john", "age": 26, "updated_at": 1525774480752}
[KSTREAM-SOURCE-0000000000]: null, {"id": 2, "name": "jane", "age": 24, "updated_at": 1525774480784}
[KSTREAM-SOURCE-0000000000]: null, {"id": 3, "name": "julia", "age": 25, "updated_at": 1525774480827}

当我尝试执行 map 操作时:

KStream<String, mysql> locationsStream = pgsql_users.map((k, v) -> new KeyValue<String, mysql>(k.toString(), new mysql ((Integer) v.get("id"), (String) v.get("name").toString(), (Integer) v.get("age")) ));

我遇到一个异常:

[KSTREAM-SOURCE-0000000000]: null, {"id": 1, "name": "john", "age": 26, "updated_at": 1525774480752}

Exception in thread "test-app-0d15a00c-51dc-4c1a-8293-82f47fc7ef90-StreamThread-1" java.lang.NullPointerException at com.aail.kafka_stream.lambda$main$0(kafka_stream.java:86) at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41) at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46) at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208) at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85) at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80) at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216) at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403) at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317) at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)

我以为我得到这个是因为 key 为空。有人可以帮我解决这个问题吗?

最佳答案

您正在使用 k.toString(),但 k 为 null。

您可能想按原样使用 k,在新键中再次使用 null,或者在以下情况下使用 String.valueOf(k)您实际上想要单词“null”(不推荐,因为它将所有消息发送到单个分区)

此外,由于键为空,您可以将 byte[] 设置为流的键对象。无需使用解串器

关于java - 如何使用 null 键对 kstream 执行操作?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50369900/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com