cyberhybridhub/server/lib/questions_db.dart
2026-06-03 05:12:02 -05:00

682 lines
21 KiB
Dart

import 'dart:convert';
import 'package:postgres/postgres.dart';
import 'package:uuid/uuid.dart';
import 'trading/market_history_config.dart';
import 'trading/prospective_answer_scoring.dart';
import 'trading/guess_score_store.dart';
import 'trading/prospective_guess_assignments_db.dart';
import 'trading/prospective_guess_selection.dart';
import 'trading/user_trading_state_db.dart';
/// Postgres access for the questions table.
class QuestionsDb {
QuestionsDb(this._connection);
final Connection _connection;
Connection get connection => _connection;
static const Uuid _uuid = Uuid();
Future<void> ensureUserExists(String firebaseUid) async {
await _connection.execute(
Sql.named(
'''
INSERT INTO users (firebase_uid)
VALUES (@uid)
ON CONFLICT (firebase_uid) DO NOTHING
''',
),
parameters: <String, dynamic>{'uid': firebaseUid},
);
}
static const String _questionsSelectColumns = '''
q.id, q.assigned_user_id, q.question_text, q.user_response, q.correct_answer,
q.created_at, q.modified_at, q.source_tag, q.pipeline_key, q.pipeline_step,
q.metadata''';
static const String _unansweredQuestionsJoin = '''
FROM questions q
LEFT JOIN market_history_prospective_assignments a
ON a.question_id = q.id
AND a.assigned_user_id = q.assigned_user_id
AND a.status = 'pending'
WHERE q.assigned_user_id = @uid AND q.user_response IS NULL''';
static const String _unansweredQuestionsOrder = '''
ORDER BY COALESCE(a.view_order_at, q.created_at) ASC, q.id ASC''';
/// Latest unanswered question for [assignedUserId], or null if none.
Future<Map<String, dynamic>?> findUnansweredQuestion(
String assignedUserId,
) async {
final Result result = await _connection.execute(
Sql.named(
'''
SELECT $_questionsSelectColumns
$_unansweredQuestionsJoin
$_unansweredQuestionsOrder
LIMIT 1
''',
),
parameters: <String, dynamic>{'uid': assignedUserId},
);
if (result.isEmpty) {
return null;
}
return _rowFromResult(result.first);
}
/// All unanswered questions for [assignedUserId], oldest first.
Future<List<Map<String, dynamic>>> listUnansweredQuestions(
String assignedUserId,
) async {
final Result result = await _connection.execute(
Sql.named(
'''
SELECT $_questionsSelectColumns
$_unansweredQuestionsJoin
$_unansweredQuestionsOrder
''',
),
parameters: <String, dynamic>{'uid': assignedUserId},
);
return result.map(_rowFromResult).toList();
}
/// FIFO head after deferring [deferredQuestionId] (falls back to sole item).
Future<Map<String, dynamic>?> findNextUnansweredAfterDefer({
required String assignedUserId,
required String deferredQuestionId,
}) async {
final List<Map<String, dynamic>> rows =
await listUnansweredQuestions(assignedUserId);
if (rows.isEmpty) {
return null;
}
for (final Map<String, dynamic> row in rows) {
if (row['id'] != deferredQuestionId) {
return row;
}
}
return rows.first;
}
/// Deletes every guess-game question row for [firebaseUid].
///
/// Assignments and answer snapshots referencing these rows cascade away.
Future<void> deleteAllProspectiveGuessQuestionsForUser(
String firebaseUid,
) async {
await _connection.execute(
Sql.named(
'''
DELETE FROM questions
WHERE assigned_user_id = @uid
AND source_tag = 'market_history:prospective'
''',
),
parameters: <String, dynamic>{'uid': firebaseUid},
);
}
/// Removes an unanswered question owned by [assignedUserId].
Future<void> deleteUnansweredQuestion({
required String questionId,
required String assignedUserId,
}) async {
await _connection.execute(
Sql.named(
'''
DELETE FROM questions
WHERE id = @id::uuid
AND assigned_user_id = @uid
AND user_response IS NULL
''',
),
parameters: <String, dynamic>{
'id': questionId,
'uid': assignedUserId,
},
);
}
/// Records [userResponse] for an unanswered question owned by [assignedUserId].
///
/// When the answered question metadata includes `prospective_question_id`,
/// also snapshots the answer into `market_history_prospective_answers`.
Future<Map<String, dynamic>?> submitAnswer({
required String questionId,
required String assignedUserId,
required num userResponse,
}) async {
final DateTime now = DateTime.now().toUtc();
final Result result = await _connection.execute(
Sql.named(
'''
UPDATE questions
SET user_response = @user_response, modified_at = @modified_at
WHERE id = @id::uuid
AND assigned_user_id = @uid
AND user_response IS NULL
RETURNING id, assigned_user_id, question_text, user_response, correct_answer,
created_at, modified_at, source_tag, pipeline_key, pipeline_step,
metadata
''',
),
parameters: <String, dynamic>{
'id': questionId,
'uid': assignedUserId,
'user_response': userResponse,
'modified_at': now,
},
);
if (result.isEmpty) {
return null;
}
final Map<String, dynamic> updated = _rowFromResult(result.first);
await _recordProspectiveAnswer(updated);
await ProspectiveGuessAssignmentsDb(_connection).markAnsweredByQuestionId(
questionId: questionId,
answeredAt: now,
);
await _gradeProspectiveAnswerIfNeeded(updated);
return updated;
}
Future<Map<String, dynamic>?> findQuestionById({
required String questionId,
required String assignedUserId,
}) async {
final Result result = await _connection.execute(
Sql.named(
'''
SELECT id, assigned_user_id, question_text, user_response, correct_answer,
created_at, modified_at, source_tag, pipeline_key, pipeline_step,
metadata
FROM questions
WHERE id = @id::uuid AND assigned_user_id = @uid
''',
),
parameters: <String, dynamic>{
'id': questionId,
'uid': assignedUserId,
},
);
if (result.isEmpty) {
return null;
}
return _rowFromResult(result.first);
}
/// All Firebase UIDs that have a profile row (pipeline targets).
Future<List<String>> listAllUserFirebaseUids() async {
final Result result = await _connection.execute(
'SELECT firebase_uid FROM users ORDER BY updated_at DESC',
);
return result
.map((ResultRow row) => row[0]! as String)
.toList();
}
Future<Map<String, dynamic>?> getPipelineState(String assignedUserId) async {
final Result result = await _connection.execute(
Sql.named(
'''
SELECT pipeline_key, step, context, updated_at
FROM user_pipeline_state
WHERE assigned_user_id = @uid
''',
),
parameters: <String, dynamic>{'uid': assignedUserId},
);
if (result.isEmpty) {
return null;
}
final ResultRow row = result.first;
final Object? contextRaw = row[2];
final Map<String, dynamic> context = contextRaw is Map<String, dynamic>
? contextRaw
: jsonDecode(contextRaw.toString()) as Map<String, dynamic>;
return <String, dynamic>{
'assignedUserId': assignedUserId,
'pipelineKey': row[0]! as String,
'step': row[1]! as String,
'context': context,
'updatedAt': (row[3]! as DateTime).toIso8601String(),
};
}
Future<void> upsertPipelineState({
required String assignedUserId,
required String pipelineKey,
required String step,
required Map<String, dynamic> context,
}) async {
await ensureUserExists(assignedUserId);
final DateTime now = DateTime.now().toUtc();
await _connection.execute(
Sql.named(
'''
INSERT INTO user_pipeline_state (
assigned_user_id, pipeline_key, step, context, updated_at
) VALUES (
@uid, @pipeline_key, @step, @context::jsonb, @updated_at
)
ON CONFLICT (assigned_user_id) DO UPDATE SET
pipeline_key = EXCLUDED.pipeline_key,
step = EXCLUDED.step,
context = EXCLUDED.context,
updated_at = EXCLUDED.updated_at
''',
),
parameters: <String, dynamic>{
'uid': assignedUserId,
'pipeline_key': pipelineKey,
'step': step,
'context': jsonEncode(context),
'updated_at': now,
},
);
}
/// Moves an unanswered question to the end of the user's queue.
Future<Map<String, dynamic>?> deferQuestion({
required String questionId,
required String assignedUserId,
}) async {
final DateTime now = DateTime.now().toUtc();
final ProspectiveGuessAssignmentsDb assignmentsDb =
ProspectiveGuessAssignmentsDb(_connection);
final bool pushed = await assignmentsDb.pushToBackOfQueue(
questionId: questionId,
firebaseUid: assignedUserId,
now: now,
);
if (!pushed) {
return _deferQuestionWithoutAssignment(
questionId: questionId,
assignedUserId: assignedUserId,
modifiedAt: now,
);
}
final Result result = await _connection.execute(
Sql.named(
'''
UPDATE questions q
SET modified_at = @modified_at
WHERE q.id = @id::uuid
AND q.assigned_user_id = @uid
AND q.user_response IS NULL
RETURNING q.id, q.assigned_user_id, q.question_text, q.user_response,
q.correct_answer, q.created_at, q.modified_at,
q.source_tag, q.pipeline_key, q.pipeline_step, q.metadata
''',
),
parameters: <String, dynamic>{
'id': questionId,
'uid': assignedUserId,
'modified_at': now,
},
);
if (result.isEmpty) {
return null;
}
return _rowFromResult(result.first);
}
/// Legacy defer for questions without a pending assignment row.
Future<Map<String, dynamic>?> _deferQuestionWithoutAssignment({
required String questionId,
required String assignedUserId,
required DateTime modifiedAt,
}) async {
final Result result = await _connection.execute(
Sql.named(
'''
UPDATE questions q
SET created_at = sub.max_ts + INTERVAL '1 millisecond',
modified_at = @modified_at
FROM (
SELECT COALESCE(MAX(q.created_at), @modified_at) AS max_ts
FROM questions q
WHERE q.assigned_user_id = @uid
AND q.user_response IS NULL
AND q.id <> @id::uuid
) sub
WHERE q.id = @id::uuid
AND q.assigned_user_id = @uid
AND q.user_response IS NULL
RETURNING q.id, q.assigned_user_id, q.question_text, q.user_response,
q.correct_answer, q.created_at, q.modified_at,
q.source_tag, q.pipeline_key, q.pipeline_step, q.metadata
''',
),
parameters: <String, dynamic>{
'id': questionId,
'uid': assignedUserId,
'modified_at': modifiedAt,
},
);
if (result.isEmpty) {
return null;
}
return _rowFromResult(result.first);
}
/// Count of unanswered questions assigned to [assignedUserId].
Future<int> countUnansweredQuestions(String assignedUserId) async {
final Result result = await _connection.execute(
Sql.named(
'''
SELECT COUNT(*)::int AS count
FROM questions
WHERE assigned_user_id = @uid AND user_response IS NULL
''',
),
parameters: <String, dynamic>{'uid': assignedUserId},
);
return (result.first[0]! as num).toInt();
}
/// Cumulative guess score for [firebaseUid] (Firebase auth UID), persisted in
/// `user_trading_state.context.guess_score` and repaired from answer history.
Future<Map<String, dynamic>> getGuessScoreSummary(String firebaseUid) async {
await ensureUserExists(firebaseUid);
return GuessScoreStore.loadSummary(_connection, firebaseUid);
}
/// Resets guess score to a blank slate and clears all guess questions/links.
Future<Map<String, dynamic>> resetGuessScoreSummary(
String firebaseUid, {
DateTime? now,
}) async {
final DateTime tick = (now ?? DateTime.now()).toUtc();
final DateTime earliest = ProspectiveGuessSelection.earliestPlayableSlotStart(
tick,
MarketHistoryConfig.windowDays,
);
await _connection.execute(
Sql.named(
'''
DELETE FROM market_history_prospective_answers
WHERE assigned_user_id = @uid
''',
),
parameters: <String, dynamic>{'uid': firebaseUid},
);
await ProspectiveGuessAssignmentsDb(_connection).deleteAllForUser(
firebaseUid,
);
await deleteAllProspectiveGuessQuestionsForUser(firebaseUid);
await UserTradingStateDb(_connection).resetGuessScore(
firebaseUid,
slotStart: earliest,
);
return getGuessScoreSummary(firebaseUid);
}
/// Slot-progressive prospective question for [firebaseUid], or null when caught up.
Future<Map<String, dynamic>?> pickProspectiveQuestionForUser(
String firebaseUid, {
DateTime? now,
}) async {
return ProspectiveGuessSelection(connection: _connection).pickForUser(
firebaseUid,
now: now,
);
}
Future<Map<String, dynamic>> createQuestion({
required String assignedUserId,
required String questionText,
required num correctAnswer,
String? sourceTag,
String? pipelineKey,
String? pipelineStep,
Map<String, dynamic>? metadata,
}) async {
await ensureUserExists(assignedUserId);
final String id = _uuid.v4();
final DateTime now = DateTime.now().toUtc();
await _connection.execute(
Sql.named(
'''
INSERT INTO questions (
id, assigned_user_id, question_text, user_response, correct_answer,
created_at, modified_at, source_tag, pipeline_key, pipeline_step,
metadata
) VALUES (
@id::uuid, @assigned_user_id, @question_text, NULL, @correct_answer,
@created_at, @modified_at, @source_tag, @pipeline_key, @pipeline_step,
@metadata::jsonb
)
''',
),
parameters: <String, dynamic>{
'id': id,
'assigned_user_id': assignedUserId,
'question_text': questionText,
'correct_answer': correctAnswer,
'created_at': now,
'modified_at': now,
'source_tag': sourceTag,
'pipeline_key': pipelineKey,
'pipeline_step': pipelineStep,
'metadata': jsonEncode(metadata ?? <String, dynamic>{}),
},
);
return _rowToJson(
id: id,
assignedUserId: assignedUserId,
questionText: questionText,
userResponse: null,
correctAnswer: correctAnswer,
createdAt: now,
modifiedAt: now,
sourceTag: sourceTag,
pipelineKey: pipelineKey,
pipelineStep: pipelineStep,
metadata: metadata ?? <String, dynamic>{},
);
}
/// Payload sent to the Flutter client over SignalR (excludes correct answer).
Map<String, dynamic> toClientPayload(
Map<String, dynamic> question, {
required int unansweredCount,
}) {
return <String, dynamic>{
'id': question['id'],
'assignedUserId': question['assignedUserId'],
'text': question['text'],
'sentAt': question['createdAt'],
'unansweredCount': unansweredCount,
};
}
Map<String, dynamic> _rowFromResult(ResultRow row) {
final Object idValue = row[0]!;
final String id = idValue is String ? idValue : idValue.toString();
final DateTime createdAt = row[5]! as DateTime;
final DateTime modifiedAt = row[6]! as DateTime;
return _rowToJson(
id: id,
assignedUserId: row[1]! as String,
questionText: row[2]! as String,
userResponse: _readOptionalNumeric(row[3]),
correctAnswer: _readNumeric(row[4]),
createdAt: createdAt,
modifiedAt: modifiedAt,
sourceTag: row.length > 7 ? row[7] as String? : null,
pipelineKey: row.length > 8 ? row[8] as String? : null,
pipelineStep: row.length > 9 ? row[9] as String? : null,
metadata: _readJsonMap(row.length > 10 ? row[10] : null),
);
}
Map<String, dynamic> _readJsonMap(Object? value) {
if (value is Map<String, dynamic>) {
return value;
}
if (value is Map) {
return Map<String, dynamic>.from(value);
}
if (value == null) {
return <String, dynamic>{};
}
return jsonDecode(value.toString()) as Map<String, dynamic>;
}
/// Postgres NUMERIC columns may decode as [String] or [num].
static num _readNumeric(Object? value) {
if (value == null) {
return 0;
}
if (value is num) {
return value;
}
if (value is String) {
return num.parse(value);
}
return num.parse(value.toString());
}
static num? _readOptionalNumeric(Object? value) {
if (value == null) {
return null;
}
return _readNumeric(value);
}
Future<void> _gradeProspectiveAnswerIfNeeded(
Map<String, dynamic> answeredQuestion,
) async {
final Map<String, dynamic> metadata = Map<String, dynamic>.from(
answeredQuestion['metadata'] as Map<String, dynamic>? ??
<String, dynamic>{},
);
final bool isProspective = metadata['prospective_question_id'] != null ||
answeredQuestion['sourceTag'] == 'market_history:prospective';
if (!isProspective) {
return;
}
final num? userResponse =
_readOptionalNumeric(answeredQuestion['userResponse']);
if (userResponse == null) {
return;
}
final ProspectiveAnswerGrade grade = gradeProspectiveAnswer(
userResponse: userResponse,
correctAnswer: _readNumeric(answeredQuestion['correctAnswer']),
);
final String symbol = metadata['symbol'] as String? ?? '';
final String? modifiedAtWire = answeredQuestion['modifiedAt'] as String?;
final DateTime at = modifiedAtWire == null
? DateTime.now().toUtc()
: DateTime.parse(modifiedAtWire).toUtc();
await recordProspectiveGuessScore(
tradingStateDb: UserTradingStateDb(_connection),
firebaseUid: answeredQuestion['assignedUserId']! as String,
grade: grade,
symbol: symbol,
at: at,
);
}
Future<void> _recordProspectiveAnswer(Map<String, dynamic> answeredQuestion) async {
final Map<String, dynamic> metadata = Map<String, dynamic>.from(
answeredQuestion['metadata'] as Map<String, dynamic>? ??
<String, dynamic>{},
);
final String? prospectiveQuestionId =
metadata['prospective_question_id'] as String?;
if (prospectiveQuestionId == null || prospectiveQuestionId.isEmpty) {
return;
}
final num? userSliderValue =
_readOptionalNumeric(answeredQuestion['userResponse']);
if (userSliderValue == null) {
return;
}
final String questionId = answeredQuestion['id']! as String;
final String assignedUserId = answeredQuestion['assignedUserId']! as String;
final String? modifiedAtWire = answeredQuestion['modifiedAt'] as String?;
final DateTime answeredAt = modifiedAtWire == null
? DateTime.now().toUtc()
: DateTime.parse(modifiedAtWire).toUtc();
await _connection.execute(
Sql.named(
'''
INSERT INTO market_history_prospective_answers (
question_id, assigned_user_id, prospective_question_id,
symbol, older_slot_start, newer_slot_start,
expected_percent_increase_price, user_slider_value, answered_at
)
SELECT
@question_id::uuid, @assigned_user_id, p.id,
p.symbol, p.older_slot_start, p.newer_slot_start,
p.price_delta_pct, @user_slider_value, @answered_at
FROM market_history_prospective_questions p
WHERE p.id = @prospective_question_id::uuid
ON CONFLICT (question_id) DO UPDATE
SET
prospective_question_id = EXCLUDED.prospective_question_id,
symbol = EXCLUDED.symbol,
older_slot_start = EXCLUDED.older_slot_start,
newer_slot_start = EXCLUDED.newer_slot_start,
expected_percent_increase_price = EXCLUDED.expected_percent_increase_price,
user_slider_value = EXCLUDED.user_slider_value,
answered_at = EXCLUDED.answered_at
''',
),
parameters: <String, dynamic>{
'question_id': questionId,
'assigned_user_id': assignedUserId,
'prospective_question_id': prospectiveQuestionId,
'user_slider_value': userSliderValue,
'answered_at': answeredAt,
},
);
}
Map<String, dynamic> _rowToJson({
required String id,
required String assignedUserId,
required String questionText,
required Object? userResponse,
required num correctAnswer,
required DateTime createdAt,
required DateTime modifiedAt,
String? sourceTag,
String? pipelineKey,
String? pipelineStep,
Map<String, dynamic>? metadata,
}) {
return <String, dynamic>{
'id': id,
'assignedUserId': assignedUserId,
'text': questionText,
'userResponse': userResponse,
'correctAnswer': correctAnswer,
'createdAt': createdAt.toIso8601String(),
'modifiedAt': modifiedAt.toIso8601String(),
if (sourceTag != null) 'sourceTag': sourceTag,
if (pipelineKey != null) 'pipelineKey': pipelineKey,
if (pipelineStep != null) 'pipelineStep': pipelineStep,
if (metadata != null && metadata.isNotEmpty) 'metadata': metadata,
};
}
}