gpt4 book ai didi

java - Google PubSub Java (Scala) 客户端收到过多的重发消息

转载 作者:行者123 更新时间:2023-12-01 16:18:02 25 4
gpt4 key购买 nike

我有一个场景,我加载包含大约 1100 条消息的订阅。然后,我启动一个 Spark 作业,使用以下设置从该订阅中提取消息:

最大杰出元素计数:5

MaxAckExtensionPeriod:60 分钟

确认截止时间:600

要处理的第一条消息将启动缓存生成,大约需要 30 分钟才能完成。在此过程中到达的任何其他消息都会简单地“返回”,不带 ack 或 nack。此后,处理给定消息需要 1 分钟到 30 分钟。由于 ack 延长时间为 60 分钟,我绝不会期望看到消息的重新发送。

我看到的行为是,在生成初始缓存时,客户端每 10 分钟就会抓取 5 条新消息,并通过我的代码返回没有 ack 或 nack 的消息。这是出乎意料的。我预计最初 5 条消息的截止日期将延长至一个小时。

此外,在处理并确认大约 500 条消息后,我预计订阅中还剩下大约 600 条消息,但我看到的几乎是原始的 1100 条。当我在代码中记录这些消息时,结果发现这些消息是重复发送的。这也是非常出乎意料的。

这是处理并确认大约 500 条消息后来自 Google 控制台的屏幕截图(忽略第一个“驼峰”,这是一次中止的测试运行):

enter image description here

我错过了什么吗?

这是设置代码:

  val name = ProjectSubscriptionName.of(ConfigurationValues.ProjectId,
ConfigurationValues.PubSubSubscription)
val topic = ProjectTopicName.of(ConfigurationValues.ProjectId,
ConfigurationValues.PubSubSubscriptionTopic)
val pushConfig = PushConfig.newBuilder.build
val ackDeadlineSeconds = 600
subscriptionAdminClient.createSubscription(
name,
topic,
pushConfig,
ackDeadlineSeconds)

val flowControlSettings = FlowControlSettings.newBuilder()
.setMaxOutstandingElementCount(5L)
.build();

// create a subscriber bound to the asynchronous message receiver
val subscriber = Subscriber
.newBuilder(subscriptionName, new EtlMessageReceiver(spark))
.setFlowControlSettings(flowControlSettings)
.setMaxAckExtensionPeriod(Duration.ofMinutes(60))
.build
subscriber.startAsync.awaitRunning()

以下是接收器中的代码,该代码在生成缓存时消息到达时运行:

  if(!BIQConnector.cacheGenerationDone){
Utilities.logLine(
s"PubSub message for work item $uniqueWorkItemId ignored as cache is still being generated.")
return
}

最后,当消息被处理时:

  consumer.ack()
Utilities.logLine(s"PubSub message ${message.getMessageId} for $tableName acknowledged.")

// Write back to ETL Manager
Utilities.logLine(
s"Writing result message back to topic ${etlResultTopic} for table $tableName, $tableDetailsForLog.")
sendPubSubResult(importTableName, validTableName, importTimestamp, 2, etlResultTopic, stageJobData,
tableDetailsForLog, "Success", isDeleted)

最佳答案

您的 Spark 作业是否使用 a Pub/Sub client library拉消息?这些库确实应该不断将您的消息截止日期延长到您指定的 MaxAckExtensionPeriod。

如果您的作业使用 Pub/Sub 客户端库,则这是意外行为。您应该联系 Google Cloud 支持人员,并提供您的项目名称、订阅名称、客户端库版本以及您“返回”而无需确认的消息中的消息 ID 示例。他们将能够进一步调查您收到这些重新发送的消息的原因。

关于java - Google PubSub Java (Scala) 客户端收到过多的重发消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62354487/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com