gpt4 book ai didi

python - 我可以在没有服务器回调的情况下更新 Bokeh 图吗?

转载 作者:行者123 更新时间:2023-12-04 14:19:17 25 4
gpt4 key购买 nike

当在 python 中运行的单独算法的结果返回结果时,我希望 Bokeh 定期和任意更新,而不是基于来自 Bokeh 界面的任何输入。

我尝试过各种解决方案,但它们都依赖于对某个 UI 事件的回调或周期性回调,如下面的代码所示。

import numpy as np
from bokeh.plotting import figure, curdoc
from bokeh.models import ColumnDataSource, Plot, LinearAxis, Grid
from bokeh.models.glyphs import MultiLine
from time import sleep
from random import randint


def getData(): # simulate data acquisition
# run slow algorith
sleep(randint(2,7)) #simulate slowness of algorithm
return dict(xs=np.random.rand(50, 2).tolist(), ys=np.random.rand(50, 2).tolist())


# init plot
source = ColumnDataSource(data=getData())

plot = Plot(
title=None, plot_width=600, plot_height=600,
min_border=0, toolbar_location=None)

glyph = MultiLine(xs="xs", ys="ys", line_color="#8073ac", line_width=0.1)
plot.add_glyph(source, glyph)

xaxis = LinearAxis()
plot.add_layout(xaxis, 'below')

yaxis = LinearAxis()
plot.add_layout(yaxis, 'left')

plot.add_layout(Grid(dimension=0, ticker=xaxis.ticker))
plot.add_layout(Grid(dimension=1, ticker=yaxis.ticker))
curdoc().add_root(plot)


# update plot
def update():
bokeh_source = getData()
source.stream(bokeh_source, rollover=50)

curdoc().add_periodic_callback(update, 100)

这看起来确实有效,但这是解决问题的最佳方法吗?与其让 Bokeh 尝试每 100 毫秒更新一次,不如在新数据可用时将其推送给它?

谢谢

最佳答案

您可以使用 zmq 和 asyncio 来完成。这是 Bokeh 服务器的代码,它在异步协程中等待数据:

from bokeh.models import ColumnDataSource
from bokeh.plotting import figure, curdoc
from functools import partial
from tornado.ioloop import IOLoop
import zmq.asyncio

doc = curdoc()

context = zmq.asyncio.Context.instance()
socket = context.socket(zmq.SUB)
socket.connect("tcp://127.0.0.1:1234")
socket.setsockopt(zmq.SUBSCRIBE, b"")

def update(new_data):
source.stream(new_data, rollover=50)

async def loop():
while True:
new_data = await socket.recv_pyobj()
doc.add_next_tick_callback(partial(update, new_data))

source = ColumnDataSource(data=dict(x=[0], y=[0]))

plot = figure(height=300)
plot.line(x='x', y='y', source=source)

doc.add_root(plot)
IOLoop.current().spawn_callback(loop)

要发送数据,只需在另一个 python 进程中运行以下代码:

import time
import random
import zmq

context = zmq.Context.instance()
pub_socket = context.socket(zmq.PUB)
pub_socket.bind("tcp://127.0.0.1:1234")

t = 0
y = 0

while True:
time.sleep(1.0)
t += 1
y += random.normalvariate(0, 1)
pub_socket.send_pyobj(dict(x=[t], y=[y]))

关于python - 我可以在没有服务器回调的情况下更新 Bokeh 图吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56742575/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com