I'm trying to run the game stats example pipeline and integration tests found here: https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/complete/game, but I'm not sure what the correct way is to set up a local environment.
My main goal is to learn how to use the TestDataflowRunner so that I can write integration tests for existing pipelines I've built.
[Update] I wrote a basic pipeline that reads messages from Pub/Sub and writes them to a different topic. I have an integration test that passes with the TestDirectRunner, but I get an error when trying to use the TestDataflowRunner.
pipeline.py
from __future__ import absolute_import

import argparse
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions


def run(argv=None):
    """Build and run the pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--output_topic', required=True)
    parser.add_argument('--input_subscription', required=True)
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(StandardOptions).streaming = True

    with beam.Pipeline(options=pipeline_options) as p:
        # Read from PubSub into a PCollection.
        messages = (
            p
            | beam.io.ReadFromPubSub(
                subscription=known_args.input_subscription).with_output_types(bytes))

        lines = messages | 'decode' >> beam.Map(lambda x: x.decode('utf-8'))

        def format_pubsub(msg):
            logging.info(f'Format PubSub: {msg}')
            return str(msg)

        output = (
            lines
            | 'format' >> beam.Map(format_pubsub)
            | 'encode' >> beam.Map(lambda x: x.encode('utf-8')).with_output_types(bytes))

        output | beam.io.WriteToPubSub(known_args.output_topic)


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
pubsub_it_test.py
from __future__ import absolute_import

import logging
import os
import time
import unittest
import uuid

from hamcrest.core.core.allof import all_of
from nose.plugins.attrib import attr

from apache_beam.io.gcp.tests import utils
from apache_beam.io.gcp.tests.pubsub_matcher import PubSubMessageMatcher
from apache_beam.runners.runner import PipelineState
from apache_beam.testing import test_utils
from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
from apache_beam.testing.test_pipeline import TestPipeline

import pipeline

INPUT_TOPIC = 'wordcount-input'
OUTPUT_TOPIC = 'wordcount-output'
INPUT_SUB = 'wordcount-input-sub'
OUTPUT_SUB = 'wordcount-output-sub'

DEFAULT_INPUT_NUMBERS = 1
WAIT_UNTIL_FINISH_DURATION = 12 * 60 * 1000  # in milliseconds


class TestIT(unittest.TestCase):
    def setUp(self):
        self.test_pipeline = TestPipeline(is_integration_test=True)
        self.project = self.test_pipeline.get_option('project')
        self.uuid = str(uuid.uuid4())

        # Set up PubSub environment.
        from google.cloud import pubsub
        self.pub_client = pubsub.PublisherClient()
        self.input_topic = self.pub_client.create_topic(
            self.pub_client.topic_path(self.project, INPUT_TOPIC + self.uuid))
        self.output_topic = self.pub_client.create_topic(
            self.pub_client.topic_path(self.project, OUTPUT_TOPIC + self.uuid))

        self.sub_client = pubsub.SubscriberClient()
        self.input_sub = self.sub_client.create_subscription(
            self.sub_client.subscription_path(self.project, INPUT_SUB + self.uuid),
            self.input_topic.name)
        self.output_sub = self.sub_client.create_subscription(
            self.sub_client.subscription_path(self.project, OUTPUT_SUB + self.uuid),
            self.output_topic.name,
            ack_deadline_seconds=60)

    def _inject_numbers(self, topic, num_messages):
        """Inject numbers as test data to PubSub."""
        logging.debug('Injecting %d numbers to topic %s', num_messages, topic.name)
        for n in range(num_messages):
            self.pub_client.publish(self.input_topic.name, str(n).encode('utf-8'))

    def tearDown(self):
        test_utils.cleanup_subscriptions(self.sub_client, [self.input_sub, self.output_sub])
        test_utils.cleanup_topics(self.pub_client, [self.input_topic, self.output_topic])

    @attr('IT')
    def test_pubsub_pipe_it(self):
        # Build expected dataset.
        expected_msg = [('%d' % num).encode('utf-8') for num in range(DEFAULT_INPUT_NUMBERS)]

        # Set extra options to the pipeline for test purposes.
        state_verifier = PipelineStateMatcher(PipelineState.RUNNING)
        pubsub_msg_verifier = PubSubMessageMatcher(
            self.project, self.output_sub.name, expected_msg, timeout=400)
        extra_opts = {
            'input_subscription': self.input_sub.name,
            'output_topic': self.output_topic.name,
            'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION,
            'on_success_matcher': all_of(state_verifier, pubsub_msg_verifier)
        }

        # Generate input data and inject to PubSub.
        self._inject_numbers(self.input_topic, DEFAULT_INPUT_NUMBERS)

        # Get pipeline options from command argument: --test-pipeline-options,
        # and start pipeline job by calling pipeline main function.
        pipeline.run(self.test_pipeline.get_full_options_as_args(**extra_opts))


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.DEBUG)
    unittest.main()
I get this error in the Dataflow logs:
Error message from worker: generic::unknown: Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/apache_beam/internal/pickler.py", line 290, in loads
return dill.loads(s)
File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 275, in loads
return load(file, ignore, **kwds)
File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 270, in load
return Unpickler(file, ignore=ignore, **kwds).load()
File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 472, in load
obj = StockUnpickler.load(self)
File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 826, in _import_module
return __import__(import_name)
ModuleNotFoundError: No module named 'pipeline'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
response = task()
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 607, in do_instruction
getattr(request, request_type), request.instruction_id)
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 638, in process_bundle
instruction_id, request.process_bundle_descriptor_id)
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 467, in get
self.data_channel_factory)
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 868, in __init__
self.ops = self.create_execution_tree(self.process_bundle_descriptor)
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 925, in create_execution_tree
descriptor.transforms, key=topological_height, reverse=True)])
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 924, in <listcomp>
get_operation(transform_id))) for transform_id in sorted(
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 812, in wrapper
result = cache[args] = func(*args)
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 906, in get_operation
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 906, in <dictcomp>
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 904, in <listcomp>
tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 812, in wrapper
result = cache[args] = func(*args)
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 906, in get_operation
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 906, in <dictcomp>
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 904, in <listcomp>
tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 812, in wrapper
result = cache[args] = func(*args)
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 906, in get_operation
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 906, in <dictcomp>
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 904, in <listcomp>
tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 812, in wrapper
result = cache[args] = func(*args)
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 909, in get_operation
transform_id, transform_consumers)
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1198, in create_operation
return creator(self, transform_id, transform_proto, payload, consumers)
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1546, in create_par_do
parameter)
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1582, in _create_pardo_operation
dofn_data = pickler.loads(serialized_fn)
File "/usr/local/lib/python3.6/site-packages/apache_beam/internal/pickler.py", line 294, in loads
return dill.loads(s)
File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 275, in loads
return load(file, ignore, **kwds)
File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 270, in load
return Unpickler(file, ignore=ignore, **kwds).load()
File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 472, in load
obj = StockUnpickler.load(self)
File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 826, in _import_module
return __import__(import_name)
ModuleNotFoundError: No module named 'pipeline'
passed through:
==>
dist_proc/dax/workflow/worker/fnapi_service.cc:631
The command I'm running is:
pytest --log-cli-level=INFO pubsub_it_test.py --test-pipeline-options="--runner=TestDataflowRunner \
--project=$PROJECT --region=europe-west1 \
--staging_location=gs://$BUCKET/staging \
--temp_location=gs://$BUCKET/temp \
--job_name=it-test-pipeline \
--setup_file ./setup.py"
The repo for this pipeline can be found here: https://github.com/tunnelWithAC/apache-beam-integration-test
Thanks
Best Answer
The integration tests are designed to be run by Beam's CI/CD infrastructure. They are nose-based and require a custom plugin to understand the --test-pipeline-options flag. I wouldn't recommend going down that road.
I would follow the quick start environment guide that Ricco D suggested. You can run integration tests with pytest. To use the same --test-pipeline-options flag, you will need this definition. Otherwise, the wordcount example shows how to set up your own command-line flags.
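For the second option, here is a minimal pytest-based sketch of defining your own flag; the flag name, default, and test function are made up for illustration and are not the exact code from the wordcount sample:
# conftest.py -- hypothetical custom flag, for illustration only
def pytest_addoption(parser):
    parser.addoption(
        '--output-path',
        default='gs://YOUR_BUCKET/output',
        help='Where the test pipeline should write its results.')

# test_my_pipeline.py -- reads the flag through pytest's built-in `request` fixture
def test_my_pipeline(request):
    output_path = request.config.getoption('--output-path')
    # pass output_path into the pipeline under test, e.g. as its --output argument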
Update:
I used this to set up the virtualenv:
pip install apache-beam[gcp,test]
The test extra pulls in pytest, but it isn't needed if you already have pytest installed.
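(For completeness, and assuming Python 3 with a virtualenv named env -- adjust the name to your setup -- the full environment setup would look roughly like this:)
python3 -m venv env
source env/bin/activate
pip install "apache-beam[gcp,test]"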
Then I created this conftest.py file to configure pytest (based on Beam's own conftest.py):
def pytest_addoption(parser):
    parser.addoption('--test-pipeline-options',
                     help='Options to use in test pipelines. NOTE: Tests may '
                          'ignore some or all of these options.')
To run the test:
pytest --log-cli-level=INFO pipeline_it_test.py --test-pipeline-options="--runner=TestDataflowRunner --project=PROJECT --region=us-west1 --staging_location=gs://BUCKET/staging --temp_location=gs://BUCKET/temp --output=gs://BUCKET/output "
Your test may not need all of the options in --test-pipeline-options.
About google-cloud-dataflow - How to run Apache Beam integration tests? A similar question can be found on Stack Overflow: https://stackoverflow.com/questions/66695171/