
google-cloud-platform - Why doesn't the Airflow PubSubPullOperator pull the maximum number of messages?

Reposted. Author: 行者123. Updated: 2023-12-05 08:03:32

I am using PubSubPullOperator in Airflow to pull messages from a GCP subscription.

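from airflow.providers.google.cloud.operators.pubsub import PubSubPullOperator

# GCP_PROJECT_ID and GCP_CONN_ID are defined elsewhere in the DAG file.
pull_messages_task = PubSubPullOperator(
    task_id="pull_messages",
    ack_messages=True,
    project_id=GCP_PROJECT_ID,
    subscription="k8s-sub",
    gcp_conn_id=GCP_CONN_ID,
    max_messages=50,  # upper bound per pull, not a guaranteed count
)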

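Pulling messages from the subscription and storing them in XCom works fine. My question is: why doesn't PubSubPullOperator pull exactly max_messages messages on every run?

For example, I publish 250 messages to the GCP topic. My DAG runs once a minute and is configured to pull 50 messages per run.

Here is the task log from Airflow: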

[2022-05-17 14:53:04,630] {pubsub.py:536} INFO - Pulling max 50 messages from subscription (path) projects/production-1/subscriptions/k8s-sub
[2022-05-17 14:53:06,661] {pubsub.py:550} INFO - Pulled 50 messages from subscription (path) projects/production-1/subscriptions/k8s-sub

[2022-05-17 14:54:04,312] {pubsub.py:536} INFO - Pulling max 50 messages from subscription (path) projects/production-1/subscriptions/k8s-sub
[2022-05-17 14:54:06,239] {pubsub.py:550} INFO - Pulled 16 messages from subscription (path) projects/production-1/subscriptions/k8s-sub

[2022-05-17 14:55:04,055] {pubsub.py:536} INFO - Pulling max 50 messages from subscription (path) projects/production-1/subscriptions/k8s-sub
[2022-05-17 14:55:05,259] {pubsub.py:550} INFO - Pulled 4 messages from subscription (path) projects/production-1/subscriptions/k8s-sub

[2022-05-17 14:56:04,590] {pubsub.py:536} INFO - Pulling max 50 messages from subscription (path) projects/production-1/subscriptions/k8s-sub
[2022-05-17 14:56:06,527] {pubsub.py:550} INFO - Pulled 20 messages from subscription (path) projects/production-1/subscriptions/k8s-sub

[2022-05-17 14:57:04,083] {pubsub.py:536} INFO - Pulling max 50 messages from subscription (path) projects/production-1/subscriptions/k8s-sub
[2022-05-17 14:57:07,428] {pubsub.py:550} INFO - Pulled 38 messages from subscription (path) projects/production-1/subscriptions/k8s-sub

[2022-05-17 14:58:05,561] {pubsub.py:536} INFO - Pulling max 50 messages from subscription (path) projects/production-1/subscriptions/k8s-sub
[2022-05-17 14:58:07,431] {pubsub.py:550} INFO - Pulled 50 messages from subscription (path) projects/production-1/subscriptions/k8s-sub

[2022-05-17 14:59:04,348] {pubsub.py:536} INFO - Pulling max 50 messages from subscription (path) projects/production-1/subscriptions/k8s-sub
[2022-05-17 14:59:05,462] {pubsub.py:550} INFO - Pulled 50 messages from subscription (path) projects/production-1/subscriptions/k8s-sub

[2022-05-17 15:00:06,882] {pubsub.py:536} INFO - Pulling max 50 messages from subscription (path) projects/production-1/subscriptions/k8s-sub
[2022-05-17 15:00:08,710] {pubsub.py:550} INFO - Pulled 2 messages from subscription (path) projects/production-1/subscriptions/k8s-sub

[2022-05-17 15:01:03,519] {pubsub.py:536} INFO - Pulling max 50 messages from subscription (path) projects/production-1/subscriptions/k8s-sub
[2022-05-17 15:01:03,688] {pubsub.py:550} INFO - Pulled 20 messages from subscription (path) projects/production-1/subscriptions/k8s-sub

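I am fairly sure each DAG run finishes in under 1 minute, and that 50 messages do not exceed the XCom size limit (48 KB).

Has anyone run into this? Or does anyone know how the operator decides how many messages to pull?

Thanks a lot.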

Best answer

This behavior appears to come from Google's Pub/Sub client library rather than from the Airflow operator itself.

From Google's documentation:

  The maximum number of messages to return for this request. Must be a positive integer. The Pub/Sub system may return fewer than the number specified.

Operator dependency chain: PubSubPullOperator uses PubSubHook, which uses SubscriberClient.
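
Since a single pull may return fewer messages than requested, one possible workaround is to pull repeatedly until the target count is reached. The following is only a minimal sketch of that idea, not part of the original answer. The names pull_up_to, pull_once and max_attempts are hypothetical; pull_once stands in for any function that wraps one pull request (for example a thin wrapper around PubSubHook.pull) and returns a list of at most n messages. Acknowledging the messages is not handled here.

```python
from typing import Callable, List, TypeVar

T = TypeVar("T")


def pull_up_to(
    pull_once: Callable[[int], List[T]],
    max_messages: int,
    max_attempts: int = 5,
) -> List[T]:
    """Call pull_once repeatedly until max_messages are collected.

    pull_once(n) is a hypothetical callable that returns at most n messages
    and may return fewer, as a Pub/Sub pull request is allowed to do.
    """
    collected: List[T] = []
    for _ in range(max_attempts):
        remaining = max_messages - len(collected)
        if remaining <= 0:
            break
        batch = pull_once(remaining)
        if not batch:
            # Assumption: an empty response means nothing is available now,
            # so stop instead of spending the remaining attempts.
            break
        collected.extend(batch)
    return collected
```

Stopping on an empty response and capping the number of attempts are design choices that keep the task from blocking; whether they fit depends on how long a DAG run is allowed to take.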

Regarding "google-cloud-platform - Why doesn't the Airflow PubSubPullOperator pull the maximum number of messages?", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/72276934/
