ruby 如何向现有客户端提供Rails WebSocket(任何通用消息)?

kyks70gy  于 2023-03-12  发布在  Ruby
关注(0)|答案(1)|浏览(117)

在尝试了不同的方法来使它工作之后,我还没有设法理解Rails的神奇价值,以提供一个WebSocket接口。因为它已经用NodeJS和FastAPI接口测试过了。据我所知,Rails服务器读取/响应WS事件的正确方法是通过实现的'send_msg'方法,但是应该如何调用这个方法呢?为了调用send_msg方法,我似乎必须修改客户端代码以使用Rails提供的JS库(如here),这是不可能的。
正如标题所述,问题是如何创建一个简单(但通用?)的WS消息接收器/广播器?

WebSocket接口应如何工作

  • 在/clock处具有ws:端点
  • 客户端可以连接到/clock
  • 当客户端向/clock发送带有数据“requestTime”的WS消息时,API会向所有连接的客户端广播服务器系统时间
  • 客户代码不能更改
    客户端如何尝试连接和请求时间(NodeJS)(连接X个客户端和广播时间Y次)
import async from "async";
import fetch from "node-fetch";
import fs from "fs";
import ws from "ws";

// client that only counts the received messages
function wsFunction(requestCount) {
    return resolve => {
        let limit = requestCount;
        // construct all promises
        const client = new ws("ws://localhost:3000/clock");

        client.on('message', data => {
            if(--limit == 0) {
                client.close();
            }
        });

        client.on('close', () => {
            if(limit > 0) {
                console.log("Socket closed prematurely... ", limit);
            }
        });

        client.on('open', () => {
            resolve(client); // client connected
        });

        const close = () => {
            if(client.readyState !== ws.CLOSED && client.readyState !== ws.CLOSING) {
                client.close();
            }
        }
    }
}

/**
 * 
 * @param {*} limit 
 * @returns operation time for limit messages, or -1 if connection is cut
 */
function attemptClockFetches(clientCount, retrieveCount) {
    const clients = [];
    for(let i = 0; i < clientCount - 1; ++i) {
        clients.push(async () => new Promise(wsFunction(retrieveCount)));
    }

    // client that initiates the broadcast
    const promise = new Promise(async resolve => {
        const startTime = performance.now();
        const sockets = await async.parallel(clients); // connect all clients

        // create updater client
        const client = new ws("ws://localhost:3000/clock");

        // now update until limit is reached
        client.on('close', () => {
            if(retrieveCount > 0) {
                console.log("Parent socket closed prematurely...");
            }
        });

        client.on('message', () => {
            if(--retrieveCount > 0) {
                client.send("requestTime");
            } else {
                client.close();
                const endTime = performance.now();
                // close all sockets
                for(let s of sockets) {
                    s.close();
                }
                resolve(endTime - startTime);
            }
        });

        client.on('open', () => {
            client.send("requestTime");
        });
    });

    return promise;
}

async function doStressTest() {
    await attemptClockFetches(10, 10);
}

const i = setInterval(() => {
    // prevent node from killing process
}, 1000);

doStressTest().then(() => {
    clearInterval(i);
});

工作NodeJS WebSocket响应器的片段,本质上这是需要在Rails中复制的内容

const wsServer = new ws.WebSocketServer({ server: server, path: "/clock" });

wsServer.on('connection', socket => {
    socket.on('error', err => {
        console.error(err);
    });

    socket.on('message', data => {
        if(data.toString() === "requestTime") {
            // broadcast time on requestTime event to all clients
            wsServer.clients.forEach(client => {
                if(client.readyState === ws.OPEN) {
                    client.send((new Date()).getMilliseconds());
                }
            });
        }
    });
});

我目前实现的内容我已将其添加到routes.rb中,假设它将所有WS事件定向到路径/clock,即ClocksChannel

