gpt4 book ai didi

python - 如何在 Apache Beam(Python SDK)中对早期触发进行单元测试

转载 作者:行者123 更新时间:2023-12-04 04:01:23 25 4
gpt4 key购买 nike

我想为具有早期触发的流管道创建一个单元测试。示例管道如下所示:

class CalculateTeamScores(beam.PTransform):
def expand(self, scores):
return scores \
| "windowing scores" >> beam.WindowInto(
beam.window.FixedWindows(120),
trigger=AfterWatermark(early=AfterCount(1)),
accumulation_mode=AccumulationMode.ACCUMULATING,
allowed_lateness=Duration(seconds=3600)) \
| "preparing scores for combining" >> beam.Map(
lambda team_score: (team_score['team'], team_score['score'])) \
| "calculating team scores" >> beam.CombinePerKey(sum) \
| "forming the result" >> beam.ParDo(FormatResult())
我写了以下测试:
class TestCalculateTeamScores(TestCase):

def test_should_sum_score_for_each_team(self):
# given
p = TestPipeline()

scores_stream = p | "loading score stream" >> TestStream()\
.advance_processing_time(advance_by=timestamp_from_datetime('12:30:00', as_int=True))\
.advance_watermark_to(new_watermark=timestamp_from_datetime('12:00:00'))\
.add_elements(elements=[team_score('red', 5)],
event_timestamp=timestamp_from_datetime('12:00:30'))\
.advance_processing_time(advance_by=500)\
.add_elements(elements=[team_score('red', 9)],
event_timestamp=timestamp_from_datetime('12:01:50'))\

# when
result = scores_stream | 'calculating team scores' >> CalculateTeamScores()

# then
assert_that(result, equal_to([
{
'team': 'red',
'score': 5,
'eventTime': (timestamp_from_datetime('12:00:00'),
timestamp_from_datetime('12:02:00'))
},
{
'team': 'red',
'score': 14,
'eventTime': (timestamp_from_datetime('12:00:00'),
timestamp_from_datetime('12:02:00'))
},
]))
p.run()
如您所见,我希望得到一个分数 = 5 的早期 Pane 。不幸的是,我无法得到我想要的结果。我得到 2 个得分 = 14 的 Pane 。
我使用 TestStream类,没有记录,也不是公共(public)接口(interface)的一部分,但它看起来适合我的情况。

最佳答案

由于 TestPipeline 无效,我无法获取早期 Pane 设置。它应该是:

options = PipelineOptions()
options.view_as(StandardOptions).streaming = True
p = TestPipeline(options=options)
看起来像 TestStream适用于测试早期触发。

关于python - 如何在 Apache Beam(Python SDK)中对早期触发进行单元测试,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63019786/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com