
google-cloud-platform - Window functions in Apache Beam

Reposted. Author: 行者123. Updated: 2023-12-05 05:53:20

Does anyone know how to perform window functions in Apache Beam (Dataflow)?

Example:

ID  Sector     Country  Income
1   Liam       US       16133
2   Noah       BR       10184
3   Oliver     ITA      11119
4   Elijah     FRA      13256
5   William    GER      7722
6   James      AUS      9786
7   Benjamin   ARG      1451
8   Lucas      FRA      4541
9   Henry      US       9111
10  Alexander  ITA      13002
11  Olivia     ENG      5143
12  Emma       US       18076
13  Ava        MEX      15930
14  Charlotte  ENG      18247
15  Sophia     BR       9578
16  Amelia     FRA      10813
17  Isabella   FRA      7575
18  Mia        GER      14875
19  Evelyn     AUS      19749
20  Harper     ITA      19642

Questions:

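  1. How can I create another column containing the running sum of Income, ordered by ID?
  2. How can I create another column with each person's rank by income (highest income first)?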
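Both questions correspond to SQL analytic functions: the running total is `SUM(Income) OVER (ORDER BY ID)` and the ranking is `RANK() OVER (ORDER BY Income DESC)`. Beam's own windowing (`beam.WindowInto`) groups elements by timestamp rather than by a sort order, so these columns have to be computed another way. As a reference for the expected values, here is a minimal plain-Python sketch on the sample table; `parse_rows` and `add_window_columns` are hypothetical helper names, and it assumes that equal incomes should share a rank (SQL `RANK` behaviour).

```python
# Sample rows copied from the question's table: ID, Sector, Country, Income.
TABLE = """\
1 Liam US 16133
2 Noah BR 10184
3 Oliver ITA 11119
4 Elijah FRA 13256
5 William GER 7722
6 James AUS 9786
7 Benjamin ARG 1451
8 Lucas FRA 4541
9 Henry US 9111
10 Alexander ITA 13002
11 Olivia ENG 5143
12 Emma US 18076
13 Ava MEX 15930
14 Charlotte ENG 18247
15 Sophia BR 9578
16 Amelia FRA 10813
17 Isabella FRA 7575
18 Mia GER 14875
19 Evelyn AUS 19749
20 Harper ITA 19642"""


def parse_rows(text):
    """Turn whitespace-separated lines into dicts shaped like the pipeline's elements."""
    rows = []
    for line in text.splitlines():
        id_, sector, country, income = line.split()
        rows.append({"ID": int(id_), "Sector": sector,
                     "Country": country, "Income": int(income)})
    return rows


def add_window_columns(rows):
    """Return new dicts (in ID order) with 'cumulative_sum' and 'income_rank' added."""
    # Running total ordered by ID, like SUM(Income) OVER (ORDER BY ID).
    out, running = [], 0
    for row in sorted(rows, key=lambda r: r["ID"]):
        running += row["Income"]
        out.append({**row, "cumulative_sum": running})

    # RANK() OVER (ORDER BY Income DESC): equal incomes share a rank,
    # and the next distinct income skips ahead by the number of ties.
    prev_income, prev_rank = None, 0
    for position, row in enumerate(
            sorted(out, key=lambda r: r["Income"], reverse=True), start=1):
        if row["Income"] != prev_income:
            prev_income, prev_rank = row["Income"], position
        row["income_rank"] = prev_rank  # same dict objects as in `out`
    return out


if __name__ == "__main__":
    for row in add_window_columns(parse_rows(TABLE)):
        print(row)
```

If the whole dataset fits in one worker's memory, the same function could be applied inside a pipeline by collecting everything with `beam.combiners.ToList()` and then `beam.FlatMap(add_window_columns)`; the trade-off is that a single worker then does all of the work.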

Thanks, Bruno

Best answer

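Consider the following approach. I have tried my best to make sure the ParDo fns are associative and commutative, which means this should not break when it runs in parallel on multiple workers. If you find that it breaks on the DataflowRunner, please let me know.
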
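The parallel-safety claim rests on the fact that each element's cumulative sum is computed from the complete side input, and addition does not depend on the order in which that side input is iterated. A minimal plain-Python sketch of this (no Beam involved; `cumulative_sum_for` is a hypothetical helper mirroring the loop in `cum_sum`) that shuffles the lookup list and checks that the result never changes:

```python
import random


def cumulative_sum_for(element_id, lookup):
    """Mirror of cum_sum: add every income whose ID is <= element_id."""
    total = 0
    for lkp_id, lkp_income in lookup:
        if element_id >= lkp_id:
            total += lkp_income
    return total


# (ID, Income) pairs from the answer's four sample rows.
ids_income = [(4, 1400), (2, 1200), (1, 1300), (3, 1800)]
expected = {i: cumulative_sum_for(i, ids_income) for i, _ in ids_income}

# Any iteration order of the side input yields the same per-element sums.
rng = random.Random(0)
for _ in range(100):
    shuffled = ids_income[:]
    rng.shuffle(shuffled)
    assert {i: cumulative_sum_for(i, shuffled) for i, _ in shuffled} == expected

print(expected)  # {4: 5700, 2: 2500, 1: 1300, 3: 4300}
```

The cost of this pattern is that every element scans the whole side input, so the work grows with the square of the number of rows.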
import apache_beam as beam
from apache_beam.transforms.core import DoFn


class cum_sum(DoFn):
    """Adds 'cumulative_sum': the total Income of every row whose ID is <= this row's ID."""

    def process(self, element, lkp_data, accum_sum):
        # lkp_data is the full (ID, Income) side input, so the result does not
        # depend on which worker processes the element or in what order.
        for lkp_id_income in lkp_data:
            if element['ID'] >= lkp_id_income[0]:
                accum_sum += lkp_id_income[1]
        # Emit a new dict instead of mutating the input element.
        yield {**element, 'cumulative_sum': accum_sum}


class rank_it(DoFn):
    """Adds 'rank': counter (passed as 1) plus the number of rows with a smaller cumulative_sum."""

    def process(self, element, lkp_data, counter):
        for lkp_id_cumsum in lkp_data:
            if lkp_id_cumsum['cumulative_sum'] < element['cumulative_sum']:
                counter += 1
        yield {**element, 'rank': counter}


with beam.Pipeline() as p:
    data = (
        p
        | 'create' >> beam.Create([
            {'ID': 4, 'Sector': 'Liam', 'Country': 'US', 'Income': 1400},
            {'ID': 2, 'Sector': 'piam', 'Country': 'IS', 'Income': 1200},
            {'ID': 1, 'Sector': 'Oiam', 'Country': 'PS', 'Income': 1300},
            {'ID': 3, 'Sector': 'Uiam', 'Country': 'OS', 'Income': 1800},
        ])
    )

    # (ID, Income) pairs, made available to every worker as a side input.
    ids_income = (
        data
        | 'get_ids_income' >> beam.Map(lambda element: (element['ID'], element['Income']))
    )

    with_cumulative_sum = (
        data
        | 'cumulative_sum' >> beam.ParDo(
            cum_sum(), lkp_data=beam.pvalue.AsIter(ids_income), accum_sum=0)
    )

    # The same PCollection is used as main input and as side input.
    with_ranking = (
        with_cumulative_sum
        | 'ranking' >> beam.ParDo(
            rank_it(), lkp_data=beam.pvalue.AsIter(with_cumulative_sum), counter=1)
        | 'print' >> beam.Map(print)
    )

Output

{'ID': 4, 'Sector': 'Liam', 'Country': 'US', 'Income': 1400, 'cumulative_sum': 5700, 'rank': 4}
{'ID': 2, 'Sector': 'piam', 'Country': 'IS', 'Income': 1200, 'cumulative_sum': 2500, 'rank': 2}
{'ID': 1, 'Sector': 'Oiam', 'Country': 'PS', 'Income': 1300, 'cumulative_sum': 1300, 'rank': 1}
{'ID': 3, 'Sector': 'Uiam', 'Country': 'OS', 'Income': 1800, 'cumulative_sum': 4300, 'rank': 3}
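In this output, `rank` follows `cumulative_sum`, which (with positive incomes) simply reproduces the ID order: ID 3 has the highest income (1800) but gets rank 3. If question 2 means ranking by income, the comparison in `rank_it` would need to look at `Income` in descending order instead. A plain-Python sketch of that per-element logic, assuming ties should share a rank; `income_rank` and `all_rows` are hypothetical names, not part of the original answer:

```python
def income_rank(element, all_rows):
    """Rank 1 = highest Income; equal incomes share a rank.

    Mirrors the side-input loop in rank_it, but compares Income
    (descending) instead of cumulative_sum (ascending).
    """
    rank = 1
    for other in all_rows:
        if other["Income"] > element["Income"]:
            rank += 1
    return {**element, "income_rank": rank}


rows = [
    {"ID": 4, "Sector": "Liam", "Country": "US", "Income": 1400},
    {"ID": 2, "Sector": "piam", "Country": "IS", "Income": 1200},
    {"ID": 1, "Sector": "Oiam", "Country": "PS", "Income": 1300},
    {"ID": 3, "Sector": "Uiam", "Country": "OS", "Income": 1800},
]
for row in rows:
    print(row["ID"], income_rank(row, rows)["income_rank"])
```

Inside the pipeline it should be usable with the same side-input pattern as the answer, e.g. `beam.Map(income_rank, all_rows=beam.pvalue.AsIter(data))`, though this sketch has not been checked on the DataflowRunner.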

Regarding google-cloud-platform - Window functions in Apache Beam, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/69904981/
