
java - Amazon Kinesis getRecord() results do not match

Reposted. Author: 行者123. Updated: 2023-12-01 12:15:28

I have just started using Kinesis; its API is documented here

I use this to push 100 records to Kinesis:

    for (int j = 0; j < 100; j++) {
        PutRecordRequest putRecordRequest = new PutRecordRequest();
        putRecordRequest.setStreamName(myStreamName);
        putRecordRequest.setData(ByteBuffer.wrap(data.getBytes()));
        putRecordRequest.setPartitionKey(String.format("partitionKey-%d", j));
        PutRecordResult putRecordResult = kinesisClient.putRecord(putRecordRequest);
        System.out.println("Successfully putrecord, partition key : " + putRecordRequest.getPartitionKey()
                + ", ShardID : " + putRecordResult.getShardId() + ", Sequence No : " + putRecordResult.getSequenceNumber());
    }

Now I want to get the number of records that were pushed. For that, I am using this:

    Iterator<Shard> shardIterator = getTotalShardsIterator(); // Implemented; returns all the shards correctly

Using the iterator above, I get the count like this:

    .....
    while (shardIterator.hasNext()) {
        Shard shard = shardIterator.next();
        String shardId = shard.getShardId();
        int datacount = getDataCount(shardId, myStreamName);
        totalStreamDataCount += datacount;
        System.out.println("Data Count for Shard " + shardId + " is : " + datacount);
    }
    .....

This is my function getDataCount(shardId, myStreamName):

    public static int getDataCount(String shardId, String streamName) {
        int dataCount = 0;
        String shardIterator;

        // Start reading from the oldest record still available in the shard.
        GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
        getShardIteratorRequest.setStreamName(streamName);
        getShardIteratorRequest.setShardId(shardId);
        getShardIteratorRequest.setShardIteratorType(ShardIteratorType.TRIM_HORIZON);

        GetShardIteratorResult getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest);
        shardIterator = getShardIteratorResult.getShardIterator();

        // A single GetRecords call; the next shard iterator in the result is not followed.
        GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
        getRecordsRequest.setShardIterator(shardIterator);
        getRecordsRequest.setLimit(1000);

        GetRecordsResult getRecordsResult = kinesisClient.getRecords(getRecordsRequest);
        List<Record> records = getRecordsResult.getRecords();
        if (!records.isEmpty()) {
            dataCount = records.size();
            for (Record record : records) {
                byte[] bytes = record.getData().array();
                String recordData = new String(bytes);
                System.out.println("Shard Id. :" + shardId + " Seq. No. is : " + record.getSequenceNumber()
                        + " Record data :" + recordData);
            }
        }

        return dataCount;
    }

But this code gives a different result on every run: sometimes it shows 81, sometimes 91.

Please shed some light on this. :)

Best answer

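There is a limit on how fast records can be pushed to Kinesis (data per second), so records may fail if you exceed it. The allowed rate depends on the number of shards in use.
You can use the Kinesis API to get the count of failed records and push them to Kinesis again.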

    // Placeholder from the original answer: the async put calls, built with
    // the stream name, partition key and data.
    List<CompletionStage<PutRecordsResponse>> putRecordResponseCompletionStage = <PutRecordRequest call with Stream Name , Partition key and Data>;

    AtomicInteger failedCount = new AtomicInteger();
    AtomicInteger recordsCount = new AtomicInteger();

    int loopCount = 0;
    for (CompletionStage<PutRecordsResponse> stage : putRecordResponseCompletionStage) {
        loopCount++;
        try {
            // Block until this response arrives, then tally failed and total records.
            PutRecordsResponse response = stage.toCompletableFuture().get();
            failedCount.addAndGet(response.failedRecordCount());
            recordsCount.addAndGet(response.records().size());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
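
The counting loop above only reports how many records failed. A minimal sketch of the "push them again" step might look like the following. It does not call the AWS SDK: `batchPut` is a hypothetical stand-in for a PutRecords call that reports one accepted/rejected flag per entry in request order, and `sendWithRetry`, `maxAttempts` and `backoffMillis` are names made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class PutRetrySketch {

    /**
     * Re-sends only the rejected entries until all are accepted or attempts run out.
     * batchPut is an assumed stand-in for a batch put: for each entry it returns
     * true (accepted) or false (rejected), in the same order as the input.
     * Returns the entries still rejected after the last attempt.
     */
    static <T> List<T> sendWithRetry(List<T> entries,
                                     Function<List<T>, List<Boolean>> batchPut,
                                     int maxAttempts,
                                     long backoffMillis) {
        List<T> pending = new ArrayList<>(entries);
        for (int attempt = 1; attempt <= maxAttempts && !pending.isEmpty(); attempt++) {
            List<Boolean> accepted = batchPut.apply(pending);
            List<T> failed = new ArrayList<>();
            for (int i = 0; i < pending.size(); i++) {
                if (!accepted.get(i)) {
                    failed.add(pending.get(i));
                }
            }
            pending = failed;
            if (!pending.isEmpty() && attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMillis * attempt); // simple linear backoff
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();    // keep the interrupt flag and give up
                    break;
                }
            }
        }
        return pending;
    }

    public static void main(String[] args) {
        // Fake sink that accepts at most 40 entries per call, imitating a throughput limit.
        List<String> stored = new ArrayList<>();
        Function<List<String>, List<Boolean>> throttled = batch -> {
            List<Boolean> result = new ArrayList<>();
            for (int i = 0; i < batch.size(); i++) {
                boolean ok = i < 40;
                if (ok) {
                    stored.add(batch.get(i));
                }
                result.add(ok);
            }
            return result;
        };

        List<String> input = new ArrayList<>();
        for (int j = 0; j < 100; j++) {
            input.add("partitionKey-" + j);
        }

        List<String> unsent = sendWithRetry(input, throttled, 5, 10L);
        System.out.println("stored=" + stored.size() + ", unsent=" + unsent.size());
        // prints "stored=100, unsent=0"
    }
}
```

With the real client, the per-entry flags would come from the PutRecords response, which lists one result per request entry in request order; a failed entry carries an error code instead of a sequence number.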


Check example [here](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-sdk.html)
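
On the reading side, a single GetRecords call is not guaranteed to return everything in a shard: it can return fewer records than the limit, or none at all, even when more remain, and each response carries a next shard iterator for the following call. That may also contribute to counts such as 81 or 91. Below is a minimal sketch of counting by following the iterator. The SDK is replaced by a hypothetical `Page` class and `getRecords` function, and `countRecords`, `fakeShard` and `maxCalls` are likewise assumptions for illustration, not SDK names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class ShardCountSketch {

    /** Hypothetical stand-in for one GetRecords response (not an SDK type). */
    static final class Page {
        final List<String> records;
        final String nextIterator;     // null once a closed shard has been fully read
        final long millisBehindLatest; // 0 when the reader has caught up with the tip

        Page(List<String> records, String nextIterator, long millisBehindLatest) {
            this.records = records;
            this.nextIterator = nextIterator;
            this.millisBehindLatest = millisBehindLatest;
        }
    }

    /** Counts records by following the iterator chain instead of trusting one call. */
    static int countRecords(String firstIterator, Function<String, Page> getRecords, int maxCalls) {
        int count = 0;
        String iterator = firstIterator;
        for (int call = 0; call < maxCalls && iterator != null; call++) {
            Page page = getRecords.apply(iterator);
            count += page.records.size();
            iterator = page.nextIterator;
            // An empty page alone does not mean the shard is drained;
            // stop only when the page is empty and the reader has caught up.
            if (page.records.isEmpty() && page.millisBehindLatest == 0) {
                break;
            }
        }
        return count;
    }

    /** Fake shard: odd-numbered calls return nothing while records remain, even calls return up to 25. */
    static Function<String, Page> fakeShard(int size) {
        List<String> shard = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            shard.add("record-" + i);
        }
        int[] calls = {0};
        return iterator -> {
            int pos = Integer.parseInt(iterator);
            calls[0]++;
            if (calls[0] % 2 == 1 && pos < shard.size()) {
                return new Page(new ArrayList<String>(), iterator, 1000L);
            }
            int end = Math.min(pos + 25, shard.size());
            long behind = end < shard.size() ? 1000L : 0L;
            return new Page(new ArrayList<String>(shard.subList(pos, end)), String.valueOf(end), behind);
        };
    }

    public static void main(String[] args) {
        System.out.println("single call: " + countRecords("0", fakeShard(91), 1)
                + ", loop: " + countRecords("0", fakeShard(91), 100));
        // prints "single call: 0, loop: 91"
    }
}
```

In real code the loop would also pause briefly between calls, since GetRecords is rate-limited per shard.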

Regarding "java - Amazon Kinesis getRecord() results do not match", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/25439932/
