gpt4 book ai didi

python - 如何在特定时间退出生成器?

转载 作者:太空宇宙 更新时间:2023-11-03 10:56:10 25 4
gpt4 key购买 nike

我正在阅读来自 Twitter Streaming API 的推文。连接到 API 后,我得到了一个生成器。

我正在遍历收到的每条推文,但我想退出迭代器,比如说,在晚上 18 点。收到每条推文后,我会检查它是否晚于指定的时间戳并停止。

问题是我接收推文的频率不够高。所以,我可以在 17:50 收到一个,下一个在晚上 19​​ 点收到。那时我会发现时间已经过去,我需要停下来。

有没有办法在晚上 18 点强制停止?

这是我的代码的高级 View :

def getStream(tweet_iter):
for tweet in tweet_iter:
#do stuff
if time_has_passed():
return

tweet_iter = ConnectAndGetStream()
getStream(tweet_iter)

最佳答案

为生产者创建一个单独的线程并使用Queue沟通。我还必须使用 threading.Event用于停止生产者。

import itertools, queue, threading, time

END_TIME = time.time() + 5 # run for ~5 seconds

def time_left():
return END_TIME - time.time()

def ConnectAndGetStream(): # stub for the real thing
for i in itertools.count():
time.sleep(1)
yield "tweet {}".format(i)

def producer(tweets_queue, the_end): # producer
it = ConnectAndGetStream()
while not the_end.is_set():
tweets_queue.put(next(it))

def getStream(tweets_queue, the_end): # consumer
try:
while True:
tweet = tweets_queue.get(timeout=time_left())
print('Got', tweet)
except queue.Empty:
print('THE END')
the_end.set()

tweets_queue = queue.Queue() # you might wanna use the maxsize parameter
the_end = threading.Event()
producer_thread = threading.Thread(target=producer,
args=(tweets_queue, the_end))
producer_thread.start()
getStream(tweets_queue, the_end)
producer_thread.join()

关于python - 如何在特定时间退出生成器?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40544666/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com