websocket 不断从服务器向客户端发送数据

rm5edbpk  于 2022-11-11  发布在  其他
关注(0)|答案(3)|浏览(341)

看看这个example
正如你所看到的,某种event不断地被发送到客户端。我想在consumers.py中使用Django-Channels来模拟这种情况。下面是我所拥有的简化版本:

class ChatConsumer(AsyncConsumer):
    async def ws_connect(self, event):
        self.send = get_db_object()

        ....

        await self.send({
            "type": "websocket.accept"
        })

    # I need to CONSTANTLY receive & send data
    async def ws_receive(self, event):

        obj = ...# query DB and get the newest object

        json_obj = {
            'field_1': obj.field_1,
            'field_2': obj.field_2,
        }

        await self.send({
            "type": "websocket.send",
            "text": json.dumps(json_obj)
        })

    @database_sync_to_async
    def get_db_object(self,**kwargs):
        return Some_Model.objects.get(**kwargs)[0]

在这里,我希望我的Django后端不断:
1.查询数据库
1.从数据库接收对象
1.将接收到的obj发送到前端WebSocket,格式为event
我如何才能做到这一点?重要的是我需要不断向客户端发送数据。
互联网上的大多数Django-Channels资源只涉及聊天应用程序,这些应用程序不一定会不断地向客户端发送数据。我找不到任何可以完成这项工作的代码。
请不要再推荐Redis或渠道文档...或者一些随机的缺乏好的文档的第三方库...推荐很容易,但很难实现。例如,我发现有人推荐Snorky,但它确实缺乏如何实现它的文档。
然而,如果有专门做这项工作的网站,我可能会去看看,即使它不使用Django-Channels。

monwx1rj

monwx1rj1#

consumers.py

import asyncio
from channels.consumer import AsyncConsumer

class ChatConsumer(AsyncConsumer):

    async def websocket_connect(self, event):
        self.connected = True
        print("connected", event)
        await self.send({
            "type": "websocket.accept"
        })

        while self.connected:
            await asyncio.sleep(2)

            obj = # do_something (Ex: constantly query DB...)

            await self.send({
                'type': 'websocket.send',
                'text': # obj,
            })

    async def websocket_receive(self, event):
        print("receive", event)

    async def websocket_disconnect(self, event):
        print("disconnected", event)
        self.connected = False

Javascript

var loc = window.location;
var wsStart = 'ws://';
if (loc.protocol == 'https:') {
    wsStart = 'wss://'
}
var endpoint = wsStart + loc.host + loc.pathname;

var socket = new WebSocket(endpoint);

socket.onmessage = function(e){
    console.log("message", e);
};
socket.onopen = function(e){
    console.log("open", e);
};
socket.onerror = function(e){
    console.log("error", e)
};
socket.onclose = function(e){
    console.log("close", e)
};

你所需要做的就是修改obj并发送它。你可以根据需要扩展这个函数。所以,现在我感兴趣的是在我的PostgreSQL中获取最新插入的行,并将该行注入到我的WebSocket中。我可以每2秒查询一次我的数据库,因为它是由await asyncio.sleep(2)指定的,并将它注入到前端套接字中。

lmvvr0a8

lmvvr0a82#

使用channels==1.* 和Django==1.*,你可以使用线程模块,例如:


# Some view.py

import threading
import time

class Publisher(threading.Thread):
    def __init__(self, reply_channel, frequency=0.5):
        super(Publisher, self).__init__()
        self._running = True
        self._reply_channel = reply_channel
        self._publish_interval = 1.0 / frequency

    def run(self):
        while self._running:
           self._reply_channel.send({'text': 'some data'})
           time.sleep(self._publish_interval)

    def stop(self):
        self._running = False

publishers = {}

def ws_connect(message):
    message.reply_channel.send({'accept': True})
    publisher = Publisher(reply_channel=message.reply_channel)
    publisher.start()
    publishers[message.reply_channel] = publisher

def ws_disconnect(message):
    publisher = publishers[message.reply_channel]
    publisher.stop()
    del publishers[message.reply_channel]
bmp9r5qi

bmp9r5qi3#

有点晚了党在这里,但当涉及到Django你应该总是尝试做的事情“他们的方式”第一...
因此,我只是使用Django Channels来完成这个任务。我的客户端向服务器发送一条消息,服务器随后用所需的数据库信息来响应,如下所示:

class SettingsConsumer(WebsocketConsumer):
    def connect(self):
        self.accept()

    def disconnect(self, close_code):
        pass

    def receive(self, text_data):
        settings = get_settings(self.scope["user"])

        self.send(
            text_data=json.dumps(
                {
                    "playMode": settings.play_mode,
                    "isRecording": settings.is_recording,
                }
            )
        )

现在,至于触发常量事件的JS......我只是使用SetInterval每隔.25s向消费者请求更新!
我的逻辑是,它是客户端,所以如果它做了一点额外的工作没什么大不了的,因为服务器无论如何都会响应。JS看起来如下...

const chatSocket = new WebSocket(
            'ws://'
            + window.location.host
            + '/ws/macros/update/'
        );

      chatSocket.onmessage = function(e) {
          const data = JSON.parse(e.data);
          if(data["playMode"] == true) {
            $('#playmode-checkbox').attr('checked', true);
          } else {
            $('#playmode-checkbox').attr('checked', false);
          }

          if(data["isRecording"] == true) {
            $('#recording-checkbox').attr('checked', true);
          } else {
            $('#recording-checkbox').attr('checked', false);
          }
      };

      chatSocket.onclose = function(e) {
          console.error('Chat socket closed unexpectedly');
      };

      window.setInterval(function() {
          chatSocket.send(JSON.stringify({
              'message': "start"
          }));
      }, 250);

是的,你可以做更多的事情来使它更异步友好和优化。但是,你要求简单和工作,所以我希望这能有所帮助!

相关问题