- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试使用 Beam 和 Flink runner 设置流处理管道。 Flink 是一个本地 session 部署,包含以下 docker-compose 文件:
version: "3"
services:
jobmanager:
image: flink:1.12.0-scala_2.12-java11
container_name: flink-jobmanager
environment:
FLINK_PROPERTIES: "jobmanager.rpc.address: jobmanager"
ports:
- 8081:8081
command: jobmanager
networks:
- flink
taskmanager:
image: flink:1.12.0-scala_2.12-java11
container_name: flink-taskmanager
environment:
FLINK_PROPERTIES: "jobmanager.rpc.address: jobmanager"
command: taskmanager
networks:
- flink
networks:
flink:
下面是我的Beam应用的pom.xml:
<properties>
<beam.version>2.27.0</beam.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>${beam.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-kafka -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-kafka</artifactId>
<version>${beam.version}</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-flink-1.12</artifactId>
<version>${beam.version}</version>
</dependency>
</dependencies>
我的 Beam 应用程序在 Beam Direct runner 上运行得很好,但是当我尝试在 Flink runner 上运行时,出现以下异常:
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate outputs in order.
at org.apache.flink.streaming.api.graph.StreamConfig.getOutEdgesInOrder(StreamConfig.java:470)
at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriters(StreamTask.java:1138)
at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriterDelegate(StreamTask.java:1122)
at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:290)
at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:277)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.<init>(SourceStreamTask.java:73)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.<init>(SourceStreamTask.java:69)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
at java.base/java.lang.reflect.Constructor.newInstance(Unknown Source)
at org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1373)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:700)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassNotFoundException: org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector
at java.base/java.net.URLClassLoader.findClass(Unknown Source)
at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63)
at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:72)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49)
at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168)
at java.base/java.lang.Class.forName0(Native Method)
at java.base/java.lang.Class.forName(Unknown Source)
at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
at java.base/java.util.ArrayList.readObject(Unknown Source)
at java.base/jdk.internal.reflect.GeneratedMethodAccessor13.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at java.base/java.io.ObjectStreamClass.invokeReadObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
at org.apache.flink.streaming.api.graph.StreamConfig.getOutEdgesInOrder(StreamConfig.java:467)
... 14 more
JDK 版本:Oracle Java 11
执行命令:
mvn exec:java -Dexec.mainClass=vn.duclm.beam.BeamApplication \
-Dexec.args="--runner=FlinkRunner \
--flinkMaster=localhost:8081 \
--filesToStage=target/beam-hello.jar"
我尝试同步 Flink 和 Beam 的版本,但没有成功。我现在不知道可以做些什么来解决这个问题。在此先感谢您的帮助。
最佳答案
这看起来是您的 Flink 客户端使用与 Flink 集群不同版本的 Scala 库的问题。我相信 flink-runner 仍然链接到 Scala 2.11 库。尝试按如下方式修改您的 pom.xml
:
<properties>
<beam.version>2.27.0</beam.version>
<flink.version>1.12.0</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>${beam.version}</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-kafka</artifactId>
<version>${beam.version}</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-flink-1.12</artifactId>
<version>${beam.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_2.11</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-stream-java_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
相反,您可以指定一个使用 Scala 2.11 的 Flink 容器。
关于java - Flink runner 上的 Beam : ClassNotFoundException: org. apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65948490/
我听说 Translate API 需要付费,但究竟是什么阻止了我们使用免费的 Google 翻译服务 here免费 ?否则,免费服务的限制是什么? 最佳答案 根据下面的链接,没有什么可以阻止您。 h
我正在修复我的 Karma 配置以运行 Angular2 - rc 1 版本的测试。我可以运行测试,但如果我在 html 中有一个翻译管道,它们就会失败。(我可以让它工作的配置是从 [这里][1] 得
我正在使用以下代码: GttService myService = new GttService("ex1cor.ex1Ap.1"); myService.setUserCredentials("ex
是否可以在 Silverstripe 3 中翻译数据对象? 我使用这个模块: http://www.silverstripe.org/translatable-module/ 在我的配置中是否定义了以
我有以下三个问题 我想使用 Google 的 API 来翻译文本。我知道谷歌对翻译和检测单独收费。谷歌翻译也支持翻译两种翻译方式 i) 通过指定源和目标,如 https://www.googleapi
我一直在使用 Microsoft Translator 的 HTTP API 来翻译我网站上的文本。 作为the documentation describes , 有一个选项可以指定翻译的类别。找不
当您在 localhost 上开发应用程序时,是否有机会在不获取 key 的情况下使用 Use Google Translation API? 我希望这样的事情能够奏效' https://www.go
我正在尝试翻译实体的某些字段,但在尝试创建对象时出现以下错误... id; } /** * Set name * * @param string $nam
当用户访问我们的网站时,我们使用 Google Translate API 将我们的内容翻译成用户的语言。 (当然,我们遵循署名和链接要求,以便用户知道内容是 Google 的翻译。) 为了优化,我们
我非常频繁地使用谷歌翻译 API V2,在大约 2000 个请求之后,我开始在返回的 JSON 中得到这个: Array ( [error] => Array (
我刚开始使用 Google 翻译 API,在测试过程中我们注意到,对于某些翻译(我尚未找到模式),我们会在响应中收到\u200b 字符。这会导致很多问题,最重要的是,它似乎没有任何目的或没有任何意义。
从laravel 5.8升级到laravel 6.0后,发现这个错误。 Method Illuminate\Translation\Translator::getFromJson does not e
这是一个基本的移动滑入/滑出菜单。 我发现很难调试,但基本上我的问题是,当我按下菜单按钮时,菜单会顺利打开,再次按下它会顺利关闭。然而,当我再次按下它(第三次)时,它不顺利打开,它只是出现。但是它仍然
我的 Android Studio 的翻译编辑器无法正常工作。如果我打开翻译编辑器确实列出了字符串的正确键,但是找不到默认值和翻译。所有键都有一个默认值,其中大多数也有一个翻译。 我重新启动了 And
我正在尝试从 python 控制台而不是通过 bazel -build 运行 Tensorflow 的 translate.py,但我在这两行出现错误: from tensorflow.models.
本文整理了Java中org.apache.tika.language.translate.YandexTranslator.translate()方法的一些代码示例,展示了YandexTranslat
我需要来自 Google Transle API 的同义词信息。有没有可能得到它? 最佳答案 抱歉不行。看到这个 post .看起来他们opened a PIT添加功能。你应该给它加星号来增加重量!
我在我的一个项目中使用 Google Cloud Translation API。我想指定翻译的性别。我无法在 Google Cloud Translation 中找到相关信息。我也在互联网上搜索了很
我已经在我的 Angular-Cli 应用程序中实现了 ngx-translate 并且在我执行以下操作时工作正常: {{ 'some.value' | translate }} 但是我该如何翻译 H
我决定在我的项目中使用 Google Cloud Translation API。在我尝试运行他们的脚本之前,一切似乎都很好。它总是说我需要“使用 require([])”。 在我在 require
我是一名优秀的程序员,十分优秀!