gpt4 book ai didi

python-asyncio - 如何将自定义任务集成到 aiogram 执行器中?

转载 作者:行者123 更新时间:2023-12-04 16:00:33 24 4
gpt4 key购买 nike

import asyncio
from threading import Thread
from datetime import datetime
from aiogram import Bot, Dispatcher, executor, types

API_TOKEN = ''

bot = Bot(token=API_TOKEN)
dp = Dispatcher(bot)

chat_ids = {}

@dp.message_handler()
async def echo(message: types.Message):
# old style:
# await bot.send_message(message.chat.id, message.text)

chat_ids[message.message_id] = message.from_user
text = f'{message.message_id} {message.from_user} {message.text}'
await message.reply(text, reply=False)


async def periodic(sleep_for, queue):
while True:
await asyncio.sleep(sleep_for)
now = datetime.utcnow()
print(f"{now}")
for id in chat_ids:
queue.put_nowait((id, f"{now}"))
# await bot.send_message(id, f"{now}", disable_notification=True)


def run_tick(queue):
newloop = asyncio.new_event_loop()
asyncio.set_event_loop(newloop)
asyncio.run(periodic(3, queue))


if __name__ == '__main__':
queue = asyncio.Queue()
Thread(target=run_tick, args=(queue,), daemon=True).start()
executor.start_polling(dp, skip_updates=True)

我想在有事件但暂时失败时通过 bot.send_message 向注册用户发送消息。
这是我尝试过的。
  • bot.send_message 崩溃,因为它是从另一个线程调用的。 (超时上下文管理器应该在任务中使用)
  • 所以,我试图通过使用队列来解决这个问题,但是没有办法将我自己的任务添加到执行程序中。

  • 有没有简单的方法可以做到这一点?

    编辑:2020-1-3

    这是@user4815162342 的工作示例。

    import asyncio
    from datetime import datetime
    from aiogram import Bot, Dispatcher, executor, types

    API_TOKEN = ''

    bot = Bot(token=API_TOKEN)
    dp = Dispatcher(bot)

    chat_ids = {}

    @dp.message_handler()
    async def echo(message: types.Message):
    chat_ids[message.from_user.id] = message.from_user
    text = f'{message.message_id} {message.from_user} {message.text}'
    await message.reply(text, reply=False)

    async def periodic(sleep_for):
    while True:
    await asyncio.sleep(sleep_for)
    now = datetime.utcnow()
    print(f"{now}")
    for id in chat_ids:
    await bot.send_message(id, f"{now}", disable_notification=True)

    if __name__ == '__main__':
    dp.loop.create_task(periodic(10))
    executor.start_polling(dp)

    最佳答案

    最初的问题是您试图从不同的线程调用 asyncio 代码。为了修复由此产生的错误,您创建了一个新的事件循环,同时保留了额外的线程。俗话说,现在你有两个问题。

    队列的想法看起来还没有完成,因为没有从队列中读取的代码;即使有,它也不会工作,因为 asyncio 队列不是为了在事件循环或线程之间共享而设计的。为了解决这个问题,您需要找到一种方法从事件循环中运行定期更新,即重新检查这个假设:

    but there is no way to add my own task into executor.



    看着 sourceExecutor ,它似乎从调度程序中获取事件循环,调度程序将其保存在可公开访问的 loop 中。属性。这意味着您只需调用 create_task 即可创建任务。该循环上的方法。例如:

    if __name__ == '__main__':
    dp.loop.create_task(periodic())
    executor.start_polling(dp, skip_updates=True)

    现在 periodic可以在您最初的尝试中制定:

    async def periodic(sleep_for, queue):
    while True:
    await asyncio.sleep(sleep_for)
    now = datetime.utcnow()
    for id in chat_ids:
    await bot.send_message(id, f"{now}",
    disable_notification=True)

    请注意,我没有对此进行测试,因为我不使用 aiogram .您可能需要解决的一个潜在问题是您的 chat_ids dict 似乎包含 message.message_id作为键,而 bot.send_message接受 message.chat.id .

    关于python-asyncio - 如何将自定义任务集成到 aiogram 执行器中?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59502355/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com