gpt4 book ai didi

python - 在后台运行特定的 Django 脚本

转载 作者:行者123 更新时间:2023-12-01 04:20:53 25 4
gpt4 key购买 nike

我正在开发一个 Gmail 提取应用程序,并使用 Gmail API 从服务器获取邮件。问题在于,即使我在后端框架中使用了线程,获取邮件所花的时间仍然太长。现在我想实现一项功能:建议用户选择批量下载,并提示“一旦您的下载准备就绪,我们就会向您发送邮件”。为此,我想让下面提到的 download.py 文件在应用程序的后台运行,一旦获取结束,它就会被终止。在代码的最底部,我想向用户发送邮件,告知他们的下载已准备就绪;但尽管我已在 settings.py 中定义了邮件服务器,邮件发送仍然无法正常工作。

download.py

import httplib2, base64
from stripogram import html2text
from oauth2client.django_orm import Storage
from apiclient.discovery import build
from oauth2client import client

from django.contrib.auth.models import User
from .models import CredentialsModel
from django.conf import settings

from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import authentication, permissions
from gextracto import models
from gextracto.models import UserData
from django.core.mail import EmailMessage
from django.core import mail

# Module-level e-mail connection obtained from Django's mail framework when
# this module is imported; ListMails.get passes it to EmailMessage to send
# the "download ready" notification.
connection = mail.get_connection()



class ListMails(APIView):
    """
    Download all mails of one Gmail label for the authenticated user.

    Gets the list of mail ids for a particular label, extracts every mail
    as text, stores the mails as ``UserData`` rows, notifies the user by
    e-mail and returns all the extracted mails in the API response.
    """

    authentication_classes = (authentication.SessionAuthentication,)
    permission_classes = (permissions.IsAuthenticated,)

    def extract_headers(self, message):
        """
        Extract the headers of a single mail.

        Returns a dict holding whichever of ``From``, ``To`` and ``Subject``
        are present in ``message['payload']['headers']``.
        """
        needed_fields = ('From', 'To', 'Subject')
        return {
            header['name']: header['value']
            for header in message['payload']['headers']
            if header['name'] in needed_fields
        }

    def get_message_body(self, message):
        """
        Get the body of an email.

        Recursively descends into ``payload`` and then into the first entry
        of ``parts`` until a node without either key is reached, and decodes
        that node's base64url ``body.data``.

        Returns a dict with an ``html`` entry (newlines replaced by
        ``<br/>``) and, when the text conversion is non-empty, a
        ``markdown`` entry.  Raises ``KeyError`` if the leaf node carries
        no ``body.data``.
        """
        if 'payload' in message:
            return self.get_message_body(message['payload'])
        if 'parts' in message:
            return self.get_message_body(message['parts'][0])

        data = base64.urlsafe_b64decode(message['body']['data'].encode('ASCII'))
        markdown_data = html2text(data)
        data = data.replace("\n", "<br/>")

        body = {'html': unicode(data, "ISO-8859-1")}
        if markdown_data:
            body['markdown'] = unicode(markdown_data, "ISO-8859-1")
        return body

    def message_content_html(self, userId, message_id, service):
        """
        Query the Gmail API for one mail and return its content.

        Returns ``{}`` for mails whose top-level MIME type is ``text/html``;
        otherwise a dict with ``id``, ``body`` and the extracted headers.
        """
        message = service.users().messages().get(userId=userId, id=message_id).execute()

        # Mails whose top-level MIME type is text/html are skipped, exactly
        # as before; callers treat the empty dict as "nothing to collect".
        if message['payload']['mimeType'] == 'text/html':
            return {}

        # get_message_body always returns a dict, so the former comparison
        # of the body against "" could never be true and has been removed.
        content = {'id': message_id}
        content['body'] = self.get_message_body(message)
        content.update(self.extract_headers(message))
        return content

    def collect_mails(self, user, messages, service):
        """
        Collect the content of every mail in ``messages``.

        ``messages`` is an iterable of dicts carrying an ``id`` key.  A mail
        that raises ``KeyError`` while being fetched or parsed is skipped on
        its own, so one malformed mail no longer discards the whole batch.
        Returns the list of collected content dicts (possibly empty).
        """
        all_messages = []
        for message in messages:
            try:
                content = self.message_content_html(user.username, message['id'], service)
            except KeyError:
                # Mail without the expected payload/body structure: skip it.
                continue
            if content:
                all_messages.append(content)
        return all_messages

    def get(self, request, format=None):
        """
        Handle the GET request that downloads all the mails of a label.

        Reads the label id from the ``label`` query parameter, paginates
        through the Gmail API listing, saves every mail as a ``UserData``
        row, e-mails the user that the download is ready and returns all
        the messages ({To, From, Subject, body}).  Returns an empty list
        when the first page yields no mails.
        """
        user = request.user
        storage = Storage(CredentialsModel, 'id', user, 'credential')
        credentials = storage.get()
        http_auth = credentials.authorize(httplib2.Http())
        service = build('gmail', 'v1', http=http_auth)
        user_Id = user.username
        label_id = request.GET['label']

        # First page.  The API response has no 'messages' key when the page
        # is empty, hence .get() with an empty default.
        response = service.users().messages().list(
            userId=user_Id, labelIds=label_id, maxResults=100).execute()
        all_messages = self.collect_mails(user, response.get('messages', []), service)

        if not all_messages:
            return Response([])

        # Keep requesting pages while the API hands back a continuation
        # token.  The label filter is repeated so that follow-up pages stay
        # restricted to the same label.
        while 'nextPageToken' in response:
            response = service.users().messages().list(
                userId=user_Id, labelIds=label_id,
                pageToken=response['nextPageToken'], maxResults=100).execute()
            # extend (not append) keeps all_messages a flat list of mails.
            all_messages.extend(
                self.collect_mails(user, response.get('messages', []), service))

        # Persist every downloaded mail (previously index 10 was saved over
        # and over).  Headers may be absent from a mail, so default to ''.
        for item in all_messages:
            instance = UserData(
                user_id=user,
                label=label_id,
                sender=item.get('From', ''),
                subject=item.get('Subject', ''),
                body=item['body'],
            )
            instance.save()

        # Recipients must be address strings, not User objects.  send()
        # opens the connection when needed, so no explicit close follows.
        email = EmailMessage(
            'Your Download Ready!',
            'http://127.0.0.1:8000/admin/gextracto/userdata/',
            to=[user.email],
            connection=connection,
        )
        email.send()
        return Response(all_messages)

请告诉我如何在后台运行它。如果需要任何其他信息,请询问。谢谢

最佳答案

我不清楚您的确切需求,但我会考虑使用 Celery 来运行后台任务。这种方法可以用 Django 原生的方式来管理脚本执行完毕之后的所有后续工作。

您还可以考虑使用 cron(作为 manage.py 命令)运行 Django 脚本 - 但它可能会导致一些限制。

至于发送电子邮件失败的问题——我认为,发送电子邮件后您不需要关闭连接。通常我使用 send_mail()/send_mass_mail() 函数——请查看它们的源代码以了解思路。

关于python - 在后台运行特定的 Django 脚本,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33706936/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com