gpt4 book ai didi

python - 如何让客户端使用 flask 从服务器读取变量

转载 作者:行者123 更新时间:2023-12-03 18:40:10 25 4
gpt4 key购买 nike

我正在制作我的第一个 flask /python 网络应用程序。该应用程序最初显示一个邀请用户填写的表单,然后他们单击“提交”按钮,然后服务器运行模拟并创建一个带有显示结果的图形的 PNG 文件,最后页面被重绘显示的图表。我的python代码大致是这种形式:

# flask_app.py

@app.route("/", methods=["POST", "GET"])
def home():

if request.method == 'POST':
# bunch of request.form things to scoop the contents of the form

if form_answers_all_good:
for i in range(huge_number):
# some maths
# create png file with results

return render_template("index.htm", foo=bar)

该程序运行良好,但 huge_number循环可能需要几十秒。所以我想要的是某种进度指示器——它不一定是一个漂亮的动画——即使是百分比进度的字符串读数也可以。

大概我可以将我的 for 循环更改为...
    for i in range(huge_number):
# some maths
percentage_done = str(i * 100/huge_number)

然后以某种方式在客户端安排读取(轮询?) percentage_done所以我放了类似的东西:
Completed {% percentage_done %}% so far.

在我的 index.htm .顺便说一句,我对 Javascript、AJAX 之类的知识或想到它,客户端上的几乎所有东西(除了 HTML)都是初学者水平。

我看过很多关于类似听起来问题的解释,但通常他们做的事情比我实际需要的要复杂得多,而且由于我缺乏客户端知识,我无法理解它们。因此,例如,某些解决方案可能包含一段代码,而我实际上不知道将其放在哪里,或者我不知道需要先加载其他内容才能使该代码段正常工作。

编辑:我在 pythonanywhere.com 上托管我的网络应用程序.包含的模块列表是 here .

