gpt4 book ai didi

python - Twitter、多进程和数据库

转载 作者:行者123 更新时间:2023-11-29 19:45:59 25 4
gpt4 key购买 nike

我是一个初学者,在 python/flask 中编写一个用于计划推文和自动转发的小型 Twitter 工具。

我遇到了后台运行进程的问题。

我希望预定的推文和转发能够在后台为给定用户同时工作。

我希望能够单独终止这些运行转发/预定推文的后台进程。

您将如何更改下面的代码来实现此目的?

下面的代码目前可以工作,但用户无法同时运行预定推文和转发。此外,如果用户决定终止其中一个进程(比如转发),另一个进程(预定推文)也会随之终止,反之亦然。

我想过把给定进程的标识数据存入数据库,并在需要终止该进程时从数据库中读取这些标识数据,而不是使用 cookie session,但我不知道如何在代码中实现这个想法。

import ........

# Application-wide objects: the MySQL extension handle and the Flask app.
mysql = MySQL()
app = Flask(__name__)
# NOTE(review): every 'xxx' below looks like a redacted placeholder -- supply
# real values before running.
app.secret_key = 'xxx'

# Database settings; they are set before the extension is bound to the app.
app.config.update(
    MYSQL_DATABASE_USER='xxx',
    MYSQL_DATABASE_PASSWORD='xxx',
    MYSQL_DATABASE_DB='xxx',
    MYSQL_DATABASE_HOST='0.0.0.0',
)
mysql.init_app(app)

@app.route('/showSignin')
def showSignin():
    """Show the sign-in form, or redirect to /userHome when a user is already in the session."""
    if not session.get('user'):
        return render_template('signin.html')
    return redirect('/userHome')

@app.route('/showscheduletweets')
def showscheduletweets():
    """Render the tweet-scheduling page for a signed-in user, otherwise the sign-in form."""
    page = 'scheduletweets.html' if session.get('user') else 'signin.html'
    return render_template(page)

@app.route('/validateLogin', methods=['POST'])
def validateLogin():
    """Check the posted credentials and sign the user in.

    Reads ``inputEmail`` and ``inputPassword`` from the form, looks the user up
    through the ``sp_validateLogin`` stored procedure and compares the password
    against the hash in column 3 of the returned row. On success the value in
    column 0 is stored in ``session['user']``, a greeting status is posted with
    the Twitter credentials from columns 4-7, and ``userHome.html`` is rendered.

    Any failure renders ``error.html``; the database handles are always closed.
    """
    # Bind the handles up front: if the form lookup or the connect call fails,
    # the `finally` clause must not trip over unbound names and hide the real
    # error behind an UnboundLocalError.
    con = None
    cursor = None
    try:
        _username = request.form['inputEmail']
        _password = request.form['inputPassword']

        # connect to mysql
        con = mysql.connect()
        cursor = con.cursor()
        cursor.callproc('sp_validateLogin', (_username,))
        data = cursor.fetchall()

        # Unknown user and wrong password deliberately share one message.
        if not data or not check_password_hash(str(data[0][3]), _password):
            return render_template('error.html', error='Wrong Email address or Password.')

        user_row = data[0]
        session['user'] = user_row[0]
        consumerkey = user_row[4]
        consumersecret = user_row[5]
        accesstoken = user_row[6]
        tokensecret = user_row[7]
        twitter = Twython(consumerkey, consumersecret, accesstoken, tokensecret)
        twitter.update_status(status="xxx says hello.")
        return render_template('userHome.html')
    except Exception as e:
        # Top-level boundary of the request: show the failure to the user.
        return render_template('error.html', error=str(e))
    finally:
        if cursor is not None:
            cursor.close()
        if con is not None:
            con.close()

#schedule tweets

