
java - Apache Beam streaming from Pub/Sub to ElasticSearch

Reposted. Author: 行者123. Updated: 2023-12-02 01:13:48

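I am writing a Java streaming pipeline with Apache Beam that reads messages from Google Cloud PubSub and writes them to an ElasticSearch instance. For now I am using the direct runner, but I plan to deploy the solution on Google Cloud Dataflow.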

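First, I wrote a pipeline that reads from PubSub and writes to text files, and it works. Then I started the ElasticSearch instance, and that worked too. I wrote a few documents to it with curl without any trouble.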

Then, when I tried to perform the writes with Beam's ElasticSearch connector, I started getting errors. Specifically, I get java.lang.NoSuchMethodError: org.elasticsearch.client.RestClient.performRequest, even though I added the dependency to my pom.xml file.

What I am doing is basically this:

    messages
        .apply(
            "TwoMinWindow",
            // Group elements into fixed two-minute windows.
            Window.into(FixedWindows.of(Duration.standardMinutes(2))))
        .apply(
            "ElasticWrite",
            // Write each window's elements to the "streaming_data" index, type "string".
            ElasticsearchIO.write()
                .withConnectionConfiguration(
                    ElasticsearchIO.ConnectionConfiguration
                        .create(new String[] {"http://xxx.xxx.xxx.xxx:9200"}, "streaming_data", "string")
                        .withUsername("xxxx")
                        .withPassword("xxxxxxxx")));

With the DirectRunner I can connect to PubSub, but an exception is thrown when the pipeline tries to connect to the ElasticSearch instance:

java.lang.NoSuchMethodError: org.elasticsearch.client.RestClient.performRequest(Ljava/lang/String;Ljava/lang/String;[Lorg/apache/http/Header;)Lorg/elasticsearch/client/Response;
at org.apache.beam.sdk.util.UserCodeException.wrap (UserCodeException.java:34)
at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn$DoFnInvoker.invokeSetup (Unknown Source)
at org.apache.beam.sdk.transforms.reflect.DoFnInvokers.tryInvokeSetupFor (DoFnInvokers.java:50)
at org.apache.beam.runners.direct.DoFnLifecycleManager$DeserializingCacheLoader.load (DoFnLifecycleManager.java:104)
at org.apache.beam.runners.direct.DoFnLifecycleManager$DeserializingCacheLoader.load (DoFnLifecycleManager.java:91)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture (LocalCache.java:3528)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync (LocalCache.java:2277)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad (LocalCache.java:2154)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get (LocalCache.java:2044)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get (LocalCache.java:3952)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad (LocalCache.java:3974)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get (LocalCache.java:4958)
at org.apache.beam.runners.direct.DoFnLifecycleManager.get (DoFnLifecycleManager.java:61)
at org.apache.beam.runners.direct.ParDoEvaluatorFactory.createEvaluator (ParDoEvaluatorFactory.java:129)
at org.apache.beam.runners.direct.ParDoEvaluatorFactory.forApplication (ParDoEvaluatorFactory.java:79)
at org.apache.beam.runners.direct.TransformEvaluatorRegistry.forApplication (TransformEvaluatorRegistry.java:169)
at org.apache.beam.runners.direct.DirectTransformExecutor.run (DirectTransformExecutor.java:117)
at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
at java.util.concurrent.FutureTask.run (FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:624)
at java.lang.Thread.run (Thread.java:748)
Caused by: java.lang.NoSuchMethodError: org.elasticsearch.client.RestClient.performRequest(Ljava/lang/String;Ljava/lang/String;[Lorg/apache/http/Header;)Lorg/elasticsearch/client/Response;
at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.getBackendVersion (ElasticsearchIO.java:1348)
at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.setup (ElasticsearchIO.java:1200)

This is what I added to pom.xml:

    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
        <version>${beam.version}</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.elasticsearch.client/elasticsearch-rest-client -->
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-client</artifactId>
        <version>${elastic.version}</version>
    </dependency>

I am stuck on this problem and do not know how to solve it. If I use JestClient, I can connect to ElasticSearch without any issue.

Do you have any suggestions?

Best answer

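You are using a newer version of RestClient that no longer has the performRequest(String, String, Header...) method named in the stack trace. If you look at the latest source code, you can see that the method now takes a Request, whereas older versions had overloads that took Strings and Headers. Those overloads were deprecated and then removed from the code on September 1, 2018.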
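To confirm which client version actually ends up on the classpath, a small reflection check can help. The following is only a sketch: the class name RestClientCheck and its two helper methods are hypothetical, and the Elasticsearch and HttpClient class names are copied from the stack trace in the question. The classes are looked up by name so the file compiles even without those jars.

```java
import java.lang.reflect.Array;
import java.security.CodeSource;

// Hypothetical helper, not part of Beam or Elasticsearch.
final class RestClientCheck {

    /** True if type has a public method with this name and exactly these parameter types. */
    public static boolean hasPublicMethod(Class<?> type, String name, Class<?>... paramTypes) {
        try {
            type.getMethod(name, paramTypes);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    /** Location (usually a jar) the class was loaded from, or "<bootstrap>" for JDK classes. */
    public static String loadedFrom(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        return (source == null || source.getLocation() == null)
                ? "<bootstrap>"
                : source.getLocation().toString();
    }

    public static void main(String[] args) {
        try {
            // Class names taken from the stack trace; resolved at runtime only.
            Class<?> restClient = Class.forName("org.elasticsearch.client.RestClient");
            Class<?> header = Class.forName("org.apache.http.Header");
            // Varargs Header... is compiled as a Header[] parameter.
            Class<?> headerArray = Array.newInstance(header, 0).getClass();

            System.out.println("RestClient loaded from: " + loadedFrom(restClient));
            System.out.println("performRequest(String, String, Header[]) present: "
                    + hasPublicMethod(restClient, "performRequest",
                            String.class, String.class, headerArray));
        } catch (ClassNotFoundException e) {
            System.out.println("Not on the classpath: " + e.getMessage());
        }
    }
}
```

Run with the same classpath as the pipeline, this prints the jar that supplies RestClient and whether the old overload exists in it. If the overload is reported as missing, the jar is too new for the ElasticsearchIO version in use.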

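Either change your code to work with the newer Elasticsearch library, or pin an older version of the library that is compatible with your code (it needs to be earlier than 7.0.x, for example 6.8.4).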
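As a sketch of the second option, the version can be pinned through the elastic.version property that the dependency in the question already references. Whether the property is defined in this exact place in the asker's pom.xml is an assumption; 6.8.4 is the example version given above.

```xml
<properties>
    <!-- Assumption: this property feeds the elasticsearch-rest-client
         dependency shown in the question. Any release earlier than 7.0.x
         should still contain the old performRequest overloads. -->
    <elastic.version>6.8.4</elastic.version>
</properties>
```

Running `mvn dependency:tree -Dincludes=org.elasticsearch.client` afterwards shows which version Maven actually resolves, which is useful if another dependency pulls in a different one transitively.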

Regarding "java - Apache Beam streaming from Pub/Sub to ElasticSearch", the original question is on Stack Overflow: https://stackoverflow.com/questions/58961289/
