
java - Processing multiple files separately in Spark


I need help implementing a workflow with Apache Spark. My task:

  1. I have several CSV files as source data. Note: these files may have different layouts.

  2. I have metadata describing how each file needs to be parsed (this is not the problem).

  3. Main goal: the result is the source file with several additional columns. I have to update each source file without joining them into a single output range. For example: 10 source files -> 10 result files, where each result file contains data only from its corresponding source file.

As far as I know, Spark can open many files by a mask:

 var source = sc.textFile("/source/data*.gz");

But in this case I cannot tell which file a given line came from. If I take the list of source files and try to process them with the following scenario:

JavaSparkContext sc = new JavaSparkContext(...);
List<String> files = new ArrayList<>(); // list of the source files' full names
for (String f : files) {
    JavaRDD<String> data = sc.textFile(f);
    // process this file with Spark
    outRdd.coalesce(1, true).saveAsTextFile(f + "_out");
}

But in that case I end up processing all the files sequentially.
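(As an aside, and only as a rough sketch on my part, assuming Spark 1.x as in the stack trace below: wholeTextFiles keeps each file's path as the key of its record, so the data stays traceable to its source file, but it loads every file as a single value, which probably does not suit large .gz sources.)

// Sketch only: wholeTextFiles pairs each file's path with its full contents
// (needs java.util.Arrays and org.apache.spark.api.java.function.Function imports)
JavaPairRDD<String, String> filesWithPaths = sc.wholeTextFiles("/source/data*.gz");

// Split each file into lines while keeping the path as the key
JavaPairRDD<String, String> linesWithPaths = filesWithPaths
        .flatMapValues(new Function<String, Iterable<String>>() {
            @Override
            public Iterable<String> call(String content) throws Exception {
                return Arrays.asList(content.split("\n"));
            }
        });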

So my next question is: how can I process multiple files in parallel? For example: one file per executor?

I tried to implement this with simple code on my source data:

// JSON file with the paths to 4 source files, loaded into the inData variable
{
  "files": [
    {
      "name": "/mnt/files/DigilantDaily_1.gz",
      "layout": "layout_1"
    },
    {
      "name": "/mnt/files/DigilantDaily_2.gz",
      "layout": "layout_2"
    },
    {
      "name": "/mnt/files/DigilantDaily_3.gz",
      "layout": "layout_3"
    },
    {
      "name": "/mnt/files/DigilantDaily_4.gz",
      "layout": "layout_4"
    }
  ]
}

sourceFiles = new ArrayList<>();
JSONObject jsFiles = (JSONObject) new JSONParser().parse(new FileReader(new File(inData)));
Iterator<JSONObject> iterator = ((JSONArray) jsFiles.get("files")).iterator();
while (iterator.hasNext()) {
    SourceFile sf = new SourceFile();
    JSONObject js = iterator.next();
    sf.FilePath = (String) js.get("name");
    sf.MetaPath = (String) js.get("layout");
    sourceFiles.add(sf);
}

SparkConf sparkConf = new SparkConf()
        .setMaster("local[*]")
        .setAppName("spark-app");
final JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

try {
    final Validator validator = new Validator();

    ExecutorService pool = Executors.newFixedThreadPool(4);

    for (final SourceFile f : sourceFiles) {
        pool.execute(new Runnable() {

            @Override
            public void run() {

                final Path inFile = Paths.get(f.FilePath);

                JavaRDD<String> d1 = sparkContext
                        .textFile(f.FilePath)
                        .filter(new Function<String, Boolean>() {
                            @Override
                            public Boolean call(String s) throws Exception {
                                return validator.parseRow(s);
                            }
                        });

                JavaPairRDD<String, Integer> d2 = d1.mapToPair(new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String s) throws Exception {
                        String userAgent = validator.getUserAgent(s);
                        return new Tuple2<>(DeviceType.deviceType(userAgent), 1);
                    }
                });

                JavaPairRDD<String, Integer> d3 = d2.reduceByKey(new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer val1, Integer val2) throws Exception {
                        return val1 + val2;
                    }
                });

                d3.coalesce(1, true)
                  .saveAsTextFile(outFolder + "/" + inFile.getFileName().toString()); //, org.apache.hadoop.io.compress.GzipCodec.class);
            }
        });
    }
    pool.shutdown();
    pool.awaitTermination(60, TimeUnit.MINUTES);
} catch (Exception e) {
    throw e;
} finally {
    if (sparkContext != null) {
        sparkContext.stop();
    }
}

But this code fails with the following exception:

Exception in thread "pool-13-thread-2" Exception in thread "pool-13-thread-3" Exception in thread "pool-13-thread-1" Exception in thread "pool-13-thread-4" java.lang.Error: org.apache.spark.SparkException: Task not serializable
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1151)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2032)
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:335)
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:334)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
at org.apache.spark.rdd.RDD.filter(RDD.scala:334)
at org.apache.spark.api.java.JavaRDD.filter(JavaRDD.scala:78)
at append.dev.App$1.run(App.java:87)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
... 2 more

I would like to know where I went wrong.

Thanks for helping me!

Best Answer

I have used a similar multithreading approach with good results. I believe the problem lies in the anonymous inner classes you are defining.

Create your Runnable/Callable as a separate class and make sure it is shipped to Spark together with the jar you submit. Also, implement Serializable, since you are implicitly passing state into your functions (f.FilePath).
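A rough sketch of what I mean (class and field names are just placeholders; it assumes your Validator can be made Serializable and that DeviceType.deviceType is a static helper):

import java.io.Serializable;
import java.nio.file.Paths;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

// The per-file work lives in its own top-level class, so the Spark closures no longer
// drag in an anonymous Runnable whose enclosing class is not serializable.
public class FileJob implements Runnable, Serializable {

    private final transient JavaSparkContext sc; // driver-side only, never shipped to executors
    private final String filePath;
    private final String outFolder;
    private final Validator validator;           // must itself implement Serializable

    public FileJob(JavaSparkContext sc, String filePath, String outFolder, Validator validator) {
        this.sc = sc;
        this.filePath = filePath;
        this.outFolder = outFolder;
        this.validator = validator;
    }

    @Override
    public void run() {
        final Validator v = validator; // capture only serializable state in the closures

        JavaRDD<String> valid = sc.textFile(filePath)
                .filter(new Function<String, Boolean>() {
                    @Override
                    public Boolean call(String s) throws Exception {
                        return v.parseRow(s);
                    }
                });

        JavaPairRDD<String, Integer> counts = valid
                .mapToPair(new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String s) throws Exception {
                        return new Tuple2<>(DeviceType.deviceType(v.getUserAgent(s)), 1);
                    }
                })
                .reduceByKey(new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer a, Integer b) throws Exception {
                        return a + b;
                    }
                });

        counts.coalesce(1, true)
              .saveAsTextFile(outFolder + "/" + Paths.get(filePath).getFileName().toString());
    }
}

The driver then submits one job per file, much as in your loop:

ExecutorService pool = Executors.newFixedThreadPool(4);
for (SourceFile f : sourceFiles) {
    pool.execute(new FileJob(sparkContext, f.FilePath, outFolder, validator));
}
pool.shutdown();
pool.awaitTermination(60, TimeUnit.MINUTES);

Since Spark's scheduler is thread-safe, the jobs submitted from the pool's threads run as concurrent Spark jobs and share the available executors.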

Regarding "java - Processing multiple files separately in Spark", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/36133710/
