
python-3.x - Why doesn't GCP Pub/Sub acknowledge messages?


I have a problem with acknowledging GCP Pub/Sub messages in my subscriber. I wrote three classes: PubSubPublisher (publishes to a topic), PubSubSubscriber (receives from a subscription and indexes into Elasticsearch) and ElasticDailyIndexManager (a Thread). This is how I use them:

  • I use PubSubPublisher to publish 100k dummy messages to my topic (it handles roughly 10k messages per second).
  • I run PubSubSubscriber.receive_and_index() for the first time. While receiving the messages it also indexes them into Elasticsearch using 10 ElasticDailyIndexManager threads. Basically, I connect to the subscription, read (and index) for 60 seconds, then exit. All 100k documents end up in Elasticsearch (roughly 1.5k messages per second).
  • I run PubSubSubscriber.receive_and_index() a second time. I expect nothing to be indexed, yet roughly 40k new documents show up (the Elasticsearch ids are random).

  • It sometimes takes about 3-4 runs to drain the subscription, so I suspect the messages are not being acknowledged properly, even though nothing errors at runtime. I index the data together with message.message_id, and I clearly end up with multiple rows sharing the same message_id.

I cannot see why the subscriber fails to acknowledge the messages when it raises no errors. I set a 30-second acknowledgement deadline on the subscription, but it did not help.
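A quick way to confirm which acknowledgement deadline is actually configured on the subscription is to fetch the subscription resource and inspect it. This is only a minimal sketch: the project and subscription names are placeholders, and the positional call assumes the pre-2.0 google-cloud-pubsub client that the rest of this code uses (in 2.x it would be get_subscription(subscription=subscription_path)).

from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
# Placeholder names - substitute the real project and subscription.
subscription_path = subscriber.subscription_path("my-project", "my-topic-subscription-elastic")

# Fetch the subscription resource and print the deadline Pub/Sub actually enforces.
subscription = subscriber.get_subscription(subscription_path)
print("ack_deadline_seconds:", subscription.ack_deadline_seconds)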

The code of the classes mentioned above:
import click
from json import dumps
from queue import Queue
from threading import Thread

from google.cloud import pubsub_v1


class ProcessFutures(Thread):
    def __init__(self, futures_queue):
        Thread.__init__(self)

        self.queue = futures_queue
        self.counter = 0
        self.results = list()
        self.daemon = True

        self.start()

    def run(self):
        # Resolve publish futures in a background thread so publishing is never blocked.
        while getattr(self, 'keep_going', True):
            future = self.queue.get()
            self.results.append(future.result())
            self.queue.task_done()


class PubSubPublisher:
    def __init__(self, project_id, topic_name):
        self.client = pubsub_v1.PublisherClient()
        self.project_id = project_id
        self.topic_name = topic_name

        self.keep_going = True
        self.futures_queue = Queue()
        self.future_process = ProcessFutures(self.futures_queue)

    def publish_message(self, message_body):
        """
        Publishes a message to a Pub/Sub topic.

        future.result() is verified in a separate thread to avoid blocking message publishing.
        """
        topic_path = self.client.topic_path(self.project_id, self.topic_name)

        if isinstance(message_body, dict):
            data = dumps(message_body)
        elif isinstance(message_body, str):
            data = message_body
        else:
            raise TypeError("message_body must be a dict or a str")

        data = data.encode('utf-8')

        future = self.client.publish(topic_path, data=data)
        self.futures_queue.put(future)

    def finish(self):
        # Block until every publish future has been resolved.
        self.future_process.queue.join()

        print("Processed results: " + str(len(self.future_process.results)))


@click.command()
@click.option('--project-id', '-p', required=True, type=str, help='Google Cloud Platform Project Id')
@click.option('--topic', '-t', required=True, type=str, help='Pub/Sub Topic to which messages will be published')
@click.option('--message', '-m', required=True, type=str, help='Message body')
@click.option('--amount', '-a', required=True, type=int, help='How many messages to send')
def run(project_id, topic, message, amount):
    from time import time

    psp = PubSubPublisher(project_id, topic)

    time_start = time()

    for i in range(amount):
        message_body = dict(i=i, message=message)
        psp.publish_message(message_body)

    psp.finish()

    time_stop = time()
    seconds = time_stop - time_start

    print("Published {} messages in {:.2f} seconds. That is {:.2f} mps!".format(amount, seconds,
                                                                                amount / seconds))


if __name__ == '__main__':
    run()

from elasticsearch import Elasticsearch, ElasticsearchException, NotFoundError, helpers
from datetime import datetime
from json import load, loads, JSONDecodeError
from os import getenv, environ
from queue import Queue
from threading import Thread, Lock
from time import time, strftime, localtime

import click
from google.cloud import pubsub_v1

from config import BASE_PATH


class ElasticDailyIndexManager(Thread):
    def __init__(self, index_basename):
        Thread.__init__(self)

        es_port = 9200 if ElasticDailyIndexManager.str_to_bool(getenv("RUNNING_IN_CONTAINER", "False")) else 9201
        es_url = "elastic" if ElasticDailyIndexManager.str_to_bool(getenv("RUNNING_IN_CONTAINER", "False")) else "localhost"

        self.es = Elasticsearch(hosts=[es_url], port=es_port)

        self.index_template_name = index_basename
        self.index_name_mask = index_basename if index_basename.endswith("-") else index_basename + "-"

        while not self._template_exists():
            self._register_index_template()

        self.queue = Queue()
        self.daemon = True

        self.start()

    def run(self):
        def generator():
            while True:
                message_body, message_id = self.queue.get()

                metadata = dict()
                # Note: the queue item is marked done as soon as it is dequeued,
                # before streaming_bulk has actually flushed the document.
                self.queue.task_done()

                yield self._prepare_bulk_doc(message_body, **metadata)

        bulk_load = helpers.streaming_bulk(self.es, generator(), 10, yield_ok=False)

        while True:
            for success, info in bulk_load:
                print(success, info)

    def index_document(self, document_body, id=None):
        document_body['@timestamp'] = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S')

        try:
            self.es.index(index=self.index_name_mask + datetime.utcnow().strftime('%Y.%m.%d'),
                          doc_type='default',
                          body=document_body,
                          id=id)
        except ElasticsearchException as e:
            print(document_body, id, e.args)

    def _register_index_template(self):
        template_body = self._get_json_file_content("{}/config/templates/{}.json".format(BASE_PATH,
                                                                                          self.index_template_name))

        try:
            if template_body is not None:
                self.es.indices.put_template(name=self.index_template_name,
                                             body=template_body,
                                             master_timeout="60s")
        except ElasticsearchException as e:
            print(e.args)

    def _template_exists(self):
        try:
            self.es.indices.get_template(self.index_template_name)
            return True
        except NotFoundError:
            return False

    @staticmethod
    def _get_json_file_content(file_dir_arg):
        """
        Wrapper around json.load. Expects a file containing JSON.

        :param file_dir_arg: Path to the file to be read.
        :return: Dictionary (decoded JSON)
        """
        result = None

        try:
            with open(file_dir_arg, 'r', encoding='UTF-8-SIG') as f:
                result = load(f)
        except Exception as e:
            print(e.args)

        return result

    def _prepare_bulk_doc(self, source_arg, **kwargs):
        """
        Provides a unified document structure for indexing into Elasticsearch
        via the bulk helpers.

        :param source_arg: body of the document
        :param kwargs: additional meta parameters (like doc _id)
        :return: Reformatted & enhanced source_arg
        """
        metadata = dict(**kwargs).get('metadata', dict())

        source_arg['@timestamp'] = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S')

        result = {
            '_index': self.index_name_mask + datetime.utcnow().strftime('%Y.%m.%d'),
            '_type': 'default',
            '_op_type': 'index',
            'doc': source_arg,
            'doc_as_upsert': False
        }

        result.update(metadata)

        return result

    @staticmethod
    def str_to_bool(str_arg):
        if str_arg.lower() == 'true':
            return True
        elif str_arg.lower() == 'false':
            return False
        else:
            return None


# Number of indexing threads; cast to int so a value coming from the environment works in range().
ELASTIC_MANAGERS = int(environ.get("ELASTIC_MANAGERS", 10))


class PubSubSubscriber:
    def __init__(self, project_id_arg, topic_name_arg, seconds_arg=None):
        self.elasticsearch_index_managers = list()

        for _ in range(ELASTIC_MANAGERS):
            self.elasticsearch_index_managers.append(ElasticDailyIndexManager(topic_name_arg))

        self.project_id = project_id_arg
        self.topic_name = topic_name_arg

        self.client = pubsub_v1.SubscriberClient()

        self.counter = 0
        self.latencies = list()
        self.seconds = seconds_arg
        self.lock = Lock()

    def receive_and_index(self):
        subscription_path = self.client.subscription_path(self.project_id,
                                                          "{}-subscription-elastic".format(self.topic_name))

        def callback(message):
            latency = message._received_timestamp - message.publish_time.timestamp()

            document = PubSubSubscriber.struct_message(message.data)
            document['message_id'] = message.message_id

            # Hand the document to one of the indexing threads (round-robin), then ack.
            self.elasticsearch_index_managers[self.counter % ELASTIC_MANAGERS].queue.put((document, None))

            message.ack()

            if self.seconds:
                self.latencies.append(latency)

            self.counter += 1

        future = self.client.subscribe(subscription_path, callback=callback)

        try:
            # When timeout is unspecified, the result method waits indefinitely.
            print('Listening for messages on {}'.format(subscription_path))
            print('Running for{}'.format(' ' + str(self.seconds) + ' seconds...' if self.seconds else 'ever'))

            future.result(timeout=self.seconds)
        except Exception as e:
            print('Listening for messages on {} threw an Exception: {}.'.format(subscription_path, e))
        finally:
            time_queue_join_start = time()

            for manager in self.elasticsearch_index_managers:
                manager.queue.join()

            time_queue_join_stop = time()

            self.seconds = self.seconds + time_queue_join_stop - time_queue_join_start

            print("Read {} messages in {:.2f} seconds. That is {:.2f} mps!".format(self.counter, self.seconds,
                                                                                   self.counter / self.seconds))

            if self.latencies:
                avg_latency = float(sum(self.latencies)) / float(len(self.latencies))

                print("Average latency was {:.2f} ms.".format(avg_latency))

    @staticmethod
    def struct_message(message_arg, encoding='utf-8'):
        if isinstance(message_arg, dict):
            message = message_arg
        elif isinstance(message_arg, bytes):
            message = PubSubSubscriber.message_to_dict(message_arg.decode(encoding))
        elif isinstance(message_arg, str):
            message = PubSubSubscriber.message_to_dict(message_arg)
        else:
            message = None

        group_topics = message.get("group", dict()).get("group_topics", dict())

        if group_topics:
            message['group']['group_topics'] = [d['topic_name'] for d in message['group']['group_topics']]

        # time handling
        event_time = PubSubSubscriber.epoch_to_strtime(message.get("event", dict()).get("time", None))
        if event_time:
            message['event']['time'] = event_time

        mtime = PubSubSubscriber.epoch_to_strtime(message.get("mtime", None))
        if mtime:
            message['mtime'] = mtime

        # geo handling
        group_geo_lat = message.get("group", dict()).get("group_lat", None)
        group_geo_lon = message.get("group", dict()).get("group_lon", None)

        if group_geo_lon and group_geo_lat:
            message['group']['group_geo'] = PubSubSubscriber.create_geo_object(group_geo_lat, group_geo_lon)

        venue_geo_lat = message.get("venue", dict()).get("lat", None)
        venue_geo_lon = message.get("venue", dict()).get("lon", None)

        if venue_geo_lon and venue_geo_lat:
            message['venue']['venue_geo'] = PubSubSubscriber.create_geo_object(venue_geo_lat, venue_geo_lon)

        return message

    @staticmethod
    def epoch_to_strtime(epoch_time):
        try:
            result = strftime('%Y-%m-%dT%H:%M:%S', localtime(epoch_time / 1000))
        except Exception:
            result = epoch_time

        return result

    @staticmethod
    def create_geo_object(lat, lon):
        return "{}, {}".format(str(lat), str(lon))

    @staticmethod
    def message_to_dict(message_arg):
        keep_going = True
        result = message_arg

        while keep_going and (not isinstance(result, dict)):
            try:
                result = loads(result)
            except JSONDecodeError:
                result = None
                keep_going = False

        return result


@click.command()
@click.option('--project-id', '-p', required=True, type=str, help='Google Cloud Platform Project Id')
@click.option('--topic', '-t', required=True, type=str, help='Pub/Sub Topic from which messages will be read')
@click.option('--seconds', '-s', default=None, required=False, type=int, help='For how long to read messages. If not provided - run forever')
def run(project_id, topic, seconds):
    pss = PubSubSubscriber(project_id, topic, seconds)
    pss.receive_and_index()


if __name__ == '__main__':
    run()

Best answer

    https://cloud.google.com/pubsub/docs/faq#duplicates:

    Why are there too many duplicate messages?

    Cloud Pub/Sub guarantees at-least-once message delivery, which means that occasional duplicates are to be expected. However, a high rate of duplicates may indicate that the client is not acknowledging messages within the configured ack_deadline_seconds, and Cloud Pub/Sub is retrying the message delivery. This can be observed in the monitoring metrics pubsub.googleapis.com/subscription/pull_ack_message_operation_count for pull subscriptions, and pubsub.googleapis.com/subscription/push_request_count for push subscriptions. Look for elevated expired or webhook_timeout values in the /response_code. This is particularly likely if there are many small messages, since Cloud Pub/Sub may batch messages internally and a partially acknowledged batch will be fully redelivered.

    Another possibility is that the subscriber is not acknowledging some messages because the code path processing those specific messages fails, and the Acknowledge call is never made; or the push endpoint never responds or responds with an error.
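A common mitigation on the subscriber side is to cap how many messages the streaming-pull client leases at once, so every callback can finish and acknowledge well inside the deadline, and to acknowledge only after the work has succeeded (nacking on failure so redelivery happens immediately instead of waiting for the deadline to expire). The sketch below assumes the same google-cloud-pubsub client; the project and subscription names, the max_messages value and the process_and_index helper are placeholders rather than part of the original code.

from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-topic-subscription-elastic")

# Limit how many messages the client holds leases on at any one time, so the
# callbacks can keep up and acks are sent before the ack deadline expires.
flow_control = pubsub_v1.types.FlowControl(max_messages=500)


def process_and_index(message):
    # Placeholder for the real work, e.g. handing the document to an indexing thread.
    pass


def callback(message):
    try:
        process_and_index(message)
        message.ack()    # ack only once the message has been handled
    except Exception:
        message.nack()   # ask for immediate redelivery instead of letting the deadline expire


future = subscriber.subscribe(subscription_path, callback=callback, flow_control=flow_control)
future.result()

Whether acking after processing (as above) or acking before indexing (as in the question) is preferable depends on whether losing a message is worse than indexing it twice.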

Regarding python-3.x - Why doesn't GCP Pub/Sub acknowledge messages?, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/53175170/
