- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我的输入数据如下所示:
id,vin,url,exteriorColor,interiorColor,design,transmission,lastcrawled,mileage,price,certified,dealerId,historyType,MSRP
114722309,19XVC2F35PR012846,http://www.pohankaacura.com/auto/used-2017-acura-ilx-chantilly-va-near-buckeystown-md/24742881/,Modern Steel,graystone,0,8-Speed Dual-Clutch,2018-02-05 01:49:47 UTC,1646,22550,0,28453
我想构建一个 Beam 管道,该管道将从 csv 文件中读取此数据,获取 vin 并计算 vin 在文件中出现的次数。所以我想按 vin 分组并计算计数。我希望我的最终输出位于平面文件中。我错过了注释,所以我现在添加了它,但我收到了不同的错误,而且我在这里也找不到解决方案。下面是我的代码。
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.KV;
public class p1 {
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
p.apply(TextIO.read().from("~/slow_storage_drive/beam_test_files/one_vin.csv"))
.apply("Parse&ConvertToKV", MapElements.via(
new SimpleFunction<String, KV<String, Integer>>() {
public KV<String, Integer> apply(String input){
String[] split = input.split(",");
String key = split[1];
Integer value = 1;
return KV.of(key, value);
}
}
))
.apply(GroupByKey.<String, Integer>create())
.apply("SumOfValuesByKey", ParDo.of(new DoFn<KV<String, Iterable<Integer>>, String>() {
@ProcessElement
public void processElement(ProcessContext context) {
Integer crawlCount = 0;
String vin = context.element().getKey();
Iterable<Integer> counts = context.element().getValue();
for (Integer count : counts){
crawlCount += count;
}
context.output(vin + ": " + crawlCount);
}
}))
.apply(TextIO.write().to("~/slow_storage_drive/beam_example_files/emr_beam_test/final_output").withoutSharding());
p.run().waitUntilFinish();
}
}
我尝试使用以下命令运行该程序:
mvn compile -X exec:java -Dexec.mainClass=p1 -Pdirect-runner
我收到以下错误:
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.6.0:java (default-cli) on project emr_beam_test: An exception occured while executing the Java class. java.lang.IllegalStateException: Invisible parameter type of p1$2 arg0 for public p1$2$DoFnInvoker(p1$2) -> [Help 1]
我无法理解我做错了什么。谁能帮帮我吗?
最佳答案
您必须使用 @ProcessElement 注释来注释您的匿名类方法 processElement。
更多关于注解的信息请引用https://beam.apache.org/releases/javadoc/2.5.0/org/apache/beam/sdk/transforms/DoFn.ProcessElement.html
关于java - Apache Beam 管道从 csv 文件读取、拆分、groupbyKey 并写入文本文件时出现 "IllegalStateException"错误。为什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52921931/
我有课Entreprise具有基元数据类型和另一个类上的映射:Etablissement它仅由原始数据类型组成。 public class Entreprise implements Comparab
请帮助我理解在数据集上使用时传递给 groupByKey 的参数 scala> val data = spark.read.text("Sample.txt").as[String] data: or
groupByKey 的文档中有一些可怕的语言,警告它可能“非常昂贵”,并建议使用 aggregateByKey相反,只要有可能。 我想知道成本的差异是否来自这样一个事实,即对于某些聚合,整个组永远不
我为我的数据定义了一个自定义类 Person 并使用了 groupByKey 操作,如下所示: public class Person implements Serializable { pr
我正在尝试根据交易数据生成简单的客户摘要。例如,给定目标交易类型,发生了多少笔交易以及总金额是多少? 原始输入示例: custid desc amount 111 coffee
我正在尝试处理一些数据并以这样的方式写入输出,即结果按键分区,并按另一个参数排序 - 比如 ASC。例如, >>> data =sc.parallelize(range(10000))
根据 Databricks 最佳实践,应避免使用 Spark groupByKey,因为 Spark groupByKey 处理的工作方式是,信息将首先在 worker 之间洗牌,然后再进行处理发生。
给定一个列表[v]和一个键控函数f::v -> k,我想生成一个 map Map k [v]。 Haskell 中是否存在这样的东西? import Data.Map groupByKey :: (v
我怎样才能让 GroupByKey 触发早期结果,而不是等待所有数据到达(在我的情况下这是相当长的时间)。我试图通过早期触发将我的输入 PCollection 拆分到窗口中,但是它只是行不通。在给出结
我在 java 中有一个 apache beam 管道,如下所示: pipeline .apply("Read pubsub",PubsubIO.readStrings
我有一个类型为 ((id, ts), some value) 的有序 RDD。这是仅在 id 字段上使用自定义分区程序进行分区的。 math.abs(id.hashCode % numPartitio
这是 here 的后续问题。我正在尝试基于此实现 k-means implementation 。它工作得很好,但是我想用reduceByKey()替换groupByKey(),但我不知道如何做(我不
我正在做我在 Spark (Python) 上的第一步,我正在努力使用 groupByKey() 中的迭代器。我无法对这些值求和:我的代码如下所示: example = sc.parallelize(
我有一些看起来像这样的数据: ([('01','A','2016-01-01','8701','123','2016-10-23'),('01','A','2016- 01-01','8701','1
我总是用reduceByKey当我需要在 RDD 中对数据进行分组时,因为它会在对数据进行混洗之前执行 map 侧缩减,这通常意味着更少的数据会被混洗,从而获得更好的性能。即使map端reduce函数
谁能解释一下reducebykey、groupbykey、aggregatebykey和combinebykey之间的区别吗?我已阅读有关此内容的文档,但无法理解确切的差异。 带有示例的解释会很棒。
我有很多这种格式的元组: (1,200,a) (2,300,a) (1,300,b) (2,400,a) (2,500,b) (3,200,a) (3,400,b) (1,500,a) (2,400,
我有一个如下所示的元组列表 ls=[('c', 's'),('c', 'm'), ('c', 'p'), ('h', 'bi'), ('h', 'vi'), ('n', 'l'), ('n', 'nc
我正在尝试使用 Spark 提取邮政编码前缀,但由于尝试使用 org.apache.spark.unsafe.types.UTF8String 作为参数来初始化 java.lang.Double,Sp
我的流数据流管道从 PubSub 提取数据,不会写入 BigQuery,也不会记录任何错误。这些元素进入节点“Write to BigQuery/StreamingInserts/StreamingW
我是一名优秀的程序员,十分优秀!