gpt4 book ai didi

apache-flink - 如何配置 Flink 以将 S3 用于后端状态和检查点

转载 作者:行者123 更新时间:2023-12-01 13:36:23 25 4
gpt4 key购买 nike

我有一个 Flink v1.2、3 个 JobManager、2 个 TaskManager 的设置。我想使用 S3 存储桶而不是 hdfs 来处理后端状态和检查点以及 zookeeper storageDir

fs.s3.accessKey: [accessKey]
fs.s3.secretKey: [secretKey]

state.backend: filesystem

state.backend.fs.checkpointdir: s3:///[bucket]/flink-checkpoints
state.checkpoints.dir: s3:///[bucket]/external-checkpoints
high-availability: zookeeper
high-availability.zookeeper.storageDir: s3:///[bucket]/recovery



在 JobManager 我登录我有
2017-03-22 09:52:40,971 ERROR org.apache.flink.runtime.jobmanager.JobManager                - Error while starting up JobManager
java.io.IOException: No file system found with scheme s3, referenced in file URI 's3:///[bucket]/recovery/blob'.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:276)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:310)
at org.apache.flink.runtime.blob.FileSystemBlobStore.<init>(FileSystemBlobStore.java:67)
at org.apache.flink.runtime.blob.BlobServer.<init>(BlobServer.java:114)
at org.apache.flink.runtime.jobmanager.JobManager$.createJobManagerComponents(JobManager.scala:2488)
at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2643)
at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2595)
at org.apache.flink.runtime.jobmanager.JobManager$.startActorSystemAndJobManagerActors(JobManager.scala:2242)
at org.apache.flink.runtime.jobmanager.JobManager$.liftedTree3$1(JobManager.scala:2020)
at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2019)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply$mcV$sp(JobManager.scala:2098)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:2076)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:2076)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:2131)
at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2076)
at org.apache.flink.runtime.jobmanager.JobManager$$anon$9.call(JobManager.scala:1971)
at org.apache.flink.runtime.jobmanager.JobManager$$anon$9.call(JobManager.scala:1969)
at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
at org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:1969)
at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)
2017-03-22 09:52:40,981 WARN org.apache.hadoop.security.UserGroupInformation - PriviledgedActionException as:ubuntu (auth:SIMPLE) cause:java.io.IOException: No file system found with scheme s3, referenced in file URI 's3:///[bucket]/recovery/blob'.
2017-03-22 09:52:40,981 ERROR org.apache.flink.runtime.jobmanager.JobManager - Failed to run JobManager.
java.io.IOException: No file system found with scheme s3, referenced in file URI 's3:///[bucket]/recovery/blob'.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:276)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:310)
at org.apache.flink.runtime.blob.FileSystemBlobStore.<init>(FileSystemBlobStore.java:67)
at org.apache.flink.runtime.blob.BlobServer.<init>(BlobServer.java:114)
at org.apache.flink.runtime.jobmanager.JobManager$.createJobManagerComponents(JobManager.scala:2488)
at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2643)
at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2595)
at org.apache.flink.runtime.jobmanager.JobManager$.startActorSystemAndJobManagerActors(JobManager.scala:2242)
at org.apache.flink.runtime.jobmanager.JobManager$.liftedTree3$1(JobManager.scala:2020)
at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2019)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply$mcV$sp(JobManager.scala:2098)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:2076)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:2076)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:2131)
at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2076)
at org.apache.flink.runtime.jobmanager.JobManager$$anon$9.call(JobManager.scala:1971)
at org.apache.flink.runtime.jobmanager.JobManager$$anon$9.call(JobManager.scala:1969)
at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
at org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:1969)
at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)
2017-03-22 09:52:40,983 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Shutting down remote daemon.
2017-03-22 09:52:40,993 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Remote daemon shut down; proceeding with flushing remote transports.

我没有安装hadoop。不确定是否需要这样做,以及是否应该如何/在哪里安装/配置它?

编辑:使用以下 hadoop xml (core-site.xml) 配置 Flink 后,我不太了解 IAM 部分并且我没有使用 EMR,我自己安装了集群(在 AWS 中)以便能够在不依赖于图片:
<configuration>
<property>
<name>fs.defaultFS</name>
<value>s3://[ bucket ] </value>
</property>

<property>
<name>fs.s3a.access.key</name>
<description>[ Access Key ]</description>
</property>

<property>
<name>fs.s3a.secret.key</name>
<description>[ Secret Key ]</description>
</property>

<property>
<name>fs.s3.awsAccessKeyId</name>
<description>[ Access Key ]</description>
</property>

<property>
<name>fs.s3.awsSecretAccessKey</name>
<description>[ Secret Key ]</description>
</property>

<property>
<name>fs.s3n.awsAccessKeyId</name>
<value>[ Access Key ]</value>
</property>

<property>
<name>fs.s3n.awsSecretAccessKey</name>
<value>[ Secret Key ]</value>
</property>

<property>
<name>fs.s3.impl</name>
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>

<!-- Comma separated list of local directories used to buffer
large results prior to transmitting them to S3. -->
<property>
<name>fs.s3.buffer.dir</name>
<value>/tmp</value>
</property>
</configuration>

我收到此错误:
  2017-03-24 11:20:17,760 ERROR org.apache.flink.runtime.jobmanager.JobManager                - Error while starting up JobManager
com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3521)
at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:297)
at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:303)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:271)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:310)
at org.apache.flink.runtime.blob.FileSystemBlobStore.<init>(FileSystemBlobStore.java:67)
at org.apache.flink.runtime.blob.BlobServer.<init>(BlobServer.java:114)
at org.apache.flink.runtime.jobmanager.JobManager$.createJobManagerComponents(JobManager.scala:2488)
at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2643)
at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2595)
at org.apache.flink.runtime.jobmanager.JobManager$.startActorSystemAndJobManagerActors(JobManager.scala:2242)
at org.apache.flink.runtime.jobmanager.JobManager$.liftedTree3$1(JobManager.scala:2020)
at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2019)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply$mcV$sp(JobManager.scala:2098)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:2076)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:2076)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:2131)
at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2076)
at org.apache.flink.runtime.jobmanager.JobManager$$anon$9.call(JobManager.scala:1971)
at org.apache.flink.runtime.jobmanager.JobManager$$anon$9.call(JobManager.scala:1969)
at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
at org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:1969)
at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)

编辑:我的错误我在描述字段中设置了键而不是值。

最佳答案

请查看 guide on running Flink with S3关于如何设置 S3。

我认为您缺少的是带有 fs.s3.impl 配置键的 hadoop 配置文件。
即使您没有使用 Hadoop,您仍然需要使用 Hadoop 配置文件。

关于apache-flink - 如何配置 Flink 以将 S3 用于后端状态和检查点,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42948560/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com