How to handle multiple streams in Flutter with SignalR, Firebase Messaging, and flutter_local_notifications

Posted by mqkwyuun on 2023-05-29 in Flutter

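I wrote a Flutter app (version 3.7.12) that I debug and run on Android (iOS and Web are not configured), and my backend API is written in .NET 7 and implements SignalR. Both the Flutter app and the backend are configured to use Firebase (authentication, storage, notifications...). Everything works fine, except end to end. The steps are: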

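  • I send a message - ok
  • ChatService sends the message to the API - ok
  • The API receives it, writes it to the database, then triggers a notification through Firebase - ok

Then the new message arrives (on another user / app instance):

  • A message is received in ChatService - KO. Just as it is about to be added to the new-message stream, the Flutter local notification appears (along with the message) and the new message ends up empty, even though it was received and is visible in the "args" parameter. Nothing is printed to the console either. This is the method in chat_service.dart: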
void _handleIncomingMessage(List<dynamic>? args) {
    final msg = MessageEntity(
        content: args?[1] as String? ?? "",
        id: args?[2],
        senderId: args?[0] as String? ?? "",
        sentAt: DateTime.now(),
        userEntity: UserEntity(
            id: args?[0] as String? ?? ""));
    _messageController.add(msg);
    print("Message added to stream: $msg");
  }
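
A side note on the handler above: `args?[1] as String?` throws a TypeError if the value is not a String, and `args?[2]` throws a RangeError if the list has fewer than three elements. If such an exception is raised inside the hub callback and is not surfaced (whether signalr_core swallows it is an assumption here, not something the post confirms), the `print` after `_messageController.add` would never run, which matches the "nothing is printed" symptom. A minimal sketch of a more defensive parse, using a hypothetical `ParsedMessage` class in place of `MessageEntity` and assuming the hub sends `[senderId, content, id]`:

```dart
/// Hypothetical, simplified stand-in for the question's MessageEntity.
class ParsedMessage {
  final String senderId;
  final String content;
  final String? id;
  const ParsedMessage(this.senderId, this.content, this.id);
}

/// Returns null instead of throwing when the payload has an unexpected shape.
/// Assumes the hub sends [senderId, content, id], as in the handler above.
ParsedMessage? parseIncoming(List<dynamic>? args) {
  if (args == null || args.length < 3) return null;
  final sender = args[0];
  final content = args[1];
  if (sender is! String || content is! String) return null;
  // The id may arrive as an int or a String depending on the server model,
  // so it is normalised to a String here (an assumption for illustration).
  return ParsedMessage(sender, content, args[2]?.toString());
}

void main() {
  final ok = parseIncoming(['u1', 'hello', 42]);
  print('parsed: ${ok?.senderId} / ${ok?.content} / ${ok?.id}');

  // Malformed payloads are reported instead of crashing the callback.
  for (final bad in [null, <dynamic>['u1', 'hello'], <dynamic>[1, 2, 3]]) {
    if (parseIncoming(bad) == null) {
      print('ignored malformed payload: $bad');
    }
  }
}
```

Logging the rejected payload makes a shape mismatch between the .NET hub and the Dart handler visible instead of silent.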

I use these packages in Flutter:

firebase_messaging: ^14.4.0
flutter_local_notifications: ^14.0.0+1
signalr_core: ^1.1.1

I implemented a service for the SignalR hub:

i_chat_service.dart

abstract class IChatService {
  Future<void> initSignalR(BuildContext context);
  Future<void> sendMessage(String user, String message, String conversationId);
  void dispose();
  Future<void> sendTypingNotification(String conversationId, String userId);
  Future<void> joinGroup(String conversationId);
  Stream<String> get onTypingNotification;
  Stream<MessageEntity> get onNewMessage;
}

chat_service.dart (shortened for clarity)

class ChatService implements IChatService {
  final AuthHttpClient _authHttpClient;

  late HubConnection? _hubConnection;
  final BehaviorSubject<String?> _typingController = BehaviorSubject<String?>();
  final BehaviorSubject<MessageEntity> _messageController =
      BehaviorSubject<MessageEntity>();

  ChatService(this._authHttpClient);

  @override
  Future<void> initSignalR(BuildContext context) async {
    _hubConnection = HubConnectionBuilder()
        .withUrl(
            dotenv["ChatHubURL"] ?? "",
            HttpConnectionOptions(
              transport: HttpTransportType.webSockets,
              client: _authHttpClient,
              logMessageContent: true,
            ))
        .withAutomaticReconnect([500, 1000, 2000, 3000]).build();

    _hubConnection!.onclose((error) {
      print('SignalR connection closed: $error');
    });

    _hubConnection!.on('ReceiveMessage', _handleIncomingMessage);
    _hubConnection!.on('ReceiveTypingNotification', _handleTypingNotification);

    await _hubConnection!.start();
  }

  Future<void> _ensureConnectionActive() async {
    if (!isConnectionActive()) {
      await _hubConnection!.stop();
      await _hubConnection!.start();
    }
  }

  bool isConnectionActive() {
    return _hubConnection != null &&
        _hubConnection!.state == HubConnectionState.connected;
  }

  @override
  Future<void> sendMessage(
      String user, String message, String conversationId) async {
    await _ensureConnectionActive();

    await _hubConnection!.invoke('SendMessage', args: <Object>[
      user,
      message,
      conversationId,
    ]);
  }

  void _handleIncomingMessage(List<dynamic>? args) {
    final msg = MessageEntity(
        content: args?[1] as String? ?? "",
        id: args?[2],
        senderId: args?[0] as String? ?? "",
        sentAt: DateTime.now(),
        userEntity: UserEntity(
            id: args?[0] as String? ?? ""));
    _messageController.add(msg);
    print("Message added to stream: $msg");
  }

  @override
  Stream<MessageEntity> get onNewMessage => _messageController.stream;
}

This chat service is used in the following widget (shortened for clarity):

class ConversationsDetailsWidget extends StatefulWidget {
  final MessagesEntity? messagesEntity;

  const ConversationsDetailsWidget({required this.messagesEntity, Key? key})
      : super(key: key);

  @override
  State<ConversationsDetailsWidget> createState() =>
      _ConversationsDetailsWidgetState();
}

class _ConversationsDetailsWidgetState
    extends State<ConversationsDetailsWidget> {
  final TextEditingController _messageController = TextEditingController();
  final chatService = getIt<IChatService>();
  List<MessageEntity> _messages = [];
  final Map<String, String> _usernames = {};

  @override
  void initState() {
    super.initState();
    initSignalR();
// ... (shortened for clarity)
  }

  @override
  void dispose() {
    chatService.dispose();
    _scrollController.dispose();
    _typingEndStreamController.close();
    super.dispose();
  }

  void _onSend() {
    if (_messageController.text.isNotEmpty) {
      final myUserId = getIt<IUserService>().getUserId() ?? "";

      chatService.sendMessage(
        myUserId,
        _messageController.text,
        widget.messagesEntity?.conversationId ?? "",
      );
      _typingEndStreamController.add(null);
      _messageController.clear();
      _messageController.clearComposing();
      _scrollToBottom();
    }
  }

  Future<void> initSignalR() async {
    await chatService.initSignalR(context);
    await chatService
        .joinGroup(widget.messagesEntity!.conversationId!);
  }
// ... (shortened for clarity) - this is the widget tree

Expanded(
              child: StreamBuilder<MessageEntity>(
                  stream: chatService.onNewMessage,
                  builder: (BuildContext context,
                      AsyncSnapshot<MessageEntity> snapshot) {
                    if (snapshot.hasData) {
                      final newMessage = snapshot.data!;
                      final username = _usernames[newMessage.senderId] ?? "";
                      final updatedMessage = MessageEntity(
                        content: newMessage.content,
                        id: newMessage.id,
                        senderId: newMessage.senderId,
                        sentAt: newMessage.sentAt,
                        userEntity: UserEntity(
                          id: newMessage.senderId,
                          username: username
                        ),
                      );
                      if (_messages.isEmpty ||
                          _messages.last.id != newMessage.id) {
                        _messages = [..._messages, updatedMessage];
                        _scrollToBottom();
                      }
                    }
                    return MessageListWidget(
                      messages: _messages,
                      myUserId: myUserId ?? '',
                      scrollController: _scrollController,
                      usernames: _usernames,
                    );
                  }),
            ),

Finally, I implemented Firebase Messaging and flutter_local_notifications. Both are initialized in main.dart. This is where the local notifications are used:

FirebaseMessaging.onMessage.listen((RemoteMessage message) {
    RemoteNotification? notification = message.notification;
    AndroidNotification? android = message.notification?.android;
    if (notification != null && android != null) {
      _showNotification(flutterLocalNotificationsPlugin,
          notification.title ?? '', notification.body ?? '');
    }
  });

Future _showNotification(
    FlutterLocalNotificationsPlugin flutterLocalNotificationsPlugin,
    String title,
    String body) async {
  const androidPlatformChannelSpecifics = AndroidNotificationDetails(
      'channel_id', 'channel_name',
      importance: Importance.max, priority: Priority.high, showWhen: false);
  const iOSPlatformChannelSpecifics = DarwinNotificationDetails();
  const platformChannelSpecifics = NotificationDetails(
      android: androidPlatformChannelSpecifics,
      iOS: iOSPlatformChannelSpecifics);
  await flutterLocalNotificationsPlugin
      .show(0, title, body, platformChannelSpecifics, payload: 'item x');
}

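If I remove the flutter_local_notifications call made inside FirebaseMessaging.onMessage.listen, the whole chat system works: the message is added to the stream and therefore shows up in the StreamBuilder. I just don't understand what is going on. It looks as if the local notification is "stealing" the thread that executes the code. Has anyone run into this before? I don't intend to manage notifications directly in ChatService, because Firebase Messaging has its own stream. Is the problem that several streams are running at the same time like this? Is it possible to assign a priority order or a queue?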
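
On the "stealing the thread" idea: Dart code in an isolate runs on a single-threaded event loop, and stream listeners are just callbacks scheduled on that loop. One listener awaiting asynchronous work does not stop another stream from delivering its events, and streams have no built-in priority between them. The following is a minimal pure-Dart sketch of that behaviour. The two `StreamController`s are hypothetical stand-ins for the SignalR message stream and `FirebaseMessaging.onMessage`, and the delay merely simulates showing a notification:

```dart
import 'dart:async';

/// Feeds one event into each of two independent streams and records
/// what each listener saw. Both controllers are illustrative stand-ins.
Future<List<String>> runDemo() async {
  final log = <String>[];
  final chat = StreamController<String>(); // stand-in for onNewMessage
  final push = StreamController<String>(); // stand-in for onMessage

  final chatSub = chat.stream.listen((m) => log.add('chat:$m'));
  final pushSub = push.stream.listen((m) async {
    // Simulates asynchronous work such as showing a local notification.
    await Future<void>.delayed(const Duration(milliseconds: 10));
    log.add('push:$m');
  });

  push.add('n1'); // the push notification arrives first
  chat.add('m1'); // the chat message arrives right after

  // Give both listeners time to finish before tearing down.
  await Future<void>.delayed(const Duration(milliseconds: 50));
  await chatSub.cancel();
  await pushSub.cancel();
  await chat.close();
  await push.close();
  return log;
}

Future<void> main() async {
  final log = await runDemo();
  print(log); // both events are present; neither listener blocked the other
}
```

If a message goes missing while a notification is shown, the cause is more likely in the handler or in the lifecycle of the service than in the streams competing with each other.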

Answer #1 (0g0grzrc)

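So the bug does not appear to be caused by the interaction between SignalR, flutter_local_notifications, and Firebase Messaging.
I found another bug: the SignalR instance was not being re-created. I added a constructor: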

ChatService.create(String conversationId, this._authHttpClient) {
    initSignalR(conversationId);
  }

and call it like this in _ConversationsDetailsWidgetState:

late final ChatService _chatService;

@override
  void initState() {
    super.initState();
    _chatService = ChatService.create(
        widget.messagesEntity?.conversationId ?? "", getIt<AuthHttpClient>());
    // ...
  }

It is a pity not to use DI and get_it here, but it solved the problem.
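
One possible reading of "the SignalR instance was not being re-created", which the post does not confirm: the widget's `dispose()` calls `chatService.dispose()` on an object obtained from `getIt`, and if that object is registered as a singleton, the next screen receives the same, already disposed service. The sketch below uses a hypothetical `FakeChatService` built on a plain `StreamController` (not the real `ChatService` or rxdart's `BehaviorSubject`) to show why a disposed instance can no longer deliver messages while a fresh one can:

```dart
import 'dart:async';

/// Hypothetical stand-in for ChatService: it owns a stream controller
/// and closes it on dispose, as a service typically would.
class FakeChatService {
  final StreamController<String> _messages =
      StreamController<String>.broadcast();

  Stream<String> get onNewMessage => _messages.stream;

  /// Adding to a closed controller throws a StateError.
  void receive(String text) => _messages.add(text);

  void dispose() => _messages.close();
}

/// Returns false when the service has already been disposed.
bool tryDeliver(FakeChatService service, String text) {
  try {
    service.receive(text);
    return true;
  } on StateError {
    return false;
  }
}

void main() {
  // A shared instance that an earlier screen has already disposed.
  final shared = FakeChatService()..dispose();
  print('shared instance delivers: ${tryDeliver(shared, 'hello')}');

  // A fresh instance per screen, as in the answer's ChatService.create.
  final fresh = FakeChatService();
  print('fresh instance delivers: ${tryDeliver(fresh, 'hello')}');
}
```

If this is indeed the cause, DI does not have to be abandoned: get_it can hand out a new instance on every lookup when the service is registered with `registerFactory` instead of as a singleton.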
