gpt4 book ai didi

python - 如何在 flask 中一一处理请求?

转载 作者:行者123 更新时间:2023-12-03 01:20:05 27 4
gpt4 key购买 nike

我正在开发一个将数据发送到Flask后端的应用程序。然后 flask 将接收到的内容插入 flex 搜索中。在插入 flex 搜索之前,它将检查ID是否存在;如果ID存在,则将其更新,否则将插入索引。

样例代码:

from flask import Flask
from flask import jsonify, request
import jwt
from elasticsearch import Elasticsearch
app = Flask(__name__)
@app.route('/test',methods=['POST'])
def hello():
try:
id = request.form['id']
database = "sample"
es =Elasticsearch("localhost",port = 9200)
cols=es.search(index=database, body={ "query": { "match": { "id": id}}})
present =False
if cols['hits']['hits']:
x1=cols['hits']['hits'][0]['_source']
eid = cols['hits']['hits'][0]['_id']
present =True
if present == False:
newvalues = {"url":"hello",'id':id}
es.index(index=database, doc_type="logs", body=newvalues)
else: #if already there append data
newvalues ={}
es.update(index=database,doc_type='logs',id=eid,body={"doc":newvalues})
return jsonify({'status': 'success'})
except jwt.InvalidTokenError as e:
print(e)
return jsonify({'success': 'false', 'message': 'Invalid Token!!!'})
if __name__=="__main__":
try:
app.run(host="localhost",port=5005,debug=True,processes =1)
except Exception as e:
print("exception in test",e)

这里的问题是,请求每5秒从前端发送一次。因此有时会冲突,即,每当收到带有ID的请求时,同时ID的插入过程就会发生。第二个请求假定该ID在数据库中不存在,因此它还会插入,从而在索引中转入2个具有相同ID的数据。我该怎么做一次插入一个,另一个应该等待?

python -3.6

编辑:
尝试使用信号量:

from flask import Flask
from flask import jsonify, request
import jwt
from elasticsearch import Elasticsearch
import threading
sLock = threading.Semaphore()
app = Flask(__name__)
@app.route('/test',methods=['POST'])
def hello():
sLock.acquire()
try:
id = request.form['id']
database = "sample"
es =Elasticsearch("localhost",port = 9200)
cols=es.search(index=database, body={ "query": { "match": { "id": id}}})
present =False
if cols['hits']['hits']:
x1=cols['hits']['hits'][0]['_source']
eid = cols['hits']['hits'][0]['_id']
present =True
if present == False:
newvalues = {"url":"hello",'id':id}
es.index(index=database, doc_type="logs", body=newvalues)
else: #if already there append data
newvalues ={}
es.update(index=database,doc_type='logs',id=eid,body={"doc":newvalues})
sLock.release()
return jsonify({'status': 'success'})
except jwt.InvalidTokenError as e:
print(e)
return jsonify({'success': 'false', 'message': 'Invalid Token!!!'})
if __name__=="__main__":
try:
app.run(host="localhost",port=5005,debug=True,processes =1)
except Exception as e:
print("exception in test",e)

提前致谢!

最佳答案

您可以使用mget方法并设置时间阈值。这样,您就不会发送时间请求,而是发送包含ID列表的请求-doc here

from datetime import datetime, timedelta
from elasticsearch import Elasticsearch
from elasticsearch import helpers

idL = [] # create it before the flask route declaration
threshold = 5 #set a threshold in the same way
now = datetime.now()
delta = timedelta(seconds=30) # set a time threshold of 1 minute

def update(result):
for success, info in helpers.parallel_bulk(client= es, actions=createUpdateElem(result ):
if not success:
print(info)
def index(result):
for success, info in helpers.parallel_bulk(client= es, actions=createIndexElem(result ):
if not success:
print(info)

def createIndexElem(result):
for elem in result:
yield {
'_op_type': 'index',
'_index': 'database',
'_id': elem,
'_source': {'question': 'The life, universe and everything.'}
}


def createUpdateElem(result):
for elem in result:
yield {
'_op_type': 'update',
'_index': 'database',
'_id': elem,
'doc': {'question': 'The life, universe and everything.'}
}


def checkResponse(response, idL):
updateL = []
for elem in response['docs']:
if elem['_id'] in idL:
updateL.append(elem['_id'])
toBeIndexed = list(set(idL) - set(updateL))
return toBeIndexed,updateL




def multiget(idL):
response = es.mget(index = 'database',body = {'ids': idL})
doc2BeIndicized = checkResponse(response, idL)
now = datetime.now()
idL = []
return doc2BeIndicized


@app.route('/test',methods=['POST'])
def hello():
try:

id = request.form['id']
idL.append(id)
if len(idL) > threshold:
result = multiget(idL)
if result:
indexed, updated = result
if updated:
update(updated)
if indexed:
index(indexed)
elif (now + delta) > datetime.now():
result = multiget(idL)
if result:
indexed, updated = result
if updated:
update(updated)
if indexed:
index(indexed)
else:
continue

以相同的方式,您可以索引或更新具有批量的文档列表-或在服务中更好的并行批量,因为它使用多线程。 doc here。请记住,您需要分析mget调用的响应,因为在列表中,可能只有es中存在某些元素,而其他元素中没有

关于python - 如何在 flask 中一一处理请求?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60613667/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com