在尝试了不同的方法来使它工作之后,我还没有设法理解Rails的神奇价值,以提供一个WebSocket接口。因为它已经用NodeJS和FastAPI接口测试过了。据我所知,Rails服务器读取/响应WS事件的正确方法是通过实现的'send_msg'方法,但是应该如何调用这个方法呢?为了调用send_msg方法,我似乎必须修改客户端代码以使用Rails提供的JS库(如here),这是不可能的。
正如标题所述,问题是如何创建一个简单(但通用?)的WS消息接收器/广播器?
WebSocket接口应如何工作
- 在/clock处具有ws:端点
- 客户端可以连接到/clock
- 当客户端向/clock发送带有数据“requestTime”的WS消息时,API会向所有连接的客户端广播服务器系统时间
- 客户代码不能更改
客户端如何尝试连接和请求时间(NodeJS)(连接X个客户端和广播时间Y次)
import async from "async";
import fetch from "node-fetch";
import fs from "fs";
import ws from "ws";
// client that only counts the received messages
function wsFunction(requestCount) {
return resolve => {
let limit = requestCount;
// construct all promises
const client = new ws("ws://localhost:3000/clock");
client.on('message', data => {
if(--limit == 0) {
client.close();
}
});
client.on('close', () => {
if(limit > 0) {
console.log("Socket closed prematurely... ", limit);
}
});
client.on('open', () => {
resolve(client); // client connected
});
const close = () => {
if(client.readyState !== ws.CLOSED && client.readyState !== ws.CLOSING) {
client.close();
}
}
}
}
/**
*
* @param {*} limit
* @returns operation time for limit messages, or -1 if connection is cut
*/
function attemptClockFetches(clientCount, retrieveCount) {
const clients = [];
for(let i = 0; i < clientCount - 1; ++i) {
clients.push(async () => new Promise(wsFunction(retrieveCount)));
}
// client that initiates the broadcast
const promise = new Promise(async resolve => {
const startTime = performance.now();
const sockets = await async.parallel(clients); // connect all clients
// create updater client
const client = new ws("ws://localhost:3000/clock");
// now update until limit is reached
client.on('close', () => {
if(retrieveCount > 0) {
console.log("Parent socket closed prematurely...");
}
});
client.on('message', () => {
if(--retrieveCount > 0) {
client.send("requestTime");
} else {
client.close();
const endTime = performance.now();
// close all sockets
for(let s of sockets) {
s.close();
}
resolve(endTime - startTime);
}
});
client.on('open', () => {
client.send("requestTime");
});
});
return promise;
}
async function doStressTest() {
await attemptClockFetches(10, 10);
}
const i = setInterval(() => {
// prevent node from killing process
}, 1000);
doStressTest().then(() => {
clearInterval(i);
});
工作NodeJS WebSocket响应器的片段,本质上这是需要在Rails中复制的内容
const wsServer = new ws.WebSocketServer({ server: server, path: "/clock" });
wsServer.on('connection', socket => {
socket.on('error', err => {
console.error(err);
});
socket.on('message', data => {
if(data.toString() === "requestTime") {
// broadcast time on requestTime event to all clients
wsServer.clients.forEach(client => {
if(client.readyState === ws.OPEN) {
client.send((new Date()).getMilliseconds());
}
});
}
});
});
我目前实现的内容我已将其添加到routes.rb中,假设它将所有WS事件定向到路径/clock,即ClocksChannel
Rails.application.routes.draw do
get '/users/:userId/cards', to: 'card#index'
# get '/clock', to: 'card#clock' <- ADDING THIS MAKES RAILS RESPOND IN HTTP EVEN THOUGH USING WebSocket PROTOCOL
mount ActionCable.server => '/clock'
end
主卡_controller.rb的内容
class CardController < ApplicationController
def index
# do some index things, not part of WS
end
# def clock
# render "Hello World"
# end
end
实现了这个通道,假设它订阅和取消订阅客户端。至于调用send_msg,我对如何调用它没有清晰的理解
require "time"
class ClocksChannel < ApplicationCable::Channel
def subscribed
# stream_from "some_channel"
end
def unsubscribed
# Any cleanup needed when channel is unsubscribed
end
def send_msg(data)
if data == "requestTime"
ActionCable.server.broadcast "requestTime", message: (Time.now.to_f * 1000).to_i
end
end
end
当服务器接收到与给定设置的连接时,Rails库中会给出以下输出:
Started GET "/clock" for 127.0.0.1 at 2023-03-09 20:12:29 +0200
Started GET "/clock/" [WebSocket] for 127.0.0.1 at 2023-03-09 20:12:29 +0200
Successfully upgraded to WebSocket (REQUEST_METHOD: GET, HTTP_CONNECTION: Upgrade, HTTP_UPGRADE: websocket)
There was an exception - JSON::ParserError(859: unexpected token at 'requestTime')
There was an exception - JSON::ParserError(859: unexpected token at 'requestTime')
C:/Ruby31-x64/lib/ruby/3.1.0/json/common.rb:216:in `parse'
C:/Ruby31-x64/lib/ruby/3.1.0/json/common.rb:216:in `parse'
C:/Ruby31-x64/lib/ruby/gems/3.1.0/gems/activesupport-7.0.4.2/lib/active_support/json/decoding.rb:23:in `decode'
C:/Ruby31-x64/lib/ruby/gems/3.1.0/gems/actioncable-7.0.4.2/lib/action_cable/connection/base.rb:168:in `decode'
C:/Ruby31-x64/lib/ruby/gems/3.1.0/gems/actioncable-7.0.4.2/lib/action_cable/connection/base.rb:89:in `dispatch_websocket_message'
C:/Ruby31-x64/lib/ruby/gems/3.1.0/gems/actioncable-7.0.4.2/lib/action_cable/server/worker.rb:59:in `block in invoke'
C:/Ruby31-x64/lib/ruby/gems/3.1.0/gems/activesupport-7.0.4.2/lib/active_support/callbacks.rb:118:in `block in run_callbacks'
...
...
1条答案
按热度按时间lo8azlld1#
这似乎是不可行的ApplicationCable,因为它要求数据是JSON格式。为了解决这个问题,我做了以下更改:
删除上述问题中显示的所有先前代码,因为这些代码不再必要/有效
添加
gem 'faye-websocket'
,然后运行bundle install
使用Faye创建控制器:
将路由添加到routes.rb中的控制器
此解决方案可能并非在所有情况下都是完美的,但它可以很好地完成工作