gpt4 book ai didi

python-3.x - 谷歌云pubsub python同步拉取

转载 作者:行者123 更新时间:2023-12-02 19:50:22 24 4
gpt4 key购买 nike

我有一个主题和一个有多个订阅者的订阅。我的应用场景是我想处理不同订阅者上的消息,并且一次处理特定数量的消息。意味着首先假设正在处理 8 条消息,然后如果一条消息处理完成,那么在确认已处理消息后,下一条消息应该从主题中获取,同时注意在任何订阅者上都找不到重复消息,并且每次都应该在后台处理 8 条消息。

为此,我使用 max_messages = 8 的同步拉取方法,但下一次拉取是在所有消息处理完成后完成的。因此,我们创建了自己的调度程序,其中应同时在后台运行 8 个进程,并一次拉取 1 条消息,但在所有 8 条消息处理完成后,仍会传递下一条消息。

这是我的代码:

    #!/usr/bin/env python3

import logging
import multiprocessing
import time
import sys
import random
from google.cloud import pubsub_v1

# GCP project and Pub/Sub subscription identifiers (placeholders).
project_id = 'xyz'
subscription_name = 'abc'

# Number of parallel puller processes to spawn.
NUM_MESSAGES = 4
# Seconds to extend a message's ack deadline while its worker is still busy
# (`modify_ack_deadline` accepts values between 10 and 600).
ACK_DEADLINE = 50
# Seconds between successive ack-deadline extensions in the keep-alive loop.
SLEEP_TIME = 20

# Route the multiprocessing logger to stderr so child processes log visibly.
multiprocessing.log_to_stderr()
logger = multiprocessing.get_logger()
logger.setLevel(logging.INFO)

def worker(msg):
    """Simulate processing of one pulled Pub/Sub message.

    Logs the payload, then sleeps a random 200-800 seconds to emulate a
    long-running job; the parent process keeps the ack deadline alive
    for the whole duration.
    """
    data = msg.message.data
    logger.info("Received message:{}".format(data))
    hold_seconds = random.randint(200, 800)
    logger.info("Received message:{} for {} sec".format(data, hold_seconds))
    time.sleep(hold_seconds)

def message_puller():
    """Pull one message at a time forever, processing each in a child process.

    While the worker process is alive the ack deadline is repeatedly extended
    so Pub/Sub does not redeliver the message; the message is acknowledged
    only after the worker finishes.
    """
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project_id, subscription_name)
    while True:
        try:
            response = subscriber.pull(subscription_path, max_messages=1)
            # A synchronous pull may legally return zero messages even when
            # the topic is not empty (this is the asker's "list index (0) out
            # of range" crash) -- back off briefly and retry instead of
            # indexing an empty list.
            if not response.received_messages:
                time.sleep(1)
                continue
            message = response.received_messages[0]
            ack_id = message.ack_id
            process = multiprocessing.Process(target=worker, args=(message,))
            process.start()
            while process.is_alive():
                # `ack_deadline_seconds` must be between 10 and 600.
                subscriber.modify_ack_deadline(
                    subscription_path, [ack_id],
                    ack_deadline_seconds=ACK_DEADLINE)
                time.sleep(SLEEP_TIME)
            # Final ack, only after the worker completed.
            subscriber.acknowledge(subscription_path, [ack_id])
            logger.info("Acknowledging message: {}".format(message.message.data))
        except Exception:
            # Log with traceback instead of bare print(); a short sleep keeps
            # a persistent error from becoming a tight retry loop against the
            # API.
            logger.exception("pull/processing iteration failed")
            time.sleep(1)
            continue

def synchronous_pull():
    """Spawn NUM_MESSAGES puller processes, start them all, then wait on each."""
    pullers = [multiprocessing.Process(target=message_puller)
               for _ in range(NUM_MESSAGES)]
    for proc in pullers:
        proc.start()
    for proc in pullers:
        proc.join()

if __name__ == '__main__':
    # Entry point: run the parallel pullers until interrupted.
    synchronous_pull()

有时,即使 while 循环始终为 True,subscriber.pull 也不会拉取任何消息,并给我错误“列表索引 (0) 超出范围”。由此得出的结论是:即使消息已在主题上,subscriber.pull 有时也拉取不到消息,但过一段时间后又能正常拉取。为什么会这样?

我尝试过异步拉取和流量控制,但在多个订阅者上发现重复消息。如果任何其他方法可以解决我的问题,请告诉我。提前致谢。

最佳答案

Google Cloud PubSub 确保至少一次送达 ( docs )。这意味着,同一条消息可能会被多次传递。为了解决这个问题,您需要使您的程序/系统具备幂等性 (idempotent)。

您有多个订阅者,每个订阅者拉取 8 条消息。
为了避免同一消息被多个订阅者处理,请在任何订阅者提取该消息并进一步处理时立即确认该消息,而不是在消息的整个处理结束后才确认。

此外,当队列中没有消息时,不要连续运行主脚本,而是使用 sleep 持续一段固定时间。

我有一个类似的代码,除了没有使用并行处理之外,我使用了同步拉取。