编辑: pythonanywhere.com 做 not allow streaming :-(

最佳答案

你提到你是 flask 的新手,所以我假设你是 flask 的新手,但对 python 很熟悉,而且你对可以使用的东西非常有限,因为你在使用 pythonanywhere。主要的事情之一是它是单线程的,因此很难扩展任何东西。此外,最好坚持使用纯 python,因为在任何地方管理 python 中的依赖项将是一个额外的问题,最终只使用 python 内置函数是可行的。

我专注于展示一个可行的解决方案,您可以简单地在 pythonanywhere 或本地复制和粘贴,而不是显示代码片段。我会试着去:

  • 显示工作解决方案
  • 描述如何复制它
  • 分解主要组成部分并简要说明

  • (1) 工作解决方案

    你可以访问它 here (我做了很多限制,试图避免人们破坏它)。解决方案只涉及两个文件, flask_app.pyindex.html
    How it looks like

    (1.1) 解决方案代码

    “./home/{username}/{flask_foldername}/flask_app.py”
    from queue import Queue
    import time
    import random
    import threading
    from PIL import Image
    import flask
    from flask import request
    import json
    import io
    import uuid
    import base64

    ### You create a Queue and start a scheduler, Start flask after that
    def run_scheduler(app):
    sleep_time = 5
    while True:
    time.sleep(sleep_time)
    print("\n"*5)
    print(f'images Completed:{app.images_completed}')
    print('-----'*20)
    if(app.images_toBe_processed.qsize() > 0):
    next_image_name = app.images_toBe_processed.get()
    print(f"No Images being processed so scheduler will start processing the next image {next_image_name} from the queue")
    app.function_to_process_image(next_image_name, app)
    else:
    pass

    def function_to_process_image(image_name, app):
    huge_number = 5
    R = random.randint(0,256)
    G = random.randint(0,256)
    B = random.randint(0,256)
    for i in range(huge_number):
    # some maths
    percentage_done = str((i+1)*100/huge_number)
    app.images_processing_status[image_name] = percentage_done
    time.sleep(1)
    app.images_processing_status[image_name] = str(100.0)
    img = Image.new('RGB', (60, 30), color =(R,G,B))
    b=io.BytesIO()
    img.save(b, "jpeg")
    app.images_completed[image_name] = {"status":1,"file": b}
    print(f"IC from function: {app.images_completed} **************************")
    if app.images_processing_status.get("!!total!!",False): app.images_processing_status["!!total!!"]+= 1
    else: app.images_processing_status["!!total!!"] = 1
    del app.images_processing_status[image_name]
    return 0 #process sucessful

    class Webserver(flask.Flask):
    def __init__(self,*args,**kwargs):
    scheduler_func = kwargs["scheduler_func"]
    function_to_process_image = kwargs["function_to_process_image"]
    queue_MAXSIZE = kwargs["queue_MAXSIZE"]
    del kwargs["function_to_process_image"], kwargs["scheduler_func"], kwargs["queue_MAXSIZE"]
    super(Webserver, self).__init__(*args, **kwargs)
    self.start_time = time.strftime("%d/%m/%Y %H:%M")
    self.queue_MAXSIZE = queue_MAXSIZE
    self.active_processing_threads = []
    self.images_processing_status = {}
    self.images_completed = {}
    self.images_toBe_processed = Queue(maxsize=queue_MAXSIZE)
    self.function_to_process_image = function_to_process_image
    self.scheduler_thread = threading.Thread(target=scheduler_func, args=(self,))


    app = Webserver(__name__,
    template_folder="./templates",
    static_folder="./",
    static_url_path='',
    scheduler_func = run_scheduler,
    function_to_process_image = function_to_process_image,
    queue_MAXSIZE = 20,
    )


    ### You define a bunch of views
    @app.route("/",methods=["GET"])
    def send_index_view():
    if not flask.current_app.scheduler_thread.isAlive():
    flask.current_app.scheduler_thread.start()
    return flask.render_template('index.html',queue_size = flask.current_app.images_toBe_processed.qsize(),
    max_queue_size =flask.current_app.queue_MAXSIZE , being_processed=len(flask.current_app.active_processing_threads),
    total=flask.current_app.images_processing_status.get("!!total!!",0), start_time=flask.current_app.start_time )

    @app.route("/process_image",methods=["POST"])
    def receive_imageProcessing_request_view():
    image_name = json.loads(request.data)["image_name"]
    if(flask.current_app.images_toBe_processed.qsize() >= flask.current_app.queue_MAXSIZE ):
    while(not flask.current_app.images_toBe_processed.empty()):
    flask.current_app.images_toBe_processed.get()
    requestedImage_status = {"name":image_name, "id":uuid.uuid1()}
    flask.current_app.images_toBe_processed.put(image_name)
    return flask.jsonify(requestedImage_status)

    @app.route("/check_image_progress",methods=["POST"])
    def check_image_progress():
    print(f'Current Image being processed: {flask.current_app.images_processing_status}')
    print(f'Current Images completed: {flask.current_app.images_completed}')
    image_name = json.loads(request.data)["image_name"]
    is_finished = flask.current_app.images_completed \
    .get(image_name,{"status":0,"file": ''})["status"]
    requestedImage_status = {
    "is_finished": is_finished,
    "progress": flask.current_app.images_processing_status.get(image_name,"0")
    }
    return flask.jsonify(requestedImage_status) #images_processing_status[image_name]})

    @app.route("/get_image",methods=["POST"])
    def get_processed_image():
    image_name = json.loads(request.data)["image_name"]
    file_bytes = flask.current_app.images_completed[image_name]["file"] #open("binary_image.jpeg", 'rb').read()
    file_bytes = base64.b64encode(file_bytes.getvalue()).decode()
    flask.current_app.images_completed.clear()
    return flask.jsonify({image_name:file_bytes}) #images_processing_status[image_name]})

    “./home/{username}/{flask_foldername}/templates/index.html”
    <html>
    <head>
    </head>
    <body>
    <h5> welcome to the index page, give some inputs and get a random RGB image back after some time</h5>
    <h5> Wait 10 seconds to be able to send an image request to the server </h5>
    <h5>When the page was loaded there were {{queue_size}} images on the queue to be processed, and {{being_processed}} images being processed</h5>
    <h5> The max size of the queue is {{max_queue_size}}, and it will be reseted when reaches it</h5>
    <h5>A total of {{total}} images were processed since the server was started at {{start_time}}</h5>
    <form>
    <label for="name">Image name:</label><br>
    <input type="text" id="name" name="name" value="ImageName" required><br>
    </form>
    <button onclick="send();" disabled>Send request to process image </button>
    <progress id="progressBar" value="0" max="100"></progress>
    <img style="display:block" />
    <script>
    window.image_name = "";
    window.requests = "";
    function send(){
    var formEl = document.getElementsByTagName("form")[0];
    var input = formEl.getElementsByTagName("input")[0];
    var RegEx = /^[a-zA-Z0-9]+$/;
    var Valid = RegEx.test(input.value);
    if(Valid){
    window.image_name = input.value;
    var xhttp = new XMLHttpRequest();
    xhttp.onload = function() {
    result=JSON.parse(xhttp.response)
    window.requests = setTimeout(check_image_progress, 3000);
    };
    xhttp.open("POST", "/process_image", true);
    xhttp.send(JSON.stringify({"image_name":input.value}));
    var buttonEl = document.getElementsByTagName("button")[0];
    buttonEl.disabled = true;
    buttonEl.innerHTML = "Image sent to process;only one image per session allowed";
    }
    else{
    alert("input not valid, only alphanumeric characters");
    }
    }

    function check_image_progress(){
    var xhttp = new XMLHttpRequest();
    xhttp.onload = function() {
    result=JSON.parse(xhttp.response)
    var progressBarEl = document.getElementsByTagName("progress")[0];
    if(progressBarEl.value < result["progress"]){
    progressBarEl.value=result["progress"];
    } else {}
    if(result["is_finished"] == true){
    clearTimeout(window.requests);
    window.requests = setTimeout(get_image,5);
    }
    else {
    window.requests = setTimeout(check_image_progress, 3000);
    }
    };
    xhttp.open("POST", "/check_image_progress", true);
    xhttp.send(JSON.stringify({"image_name":window.image_name}));
    }

    function get_image(){
    var xhttp = new XMLHttpRequest();
    xhttp.onload = function() {
    result=JSON.parse(xhttp.response)
    img_base64 = result[window.image_name];
    var progressBarEl = document.getElementsByTagName("progress")[0];
    progressBarEl.value=100;
    clearTimeout(window.requests);
    var imgEl = document.getElementsByTagName("img")[0];
    console.log(result)
    imgEl.src = 'data:image/jpeg;base64,'+img_base64;
    };
    xhttp.open("POST", "/get_image", true);
    xhttp.send(JSON.stringify({"image_name":window.image_name}));
    }
    setTimeout(function(){document.getElementsByTagName("button")[0].disabled=false;},100);
    function hexToBase64(str) {
    return btoa(String.fromCharCode.apply(null, str.replace(/\r|\n/g, "").replace(/([\da-fA-F]{2}) ?/g, "0x$1 ").replace(/ +$/, "").split(" ")));
    }
    </script>
    </body>
    </html>

    (2) 如何复制,创建自己的webapp
  • 转到 web 中的 Web 应用程序选项卡
  • 向下滚动找到源目录链接
  • 单击 flask_app.py 并包含 flask_app.py 代码
  • 点击模板目录,如果不存在则创建
  • 单击 index.html 文件并包含 index.html 代码
  • 返回 Web 应用程序选项卡并重新加载您的应用程序

  • walktrhough

    (3) APP主要组件

    关于 pythonanywhere 的一些细节:
  • Pythoanywhere 运行 wsgi开始您的flask app ,它基本上会导入你的app来自 flask_app.py并运行它。
  • 默认 wsgi在您的 /home/{username} 中运行文件夹而不是 /home/{username}/{flask_folder} ,您可以根据需要更改此设置。
  • Pythonanywhere 是单线程的,因此您不能依赖将作业发送到后台。

  • 后端需要注意的主要组件:
  • 1) Threads , Flask将在 wsgi 运行的主线程中我们将运行一个子线程scheduler这将跟踪 Queue并安排下一个要处理的图像。
  • 2) Flask类:app,处理用户请求并将处理请求发送到Queue的组件
  • 3) Queue , 一个 Queue按顺序存储用户处理图像的请求
  • 4) Scheduler , 决定是否可以运行新函数 process_image 调用以及是否可以运行的组件。需要独立运行Thread比 flask 。
  • 5) 将所有这些封装在一个自定义类中Webserver然后能够轻松访问(pythonanywhere 使用 wsgi,这使得跟踪本地创建的变量变得困难)

  • 所以看看代码的大图
    #lot of imports
    +-- 14 lines: from queue import Queue-----------------------------------------------------------------------------------------

    # this function will check periodically if there's no images being processed at the moment.
    # if no images are being processed check in the queue if there's more images to be processd
    # and start the first one in the queue
    def run_scheduler(app):
    +-- 12 lines: sleep_time = 5 -------------------------------------------------------------------------------------------------

    # this function do the math and creates an random RGB image in the end.
    def function_to_process_image(image_name, app):
    +-- 21 lines: {---------------------------------------------------------------------------------------------------------------

    # This class encapsulates all the data structures("state") from our application
    # in order to easily access the progress and images information
    class Webserver(flask.Flask):
    def __init__(self,*args,**kwargs):
    +-- 13 lines: scheduler_func = kwargs["scheduler_func"]-----------------------------------------------------------------------

    # Here we're instatiating the class
    app = Webserver(__name__,
    +-- 5 lines: template_folder="./templates",----------------------------------------------------------------------------------
    queue_MAXSIZE = 20,
    )


    ### You define a bunch of views
    +-- 39 lines: @app.route("/",methods=["GET"]) --------------------------------------------------------------------------------


    前端的主要组件:
  • send当用户点击 send request to process image 时触发的函数按钮
  • check_progresssend function 触发的函数反复请求 check_progress在 flask 中查看以获取有关进度的信息。处理结束后,我们删除重复。
  • get_imagecheck_progress 触发的函数处理结束时 ('is_finished' = 1)

  • 前端大图 :
    <html>
    <head>
    </head>
    <body>
    <!-- JUST THE INITIAL HTML elements -->
    +-- 12 lines: <h5> welcome to the index page, give some inputs and get a random RGB image back after some time</h5>-----------


    <script>
    window.image_name = "";
    window.requests = "";
    function send(){
    // SEND image process request when click button and set a timer to call periodically check_image_process
    +-- 20 lines: var formEl = document.getElementsByTagName("form")[0];----------------------------------------------------------
    }

    function check_image_progress(){
    // SEND a request to get processing status for a certain image_name
    +-- 18 lines: var xhttp = new XMLHttpRequest();-------------------------------------------------------------------------------
    }

    function get_image(){
    // SEND a request to get the image when image_status 'is_processed' = 1
    +--- 13 lines: var xhttp = new XMLHttpRequest();------------------------------------------------------------------------------
    }
    setTimeout(function(){document.getElementsByTagName("button")[0].disabled=false;},100);
    </script>
    </body>
    </html>

    关于python - 如何让客户端使用 flask 从服务器读取变量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61292483/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com