gpt4 book ai didi

python - 如何使用 python 将确认从后台函数返回到 pubsub

转载 作者:行者123 更新时间:2023-12-01 07:57:03 25 4
gpt4 key购买 nike

我正在设置一个新的 GCP 项目,以便在 CSV 文件上传到存储桶后立即读取和解析该文件。就这个程度而言,我创建了一个发布到发布/订阅的触发器。 Pub/Sub 本身会向后台函数发送消息。

一切似乎都工作正常,例如一旦文件上传,触发器就会启动,向 Pubsub 发送消息,然后向该函数发送消息。我还可以看到传递到该函数的消息。

然而,问题是将 Ack 发送回 pub/sub。我在某处读到发回任何 2xx 状态应该可以完成这项工作(从队列中删除消息),但事实并非如此。结果,pubsub“认为”消息尚未传递并一遍又一遍地发送消息。

def parse_data(data, context):


if 'data' in data:
args = base64.b64decode(data['data']).decode('utf-8')
pubsub_message = args.replace('\n', ' ')
properties = json.loads(pubsub_message)
myBucket = validate_message(properties, 'bucket')
myFileName = validate_message(properties, "name")
fileLocation = 'gs://'+myBucket+'/'+myFileName
readAndEnhanceData(fileLocation)
return 'OK', 200
else:
return 'Something went wrong, no data received'

这是显示该函数被连续调用的日志文件。

D  CSV_Parser_Raw_Data 518626734652287 Function execution took 72855 ms,
finished with status: 'ok' CSV_Parser_Raw_Data 518626734652287

D CSV_Parser_Raw_Data 518626708442766 Function execution took 131886 ms,
finished with status: 'ok' CSV_Parser_Raw_Data 518626708442766

D CSV_Parser_Raw_Data 518624470100006 Function execution took 65412 ms,
finished with status: 'ok' CSV_Parser_Raw_Data 518624470100006

D CSV_Parser_Raw_Data 518626734629237 Function execution took 68004 ms,
finished with status: 'ok' CSV_Parser_Raw_Data 518626734629237

D CSV_Parser_Raw_Data 518623777839079 Function execution took 131255 ms,
finished with status: 'ok' CSV_Parser_Raw_Data 518623777839079

D CSV_Parser_Raw_Data 518623548622842 Function execution took 131186 ms,
finished with status: 'ok' CSV_Parser_Raw_Data 518623548622842

D CSV_Parser_Raw_Data 518623769252453 Function execution took 133981 ms,
finished with status: 'ok' CSV_Parser_Raw_Data 518623769252453

所以我很高兴知道我在这里缺少什么! IE。我怎样才能打破这个循环?

* 问题更新 *感谢@kamal,他强制我睁开眼睛,要求自己重新创建存储桶/主题等。在我执行任务时,重新审查所有内容并意识到,我在子文件夹中但在同一个存储桶中使用了临时文件作为上传文件!这就是问题所在。 Finalize 事件适用于存储桶中任何位置创建的任何对象。所以卡迈勒是对的,多次上传正在发生!

如果您以相同的方式处理您的项目,请确保创建一个 tmp 文件夹并确保不向该文件夹添加任何触发器。

<小时/>

最佳答案

一般来说,Google Cloud Pub/Sub 保证 at least once delivery的消息。这意味着总是有可能获得重复的内容,尽管它们应该相对较少。就您而言,并不是一遍又一遍地处理同一条消息,而是不同的消息。 518626734652287 等数字是消息 ID。由于每次都不同,这意味着发布了多条消息。很可能发生以下两种情况之一:

  1. 文件被多次上传。
  2. GCS 触发器已设置多次。您可以通过运行 gsutil notification list gs://<bucket name> 来检查这一点.

如果后者是问题所在,您将看到多个条目,例如:

projects/_/buckets/my-bucket/notificationConfigs/1
Cloud Pub/Sub topic: projects/cloud-pubsub-training-examples/topics/my-topic

projects/_/buckets/my-bucket/notificationConfigs/2
Cloud Pub/Sub topic: projects/cloud-pubsub-training-examples/topics/my-topic

projects/_/buckets/my-bucket/notificationConfigs/3
Cloud Pub/Sub topic: projects/cloud-pubsub-training-examples/topics/my-topic

您可以通过使用配置名称发出删除来删除额外的通知,例如 gsutil notification delete projects/_/buckets/my-bucket/notificationConfigs/2 .

还值得注意的是,通过 Cloud Functions 和 Pub/Sub,可以设置两种类型的订阅:由用户配置的订阅和由 Cloud Functions 本身配置的订阅。默认情况下,前者的 ack 截止时间为 10 秒。这意味着如果消息在 10 秒内未得到确认,则会重新发送。对于后者,默认值为 600 秒。如果消息的处理时间超过此时间段,则可能会发生重新传递。

您可以尝试减少处理消息所需的时间,或者可以增加确认截止时间。您可以使用 gcloud 增加确认截止时间工具:

gcloud pubsub subscriptions update <subscription name> --ack-deadline=180

这会将截止时间延长至 3 分钟。您也可以在 Cloud Console Pub/Sub page 中执行此操作单击订阅,单击“编辑”,然后将“确认截止日期”更改为更大的值。

使用 Cloud Functions,您不需要返回 HTTP 状态。仅当您使用 push subscription 时才需要这样做。直接。

关于python - 如何使用 python 将确认从后台函数返回到 pubsub,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55923559/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com