我正在尝试制作一个非常基本的socket io和redis订阅和监听系统,但是,看起来redis的频道好像混在一起了。我什么都试过了,什么都没用。
我有一个 user1
他们已经订阅了他们的活动 user2
事件。如下图所示,这是客户机。
//user1 - socket setup
var socket = io('${heatmap.socket_host}/${user.id}', {
'transports': ['websocket'],
'autoConnect': false,
'query': {'access': heatmap.accessToken}
});
//user2 - socket setup
var socket = io('${heatmap.socket_host}/${user2.id}', {
'transports': ['websocket'],
'autoConnect': false, // optional
'query': {'access': heatmap.accessToken},
});
//user1 client
user.websocket.connect();
//SUBSCRIBING TO THEMSELVES
user.websocket.on('connect', (_) {
print('[connected to me]');
user.websocket.on('LocationChange', (data) => print('[me] - LocationChange: $data'));
});
var devices = await heatmap.devices.find();
var device = devices.first;
//POSTING EVENTS TO THEMSELVES
Timer.periodic(Duration(seconds: 10), (timer) async {
await heatmap.events.postLocation(55, 45, device.id);
});
var people = await heatmap.requests.getGrantedRequests();
people.sent.first.device.user.websocket.connect();
//SUBSCRIBING TO USER2
people.sent.first.device.user.websocket.on('connect', (_) {
print('connected to ${people.sent.first.device.user.name}');
people.sent.first.device.user.websocket.on('LocationChange', (data) => print('[other] - LocationChange: $data'));
});
//user2 client (not subscribed to any events, but is posting events which should be received as `[other] - LocationChange: data`)
Timer.periodic(Duration(seconds: 10), (timer) async {
await heatmap.events.postLocation(51.34, -0.123, '02355fab-9afa-4e26-b7ea- fe7af867a751');
print('Posted');
});
在nodejs中,我为sockets使用socket控制器,为redis使用ihandyredis。它们的设置如下。
const redis = createHandyClient(redisPort, redisHost);
const subscriber = createHandyClient(redisPort, redisHost);
Container.set("redis", redis);
Container.set("subscriber", subscriber);
const socketServer = io(webServer);
useSocketServer(socketServer, { controllers: [EventsSocketController] });
我有一个端点,用户将事件发布到该端点 POST /v1/events
并发布到 redisClient
如下所示
await this.redisClient.publish(`${user.id}:events`, JSON.stringify(event));
以下是 EventSocketController
这就是我管理连接、断开连接和订阅redis频道的地方
@SocketController('/:id')
export class EventsSocketController {
@Inject("redis")
private redisClient!: IHandyRedis;
@Inject("subscriber")
private subscriberClient!: IHandyRedis;
@InjectRepository(Session)
private sessionRepository!: Repository<Session>;
@InjectRepository(ShareRequest)
private requestRepository!: Repository<ShareRequest>;
@InjectRepository(User)
private userRepository!: Repository<User>;
@OnConnect()
async connection(
@ConnectedSocket() socket: Socket,
@SocketQueryParam("access") access: string,
@NspParam('id') userId: string
): Promise<void> {
console.log(socket.id);
try {
const session = await this.sessionRepository.findOneOrFail(
{ access },
{ relations: ["user"] }
);
this.subscriberClient.redis.on("message", (_, message) => {
this.redisClient.redis.hgetall(message, (err, res) => {
socket.emit(JSON.parse(message)['type'], message);
})
});
if (userId && userId != session.user.id) {
const granted = await this.requestRepository
.createQueryBuilder("request")
.innerJoin("request.from", "from", "from.id = :id", { id: session.user.id })
.where("request.granted = :granted", { granted: true })
.getMany();
if (granted.length === 0) throw new NotFoundError();
const user = await this.userRepository
.createQueryBuilder("user")
.where("user.id = :userId", { userId })
.getOne();
if (!user) throw new NotFoundError();
await this.subscriberClient.subscribe(`${user.id}:events`);
}
await this.subscriberClient.subscribe(`${session.user.id}:events`);
} catch (error) {
socket.emit("BAD_TOKEN");
socket.disconnect();
}
}
@OnDisconnect()
async disconnect(@SocketQueryParam("userId") userId: string): Promise<void> {
await this.subscriberClient.unsubscribe(`${userId}:events`);
}
}
问题是我收到的客户
[me] - LocationChange: {"type":"LocationChange","data":{"latitude":"51.34","longitude":"-0.123","timestamp":"2020-07-13 16:39:45.757820"}}
[other] - LocationChange: {"type":"LocationChange","data":{"latitude":"51.34","longitude":"-0.123","timestamp":"2020-07-13 16:39:45.757820"}}
[me] - LocationChange: {"type":"LocationChange","data":{"latitude":"55.0","longitude":"45.0","timestamp":"2020-07-13 16:39:49.282664"}}
[other] - LocationChange: {"type":"LocationChange","data":{"latitude":"55.0","longitude":"45.0","timestamp":"2020-07-13 16:39:49.282664"}}
提前谢谢!
暂无答案!
目前还没有任何答案,快来回答吧!