gpt4 book ai didi

java - 当第一个包含某个事件的时间范围和第二个时间戳时,如何连接两个 PCollection?

转载 作者:行者123 更新时间:2023-12-01 21:57:39 26 4
gpt4 key购买 nike

我有一个用 Dataflow 编写的批处理管道。我想实现以下数据连接。

我有两个 PCollection。首先是代表 session :

class Session{
String id
long start;
long stop;
}

第二个代表一些事件:

class Event{
long timestamp;
String id;
}

我想加入这两个 PCollection,最后有类似 KV<Session,Iterable<Event>> 的内容- 因此该结构包含具有关联事件列表的 session 。如果事件的时间戳在一个 session (或多个 session )的时间范围内,则应将其聚合附加到它(或它们)。

实现这一目标的最佳方法是什么?

最佳答案

鉴于这是一个批处理管道,我要做的就是遍历所有可能的 Session首先,建立一个列表并将其另存为 PCollectionView 。然后,在解析每个Event时我们可以检查哪个Session会不会掉下来。

在我的测试中,我定义了类和构造函数,如下所示:

@DefaultCoder(AvroCoder.class)
public static class Session {
String id;
long start;
long stop;

public Session(String id, long start, long stop) {
this.id = id;
this.start = start;
this.stop = stop;
}

public Session() {
// for serialization only
}
}

@DefaultCoder(AvroCoder.class)
public static class Event {
String id;
long timestamp;

public Event(String id, long timestamp) {
this.id = id;
this.timestamp = timestamp;
}

public Event() {
// for serialization only
}
}

我们将使用一些测试数据,例如:

// Example sessions data
final List<Session> sessionList = Arrays.asList(
new Session("s1", 0L, 100L),
new Session("s2", 100L, 200L),
new Session("s3", 200L, 300L)
);

// Example event data
final List<Event> eventList = Arrays.asList(
new Event("e1", 20L),
new Event("e2", 60L),
new Event("e3", 120L),
new Event("e4", 160L),
new Event("e5", 210L),
new Event("e6", 290L)
);

首先我们将构建 PCollectionView以及所有可能的 session :

// create PCollectionView from sessions
final PCollectionView<List<Session>> sessionPC = p
.apply("Create Sessions", Create.of(sessionList))
.apply("Save as List", View.asList());

并且,对于每个 Event ,我们将检查 AssignFn ParDo 其中 Session应该Event落入:

public static class AssignFn extends DoFn<Event, KV<Session, Event>> {  

final PCollectionView<List<Session>> sessionPC;

public AssignFn(PCollectionView<List<Session>> TagsideInput) {
this.sessionPC = TagsideInput;
}

@ProcessElement
public void processElement(ProcessContext c) {
Event event = c.element();

// get side input with all possible Sessions
List<Session> sessions = c.sideInput(sessionPC);

// where does the Event fall in?
for (Session session:sessions) {
if (event.timestamp >= session.start && event.timestamp <= session.stop) {
c.output(KV.of(session, event));
break;
}
}
}
}

主要管道结构为:

p
.apply("Create Events", Create.of(eventList))
.apply("Assign Sessions", ParDo.of(new AssignFn(sessionPC))
.withSideInputs(sessionPC))
.apply("Group By Key", GroupByKey.<Session,Event>create())
.apply("Log Grouped Results", ParDo.of(new LogFn()));

请注意,在 Session 之后作业,我们应用 GroupByKey操作以获得 KV<Session, Iterable<Event>> 形式的所需输出。

LogFn仅用于验证内容:

public static class LogFn extends DoFn<KV<Session, Iterable<Event>>, KV<Session, Iterable<Event>>> {  

@ProcessElement
public void processElement(ProcessContext c) {
Session session = c.element().getKey();
Iterable<Event> events = c.element().getValue();
StringBuilder str = new StringBuilder();

// print session info
str.append(String.format("\nSession id=%s, start=%d, stop=%d", session.id, session.start, session.stop));

// print each event info
for (Event event:events) {
str.append(String.format("\n---Event id=%s, timestamp=%d", event.id, event.timestamp));
}

LOG.info(str.toString());

c.output(c.element());
}
}

我得到了预期的输出:

Nov 07, 2019 12:36:22 AM com.dataflow.samples.AssignSessions$LogFn processElement
INFO:
Session id=s2, start=100, stop=200
---Event id=e3, timestamp=120
---Event id=e4, timestamp=160
Nov 07, 2019 12:36:22 AM com.dataflow.samples.AssignSessions$LogFn processElement
INFO:
Session id=s1, start=0, stop=100
---Event id=e1, timestamp=20
---Event id=e2, timestamp=60
Nov 07, 2019 12:36:22 AM com.dataflow.samples.AssignSessions$LogFn processElement
INFO:
Session id=s3, start=200, stop=300
---Event id=e6, timestamp=290
---Event id=e5, timestamp=210

完整代码here .

使用 Beam SDK 2.16.0 和 DirectRunner 进行测试.

关于java - 当第一个包含某个事件的时间范围和第二个时间戳时,如何连接两个 PCollection?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58731608/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com