gpt4 book ai didi

java - 无法通过 KafkaIO 在光束中读取 kafka

转载 作者:行者123 更新时间:2023-12-02 10:26:14 27 4
gpt4 key购买 nike

我在 Apchea Beam 中编写了一个非常简单的管道,如下所示,用于从 Confluence Cloud 上的 kafka 集群读取数据,如下所示:

        Pipeline pipeline = Pipeline.create(options);

Map<String, Object> propertyBuilder = new HashMap();
propertyBuilder.put("ssl.endpoint.identification.algorithm", "https");
propertyBuilder.put("sasl.mechanism","PLAIN");
propertyBuilder.put("request.timeout.ms","20000");
propertyBuilder.put("retry.backoff.ms","500");

pipeline
.apply(KafkaIO.<byte[], byte[]>readBytes()
.withBootstrapServers("pkc-epgnk.us-central1.gcp.confluent.cloud:9092")
.withTopic("gcp-ingestion-1")
.withKeyDeserializer(ByteArrayDeserializer.class)
.withValueDeserializer(ByteArrayDeserializer.class)
.updateConsumerProperties(propertyBuilder)
.withoutMetadata() // PCollection<KV<Long, String>>
) .apply(Values.<byte[]>create());

但是,当运行上面的代码从我的 kafka 集群读取数据时,我遇到了以下问题

我在直接java runner上运行,我使用的是beam 2.8,

我可以读取消息并生成消息到我的 kafka 融合集群,但不能通过上面的代码。

最佳答案

如果您跟踪堆栈跟踪,代码似乎会尝试将超时配置属性转换为Integer:https://github.com/apache/beam/blob/2e759fecf63d62d110f29265f9438128e3bdc8ab/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L112

但它得到的是一个字符串。我的猜测是,这是因为您在此处将其设置为字符串:propertyBuilder.put("request.timeout.ms","20000")。我认为正确的做法是将其设置为 Integer,例如就像 propertyBuilder.put("request.timeout.ms", 20000) (超时值周围没有引号)。

您也可能对其他配置属性(例如重试退避)有类似的问题,您需要仔细检查属性类型。

关于java - 无法通过 KafkaIO 在光束中读取 kafka,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53939658/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com