I want to run a pipeline using the Spark runner, with the data stored on a remote machine. The following command was used to submit the job:
./spark-submit --class org.apache.beam.examples.WordCount --master spark://192.168.1.214:6066 --deploy-mode cluster --supervise --executor-memory 2G --total-executor-cores 4 hdfs://192.168.1.214:9000/input/word-count-ck-0.1.jar --runner=SparkRunner
It produces the following response:
Running Spark using the REST application submission protocol.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/06/12 14:44:49 INFO RestSubmissionClient: Submitting a request to launch an application in spark://192.168.1.214:6066.
17/06/12 14:44:49 INFO RestSubmissionClient: Submission successfully created as driver-20170612200920-0006. Polling submission state...
17/06/12 14:44:49 INFO RestSubmissionClient: Submitting a request for the status of submission driver-20170612200920-0006 in spark://192.168.1.214:6066.
17/06/12 14:44:49 INFO RestSubmissionClient: State of driver driver-20170612200920-0006 is now RUNNING.
17/06/12 14:44:49 INFO RestSubmissionClient: Driver is running on worker worker-20170612193258-192.168.1.214-37336 at 192.168.1.214:37336.
17/06/12 14:44:49 INFO RestSubmissionClient: Server responded with CreateSubmissionResponse:
{
"action" : "CreateSubmissionResponse",
"message" : "Driver successfully submitted as driver-20170612200920-0006",
"serverSparkVersion" : "1.6.3",
"submissionId" : "driver-20170612200920-0006",
"success" : true
}
However, the job gets stuck in the RUNNING state, and the stderr log shows the following exception along with other details:
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: java.lang.IllegalStateException: Unable to find registrar for hdfs
at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:447)
at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:523)
at org.apache.beam.sdk.io.FileBasedSink.convertToFileResourceIfPossible(FileBasedSink.java:204)
at org.apache.beam.sdk.io.TextIO$Write.to(TextIO.java:294)
at org.apache.beam.examples.WordCount.main(WordCount.java:132)
... 6 more
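For context: Beam resolves the hdfs scheme by looking up FileSystemRegistrar implementations through Java's ServiceLoader, so this exception means no registrar claiming hdfs was visible on the driver's classpath at runtime. As a quick sanity check (a minimal sketch; the RegistrarCheck class name is just for illustration, and it assumes beam-sdks-java-core plus beam-sdks-java-io-hadoop-file-system are on the classpath), something like this lists the registrars the ServiceLoader can actually see:

import java.util.ServiceLoader;
import org.apache.beam.sdk.io.FileSystemRegistrar;

public class RegistrarCheck {
    public static void main(String[] args) {
        // Print every FileSystemRegistrar visible via ServiceLoader.
        // HadoopFileSystemRegistrar must show up here for hdfs:// paths to resolve.
        for (FileSystemRegistrar registrar : ServiceLoader.load(FileSystemRegistrar.class)) {
            System.out.println(registrar.getClass().getName());
        }
    }
}

If HadoopFileSystemRegistrar is missing from that list when run with the same classpath the Spark driver uses, the problem is packaging or classpath rather than the pipeline code itself.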
These are the plugins and dependencies used in my project:
<packaging>jar</packaging>
<properties>
<beam.version>2.0.0</beam.version>
<surefire-plugin.version>2.20</surefire-plugin.version>
</properties>
<repositories>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-spark</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.6.3</version>
<scope>runtime</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>jul-to-slf4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-flink_2.10</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_2.10</artifactId>
<version>2.8.8</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>${beam.version}</version>
<!-- <exclusions>
<exclusion>
<artifactId>beam-sdks-java-core</artifactId>
</exclusion>
</exclusions> -->
</dependency>
<!-- Adds a dependency on the Beam Google Cloud Platform IO module. -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>${beam.version}</version>
</dependency>
<!-- Dependencies below this line are specific dependencies needed by the examples code. -->
<dependency>
<groupId>com.google.api-client</groupId>
<artifactId>google-api-client</artifactId>
<version>1.22.0</version>
<exclusions>
<!-- Exclude an old version of guava that is being pulled
in by a transitive dependency of google-api-client -->
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava-jdk5</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-bigquery</artifactId>
<version>v2-rev295-1.22.0</version>
<exclusions>
<!-- Exclude an old version of guava that is being pulled
in by a transitive dependency of google-api-client -->
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava-jdk5</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.http-client</groupId>
<artifactId>google-http-client</artifactId>
<version>1.22.0</version>
<exclusions>
<!-- Exclude an old version of guava that is being pulled
in by a transitive dependency of google-api-client -->
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava-jdk5</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-pubsub</artifactId>
<version>v1-rev10-1.22.0</version>
<exclusions>
<!-- Exclude an old version of guava that is being pulled
in by a transitive dependency of google-api-client -->
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava-jdk5</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.4</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>20.0</version>
</dependency>
<!-- Add slf4j API frontend binding with JUL backend -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.14</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
<version>1.7.14</version>
<!-- When loaded at runtime this will wire up slf4j to the JUL backend -->
<scope>runtime</scope>
</dependency>
<!-- Hamcrest and JUnit are required dependencies of PAssert,
which is used in the main code of DebuggingWordCount example. -->
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<version>1.3</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-hadoop-common</artifactId>
<version>${beam.version}</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
<version>${beam.version}</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-hadoop-input-format</artifactId>
<version>${beam.version}</version>
</dependency>
<!-- The DirectRunner is needed for unit tests. -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>${beam.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.0.0-alpha2</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>${surefire-plugin.version}</version>
<configuration>
<parallel>all</parallel>
<threadCount>4</threadCount>
<redirectTestOutputToFile>true</redirectTestOutputToFile>
</configuration>
<dependencies>
<dependency>
<groupId>org.apache.maven.surefire</groupId>
<artifactId>surefire-junit47</artifactId>
<version>${surefire-plugin.version}</version>
</dependency>
</dependencies>
</plugin>
<!-- Ensure that the Maven jar plugin runs before the Maven
shade plugin by listing the plugin higher within the file. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
</plugin>
<!--
Configures `mvn package` to produce a bundled jar ("fat jar") for runners
that require this for job submission to a cluster.
-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/LICENSE</exclude>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.4.0</version>
<configuration>
<cleanupDaemonThreads>false</cleanupDaemonThreads>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
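One detail worth calling out in the shade configuration above: the ServicesResourceTransformer is what merges the META-INF/services/org.apache.beam.sdk.io.FileSystemRegistrar entries contributed by beam-sdks-java-core and beam-sdks-java-io-hadoop-file-system; if those service files are not merged, the ServiceLoader lookup fails at runtime exactly as in the stack trace above. A quick way to inspect the shaded jar (a rough sketch; the jar path and the ServiceFileCheck class name are placeholders):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.jar.JarFile;
import java.util.zip.ZipEntry;

public class ServiceFileCheck {
    public static void main(String[] args) throws Exception {
        // Adjust the path to wherever the shaded jar is built.
        try (JarFile jar = new JarFile("target/word-count-ck-0.1.jar")) {
            ZipEntry entry = jar.getEntry("META-INF/services/org.apache.beam.sdk.io.FileSystemRegistrar");
            if (entry == null) {
                System.out.println("No FileSystemRegistrar service file found in the jar");
                return;
            }
            // Each line names one registrar; HadoopFileSystemRegistrar should be among them.
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(jar.getInputStream(entry), StandardCharsets.UTF_8))) {
                reader.lines().forEach(System.out::println);
            }
        }
    }
}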
The fat jar does contain HadoopFileSystemRegistrar. Here is the source code of the WordCount class:
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.examples;
import java.util.Collections;
import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.hdfs.HadoopFileSystemOptions;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
//import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.hadoop.conf.Configuration;
/**
* An example that counts words in Shakespeare and includes Beam best practices.
*/
public class WordCount {
static class ExtractWordsFn extends DoFn<String, String> {
private final Counter emptyLines = Metrics
.counter(ExtractWordsFn.class, "emptyLines");
@ProcessElement
public void processElement(ProcessContext c) {
if (c.element().trim().isEmpty()) {
emptyLines.inc();
}
// Split the line into words.
String[] words = c.element().split(ExampleUtils.TOKENIZER_PATTERN);
// Output each word encountered into the output PCollection.
for (String word : words) {
if (!word.isEmpty()) {
c.output(word);
}
}
}
}
/**
* A SimpleFunction that converts a Word and Count into a printable string.
*/
public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
@Override
public String apply(KV<String, Long> input) {
return input.getKey() + ": " + input.getValue();
}
}
public static class CountWords extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
@Override
public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
// Convert lines of text into individual words.
PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));
// Count the number of times each word occurs.
PCollection<KV<String, Long>> wordCounts = words
.apply(Count.<String>perElement());
return wordCounts;
}
}
/**
* Options supported by {@link WordCount}. Concept #4: Defining your own
* configuration options. Here, you can add your own arguments to be
* processed by the command-line parser, and specify default values for
* them. You can then access the options values in your pipeline code.
* Inherits standard configuration options.
*/
public interface WordCountOptions extends HadoopFileSystemOptions {
/**
* By default, this example reads from a public dataset containing the
* text of King Lear. Set this option to choose a different input file
* or glob.
*/
@Description("Path of the file to read from")
@Default.String("hdfs://192.168.1.214:9000/beamWorks/kinglear.txt")
String getInputFile();
void setInputFile(String value);
/**
* Set this required option to specify where to write the output.
*/
@Description("/home/ankit/kinglear_chandan.txt ")
@Default.String("hdfs://192.168.1.214:9000/beamWorks/ckoutput/ck")
@Required
String getOutput();
void setOutput(String value);
}
public static void main(String[] args) {
String[] args1 =new String[]{ "--hdfsConfiguration=[{\"fs.defaultFS\" : \"hdfs://192.168.1.214:9000\"}]","--runner=SparkRunner"};
WordCountOptions options = PipelineOptionsFactory
.fromArgs(args1)
.withValidation()
.as(WordCountOptions.class);
Pipeline p = Pipeline.create(options);
p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
.apply(new CountWords())
.apply(MapElements.via(new FormatAsTextFn()))
.apply("WriteCounts", TextIO.write().to(options.getOutput()));
p.run().waitUntilFinish();
}
}
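As a side note, the unused Collections and Configuration imports suggest the HDFS settings could also be wired up programmatically instead of through the hard-coded args1 array. A minimal sketch of what that might look like inside main(), right after the options are created, assuming the setHdfsConfiguration setter exposed by HadoopFileSystemOptions:

// Hypothetical programmatic alternative to the --hdfsConfiguration argument.
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.1.214:9000");
options.setHdfsConfiguration(Collections.singletonList(conf));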
Best Answer
If you have set fs.defaultFS as described/linked in the accepted answer but are still running into this problem (which was my case), the root cause may be different.
It may be that the Java ServiceLoader cannot find the HadoopFileSystemRegistrar. In that case you may have to change how the executable jar is assembled. This answer to a similar question (gs instead of hdfs) provided the solution for me.
Edit: I am not using Spark but Flink, and I run the pipeline's main class directly with "yarn jar".
Regarding maven - Apache Beam: 'Unable to find registrar for hdfs', a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/44497662/