代码如下:

PubSubHandler - 处理 Pubsub 相关操作的类

from google.cloud import pubsub_v1
from google.api_core.exceptions import DeadlineExceeded


class PubSubHandler:
    """Thin wrapper around a Pub/Sub subscriber for pulling and acking messages."""

    def __init__(self, subscriber_config):
        """Create the subscriber client.

        `subscriber_config` is a mapping with keys 'PROJECT_NAME' and
        'SUBSCRIBER_NAME'.
        """
        self.project_name = subscriber_config['PROJECT_NAME']
        self.subscriber_name = subscriber_config['SUBSCRIBER_NAME']

        self.subscriber = pubsub_v1.SubscriberClient()
        self.subscriber_path = self.subscriber.subscription_path(
            self.project_name, self.subscriber_name)

    def pull_messages(self, number_of_messages):
        """Synchronously pull up to `number_of_messages`; return [] when none.

        `pull` raises DeadlineExceeded when no message arrives within the
        client timeout; that is a normal empty-queue condition and is mapped
        to an empty list.
        """
        try:
            response = self.subscriber.pull(
                self.subscriber_path, max_messages=number_of_messages)
            received_messages = response.received_messages
        except DeadlineExceeded:
            received_messages = []
            print('No messages caused error')
        return received_messages

    def ack_messages(self, message_ids):
        """Acknowledge the given ack IDs; return True iff anything was acked.

        Fix: the original fell off the end and returned None for an empty
        list, giving the method an inconsistent return type; now it always
        returns an explicit bool (backward compatible -- None and False are
        both falsy).
        """
        if len(message_ids) > 0:
            self.subscriber.acknowledge(self.subscriber_path, message_ids)
            return True
        return False

Utils - util 方法的类

import json

class Utils:
    """Helpers for decoding raw Pub/Sub payloads into JSON objects."""

    def decoded_data_to_json(self, decoded_data):
        """Parse a decoded string into JSON, raising a uniform error on failure.

        NOTE(review): the single->double quote substitution assumes payloads
        are Python-repr-style dicts; it will corrupt values that contain
        apostrophes -- confirm the producer's format.
        """
        try:
            decoded_data = decoded_data.replace("'", '"')
            return json.loads(decoded_data)
        except Exception as e:
            # Fix: chain the original exception so the root cause of the
            # parse failure is preserved in the traceback.
            raise Exception('error while parsing json') from e

    def raw_data_to_utf(self, raw_data):
        """Decode raw bytes as UTF-8, raising a uniform error on failure."""
        try:
            return raw_data.decode('utf8')
        except Exception as e:
            raise Exception('error converting to UTF') from e

Orcestrator - 主要脚本


import time
import json
import logging

from utils import Utils
from db_connection import DbHandler
from pub_sub_handler import PubSubHandler

class Orcestrator:
    """Poll the subscription, decode each message, and ack what was pulled."""

    def __init__(self, config=None):
        """Set up the Pub/Sub handler and polling parameters.

        Fix: the original read a module-level `subscriber_config` global that
        is never defined in this file, raising NameError at construction.
        `config` now accepts the subscriber configuration directly and falls
        back to that global for backward compatibility.
        """
        self.MAX_NUM_MESSAGES = 2
        self.SLEEP_TIME = 10
        self.util_methods = Utils()
        if config is None:
            # Preserve the original global-lookup behavior when present.
            config = globals().get('subscriber_config')
        self.pub_sub_handler = PubSubHandler(config)

    def main_handler(self):
        """Pull one batch, decode each message, and ack every pulled message.

        Messages are acked whether or not decoding succeeded, so a poison
        message cannot be redelivered forever -- matching the answer's advice
        to ack as soon as a subscriber picks the message up.
        """
        to_ack_ids = []
        pulled_messages = self.pub_sub_handler.pull_messages(self.MAX_NUM_MESSAGES)

        if not pulled_messages:
            # NOTE(review): polling speeds *up* when the queue is empty,
            # which looks inverted relative to the surrounding advice to
            # sleep longer on an empty queue -- confirm intent.
            self.SLEEP_TIME = 1
            print('no messages in queue')
            return

        logging.info('messages in queue')
        self.SLEEP_TIME = 10

        for message in pulled_messages:
            try:
                decoded = self.util_methods.raw_data_to_utf(message.message.data)
                json_data = self.util_methods.decoded_data_to_json(decoded)
                print(json_data)
            except Exception as e:
                logging.error(e)
            # Ack regardless of decode outcome (at-least-once delivery means
            # the program must be idempotent anyway).
            to_ack_ids.append(message.ack_id)

        if self.pub_sub_handler.ack_messages(to_ack_ids):
            print('acknowledged msg_ids')


if __name__ == "__main__":
    # Poll forever; the sleep interval adapts inside main_handler.
    runner = Orcestrator()
    print('Receiving data..')
    while True:
        runner.main_handler()
        time.sleep(runner.SLEEP_TIME)

关于python-3.x - 谷歌云pubsub python同步拉取,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58321230/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com