gpt4 book ai didi

java - 使用 AWS Java DynamoDB 流 Kinesis 适配器处理 DynamoDB 流

转载 作者:行者123 更新时间:2023-12-01 22:37:46 34 4
gpt4 key购买 nike

我尝试使用 DynamoDB 流和 AWS 提供的 Java DynamoDB 流 Kinesis 适配器捕获 DynamoDB 表更改。我正在 Scala 应用程序中使用 AWS Java 开发工具包。

我首先关注 AWS guide并浏览 AWS 发布的 code example 。但是,我在让亚马逊自己发布的代码在我的环境中运行时遇到问题。我的问题在于 KinesisClientLibConfiguration 对象。

在示例代码中,KinesisClientLibConfiguration 使用 DynamoDB 提供的流 ARN 配置。

new KinesisClientLibConfiguration("streams-adapter-demo",
streamArn,
streamsCredentials,
"streams-demo-worker")

我在 Scala 应用程序中遵循了类似的模式,首先从 Dynamo 表中查找当前 ARN:

lazy val streamArn = dynamoClient.describeTable(config.tableName)
.getTable.getLatestStreamArn

然后使用提供的 ARN 创建 KinesisClientLibConfiguration:

lazy val kinesisConfig :KinesisClientLibConfiguration =
new KinesisClientLibConfiguration(
"testProcess",
streamArn,
defaultProviderChain,
"testWorker"
).withMaxRecords(1000)
.withRegionName("eu-west-1")
.withMetricsLevel(MetricsLevel.NONE)
.withIdleTimeBetweenReadsInMillis(500)
.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)

我已验证提供的流 ARN,所有内容均与我在 AWS 控制台中看到的内容相符。

在运行时,我最终收到一个异常,指出提供的 ARN 不是有效的流名称:

com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncTask call
SEVERE: Caught exception while sync'ing Kinesis shards and leases
com.amazonaws.services.kinesis.model.AmazonKinesisException: 1 validation
error detected: Value 'arn:aws:dynamodb:eu-west-1:STREAM ARN' at
'streamName' failed to satisfy constraint: Member must satisfy regular
expression pattern: [a-zA-Z0-9_.-]+ (Service: AmazonKinesis; Status Code:
400; Error Code: ValidationException; Request ID: )

查看 KinesisClientLibConfiguration 上提供的文档,这确实有意义,因为第二个参数被列为 streamName,而没有提及 ARN。

我似乎在 KinesisClientLibConfiguration 上找不到与 ARN 相关的任何内容。由于我使用的是 DynamoDB 流而不是 Kinesis 流,我也不确定如何找到我的流名称。

此时,我不确定已发布的 AWS 示例中缺少什么,看起来他们可能正在使用更旧版本的 KCL。我正在使用 amazon-kinesis-client 版本 1.7.0。

最佳答案

问题实际上最终超出了我的 KinesisClientLibConfiguration 范围。

通过使用相同的配置并提供 DynamoDB 流适配器库中包含的流适配器以及 DynamoDB 和 CloudWatch 的客户端,我能够解决此问题。

我的工作解决方案现在看起来像这样。

定义 Kinesis 客户端配置。

//Kinesis config for DynamoDB streams
lazy val kinesisConfig :KinesisClientLibConfiguration =
new KinesisClientLibConfiguration(
getClass.getName, //DynamoDB shard lease table name
streamArn, //pulled from the dynamo table at runtime
dynamoCredentials, //DefaultAWSCredentialsProviderChain
KeywordTrackingActor.NAME //Lease owner name
).withMaxRecords(1000) //using AWS recommended value
.withIdleTimeBetweenReadsInMillis(500) //using AWS recommended value
.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)

定义流适配器和 CloudWatch 客户端

val streamAdapterClient :AmazonDynamoDBStreamsAdapterClient = new AmazonDynamoDBStreamsAdapterClient(dynamoCredentials)
streamAdapterClient.setRegion(region)

val cloudWatchClient :AmazonCloudWatchClient = new AmazonCloudWatchClient(dynamoCredentials)
cloudWatchClient.setRegion(region)

创建 RecordProcessorFactory 的实例,由您定义一个实现 KCL 提供的 IRecordProcessorFactory 和返回的 IRecordProcessor 的类。

val recordProcessorFactory :RecordProcessorFactory = new RecordProcessorFactory(context, keywordActor, config.keywordColumnName)

而我所缺少的部分,所有这些都需要提供给您的工作人员。

val worker :Worker =
new Worker.Builder()
.recordProcessorFactory(recordProcessorFactory)
.config(kinesisConfig)
.kinesisClient(streamAdapterClient)
.dynamoDBClient(dynamoClient)
.cloudWatchClient(cloudWatchClient)
.build()

//this will start record processing
streamExecutorService.submit(worker)

关于java - 使用 AWS Java DynamoDB 流 Kinesis 适配器处理 DynamoDB 流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40060827/

34 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com