gpt4 book ai didi

python - 混合 asyncio 和 Kivy : How to start the asyncio loop and the Kivy application at the same time?

转载 作者:太空宇宙 更新时间:2023-11-04 00:25:03 28 4
gpt4 key购买 nike

迷失在异步中。

我同时在学习Kivy和asyncio,卡在了解决运行Kivy和运行asyncio循环的问题上,无论怎么转,都是阻塞调用,需要顺序执行(好吧,我希望我是错的),例如

loop = asyncio.get_event_loop()
loop.call_soon(MyAsyncApp().run())
loop.run_forever()

我当前的尝试导致应用程序启动,但没有协程正在执行,例如当我单击“连接”按钮时,我应该使用 loop.call_soon 开始安排和执行任务,但没有任何反应。

有人可以看看我的代码并提出解决问题的正确方法吗?


import asyncio
import random
import time
from kivy.app import App
from kivy.lang import Builder

ui = Builder.load_string('''
BoxLayout:
orientation: 'vertical'
GridLayout:
rows: 2
cols: 2
Label:
text: 'Status:'
size_hint: 0.3, 1
Label:
id: status
text: ''
Label:
text: 'Data:'
size_hint: 0.7, 1
Label:
id: data
text: ''
BoxLayout:
direction: 'horizontal'
Button:
text: 'Get Data'
on_press: app.connect()
Button:
text: 'Stop Data'
on_press: pass
''')

class MyAsyncApp(App):

def __init__(self):
super(self.__class__, self).__init__()

self.x_connected = None
self.x_widget_data = None
self.x_widget_status = None
self.x_loop = asyncio.get_event_loop()

def build(self):
return ui

def connect(self):
# Get widget
self.x_widget_status = self.root.ids.status

# Update status
self.x_widget_status.text = 'Preparing to connect...'

# Connect using asyncio
# --> But the loop must be already running <---
self.x_loop.call_soon(self.do_connect)

async def do_connect(self):
# Connect asynchronously

# Get widget
self.x_widget_data = self.root.ids.data

# Update status
self.x_connected = False
self.x_widget_status.text = 'Connecting...'

# Perform actual actions
try:
result = await self.feed_kivy()
if result:
self.x_widget_status.text = 'Service not available: ' + result
return
except Exception as e:
self.x_widget_status.text = 'Error while connecting'
return

# Update status
self.x_connected = True
self.x_widget_status.text = 'Connected'

async def feed_kivy(self):
# Deliver fresh data at random interval

# Some slow process to get data
result = await asyncio.sleep(random.randint(1, 5), time.time())
self.x_widget_data.text = result

# Reschedule ourselves
await self.x_loop.call_soon(self.feed_kivy())


def main():
# If loop started here, app is never started
loop = asyncio.get_event_loop()
loop.call_soon(MyAsyncApp().run())
loop.run_forever()
loop.close()


if __name__ == '__main__':
main()

最佳答案

My current attempt results in the application being launched, but no coroutine is being executed

发生这种情况是因为 MyAsyncApp().run() 阻止了执行流,并且控制永远不会返回到 asyncio 的事件循环。这就是所有事件循环的工作方式。

与其尝试手动交叉两个循环,更短的方法是使用现有尝试:

https://github.com/kivy/kivy/pull/5241

此 PR 来自 Kivy 的一位开发人员,包含带有解释和使用示例的工作实现。

但是它还没有合并到 master 中:你需要用这个 PR 手动构建 Kivy。

关于python - 混合 asyncio 和 Kivy : How to start the asyncio loop and the Kivy application at the same time?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47815647/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com