
python - How to stream from Pub/Sub to Datastore with a Python pipeline?


I am trying to publish JSON messages to Pub/Sub and write them to Datastore with a streaming Cloud Dataflow pipeline.

from __future__ import absolute_import

import json
import logging
import uuid

import apache_beam as beam
from apache_beam import window
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
from google.cloud.proto.datastore.v1 import entity_pb2
from googledatastore import helper as datastore_helper

# PROJECT, STAGING_LOCATION, JOB_NAME, TEMP_LOCATION, TOPIC,
# NAMESPACE and KIND are defined elsewhere.

class EntityWrapper(object):
    """Builds a Datastore entity protobuf from a message payload."""

    def __init__(self, namespace, kind, ancestor):
        self._namespace = namespace
        self._kind = kind
        self._ancestor = ancestor

    def make_entity(self, content):
        entity = entity_pb2.Entity()
        if self._namespace is not None:
            entity.key.partition_id.namespace_id = self._namespace

        # Assign a random key under the optional ancestor.
        datastore_helper.add_key_path(entity.key, self._kind, self._ancestor,
                                      self._kind, str(uuid.uuid4()))
        datastore_helper.add_properties(entity, {"content": unicode(content)})
        return entity


pipeline_options = {
    'project': PROJECT,
    'staging_location': STAGING_LOCATION,
    'runner': 'DataflowRunner',
    'job_name': JOB_NAME,
    'temp_location': TEMP_LOCATION,
    'streaming': True}

options = PipelineOptions.from_dictionary(pipeline_options)


def run():
    p = beam.Pipeline(options=options)

    def parse_pubsub(line):
        record = json.loads(line)
        return record

    (p | "Read from PubSub" >> ReadFromPubSub(topic=TOPIC)
       | "PubSub message to Python object" >> beam.Map(parse_pubsub)
       | "Windowing" >> beam.WindowInto(window.FixedWindows(10))
       | "create entity" >> beam.Map(
             EntityWrapper(namespace=NAMESPACE, kind=KIND, ancestor=None).make_entity)
       | "write to DataStore" >> WriteToDatastore(PROJECT))

    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

When I run this code in Google Cloud Shell, it launches and creates a pipeline like the one below.

[screenshot: Dataflow pipeline graph]

However, when I publish JSON to Pub/Sub, it does not work.
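
For reference, the test message is published with the google-cloud-pubsub client, roughly like the minimal sketch below (PROJECT and TOPIC_NAME stand in for the real project ID and topic name):

import json

from google.cloud import pubsub_v1

# Minimal publish sketch; PROJECT and TOPIC_NAME are placeholders for
# the real project ID and topic name.
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT, TOPIC_NAME)

# Pub/Sub payloads are raw bytes, so serialize the JSON and encode it first.
message = json.dumps({"content": "hello datastore"}).encode("utf-8")
future = publisher.publish(topic_path, message)
print(future.result())  # prints the message ID once the publish is confirmed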

The error message is as follows.

JOB_MESSAGE_ERROR: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction -30: Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 134, in _execute
response = task()
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 169, in <lambda>
self._execute(lambda: worker.do_instruction(work), work)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 215, in do_instruction
request.instruction_id)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 237, in process_bundle
processor.process_bundle(instruction_id)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 299, in process_bundle
input_op.process_encoded(data.data)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 120, in process_encoded
self.output(decoded_value)
File "apache_beam/runners/worker/operations.py", line 166, in apache_beam.runners.worker.operations.Operation.output
def output(self, windowed_value, output_index=0):
File "apache_beam/runners/worker/operations.py", line 167, in apache_beam.runners.worker.operations.Operation.output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 87, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 387, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 388, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 589, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 595, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 612, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise
File "apache_beam/runners/common.py", line 593, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 363, in apache_beam.runners.common.SimpleInvoker.invoke_process
output_processor.process_outputs(
File "apache_beam/runners/common.py", line 698, in apache_beam.runners.common._OutputProcessor.process_outputs
self.main_receivers.receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 87, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 387, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 388, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 589, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 595, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 612, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise
File "apache_beam/runners/common.py", line 593, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 472, in apache_beam.runners.common.PerWindowInvoker.invoke_process
self._invoke_per_window(
File "apache_beam/runners/common.py", line 522, in apache_beam.runners.common.PerWindowInvoker._invoke_per_window
output_processor.process_outputs(
File "apache_beam/runners/common.py", line 659, in apache_beam.runners.common._OutputProcessor.process_outputs
def process_outputs(self, windowed_input_element, results):
File "apache_beam/runners/common.py", line 698, in apache_beam.runners.common._OutputProcessor.process_outputs
self.main_receivers.receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 87, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 387, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 388, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 589, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 595, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 612, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise
File "apache_beam/runners/common.py", line 593, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 364, in apache_beam.runners.common.SimpleInvoker.invoke_process
windowed_value, self.process_method(windowed_value.value))
File "/home/shinya_yaginuma/.local/lib/python2.7/site-packages/apache_beam/transforms/core.py", line 1035, in <lambda>
File "pubsub_to_datastore.py", line 21, in make_entity
NameError: global name 'entity_pb2' is not defined

java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
com.google.cloud.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:274)
com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
com.google.cloud.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:101)
com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1227)
com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:136)
com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)

I have checked that all the libraries are installed, so I don't understand why this error occurs.

Regards,

Best Answer

So your imports work, which is why the error occurs when a Pub/Sub message is processed rather than when the Dataflow pipeline is created. However, by the time make_entity is actually called on a worker, entity_pb2 is gone!

As per the docs, you need to do the imports in the workers that actually use them, or you can make the imports persist. Try saving your main session:

pipeline_options = {
    'project': PROJECT,
    'staging_location': STAGING_LOCATION,
    'runner': 'DataflowRunner',
    'job_name': JOB_NAME,
    'temp_location': TEMP_LOCATION,
    'streaming': True,
    'save_main_session': True}  # keep module-level imports available on the workers
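
Alternatively, the first option from the docs is to do the imports inside the method that actually runs on the workers, so each worker resolves them itself. A rough sketch against the make_entity above:

    def make_entity(self, content):
        # Rough sketch of the alternative: import inside the worker-side
        # method instead of relying on the pickled main session.
        import uuid
        from google.cloud.proto.datastore.v1 import entity_pb2
        from googledatastore import helper as datastore_helper

        entity = entity_pb2.Entity()
        if self._namespace is not None:
            entity.key.partition_id.namespace_id = self._namespace
        datastore_helper.add_key_path(entity.key, self._kind, self._ancestor,
                                      self._kind, str(uuid.uuid4()))
        datastore_helper.add_properties(entity, {"content": unicode(content)})
        return entity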

A similar question about streaming from Pub/Sub to Datastore with Python can be found on Stack Overflow: https://stackoverflow.com/questions/52771074/
