- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个主题和一个有多个订阅者的订阅。我的应用场景是我想处理不同订阅者上的消息,并且一次处理特定数量的消息。意味着首先假设正在处理 8 条消息,然后如果一条消息处理完成,那么在确认已处理消息后,下一条消息应该从主题中获取,同时注意在任何订阅者上都找不到重复消息,并且每次都应该在后台处理 8 条消息。
为此,我使用 max_messages = 8 的同步拉取方法,但下一次拉取是在所有消息处理完成后完成的。因此,我们创建了自己的调度程序,其中应同时在后台运行 8 个进程,并一次拉取 1 条消息,但在所有 8 条消息处理完成后,仍会传递下一条消息。
这是我的代码:
#!/usr/bin/env python3
import logging
import multiprocessing
import time
import sys
import random
from google.cloud import pubsub_v1
project_id = 'xyz'
subscription_name = 'abc'
NUM_MESSAGES = 4
ACK_DEADLINE = 50
SLEEP_TIME = 20
multiprocessing.log_to_stderr()
logger = multiprocessing.get_logger()
logger.setLevel(logging.INFO)
def worker(msg):
logger.info("Received message:{}".format(msg.message.data))
random_sleep = random.randint(200,800)
logger.info("Received message:{} for {} sec".format(msg.message.data, random_sleep))
time.sleep(random_sleep)
def message_puller():
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_name)
while(True):
try:
response = subscriber.pull(subscription_path, max_messages=1)
message = response.received_messages[0]
msg = message
ack_id = message.ack_id
process = multiprocessing.Process(target=worker, args=(message,))
process.start()
while process.is_alive():
# `ack_deadline_seconds` must be between 10 to 600.
subscriber.modify_ack_deadline(subscription_path,[ack_id],ack_deadline_seconds=ACK_DEADLINE)
time.sleep(SLEEP_TIME)
# Final ack.
subscriber.acknowledge(subscription_path, [ack_id])
logger.info("Acknowledging message: {}".format(msg.message.data))
except Exception as e:
print (e)
continue
def synchronous_pull():
p = []
for i in range(0,NUM_MESSAGES):
p.append(multiprocessing.Process(target=message_puller))
for i in range(0,NUM_MESSAGES):
p[i].start()
for i in range(0,NUM_MESSAGES):
p[i].join()
if __name__ == '__main__':
synchronous_pull()
有时,即使 while 循环始终为 True,subscriber.pull 也不会拉取任何消息。它给了我错误 列表索引 (0) 超出范围得出的结论是,即使消息在主题上,subscriber.pull 也不会拉入消息,但一段时间后它开始拉动。为什么会这样?
我尝试过异步拉取和流量控制,但在多个订阅者上发现重复消息。如果任何其他方法可以解决我的问题,请告诉我。提前致谢。
最佳答案
Google Cloud PubSub 确保至少一次 ( docs )。这意味着,消息可能会被多次传递。为了解决这个问题,您需要使您的程序/系统 idempotent
您有多个订阅者,每个订阅者拉取 8 条消息。
为了避免同一消息被多个订阅者处理,请在任何订阅者提取该消息并进一步处理时立即确认
该消息,而不是在消息的整个处理结束后才确认。
此外,当队列中没有消息时,不要连续运行主脚本,而是使用 sleep
持续一段固定时间。
我有一个类似的代码,除了没有使用并行处理之外,我使用了同步拉取。
代码如下:
PubSubHandler - 处理 Pubsub 相关操作的类
from google.cloud import pubsub_v1
from google.api_core.exceptions import DeadlineExceeded
class PubSubHandler:
def __init__(self, subscriber_config):
self.project_name = subscriber_config['PROJECT_NAME']
self.subscriber_name = subscriber_config['SUBSCRIBER_NAME']
self.subscriber = pubsub_v1.SubscriberClient()
self.subscriber_path = self.subscriber.subscription_path(self.project_name,self.subscriber_name)
def pull_messages(self,number_of_messages):
try:
response = self.subscriber.pull(self.subscriber_path, max_messages = number_of_messages)
received_messages = response.received_messages
except DeadlineExceeded as e:
received_messages = []
print('No messages caused error')
return received_messages
def ack_messages(self,message_ids):
if len(message_ids) > 0:
self.subscriber.acknowledge(self.subscriber_path, message_ids)
return True
Utils - util 方法的类
import json
class Utils:
def __init__(self):
pass
def decoded_data_to_json(self,decoded_data):
try:
decoded_data = decoded_data.replace("'", '"')
json_data = json.loads(decoded_data)
return json_data
except Exception as e:
raise Exception('error while parsing json')
def raw_data_to_utf(self,raw_data):
try:
decoded_data = raw_data.decode('utf8')
return decoded_data
except Exception as e:
raise Exception('error converting to UTF')
Orcestrator - 主要脚本
import time
import json
import logging
from utils import Utils
from db_connection import DbHandler
from pub_sub_handler import PubSubHandler
class Orcestrator:
def __init__(self):
self.MAX_NUM_MESSAGES = 2
self.SLEEP_TIME = 10
self.util_methods = Utils()
self.pub_sub_handler = PubSubHandler(subscriber_config)
def main_handler(self):
to_ack_ids = []
pulled_messages = self.pub_sub_handler.pull_messages(self.MAX_NUM_MESSAGES)
if len(pulled_messages) < 1:
self.SLEEP_TIME = 1
print('no messages in queue')
return
logging.info('messages in queue')
self.SLEEP_TIME = 10
for message in pulled_messages:
raw_data = message.message.data
try:
decoded_data = self.util_methods.raw_data_to_utf(raw_data)
json_data = self.util_methods.decoded_data_to_json(decoded_data)
print(json_data)
except Exception as e:
logging.error(e)
to_ack_ids.append(message.ack_id)
if self.pub_sub_handler.ack_messages(to_ack_ids):
print('acknowledged msg_ids')
if __name__ == "__main__":
orecestrator = Orcestrator()
print('Receiving data..')
while True:
orecestrator.main_handler()
time.sleep(orecestrator.SLEEP_TIME)
关于python-3.x - 谷歌云pubsub python同步拉取,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58321230/
我在字符串中有一个大词。例子白 Wine 额外优惠。 我想在第一行使用“White”,在第二行使用“wine extra offer”。使用下面的代码: string value="White win
我想在无符号中执行一些算术运算,需要取负整数的绝对值,比如 do_some_arithmetic_in_unsigned_mode(int some_signed_value) { unsign
我正在努力使用 data.table 来总结向量函数的结果,这在 ddply 中很容易。 问题 1:使用带有矢量输出的(昂贵的)函数聚合 dt dt[ , as.list(quantile(x)),
我有两个分数列表; 说 A = [ 1/212, 5/212, 3/212, ... ] 和 B = [ 4/143, 7/143, 2/143, ... ] . 如果我们定义 A' = a[0] *
我已经使用 numpy 从 csv 文件中获取数据。 numpy 数组的尺寸为:100*20。我如何取列的平均值(比如 col 3,5,8)并用包含这 3 个 cols 平均值的新列替换它们 如果
在 Rust 中取任意数的 n 次根的最佳方法是什么?例如,num crate 只允许取整数类型的第 n 个主根,即 floor'ed 或 ceil'ed 值......如何最好地接近实际值? 最佳答
看起来这应该很容易,但我很困惑。我已经掌握了使用 dplyr 进行编程的大致技巧0.7,但为此苦苦挣扎:How do Iprogram in dplyr我想要编程的变量是否是一个字符串? 我正在抓取数
在 Rust 中取任意数的 n 次根的最佳方法是什么?例如,num crate 只允许取整数类型的第 n 个主根,即 floor'ed 或 ceil'ed 值......如何最好地接近实际值? 最佳答
我有一个 pandas 数据框,其中有一列名为“coverage”。对于一系列特定索引值,我想获取前 100 行的平均“覆盖率”值。例如,对于索引位置 1001,我想要第 901-1000 行的平均“
import pandas as pd data = {'date': ['1998-03-01', '2001-04-01','1998-06-01','2001-08-01','2001-05-0
我有一个包含 100 个数字的 NSArray。我想创建一个 5 个数字的 NSArray。第二个数组中的第一个数字是第一个数组中前 20 个数字的平均值。第二个数字是第一个数组中第二组 20 个数字
我该怎么做?我试过 abs() 但它只适用于整数。有内置的方法吗? CGFloat flo = -123; abs(flo) 返回 0 最佳答案 使用 fabs() CGFloat f = -123.
我正在采用以下计算的 log2: tl_out.a.bits.size := log2Ceil(s1_row * s2_column * 4.U) 其中,s1_row 和 s2_column 是 UI
如何从 m 个元素集合中取出 n 个元素,以便在元素用完时从头开始? List list = new List() {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; List newL
我已经完成了研究,但似乎找不到有关该主题的足够文档。 在 Object streams 上尝试一些代码时,我注意到将 BufferedOutputStream 放入 ObjectOutputStrea
我需要计算数据中连续时间组之间的差异,如下所示 from io import StringIO import pandas as pd strio = StringIO("""\
我在 Mongo 数据库中有以下文档: { _id: 1, question: "Blue or red?", __v: 0, votes: [9, 5] } 我想在后
好吧,宇宙中一定有人知道这个问题的答案。 我已经在这里问过这个问题,但仍然没有解决方案。 我需要保留和换行 div 中的文本。到目前为止,我很难想出解决方案。我找到的最佳解决方案并不适用于所有浏览器。
我正在尝试采用 3 个单独的整数输入(年、月、日)并采用这 3 个条目并从中形成一个日期对象,以便我可以使用它来比较其他日期。 这是我目前所拥有的,不知从何而来: public void compar
在我的 IOS 项目中,我有一个包含该函数的自定义 Logger 类(单例) - (void)log:(NSString *)domain logLevel:(int)level logMessage
我是一名优秀的程序员,十分优秀!