
apache-spark - Apache Beam pipeline on SparkRunner on Azure HDInsight


I am trying to get a Beam pipeline running on Azure HDInsight's SparkRunner. I first tried a cluster based on Spark 2.3.0/Hadoop 2.7 (HDI 3.6), then one based on 2.3.1/Hadoop 3.0 (HDI 4.0 Preview). I tried both Apache Beam 2.2.0 and the upcoming 2.10.0-SNAPSHOT.

The spark-submit command is (for Beam 2.10.0):

JARS="wasbs:///dependency/hadoop-azure-3.1.1.3.0.2.0-50.jar,wasbs:///dependency/azure-storage-7.0.0.jar,wasbs:///dependency/beam-model-fn-execution-2.10.0-SNAPSHOT.jar,wasbs:///dependency/beam-model-job-management-2.10.0-SNAPSHOT.jar,wasbs:///dependency/beam-model-pipeline-2.10.0-SNAPSHOT.jar,wasbs:///dependency/beam-runners-core-construction-java-2.10.0-SNAPSHOT.jar,wasbs:///dependency/beam-runners-core-java-2.10.0-SNAPSHOT.jar,wasbs:///dependency/beam-runners-direct-java-2.10.0-SNAPSHOT.jar,wasbs:///dependency/beam-runners-spark-2.10.0-SNAPSHOT.jar,wasbs:///dependency/beam-sdks-java-core-2.10.0-SNAPSHOT.jar,wasbs:///dependency/beam-sdks-java-fn-execution-2.10.0-SNAPSHOT.jar,wasbs:///dependency/beam-sdks-java-io-hadoop-file-system-2.10.0-SNAPSHOT.jar,wasbs:///dependency/beam-vendor-grpc-1_13_1-0.1.jar"

spark-submit --conf spark.yarn.maxAppAttempts=1 --deploy-mode cluster --master yarn --jars $JARS --class example.MinimalWordCountJava8 wasbs:///mavenproject1-1.0-SNAPSHOT.jar --runner=SparkRunner

(Initially the hadoop-azure and azure-storage jars were not supplied via --jars, but adding them made no difference.)

main() looks like this:

public static void main(String[] args) {

    JavaSparkContext ct = new JavaSparkContext();
    Configuration config = ct.hadoopConfiguration();

    // Map the wasb/wasbs schemes to the Azure filesystem implementations.
    config.set("fs.wasbs.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem");
    config.set("fs.wasb.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem");
    config.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb");
    config.set("fs.AbstractFileSystem.wasbs.impl", "org.apache.hadoop.fs.azure.Wasbs");
    config.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem");
    config.set("fs.azure.account.key." + account + ".blob.core.windows.net", key);
    config.set("fs.defaultFS", "wasb://" + container + "@" + account + ".blob.core.windows.net");

    // Sanity check: reading from wasbs:// via the Spark context works.
    System.out.println("### hello.txt content:");
    JavaRDD<String> content = ct.textFile("wasbs:///hello.txt");
    System.out.println(content.toString());

    System.out.println("### MinimalWordCountJava8");

    // Hand the already-configured JavaSparkContext to the SparkRunner.
    PipelineOptions options = PipelineOptionsFactory.create();
    SparkContextOptions sparkContextOptions = options.as(SparkContextOptions.class);
    sparkContextOptions.setUsesProvidedSparkContext(true);
    sparkContextOptions.setProvidedSparkContext(ct);
    sparkContextOptions.setRunner(SparkRunner.class);

    Pipeline p = Pipeline.create(sparkContextOptions);

    p.apply(TextIO.read().from("hello.txt"))
        .apply(FlatMapElements
            .into(TypeDescriptors.strings())
            .via((String word) -> Arrays.asList(word.split("[^\\p{L}]+"))))
        .apply(Filter.by((String word) -> !word.isEmpty()))
        .apply(Count.<String>perElement())
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
        // Write the word counts to output files.
        .apply(TextIO.write().to("output"));

    p.run().waitUntilFinish();
}

It fails when calling Pipeline.create(options), with this exception trace:

18/12/09 14:47:10 ERROR ApplicationMaster: User class threw exception: java.lang.IllegalArgumentException: Failed to construct Hadoop filesystem with configuration Configuration: /usr/hdp/3.0.2.0-50/hadoop/conf/core-site.xml, /usr/hdp/3.0.2.0-50/hadoop/conf/hdfs-site.xml
java.lang.IllegalArgumentException: Failed to construct Hadoop filesystem with configuration Configuration: /usr/hdp/3.0.2.0-50/hadoop/conf/core-site.xml, /usr/hdp/3.0.2.0-50/hadoop/conf/hdfs-site.xml
at org.apache.beam.sdk.io.hdfs.HadoopFileSystemRegistrar.fromOptions(HadoopFileSystemRegistrar.java:59)
at org.apache.beam.sdk.io.FileSystems.verifySchemesAreUnique(FileSystems.java:489)
at org.apache.beam.sdk.io.FileSystems.setDefaultPipelineOptions(FileSystems.java:479)
at org.apache.beam.sdk.PipelineRunner.fromOptions(PipelineRunner.java:47)
at org.apache.beam.sdk.Pipeline.create(Pipeline.java:145)
at io.aptly.mavenproject1.MinimalWordCountJava8.main(MinimalWordCountJava8.java:88)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$4.run(ApplicationMaster.scala:721)
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "wasbs"
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3332)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3352)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3403)
at org.apache.hadoop.fs.FileSystem$Cache.getUnique(FileSystem.java:3377)
at org.apache.hadoop.fs.FileSystem.newInstance(FileSystem.java:530)
at org.apache.hadoop.fs.FileSystem.newInstance(FileSystem.java:542)
at org.apache.beam.sdk.io.hdfs.HadoopFileSystem.<init>(HadoopFileSystem.java:82)
at org.apache.beam.sdk.io.hdfs.HadoopFileSystemRegistrar.fromOptions(HadoopFileSystemRegistrar.java:56)
... 10 more
18/12/09 14:47:10 INFO ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: java.lang.IllegalArgumentException: Failed to construct Hadoop filesystem with configuration Configuration: /usr/hdp/3.0.2.0-50/hadoop/conf/core-site.xml, /usr/hdp/3.0.2.0-50/hadoop/conf/hdfs-site.xml)

The submission itself works (wasbs:// is recognized) and reading the small wasbs:///hello.txt does not fail. This suggests that wasbs:// is handled fine up to that point.

The failure appears to happen early inside Beam's initialization.

Because of this, I passed the JavaSparkContext through the PipelineOptions (together with the dynamic Hadoop configuration suggested by other SO questions/answers), but it made no difference for me; a sketch of that approach follows.
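For reference, the "dynamic Hadoop configuration" approach looks roughly like the sketch below (a minimal sketch, assuming Beam's HadoopFileSystemOptions from the beam-sdks-java-io-hadoop-file-system module; the class name and the account/key parameters are placeholders):

import java.util.Collections;
import org.apache.beam.sdk.io.hdfs.HadoopFileSystemOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.hadoop.conf.Configuration;

public class WasbsPipelineOptions {
    // Builds PipelineOptions that carry an explicit Hadoop configuration, so
    // Beam's HadoopFileSystemRegistrar does not depend solely on the
    // core-site.xml/hdfs-site.xml it finds on the cluster.
    static HadoopFileSystemOptions forWasbs(String account, String key) {
        Configuration conf = new Configuration();
        // The same wasbs mapping that main() sets on the JavaSparkContext.
        conf.set("fs.wasbs.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem");
        conf.set("fs.azure.account.key." + account + ".blob.core.windows.net", key);

        HadoopFileSystemOptions options =
            PipelineOptionsFactory.create().as(HadoopFileSystemOptions.class);
        options.setHdfsConfiguration(Collections.singletonList(conf));
        return options;
    }
}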

Can anyone advise how to resolve this?

Best Answer

From quickly digging through the code and the issue tracker, it looks like Azure is only supported as a Hadoop filesystem as of Hadoop 3.2.0 (code, Jira). Beam is currently pinned to Hadoop version 2.7.3. That would explain the failure inside Beam's HadoopFileSystem.

Presumably spark-submit succeeds because wasbs:// is supported there through a different mechanism than the Hadoop libraries, or via a bundled, newer Hadoop version.
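One way to confirm which Hadoop version actually ends up on the application classpath at runtime is a small diagnostic along these lines (a sketch using Hadoop's VersionInfo utility; the class name is just an example):

import org.apache.hadoop.util.VersionInfo;

public class HadoopVersionCheck {
    public static void main(String[] args) {
        // The Hadoop version the JVM actually loaded, which can differ from
        // the version installed on the cluster nodes.
        System.out.println("Hadoop on classpath: " + VersionInfo.getVersion());
        // The jar that provided the Hadoop classes.
        System.out.println("Loaded from: "
            + VersionInfo.class.getProtectionDomain().getCodeSource().getLocation());
    }
}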

Regarding apache-spark - Apache Beam pipeline on SparkRunner on Azure HDInsight, there is a similar question on Stack Overflow: https://stackoverflow.com/questions/53695036/
