- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试在 python 中设置数据流流管道。我在批处理管道方面有很多经验。我们的基本架构如下所示:
第一步是进行一些基本处理,每条消息大约需要 2 秒才能进入窗口。我们正在使用 3 秒和 3 秒间隔的滑动窗口(稍后可能会发生变化,因此我们有重叠的窗口)。作为最后一步,我们的 SOG 预测需要大约 15 秒的时间来处理,这显然是我们的瓶颈转换。
因此,我们似乎面临的问题是,在窗口化之前,工作负载完全分布在我们的工作人员身上,但最重要的转换根本没有分布。所有的窗口似乎在 1 个 worker 上一次处理一个,而我们有 50 个可用。
日志向我们显示,sog 预测步骤每 15 秒输出一次,如果窗口将在更多工作人员上处理,情况就不应该如此,因此随着时间的推移,这会产生我们不想要的巨大延迟。对于 1 分钟的消息,最后一个窗口的延迟为 5 分钟。当分布起作用时,这应该只有大约 15 秒(SOG 预测时间)。所以在这一点上我们一无所知..
有没有人看到我们的代码是否有问题或如何防止/规避?
这似乎是谷歌云数据流内部发生的事情。这是否也发生在 java 流管道中?
在批处理模式下,一切正常。在那里,人们可以尝试重新洗牌以确保不会发生融合等。但是在流中窗口化之后这是不可能的。
args = parse_arguments(sys.argv if argv is None else argv)
pipeline_options = get_pipeline_options(project=args.project_id,
job_name='XX',
num_workers=args.workers,
max_num_workers=MAX_NUM_WORKERS,
disk_size_gb=DISK_SIZE_GB,
local=args.local,
streaming=args.streaming)
pipeline = beam.Pipeline(options=pipeline_options)
# Build pipeline
# pylint: disable=C0330
if args.streaming:
frames = (pipeline | 'ReadFromPubsub' >> beam.io.ReadFromPubSub(
subscription=SUBSCRIPTION_PATH,
with_attributes=True,
timestamp_attribute='timestamp'
))
frame_tpl = frames | 'CreateFrameTuples' >> beam.Map(
create_frame_tuples_fn)
crops = frame_tpl | 'MakeCrops' >> beam.Map(make_crops_fn, NR_CROPS)
bboxs = crops | 'bounding boxes tfserv' >> beam.Map(
pred_bbox_tfserv_fn, SERVER_URL)
sliding_windows = bboxs | 'Window' >> beam.WindowInto(
beam.window.SlidingWindows(
FEATURE_WINDOWS['goal']['window_size'],
FEATURE_WINDOWS['goal']['window_interval']),
trigger=AfterCount(30),
accumulation_mode=AccumulationMode.DISCARDING)
# GROUPBYKEY (per match)
group_per_match = sliding_windows | 'Group' >> beam.GroupByKey()
_ = group_per_match | 'LogPerMatch' >> beam.Map(lambda x: logging.info(
"window per match per timewindow: # %s, %s", str(len(x[1])), x[1][0][
'timestamp']))
sog = sliding_windows | 'Predict SOG' >> beam.Map(predict_sog_fn,
SERVER_URL_INCEPTION,
SERVER_URL_SOG )
pipeline.run().wait_until_finish()
最佳答案
在光束中,平行单位是关键——给定关键的所有窗口都将在同一台机器上生产。但是,如果您有 50 多个 key ,它们应该分配给所有 worker 。
您提到您无法在流媒体中添加 Reshuffle。这应该是可能的;如果您遇到错误,请在 https://issues.apache.org/jira/projects/BEAM/issues 提交错误.重新窗口化到 GlobalWindows 是否会使重新洗牌的问题消失?
关于google-cloud-dataflow - 开窗后,Google 数据流流管道不会将工作负载分配给多个工作人员,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54764081/
有谁知道如何使用 JS window.open(...) 但是默默地使用它,即实际上没有打开它,所以打开检查它是否能够打开? 我必须运行脚本来进行弹出 block 检查,我快完成了。我不想让浏览器物理
WebAudio的AnalyserNode有一个FFT来获取声音信号的频域数据。我不明白 FFT 的窗口函数是如何定义的。是否可以更改窗口大小甚至窗口函数(即 Hanning 或 Blackman)?
使用下面的代码,我绘制了一个圆角矩形。它绘制了一个漂亮的实心浅灰色填充圆角矩形(大小为“self”)。我实际上想绘制它的像素反转,即:不是实心圆角矩形,而是在实心浅灰色矩形中的这个圆角矩形形状的窗口或
我是一名优秀的程序员,十分优秀!