
google-cloud-messaging - Google Cloud Pub/Sub data loss

Reposted. Author: 行者123. Updated: 2023-12-04 12:38:38

I'm having a problem with GCP Pub/Sub where a small fraction of the data is lost when I publish thousands of messages within a few seconds.

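On both the publishing side and the receiving side I log the message_id assigned by Pub/Sub and a session_id that is unique to each message. What I see is that some messages on the receiving side share the same session_id but have different message_ids. In addition, some messages are lost entirely.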

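For example, in one test I sent 5,000 messages to Pub/Sub and received exactly 5,000 messages, yet 8 of the messages I sent were missing (the totals match because each missing message is offset by a duplicate). The log for a missing message looks like this: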

MISSING sessionId: 731 (missing in log from pull request, but present in log from Flask API)

messageId FOUND: messageId:108562396466545

API: 200 **** sessionId: 731, messageId:108562396466545 ******(Log from Flask API)

Pubsub: sessionId: 730, messageId:108562396466545(Log from pull request)

The duplicates look like this:
======= Duplicates FOUND on sessionId: 730=======

sessionId: 730, messageId:108562396466545

sessionId: 730, messageId:108561339282318

(both are logs from pull request)

All of the missing messages and duplicates follow this same pattern.

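As the example shows, some messages have taken over another message's message_id: sessionId 730 was delivered twice under two different message_ids, one of which (108562396466545) is the ID the API returned when it published sessionId 731.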
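To find these cases systematically, the two logs can be cross-checked both by sessionId and by message_id. The sketch below is one way to do it, assuming each log has already been parsed into (sessionId, message_id) pairs; the function name `reconcile` and that input format are illustrative, not part of the original setup. The sample data is modeled on the excerpt above, with the API-side ID for session 730 assumed to be 108561339282318.

```python
from collections import Counter


def reconcile(api_log, pull_log):
    """Cross-check publisher-side and subscriber-side logs.

    Both arguments are lists of (session_id, message_id) pairs (hypothetical
    format, parsed from the Flask API log and the pull log respectively).
    Returns (missing, duplicates, mismatched).
    """
    published = dict(api_log)                    # session_id -> ID returned by publish()
    pulled_counts = Counter(s for s, _ in pull_log)
    delivered_as = {m: s for s, m in pull_log}   # message_id -> session_id actually received

    # Sessions the API published but the subscriber never saw
    missing = sorted(set(published) - set(pulled_counts))
    # Sessions the subscriber saw more than once
    duplicates = sorted(s for s, n in pulled_counts.items() if n > 1)
    # IDs that arrived carrying a different session than the API reported
    mismatched = sorted(
        (m, s, delivered_as[m])
        for s, m in published.items()
        if m in delivered_as and delivered_as[m] != s
    )
    return missing, duplicates, mismatched


# Illustrative data modeled on the excerpt above (730's API-side ID is assumed)
api_log = [(730, "108561339282318"), (731, "108562396466545")]
pull_log = [(730, "108562396466545"), (730, "108561339282318")]
print(reconcile(api_log, pull_log))
# → ([731], [730], [('108562396466545', 731, 730)])
```

Note that `pull_log` has the same length as `api_log` even though a message was lost, which is why comparing totals alone did not reveal the problem.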

Can anyone help me figure out what is going on? Thanks in advance.

Code

I have an API that sends messages to Pub/Sub, shown below:

from flask import Flask, request, jsonify, render_template
from flask_cors import CORS, cross_origin
from google.cloud import pubsub  # legacy google-cloud-pubsub API (pubsub.Client)
from functools import wraps
import re
import json


app = Flask(__name__)
ps = pubsub.Client()  # a single client shared by every request

...

@app.route('/publish', methods=['POST'])
@cross_origin()
@json_validator
def publish_test_topic():
    pubsub_topic = 'test_topic'
    data = request.data

    topic = ps.topic(pubsub_topic)

    event = json.loads(data)

    # publish() returns the message ID assigned by the server
    messageId = topic.publish(data)
    return ('200 **** sessionId: ' + str(event["sessionId"]) +
            ", messageId:" + str(messageId) + " ******")

Here is the code I use to read from Pub/Sub:

from google.cloud import pubsub
import re
import json

ps = pubsub.Client()
topic = ps.topic('test-xiu')
sub = topic.subscription('TEST-xiu')

max_messages = 1
stop = False

messages = []


class Message(object):
    """Records the sessionId/messageId pair of one pulled message."""
    def __init__(self, sessionId, messageId):
        super(Message, self).__init__()
        self.sessionId = sessionId
        self.messageId = messageId


def pull_all():
    while not stop:
        # return_immediately=False: wait for messages instead of
        # returning an empty response right away
        m = sub.pull(max_messages=max_messages, return_immediately=False)

        # Each pulled item is an (ack_id, message) pair
        for ack_id, message in m:
            messageId = message.message_id
            event = json.loads(message.data)
            sessionId = str(event["sessionId"])
            messages.append(Message(sessionId=sessionId, messageId=messageId))

            print('200 **** sessionId: ' + sessionId + ", messageId:" + messageId + " ******")

            # Acknowledge only after the message has been recorded
            sub.acknowledge(ack_ids=[ack_id])

pull_all()
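Separately from the client bug, Pub/Sub delivery is at-least-once, so a subscriber like this one can occasionally see a genuine redelivery even when nothing is wrong. A common mitigation is to make processing idempotent by keying on an application-level ID such as sessionId. The sketch below assumes payloads are the JSON bodies sent by the test page; `DedupingHandler` is a hypothetical name, and its in-memory set would need a bounded or durable store in practice. Deduplication only hides duplicates; it cannot recover a message that was never delivered, so it does not fix the loss described above.

```python
import json


class DedupingHandler(object):
    """Processes each sessionId at most once (hypothetical helper)."""

    def __init__(self):
        self.seen = set()      # sessionIds already processed (unbounded, in memory)
        self.processed = []    # order in which new sessionIds were handled

    def handle(self, payload):
        """Return True if the payload was processed, False if it was a duplicate.

        Either way the caller should still acknowledge the message; an
        unacknowledged message is redelivered after its ack deadline.
        """
        event = json.loads(payload)
        session_id = event["sessionId"]
        if session_id in self.seen:
            return False
        self.seen.add(session_id)
        self.processed.append(session_id)
        return True


handler = DedupingHandler()
for body in (b'{"sessionId": 730}', b'{"sessionId": 730}', b'{"sessionId": 731}'):
    handler.handle(body)
print(handler.processed)  # → [730, 731]
```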

The code that generates the sessionId, sends the requests to the API, and logs the responses:

// generate trackable sessionId
var sessionId = 0;

var increment_session_id = function () {
    sessionId++;
    return sessionId;
};

var generate_data = function () {
    var data = {};
    // data.sessionId = faker.random.uuid();
    data.sessionId = increment_session_id();
    data.user = get_rand(userList);
    data.device = get_rand(deviceList);
    data.visitTime = new Date();
    data.location = get_rand(locationList);
    data.content = get_rand(contentList);

    return data;
};

var sendData = function (url, payload) {
    var request = $.ajax({
        url: url,
        contentType: 'application/json',
        method: 'POST',
        data: JSON.stringify(payload),
        error: function (xhr, status, errorThrown) {
            console.log(xhr, status, errorThrown);
            $('.result').prepend("<pre id='json'>" + JSON.stringify(xhr, null, 2) + "</pre>");
            $('.result').prepend("<div>errorThrown: " + errorThrown + "</div>");
            $('.result').prepend("<div>======FAIL=======</div><div>status: " + status + "</div>");
        }
    }).done(function (xhr) {
        console.log(xhr);
        $('.result').prepend("<div>======SUCCESS=======</div><pre id='json'>" + JSON.stringify(payload, null, 2) + "</pre>");
    });
};

$(submit_button).click(function () {
    var request_num = get_request_num();
    var request_url = get_url();
    // $.ajax is asynchronous, so all request_num requests are in flight concurrently
    for (var i = 0; i < request_num; i++) {
        var data = generate_data();
        var loadData = changeVerb(data, 'load');
        sendData(request_url, loadData);
    }
});
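Because sessionId comes from a counter starting at 1 (rather than the commented-out random UUID), gaps and repeats can be found from the subscriber log alone, without the API log. A minimal sketch, assuming the received sessionIds have been collected into a list of integers and the number of requests sent is known; `check_sequence` is an illustrative name. The subscriber above stores sessionIds as strings, so convert them with `int()` first.

```python
from collections import Counter


def check_sequence(received_ids, sent_count):
    """Compare received sessionIds against the expected range 1..sent_count."""
    counts = Counter(received_ids)
    expected = set(range(1, sent_count + 1))
    missing = sorted(expected - set(counts))                    # never received
    duplicates = sorted(i for i, n in counts.items() if n > 1)  # received more than once
    return missing, duplicates


# Five requests sent; session 4 lost and session 3 delivered twice
print(check_sequence([1, 2, 3, 3, 5], 5))  # → ([4], [3])
```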

Update

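I made a change to the API and the problem seems to have gone away. Instead of sharing one pubsub.Client() across all requests, I now initialize a new client for every incoming request. The new API looks like this: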

from flask import Flask, request, jsonify, render_template
from flask_cors import CORS, cross_origin
from google.cloud import pubsub
from functools import wraps
import re
import json


app = Flask(__name__)

...

@app.route('/publish', methods=['POST'])
@cross_origin()
@json_validator
def publish_test_topic():
    # Create a fresh client for each request instead of sharing a module-level one
    ps = pubsub.Client()

    pubsub_topic = 'test_topic'
    data = request.data

    topic = ps.topic(pubsub_topic)

    event = json.loads(data)

    messageId = topic.publish(data)
    return ('200 **** sessionId: ' + str(event["sessionId"]) +
            ", messageId:" + str(messageId) + " ******")
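Creating a client per request works, but it pays the client setup cost on every call. A possible middle ground, not something suggested in the original thread, is one client per worker thread via `threading.local`, so that no client object is ever used by two threads. This assumes the bug only appears when a single client is used concurrently from several threads, which a thread-safety problem suggests but which has not been confirmed. `get_client` and `collect_clients` are hypothetical helpers; in the Flask handler the call would look like `ps = get_client(pubsub.Client)`.

```python
import threading

# Attributes set on this object are visible only to the thread that set them.
_local = threading.local()


def get_client(factory):
    """Return the calling thread's client, creating it with factory() on first use."""
    client = getattr(_local, "client", None)
    if client is None:
        client = factory()
        _local.client = client
    return client


def collect_clients(factory, n_threads=4):
    """Demo: every thread asks for its client twice; returns one pair per thread."""
    results = [None] * n_threads

    def worker(i):
        results[i] = (get_client(factory), get_client(factory))

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


pairs = collect_clients(object)
print(all(a is b for a, b in pairs))   # → True (same client reused within a thread)
print(len({id(a) for a, _ in pairs}))  # → 4 (a distinct client per thread)
```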

Best answer

I talked with some people at Google, and this appears to be a problem with the Python client:

The consensus on our side is that there is a thread-safety problem in the current python client. The client library is being rewritten almost from scratch as we speak, so I don't want to pursue any fixes in the current version. We expect the new version to become available by end of June.

Running the current code with thread_safe: false in app.yaml, or better yet just instantiating the client in every call, should be the workaround -- the solution you found.


For the detailed workaround, see the Update section in the question.
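The answer does not say what the thread-safety problem actually was. Purely as an illustration, the toy model below shows how a client that keeps the in-flight payload on a shared instance could produce exactly the logged pattern: the ID returned for sessionId 731 ends up attached to sessionId 730's payload, 730 is stored twice, and 731 disappears. `ToyServer` and `UnsafeToyClient` are invented for this sketch and do not reflect the internals of google-cloud-pubsub; generators stand in for threads so the interleaving is deterministic.

```python
import itertools


class ToyServer(object):
    """Stands in for the backend: hands out message IDs and stores payloads."""

    def __init__(self):
        self._ids = itertools.count(1000)
        self.stored = {}  # message_id -> payload

    def next_id(self):
        return next(self._ids)

    def store(self, message_id, payload):
        self.stored[message_id] = payload


class UnsafeToyClient(object):
    """Keeps the payload being published on the instance: unsafe to share."""

    def __init__(self, server):
        self.server = server
        self._pending = None

    def publish_steps(self, payload):
        # Each `yield` marks a point where another thread could run.
        self._pending = payload
        yield
        message_id = self.server.next_id()
        yield
        self.server.store(message_id, self._pending)  # may be another call's payload
        return message_id


def run(schedule, calls):
    """Step the generators in `calls` in the order given by `schedule`."""
    returned = {}
    for key in schedule:
        try:
            next(calls[key])
        except StopIteration as done:
            returned[key] = done.value  # what publish() would hand back to the caller
    return returned


def demo(shared):
    server = ToyServer()
    if shared:
        client = UnsafeToyClient(server)
        clients = {730: client, 731: client}
    else:  # one client per request, as in the fix from the Update
        clients = {730: UnsafeToyClient(server), 731: UnsafeToyClient(server)}
    calls = {s: clients[s].publish_steps({"sessionId": s}) for s in (730, 731)}
    returned = run([731, 730, 731, 731, 730, 730], calls)
    return returned, server.stored


print(demo(shared=True))
print(demo(shared=False))
```

With the shared client, ID 1000 is what the caller logs for 731 while the stored payload is 730's, mirroring `API: 200 **** sessionId: 731, messageId:108562396466545` versus `Pubsub: sessionId: 730, messageId:108562396466545`. With one client per call, each ID keeps its own payload.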

A similar question about google-cloud-messaging - Google Cloud Pub/Sub data loss can be found on Stack Overflow: https://stackoverflow.com/questions/44250348/
