gpt4 book ai didi

python - TypeError: can't pickle coroutine objects when I am using asyncio loop.run_in_executor()

转载 作者:行者123 更新时间:2023-12-05 02:02:56 27 4
gpt4 key购买 nike

我参考了这个仓库(this repo),想把 mmaction2 的 Grad-CAM 演示从短视频离线推理改造为长视频在线推理。脚本如下所示:

注意:为了使这个脚本可以很容易地重现,我注释掉了一些需要很多依赖的代码。

import cv2
import numpy as np
import torchvision.transforms as transforms
import sys
from PIL import Image
#from mmaction.apis import init_recognizer
#from utils.gradcam_utils import GradCAM
import torch
import asyncio
from concurrent.futures import ProcessPoolExecutor
from functools import partial
# sys.path.append('./utils')


async def preprocess_img(arr):
    """Turn one raw frame into a normalized NumPy array for the model.

    The frame is cast to uint8, wrapped in a PIL image, resized to
    ``(model_input_height, model_input_width)`` (module-level globals),
    converted to a tensor, normalized per channel, and returned as a
    NumPy array.

    Args:
        arr: array-like image data accepted by ``np.uint8``.

    Returns:
        The normalized image as a NumPy array (presumably channels-first,
        since ``get_frames`` stores it in a ``3 x H x W`` slot -- confirm).
    """
    pil_image = Image.fromarray(np.uint8(arr))

    # Per-channel statistics (presumably the usual ImageNet values).
    channel_mean = [0.485, 0.456, 0.406]
    channel_std = [0.229, 0.224, 0.225]

    steps = [
        transforms.Resize((model_input_height, model_input_width)),
        transforms.ToTensor(),
        transforms.Normalize(channel_mean, channel_std, inplace=False),
    ]
    pipeline = transforms.Compose(steps)

    return pipeline(pil_image).numpy()


async def inference(frame_buffer):
    """Placeholder for the Grad-CAM inference step; currently only prints.

    The real model code is commented out below so the script can be
    reproduced without its heavy dependencies, so as written this
    coroutine returns None.

    NOTE(review): this function is declared ``async`` but is passed to
    ``loop.run_in_executor`` with a ``ProcessPoolExecutor`` in
    ``run_blocking_func``. Calling it in the worker process only creates a
    coroutine object, and sending that object back to the parent fails with
    "TypeError: can't pickle coroutine objects". It has to be a plain
    ``def`` to be used that way.
    """
    print("starting inference")
    # inputs = {}
    # input_tensor = torch.from_numpy(frame_buffer).type(torch.FloatTensor)
    # input_cuda_tensor = input_tensor.cuda()
    # inputs['imgs'] = input_cuda_tensor
    # results = gradcam(inputs)
    # display_buffer = np.squeeze(results[0].cpu().detach().numpy(), axis=0)
    # return display_buffer


async def run_blocking_func(loop_, queue_, frame_buffer):
    """Run ``inference`` on ``frame_buffer`` in a process pool and queue the result.

    Args:
        loop_: event loop whose ``run_in_executor`` schedules the call.
        queue_: asyncio queue that receives the executor's result.
        frame_buffer: argument bound to ``inference`` via ``partial``.

    Returns:
        None -- there is no return statement; the result is only put on
        ``queue_``.
    """
    # A fresh pool is created and shut down on every call.
    with ProcessPoolExecutor() as pool:
        blocking_func = partial(inference, frame_buffer)
        # NOTE(review): run_in_executor needs a plain callable. ``inference``
        # is a coroutine function, so the worker returns a coroutine object
        # that cannot be pickled back to this process; this is the line the
        # reported TypeError is raised from.
        frame = await loop_.run_in_executor(pool, blocking_func)
        print(frame)
        await queue_.put(frame)
        # Brief pause that lets other tasks run.
        await asyncio.sleep(0.01)

async def get_frames(capture):
    """Grab one frame from ``capture`` and write it into all 32 clip slots.

    The preprocessed frame is expanded to six dimensions and copied into
    index ``i`` of the fourth axis of the module-level ``frame_buffer``.

    NOTE(review): the frame is retrieved once, before the loop, so every
    one of the 32 slots receives the same image -- confirm whether a new
    frame per slot was intended.

    Args:
        capture: object exposing ``grab()`` and ``retrieve()``
            (a ``cv2.VideoCapture`` in this script).

    Returns:
        The module-level ``frame_buffer`` after filling, or None when no
        frame could be retrieved.
    """
    capture.grab()
    ok, raw_frame = capture.retrieve()
    if not ok:
        print("empty frame")
        return None

    for slot in range(32):
        processed = await preprocess_img(raw_frame)
        # Insert singleton axes at positions 0, 1 and 3 to match the buffer layout.
        expanded = np.expand_dims(processed, axis=(0, 1, 3))
        print(f'expandimg.shape{expanded.shape}')
        frame_buffer[:, :, :, slot, :, :] = expanded[:, :, :, 0, :, :]

    return frame_buffer


async def show_frame(queue_: asyncio.LifoQueue):
    """Take one item from ``queue_`` and display its 32 frames in a window.

    Each frame is shown in the 'Grad-CAM VIS' window with a 10 ms key poll;
    pressing 'q' releases the module-level ``cap``, closes all OpenCV
    windows and stops the playback early.

    Args:
        queue_: queue to read the display buffer from; the buffer is
            indexed as ``buffer[i, :, :, :]`` for ``i`` in ``range(32)``.
    """
    clip = await queue_.get()
    quit_key = ord('q')

    for index in range(32):
        cv2.imshow('Grad-CAM VIS', clip[index, :, :, :])
        pressed = cv2.waitKey(10) & 0xFF
        if pressed != quit_key:
            continue
        # 'q' pressed: tear down the capture and the windows, then stop.
        cap.release()
        cv2.destroyAllWindows()
        break


async def produce(loop_, queue_, cap):
    """Endlessly read a frame buffer from ``cap`` and push it through inference.

    Args:
        loop_: event loop forwarded to ``run_blocking_func``.
        queue_: asyncio queue shared with the consumer.
        cap: video capture object forwarded to ``get_frames``.

    The loop has no exit condition of its own.
    """
    while True:
        # NOTE(review): get_frames returns None on an empty frame; that case
        # is not checked here before the buffer is handed to inference.
        frame_buffer = await asyncio.create_task(get_frames(cap))
        # Apply Grad-CAM
        # NOTE(review): run_blocking_func has no return statement and already
        # puts its result on queue_, so display_buffer is None here and the
        # put below enqueues that None as an additional item.
        display_buffer = await asyncio.create_task(run_blocking_func(loop_, queue_,frame_buffer))
        await queue_.put(display_buffer)


async def consume(queue_):
    """Display queued results until the Esc key is pressed.

    Polls ``queue_``: whenever it holds at least one item, ``show_frame`` is
    run to completion, after which a 1 ms key poll checks for Esc (key code
    27) and stops the loop if it was pressed. While the queue is empty the
    coroutine sleeps for 10 ms so other tasks can run.

    Args:
        queue_: asyncio queue filled by the producer.
    """
    while True:
        if queue_.qsize():
            # Await the coroutine directly. asyncio.wait() expects a
            # collection of awaitables and raises TypeError when given a
            # single Task; awaiting directly also lets errors raised in
            # show_frame propagate instead of being dropped.
            await show_frame(queue_)
            if cv2.waitKey(1) == 27:
                break
        else:
            await asyncio.sleep(0.01)


async def run(loop_, queue_, cap_):
    """Start the producer and consumer tasks and wait for both.

    Args:
        loop_: event loop forwarded to ``produce``.
        queue_: asyncio queue shared by ``produce`` and ``consume``.
        cap_: video capture object forwarded to ``produce``.
    """
    producer_task = asyncio.create_task(produce(loop_, queue_, cap_))
    consumer_task = asyncio.create_task(consume(queue_))
    # gather re-raises the first exception from either task to the caller.
    await asyncio.gather(producer_task, consumer_task)


# Script entry point. The names assigned here are module-level globals that the
# functions above read directly: model_input_height / model_input_width
# (preprocess_img), frame_buffer (get_frames) and cap (show_frame).
if __name__ == '__main__':

    # Model setup is commented out so the script runs without mmaction2.
    # config = '/home/weidawang/Repo/mmaction2/configs/recognition/i3d/i3d_r50_video_inference_32x2x1_100e_kinetics400_rgb.py'
    # checkpoint = '/home/weidawang/Repo/mmaction2/checkpoints/i3d_r50_video_32x2x1_100e_kinetics400_rgb_20200826-e31c6f52.pth'
    # device = torch.device('cuda:0')
    # model = init_recognizer(config, checkpoint, device=device, use_frames=False)
    video_path = 'replace_with_your_video.mp4'
    model_input_height = 256
    model_input_width = 340
    # target_layer_name = 'backbone/layer4/1/relu'
    # gradcam = GradCAM(model, target_layer_name)

    cap = cv2.VideoCapture(video_path)
    # NOTE(review): width and height are read but not used anywhere in the
    # visible script.
    width = cap.get(cv2.CAP_PROP_FRAME_WIDTH) # float
    height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT) # float

    # Six-dimensional input buffer; get_frames fills the axis of length 32.
    frame_buffer = np.zeros((1, 1, 3, 32, model_input_height, model_input_width))
    display_buffer = np.zeros((32, model_input_height, model_input_width, 3)) # (32, 256, 340, 3)

    loop = asyncio.get_event_loop()
    # LIFO queue holding at most two items.
    queue = asyncio.LifoQueue(maxsize=2)

    try:
        loop.run_until_complete(run(loop_=loop, queue_=queue, cap_=cap))
    finally:
        print("shutdown service")
        loop.close()

但是当我运行它时,它报告以下错误:

concurrent.futures.process._RemoteTraceback: 
"""
Traceback (most recent call last):
File "/home/weidawang/miniconda3/lib/python3.7/concurrent/futures/process.py", line 205, in _sendback_result
exception=exception))
File "/home/weidawang/miniconda3/lib/python3.7/multiprocessing/queues.py", line 358, in put
obj = _ForkingPickler.dumps(obj)
File "/home/weidawang/miniconda3/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
TypeError: can't pickle coroutine objects
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
File "/home/weidawang/Repo/Python-AI-Action-Utils/temp2.py", line 120, in <module>
loop.run_until_complete(run(loop_=loop, queue_=queue, cap_=cap))
File "/home/weidawang/miniconda3/lib/python3.7/asyncio/base_events.py", line 587, in run_until_complete
return future.result()
File "/home/weidawang/Repo/Python-AI-Action-Utils/temp2.py", line 94, in run
await asyncio.gather(producer_task, consumer_task)
File "/home/weidawang/Repo/Python-AI-Action-Utils/temp2.py", line 76, in produce
display_buffer = await asyncio.create_task(run_blocking_func(loop_, queue_,frame_buffer))
File "/home/weidawang/Repo/Python-AI-Action-Utils/temp2.py", line 42, in run_blocking_func
frame = await loop_.run_in_executor(pool, blocking_func)
TypeError: can't pickle coroutine objects
Task was destroyed but it is pending!
task: <Task pending coro=<consume() running at /home/weidawang/Repo/Python-AI-Action-Utils/temp2.py:88> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f7cf1418cd0>()]> cb=[gather.<locals>._done_callback() at /home/weidawang/miniconda3/lib/python3.7/asyncio/tasks.py:691]>

Process finished with exit code 1

最佳答案

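如果你使用 run_in_executor,目标函数不应该是 async 函数:在工作进程中调用 async 函数只会返回一个协程对象,而 ProcessPoolExecutor 需要把返回值 pickle 之后传回主进程,协程对象无法被 pickle,于是出现上面的 TypeError。你需要删除 def inference() 之前的 async 关键字。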

关于 python - TypeError: can't pickle coroutine objects when I am using asyncio loop.run_in_executor(),我们在 Stack Overflow 上找到一个类似的问题: https://stackoverflow.com/questions/65557258/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com