- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个主题和一个有多个订阅者的订阅。我的应用场景是我想处理不同订阅者上的消息,并且一次处理特定数量的消息。意味着首先假设正在处理 8 条消息,然后如果一条消息处理完成,那么在确认已处理消息后,下一条消息应该从主题中获取,同时注意在任何订阅者上都找不到重复消息,并且每次都应该在后台处理 8 条消息。
为此,我使用 max_messages = 8 的同步拉取方法,但下一次拉取是在所有消息处理完成后完成的。因此,我们创建了自己的调度程序,其中应同时在后台运行 8 个进程,并一次拉取 1 条消息,但在所有 8 条消息处理完成后,仍会传递下一条消息。
这是我的代码:
#!/usr/bin/env python3
import logging
import multiprocessing
import time
import sys
import random
from google.cloud import pubsub_v1
project_id = 'xyz'
subscription_name = 'abc'
NUM_MESSAGES = 4
ACK_DEADLINE = 50
SLEEP_TIME = 20
multiprocessing.log_to_stderr()
logger = multiprocessing.get_logger()
logger.setLevel(logging.INFO)
def worker(msg):
logger.info("Received message:{}".format(msg.message.data))
random_sleep = random.randint(200,800)
logger.info("Received message:{} for {} sec".format(msg.message.data, random_sleep))
time.sleep(random_sleep)
def message_puller():
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_name)
while(True):
try:
response = subscriber.pull(subscription_path, max_messages=1)
message = response.received_messages[0]
msg = message
ack_id = message.ack_id
process = multiprocessing.Process(target=worker, args=(message,))
process.start()
while process.is_alive():
# `ack_deadline_seconds` must be between 10 to 600.
subscriber.modify_ack_deadline(subscription_path,[ack_id],ack_deadline_seconds=ACK_DEADLINE)
time.sleep(SLEEP_TIME)
# Final ack.
subscriber.acknowledge(subscription_path, [ack_id])
logger.info("Acknowledging message: {}".format(msg.message.data))
except Exception as e:
print (e)
continue
def synchronous_pull():
p = []
for i in range(0,NUM_MESSAGES):
p.append(multiprocessing.Process(target=message_puller))
for i in range(0,NUM_MESSAGES):
p[i].start()
for i in range(0,NUM_MESSAGES):
p[i].join()
if __name__ == '__main__':
synchronous_pull()
有时,即使 while 循环始终为 True,subscriber.pull 也不会拉取任何消息。它给了我错误 列表索引 (0) 超出范围得出的结论是,即使消息在主题上,subscriber.pull 也不会拉入消息,但一段时间后它开始拉动。为什么会这样?
我尝试过异步拉取和流量控制,但在多个订阅者上发现重复消息。如果任何其他方法可以解决我的问题,请告诉我。提前致谢。
最佳答案
Google Cloud PubSub 确保至少一次 ( docs )。这意味着,消息可能会被多次传递。为了解决这个问题,您需要使您的程序/系统 idempotent
您有多个订阅者,每个订阅者拉取 8 条消息。
为了避免同一消息被多个订阅者处理,请在任何订阅者提取该消息并进一步处理时立即确认
该消息,而不是在消息的整个处理结束后才确认。
此外,当队列中没有消息时,不要连续运行主脚本,而是使用 sleep
持续一段固定时间。
我有一个类似的代码,除了没有使用并行处理之外,我使用了同步拉取。
代码如下:
PubSubHandler - 处理 Pubsub 相关操作的类
from google.cloud import pubsub_v1
from google.api_core.exceptions import DeadlineExceeded
class PubSubHandler:
def __init__(self, subscriber_config):
self.project_name = subscriber_config['PROJECT_NAME']
self.subscriber_name = subscriber_config['SUBSCRIBER_NAME']
self.subscriber = pubsub_v1.SubscriberClient()
self.subscriber_path = self.subscriber.subscription_path(self.project_name,self.subscriber_name)
def pull_messages(self,number_of_messages):
try:
response = self.subscriber.pull(self.subscriber_path, max_messages = number_of_messages)
received_messages = response.received_messages
except DeadlineExceeded as e:
received_messages = []
print('No messages caused error')
return received_messages
def ack_messages(self,message_ids):
if len(message_ids) > 0:
self.subscriber.acknowledge(self.subscriber_path, message_ids)
return True
Utils - util 方法的类
import json
class Utils:
def __init__(self):
pass
def decoded_data_to_json(self,decoded_data):
try:
decoded_data = decoded_data.replace("'", '"')
json_data = json.loads(decoded_data)
return json_data
except Exception as e:
raise Exception('error while parsing json')
def raw_data_to_utf(self,raw_data):
try:
decoded_data = raw_data.decode('utf8')
return decoded_data
except Exception as e:
raise Exception('error converting to UTF')
Orcestrator - 主要脚本
import time
import json
import logging
from utils import Utils
from db_connection import DbHandler
from pub_sub_handler import PubSubHandler
class Orcestrator:
def __init__(self):
self.MAX_NUM_MESSAGES = 2
self.SLEEP_TIME = 10
self.util_methods = Utils()
self.pub_sub_handler = PubSubHandler(subscriber_config)
def main_handler(self):
to_ack_ids = []
pulled_messages = self.pub_sub_handler.pull_messages(self.MAX_NUM_MESSAGES)
if len(pulled_messages) < 1:
self.SLEEP_TIME = 1
print('no messages in queue')
return
logging.info('messages in queue')
self.SLEEP_TIME = 10
for message in pulled_messages:
raw_data = message.message.data
try:
decoded_data = self.util_methods.raw_data_to_utf(raw_data)
json_data = self.util_methods.decoded_data_to_json(decoded_data)
print(json_data)
except Exception as e:
logging.error(e)
to_ack_ids.append(message.ack_id)
if self.pub_sub_handler.ack_messages(to_ack_ids):
print('acknowledged msg_ids')
if __name__ == "__main__":
orecestrator = Orcestrator()
print('Receiving data..')
while True:
orecestrator.main_handler()
time.sleep(orecestrator.SLEEP_TIME)
关于python-3.x - 谷歌云pubsub python同步拉取,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58321230/
这两个包看起来非常相似: http://www.passportjs.org/packages/passport-google-oauth2/ http://www.passportjs.org/pa
我想在我的网站上添加通过 Google 和 Twitter 登录的按钮。我需要只使用应用程序的客户端而不是服务器端来完成此操作。但我没有找到任何 API。对于我发现的所有内容,我需要使用带有 key
我使用此链接通过 google plus 共享我的页面。 https://plus.google.com/share?url=http%3A%2F%2Fexample.com%2Fcompany%2
我正在尝试学习 google API,并且我的经验是使用 Python,因此我尝试使用 google api python 客户端来访问一些 google 服务,但在构建服务对象时遇到错误。 从 ap
在其实际的实时托管平台上构建实时站点的努力中,有没有办法告诉谷歌不要索引该网站?我发现了以下内容: http://support.google.com/webmasters/bin/answer.py
我正在开发一个 iOS 应用程序。当我运行用于 google+ 登录的程序时,在我点击允许访问按钮后,会显示此消息。 You've reached this page because we have
我有一个非常复杂的网站,每个页面包含 11 个 js 文件。 我最近添加了 google +1 按钮,代码如下: 这会正确显示 +1 按钮,直到我单击它。当我单击它时,出现此错误:https://
我正在尝试使用 google API 创建一个 html 文件,以便在 google MAPS 上显示 KML 文件。 这是 HTML 代码: function initMap() {
我是使用 Google Benchmark 的新手,在本地运行代码与在 Quick-Bench.com 上运行代码时,我收到了运行相同基准测试(下方)的不同结果,该基准测试使用 C++ 检索本地时间.
我已按照 Google 网站上的说明通过添加以下元标记在我的 AngularJS 网站上启用 Ajax 抓取: 呈现的内容有一些链接,如: User 1 User 2 User 3 还有一些呈现动态
通过 Google 手册实现 Google AppInvite - link . 启动 Invite Activity 并在 LogCat 中获取下一步: E/AppInviteAgent: Get
那么有人用过 Google 的 Go 吗?我想知道数学性能(例如触发器)与其他具有垃圾收集器的语言(如 Java 或 .NET)相比如何? 有人调查过吗? 最佳答案 理论性能:纯 Go 程序的理论性能
Stackdriver 测试我的网站启动速度慢 我们使用 cloudflare 作为我们的站点 CDN 提供商。我们使用 stackdriver 从外部测试站点可用性,我们将时间检查间隔设置为 1 分
我正在尝试使用 stax.GeneralConv() ( https://jax.readthedocs.io/en/latest/_modules/jax/experimental/stax.htm
我有一个从谷歌金融中提取日内数据的软件。但是,由于昨天 Google 更新了 API,所以软件报错了 Conversion from string HTML HEAD meta http-equiv=
我们在尝试从 Google 获取 oAuth token 时遇到“redirect_uri_mismatch”错误: [client 127.0.0.1:49892] {\n "error" : "
我的网站正在使用 Google reCAPTCHA 控件,但我听说它被阻止了 中国,反正我看到有人报告说将 API 更改为 https://www.recaptcha.net在中国工作? Anyone
背景 WordPress Google Adsense 谷歌自动插入 anchor 定广告 https://pptmon.com 问题 如下图所示,主播广告的容器高度太大了! 如何调整高度? 这是谷歌
我在使用 Google Colab 时遇到问题。当我想制作一个新的 Python3 Notebook 时,由于我登录了我的 Google 帐户,因此无法加载刚刚打开的新页面。 我该怎么办? 感谢您的帮
我正在使用 facebook和 google oauth2使用 passport js 登录, 有了这个流 用户点击登录按钮 重定向到 facebook/google auth 页面(取决于用户选择的
我是一名优秀的程序员,十分优秀!