
java - TupleTag Tag corresponds to a non-singleton result

Reposted. Author: 塔克拉玛干. Updated: 2023-11-02 19:10:24

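In Google Cloud Dataflow, my join fails with "TupleTag Tag corresponds to a non-singleton result". From the error stack, this appears to happen in the overridden method that processes the CoGbkResult.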

String Ad_ID = e.getKey();
String Ad_Info = "none";
Ad_Info = e.getValue().getOnly(AdInfoTag);

Below is my join method.

static PCollection<String> joinEvents(PCollection<TableRow> ImpressionTable,
    PCollection<TableRow> AdTable) throws Exception {

  final TupleTag<String> ImpressionInfoTag = new TupleTag<String>();
  final TupleTag<String> AdInfoTag = new TupleTag<String>();

  // transform both input collections to tuple collections, where the keys are Ad_ID
  PCollection<KV<String, String>> ImpressionInfo = ImpressionTable.apply(
      ParDo.of(new ExtractImpressionDataInfoFn()));
  PCollection<KV<String, String>> AdInfo = AdTable.apply(
      ParDo.of(new ExtractAdDataInfoFn()));

  // Ad_ID 'key' -> CGBKR (<ImpressionInfo>, <AdInfo>)
  PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple
      .of(ImpressionInfoTag, ImpressionInfo)
      .and(AdInfoTag, AdInfo)
      .apply(CoGroupByKey.<String>create());

  // Process the CoGbkResult elements generated by the CoGroupByKey transform.
  // Ad_ID 'key' -> string of <Impressioninfo>, <Adinfo>
  PCollection<KV<String, String>> finalResultCollection =
      kvpCollection.apply(ParDo.named("Process").of(
          new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void processElement(ProcessContext c) {
              KV<String, CoGbkResult> e = c.element();
              String Ad_ID = e.getKey();
              String Ad_Info = "none";
              Ad_Info = e.getValue().getOnly(AdInfoTag);
              for (String eventInfo : c.element().getValue().getAll(ImpressionInfoTag)) {
                // Generate a string that combines information from both collection values
                c.output(KV.of(Ad_ID, " " + Ad_Info
                    + " " + eventInfo));
              }
            }
          }));

  // write to GCS
  PCollection<String> formattedResults = finalResultCollection
      .apply(ParDo.named("Format").of(new DoFn<KV<String, String>, String>() {
        @Override
        public void processElement(ProcessContext c) {
          String outputstring = "AdUnitID: " + c.element().getKey()
              + ", " + c.element().getValue();
          c.output(outputstring);
        }
      }));
  return formattedResults;
}

My ExtractImpressionDataInfoFn and ExtractAdDataInfoFn classes are as follows.

static class ExtractImpressionDataInfoFn extends DoFn<TableRow, KV<String, String>> {
  private static final long serialVersionUID = 1L;

  @Override
  public void processElement(ProcessContext c) {
    TableRow row = c.element();
    String Ad_ID = (String) row.get("AdUnitID");
    String User_ID = (String) row.get("UserID");
    String Client_ID = (String) row.get("ClientID");
    String Impr_Time = (String) row.get("GfpActivityAdEventTIme");
    String ImprInfo = "UserID: " + User_ID + ", ClientID: " + Client_ID
        + ", GfpActivityAdEventTIme: " + Impr_Time;
    c.output(KV.of(Ad_ID, ImprInfo));
  }
}

static class ExtractAdDataInfoFn extends DoFn<TableRow, KV<String, String>> {
  private static final long serialVersionUID = 1L;

  @Override
  public void processElement(ProcessContext c) {
    TableRow row = c.element();
    String Ad_ID = (String) row.get("AdUnitID");
    String Content_ID = (String) row.get("ContentID");
    String Pub_ID = (String) row.get("Publisher");
    String Add_Info = "ContentID: " + Content_ID + ", Publisher: " + Pub_ID;
    c.output(KV.of(Ad_ID, Add_Info));
  }
}

The schemas for Impression and Ad are as follows.

Impression: AdUnitID, UserID, ClientID, GfpActivityAdEventTIme

Ad: AdUnitID, ClientID, Publisher


Best answer

The error indicates that the CoGroupByKey produced more than one value for that tag at the point where you call getOnly. Specifically, this line:

Ad_Info = e.getValue().getOnly(AdInfoTag);

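If you change it to getAll(AdInfoTag), it should work. Note that getAll returns an Iterable<String> rather than a single String, so the surrounding code has to iterate over the result instead of assigning it to Ad_Info.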
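To make the difference concrete, here is a minimal plain-Java sketch. It does not use the Dataflow SDK: the class GroupedJoinSketch and its methods getOnly and joinAll are hypothetical stand-ins that only mimic the behavior described above, with a List<String> playing the role of the values grouped under one tag. It assumes the failure comes from an Ad_ID that has more than one ad row, which the question does not confirm.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Plain-Java stand-in for the values grouped under one key; NOT the SDK's CoGbkResult. */
final class GroupedJoinSketch {

    /** Mimics a getOnly-style lookup: valid only when exactly one value was grouped. */
    static String getOnly(List<String> values) {
        if (values.size() != 1) {
            throw new IllegalArgumentException(
                "tag corresponds to a non-singleton result (size " + values.size() + ")");
        }
        return values.get(0);
    }

    /** Mimics a getAll-based join: pairs every ad record with every impression record. */
    static List<String> joinAll(String adId, List<String> adInfos, List<String> impressions) {
        List<String> out = new ArrayList<>();
        for (String adInfo : adInfos) {
            for (String eventInfo : impressions) {
                out.add("AdUnitID: " + adId + ", " + adInfo + " " + eventInfo);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Assumption: two ad rows share the same Ad_ID, so the group is not a singleton.
        List<String> adInfos = Arrays.asList(
            "ContentID: c1, Publisher: p1",
            "ContentID: c2, Publisher: p1");
        List<String> impressions = Collections.singletonList("UserID: u1, ClientID: k1");

        try {
            getOnly(adInfos); // fails because two values are grouped under the key
        } catch (IllegalArgumentException ex) {
            System.out.println("getOnly failed: " + ex.getMessage());
        }

        // Iterating over all values works for zero, one, or many ad rows.
        for (String line : joinAll("ad-1", adInfos, impressions)) {
            System.out.println(line);
        }
    }
}
```

In the original DoFn, the equivalent change would be an outer loop over getAll(AdInfoTag) around the existing loop over getAll(ImpressionInfoTag). One consequence of this design is that a key with no ad rows emits nothing, so if "none" should still appear for unmatched impressions, that case needs separate handling.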

Regarding "java - TupleTag Tag <taginfo> corresponds to a non-singleton result", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/46355898/
