gpt4 book ai didi

scala - Elasticsearch连接器可在IDE中工作,但不能在本地集群上工作

转载 作者:行者123 更新时间:2023-12-03 00:14:54 26 4
gpt4 key购买 nike

我正在尝试使用提供的Twitter streamElasticsearch2 connector写入Elasticsearch 2.3索引

在IntelliJ中运行我的工作正常,但是当我在本地集群上运行该jar工作时,出现以下错误:

05/09/2016 13:26:58 Job execution switched to status RUNNING.
05/09/2016 13:26:58 Source: Custom Source -> (Sink: Unnamed, Sink: Unnamed, Sink: Unnamed)(1/1) switched to SCHEDULED
05/09/2016 13:26:58 Source: Custom Source -> (Sink: Unnamed, Sink: Unnamed, Sink: Unnamed)(1/1) switched to DEPLOYING
05/09/2016 13:26:58 Source: Custom Source -> (Sink: Unnamed, Sink: Unnamed, Sink: Unnamed)(1/1) switched to RUNNING
05/09/2016 13:26:59 Source: Custom Source -> (Sink: Unnamed, Sink: Unnamed, Sink: Unnamed)(1/1) switched to FAILED
java.lang.RuntimeException: Client is not connected to any Elasticsearch nodes!
at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
at java.lang.Thread.run(Thread.java:745)

05/09/2016 13:26:59 Job execution switched to status FAILING.
java.lang.RuntimeException: Client is not connected to any Elasticsearch nodes!
at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
at java.lang.Thread.run(Thread.java:745)
05/09/2016 13:26:59 Job execution switched to status FAILED.

------------------------------------------------------------
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:541)
at com.pl.greeny.flink.TwitterAnalysis$.main(TwitterAnalysis.scala:69)
at com.pl.greeny.flink.TwitterAnalysis.main(TwitterAnalysis.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:860)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:327)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1187)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1238)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:807)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:753)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:753)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Client is not connected to any Elasticsearch nodes!
at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
at java.lang.Thread.run(Thread.java:745)

我在scala中的代码:
val config = new java.util.HashMap[String, String]
config.put("bulk.flush.max.actions", "1")
config.put("cluster.name", "elasticsearch")
config.put("node.name", "node-1")

config.put("path.home", "/media/user/e5e05ab5-28f3-4cee-a57c-444e32b99f04/thesis/elasticsearch-2.3.2/bin")
val transports = new util.ArrayList[InetSocketAddress]
transports.add(new InetSocketAddress(InetAddress.getLocalHost(),9300))
transports.add(new InetSocketAddress(InetAddress.getLoopbackAddress(),9300))
transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"),9300))
transports.add(new InetSocketAddress(InetAddress.getByName("localhost"),9300))
stream.addSink(new ElasticsearchSink(config, transports, new ElasticSearchSinkTwitter()))

从IDE运行该程序与本地群集有什么区别?

最佳答案

诸如此类的问题通常是由IDE(IntelliJ,Eclipse)对IDE进行管理/包含的方式不同以及Flink通过胖子进行作业提交所引起的。

前几天我遇到了同样的问题,任务管理器日志文件显示了以下根本原因:

java.lang.IllegalArgumentException: An SPI class of type org.apache.lucene.codecs.PostingsFormat with name 'Lucene50' does not exist. You need to add the corresponding JAR file supporting this SPI to your classpath. The current classpath supports the following names: [es090, completion090, XBloomFilter]



搜索错误,我在解决问题的SO上找到了此答案:

https://stackoverflow.com/a/38354027/3609571

通过向我的 pom.xml添加以下依赖项:
<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-core</artifactId>
<version>5.4.1</version>
</dependency>

注意,在这种情况下,依存关系的顺序很重要。它仅在将 lucene-core依赖项放在首位时起作用。将其添加到最后对我不起作用。因此,这比适当的修复更像是“黑客”。

关于scala - Elasticsearch连接器可在IDE中工作,但不能在本地集群上工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37114886/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com