gpt4 book ai didi

scala - 将应用程序从 Kafka 0.8.2.1 移植到 Kafka 0.9.0。读数偏移问题

转载 作者:行者123 更新时间:2023-12-04 23:02:04 31 4
gpt4 key购买 nike

我们遇到了与我们的应用程序代码从 Apache Kafka 版本 0.8.2.1 迁移到 0.9.0.0 相关的问题。

在这种情况下,我们指的是 Cloudera 发布的 Kafka 版本:

kafka_2.10-0.8.2.0-kafka-1.3.2

kafka_2.11-0.9.0-kafka-2.0.2

我们在读取和写入 __consumer_offsets 元数据主题上的偏移量时检测到该问题。特别是,我们使用 BlockingChannel 连接到 Kafka Broker,在调用 receive() 方法时我们得到一个 EOFException。

特别是:

java.io.EOFException
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel (NetworkReceive.java:83)
at kafka.network.BlockingChannel.readCompletely (BlockingChannel.scala: 129)
at kafka.network.BlockingChannel.receive (BlockingChannel.scala: 120)

一个可能的原因可能是两个版本的 Kafka API 之间的差异。

卡夫卡 0.8.2

在我们的应用中,我们调用

ConsumerMetadataResponse.readFrom(channel.receive().buffer())

接收方法如下

def receive(): Receive = {
if(!connected)
throw new ClosedChannelException()

val response = new BoundedByteBufferReceive()
response.readCompletely(readChannel)

response
}

正如我们所见,它返回一个 kafka.network.Receive,这是一个扩展 trait kafka.network.Transmission 的 trait。在这个Receive中,buffer方法是存在的,在kafka.network.BoundedByteBufferReceive中被覆盖

def buffer: ByteBuffer = {
expectComplete()
contentBuffer
}

卡夫卡 0.9.0

我们将上一行更改为

GroupCoordinatorResponse.readFrom(channel.receive().payload())

此版本API中的receive方法如下

 def receive(): NetworkReceive = {
if(!connected)
throw new ClosedChannelException()

val response = readCompletely(readChannel)
response.payload().rewind()

response
}

private def readCompletely(channel: ReadableByteChannel): NetworkReceive = {
val response = new NetworkReceive
while (!response.complete())
response.readFromReadableChannel(channel)
response
}

正如我们所见,这返回了一个 kafka.network.NetworkReceive,它是一个实现接口(interface) kafka.network.Receive 的类,现在是用 java 编写的,与之前的完全不同。这里没有buffer方法,只有一个返回内容的payload方法

    private ByteBuffer buffer;

我们怎么解决?提前致谢

最佳答案

Kafka 0.9 保留了旧的 Kafka 消费者,以实现与 Kafka 0.8.2 代理的向后兼容。您正在使用 Kafka 0.9 中仍然存在的旧消费者来读取来自 Kafka 0.9 的消息。您应该开始使用 Kafka 0.9 的新消费者 API 从 Kafka 0.9 代理读取数据。

希望这对您有所帮助。

关于scala - 将应用程序从 Kafka 0.8.2.1 移植到 Kafka 0.9.0。读数偏移问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41727191/

31 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com