gpt4 book ai didi

java - 在 Apache Beam 中读取 CSV 文件时跳过 header

转载 作者:塔克拉玛干 更新时间:2023-11-02 19:08:10 24 4
gpt4 key购买 nike

我想跳过 CSV 文件的标题行。截至目前,我在将 header 加载到谷歌存储之前手动删除 header 。

下面是我的代码:

PCollection<String> financeobj =p.apply(TextIO.read().from("gs://storage_path/Financials.csv"));        
PCollection<ClassFinance> pojos5 = financeobj.apply(ParDo.of(new DoFn<String, ClassFinance>() { // converting String into classtype

private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) {
String[] strArr = c.element().split(",");
ClassFinance fin = new ClassFinance();
fin.setBeneficiaryFinance(strArr[0]);
fin.setCatlibCode(strArr[1]);
fin.set_rNR_(Double.valueOf(strArr[2]));
fin.set_rNCS_(Double.valueOf(strArr[3]));
fin.set_rCtb_(Double.valueOf(strArr[4]));
fin.set_rAC_(Double.valueOf(strArr[5]));
c.output(fin);
}
}));

我已经检查了 stackoverflow 中的现有问题,但我觉得它没有希望:Skipping header rows - is it possible with Cloud DataFlow?

有什么帮助吗?

编辑:我已经尝试过类似下面的方法并且有效:

PCollection<String> financeobj = p.apply(TextIO.read().from("gs://google-bucket/final_input/Financials123.csv"));       

PCollection<ClassFinance> pojos5 = financeobj.apply(ParDo.of(new DoFn<String, ClassFinance>() { // converting String into classtype

private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) {
String[] strArr2 = c.element().split(",");
String header = Arrays.toString(strArr2);
ClassFinance fin = new ClassFinance();

if(header.contains("Beneficiary"))
System.out.println("Header");
else {
fin.setBeneficiaryFinance(strArr2[0].trim());
fin.setCatlibCode(strArr2[1].trim());
fin.setrNR(Double.valueOf(strArr2[2].trim().replace("", "0")));
fin.setrNCS(Double.valueOf(strArr2[3].trim().replace("", "0")));
fin.setrCtb(Double.valueOf(strArr2[4].trim().replace("", "0")));
fin.setrAC(Double.valueOf(strArr2[5].trim().replace("", "0")));
c.output(fin);
}
}
}));

最佳答案

您分享的较旧的 Stack Overflow 帖子 ( Skipping header rows - is it possible with Cloud DataFlow? ) 确实包含您问题的答案。

此选项在 Apache Beam SDK 中当前不可用,尽管 Apache Beam JIRA 问题跟踪器中有一个开放的功能请求BEAM-123 .请注意,在撰写本文时,此功能请求仍处于打开状态且未得到解决,而且这种情况已经持续了 2 年。然而,从这个意义上说,似乎正在做一些努力,并且该问题的最新更新是从 2018 年 2 月开始的,所以我建议您随时了解该 JIRA 问题的最新情况,因为它最近已移至 sdk -java-core 组件,它可能会在那里受到更多关注。

考虑到这些信息,我会说您使用的方法(在将文件上传到 GCS 之前删除 header )是您的最佳选择。我会避免手动执行此操作,因为您可以轻松编写脚本并自动执行删除 header 上传文件 过程。


编辑:

我已经能够使用 DoFn 想出一个简单的过滤器。它可能不是最优雅的解决方案(我自己不是 Apache Beam 专家),但它确实有效,您可以根据自己的需要对其进行调整。它要求您事先知道正在上传的 CSV 文件的 header (因为它将按元素内容过滤),但同样,将其作为您可以根据需要修改的模板:

public class RemoveCSVHeader {
// The Filter class
static class FilterCSVHeaderFn extends DoFn<String, String> {
String headerFilter;

public FilterCSVHeaderFn(String headerFilter) {
this.headerFilter = headerFilter;
}

@ProcessElement
public void processElement(ProcessContext c) {
String row = c.element();
// Filter out elements that match the header
if (!row.equals(this.headerFilter)) {
c.output(row);
}
}
}

// The main class
public static void main(String[] args) throws IOException {
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);

PCollection<String> vals = p.apply(TextIO.read().from("gs://BUCKET/FILE.csv"));

String header = "col1,col2,col3,col4";

vals.apply(ParDo.of(new FilterCSVHeaderFn(header)))
.apply(TextIO.write().to("out"));

p.run().waitUntilFinish();
}
}

关于java - 在 Apache Beam 中读取 CSV 文件时跳过 header ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50855647/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com