
java - Does the Apache Beam Java ElasticsearchIO support templating and ValueProvider parameters? Error when invoking the template

Reposted. Author: 行者123. Updated: 2023-12-03 01:14:48

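I am trying to create an Apache Beam template that indexes data into Elasticsearch. The template is created successfully, but when I invoke it the pipeline fails with a "no protocol" error. This looks strange, because the error relates to a URL object.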

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StarterPipeline {
  private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);

  // Every option is declared as a ValueProvider so it can be set when the template is invoked.
  public interface IndexToEsOptions extends PipelineOptions {
    @Description("Path of the gzip index file to read from")
    ValueProvider<String> getInputIndexFile();
    void setInputIndexFile(ValueProvider<String> value);

    @Description("Index name to index with")
    ValueProvider<String> getIndexName();
    void setIndexName(ValueProvider<String> value);

    @Description("Index template name")
    ValueProvider<String> getIndexTemplate();
    void setIndexTemplate(ValueProvider<String> value);

    @Description("URI for es")
    @Default.String("https://vpc-myescore01-5mtib6vgjw7sbhgn3kbnwnluim.us-west-1.es.amazonaws.com")
    ValueProvider<String> getEsUri();
    void setEsUri(ValueProvider<String> value);
  }

  public static void main(String[] args) {
    IndexToEsOptions options = PipelineOptionsFactory
        .fromArgs(args)
        .withValidation()
        .as(IndexToEsOptions.class);

    Pipeline p = Pipeline.create(options);
    // Read the input lines and write them to Elasticsearch.
    p.apply(TextIO.read().from(options.getInputIndexFile()))
        .apply(ElasticsearchIO.write()
            .withConnectionConfiguration(
                ElasticsearchIO.ConnectionConfiguration.create(
                        new String[] {options.getEsUri().toString()},
                        options.getIndexName().toString(),
                        options.getIndexTemplate().toString())
                    .withConnectTimeout(240))
            .withMaxBatchSizeBytes(15 * 1024 * 1024));

    p.run();
  }
}
The error I get when I run it is:

java.lang.IllegalArgumentException: Cannot get Elasticsearch version
    org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:194)
    org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:165)
    org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:63)
    org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:50)
    org.apache.beam.runners.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:87)
    org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:125)
    org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:358)
    org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:311)
    org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140)
    org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120)
    org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
    java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Cannot get Elasticsearch version
    org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
    org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn$DoFnInvoker.invokeSetup(Unknown Source)
    org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.deserializeCopy(DoFnInstanceManagers.java:80)
    org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.peek(DoFnInstanceManagers.java:62)
    org.apache.beam.runners.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:95)
    org.apache.beam.runners.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:75)
    org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.createParDoOperation(IntrinsicMapTaskExecutorFactory.java:264)
    org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.access$000(IntrinsicMapTaskExecutorFactory.java:86)
    org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:183)
    ... 14 more
Caused by: java.lang.IllegalArgumentException: Cannot get Elasticsearch version
    org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.getBackendVersion(ElasticsearchIO.java:1475)
    org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.setup(ElasticsearchIO.java:1271)
Caused by: java.net.MalformedURLException: no protocol: RuntimeValueProvider{propertyName=esUri, default=https://vpc-esprdcore01-5mtib6vgjw7sbhgn3kbnwnluim.us-east-1.es.amazonaws.com}
    java.base/java.net.URL.<init>(URL.java:627)
    java.base/java.net.URL.<init>(URL.java:523)
    java.base/java.net.URL.<init>(URL.java:470)
    org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$ConnectionConfiguration.createClient(ElasticsearchIO.java:417)
    org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.getBackendVersion(ElasticsearchIO.java:1457)
    org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.setup(ElasticsearchIO.java:1271)
    org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn$DoFnInvoker.invokeSetup(Unknown Source)
    org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.deserializeCopy(DoFnInstanceManagers.java:80)
    org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.peek(DoFnInstanceManagers.java:62)
    org.apache.beam.runners.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:95)
    org.apache.beam.runners.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:75)
    org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.createParDoOperation(IntrinsicMapTaskExecutorFactory.java:264)
    org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.access$000(IntrinsicMapTaskExecutorFactory.java:86)
    org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:183)
    org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:165)
    org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:63)
    org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:50)
    org.apache.beam.runners.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:87)
    org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:125)
    org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:358)
    org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:311)
    org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140)
    org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120)
    org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
    java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    java.base/java.lang.Thread.run(Thread.java:834)

Best answer

In short, no: ElasticsearchIO.ConnectionConfiguration does not appear to support ValueProviders, at least not in the current version (2.22.0). You can see this from the signature of ConnectionConfiguration.create:

public static ElasticsearchIO.ConnectionConfiguration create(java.lang.String[] addresses,
                                                             java.lang.String index,
                                                             java.lang.String type)
Compare that with a function that does support ValueProviders, ElasticsearchIO.Read.withQuery:

public ElasticsearchIO.Read withQuery(ValueProvider<java.lang.String> query)
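To support ValueProvider, a function must actually accept a ValueProvider object. ValueProvider exists to pass a parameter at run time rather than during pipeline construction, so throughout construction the value has to be handed around as a ValueProvider object and never unwrapped.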
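The construction-time versus run-time distinction can be sketched with the plain JDK. This is not Beam's ValueProvider: DeferredValueDemo, RuntimeArg and Step are hypothetical names that only imitate the idea that a step stores a deferred handle and resolves it when it executes.

```java
import java.util.function.Supplier;

/**
 * Plain-JDK stand-in for a deferred parameter. This is NOT Beam's
 * ValueProvider; every name in this file is hypothetical.
 */
final class DeferredValueDemo {

  /** Holder that simulates a template argument supplied only at launch. */
  static final class RuntimeArg implements Supplier<String> {
    private String value; // unset while the "template" is being built

    void bind(String v) {
      this.value = v;
    }

    @Override
    public String get() {
      if (value == null) {
        throw new IllegalStateException("value is only available at run time");
      }
      return value;
    }
  }

  /** A step that keeps the supplier and resolves it only when executed. */
  static final class Step {
    private final Supplier<String> uri;

    Step(Supplier<String> uri) {
      this.uri = uri;
    }

    String execute() {
      return "connecting to " + uri.get();
    }
  }

  public static void main(String[] args) {
    RuntimeArg esUri = new RuntimeArg();
    Step step = new Step(esUri);          // construction: no value needed yet
    esUri.bind("https://es.example.com"); // launch: the value arrives
    System.out.println(step.execute());
  }
}
```

A step written to take a plain String would need the value while the graph is still being built, which is exactly what a template does not have.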
In your example you call toString on the ValueProvider for EsUri. Instead of producing a string that contains your URL, this gives you the string representation of the ValueProvider itself, like this: "RuntimeValueProvider{propertyName=esUri, default=https://vpc-esprdcore01-5mtib6vgjw7sbhgn3kbnwnluim.us-east-1.es.amazonaws.com}". That is why you get a MalformedURLException: the code tries to parse that string as a URL and fails.
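The failure can be reproduced without Beam. The sketch below uses only java.net.URL. The class name NoProtocolDemo, the helper parseError and the host es.example.com are made up for illustration, and the second string is hand-written to mimic the stringified provider from the stack trace.

```java
import java.net.MalformedURLException;
import java.net.URL;

final class NoProtocolDemo {

  /** Returns null if the text parses as a URL, otherwise the exception message. */
  static String parseError(String text) {
    try {
      new URL(text);
      return null;
    } catch (MalformedURLException e) {
      return e.getMessage();
    }
  }

  public static void main(String[] args) {
    String real = "https://es.example.com";
    // Hand-written imitation of what toString() on the provider yields.
    String stringified = "RuntimeValueProvider{propertyName=esUri, default=" + real + "}";

    System.out.println(parseError(real));        // parses, so no error message
    System.out.println(parseError(stringified)); // rejected: text before the first ':' is not a valid scheme
  }
}
```

The text before the first colon in the second string contains characters such as `{` and `=`, so java.net.URL cannot treat it as a scheme and reports the same "no protocol" failure seen in the trace.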
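The fix is simple: make EsUri a construction-time parameter by changing its type from ValueProvider<String> to String. Note that a construction-time parameter is fixed when the template is built, so you need to rebuild the pipeline every time you want to change it. Unfortunately, until ValueProvider support is added, there is not much else you can do.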
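A minimal sketch of that change, assuming the Beam 2.22.0 API quoted above and the Beam and ElasticsearchIO dependencies on the classpath. It also switches IndexName and IndexTemplate to String, since the question passes them through toString() in the same way; that goes slightly beyond what the answer states. The input file stays a ValueProvider because TextIO.read().from accepts one.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;

public class StarterPipeline {

  public interface IndexToEsOptions extends PipelineOptions {
    // Still a runtime (template) parameter.
    @Description("Path of the gzip index file to read from")
    ValueProvider<String> getInputIndexFile();
    void setInputIndexFile(ValueProvider<String> value);

    // Construction-time parameters: fixed when the template is built.
    @Description("Index name to index with")
    String getIndexName();
    void setIndexName(String value);

    @Description("Index template name")
    String getIndexTemplate();
    void setIndexTemplate(String value);

    @Description("URI for es")
    @Default.String("https://vpc-myescore01-5mtib6vgjw7sbhgn3kbnwnluim.us-west-1.es.amazonaws.com")
    String getEsUri();
    void setEsUri(String value);
  }

  public static void main(String[] args) {
    IndexToEsOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(IndexToEsOptions.class);

    Pipeline p = Pipeline.create(options);
    p.apply(TextIO.read().from(options.getInputIndexFile()))
        .apply(ElasticsearchIO.write()
            .withConnectionConfiguration(
                ElasticsearchIO.ConnectionConfiguration.create(
                        new String[] {options.getEsUri()}, // plain String, no toString()
                        options.getIndexName(),
                        options.getIndexTemplate())
                    .withConnectTimeout(240))
            .withMaxBatchSizeBytes(15 * 1024 * 1024));

    p.run();
  }
}
```

With this layout, --esUri, --indexName and --indexTemplate have to be supplied when the template is created, and only --inputIndexFile can vary per invocation.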

Source: a similar question on Stack Overflow, "java - Does the Apache Beam Java ElasticsearchIO support templating and ValueProvider parameters? Error when invoking the template": https://stackoverflow.com/questions/63044207/
