gpt4 book ai didi

java - 使用 Google Dataflow 上的 KafkaIO 通过 SSL 连接到 Kafka

转载 作者:太空宇宙 更新时间:2023-11-03 14:29:06 24 4
gpt4 key购买 nike

从服务器上,我能够连接并从配置了 SSL 的远程 kafka 服务器主题中获取数据。

从 GCP,我如何使用传递 SSL 信任库、 keystore 证书位置和 Google 服务帐户 json 的 Google Dataflow 管道连接到远程 kafka 服务器?

我正在使用 Eclipse 插件作为数据流运行器选项。

如果我指向 GCS 上的证书,当证书指向 Google 存储桶时它会抛出错误。


Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

Caused by: org.apache.kafka.common.KafkaException:
java.io.FileNotFoundException:
gs:/bucket/folder/truststore-client.jks (No such file or directory)

已关注:Truststore and Google Cloud Dataflow

更新代码指向 SSL 信任库, keystore 位置到本地机器的/tmp 目录证书,以防 KafkaIO 需要从文件路径读取。它没有抛出 FileNotFoundError。

尝试从 GCP 帐户运行服务器 Java 客户端代码并使用 Dataflow - Beam Java 管道,我收到以下错误。


ssl.truststore.location = <LOCAL MACHINE CERTICATE FILE PATH>
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka version : 1.0.0
org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka commitId : aaa7af6d4a11b29d
org.apache.kafka.common.network.SslTransportLayer close
WARNING: Failed to send SSL Close message
java.io.IOException: Broken pipe

org.apache.beam.runners.direct.RootProviderRegistry.getInitialInputs(RootProviderRegistry.java:81)
at org.apache.beam.runners.direct.ExecutorServiceParallelExecutor.start(ExecutorServiceParallelExecutor.java:153)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:205)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:66)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
at

org.apache.kafka.common.utils.LogContext$KafkaLogger warn
WARNING: [Consumer clientId=consumer-1, groupId=test-group] Connection to node -2 terminated during authentication. This may indicate that authentication failed due to invalid credentials.

任何建议或示例表示赞赏。

最佳答案

Git 从本地机器克隆或上传 Java Maven 项目到 GCP Cloud Shell 主目录。在 Cloud Shell 终端上使用 Dataflow runner 命令编译项目。

mvn -Pdataflow-runner compile exec:java \
-Dexec.mainClass=com.packagename.JavaClass \
-Dexec.args="--project=PROJECT_ID \
--stagingLocation=gs://BUCKET/PATH/ \
--tempLocation=gs://BUCKET/temp/ \
--output=gs://BUCKET/PATH/output \
--runner=DataflowRunner"

确保运行器设置为 DataflowRunnner.class,并且在云端运行作业时,您会在 Dataflow 控制台上看到该作业。 DirectRunner 执行不会显示在云数据流控制台上。

将证书放在 Maven 项目的资源文件夹中,并使用 ClassLoader 读取文件。

ClassLoader classLoader = getClass().getClassLoader();
File file = new File(classLoader.getResource("keystore.jks").getFile());
resourcePath.put("keystore.jks",file.getAbsoluteFile().getPath());

编写 ConsumerFactoryFn() 以复制 Dataflow 的“/tmp/”目录中的证书,如 https://stackoverflow.com/a/53549757/4250322 中所述

将 KafkaIO 与资源路径属性结合使用。

Properties props = new Properties();
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/tmp/truststore.jks");
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/tmp/keystore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, PASSWORD);
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, PASSWORD);
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, PASSWORD);

//other properties
...

PCollection<String> collection = p.apply(KafkaIO.<String, String>read()
.withBootstrapServers(BOOTSTRAP_SERVERS)
.withTopic(TOPIC)
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.updateConsumerProperties(props)
.withConsumerFactoryFn(new ConsumerFactoryFn())
.withMaxNumRecords(50)
.withoutMetadata()
).apply(Values.<String>create());

// Apply Beam transformations and write to output.

关于java - 使用 Google Dataflow 上的 KafkaIO 通过 SSL 连接到 Kafka,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54337653/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com