gpt4 book ai didi

apache-crunch - 在 Apache Crunch 中是否有将 PCollection 转换为 PTable 的通用方法?

转载 作者:行者123 更新时间:2023-12-04 06:10:55 27 4
gpt4 key购买 nike

我在一个 util 类中有这些方法,它们将特定的 PCollection 转换为特定的 PTable。

public static PTable<IdDetails, CASegmentsForModification> getPTableForCASegments(PCollection<CASegmentsForModification> aggregatedPCollectionForCASegments) {
return aggregatedPCollectionForCASegments.parallelDo(new CASegmentsPTableConverter(),
Avros.tableOf(Avros.records(IdDetails.class), Avros.records(CASegmentsForModification.class)));
}

public static PTable<IdDetails, UserPrimaryIdMapping> getPTableForPrimaryIdMapping(PCollection<UserPrimaryIdMapping> pCollectionOfUserPrimaryIdMapping) {
return pCollectionOfUserPrimaryIdMapping.parallelDo(new UserPrimaryIdMappingPTableConverter(),
Avros.tableOf(Avros.records(IdDetails.class), Avros.records(UserPrimaryIdMapping.class)));
}

public static PTable<IdDetails, UserGroupSegments> getPTableForUserGroupSegments(PCollection<UserGroupSegments> pCollectionOfUserGroupSegments) {
return pCollectionOfUserGroupSegments.parallelDo(new UserGroupSegmentsPTableConverter(),
Avros.tableOf(Avros.records(IdDetails.class), Avros.records(UserGroupSegments.class)));
}

如何实现上述方法的一种通用方法?

最佳答案

使用静态 asPtable 有更好的方法来自 PTables util 类的方法。您的 PCollection 必须是 Pair 类型,PTable 结果将是 PTable 类型

    public static <K,V> PTable<K,V> asPTable(PCollection<Pair<K,V>> pcollect)

根据您的示例,您只需要创建您的 DoFn(或扩展类)并返回一个 Avros.pairs(Avros.records(yourClass.class), Avros.records(yourOtherClass.class))。

另一种方法是使用预定义的 MapFn,它是 ExtractKEyFn并将其应用于您的收藏。您将需要实现 map 方法来提取键并生成键值输出。本质上是一样的思路,之后就可以把PCollection>转换成PTable

它应该可以为您节省大量样板代码。

以防万一,还有其他有用的功能,例如 FilterFn但是我们在您使用 MemPipeline 时发现了一些错误用于单元测试。我建议的第一种方法应该是最安全的。

编辑:

保存一些代码的一个很好的平衡是使用字段名称根据字段名称获取您的 key ,并为每个 PCollection 调用此 MapFn。
//we are assuming the key will be in the first level of your record
public class GenericRecordToPair <V extends GenericRecord, K extends GenericRecord> extends MapFn<V, Pair<K, V>> {
String key;

public GenericRecordToPair(String key){
this.key = key;
}

@Override
public Pair<T, TupleN> map(S input) {
return new Pair<K,V> (input.get(key), input);
}

}

从您的示例中,您可以执行以下操作
PCollection<UserGroupSegments> pCollectionOfUserGroupSegments = ...//comming from somewhere
PCollection<UserPrimaryIdMapping> pCollectionOfUserPrimaryIdMapping = ...//comming from somewhere
PTable<IdDetails, UserGroupSegments> pTable1 = PTables.asPTable(pCollectionOfUserGroupSegments.parallelDo(new GenericRecordToPair("idDetails"), Avros.pairs(Avros.records(IdDetails.class), Avros.records(UserGroupSegments))));
PTable<IdDetails, UserPrimaryIdMapping> pTable2 = PTables.asPTable(pCollectionOfUserPrimaryIdMapping.parallelDo(new GenericRecordToPair("idDetails"), Avros.pairs(Avros.records(IdDetails.class), Avros.records(UserPrimaryIdMapping))));

关于apache-crunch - 在 Apache Crunch 中是否有将 PCollection 转换为 PTable 的通用方法?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45670377/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com