gpt4 book ai didi

python - 如何使此脚本由消息触发实时运行

转载 作者:行者123 更新时间:2023-12-05 06:52:50 26 4
gpt4 key购买 nike

我有这个脚本可以通过 Python 将消息从 Telegram 上的一个 channel 复制到另一个 channel 。这目前将运行一次,复制所有丢失的消息,然后完成。我如何修改它以间隔运行或在 channel 中出现新消息时运行?

我还有其他要使用的文件,例如带有 API key / channel 的 API 配置文件等。

即使有人可以帮助我指明正确的方向,我也会深入研究。我对 Python 不是很熟悉。


import asyncio
import logging

from telethon.tl.patched import MessageService
from telethon.errors.rpcerrorlist import FloodWaitError
from telethon import TelegramClient
from telethon.sessions import StringSession
from settings import API_ID, API_HASH, forwards, get_forward, update_offset, STRING_SESSION


logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO)

SENT_VIA = f'\n__Sent via__ `{str(__file__)}`'


def intify(string):
try:
return int(string)
except:
return string


async def forward_job():
''' the function that does the job 😂 '''
if STRING_SESSION:
session = StringSession(STRING_SESSION)
else:
session = 'forwarder'

async with TelegramClient(session, API_ID, API_HASH) as client:

confirm = ''' IMPORTANT 🛑
Are you sure that your `config.ini` is correct ?

You can run the `get_chat_info.py` script to confirm the `from` and `to`.

Press [ENTER] to continue:
'''

input(confirm)

error_occured = False
for forward in forwards:
from_chat, to_chat, offset = get_forward(forward)

if not offset:
offset = 0

last_id = 0

async for message in client.iter_messages(intify(from_chat), reverse=True, offset_id=offset):
if isinstance(message, MessageService):
continue
try:
await client.send_message(intify(to_chat), message)
last_id = str(message.id)
logging.info('forwarding message with id = %s', last_id)
update_offset(forward, last_id)

except FloodWaitError as fwe:
print(f'\n{fwe}\n\nRun the script again after some time. \
FloodWaitError Occured')
quit()
except Exception as err:
logging.exception(err)
error_occured = True
continue



if __name__ == "__main__":
assert forwards
asyncio.run(forward_job())

最佳答案

你需要调查Updates文档。使用它,您将能够在代码中获取新消息。

你还需要改变你的处理方式

看起来类似于:

from telethon import TelegramClient, events


client = TelegramClient(session, API_ID, API_HASH)

database = {}
for forward in forwards
from_chat, to_chat, offset = get_forward(forward)
database[from_chat] = {"to_chat": to_chat, "offset": offset}


@client.on(events.NewMessage)
async def my_event_handler(event):
to_chat = database.get(event.chat_id)
if to_chat:
await client.send_message(to_chat, event.message)
last_id = message.id
database[event.chat_id] = last_id
logging.info('forwarding message with id = %s', last_id)


client.start()

# TODO: do `iter_messages` initial or while script was not running

client.run_until_disconnected()

或者如果你想定期调用它,你可以:

  • 在循环中使用 iter_messages 包装方法
    while True:
    # TODO:
    # do_iter_messages__and__forwarding()

    await asyncio.sleep(10)
  • 或在延迟一段时间后再次从该方法调用带有 iter_messages 的方法
    def do_iter_messages__and__forwarding():
    # TODO:
    # for message in client.iter_messages(...):
    # do_message_forward(...)

    await asyncio.sleep(10)
    do_iter_messages__and__forwarding()



关于python - 如何使此脚本由消息触发实时运行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65907713/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com