maven - Apache Beam: 'Unable to find registrar for hdfs'


I want to run a pipeline with the Spark runner, with the data stored on a remote machine. The following command was used to submit the job:

./spark-submit   --class org.apache.beam.examples.WordCount   --master spark://192.168.1.214:6066   --deploy-mode cluster   --supervise   --executor-memory 2G   --total-executor-cores 4 hdfs://192.168.1.214:9000/input/word-count-ck-0.1.jar --runner=SparkRunner

It produces the following response:

Running Spark using the REST application submission protocol.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/06/12 14:44:49 INFO RestSubmissionClient: Submitting a request to launch an application in spark://192.168.1.214:6066.
17/06/12 14:44:49 INFO RestSubmissionClient: Submission successfully created as driver-20170612200920-0006. Polling submission state...
17/06/12 14:44:49 INFO RestSubmissionClient: Submitting a request for the status of submission driver-20170612200920-0006 in spark://192.168.1.214:6066.
17/06/12 14:44:49 INFO RestSubmissionClient: State of driver driver-20170612200920-0006 is now RUNNING.
17/06/12 14:44:49 INFO RestSubmissionClient: Driver is running on worker worker-20170612193258-192.168.1.214-37336 at 192.168.1.214:37336.
17/06/12 14:44:49 INFO RestSubmissionClient: Server responded with CreateSubmissionResponse:
{
  "action" : "CreateSubmissionResponse",
  "message" : "Driver successfully submitted as driver-20170612200920-0006",
  "serverSparkVersion" : "1.6.3",
  "submissionId" : "driver-20170612200920-0006",
  "success" : true
}

However, the job gets stuck in the RUNNING state, and stderr shows the following exception along with other details:

Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: java.lang.IllegalStateException: Unable to find registrar for hdfs
at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:447)
at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:523)
at org.apache.beam.sdk.io.FileBasedSink.convertToFileResourceIfPossible(FileBasedSink.java:204)
at org.apache.beam.sdk.io.TextIO$Write.to(TextIO.java:294)
at org.apache.beam.examples.WordCount.main(WordCount.java:132)
... 6 more
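
For context, Beam's FileSystems resolves a path scheme such as hdfs by discovering FileSystemRegistrar implementations through Java's ServiceLoader, and getFileSystemInternal throws this IllegalStateException when none of the discovered registrars serves the scheme. The following is only a minimal, hypothetical sketch of that discovery step (the class name RegistrarDiscoverySketch and the printed messages are mine, not Beam's code); it illustrates why a registrar that is packaged inside the jar but invisible to the ServiceLoader produces exactly this error.

import java.util.ServiceLoader;

import org.apache.beam.sdk.io.FileSystemRegistrar;

// Minimal sketch (illustrative only, not Beam's implementation) of the
// ServiceLoader-based discovery that backs FileSystems.getFileSystemInternal.
public class RegistrarDiscoverySketch {
  public static void main(String[] args) {
    boolean hdfsRegistrarVisible = false;
    for (FileSystemRegistrar registrar : ServiceLoader.load(FileSystemRegistrar.class)) {
      System.out.println("Discovered registrar: " + registrar.getClass().getName());
      // beam-sdks-java-io-hadoop-file-system is expected to contribute
      // HadoopFileSystemRegistrar via a META-INF/services entry.
      if ("HadoopFileSystemRegistrar".equals(registrar.getClass().getSimpleName())) {
        hdfsRegistrarVisible = true;
      }
    }
    if (!hdfsRegistrarVisible) {
      // This is the condition behind "Unable to find registrar for hdfs":
      // no discovered registrar can serve the hdfs scheme.
      System.out.println("HadoopFileSystemRegistrar is not visible to the ServiceLoader");
    }
  }
}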

Here are the plugins and dependencies I use in the project:

<packaging>jar</packaging>

<properties>
  <beam.version>2.0.0</beam.version>
  <surefire-plugin.version>2.20</surefire-plugin.version>
</properties>

<repositories>
  <repository>
    <id>apache.snapshots</id>
    <name>Apache Development Snapshot Repository</name>
    <url>https://repository.apache.org/content/repositories/snapshots/</url>
    <releases>
      <enabled>false</enabled>
    </releases>
    <snapshots>
      <enabled>true</enabled>
    </snapshots>
  </repository>
</repositories>

<dependencies>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-spark</artifactId>
    <version>${beam.version}</version>
    <scope>runtime</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
    <version>${beam.version}</version>
    <scope>runtime</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.6.3</version>
    <scope>runtime</scope>
    <exclusions>
      <exclusion>
        <groupId>org.slf4j</groupId>
        <artifactId>jul-to-slf4j</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-flink_2.10</artifactId>
    <version>${beam.version}</version>
    <scope>runtime</scope>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.module</groupId>
    <artifactId>jackson-module-scala_2.10</artifactId>
    <version>2.8.8</version>
    <scope>runtime</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>${beam.version}</version>
    <!-- <exclusions>
      <exclusion>
        <artifactId>beam-sdks-java-core</artifactId>
      </exclusion>
    </exclusions> -->
  </dependency>

  <!-- Adds a dependency on the Beam Google Cloud Platform IO module. -->
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>${beam.version}</version>
  </dependency>

  <!-- Dependencies below this line are specific dependencies needed by the examples code. -->
  <dependency>
    <groupId>com.google.api-client</groupId>
    <artifactId>google-api-client</artifactId>
    <version>1.22.0</version>
    <exclusions>
      <!-- Exclude an old version of guava that is being pulled
           in by a transitive dependency of google-api-client -->
      <exclusion>
        <groupId>com.google.guava</groupId>
        <artifactId>guava-jdk5</artifactId>
      </exclusion>
    </exclusions>
  </dependency>

  <dependency>
    <groupId>com.google.apis</groupId>
    <artifactId>google-api-services-bigquery</artifactId>
    <version>v2-rev295-1.22.0</version>
    <exclusions>
      <!-- Exclude an old version of guava that is being pulled
           in by a transitive dependency of google-api-client -->
      <exclusion>
        <groupId>com.google.guava</groupId>
        <artifactId>guava-jdk5</artifactId>
      </exclusion>
    </exclusions>
  </dependency>

  <dependency>
    <groupId>com.google.http-client</groupId>
    <artifactId>google-http-client</artifactId>
    <version>1.22.0</version>
    <exclusions>
      <!-- Exclude an old version of guava that is being pulled
           in by a transitive dependency of google-api-client -->
      <exclusion>
        <groupId>com.google.guava</groupId>
        <artifactId>guava-jdk5</artifactId>
      </exclusion>
    </exclusions>
  </dependency>

  <dependency>
    <groupId>com.google.apis</groupId>
    <artifactId>google-api-services-pubsub</artifactId>
    <version>v1-rev10-1.22.0</version>
    <exclusions>
      <!-- Exclude an old version of guava that is being pulled
           in by a transitive dependency of google-api-client -->
      <exclusion>
        <groupId>com.google.guava</groupId>
        <artifactId>guava-jdk5</artifactId>
      </exclusion>
    </exclusions>
  </dependency>

  <dependency>
    <groupId>joda-time</groupId>
    <artifactId>joda-time</artifactId>
    <version>2.4</version>
  </dependency>

  <dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>20.0</version>
  </dependency>

  <!-- Add slf4j API frontend binding with JUL backend -->
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.14</version>
  </dependency>

  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-jdk14</artifactId>
    <version>1.7.14</version>
    <!-- When loaded at runtime this will wire up slf4j to the JUL backend -->
    <scope>runtime</scope>
  </dependency>

  <!-- Hamcrest and JUnit are required dependencies of PAssert,
       which is used in the main code of DebuggingWordCount example. -->
  <dependency>
    <groupId>org.hamcrest</groupId>
    <artifactId>hamcrest-all</artifactId>
    <version>1.3</version>
  </dependency>

  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
  </dependency>

  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-hadoop-common</artifactId>
    <version>${beam.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
    <version>${beam.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-hadoop-input-format</artifactId>
    <version>${beam.version}</version>
  </dependency>

  <!-- The DirectRunner is needed for unit tests. -->
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-direct-java</artifactId>
    <version>${beam.version}</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.0.0-alpha2</version>
  </dependency>
</dependencies>

<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-surefire-plugin</artifactId>
      <version>${surefire-plugin.version}</version>
      <configuration>
        <parallel>all</parallel>
        <threadCount>4</threadCount>
        <redirectTestOutputToFile>true</redirectTestOutputToFile>
      </configuration>
      <dependencies>
        <dependency>
          <groupId>org.apache.maven.surefire</groupId>
          <artifactId>surefire-junit47</artifactId>
          <version>${surefire-plugin.version}</version>
        </dependency>
      </dependencies>
    </plugin>

    <!-- Ensure that the Maven jar plugin runs before the Maven
         shade plugin by listing the plugin higher within the file. -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-jar-plugin</artifactId>
    </plugin>

    <!--
      Configures `mvn package` to produce a bundled jar ("fat jar") for runners
      that require this for job submission to a cluster.
    -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>3.0.0</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <filters>
              <filter>
                <artifact>*:*</artifact>
                <excludes>
                  <exclude>META-INF/LICENSE</exclude>
                  <exclude>META-INF/*.SF</exclude>
                  <exclude>META-INF/*.DSA</exclude>
                  <exclude>META-INF/*.RSA</exclude>
                </excludes>
              </filter>
            </filters>
            <transformers>
              <transformer
                  implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
            </transformers>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>

  <pluginManagement>
    <plugins>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <version>1.4.0</version>
        <configuration>
          <cleanupDaemonThreads>false</cleanupDaemonThreads>
        </configuration>
      </plugin>
    </plugins>
  </pluginManagement>
</build>
</project>

The fat jar does contain HadoopFileSystemRegistrar. Here is the source code of the WordCount class:

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.examples;

import java.util.Collections;

import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.hdfs.HadoopFileSystemOptions;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
//import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.hadoop.conf.Configuration;

/**
* An example that counts words in Shakespeare and includes Beam best practices.
*/
public class WordCount {

  static class ExtractWordsFn extends DoFn<String, String> {
    private final Counter emptyLines = Metrics
        .counter(ExtractWordsFn.class, "emptyLines");

    @ProcessElement
    public void processElement(ProcessContext c) {
      if (c.element().trim().isEmpty()) {
        emptyLines.inc();
      }

      // Split the line into words.
      String[] words = c.element().split(ExampleUtils.TOKENIZER_PATTERN);

      // Output each word encountered into the output PCollection.
      for (String word : words) {
        if (!word.isEmpty()) {
          c.output(word);
        }
      }
    }
  }

  /**
   * A SimpleFunction that converts a Word and Count into a printable string.
   */
  public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
    @Override
    public String apply(KV<String, Long> input) {
      return input.getKey() + ": " + input.getValue();
    }
  }

  public static class CountWords
      extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts = words
          .apply(Count.<String>perElement());

      return wordCounts;
    }
  }

  /**
   * Options supported by {@link WordCount}. Concept #4: Defining your own
   * configuration options. Here, you can add your own arguments to be
   * processed by the command-line parser, and specify default values for
   * them. You can then access the options values in your pipeline code.
   * Inherits standard configuration options.
   */
  public interface WordCountOptions extends HadoopFileSystemOptions {

    /**
     * By default, this example reads from a public dataset containing the
     * text of King Lear. Set this option to choose a different input file
     * or glob.
     */
    @Description("Path of the file to read from")
    @Default.String("hdfs://192.168.1.214:9000/beamWorks/kinglear.txt")
    String getInputFile();

    void setInputFile(String value);

    /**
     * Set this required option to specify where to write the output.
     */
    @Description("/home/ankit/kinglear_chandan.txt ")
    @Default.String("hdfs://192.168.1.214:9000/beamWorks/ckoutput/ck")
    @Required
    String getOutput();

    void setOutput(String value);
  }

  public static void main(String[] args) {
    String[] args1 = new String[] {
        "--hdfsConfiguration=[{\"fs.defaultFS\" : \"hdfs://192.168.1.214:9000\"}]",
        "--runner=SparkRunner"};
    WordCountOptions options = PipelineOptionsFactory
        .fromArgs(args1)
        .withValidation()
        .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);
    p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
        .apply(new CountWords())
        .apply(MapElements.via(new FormatAsTextFn()))
        .apply("WriteCounts", TextIO.write().to(options.getOutput()));
    p.run().waitUntilFinish();
  }
}

Best answer

If you have set fs.defaultFS as described/linked in the accepted answer but are still hitting the problem (as was the case for me), the root cause may be different.
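
For reference, that fs.defaultFS step usually amounts to putting a Hadoop Configuration on Beam's HadoopFileSystemOptions. The sketch below is my own illustration of that step (it is not quoted from the accepted answer, and the class name HdfsOptionsSketch is hypothetical); the question's main() achieves the same thing by passing the --hdfsConfiguration argument instead.

import java.util.Collections;

import org.apache.beam.sdk.io.hdfs.HadoopFileSystemOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.hadoop.conf.Configuration;

// Sketch: supply fs.defaultFS to Beam's HadoopFileSystemOptions programmatically.
public class HdfsOptionsSketch {
  public static void main(String[] args) {
    HadoopFileSystemOptions options = PipelineOptionsFactory
        .fromArgs(args)
        .withValidation()
        .as(HadoopFileSystemOptions.class);

    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://192.168.1.214:9000");
    // HadoopFileSystemOptions expects a list of Hadoop Configurations.
    options.setHdfsConfiguration(Collections.singletonList(conf));
  }
}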

It may be that the Java ServiceLoader cannot find the HadoopFileSystemRegistrar. In that case you may have to change the way the executable jar is assembled. This answer to a similar question (gs instead of hdfs) provided the solution for me.
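
A practical way to check whether that is the problem is to look at which ServiceLoader entries the assembled (shaded) jar actually exposes for Beam's FileSystemRegistrar. The sketch below is a hypothetical diagnostic of my own (the class name RegistrarServiceFileCheck is not from the linked answer) and assumes the standard service-file location META-INF/services/org.apache.beam.sdk.io.FileSystemRegistrar; run it with the fat jar on the classpath, and if HadoopFileSystemRegistrar is not listed, the jar assembly dropped or overwrote that service entry.

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Enumeration;

// Hypothetical diagnostic: print every ServiceLoader entry for Beam's
// FileSystemRegistrar that is visible on the current classpath.
public class RegistrarServiceFileCheck {
  public static void main(String[] args) throws Exception {
    String resource = "META-INF/services/org.apache.beam.sdk.io.FileSystemRegistrar";
    Enumeration<URL> urls =
        RegistrarServiceFileCheck.class.getClassLoader().getResources(resource);
    while (urls.hasMoreElements()) {
      URL url = urls.nextElement();
      System.out.println("Service file: " + url);
      try (InputStream in = url.openStream();
          BufferedReader reader =
              new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
        String line;
        while ((line = reader.readLine()) != null) {
          // Each non-comment line names one registrar implementation, e.g.
          // org.apache.beam.sdk.io.hdfs.HadoopFileSystemRegistrar.
          System.out.println("  -> " + line);
        }
      }
    }
  }
}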

Edit: I am not using Spark but Flink, and I run the pipeline's main class directly with "yarn jar".

Regarding maven - Apache Beam: 'Unable to find registrar for hdfs', a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/44497662/
