gpt4 book ai didi

java - Flink 左外连接 : Enriching a Stream with Data from a csv file

转载 作者:行者123 更新时间:2023-12-04 16:04:09 24 4
gpt4 key购买 nike

我正在使用 Flink 1.4.0 .

我正在将 Kafka 主题中的数据消费到 DataStream 中。数据转换成POJO ,说 Employee ,我最终得到了一个类似的东西:

DataStream<Employee> employeeStream = ...;

现在,我需要使用来自大 csv 的数据来丰富此流实例中的一些字段。文件。我找到了一种加载 csv 并创建另一个 DataStream 的好方法:
DataStream<Enrichements> enrichmentsStream = ...;

现在,两个 POJOs共享一个可用于 JOIN 的字段(id)手术。如果这些是数据集,我将能够应用 leftOuterJoin() ,但他们不是。我不关心窗口,因为我想要任何 Employee丰富来自 Enrichments 的信息,如果它的 id 存在于 csv 中。我该怎么做呢?忽略窗口的连接操作会起作用吗?会不会是资源匮乏?它看起来像这样吗?:
    employeeStream 
.join(enrichmentsStream )
.where(new SelectKeyForEmployee())
.equalTo(new SelectKeyForEnrichments())
.window(?????)
.apply(new JoinEnrichFunction());

此外,由于窗口必须由两个流共享,我如何在应用 JOIN 之前定义它们的窗口函数以及 JoinEnrichFunction() 的实现是什么? ?

最佳答案

事实证明,在这种情况下,连接操作是多余的。流连接并不直观,只有在共享相同窗口机制的流之间应用时才有意义。

在这种情况下, map 函数足以满足此处详述的丰富目标。下面的代码片段应该足够澄清了:

public MainClass {
public void main(String[] args) {
...
// Some custom way of loading the csv data into a Map<POJO> format
MetadataLoader loader = new MetadataLoader("pathToData.csv");
Map<Employee> metadataHashMap = loader.getMetadataMap(employeeEnrichmentData);
...

// Enrichment
SingleOutputStreamOperator<Employee>> enrichedStream = rawStream
.map(new MapMetadataToEmployees(metadataHashMap))
.name("Enrich: with Employee Metadata");

// Some sink opeartion
...

}
}

final class MapMetadataToEmployees implements MapFunction<Employee, Employee>, Serializable {

private Map<Employee> metaDataMap;

public MapMetadataToEmployees(Map<String, Employee> metaDataMap) {
this.metaDataMap = metaDataMap;
}

@Override
public Employee map(Employee employee) {

if (metaDataMap.containsKey(employee.getId())) {

Employee employeeWithMetaData = metaDataMap.get(employee.getId());

employee.setSalary(employeeWithMetaData.getSalary);
employee.setRank(employeeWithMetaData.getRank());
employee.setBusinessTitle(employeeWithMetaData.getBusinessTitle());
}

return employee;
}
}

关于java - Flink 左外连接 : Enriching a Stream with Data from a csv file,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49432713/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com