- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个用 Dataflow 编写的批处理管道。我想实现以下数据连接。
我有两个 PCollection。首先是代表 session :
class Session{
String id
long start;
long stop;
}
第二个代表一些事件:
class Event{
long timestamp;
String id;
}
我想加入这两个 PCollection,最后有类似 KV<Session,Iterable<Event>>
的内容- 因此该结构包含具有关联事件列表的 session 。如果事件的时间戳在一个 session (或多个 session )的时间范围内,则应将其聚合附加到它(或它们)。
实现这一目标的最佳方法是什么?
最佳答案
鉴于这是一个批处理管道,我要做的就是遍历所有可能的 Session
首先,建立一个列表并将其另存为 PCollectionView
。然后,在解析每个Event
时我们可以检查哪个Session
会不会掉下来。
在我的测试中,我定义了类和构造函数,如下所示:
@DefaultCoder(AvroCoder.class)
public static class Session {
String id;
long start;
long stop;
public Session(String id, long start, long stop) {
this.id = id;
this.start = start;
this.stop = stop;
}
public Session() {
// for serialization only
}
}
@DefaultCoder(AvroCoder.class)
public static class Event {
String id;
long timestamp;
public Event(String id, long timestamp) {
this.id = id;
this.timestamp = timestamp;
}
public Event() {
// for serialization only
}
}
我们将使用一些测试数据,例如:
// Example sessions data
final List<Session> sessionList = Arrays.asList(
new Session("s1", 0L, 100L),
new Session("s2", 100L, 200L),
new Session("s3", 200L, 300L)
);
// Example event data
final List<Event> eventList = Arrays.asList(
new Event("e1", 20L),
new Event("e2", 60L),
new Event("e3", 120L),
new Event("e4", 160L),
new Event("e5", 210L),
new Event("e6", 290L)
);
首先我们将构建 PCollectionView
以及所有可能的 session :
// create PCollectionView from sessions
final PCollectionView<List<Session>> sessionPC = p
.apply("Create Sessions", Create.of(sessionList))
.apply("Save as List", View.asList());
并且,对于每个 Event
,我们将检查 AssignFn
ParDo 其中 Session
应该Event
落入:
public static class AssignFn extends DoFn<Event, KV<Session, Event>> {
final PCollectionView<List<Session>> sessionPC;
public AssignFn(PCollectionView<List<Session>> TagsideInput) {
this.sessionPC = TagsideInput;
}
@ProcessElement
public void processElement(ProcessContext c) {
Event event = c.element();
// get side input with all possible Sessions
List<Session> sessions = c.sideInput(sessionPC);
// where does the Event fall in?
for (Session session:sessions) {
if (event.timestamp >= session.start && event.timestamp <= session.stop) {
c.output(KV.of(session, event));
break;
}
}
}
}
主要管道结构为:
p
.apply("Create Events", Create.of(eventList))
.apply("Assign Sessions", ParDo.of(new AssignFn(sessionPC))
.withSideInputs(sessionPC))
.apply("Group By Key", GroupByKey.<Session,Event>create())
.apply("Log Grouped Results", ParDo.of(new LogFn()));
请注意,在 Session
之后作业,我们应用 GroupByKey
操作以获得 KV<Session, Iterable<Event>>
形式的所需输出。
LogFn
仅用于验证内容:
public static class LogFn extends DoFn<KV<Session, Iterable<Event>>, KV<Session, Iterable<Event>>> {
@ProcessElement
public void processElement(ProcessContext c) {
Session session = c.element().getKey();
Iterable<Event> events = c.element().getValue();
StringBuilder str = new StringBuilder();
// print session info
str.append(String.format("\nSession id=%s, start=%d, stop=%d", session.id, session.start, session.stop));
// print each event info
for (Event event:events) {
str.append(String.format("\n---Event id=%s, timestamp=%d", event.id, event.timestamp));
}
LOG.info(str.toString());
c.output(c.element());
}
}
我得到了预期的输出:
Nov 07, 2019 12:36:22 AM com.dataflow.samples.AssignSessions$LogFn processElement
INFO:
Session id=s2, start=100, stop=200
---Event id=e3, timestamp=120
---Event id=e4, timestamp=160
Nov 07, 2019 12:36:22 AM com.dataflow.samples.AssignSessions$LogFn processElement
INFO:
Session id=s1, start=0, stop=100
---Event id=e1, timestamp=20
---Event id=e2, timestamp=60
Nov 07, 2019 12:36:22 AM com.dataflow.samples.AssignSessions$LogFn processElement
INFO:
Session id=s3, start=200, stop=300
---Event id=e6, timestamp=290
---Event id=e5, timestamp=210
完整代码here .
使用 Beam SDK 2.16.0 和 DirectRunner
进行测试.
关于java - 当第一个包含某个事件的时间范围和第二个时间戳时,如何连接两个 PCollection?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58731608/
在我进行一些处理和按键分组后,我得到了一个如下所示的数据集。我现在需要对每一行数据进行一些处理以获得下面的输出。我试过 flatmap 它真的很慢,因为“值”列表的长度可以任意长。我想我可以将每一行拆
我有一个无界集合,它是从 PubsubIO 中读取数据的,名为 Trade格式如 { timestamp: 123, type: "", side: "" // sell or buy
我对 Google Cloud Platform 还很陌生,这是我第一次在研究生类(class)的项目中尝试使用 Google Dataflow。我想要做的是编写一个自动加载作业,从我的云存储上的某个
没有关于如何将 pCollections 转换为输入到 .CoGroupByKey() 所需的 pCollections 的文档 语境 基本上我有两个大的 pCollections,我需要能够找到两者
我正在尝试实现一个数据管道,它连接来自 Kafka 主题的多个无限源。我能够连接到主题并获取数据为 PCollection我需要将其转换为 PCollection 。我将逗号分隔的字符串拆分为数组,并
我有一个用例,我需要从 DoFn 输出多个 T。所以 DoFn 函数返回一个 PCollection> 。我想将它转换为 PCollection 以便稍后在管道中我可以像这样过滤: PCollecti
我有两个 P-Collection,如下所示 P1 = ['H','E','L','L','O','W','O','R','L','D'] P2 = ['W','E','L','C','O','M',
Beam 中是否有可能有一个 PCollection 来“保存”同一父类(super class)的不同对象,使得 PCollection result = input.apply(ParDo.of
我有两个 bigquery 表。 表A c_id count_c_id p_id 表B id c_name p_type c_id 根据表 A 中的列,我需要使用 DF 管道从表 B
假设我有一个类型为 KV 的有界 PCollection p 。假设 p 无法放入内存,因此不能成为 DoFn 的侧面输入. 示例p: ("foo", 0) ("bar", 1) ("baz", 2)
我正在尝试使用 Apache Beam 中的 Flatten 函数添加 header 。但是,似乎没有办法根据文档设置顺序:https://beam.apache.org/documentation/
我想将时间戳设置为无限的字符串集合 在我的解决方案中,pcollection 的每一行都是一行 csv 这一行的一个字段中有一个时间戳和其他字段,例如点击次数等。 我想根据它自己的时间戳(事件时间)而
我想要 Pcollection 中的列表值。 PCollection> lst = bqT2.apply(ParDo.of(new UserId())); // line 1 List myL
我有一个用 Dataflow 编写的批处理管道。我想实现以下数据连接。 我有两个 PCollection。首先是代表 session : class Session{ String id
我的管道的这一部分应该接受输入,对其应用适当的元组标记,然后根据它接收到的标记对输入进行进一步处理。 当运行下面的代码时,来自主标签 (tag1) 的 PCollection 工作正常。但是,附加标签
我正在使用 Apache-Beam 和 Python SDK。 目前,我的管道读取多个文件,解析它们并从其数据生成 pandas 数据帧。然后,它将它们分组到一个单个数据帧中。 我现在想要的是检索这个
我正在尝试根据交易数据生成简单的客户摘要。例如,给定目标交易类型,发生了多少笔交易以及总金额是多少? 原始输入示例: custid desc amount 111 coffee
使用 CoGroupsByKey 方法加入两个 Pcollection 需要数小时才能执行超过 8 百万条记录。从另一个 stackoverflow 帖子中注意到 CoGbkResult has mo
我正在使用 Apache Beam 读取数据流并写入 BigQuery,在 tableA 中.我的行存储在 类型的数据集中. 目前我正在按原样读取行并写入表。但是我想根据 timestamp 过滤行
我有一个 PCollection,并且希望在验证条件时应用自定义 PTransform。(该条件不依赖于Pcollection内容) 示例:我有日志,如果 PipelineOptions 中提供了日期
我是一名优秀的程序员,十分优秀!