gpt4 book ai didi

python - Python-调用函数和继续的主程序

转载 作者:太空宇宙 更新时间:2023-11-03 20:39:15 25 4
gpt4 key购买 nike

我正在以设定的频率(例如8hz)收集数据,该数据会被修改,存储并偶尔发送出去以进行写入。

由于流式传输/写入数据,我遇到了时序问题。程序写入数据时(每5秒一次)花费的时间超过1 / 8hz(0.125s)。这会延迟我的数据采集时间。

我想做的是调用我的写函数并使它运行,但也允许我的主程序继续运行,以使计时没有延迟。

我尝试使用几种不同的方法,但是运气不好:线程,多处理和异步。我很可能会错误地使用它们。

我正在做的一个非常简化的版本:

    def main():
while True:
curTime = datetime.datetime.now()
while curTime < nextTime:
continue
data = collectData() #collect data (serial port, tcp, etc.)
pdata = processData(data) #process data
hdata = holdData(hdata) #store data stream for occasional writing

if len(hdata) > 8*5:
writeData(hdata) #send data to be written - takes too long and causes delay in next sample > 0.125s from previous.


nextTime = curTime + datetime.timedelta(microsecond = 125000) #adjust next time for measurement - 0.125s after last time data was collected.



在上面的代码中。我想调用writeData并让该函数完成任务,但是让我的主要函数继续前进并收集更多数据。假设它比我的写入间隔快,writeData可以花任意时间。目前是。

我正在使用python3。

希望这些信息足以为您提供一些指导。

任何帮助深表感谢。

最佳答案

您正在尝试使用异步编程来解决问题的正确方法。 Python中的异步编程本身就很棘手,因为使用线程(threading),进程(multiprocessing)或协程(asyncio)实现的并发性存在主要差异。没有“正确”的方法,您选择最适合当前用例的方法。

您的问题既有IO绑定(数据获取和写入)任务,也有CPU绑定(数据处理)任务,它们可以并行运行。这是您的操作方法。也许这不是最优雅的解决方案,但是它将向您展示如何解决此类问题的想法。

在我们的解决方案中,我们将线程用于IO绑定任务,而进程则用于CPU绑定任务。就个人而言,我更愿意使用线程来完成所有任务,但是在这种情况下,由于GIL的原因,我们将无法释放现代多核CPU的所有功能来并行化数据处理。

首先,让我们在可执行脚本中导入所需的模块:

import time
import random
import signal
from threading import Thread
from multiprocessing.pool import Pool
from queue import Queue, Empty


我们解决的问题是 producer-consumer问题。主线程以固定的时间间隔获取数据并将其放入队列。处理器线程从队列中获取数据,并将其提交给工作人员池进行处理,然后收集结果并将其放入另一个队列。写入线程不断读取此队列,最终保存数据。现在,我们添加一些常量-并行运行的多个工作进程以及以秒为单位的数据获取间隔:

WORKERS = 4
FETCH_INTERVAL = 1


以下是负责无限循环中每 FETCH_INTERVAL秒获取数据的主线程:

def main():
raw_data = Queue()
processor = Thread(target=process, args=(raw_data,))
processor.start()
i = 0

try:

while True:
t_fetch = time.time()

# Simulate the data fetching:
time.sleep(0.5)
data = i, random.random()
print("[main] Fetched raw data:", data)

raw_data.put(data)
t_elapsed = time.time() - t_fetch

if t_elapsed < FETCH_INTERVAL:
time.sleep(FETCH_INTERVAL - t_elapsed)
else:
print("[error] The fetch interval is too short!")

i = i + 1

except KeyboardInterrupt:
print("shutting down...")
finally:
raw_data.put(None)
processor.join()

if __name__ == "__main__":
main()


我们先定义一个 raw_data队列,该队列将存储获取的数据,然后开始一个 processor线程,该线程运行一个 process函数,该函数将 raw_data队列作为其参数。请注意,我们不仅在每次获取数据后都在 FETCH_INTERVAL秒内休眠,而且还要考虑到由于数据获取而导致的延迟,因为这也是一项与IO绑定的任务。该脚本将无限期运行,直到按下 Ctrl-C为止。一旦中断,我们将 None放入队列以向线程发送信号,表明处理已结束,并等待 processor线程完成。现在,我们添加由 process线程运行的 processor函数的定义:

