gpt4 book ai didi

python - Kinesis 消费者返回空记录(boto、python)

转载 作者:太空宇宙 更新时间:2023-11-04 02:26:32 25 4
gpt4 key购买 nike

我在检查写入 Kinesis 的数据时遇到问题。看起来下面的示例应该可以工作,但我从 get_records 返回一个空列表(在 Records 字段中)。知道会发生什么吗?

import uuid
import boto3
import time


streamname = 'mytestStream'
session = boto3.session.Session()
kinesis_client = session.client('kinesis', region_name='us-east-1')


##### WRITE TO KINESIS

partitionkey = str(uuid.uuid4())[:8]
put_response = kinesis_client.put_record(StreamName=streamname,Data='mytestdata',PartitionKey=partitionkey)

time.sleep(5)


##### READ FROM KINESIS

shard_id = kinesis_client.describe_stream(StreamName=streamname)['StreamDescription']['Shards'][0]['ShardId']
shard_iterator = kinesis_client.get_shard_iterator(StreamName=streamname, ShardId=shard_id, ShardIteratorType="LATEST")["ShardIterator"]
data_from_kinesis = kinesis_client.get_records(ShardIterator=shard_iterator)

谢谢!

最佳答案

如果你要使用最新的检查点,你应该首先开始读取流,然后放置记录。在您的示例中,时间线如下;

  • 在 t0:流中的最新检查点在 101。
  • 在 t1(主线程):您将记录放入流中,记录位于检查点 102。
  • 在 t2(主线程):您开始在最新点 103 跟踪流。

要解决这个问题,您应该在不同的线程中运行生产者和消费者。正确的流程应该是这样的;

  • 在 t0(消费者线程):开始在最新位置(即 201)拖尾流。
  • 在 t1(生产者线程):您将记录放入流中,并将记录放在检查点 202 上。
  • 在 t2(消费者线程):随着服务器端的分片向前移动(因为您刚刚添加了数据)并且您从检查点 201 开始一直跟踪分片,您迭代新的检查点 202 并显示您的数据。<

关于python - Kinesis 消费者返回空记录(boto、python),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50260854/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com