gpt4 book ai didi

amazon-web-services - 同一 Kinesis 流的多个不同使用者

转载 作者:行者123 更新时间:2023-12-04 08:13:56 25 4
gpt4 key购买 nike

我有一个 Kinesis 生产者,它将单一类型的消息写入流。我想在多个完全不同的消费者应用程序中处理这个流。因此,对于给定的主题/流,具有单个发布者的发布/订阅。我还想利用检查点来确保每个消费者处理写入流的每条消息。

最初,我为所有消费者和生产者使用相同的应用程序名称。但是,一旦我启动了多个消费者,我就开始收到以下错误:

com.amazonaws.services.kinesis.model.InvalidArgumentException: StartingSequenceNumber 49564236296344566565977952725717230439257668853369405442 used in GetShardIterator on shard shardId-000000000000 in stream PackageCreated under account ************ is invalid because it did not come from this stream. (Service: AmazonKinesis; Status Code: 400; Error Code: InvalidArgumentException; Request ID: ..)



这似乎是因为消费者在使用相同的应用程序名称时与他们的检查点发生冲突。

从阅读文档来看,使用检查点进行发布/订阅的唯一方法似乎是每个消费者应用程序都有一个流,这要求每个生产者了解所有可能的消费者。这比我想要的更紧密;这真的只是一个队列。

似乎 Kafka 支持我想要的东西:给定主题/分区的任意消费,因为消费者完全可以控制他们自己的检查点。如果我想要带有检查点的发布/订阅,我唯一的选择是转移到 Kafka 还是其他一些选择?

我的 RecordProcessor 代码,在每个消费者中都是相同的:
override def processRecords(processRecordsInput: ProcessRecordsInput): Unit = {
log.trace("Received record(s) from kinesis")
for {
record <- processRecordsInput.getRecords
json <- jawn.parseByteBuffer(record.getData).toOption
msg <- decode[T](json.toString).toOption
} yield subscriber ! msg
processRecordsInput.getCheckpointer.checkpoint()
}

该代码解析消息并将其发送给订阅者。现在,我只是将所有消息标记为已成功接收。我可以在 AWS Kinesis 仪表板上看到正在发送的消息,但没有读取发生,大概是因为每个应用程序都有自己的 AppName 并且没有看到任何其他消息。

最佳答案

支持您想要的模式,即从一个 Kinesis 流的一个发布者到多个消费者的模式。每个消费者不需要单独的流。

你是怎样做的?您需要为每个使用者提供不同的应用程序名称。这样,一个消费者的检查点信息不会与另一个消费者的信息发生冲突。

检查对此的第一个回复:https://forums.aws.amazon.com/message.jspa?messageID=554375

关于amazon-web-services - 同一 Kinesis 流的多个不同使用者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39167880/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com