import 'dart:async'; import 'dart:convert'; import 'package:web_socket_channel/web_socket_channel.dart'; import 'signalr_protocol.dart'; import 'text_message_format.dart'; /// Active SignalR connection for the questions hub. class QuestionsHubConnection { QuestionsHubConnection({ required this.connectionToken, required this.firebaseUid, required this.channel, }); final String connectionToken; final String firebaseUid; final WebSocketChannel channel; bool handshakeComplete = false; StreamSubscription? _subscription; void listen(void Function(String message) onMessage, {void Function()? onDone}) { _subscription = channel.stream.listen( (Object? message) { if (message is String) { onMessage(message); } }, onDone: onDone, cancelOnError: true, ); } Future sendRaw(String payload) async { channel.sink.add(payload); } Future sendInvocation(String target, List arguments) async { await sendRaw(SignalrProtocol.invocation( target: target, arguments: arguments, )); } Future close() async { await _subscription?.cancel(); await channel.sink.close(); } } /// Tracks connected clients and delivers hub invocations by Firebase UID. class QuestionsHubConnections { final Map _byToken = {}; final Map> _tokensByUid = >{}; void register(QuestionsHubConnection connection) { _byToken[connection.connectionToken] = connection; _tokensByUid .putIfAbsent(connection.firebaseUid, () => {}) .add(connection.connectionToken); } Future unregister(String connectionToken) async { final QuestionsHubConnection? connection = _byToken.remove(connectionToken); if (connection == null) { return; } final Set? tokens = _tokensByUid[connection.firebaseUid]; tokens?.remove(connectionToken); if (tokens != null && tokens.isEmpty) { _tokensByUid.remove(connection.firebaseUid); } await connection.close(); } bool isConnected(String firebaseUid) => (_tokensByUid[firebaseUid]?.isNotEmpty ?? false); Future pushQuestionToConnection( QuestionsHubConnection connection, Map question, ) async { if (!connection.handshakeComplete) { return; } await connection.sendInvocation('ReceiveQuestion', [question]); } Future pushQuestion( String firebaseUid, Map question, ) async { final Set tokens = _tokensByUid[firebaseUid] ?? {}; var delivered = 0; for (final String token in tokens) { final QuestionsHubConnection? connection = _byToken[token]; if (connection == null || !connection.handshakeComplete) { continue; } await connection.sendInvocation('ReceiveQuestion', [question]); delivered++; } return delivered; } /// Parses inbound frames after the handshake. void handleClientMessage( QuestionsHubConnection connection, String payload, ) { for (final String message in TextMessageFormat.parse(payload)) { final Map json = jsonDecode(message) as Map; final int? type = json['type'] as int?; if (type == 6) { unawaited(connection.sendRaw(SignalrProtocol.ping())); } } } }