gpt4 book ai didi

java - kinesis从多个分片获取数据

转载 作者:行者123 更新时间:2023-11-30 07:23:25 25 4
gpt4 key购买 nike

我正在尝试构建一个从 AWS Kinesis 读取数据的简单应用程序。我已经设法使用单个分片读取数据,但我想从 4 个不同的分片获取数据。

问题是,我有一个 while 循环,只要分片处于 Activity 状态,它就会进行迭代,这会阻止我从不同分片读取数据。到目前为止,我找不到替代算法,也无法实现基于 KCL 的解决方案。非常感谢提前

public static void DoSomething() {
AmazonKinesisClient client = new AmazonKinesisClient();
//noinspection deprecation
client.setEndpoint(endpoint, serviceName, regionId);
/** get shards from the stream using describe stream method*/

DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
describeStreamRequest.setStreamName(streamName);
List<Shard> shards = new ArrayList<>();
String exclusiveStartShardId = null;
do {
describeStreamRequest.setExclusiveStartShardId(exclusiveStartShardId);
DescribeStreamResult describeStreamResult = client.describeStream(describeStreamRequest);
shards.addAll(describeStreamResult.getStreamDescription().getShards());
if (describeStreamResult.getStreamDescription().getHasMoreShards() && shards.size() > 0) {
exclusiveStartShardId = shards.get(shards.size() - 1).getShardId();
} else {
exclusiveStartShardId = null;
}
}while (exclusiveStartShardId != null);

/** shards obtained */
String shardIterator;

GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
getShardIteratorRequest.setStreamName(streamName);
getShardIteratorRequest.setShardId(shards.get(0).getShardId());
getShardIteratorRequest.setShardIteratorType("LATEST");

GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest);
shardIterator = getShardIteratorResult.getShardIterator();
GetRecordsRequest getRecordsRequest = new GetRecordsRequest();

while (!shardIterator.equals(null)) {
getRecordsRequest.setShardIterator(shardIterator);
getRecordsRequest.setLimit(250);
GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest);
List<Record> records = getRecordsResult.getRecords();

shardIterator = getRecordsResult.getNextShardIterator();
if(records.size()!=0) {
for(Record r : records) {
System.out.println(r.getPartitionKey());
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {

}
}
}

最佳答案

建议您不要从多个分片中的单个进程/工作线程中读取数据。首先,正如您所看到的,它增加了代码的复杂性,但更重要的是,您将遇到扩展问题。

可扩展性的“ secret ”是拥有小型且独立的工作人员或其他此类单位。您可以在 AWS 中的 Hadoop、DynamoDB 或 Kinesis 中看到此类设计。它允许您构建小型系统(微服务),可以根据需要轻松扩展和缩小。随着您的服务变得更加成功,或者其使用情况发生其他波动,您可以轻松添加更多工作/数据单元。

正如您在这些 AWS 服务中所看到的,您有时可以在 DynamoDB 中自动获得这种可扩展性,有时您需要将分片添加到您的 kinesis 流中。但对于您的应用程序,您需要以某种方式控制您的可扩展性。

对于 Kinesis,您可以使用 AWS Lambda 或 Kinesis 客户端库 (KCL) 进行扩展和缩减。他们都在监听流的状态(分片和事件的数量),并使用它来添加或删除工作人员并传递事件供他们处理。在这两种解决方案中,您都应该构建一个针对单个分片工作的工作线程。

如果您需要对齐来自多个分片的事件,您可以使用某些状态服务(例如 Redis 或 DynamoDB)来实现。

关于java - kinesis从多个分片获取数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37162901/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com