def process(raw_data):
proc_data = Queue()
writer = Thread(target=write, args=(proc_data,))
writer.start()

with Pool(WORKERS, init_worker) as pool:

while True:
data_batch = dequeue_data(raw_data, batch_size=WORKERS)

if not data_batch:
time.sleep(0.5)
continue

results = pool.map(process_data, data_batch)
print("[processor] Processed raw data:", results)

for r in results:
proc_data.put(r)

if None in data_batch:
break

print("joining the writer thread...")
writer.join()


在这里,我们创建一个 proc_data队列,该队列将保存 writer线程的数据处理结果。 writer线程运行一个 write函数,稍后我们将对其进行定义。一旦启动 writer线程,我们将创建 poolWORKERS进程。在这里,我们使用 init_worker函数作为 Pool进程初始化程序,以忽略工作进程在主线程中处理时的键盘中断:

def init_worker():
signal.signal(signal.SIGINT, signal.SIG_IGN)


创建进程池后,我们将进入无限循环,方法是通过调用下面将定义的 raw_data函数不断地使 dequeue_data队列中的数据批次出队。然后将数据批处理提交给工作池进行处理。 process_data函数将在下面定义。然后,我们收集结果并将其放入 proc_data队列,该队列由 writer线程读取。如果数据批中有 None,则处理会中断,我们等待 writer线程完成。 dequeue_data函数的定义如下:

def dequeue_data(data_queue, batch_size):
items = []

for _ in range(batch_size):
try:
item = data_queue.get(block=False)
except (KeyboardInterrupt, Empty):
break

items.append(item)

return items


在这里,您看到它只是尝试从 batch_size获取并返回最多 data_queue数据点。如果没有数据,则返回一个空列表。 process_data函数什么都不做,只能休眠1-5秒:

def process_data(data):

if data is None:
return

# Simulate the data processing:
time.sleep(random.randint(1, 5))

return data


最后,我们定义在 write线程中运行的 writer函数:

def write(proc_data):

while True:
data = proc_data.get()

if data is None:
break

# Simulate the data writing:
time.sleep(random.randint(1, 2))
print("[writer] Wrote processed data:", data)


一旦从 None队列获取 proc_data,无限循环就会停止。现在,我们将所有提供的代码保存在一个脚本中,然后运行并检查其输出:

[main] Fetched raw data: (0, 0.8092310624924178)
[main] Fetched raw data: (1, 0.8594148294409398)
[main] Fetched raw data: (2, 0.9059856675215566)
[main] Fetched raw data: (3, 0.5653361157057876)
[main] Fetched raw data: (4, 0.8966396309003691)
[main] Fetched raw data: (5, 0.5772344067614918)
[processor] Processed raw data: [(0, 0.8092310624924178)]
[main] Fetched raw data: (6, 0.4614411399877961)
^Cshutting down...
[writer] Wrote processed data: (0, 0.8092310624924178)
[processor] Processed raw data: [(1, 0.8594148294409398), (2, 0.9059856675215566), (3, 0.5653361157057876), (4, 0.8966396309003691)]
[writer] Wrote processed data: (1, 0.8594148294409398)
[writer] Wrote processed data: (2, 0.9059856675215566)
[processor] Processed raw data: [(5, 0.5772344067614918), (6, 0.4614411399877961), None]
joining the writer thread...
[writer] Wrote processed data: (3, 0.5653361157057876)
[writer] Wrote processed data: (4, 0.8966396309003691)
[writer] Wrote processed data: (5, 0.5772344067614918)
[writer] Wrote processed data: (6, 0.4614411399877961)


main线程以固定的时间间隔获取数据,而 processor并行地批量处理数据,而 writer保存结果。一旦我们点击 Ctrl-Cmain线程就停止获取数据,然后 processor线程就完成了对其余获取数据的处理,并开始等待 writer线程完成将数据写入磁盘。

关于python - Python-调用函数和继续的主程序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56959038/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com