gpt4 book ai didi

google-cloud-dataflow - 如何运行 Apache Beam 集成测试?

转载 作者:行者123 更新时间:2023-12-04 07:56:05 25 4
gpt4 key购买 nike

我正在尝试运行此处找到的游戏统计示例管道和集成测试 https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/complete/game但我不确定设置本地环境的正确方法是什么。

我的主要目标是学习如何使用 TestDataflowRunner,以便我可以为我编写的现有管道实现集成测试。

[更新]我写了一个基本的数据流,它从 PubSub 读取消息并将其写入不同的主题。我有一个使用 TestDirectRunner 通过的集成测试,但在尝试使用 TestDataflowRunner 时出现错误

pipeline.py

from __future__ import absolute_import

import argparse
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions


def run(argv=None):
"""Build and run the pipeline."""
parser = argparse.ArgumentParser()
parser.add_argument('--output_topic', required=True)
parser.add_argument('--input_subscription', required=True)

known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(StandardOptions).streaming = True
with beam.Pipeline(options=pipeline_options) as p:
# Read from PubSub into a PCollection.
messages = (
p |
beam.io.ReadFromPubSub(subscription=known_args.input_subscription).
with_output_types(bytes)
)

lines = messages | 'decode' >> beam.Map(lambda x: x.decode('utf-8'))

def format_pubsub(msg):
logging.info(f'Format PubSub: {msg}')
return str(msg)

output = (
lines
| 'format' >> beam.Map(format_pubsub)
| 'encode' >> beam.Map(lambda x: x.encode('utf-8')).with_output_types(bytes))

output | beam.io.WriteToPubSub(known_args.output_topic)

if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()

pubsub_it_test.py

from __future__ import absolute_import

import logging
import os
import time
import unittest
import uuid

from hamcrest.core.core.allof import all_of
from nose.plugins.attrib import attr

from apache_beam.io.gcp.tests import utils
from apache_beam.io.gcp.tests.pubsub_matcher import PubSubMessageMatcher
from apache_beam.runners.runner import PipelineState
from apache_beam.testing import test_utils
from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
from apache_beam.testing.test_pipeline import TestPipeline

import pipeline


INPUT_TOPIC = 'wordcount-input'
OUTPUT_TOPIC = 'wordcount-output'
INPUT_SUB = 'wordcount-input-sub'
OUTPUT_SUB = 'wordcount-output-sub'

DEFAULT_INPUT_NUMBERS = 1
WAIT_UNTIL_FINISH_DURATION = 12 * 60 * 1000 # in milliseconds


class TestIT(unittest.TestCase):
def setUp(self):
self.test_pipeline = TestPipeline(is_integration_test=True)
self.project = self.test_pipeline.get_option('project')
self.uuid = str(uuid.uuid4())

# Set up PubSub environment.
from google.cloud import pubsub
self.pub_client = pubsub.PublisherClient()
self.input_topic = self.pub_client.create_topic(
self.pub_client.topic_path(self.project, INPUT_TOPIC + self.uuid))
self.output_topic = self.pub_client.create_topic(
self.pub_client.topic_path(self.project, OUTPUT_TOPIC + self.uuid))

self.sub_client = pubsub.SubscriberClient()
self.input_sub = self.sub_client.create_subscription(
self.sub_client.subscription_path(self.project, INPUT_SUB + self.uuid),
self.input_topic.name)
self.output_sub = self.sub_client.create_subscription(
self.sub_client.subscription_path(self.project, OUTPUT_SUB + self.uuid),
self.output_topic.name,
ack_deadline_seconds=60)

def _inject_numbers(self, topic, num_messages):
"""Inject numbers as test data to PubSub."""
logging.debug('Injecting %d numbers to topic %s', num_messages, topic.name)
for n in range(num_messages):
self.pub_client.publish(self.input_topic.name, str(n).encode('utf-8'))

def tearDown(self):
test_utils.cleanup_subscriptions(self.sub_client, [self.input_sub, self.output_sub])
test_utils.cleanup_topics(self.pub_client, [self.input_topic, self.output_topic])

@attr('IT')
def test_pubsub_pipe_it(self):
# Build expected dataset.
expected_msg = [('%d' % num).encode('utf-8') for num in range(DEFAULT_INPUT_NUMBERS)]

# Set extra options to the pipeline for test purpose
state_verifier = PipelineStateMatcher(PipelineState.RUNNING)
pubsub_msg_verifier = PubSubMessageMatcher(self.project, self.output_sub.name, expected_msg, timeout=400)
extra_opts = {
'input_subscription': self.input_sub.name,
'output_topic': self.output_topic.name,
'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION,
'on_success_matcher': all_of(state_verifier, pubsub_msg_verifier)
}

