gpt4 book ai didi

java - Apache Beam 管道从 csv 文件读取、拆分、groupbyKey 并写入文本文件时出现 "IllegalStateException"错误。为什么?

转载 作者:行者123 更新时间:2023-12-02 10:40:55 24 4
gpt4 key购买 nike

我的输入数据如下所示:

id,vin,url,exteriorColor,interiorColor,design,transmission,lastcrawled,mileage,price,certified,dealerId,historyType,MSRP
114722309,19XVC2F35PR012846,http://www.pohankaacura.com/auto/used-2017-acura-ilx-chantilly-va-near-buckeystown-md/24742881/,Modern Steel,graystone,0,8-Speed Dual-Clutch,2018-02-05 01:49:47 UTC,1646,22550,0,28453

我想构建一个 Beam 管道,该管道将从 csv 文件中读取此数据,获取 vin 并计算 vin 在文件中出现的次数。所以我想按 vin 分组并计算计数。我希望我的最终输出位于平面文件中。我错过了注释,所以我现在添加了它,但我收到了不同的错误,而且我在这里也找不到解决方案。下面是我的代码。

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.KV;

public class p1 {
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options);
p.apply(TextIO.read().from("~/slow_storage_drive/beam_test_files/one_vin.csv"))

.apply("Parse&ConvertToKV", MapElements.via(
new SimpleFunction<String, KV<String, Integer>>() {
public KV<String, Integer> apply(String input){
String[] split = input.split(",");
String key = split[1];
Integer value = 1;
return KV.of(key, value);
}
}
))

.apply(GroupByKey.<String, Integer>create())


.apply("SumOfValuesByKey", ParDo.of(new DoFn<KV<String, Iterable<Integer>>, String>() {
@ProcessElement
public void processElement(ProcessContext context) {
Integer crawlCount = 0;
String vin = context.element().getKey();
Iterable<Integer> counts = context.element().getValue();
for (Integer count : counts){
crawlCount += count;
}
context.output(vin + ": " + crawlCount);
}
}))

.apply(TextIO.write().to("~/slow_storage_drive/beam_example_files/emr_beam_test/final_output").withoutSharding());

p.run().waitUntilFinish();
}

}

我尝试使用以下命令运行该程序:

mvn compile -X exec:java -Dexec.mainClass=p1 -Pdirect-runner

我收到以下错误:

[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.6.0:java (default-cli) on project emr_beam_test: An exception occured while executing the Java class. java.lang.IllegalStateException: Invisible parameter type of p1$2 arg0 for public p1$2$DoFnInvoker(p1$2) -> [Help 1]

我无法理解我做错了什么。谁能帮帮我吗?

最佳答案

您必须使用 @ProcessElement 注释来注释您的匿名类方法 processElement。

更多关于注解的信息请引用https://beam.apache.org/releases/javadoc/2.5.0/org/apache/beam/sdk/transforms/DoFn.ProcessElement.html

关于java - Apache Beam 管道从 csv 文件读取、拆分、groupbyKey 并写入文本文件时出现 "IllegalStateException"错误。为什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52921931/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com