@app.route('/scheduletweets', methods=['POST'])
def scheduletweets():
    """Start a background process that keeps posting the submitted tweets.

    Reads ``inputEmail``, ``inputTweet1``..``inputTweet6`` and
    ``inputHash1``..``inputHash4`` from the form, loads the Twitter credentials
    through the ``sp_GetTwitter`` stored procedure and, unless a background
    process is already recorded in the session, spawns one that posts a random
    tweet with a random subset of the hashtags every 10 seconds.

    Returns a redirect to ``/showscheduletweets``; anonymous visitors get the
    sign-in page, and an unknown account gets ``error.html``.
    """
    if not session.get('user'):
        # Every path has to return a response; mirror the page handlers.
        return render_template('signin.html')

    _username = request.form['inputEmail']

    # Fetch the credentials and release the DB handles straight away -- the
    # worker below only needs the fetched row.
    con = mysql.connect()
    try:
        cursor = con.cursor()
        try:
            cursor.callproc('sp_GetTwitter', (_username,))
            data = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        con.close()

    if not data:
        return render_template('error.html', error='No Twitter account found for this user.')

    user_row = data[0]
    # NOTE(review): this replaces the session user with whatever account the
    # posted e-mail resolves to -- confirm that is intended.
    session['user'] = user_row[0]
    consumerkey = user_row[4]
    consumersecret = user_row[5]
    accesstoken = user_row[6]
    tokensecret = user_row[7]
    twitter = Twython(consumerkey, consumersecret, accesstoken, tokensecret)

    tweets = [request.form['inputTweet%d' % i] for i in range(1, 7)]
    hashtags = [request.form['inputHash%d' % i] for i in range(1, 5)]

    def workit():
        """Post a random tweet plus 1..len(hashtags) random hashtags, forever."""
        while True:
            try:
                if tweets:
                    how_many = random.randint(1, len(hashtags))
                    picked = random.sample(hashtags, how_many)
                    suffix = " ".join(str(tag) for tag in picked)
                    # random.choice is uniform; the previous
                    # randint(0, n) - 1 index picked the last tweet twice
                    # as often as the others.
                    toTweet = random.choice(tweets) + " " + suffix
                    twitter.update_status(status=toTweet)
                else:
                    twitter.update_status(status="Oh dear... I'm afraid I'm rather empty =(")
                    break
            except TwythonError as e:
                print(e)
            # Pause after failures too, so a persistent API error does not
            # turn into a tight retry loop.
            time.sleep(10)

    # NOTE: the retweet handler records its process under the same
    # 'work_process' session key, so only one background job per session is
    # tracked at a time.
    if 'work_process' not in session:
        process = Process(target=workit)
        process.start()
        pid = process.pid
        parent_pid = psutil.Process(pid).parent().pid
        session['work_process'] = (parent_pid, pid)
    return redirect('/showscheduletweets')
#retweets
def _build_search_query(include_words, exclude_words):
    """Return a search query matching any include word and none of the exclude words.

    Blank entries are dropped so that empty form fields do not produce
    fragments such as ``"a OR  OR b"`` or a dangling ``" -"``.
    """
    wanted = [word.strip() for word in include_words if word and word.strip()]
    banned = [word.strip() for word in exclude_words if word and word.strip()]
    query = " OR ".join(wanted)
    query += "".join(" -" + word for word in banned)
    return query.strip()


@app.route('/retweet', methods=['POST'])
def retweet():
    """Start a background process that retweets search results.

    Reads ``inputEmail``, ``inputRetweet1``..``inputRetweet4`` and
    ``inputExclude1``..``inputExclude2`` from the form, loads the Twitter
    credentials through the ``sp_GetTwitter`` stored procedure and, unless a
    background process is already recorded in the session, spawns one that
    repeatedly searches for the keywords and retweets what it finds.

    Returns a redirect to ``/showretweet``; anonymous visitors get the sign-in
    page, and an unknown account gets ``error.html``.
    """
    if not session.get('user'):
        # Every path has to return a response; mirror the page handlers.
        return render_template('signin.html')

    _username = request.form['inputEmail']

    # Fetch the credentials and release the DB handles straight away -- the
    # worker below only needs the fetched row.
    con = mysql.connect()
    try:
        cursor = con.cursor()
        try:
            cursor.callproc('sp_GetTwitter', (_username,))
            data = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        con.close()

    if not data:
        return render_template('error.html', error='No Twitter account found for this user.')

    user_row = data[0]
    # NOTE(review): this replaces the session user with whatever account the
    # posted e-mail resolves to -- confirm that is intended.
    session['user'] = user_row[0]
    consumerkey = user_row[4]
    consumersecret = user_row[5]
    accesstoken = user_row[6]
    tokensecret = user_row[7]

    good_words = [request.form['inputRetweet%d' % i] for i in range(1, 5)]
    naughty_words = [request.form['inputExclude%d' % i] for i in range(1, 3)]

    def work():
        """Search for the keywords and retweet each hit, one per minute."""
        twitter = Twython(consumerkey, consumersecret, accesstoken, tokensecret)
        keywords = _build_search_query(good_words, naughty_words)
        print(keywords)
        while True:
            # The search itself can fail; keep it inside the try so one API
            # error does not kill the worker.
            try:
                search_results = twitter.search(q=keywords, count=10)
            except TwythonError as e:
                print(e)
                search_results = {}

            retweeted = 0
            for tweet in search_results.get("statuses", []):
                try:
                    twitter.retweet(id=tweet["id_str"])
                except TwythonError as e:
                    print(e)
                    continue
                retweeted += 1
                time.sleep(60)

            if not retweeted:
                # Nothing new (or only errors) this round: back off instead of
                # re-querying the API in a tight loop.
                time.sleep(60)

    # NOTE: the scheduled-tweets handler records its process under the same
    # 'work_process' session key, so only one background job per session is
    # tracked at a time.
    if 'work_process' not in session:
        process = Process(target=work)
        process.start()
        pid = process.pid
        parent_pid = psutil.Process(pid).parent().pid
        session['work_process'] = (parent_pid, pid)
    return redirect('/showretweet')