# Generate input data and inject to PubSub.
self._inject_numbers(self.input_topic, DEFAULT_INPUT_NUMBERS)

# Get pipeline options from command argument: --test-pipeline-options,
# and start pipeline job by calling pipeline main function.
pipeline.run(self.test_pipeline.get_full_options_as_args(**extra_opts))

if __name__ == '__main__':
logging.getLogger().setLevel(logging.DEBUG)
unittest.main()

我在数据流日志中收到此错误

Error message from worker: generic::unknown: Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/apache_beam/internal/pickler.py", line 290, in loads
return dill.loads(s)
File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 275, in loads
return load(file, ignore, **kwds)
File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 270, in load
return Unpickler(file, ignore=ignore, **kwds).load()
File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 472, in load
obj = StockUnpickler.load(self)
File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 826, in _import_module
return __import__(import_name)
ModuleNotFoundError: No module named 'pipeline'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
response = task()
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 607, in do_instruction
getattr(request, request_type), request.instruction_id)
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 638, in process_bundle
instruction_id, request.process_bundle_descriptor_id)
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 467, in get
self.data_channel_factory)
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 868, in __init__
self.ops = self.create_execution_tree(self.process_bundle_descriptor)
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 925, in create_execution_tree
descriptor.transforms, key=topological_height, reverse=True)])
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 924, in <listcomp>
get_operation(transform_id))) for transform_id in sorted(
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 812, in wrapper
result = cache[args] = func(*args)
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 906, in get_operation
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 906, in <dictcomp>
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 904, in <listcomp>
tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 812, in wrapper
result = cache[args] = func(*args)
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 906, in get_operation
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 906, in <dictcomp>
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 904, in <listcomp>
tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 812, in wrapper
result = cache[args] = func(*args)
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 906, in get_operation
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 906, in <dictcomp>
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 904, in <listcomp>
tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 812, in wrapper
result = cache[args] = func(*args)
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 909, in get_operation
transform_id, transform_consumers)
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1198, in create_operation
return creator(self, transform_id, transform_proto, payload, consumers)
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1546, in create_par_do
parameter)
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1582, in _create_pardo_operation
dofn_data = pickler.loads(serialized_fn)
File "/usr/local/lib/python3.6/site-packages/apache_beam/internal/pickler.py", line 294, in loads
return dill.loads(s)
File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 275, in loads
return load(file, ignore, **kwds)
File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 270, in load
return Unpickler(file, ignore=ignore, **kwds).load()
File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 472, in load
obj = StockUnpickler.load(self)
File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 826, in _import_module
return __import__(import_name)
ModuleNotFoundError: No module named 'pipeline'

passed through:
==>
dist_proc/dax/workflow/worker/fnapi_service.cc:631

我正在运行的命令是

pytest --log-cli-level=INFO pubsub_it_test.py --test-pipeline-options="--runner=TestDataflowRunner \
--project=$PROJECT --region=europe-west1 \
--staging_location=gs://$BUCKET/staging \
--temp_location=gs://$BUCKET/temp \
--job_name=it-test-pipeline \
--setup_file ./setup.py"

我的这个管道的 repo 可以在这里找到 https://github.com/tunnelWithAC/apache-beam-integration-test

谢谢

最佳答案

集成测试旨在由 Beam 的 CI/CD 基础架构运行。它们是基于 nose 的,需要一个自定义插件来理解 --test-pipeline-options 标志。我不建议走这条路。

我会关注 quick start Ricco D 建议的环境指南。您可以使用 pytest 运行集成测试。要使用相同的 --test-pipeline-options 标志,您需要 this definition .否则,wordcount 示例展示了如何设置您自己的命令行标志。


更新:

我用它来设置 virtualenv:

pip install apache-beam[gcp,test]

test 标签会引入 pytest,但如果您已经安装了 pytest,则不需要。

然后我创建了这个 conftest.py 文件来配置 pytest(基于 Beam 自己的 conftest.py):

def pytest_addoption(parser):
parser.addoption('--test-pipeline-options',
help='Options to use in test pipelines. NOTE: Tests may '
'ignore some or all of these options.')

运行测试:

pytest --log-cli-level=INFO pipeline_it_test.py --test-pipeline-options="--runner=TestDataflowRunner --project=PROJECT --region=us-west1 --staging_location=gs://BUCKET/staging --temp_location=gs://BUCKET/temp --output=gs://BUCKET/output "

您的测试可能不需要 --test-pipeline-options 中的所有选项。

关于google-cloud-dataflow - 如何运行 Apache Beam 集成测试?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66695171/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com