如何在Flutter中路由RSocket客户端

58wvjzkj  于 2022-12-30  发布在  Flutter
关注(0)|答案(1)|浏览(142)

这是Java(WebFlux)代码。

public Mono<String> sendMessage(SendMessageRequest requestBody, RSocketRequester rSocketRequester) {
        Long userId = clientManager.getUserIdBySocket(rSocketRequester);

        Chat chat = Chat.builder()
                .chatroomId(requestBody.getChatroomId())
                .createdDate(LocalDateTime.now())
                .lastModifiedDate(LocalDateTime.now())
                .userId(userId)
                .build();

        return chatRepository.save(chat)
                .flatMap(entity ->  chatReadRepository.findByUserIdAndChatroomId(userId, entity.getChatroomId()))
                .flatMapMany(entity -> chatMemberRepository.findAllByChatroomIdAndUserIdNot(entity.getChatroomId(), userId))
                .map(ChatMember::getUserId)
                .map(clientManager::getSocketByUserId)
                .flatMap(socketOptional ->
                    socketOptional.<org.reactivestreams.Publisher<String>>map(socketRequester -> socketRequester.route("chat.receive")
                        .data(requestBody)
                        .send()
                        .thenReturn("Success!"))
                        .orElseGet(() -> Mono.just("fail!"))
                )
                .collectList().thenReturn("Success!");
    }

我在java中使用此代码

socketRequester -> socketRequester.route("chat.receive")
                        .data(requestBody)
                        .send()
                        .thenReturn("Success!"))
                        .orElseGet(() -> Mono.just("fail!"))

这是我的flutter代码。我使用rsocket:^1.0.0版本。

void main() async {
  String jwt = "Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxIiwicm9sZXMiOlsiVVNFUiJdLCJpYXQiOjE2NTk1OTg1OTgsImV4cCI6MTY1OTYwODU5OH0.bJHn4IKm6DtnGAQAxyruRb-LJSgKt-72-L9g7JqBtHw";
  var rSocket = await RSocketConnector.create()
      .setupPayload(routeAndDataPayload("socket.acceptor", jwt))
      .connect('tcp://192.168.219.101:8081');

  var payload = await rSocket.requestResponse!(routeAndDataPayload("healthcheck", "data!!!"));

  print(payload.getDataUtf8());

  rSocket.close();

  runApp(const MyApp());
}

我怎么能在flutter中接收到这个消息?
我想使控制器“聊天。接收”的路线在Flutter。
如果是Java代码,

@MessageMapping("chat.receive")

但是我不知道flutter的@消息Map...

wfsdck30

wfsdck301#

首先,rsocket: ^1.0.0.似乎无法正常工作,您需要使用

rsocket:
    git:
      url: git@github.com:rsocket/rsocket-dart.git
      ref: master

而不是。
然后您可以创建一个Futute,如下所示,作为您的连接持有人:

final Future<RSocket> _rsocketConnectionStream = RSocketConnector.create()
      .keepAlive(2000, 999999999)
      .connect('ws://192.168.1.188:8080')
      .catchError((error) => print(error));

现在,您可以像这样进行RSokcet调用:

Payload routeAndDataPayload(String route, String data) {
    var compositeMetadata =
        CompositeMetadata.fromEntries([RoutingMetadata(route, List.empty())]);
    var metadataBytes = compositeMetadata.toUint8Array();
    var dataBytes = Uint8List.fromList(utf8.encode(data));
    return Payload.from(metadataBytes, dataBytes);
  }

  Stream<String?> fetchSeekersNames(String name, double latitude, double longitude, double searchingRadius) {
    var body = """
    {
      "location": {
          "latitude": $latitude,
          "longitude": $longitude
        },
        "radius": $searchingRadius,
        "dancerPartnerSeekerName": "$name"
    }
  """;
    return _rsocketConnectionStream
        .asStream()
        .asyncExpand((rSocket) => rSocket.requestStream!(
            routeAndDataPayload("/api/XXXX", body)))
        .map((element) => element!.getDataUtf8())
        .doOnError((p0, p1) => print(p1));
  }

没有routeAndDataPayload方法我做不到,我是经过几天的研究才写出来的。
据我所知,Dart中没有@MessageMapping等价物,RSocket库不支持双向通道。
但是,如果您试图使您的flutter代码保持RSocket基本连接,并在其上执行request-stream,我还没有能够将其拔出。
我不能让它在TCP上工作。我不得不使用webSocket作为我的RSocket底层传输。但是如何在flutter代码中保持websocket/rsocket连接持续进行呢?
如果你发现了什么,请写在这里。总之,我也

相关问题