gpt4 book ai didi

python - 如何在 Python 中处理传入的 PubSub 消息?

转载 作者:行者123 更新时间:2023-12-05 06:28:36 24 4
gpt4 key购买 nike

我在 Debian 上创建了一个 Cloud Compute Engine 实例,并成功创建了一个主题的推送订阅

from google.cloud import pubsub_v1

project_id = "censored"
topic_name = "censored"
subscription_name = "censored"
endpoint = "https://censored.appspot.com/pubsub/push?token=censored"

def create_push_subscription(project_id,
topic_name,
subscription_name,
endpoint):
"""Create a new push subscription on the given topic."""
# [START pubsub_create_push_subscription]

subscriber = pubsub_v1.SubscriberClient()
topic_path = subscriber.topic_path(project_id, topic_name)
subscription_path = subscriber.subscription_path(
project_id, subscription_name)

push_config = pubsub_v1.types.PushConfig(
push_endpoint=endpoint)

subscription = subscriber.create_subscription(
subscription_path, topic_path, push_config)

print('Push subscription created: {}'.format(subscription))
print('Endpoint for subscription is: {}'.format(endpoint))
# [END pubsub_create_push_subscription]

create_push_subscription(project_id, topic_name, subscription_name, endpoint)

但我不确定传入的消息究竟是如何到达的。我找到了这个示例代码来解析消息,但我不确定如何让它在后台监控并在收到消息时“激活”。

import argparse
import base64
import json
import sys
import time

from google.cloud import pubsub_v1

def summarize(message):
# [START parse_message]
data = message.data.decode('utf-8')
attributes = message.attributes

name = attributes['name']
time_created = attributes['timeCreated']
bucket_id = attributes['bucketId']
object_id = attributes['objectId']
generation = attributes['objectGeneration']
description = (
'\tName: {name}\n'
'\tTime Created: {time_created}\n'
'\tBucket ID: {bucket_id}\n'
'\tObject ID: {object_id}\n'
'\tGeneration: {generation}\n'
).format(
name=name,
time_created=time_created,
bucket_id=bucket_id,
object_id=object_id,
generation=generation
)

if 'overwroteGeneration' in attributes:
description += '\tOverwrote generation: %s\n' % (
attributes['overwroteGeneration'])
if 'overwrittenByGeneration' in attributes:
description += '\tOverwritten by generation: %s\n' % (
attributes['overwrittenByGeneration'])

payload_format = attributes['payloadFormat']
if payload_format == 'JSON_API_V1':
object_metadata = json.loads(data)
name = object_metadata['name']
time_created = object_metadata['timeCreated']
size = object_metadata['size']
content_type = object_metadata['contentType']
metageneration = object_metadata['metageneration']
description += (
'\tName: {name}\n'
'\tTime Created: {time_created}\n'
'\tContent type: {content_type}\n'
'\tSize: {object_size}\n'
'\tMetageneration: {metageneration}\n'
).format(
name=name,
time_created=time_created,
content_type=content_type,
object_size=size,
metageneration=metageneration
)
return description
print('Note for developer: If BucketId and ObjectId listed, utf encoding.')
print('If not, JSON_V1 encoding. Adjust accordingly.')

# [END parse_message]
while(True):
print("signpost 1")
summarize(message)
print("signpost 2")
time.sleep(10)
print("signpost 3")

例如,这段代码会返回

NameError: name 'message' is not defined

这是预期的...

有人可以帮我正确设置吗?

我知道它在 PULL 中有所不同,因为消息将在拉取期间定义,但如果可能的话,我想将其保留为 PUSH。

最佳答案

您需要创建一个长时间运行的进程,该进程要么能够持续轮询新消息(拉订阅),要么有一个可访问的端点来接收新消息(推送订阅)。

请参阅此处示例:https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/pubsub/cloud-client/subscriber.py ,以及这里推和拉的区别:https://cloud.google.com/pubsub/docs/subscriber

关于python - 如何在 Python 中处理传入的 PubSub 消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54230637/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com