前言
最近遇到了个需求,需要接收数据并且可视化展示出来,数据格式为DataFame。这个用Dash很容易实现,但实时更新就难搞了。Dash官方有定时刷新的组件,读取倒是不难,难点在于如何推送数据。经过这两天摸索,我找出了三个解决方案。
存储到本地磁盘
Dash官方有dash_core_components.Interval
组件,这是一个计时器,可以设定时间间隔循环刷新页面。那么很自然的想法,我把生成的数据存到本地,然后网页不断刷新读取就行了。根据前一篇文章的经验,我们知道feather格式最适合小规模的dataframe的写入与读取,有了这一点,很容易就可以写出代码。
import dash
import dash_core_components as dcc
import dash_html_components as html
import dash_table # 渲染表格的插件
from dash.dependencies import Input, Output
external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']
app = dash.Dash(__name__, external_stylesheets=external_stylesheets)
app.layout = html.Div(
html.Div([
html.H4('实时更新数据'),
html.Div(id='live-update-text'),
dash_table.DataTable(id='live-update-table'),
dcc.Interval(
id='interval',
interval=5*1000, # 5秒钟间隔
n_intervals=0
)
])
)
@app.callback([Output("live-update-table", "data")],
[Input("interval", "n_intervals")])
def make_table(n):
df = pd.read_feather('log.feather')
return df.to_dict('records')
当然,还需要另外开一个进程用于存储df到feather。
测试了一段时间,发现feather读写有bug:当写入的同时读取,就会报错,从而中断计时器的运行,必须手动刷新网页才能重新开始更新。这就不行了,于是我的同事想出了另外一种解决方案:通过ZMQ推送到内存中
通过ZMQ推送到内存中
首先需要开一个端口用于发送数据
import zmq
import time
import random
import numpy as np
import pandas as pd
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5411")
while True:
df = pd.DataFrame(data=np.random.normal(0, 1, [2, 3]))
socket.send_pyobj(df)
time.sleep(1)
然后上述代码中的make_table
就可以直接读取内存中的数据,这样速度更快,而且不会产生读写冲突bug。
# 其他代码省略
def create_zmq_socket(zmq_port="5411"):
"""连接ZMQ服务"""
context = zmq.Context()
zmq_socket = context.socket(zmq.SUB)
zmq_socket.connect("tcp://localhost:%s" % zmq_port)
zmq_socket.setsockopt_string(zmq.SUBSCRIBE,'') # 消息过滤
return zmq_socket
def recv_zmq(listen_port="5411"):
"""从ZMQ中接受数据"""
with create_zmq_socket(listen_port) as socket:
msg = socket.recv_pyobj()
return msg
@app.callback([Output("live-update-table", "data")],
[Input("interval", "n_intervals")])
def make_table(n):
df = recv_zmq()
return df.to_dict('records')
这个方案其实比较完美了,而且也符合我对前后端分离的粗浅认知。但还是有些不足,最重要的一点就是,诸如PYQT这种库,都有相应的事件回调函数,只有有数据推送时才会更新,不需要实时刷新监听。我们这种方案,时间间隔太短可能会消耗更多资源,时间间隔长的话对低延时的业务又非常不利。如果能实现类似于PYQT这种回调触发更新的逻辑就好了。
通过调用API更新数据
由于计算机基础和网络通信等知识的欠缺,我一开始就走向了死胡同,找了很多个方法,都是基于“数据推送过来,触发Dash的回调”进行的。我甚至在考虑要不要把数据推送也写到网页代码里面,然后用while True
循环。很明显这样是不行的,While循环会阻塞Dash的线程,结果就是网页根本启动不了。
查了很多资料,最后发现了dash_devices库,它对Dash
进行了扩展,其中一个新功能就是可以直接在代码内部改变组件值,而无需通过函数回调。看到这个功能,我突然灵光一现,是不是可以在网页代码里写一个API,数据推送模块一旦要更新数据,就调用一下这个API,然后改变页面上某个隐藏控件的属性,并链式回调触发make_table
函数,提醒它从ZQM中获取数据。(当然理论上Post直接更新也可以,但是我对Post传数据不太了解,所以目前是通过这种方式间接实现的)
因为Dash是基于Flask实现的,所以我们可以很容易绑定一个路由。完整代码如下:
import dash_devices
from dash_devices.dependencies import Input, Output, State
import dash_core_components as dcc
import dash_html_components as html
import dash_table
import zmq
import time
import random
import numpy as np
import pandas as pd
external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']
app = dash_devices.Dash(__name__, external_stylesheets=external_stylesheets)
app.layout = html.Div(
html.Div([
html.H4('实时更新数据'),
html.Div(id='live-update-text'),
dash_table.DataTable(id='live-update-table'),
html.Div(id='flag',hidden=True),
])
)
@app.callback([Output("live-update-table", "data")],
[[Input("flag", "children")]])
def make_table(n):
df = recv_zmq()
return df.to_dict('records')
def create_zmq_socket(zmq_port="5411"):
"""连接ZMQ服务"""
context = zmq.Context()
zmq_socket = context.socket(zmq.SUB)
zmq_socket.connect("tcp://localhost:%s" % zmq_port)
zmq_socket.setsockopt_string(zmq.SUBSCRIBE,'') # 消息过滤
return zmq_socket
def recv_zmq(listen_port="5411"):
"""
接受数据
"""
with create_zmq_socket(listen_port) as socket:
msg = socket.recv_pyobj()
return msg
def get_count():
global count
count += 1
if count >= 10:
count = 0
return count
@app.server.route('/update_data',methods = ['get','post'])
def update_data():
app.push_mods({'flag': {'children': str(get_count())}})
return '请求成功'
前半部分代码和上两章差不多,只不过用dash_devices
替换掉了dash
,核心在于最后的绑定路由。这样的话,在数据推送模块,每次推送新数据后,都可以get一下API,提醒网页更新数据。
import requests
requests.get('127.0.0.1:5000/update_data')
版权属于:作者名称
本文链接:https://www.sitstars.com/archives/114/
转载时须注明出处及本声明