#terminating scheduled tweets and retweets
@app.route('/stoptweet', methods=['POST'])
def stoptweet():
    """Terminate the background process recorded in the session, then show the index page.

    The process is only terminated when its parent PID still matches the one
    stored alongside it; a process that no longer exists is ignored. The
    session entry is removed either way.
    """
    if 'work_process' not in session:
        return render_template('index.html')

    parent_pid, pid = session['work_process']
    try:
        worker = psutil.Process(pid)
        # Compare the parent so a recycled PID belonging to some other
        # program is not killed.
        same_parent = worker.parent().pid == parent_pid
        if same_parent:
            worker.terminate()
    except psutil.NoSuchProcess:
        pass
    session.pop('work_process')
    return render_template('index.html')

if __name__ == '__main__':
    # Host and port come from the IP / PORT environment variables.
    # NOTE(review): `xxx` looks like a redacted default port -- replace it with
    # a real value before running.
    bind_host = os.getenv('IP', '0.0.0.0')
    bind_port = int(os.getenv('PORT', xxx))
    app.run(host=bind_host, port=bind_port)

最佳答案

您可能想使用 Celery 这个 Python 模块,把预定推文和转发移到后台任务(worker)中执行。

有关更多信息,请参阅文档:http://flask.pocoo.org/docs/0.11/patterns/celery/

您需要用 Celery(而不是 Flask)的装饰器来装饰这些函数。

例如:

在您的脚本中:

import my_schedule_module

然后在my_schedule_module.py中:

from celery import Celery, Task
from celery.result import AsyncResult

from celery.task.base import periodic_task

import sqlite3 # Here I use sqlite, can be sql
import redis # Here I am using redis, you can use another db as well > check documentation

from datetime import timedelta # used to schedule your background jobs, see in configuration below


app_schedule = Celery('my_schedule_module')


# ---------------------------------------------------------------------------
# Celery configuration
# ---------------------------------------------------------------------------
# A mock-up configuration of the background jobs; as an example the retweet
# task is scheduled every 60 seconds.
# NOTE(review): the scheduled task `my_schedule_module.retweet` is declared
# with a required `tweet` parameter, but the schedule entry passes no
# arguments -- confirm which task is meant to run periodically.
app_schedule.conf.update({
    # Serialization
    'CELERY_ACCEPT_CONTENT': ['application/json'],
    'CELERY_TASK_SERIALIZER': 'json',
    'CELERY_RESULT_SERIALIZER': 'json',
    # Optional settings left disabled in the original:
    #   CELERY_ACCEPT_CONTENT=['json']   (ignore other content)
    #   CELERY_TIMEZONE='Europe/Oslo'
    #   CELERY_ENABLE_UTC=True
    # Worker limits
    'CELERYD_TASK_TIME_LIMIT': 600,
    'CELERYD_TASK_SOFT_TIME_LIMIT': 600,
    'CELERYD_MAX_TASKS_PER_CHILD': 1000,
    'CELERYD_OPTS': "--time-limit=600 --concurrency=4",
    # Broker and result store
    'BROKER_URL': 'redis://localhost:6379/0',
    'CELERY_RESULT_BACKEND': 'redis://localhost',
    # Periodic jobs
    'CELERYBEAT_SCHEDULE': {
        'add-every-60-seconds': {
            'task': 'my_schedule_module.retweet',
            'schedule': timedelta(seconds=60),
        },
    },
})

@app_schedule.task()
def retweet(tweet):
    """Retweet a single tweet (placeholder -- put your tweet function here).

    The docstring doubles as the function body: a ``def`` followed only by a
    comment is a syntax error.
    """
    # your tweet function

@app_schedule.task()
def scheduletweets():
    """Queue one ``retweet`` task per tweet and run them together as a Celery group.

    Pseudo code: ``get_tweets()`` is not defined in this snippet and has to be
    provided by the application.
    """
    # Imported here because the module header only imports Celery and Task.
    from celery import group

    # your background job
    tweets = get_tweets()
    process_tweet_list = [retweet.s(tweet) for tweet in tweets]
    job = group(process_tweet_list)
    result = job.apply_async()  # process job list async
    # Single formatted argument: prints the same text on Python 2 and 3.
    print('result %s %s' % (result.ready(), result.successful()))

您还可以使用回调函数 - 例如,您可能希望更新数据库中转发推文的日期时间。

在这种情况下,您的语法如下:

    # Signature of the callback that is linked to the retweet task.
    store_result = my_schedule_module.callback_to_store_results_of_retweet.s()
    result = my_schedule_module.retweet.apply_async((tweet,), link=store_result)

关于python - Twitter、多进程和数据库,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40974132/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com