gpt4 book ai didi

python - 数据流 : using beam. combiners 上一个 beam.combiners 的结果

转载 作者:行者123 更新时间:2023-12-04 17:29:52 29 4
gpt4 key购买 nike

我正在使用 Beam 管道计算流式数据的电话号码频率。我使用的滑动窗口每 5 分钟重复一次,总周期为 15 分钟,因此正如预期的那样,对于某些输入,当输入落在多个窗口中时,我会得到多个输出。

计算出现次数后,我想求出输入特征的平均值。输入是像这样的元组:

('phone_number', '123')
('phone_number', '456')
('phone_number', '456')
('phone_number', '456')

管道的第一部分是计算每个数字的频率:

| 'window' >> beam.WindowInto(window.SlidingWindows(900, 300))
| 'pair_with_one' >> beam.Map(lambda x: (x, 1))
| 'count_occurences' >> beam.combiners.Count.PerKey()

我的计算正确,我可以计算每个数字的频率,得到 3 个结果,因为每个周期有 3 个滑动窗口(在我们的例子中,456 次调用中有 2 次在同一个窗口中,第三次在一个不同的):

(('phone_number', '123'), 1)
(('phone_number', '123'), 1)
(('phone_number', '123'), 1)
(('phone_number', '456'), 2)
(('phone_number', '456'), 2)
(('phone_number', '456'), 2)
(('phone_number', '456'), 1)
(('phone_number', '456'), 1)
(('phone_number', '456'), 1)

现在,我想在计算的所有窗口值中找到每个电话号码的平均值,即:

(('phone_number', '123'), 1.0)
(('phone_number', '456'), 1.5)

我管道中的下一步是

| 'Find Means' >> beam.combiners.Mean.PerKey()

但这只是给了我:

(('phone_number', '123'), 1.0)
(('phone_number', '123'), 1.0)
(('phone_number', '123'), 1.0)
(('phone_number', '456'), 2.0)
(('phone_number', '456'), 2.0)
(('phone_number', '456'), 2.0)
(('phone_number', '456'), 1.0)
(('phone_number', '456'), 1.0)
(('phone_number', '456'), 1.0)

有什么方法可以对前一个结果进行另一次 beam.combiners 计算?

最佳答案

beam.combiners.Mean.PerKey() 给您错误输出的原因是组合器为您提供了为每个键+窗口计算的单个值。

然而,这里还有更多。在流处理中开窗的原因是为了确保输入在产生结果之前是有界的。也就是说,流式管道的输入通常是无界的,这意味着除非管道终止,否则它们永远不会停止接收数据。因此不可能计算所有窗口的值,因为您需要永远等待。

在我看来,您似乎在尝试计算“在比较所有可能的滑动 15 分钟窗口时,每 5 分钟滑动一次,电话号码在 15 分钟窗口中出现的平均次数”。如果不是这种情况,请澄清以帮助我理解

由于我们需要以某种方式限制计算,因此可以周期性地输出结果,即对于每个窗口,并不断输出新结果,更新它直到管道结束。这应该可以通过 StatefulDoFn 实现。

为此我建议:

  • 将滑动窗口的计数输出到 GlobalWindows
  • 存储总和和计数以在 StatefulDoFn 中计算平均值
  • 定期或在每个元素上输出计算的平均值并向下游更新结果(即覆盖 BigQuery 中的同一行,或在使用 SQL 检查 BigQuery 表时删除多余的行)

像这样:

class ComputeMeanStatefulDoFn(DoFn):
TOTAL_STATE = CombiningStateSpec('total', sum)
COUNT_STATE = CombiningStateSpec('count', sum)

def process(self, element,
total=DoFn.StateParam(TOTAL_STATE),
count=DoFn.StateParam(COUNT_STATE)):
key_phone_number, value_window_count = element
current_count = count.read() + 1
current_total = total.read() + value_window_count
mean = current_total / current_count
# You can emit every N results to reduce the volume
# but please make sure to at least emit the first M << N results
yield (key_phone_number, mean)
total.add(value_window_count)
count.add(1)

| 'window' >> beam.WindowInto(window.SlidingWindows(900, 300))
| 'pair_with_one' >> beam.Map(lambda x: (x, 1))
| 'count_occurences' >> beam.combiners.Count.PerKey()
| 'window_globally' >> beam.WindowInto(window.GlobalWindows)
| 'compute_mean_across_windows' >> beam.ParDo(ComputeMeanStatefulDoFn)

本质上,这里发生的是总和和计数存储到持久性/磁盘中,每次新元素到达全局窗口时我们都会重新计算新的均值。

注意:您需要处理多次为同一个键发出更新后的平均值。 IE。您可能希望覆盖包含您的结果的 BigQuery 表中的一行。

注意:根据您尝试计算的语义,您可能希望从 SlidingWindows 函数发出空窗口,以便将它们包含在下游均值计算中。

注意:您不能在此处使用 Combine.globally,因为它永远不会终止,这是由于流式管道中无限输入的性质。我相信如果您尝试启动这样的管道,这可能会出错。

关于python - 数据流 : using beam. combiners 上一个 beam.combiners 的结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60738647/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com