gpt4 book ai didi

google-cloud-platform - 合并两个 PCollection (Apache beam)

转载 作者:行者123 更新时间:2023-12-04 15:40:54 24 4
gpt4 key购买 nike

我在云存储中有两个文件。包含 Avro 格式的文件 1,其中包含来自温度传感器的数据。

time_stamp     |  Temperature
1000 | T1
2000 | T2
3000 | T3
4000 | T3
5000 | T4
6000 | T5

包含 Avro 格式的文件 2,其中包含来自风传感器的数据。

time_stamp     |  wind_speed
500 | w1
1200 | w2
1500 | w3
2200 | w4
2500 | w5
3000 | w6

我想像下面这样组合输出

time_stamp |Temperature|wind_speed
1000 |T1 |w1 (last earliest reading from wind sensor at 500)
2000 |T2 |w3 (last earliest reading from wind sensor at 1500)
3000 |T3 |w6 (wind sensor reading at 3000)
4000 |T3 |w6 (last earliest reading from wind sensor at 3000)
5000 |T4 |w6 (last earliest reading from wind sensor at 3000)
6000 |T5 |w6(last earliest reading from wind sensor at 3000)

我正在寻找 apache beam 中的解决方案来合并上述文件。现在它正在从文件中读取,但将来可能会通过 pubsub 读取。我想找出组合两个 PCollection 的自定义方法并创建另一个 PCollection tempDataWithWindSpeed。

     PCollection<Temperature> tempData = p.apply(AvroIO
.read(AvroAutoGenClass.class)
.from("gs://my_bucket/path/to/temp-sensor-data.avro")

PCollection<WindSpeed> windData = p.apply(AvroIO
.read(AvroAutoGenClass.class)
.from("gs://my_bucket/path/to/wind-sensor-data.avro")

PCollection<WindSpeed> tempDataWithWindSpeed = ?

最佳答案

@jszule 的评论通常是 Dataflow/Beam 的一个很好的答案:最受支持的连接是当两个 PCollections 有一个公共(public)键时。对于大多数数据,Beam 可以找出一个模式,您可以使用 CoGroup.join转变。您必须做出的设计决定是如何选择键,例如向下舍入到最接近的 1000。

您的用例很复杂:您需要在时间序列中为没有数据的键结转值。解决方案是使用状态和计时器来生成“缺失”值。您仍然需要仔细选择键,因为状态和计时器是针对每个键和窗口的。状态和计时器也可以在批处理模式下工作,因此这是一个批处理/流处理的统一解决方案。

您可能想阅读 this blog post by Reza Rokni and myself关于这个主题,或 this talk by Reza at the Beam Summit Berlin 2019

关于google-cloud-platform - 合并两个 PCollection (Apache beam),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57839344/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com