cyberhybridhub/server/lib/handlers/questions_hub_handler.dart

201 lines
5.8 KiB
Dart

import 'dart:async';
import 'dart:convert';
import 'package:shelf/shelf.dart';
import 'package:shelf_web_socket/shelf_web_socket.dart';
import 'package:uuid/uuid.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
import '../cors_headers.dart';
import '../firebase_auth.dart';
import '../question_service.dart';
import '../signalr/questions_hub_connections.dart';
import '../signalr/signalr_protocol.dart';
import '../signalr/text_message_format.dart';
/// URL path of the questions hub endpoint.
const String questionsHubPath = '/hubs/questions';

/// Shared registry of questions-hub connections.
///
/// WebSocket connections accepted in this file are registered here and
/// unregistered again when their socket stream is done.
final QuestionsHubConnections questionsHubConnections =
    QuestionsHubConnections();
/// Builds the shelf [Handler] that serves the questions hub endpoint.
///
/// The returned handler dispatches each request as follows:
///
/// * `OPTIONS` - answered with an empty 200 carrying the API CORS headers.
/// * `POST` to a path ending in `/negotiate` - handed to [_handleNegotiate].
/// * WebSocket upgrade (`GET`) - the caller is authenticated through [auth]
///   using the token found by [_tokenFromRequest], the `id` query parameter
///   is required as the connection token, and the upgraded socket is handed
///   to [_handleWebSocket] together with [questionService].
/// * Anything else - 404.
///
/// Upgrade requests whose credentials cannot be verified are rejected with
/// 401, the same status the negotiate endpoint uses for that condition; a
/// missing or empty `id` yields 400.
Handler questionsHubHandler({
  required FirebaseAuthVerifier auth,
  required QuestionService questionService,
}) {
  return (Request request) async {
    if (request.method == 'OPTIONS') {
      return Response.ok('', headers: apiCorsHeaders());
    }

    final String path = request.requestedUri.path;
    if (path.endsWith('/negotiate') && request.method == 'POST') {
      return _handleNegotiate(request, auth);
    }

    if (!_isWebSocketUpgrade(request)) {
      return Response.notFound('Not found', headers: apiCorsHeaders());
    }

    final String? firebaseUid = await auth.verifyBearerToken(
      _tokenFromRequest(request),
    );
    if (firebaseUid == null) {
      // Missing or invalid credentials are an authentication failure, so
      // answer 401 (as the negotiate endpoint does) rather than 403, which
      // would mean "authenticated but not permitted".
      return Response(
        401,
        body: 'Unauthorized',
        headers: apiCorsHeaders(),
      );
    }

    final String? connectionToken = request.url.queryParameters['id'];
    if (connectionToken == null || connectionToken.isEmpty) {
      return Response.badRequest(
        body: 'Missing connection id',
        headers: apiCorsHeaders(),
      );
    }

    // The upgrade itself is performed by shelf_web_socket; the callback only
    // runs once the socket has been established.
    final Handler wsHandler = webSocketHandler(
      (WebSocketChannel channel, String? subprotocol) {
        _handleWebSocket(
          channel: channel,
          connectionToken: connectionToken,
          firebaseUid: firebaseUid,
          questionService: questionService,
        );
      },
    );
    return wsHandler(request);
  };
}
/// Answers a negotiate request for the questions hub.
///
/// The `Authorization` header is passed to [auth] for verification. When no
/// uid comes back the reply is a 401 JSON error. Otherwise a fresh v4 UUID is
/// generated and returned as both `connectionId` and `connectionToken`, with
/// WebSockets (Text transfer format) advertised as the only transport.
Future<Response> _handleNegotiate(
  Request request,
  FirebaseAuthVerifier auth,
) async {
  final String? authorization =
      request.headers['Authorization'] ?? request.headers['authorization'];
  final String? firebaseUid = await auth.verifyBearerToken(authorization);
  if (firebaseUid == null) {
    return _jsonResponse(401, <String, dynamic>{'error': 'Unauthorized'});
  }

  final String connectionToken = const Uuid().v4();
  final Map<String, dynamic> webSocketTransport = <String, dynamic>{
    'transport': 'WebSockets',
    'transferFormats': <String>['Text'],
  };

  return _jsonResponse(200, <String, dynamic>{
    'negotiateVersion': 1,
    'connectionId': connectionToken,
    'connectionToken': connectionToken,
    'availableTransports': <Map<String, dynamic>>[webSocketTransport],
  });
}
/// Wires a freshly upgraded WebSocket into the questions hub.
///
/// Wraps [channel] in a [QuestionsHubConnection] tagged with
/// [connectionToken] and [firebaseUid], registers it in
/// [questionsHubConnections], and starts listening: every incoming message
/// goes to [_onSocketMessage], and when the stream is done the connection is
/// unregistered by its token.
void _handleWebSocket({
  required WebSocketChannel channel,
  required String connectionToken,
  required String firebaseUid,
  required QuestionService questionService,
}) {
  final QuestionsHubConnection hubConnection = QuestionsHubConnection(
    connectionToken: connectionToken,
    firebaseUid: firebaseUid,
    channel: channel,
  );
  questionsHubConnections.register(hubConnection);

  void onMessage(String message) {
    _onSocketMessage(
      hubConnection,
      message,
      questionService: questionService,
    );
  }

  void onClosed() {
    questionsHubConnections.unregister(connectionToken);
  }

  hubConnection.listen(onMessage, onDone: onClosed);
}
/// Handles one text payload received on [connection].
///
/// Once the handshake has completed, [payload] is forwarded unchanged to
/// [questionsHubConnections]. Before that, [payload] is treated as the
/// handshake request:
///
/// * If it cannot be split into frames, contains no frame, or its first
///   frame is not a JSON object, an `Invalid handshake` response is sent.
/// * If the requested `protocol` is not `json`, an `Unsupported protocol`
///   response is sent.
/// * Otherwise the connection is marked as handshaken, the success response
///   and follow-up delivery are started via
///   [_completeHandshakeAndDeliverPending], and any frames that followed the
///   handshake in the same payload are re-joined (with a trailing record
///   separator) and dispatched like ordinary client messages.
///
/// Only parsing and decoding of the handshake is guarded by the `catch`.
/// Errors raised while dispatching trailing frames propagate exactly as they
/// do for ordinary frames, instead of being reported to the client as an
/// `Invalid handshake` after the handshake has already been accepted.
void _onSocketMessage(
  QuestionsHubConnection connection,
  String payload, {
  required QuestionService questionService,
}) {
  if (connection.handshakeComplete) {
    questionsHubConnections.handleClientMessage(connection, payload);
    return;
  }

  // Sends a failed handshake response without waiting for the write.
  void rejectHandshake(String reason) {
    unawaited(connection.sendRaw(
      SignalrProtocol.handshakeResponse(error: reason),
    ));
  }

  final List<String> messages;
  final Object? handshake;
  try {
    messages = TextMessageFormat.parse(payload);
    handshake = messages.isEmpty ? null : jsonDecode(messages.first);
  } catch (_) {
    // Whatever the framing or JSON decoding threw, the client only needs to
    // know that its handshake was not understood.
    rejectHandshake('Invalid handshake');
    return;
  }

  if (handshake is! Map<String, dynamic>) {
    rejectHandshake('Invalid handshake');
    return;
  }
  if (handshake['protocol'] != 'json') {
    rejectHandshake('Unsupported protocol');
    return;
  }

  connection.handshakeComplete = true;
  unawaited(_completeHandshakeAndDeliverPending(
    connection: connection,
    questionService: questionService,
  ));

  if (messages.length > 1) {
    // Frames that arrived together with the handshake are put back into wire
    // form so they take the same path as any later payload.
    final String remaining =
        messages.sublist(1).join(TextMessageFormat.recordSeparator);
    questionsHubConnections.handleClientMessage(
      connection,
      '$remaining${TextMessageFormat.recordSeparator}',
    );
  }
}
/// Finishes a successful handshake on [connection].
///
/// First sends the error-free handshake response and waits for that send to
/// complete, then awaits
/// `questionService.deliverPendingQuestionOnConnect(connection)`.
Future<void> _completeHandshakeAndDeliverPending({
  required QuestionsHubConnection connection,
  required QuestionService questionService,
}) async {
  final handshakeAck = SignalrProtocol.handshakeResponse();
  await connection.sendRaw(handshakeAck);
  await questionService.deliverPendingQuestionOnConnect(connection);
}
/// Whether [request] asks to be upgraded to a WebSocket.
///
/// True only for a `GET` whose `Connection` header lists `upgrade` among its
/// comma-separated tokens and whose `Upgrade` header equals `websocket`;
/// both comparisons ignore case.
bool _isWebSocketUpgrade(Request request) {
  if (request.method != 'GET') {
    return false;
  }

  final String? connectionHeader = request.headers['Connection'];
  if (connectionHeader == null) {
    return false;
  }

  final bool asksForUpgrade = connectionHeader
      .toLowerCase()
      .split(',')
      .any((String token) => token.trim() == 'upgrade');
  if (!asksForUpgrade) {
    return false;
  }

  final String? upgradeHeader = request.headers['Upgrade'];
  return upgradeHeader != null && upgradeHeader.toLowerCase() == 'websocket';
}
/// Extracts the bearer credential from [request], in header form.
///
/// An `Authorization` header starting with `Bearer ` is returned as is.
/// Failing that, a non-empty `access_token` query parameter is returned with
/// a `Bearer ` prefix added. Returns `null` when neither is present.
String? _tokenFromRequest(Request request) {
  final String? header =
      request.headers['Authorization'] ?? request.headers['authorization'];
  final bool hasBearerHeader = header != null && header.startsWith('Bearer ');
  if (hasBearerHeader) {
    return header;
  }

  final String queryToken = request.url.queryParameters['access_token'] ?? '';
  return queryToken.isEmpty ? null : 'Bearer $queryToken';
}
/// Builds a [Response] with the given [status] whose body is [body] encoded
/// as JSON.
///
/// The headers are the API CORS headers plus
/// `Content-Type: application/json`.
Response _jsonResponse(int status, Map<String, dynamic> body) {
  final String encodedBody = jsonEncode(body);
  final Map<String, String> headers = <String, String>{
    ...apiCorsHeaders(),
    'Content-Type': 'application/json',
  };
  return Response(status, body: encodedBody, headers: headers);
}