gpt4 book ai didi

google-cloud-dataflow - 开窗后,Google 数据流流管道不会将工作负载分配给多个工作人员

转载 作者:行者123 更新时间:2023-12-03 22:35:19 25 4
gpt4 key购买 nike

我正在尝试在 python 中设置数据流流管道。我在批处理管道方面有很多经验。我们的基本架构如下所示:
enter image description here

第一步是进行一些基本处理,每条消息大约需要 2 秒才能进入窗口。我们正在使用 3 秒和 3 秒间隔的滑动窗口(稍后可能会发生变化,因此我们有重叠的窗口)。作为最后一步,我们的 SOG 预测需要大约 15 秒的时间来处理,这显然是我们的瓶颈转换。

因此,我们似乎面临的问题是,在窗口化之前,工作负载完全分布在我们的工作人员身上,但最重要的转换根本没有分布。所有的窗口似乎在 1 个 worker 上一次处理一个,而我们有 50 个可用。

日志向我们显示,sog 预测步骤每 15 秒输出一次,如果窗口将在更多工作人员上处理,情况就不应该如此,因此随着时间的推移,这会产生我们不想要的巨大延迟。对于 1 分钟的消息,最后一个窗口的延迟为 5 分钟。当分布起作用时,这应该只有大约 15 秒(SOG 预测时间)。所以在这一点上我们一无所知..

enter image description here

有没有人看到我们的代码是否有问题或如何防止/规避?
这似乎是谷歌云数据流内部发生的事情。这是否也发生在 java 流管道中?

在批处理模式下,一切正常。在那里,人们可以尝试重新洗牌以确保不会发生融合等。但是在流中窗口化之后这是不可能的。

args = parse_arguments(sys.argv if argv is None else argv)
pipeline_options = get_pipeline_options(project=args.project_id,
job_name='XX',
num_workers=args.workers,
max_num_workers=MAX_NUM_WORKERS,
disk_size_gb=DISK_SIZE_GB,
local=args.local,
streaming=args.streaming)

pipeline = beam.Pipeline(options=pipeline_options)

# Build pipeline
# pylint: disable=C0330
if args.streaming:
frames = (pipeline | 'ReadFromPubsub' >> beam.io.ReadFromPubSub(
subscription=SUBSCRIPTION_PATH,
with_attributes=True,
timestamp_attribute='timestamp'
))

frame_tpl = frames | 'CreateFrameTuples' >> beam.Map(
create_frame_tuples_fn)

crops = frame_tpl | 'MakeCrops' >> beam.Map(make_crops_fn, NR_CROPS)
bboxs = crops | 'bounding boxes tfserv' >> beam.Map(
pred_bbox_tfserv_fn, SERVER_URL)

sliding_windows = bboxs | 'Window' >> beam.WindowInto(
beam.window.SlidingWindows(
FEATURE_WINDOWS['goal']['window_size'],
FEATURE_WINDOWS['goal']['window_interval']),
trigger=AfterCount(30),
accumulation_mode=AccumulationMode.DISCARDING)

# GROUPBYKEY (per match)
group_per_match = sliding_windows | 'Group' >> beam.GroupByKey()
_ = group_per_match | 'LogPerMatch' >> beam.Map(lambda x: logging.info(
"window per match per timewindow: # %s, %s", str(len(x[1])), x[1][0][
'timestamp']))

sog = sliding_windows | 'Predict SOG' >> beam.Map(predict_sog_fn,
SERVER_URL_INCEPTION,
SERVER_URL_SOG )

pipeline.run().wait_until_finish()

最佳答案

在光束中,平行单位是关键——给定关键的所有窗口都将在同一台机器上生产。但是,如果您有 50 多个 key ,它们应该分配给所有 worker 。

您提到您无法在流媒体中添加 Reshuffle。这应该是可能的;如果您遇到错误,请在 https://issues.apache.org/jira/projects/BEAM/issues 提交错误.重新窗口化到 GlobalWindows 是否会使重新洗牌的问题消失?

关于google-cloud-dataflow - 开窗后,Google 数据流流管道不会将工作负载分配给多个工作人员,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54764081/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com