
java - Kafka Streams application error when using the mapValues() method with Gson


I wrote a Kafka Streams application that reads data from the topic "topic_one" (the data was originally ingested from MySQL). I then want to take part of that data (the "after" part, see below) as a KStream for further processing. But I get a serialization error as soon as I use mapValues(). I am new to Kafka Streams and don't know how to create and use a proper serde. Can anybody help me?

Source data from topic_one:

[KSTREAM-SOURCE-0000000000]: null, {"before": null, "after": {"id": 1, "category": 1, "item": "abc"}, "source": {"version": "0.8.3.Final", "name": "example", "server_id": 1, "ts_sec": 1581491071, "gtid": null, "file": "mysql-bin.000013", "pos": 217827349, "row": 0, "snapshot": false, "thread": 95709, "db": "example", "table": "item", "query": null}, "op": "c", "ts_ms": 1581491071727}

What I want to get:

{"id": 1, "category": 1, "item": "abc"}

My code:

    Properties properties = getProperties();

    try {
        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> resourceStream = builder.stream("topic_one");
        resourceStream.print(Printed.toSysOut());

        KStream<String, String> resultStream = resourceStream.mapValues(value ->
                new Gson().fromJson(value, JsonObject.class).get("after").getAsJsonObject().toString());
        resultStream.print(Printed.toSysOut());

        Topology topology = builder.build();

        KafkaStreams streams = new KafkaStreams(topology, properties);

        streams.cleanUp();
        streams.start();

    } catch (Exception e) {
        System.out.println(e.getMessage());
    }
}

private static Properties getProperties() {

    Properties properties = new Properties(); // TODO: move these settings to a separate config file?

    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "app_id");
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    properties.put("schema.registry.url", "http://localhost:8081");

    return properties;
}

The error:

Exception in thread "streams_id-db618fbf-c3e4-468b-a5a2-18e6b0b9c6be-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=matomo.matomo.matomo_scenarios_directory, partition=0, offset=30, stacktrace=org.apache.kafka.streams.errors.StreamsException: ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept the deserialized input of type key: unknown because key is null, and value: org.apache.avro.generic.GenericData$Record.
Note that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:122)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:429)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:474)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:536)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:792)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
Caused by: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.String
at org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey$1(AbstractStream.java:103)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:40)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
... 10 more

at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:446)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:474)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:536)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:792)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
Caused by: org.apache.kafka.streams.errors.StreamsException: ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept the deserialized input of type key: unknown because key is null, and value: org.apache.avro.generic.GenericData$Record.
Note that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:122)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:429)
... 5 more
Caused by: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.String
at org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey$1(AbstractStream.java:103)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:40)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
... 10 more

Best Answer

In your getProperties() method you define the value serde as GenericAvroSerde.class, but when you create the streams you use String as the value type. That is why you get the exception at runtime.

KStream<String, String> resourceStream = ...
KStream<String, String> resultStream = ...

If you really do use Avro as the message format, you have to use the correct value type when defining your KStreams. But since it looks like your values are plain JSON strings, you can set the correct value serde by replacing

properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);

with

properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
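
Alternatively, as the error message itself suggests ("provide correct Serdes via method parameters"), you can keep GenericAvroSerde as the global default and override the serde for this one topic only. A minimal sketch using Consumed.with():

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.kstream.Consumed;

    // Per-topic override: read topic_one with String key/value serdes,
    // leaving the default value serde untouched for any other (Avro) topics
    KStream<String, String> resourceStream = builder.stream(
            "topic_one",
            Consumed.with(Serdes.String(), Serdes.String()));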

Hope this helps.

Regarding "java - Kafka Streams application error when using the mapValues() method with Gson", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/60184174/