Rails.application.routes.draw do
  get '/users/:userId/cards', to: 'card#index'
  # get '/clock', to: 'card#clock' <- ADDING THIS MAKES RAILS RESPOND IN HTTP EVEN THOUGH USING WebSocket PROTOCOL

  mount ActionCable.server => '/clock'
end

主卡_controller.rb的内容

class CardController < ApplicationController
    def index
        # do some index things, not part of WS
    end

    # def clock
    #     render "Hello World"
    # end
end

实现了这个通道,假设它订阅和取消订阅客户端。至于调用send_msg,我对如何调用它没有清晰的理解

require "time"

class ClocksChannel < ApplicationCable::Channel
  def subscribed
    # stream_from "some_channel"
  end

  def unsubscribed
    # Any cleanup needed when channel is unsubscribed
  end

  def send_msg(data)
    if data == "requestTime"
          ActionCable.server.broadcast "requestTime", message: (Time.now.to_f * 1000).to_i
    end
  end
end

当服务器接收到与给定设置的连接时,Rails库中会给出以下输出:

Started GET "/clock" for 127.0.0.1 at 2023-03-09 20:12:29 +0200
Started GET "/clock/" [WebSocket] for 127.0.0.1 at 2023-03-09 20:12:29 +0200
Successfully upgraded to WebSocket (REQUEST_METHOD: GET, HTTP_CONNECTION: Upgrade, HTTP_UPGRADE: websocket)
There was an exception - JSON::ParserError(859: unexpected token at 'requestTime')
There was an exception - JSON::ParserError(859: unexpected token at 'requestTime')
C:/Ruby31-x64/lib/ruby/3.1.0/json/common.rb:216:in `parse'
C:/Ruby31-x64/lib/ruby/3.1.0/json/common.rb:216:in `parse'
C:/Ruby31-x64/lib/ruby/gems/3.1.0/gems/activesupport-7.0.4.2/lib/active_support/json/decoding.rb:23:in `decode'
C:/Ruby31-x64/lib/ruby/gems/3.1.0/gems/actioncable-7.0.4.2/lib/action_cable/connection/base.rb:168:in `decode'
C:/Ruby31-x64/lib/ruby/gems/3.1.0/gems/actioncable-7.0.4.2/lib/action_cable/connection/base.rb:89:in `dispatch_websocket_message'
C:/Ruby31-x64/lib/ruby/gems/3.1.0/gems/actioncable-7.0.4.2/lib/action_cable/server/worker.rb:59:in `block in invoke'
C:/Ruby31-x64/lib/ruby/gems/3.1.0/gems/activesupport-7.0.4.2/lib/active_support/callbacks.rb:118:in `block in run_callbacks'
...
...
lo8azlld

lo8azlld1#

这似乎是不可行的ApplicationCable,因为它要求数据是JSON格式。为了解决这个问题,我做了以下更改:
删除上述问题中显示的所有先前代码,因为这些代码不再必要/有效
添加gem 'faye-websocket',然后运行bundle install
使用Faye创建控制器:

class ClockController < ApplicationController
    @@clients = []
    def connect
        if Faye::WebSocket.websocket?(request.env)
            ws = Faye::WebSocket.new(request.env)

            ws.on :open do |event|
                # Code to execute when the websocket connection is opened
                @@clients << ws
            end

            ws.on :message do |event|
                if event.data == 'requestTime'
                    now = Time.now
                    @@clients.each do |client|
                        client.send(now.to_s)
                    end
                end
            end

            ws.on :close do |event|
                # Code to execute when the websocket connection is closed
                @@clients.delete(ws)
                ws = nil
            end

            # Return async Rack response
            return ws.rack_response
        else
            # Return regular HTTP response
            render text: 'Not a websocket request'
        end
    end
end

将路由添加到routes.rb中的控制器

get '/clock', to: 'clock#connect', via: :all

此解决方案可能并非在所有情况下都是完美的,但它可以很好地完成工作

相关问题