gpt4 book ai didi

python - Websockets 消息仅在最后发送,而不是在使用 async/await 的实例中发送,在嵌套 for 循环中产生

转载 作者:行者123 更新时间:2023-12-05 04:46:19 31 4
gpt4 key购买 nike

我有一个计算量很大的过程,需要几分钟才能在服务器中完成。所以我想通过 websockets 将每次迭代的结果发送给客户端。

整个应用程序工作正常,但我的问题是在整个模拟完成后,所有消息都以一大块的形式到达客户端。我一定是在这里遗漏了一些东西,因为我希望 await websocket.send_json() 在此过程中发送消息,而不是最后发送所有消息。

服务器 python (FastAPI)

# A very simplified abstraction of the actual app.

def simulate_intervals(data):
for t in range(data.n_intervals):
state = interval(data) # returns a JAX NumPy array
yield state

def simulate(data):
for key in range(data.n_trials):
trial = simulate_intervals(data)
yield trial

@app.websocket("/ws")
async def socket(websocket: WebSocket):

await websocket.accept()
while True:
# Get model inputs from client
data = await websocket.receive_text()
# Minimal computation
nodes = distributions(data)

nodosJson = json.dumps(nodes, cls=NumpyEncoder)
# I expect this message to be sent early on,
# but the client gets it at the end with all the other messages.
await websocket.send_json({"tipo": "nodos", "datos": json.loads(nodosJson)})

# Heavy computation
trials = simulate(data)

for trialI, trial in enumerate(trials):
for stateI, state in enumerate(trial):
stateString = json.dumps(state, cls=NumpyEncoder)

await websocket.send_json(
{
"tipo": "estado",
"datos": json.loads(stateString),
"trialI": trialI,
"stateI": stateI,
}
)

await websocket.send_json({"tipo": "estado", "msg": "fin"})

为了完整起见,这里是基本的客户端代码。

客户端

const ws = new WebSocket('ws://localhost:8000/ws');

ws.onopen = () => {
console.log('Conexión exitosa');
};

ws.onmessage = (e) => {
const mensaje = JSON.parse(e.data);
console.log(mensaje);
};

botonEnviarDatos.onclick = () => {
ws.send(JSON.stringify({...}));
}

最佳答案

我无法让它像我的问题中所发布的那样工作,仍然有兴趣听取任何人的意见,了解为什么在不阻止它们的情况下发送多个异步消息是不可能的。

对于任何感兴趣的人,这是我目前的解决方案:

来自客户端和服务器的乒乓消息

我更改了逻辑,因此服务器和客户端不断地相互发送消息,而不是尝试在来自客户端的单个请求中流式传输数据。

这实际上比我最初的尝试效果更好,因为我可以检测到套接字何时断开连接并停止服务器中的处理。基本上,如果客户端断开连接,则不会从该客户端发送新的数据请求,服务器也不会继续进行繁重的计算。

服务器

# A very simplified abstraction of the actual app.

def simulate_intervals(data):
for t in range(data.n_intervals):
state = interval(data) # returns a JAX NumPy array
yield state

def simulate(data):
for key in range(data.n_trials):
trial = simulate_intervals(data)
yield trial

@app.websocket("/ws")
async def socket(websocket: WebSocket):

await websocket.accept()
while True:
# Get messages from client
data = await websocket.receive_text()

# "tipo" is basically the type of data being sent from client or server to the other one.
# In this case, "tipo": "inicio" is the client sending inputs and requesting for a certain data in response.
if data["tipo"] == "inicio":
# Minimal computation
nodes = distributions(data)

nodosJson = json.dumps(nodes, cls=NumpyEncoder)
# In this first interaction, the client gets the first message without delay.
await websocket.send_json({"tipo": "nodos", "datos": json.loads(nodosJson)})

# Since this is a generator (def returns yield) it does not actually
# trigger that actual computationally heavy process.
trials = simulate(data)

# define some initial variables to count the iterations
trialI = 0
stateI = 0
trialsLen = args.number_trials
statesLen = 600

# load the first trial (also a generator)
# without the for loop used before, the counters and next()
# allow us to do the same as being done before in the for loop
trial = next(trials)

# With the use of generators and next() it is possible to keep
# this first message light on the server and send the first response
# as quickly as possible.

# This type of message asks for the next instance of the simluation
# without processing the entire model.
elif data["tipo"] == "sim":
# check if we are within the limits (before this was a nested for loop)
if trialI < trialsLen and stateI < statesLen:
# Trigger the next instance of the simulation
state = next(trial)
# update counter
stateI = stateI + 1

# Send the message with 1 instance of the simulation.
#
stateString = json.dumps(state, cls=NumpyEncoder)
await websocket.send_json(
{
"tipo": "estado",
"datos": json.loads(stateString),
"trialI": trialI,
"stateI": stateI,
}
)

# Check if the second loop is done
if stateI == statesLen:
# update counter of first loop
trialI = trialI + 1
# update counter of second loop
stateI = 0

# Check if there are more pending trials,
# otherwise stop and notify the client we are done.
try:
trial = next(trials)
except StopIteration:
await websocket.send_json({"tipo": "fin"})

客户端

只是实际改变的部分:

ws.onmessage = (e) => {
const mensaje = JSON.parse(e.data);

// Simply check the type of incoming message so it can be processed
if (mensaje.tipo === 'fin') {
viz.calcularResultados();
} else if (mensaje.tipo === 'nodos') {
viz.pintarNodos(mensaje.datos);
} else if (mensaje.tipo === 'estado') {
viz.sumarEstado(mensaje.datos);
}

// After receiving a message, ping the server for the next one
ws.send(
JSON.stringify({
tipo: 'sim',
})
);
};

这似乎是保持服务器和客户端协同工作的合理解决方案。我能够在客户端显示长时间模拟的进度,用户体验比等待服务器响应很长时间要好得多。希望它能帮助其他有类似问题的人。

关于python - Websockets 消息仅在最后发送,而不是在使用 async/await 的实例中发送,在嵌套 for 循环中产生,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68884040/

31 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com