gpt4 book ai didi

django - 使用 MySQL <> WebSocket 实时更新 Django 应用程序

转载 作者:行者123 更新时间:2023-12-03 16:01:48 25 4
gpt4 key购买 nike

我需要不断从 MySQL 获取数据以大约 200 毫秒的更新频率获取数据的数据库。我需要不断更新仪表板文本字段上的数据值。我的仪表板是基于 Django 构建的。
我已经阅读了很多关于 Channels 的信息但所有的教程都是关于聊天应用程序的。我知道我需要实现 WebSockets这将基本上有一个开放的连接并获取数据。使用聊天应用程序,这是有道理的,但我没有遇到任何谈论 MySQL 的内容。数据库。
我也读过 mysql-events .由于表中的数据来自外部传感器,我不明白如何监控 Django 中的表,即每当表中添加新行时,我需要根据列插入新行值(value)。
关于如何去做的任何想法?我浏览了很多文章,但找不到针对此要求的特定内容。

最佳答案

感谢 Timothee Legros回答,它有点帮助我朝着正确的方向前进。
在互联网上的任何地方,它都说 Django channel 是/可以用于实时应用程序,但没有任何地方谈论确切的实现(除了聊天应用程序)。
我用过 Celery , Django ChannelsCelery's Beat完成任务并按预期工作。
它分为三个部分。设置 channel ,然后创建一个 celery 任务,定期调用它(在 Celery Beat 的帮助下),然后将该任务的输出发送到 channel ,以便它可以将该数据发送到 websocket。
channel
我遵循了 Channel 网站上的原始教程并以此为基础。
路由.py

from django.urls import re_path

from . import consumers

websocket_urlpatterns = [
re_path(r'ws/chat/(?P<room_name>\w+)/$', consumers.ChatConsumer),
re_path(r'ws/realtimeupdate/$', consumers.RealTimeConsumer),
]
消费者.py
class RealTimeConsumer(AsyncWebsocketConsumer):
async def connect(self):

self.channel_group_name = 'core-realtime-data'

# Join room group
await self.channel_layer.group_add(
self.channel_group_name,
self.channel_name
)

await self.accept()

async def disconnect(self, close_code):
# Leave room group
await self.channel_layer.group_discard(
self.channel_group_name,
self.channel_name
)

# Receive message from WebSocket
async def receive(self, text_data):
print(text_data)
pass

async def loc_message(self, event):
# print(event)
message_trans = event['message_trans']
message_tag = event['message_tag']
# print("sending data to websocket")
await self.send(text_data=json.dumps({
'message_trans': message_trans,
'message_tag': message_tag
}))
这个类基本上会发送数据到 websocket一旦它收到它。以上两个将特定于应用程序。
现在我们将设置 celery .
在项目的基本目录中,设置文件所在的位置,我们需要制作三个文件。
  • celery.py 这将初始化 celery 。
  • 路由.py 这将用于路由 channel 的 websocket 地址。
  • task.py 我们将在这里设置任务

  • celery .py
    import os

    from celery import Celery

    # set the default Django settings module for the 'celery' program.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj_name.settings')

    app = Celery('proj_name', backend='redis://localhost', broker='redis://localhost/')

    # Using a string here means the worker doesn't have to serialize
    # the configuration object to child processes.
    # - namespace='CELERY' means all celery-related configuration keys
    # should have a `CELERY_` prefix.
    app.config_from_object('django.conf:settings', namespace='CELERY')

    # Load task modules from all registered Django app configs.
    app.autodiscover_tasks()


    @app.task(bind=True)
    def debug_task(self):
    print(f'Request: {self.request!r}')
    路由.py
    from channels.auth import AuthMiddlewareStack
    from channels.routing import ProtocolTypeRouter, URLRouter

    from app_name import routing

    application = ProtocolTypeRouter({
    # (http->django views is added by default)
    'websocket': AuthMiddlewareStack(
    URLRouter(
    routing.websocket_urlpatterns
    )
    ),
    })
    任务.py
    @shared_task(name='realtime_task')
    def RealTimeTask():
    time_s = time.time()
    result_trans = CustomModel_1.objects.all()
    result_tag = CustomModel_2.objects.all()
    result_trans_json = serializers.serialize('json', result_trans)
    result_tag_json = serializers.serialize('json', result_tag)
    # output = {"ktr": result_transmitter_json, "ktag": result_tag_json}
    # print(output)
    channel_layer = get_channel_layer()
    message = {'type': 'loc_message',
    'message_transmitter': result_trans_json,
    'message_tag': result_tag_json}
    async_to_sync(channel_layer.group_send)('core-realtime-data', message)
    print(time.time()-time_s)
    任务在完成任务后,将结果发送回 channel , channel 又将其中继到 websocket。
    设置.py
    # Channels
    CHANNEL_LAYERS = {
    'default': {
    'BACKEND': 'channels_redis.core.RedisChannelLayer',
    'CONFIG': {
    "hosts": [('127.0.0.1', 6379)],
    },
    },
    }

    CELERY_BEAT_SCHEDULE = {
    'task-real': {
    'task': 'realtime_task',
    'schedule': 1 # this means, the task will run itself every second
    },
    }
    现在唯一剩下的就是在 javascript 中创建一个 websocket文件并开始收听它。
    //Create web socket to receive data
    const chatSocket = new WebSocket(
    'ws://'
    + window.location.host
    + '/ws/realtimeupdate'
    + '/'
    );

    chatSocket.onmessage = function(e) {
    const data = JSON.parse(e.data);
    console.log(e.data + '\n');
    // For trans
    var arrayOfObjects = JSON.parse(data.message_trans);
    //Do your thing

    //For tags
    var arrayOfObjects_tag = JSON.parse(data.message_tag);
    //Do your thing
    }

    };

    chatSocket.onclose = function(e) {
    console.error('Chat socket closed unexpectedly');
    };
    回答 MySQL 用法,我正在将数据插入 MySQL来自外部传感器和 tasks.py 中的数据库, 正在使用 Django ORM 查询表.
    总的来说,它完成了预期的工作,用来自 MySQL 的实时数据填充实时仪表板。可以肯定,可能有不同的更好的方法,请让我知道。

    关于django - 使用 MySQL <> WebSocket 实时更新 Django 应用程序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63302230/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com