gpt4 book ai didi

java - gcp 数据流流程元素不会转到下一个 ParDO 函数

转载 作者:太空宇宙 更新时间:2023-11-04 10:33:13 26 4
gpt4 key购买 nike

*我用良好的数据填充了 map (没有空值),但我无法进入下一个 ParDo 函数。我尝试调试,但不明白为什么会发生这种情况。如果有人知道我做错了什么,请告诉我。我正在放置三个 ParDo 函数。谢谢*

.apply("Parse XML CarrierManifest ", ParDo.of(new DoFn<String, Manifest>() {
@ProcessElement
public void processeElement(ProcessContext c) {
try {

System.out.println(c.element());
JAXBContext jaxbContext = JAXBContext.newInstance(Manifest.class);
Unmarshaller unmarshaller = jaxbContext.createUnmarshaller();
StringReader reader = new StringReader(c.element());

Manifest manifest = (Manifest) unmarshaller.unmarshal(reader);

if (manifest == null) throw new RuntimeException("Invalid data");

c.output(manifest);

}

catch (Exception e)
{
LOG.error("Unexpected error while parsing input. File was <[ " + c.element() + " ]>", e);
}

}

}
)
)

//---------------------------------------------------------------------------------------------------------------

.apply("preparing data " , ParDo.of(new DoFn<Manifest, Map<String, List<TableRow>>>()
{
@ProcessElement
public void processeElement(ProcessContext c)
{
Map<String, List<TableRow>> RowsTable = new ArrayMap<>();
RowsTable.put("Manifest",new ArrayList<>());

Manifest manifest = c.element();

Links linkss = manifest.linkes;

System.out.println(linkss.ShipmentsList.linakageShipment.linkageesList.size());

for (int i = 0; i < linkss.ShipmentsList.linakageShipment.linkageesList.size(); i++) {

RowsTable.get("Manifest")
.add(new TableRow()
.set("GROUP_ID", manifest.GroupidValue)
.set("STATUS", manifest.StatusValue)
.set("GROUP_TYPE", manifest.typeValue)
.set("CREATED_AT", manifest.created_atValue)
.set("READY_AT", manifest.ready_atValue)
.set("MANIFEST_NUMBER", manifest.manifest_numberValue)
.set("LINKS_SELF", linkss.SelfLink)

.set("SHIPMENT_ID", linkss.ShipmentsList.linakageShipment.linkageesList.get(i).ID)
.set("SHIPMENT_TYPE", linkss.ShipmentsList.linakageShipment.linkageesList.get(i).Type));
}
c.output(RowsTable);
}

}))
//---------------------------------------------------------------------------------------------------------------

.apply("change rows list to one row ",ParDo.of(new DoFn<Map<String, List<TableRow>>, TableRow>()
{
@ProcessElement
public void processElement(ProcessContext c)
{
System.out.println("id: " + c.element());
for (TableRow r : c.element().get("Manifest")) // Should only have 1
c.output(r);
}
}))

最佳答案

根据您在问题中的评论之一,我了解到问题是您的 Dataflow 管道仅在 Dataflow 本身中运行时才有效(使用 Dataflow Runner),但当您使用 Direct Runner 时,它无法在本地工作。

the documentation for Apache Beam's Direct Runner 中所述,本地执行受到本地可用内存的限制,建议使用本地机器可以处理的小数据集来完成调试过程。无论如何,从同一条评论中我了解到您的管道在数据流中执行时运行良好,因此管道本身没有问题。

根据您提供的描述,该问题看起来肯定与 Direct Runner 的限制有关,但如果您在本地/远程环境中遇到更具体的错误,您应该在问题描述中更加具体,并回答要求提供有关您的使用问题的更多信息的评论,以便能够为您提供帮助。

关于java - gcp 数据流流程元素不会转到下一个 ParDO 函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49778796/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com