gpt4 book ai didi

apache-kafka - Apache Beam 从 Kafka 读取时出现 CoderException : java. io.EOFException

转载 作者:行者123 更新时间:2023-12-02 01:14:39 29 4
gpt4 key购买 nike

我已经根据此处的文档实现了从 Kafka 读取的 Beam 管道:https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L125

管道本身适用于有界源,我有测试用例,它可以毫无问题地从文件中读取。

从Kafka读取代码很简单,和例子基本一致:

    PCollection<String> input = p.apply(KafkaIO.<Long, String>read()
.withBootstrapServers(KAFKA_BROKER)
.withTopics(Arrays.asList(KAFKA_READ_TOPIC))
.withKeyCoder(BigEndianLongCoder.of())
.withValueCoder(StringUtf8Coder.of())
.withTimestampFn(new TimestampKafkaStrings())
.withoutMetadata())
.apply(Values.<String>create());

应用程序启动正常,似乎连接到 Kafka。但是,一旦我从另一个进程写入 Kafka 并且管道开始读取,我就会在第一次读取时收到以下异常:

INFO: Kafka version : 0.10.2.0
Apr 04, 2017 9:46:18 AM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka commitId : 576d93a8dc0cf421
Apr 04, 2017 9:46:30 AM org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader advance
INFO: Reader-0: first record offset 2000
Apr 04, 2017 9:46:30 AM org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader consumerPollLoop
INFO: Reader-0: Returning from consumer pool loop
[WARNING]
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: org.apache.beam.sdk.coders.CoderException: java.io.EOFException
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:453)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:350)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:71)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:210)
at com.groupbyinc.beam.SessionRollup.main(SessionRollup.java:186)
... 6 more
Caused by: org.apache.beam.sdk.coders.CoderException: java.io.EOFException
at org.apache.beam.sdk.coders.BigEndianLongCoder.decode(BigEndianLongCoder.java:64)
at org.apache.beam.sdk.coders.BigEndianLongCoder.decode(BigEndianLongCoder.java:33)
at org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader.decode(KafkaIO.java:1018)
at org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader.advance(KafkaIO.java:989)
at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.startReader(UnboundedReadEvaluatorFactory.java:190)
at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:128)
at org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:139)
at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:107)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more

key 解码器尝试读取 Kafka 消息 key 的方式似乎有问题。在源数据中,这些键没有明确设置,所以我假设它们默认为 Kafka(?) 中的时间戳。

关于如何进一步调试此问题的任何想法?或者我可以看的资源?功能示例?

编辑:删除管道的 .withTimestampFn() 部分无效。代码似乎在到达那一点之前就失败了。

最佳答案

答案是 key 不长。似乎默认情况下, key 是一个随机哈希,它是一个 String。奇怪的是,Beam KafkaIO 库无法开箱即用地处理默认的 Kafka 用例。

所以我的理论是,当 BigEndianLongCoder 尝试解码该值时,它会遇到 EOF,因为 long 比 char 大,所以它在认为它已读够之前就没有可读的东西了很长一段时间的东西。

所以我的固定代码如下:

PCollection<String> input = p.apply(KafkaIO.<Long, String>readBytes()
.withBootstrapServers(KAFKA_BROKER)
.withTopics(Arrays.asList(KAFKA_READ_TOPIC))
.withTimestampFn(new TimestampKafkaStrings())
.withoutMetadata())
.apply(Values.<byte[]>create())
.apply(ParDo.of(new BytesToString()));

重要的细节是调用readBytes()而不是read(),然后自己将字节解析成字符串。

就我而言,此后我遇到了另一个问题,因为正在读取的字符串是来自 Node 进程的字符串化 JSON。由于某种原因,Jackson 无法处理从 Kafka 传入的转义 JSON,因此必须先对其进行非转义,然后再进行解析。

尽管如此,所有这些都指向 Apache Beam KafkaIO 库中的弱点。给出的使用示例是不准确的,并且在简单的默认情况下不起作用。此外,由于它太新了,很少有人将其使用示例放到网上,因此当您遇到问题时,很难找到解决方案。

我真的应该提交一个包含更好示例的拉取请求。

关于apache-kafka - Apache Beam 从 Kafka 读取时出现 CoderException : java. io.EOFException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43209743/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com