// File-viewer header (extraction residue): 456 lines, 14 KiB, Dart.
import 'dart:async';
|
|
import 'dart:io';
|
|
|
|
import '../pipeline/branch_decision.dart';
|
|
import '../pipeline/question_pipeline.dart' show PipelineKeys, TradingPhases;
|
|
import '../question_service.dart';
|
|
import '../questions_db.dart';
|
|
import 'guardrails.dart';
|
|
import 'market_data_db.dart';
|
|
import '../market_history_env.dart';
|
|
import 'market_history_query.dart';
|
|
import 'prospective_answer_scoring.dart';
|
|
import 'rule_engine.dart';
|
|
import 'symbol_obfuscator.dart';
|
|
import 'trading_config.dart';
|
|
import 'trading_config_db.dart';
|
|
import 'user_trading_state_db.dart';
|
|
|
|
/// Summary of a single [TradingPipeline.evaluate] pass for one user.
class TradingEvaluationResult {
  /// Number of questions created during the pass.
  final int questionsCreated;

  /// Ids of the rules that produced a question.
  final List<String> rulesFired;

  /// One `ruleId(reason)` entry per rule that did not produce a question.
  final List<String> rulesSkipped;

  /// Bundles the three outcome values; all of them are required.
  TradingEvaluationResult({
    required this.questionsCreated,
    required this.rulesFired,
    required this.rulesSkipped,
  });
}
|
|
|
|
/// Bridges the rule engine, market-data snapshots, and the existing
|
|
/// question delivery pipeline. Lives next to other pipeline branches in
|
|
/// [QuestionPipeline.onAnswerSubmitted] under `pipeline_key=trading`.
|
|
class TradingPipeline {
|
|
/// Question store; queried for the user's unanswered questions.
final QuestionsDb _questionsDb;

/// Creates and delivers the questions produced by fired rules.
final QuestionService _questionService;

/// Source of the latest per-symbol, per-metric market snapshots.
final MarketDataDb _marketDataDb;

/// Resolves the effective trading config (enabled flag and rules) per user.
final TradingConfigDb _tradingConfigDb;

/// Per-user rule state, pending orders, skips and guess-symbol cooldowns.
final UserTradingStateDb _tradingStateDb;

/// Decides whether a rule fires and builds the resulting evaluation.
final RuleEngine _ruleEngine;

/// Weekly-mover lookup for `guess_weekly_move` rules.
///
/// When null, guess rules are reported as not fired with
/// [RuleSkipReason.insufficientBars].
final MarketHistoryQuery? _marketHistoryQuery;

/// Tunables read for guess rules (minimum bars, window days, cooldown hours).
final MarketHistoryEnv _marketHistoryEnv;

/// Produces the token stored alongside a picked guess symbol.
final SymbolObfuscator _symbolObfuscator;

/// Not consulted by this class directly; handed out through [guardrails].
final Guardrails _guardrails;

/// Unanswered-question count at which [evaluate] stops creating questions.
final int _maxQueuedQuestions;

/// Time source; every reading is converted to UTC before use.
final DateTime Function() _clock;

/// Wires the pipeline to its stores and services.
///
/// The optional collaborators fall back to defaults: a [RuleEngine] built
/// with the same [clock] argument, a [MarketHistoryEnv] built from an empty
/// map, a default [SymbolObfuscator], a default [Guardrails], and
/// [DateTime.now] as the clock. [marketHistoryQuery] has no fallback and
/// stays null when omitted.
TradingPipeline({
  required QuestionsDb questionsDb,
  required QuestionService questionService,
  required MarketDataDb marketDataDb,
  required TradingConfigDb tradingConfigDb,
  required UserTradingStateDb tradingStateDb,
  RuleEngine? ruleEngine,
  MarketHistoryQuery? marketHistoryQuery,
  MarketHistoryEnv? marketHistoryEnv,
  SymbolObfuscator? symbolObfuscator,
  Guardrails? guardrails,
  int maxQueuedQuestions = 3,
  DateTime Function()? clock,
})  : _questionsDb = questionsDb,
      _questionService = questionService,
      _marketDataDb = marketDataDb,
      _tradingConfigDb = tradingConfigDb,
      _tradingStateDb = tradingStateDb,
      _ruleEngine = ruleEngine ?? RuleEngine(clock: clock),
      _marketHistoryQuery = marketHistoryQuery,
      _marketHistoryEnv =
          marketHistoryEnv ?? MarketHistoryEnv.fromMap(<String, String>{}),
      _symbolObfuscator = symbolObfuscator ?? SymbolObfuscator(),
      _guardrails = guardrails ?? Guardrails(),
      _maxQueuedQuestions = maxQueuedQuestions,
      _clock = clock ?? DateTime.now;
|
|
|
|
/// Evaluates every rule in [firebaseUid]'s effective trading config and
/// delivers a question for each rule that fires.
///
/// Returns an empty result when no config resolves or the config is
/// disabled. Otherwise each rule is processed in order and is skipped, with
/// a `ruleId(reason)` entry in [TradingEvaluationResult.rulesSkipped], when:
///
/// * an unanswered trading question for the rule already exists
///   (`open_question`);
/// * the user's unanswered-question count has reached the queue limit
///   (`queue_full`);
/// * the rule engine reports that the rule did not fire (the engine's skip
///   reason name, or `no_fire` when it gives none);
/// * anything in the rule's cycle throws (`error`; details go to stderr).
///
/// A rule that fires gets a `pipeline_key=trading` question via
/// [QuestionService], and its state is written to the trading state store
/// with phase [TradingPhases.awaitAnswer] for `guess_weekly_move` rules and
/// [TradingPhases.awaitConfirm] for every other rule type.
///
/// Pre-trade [Guardrails] are NOT enforced here; they are meant to run later
/// (in answer handling and in the actuator).
/// NOTE(review): [handleAnswer] as written never calls [Guardrails] —
/// confirm where enforcement actually happens.
Future<TradingEvaluationResult> evaluate(String firebaseUid) async {
  final List<String> fired = <String>[];
  final List<String> skipped = <String>[];

  final EffectiveTradingConfig? config =
      await _tradingConfigDb.resolveEffectiveConfig(firebaseUid);
  if (config == null || !config.enabled) {
    return TradingEvaluationResult(
      questionsCreated: 0,
      rulesFired: fired,
      rulesSkipped: skipped,
    );
  }

  // One timestamp for the whole cycle so every rule sees the same "now".
  final DateTime now = _clock().toUtc();
  int questionsCreated = 0;

  for (final TradingRuleConfig rule in config.rules) {
    // Each rule is isolated: a failure is logged and reported as
    // `ruleId(error)` without aborting the remaining rules.
    try {
      if (await _ruleHasOpenQuestion(firebaseUid, rule.id)) {
        skipped.add('${rule.id}(open_question)');
        continue;
      }

      // Queried again for every rule rather than cached before the loop.
      final int queued =
          await _questionsDb.countUnansweredQuestions(firebaseUid);
      if (queued >= _maxQueuedQuestions) {
        skipped.add('${rule.id}(queue_full)');
        continue;
      }

      final DateTime? lastFiredAt =
          await _tradingStateDb.getRuleLastFiredAt(firebaseUid, rule.id);

      // Guess rules draw on weekly-mover history; all other rule types are
      // evaluated against the latest market snapshots.
      final bool isGuessRule = rule.type == 'guess_weekly_move';

      final RuleEvaluation result;
      if (isGuessRule) {
        result = await _evaluateGuessRule(
          firebaseUid: firebaseUid,
          rule: rule,
          lastFiredAt: lastFiredAt,
          now: now,
        );
      } else {
        final Map<String, MarketDataSnapshot> snapshots =
            await _loadSnapshotsForRule(rule);
        result = _ruleEngine.evaluate(
          rule: rule,
          snapshots: snapshots,
          lastFiredAt: lastFiredAt,
          now: now,
        );
      }

      if (!result.fired) {
        skipped.add('${rule.id}(${result.skipReason?.name ?? 'no_fire'})');
        continue;
      }

      final String phase =
          isGuessRule ? TradingPhases.awaitAnswer : TradingPhases.awaitConfirm;
      // Falls back to 10 for every rule type when the engine supplies no
      // expected answer.
      final num correctAnswer = result.correctAnswer ?? 10;

      final Map<String, dynamic> question =
          await _questionService.createAndDeliverQuestion(
        assignedUserId: firebaseUid,
        // A fired evaluation without text throws here and is reported as
        // `ruleId(error)` by the catch below.
        questionText: result.questionText!,
        correctAnswer: correctAnswer,
        sourceTag: 'trading:rule:${rule.id}',
        pipelineKey: PipelineKeys.trading,
        // Parsed back by handleAnswer as `<ruleId>:<phase>`.
        pipelineStep: '${rule.id}:$phase',
        metadata: result.guessSymbol == null
            ? null
            : <String, dynamic>{'guess_symbol': result.guessSymbol},
      );
      questionsCreated++;
      fired.add(rule.id);

      if (isGuessRule && result.guessSymbol != null) {
        await _tradingStateDb.recordGuessSymbolPicked(
          firebaseUid: firebaseUid,
          symbol: result.guessSymbol!,
          at: now,
        );
      }

      await _tradingStateDb.setRuleState(
        firebaseUid: firebaseUid,
        ruleId: rule.id,
        state: <String, dynamic>{
          'phase': phase,
          'last_fired_at': now.toIso8601String(),
          'question_id': question['id'],
          'symbol': result.guessSymbol ?? rule.symbol,
          'observed_price': result.observedPrice,
          'ref_price': result.refPrice,
          'pct': result.pricePct,
          if (result.symbolToken != null) 'symbol_token': result.symbolToken,
        },
      );
    } catch (e, st) {
      stderr.writeln(
        'TradingPipeline.evaluate rule=${rule.id} uid=$firebaseUid: $e\n$st',
      );
      skipped.add('${rule.id}(error)');
    }
  }

  return TradingEvaluationResult(
    questionsCreated: questionsCreated,
    rulesFired: fired,
    rulesSkipped: skipped,
  );
}
|
|
|
|
/// Processes the user's answer to a `pipeline_key=trading` question.
///
/// The question's `pipelineStep` is read as `<ruleId>:<phase>`. The call is
/// a no-op when the step is missing or has fewer than two segments, when the
/// phase is neither [TradingPhases.awaitConfirm] nor
/// [TradingPhases.awaitAnswer], when no effective config resolves, or when
/// the config has no rule with that id.
///
/// `guess_weekly_move` rules are graded through the guess-answer path unless
/// the question's metadata carries a `prospective_question_id`. For every
/// other rule type the yes/no outcome decides:
///
/// * match (yes) → a pending market order is staged (no Alpaca POST yet —
///   Step 9) and the rule moves to [TradingPhases.submitOrder];
/// * anything else → a skip is recorded and the rule moves to
///   [TradingPhases.done].
Future<void> handleAnswer({
  required String firebaseUid,
  required Map<String, dynamic> answeredQuestion,
  required num userResponse,
}) async {
  final String? step = answeredQuestion['pipelineStep'] as String?;
  if (step == null) {
    return;
  }

  final List<String> segments = step.split(':');
  if (segments.length < 2) {
    return;
  }

  final String phase = segments[1];
  final bool isAnswerablePhase = phase == TradingPhases.awaitConfirm ||
      phase == TradingPhases.awaitAnswer;
  if (!isAnswerablePhase) {
    return;
  }

  final String ruleId = segments.first;
  final num correctAnswer = answeredQuestion['correctAnswer'] as num? ?? 10;
  final String questionId = answeredQuestion['id']! as String;
  final DateTime now = _clock().toUtc();

  final EffectiveTradingConfig? config =
      await _tradingConfigDb.resolveEffectiveConfig(firebaseUid);
  if (config == null) {
    return;
  }

  // Locate the rule this question was created for; first id match wins.
  TradingRuleConfig? matchedRule;
  for (final TradingRuleConfig candidate in config.rules) {
    if (candidate.id == ruleId) {
      matchedRule = candidate;
      break;
    }
  }
  if (matchedRule == null) {
    return;
  }
  final TradingRuleConfig rule = matchedRule;

  final Map<String, dynamic>? priorState =
      await _tradingStateDb.getRuleState(firebaseUid, ruleId);

  if (rule.type == 'guess_weekly_move') {
    final Map<String, dynamic> metadata = Map<String, dynamic>.from(
      answeredQuestion['metadata'] as Map<String, dynamic>? ??
          <String, dynamic>{},
    );
    // Login/bootstrap prospective questions are graded in
    // [QuestionsDb.submitAnswer], so they are left alone here.
    if (metadata['prospective_question_id'] != null) {
      return;
    }
    await _handleGuessAnswer(
      firebaseUid: firebaseUid,
      rule: rule,
      questionId: questionId,
      userResponse: userResponse,
      correctAnswer: correctAnswer,
      priorState: priorState,
      now: now,
    );
    return;
  }

  final BranchOutcome outcome = BranchDecision.yesNo(
    userResponse: userResponse,
    correctAnswer: correctAnswer,
  );
  final bool confirmed = outcome == BranchOutcome.match;

  if (confirmed) {
    // Order parameters come from the rule's on-match map when it is one,
    // defaulting to a 10 USD buy.
    final Object? rawMatch = rule.onAnswerMatch;
    final Map<String, dynamic>? onMatch =
        rawMatch is Map<String, dynamic> ? rawMatch : null;
    final String side = (onMatch?['side'] as String?) ?? 'buy';
    final num notional = (onMatch?['notional_usd'] as num?) ?? 10;
    final String clientOrderId = '$firebaseUid-$ruleId-$questionId';

    await _tradingStateDb.addPendingOrder(
      firebaseUid: firebaseUid,
      order: <String, dynamic>{
        'rule_id': ruleId,
        'question_id': questionId,
        'symbol': rule.symbol,
        'side': side,
        'order_type': 'market',
        'notional_usd': notional,
        'client_order_id': clientOrderId,
        'staged_at': now.toIso8601String(),
      },
    );
  } else {
    await _tradingStateDb.recordSkip(
      firebaseUid: firebaseUid,
      ruleId: ruleId,
      questionId: questionId,
      at: now,
    );
  }

  // Prior state is carried over; the four answer fields overwrite any
  // earlier values under the same keys.
  await _tradingStateDb.setRuleState(
    firebaseUid: firebaseUid,
    ruleId: ruleId,
    state: <String, dynamic>{
      ...?priorState,
      'phase': confirmed ? TradingPhases.submitOrder : TradingPhases.done,
      'question_id': questionId,
      'answer': confirmed ? 'yes' : 'no',
      'answered_at': now.toIso8601String(),
    },
  );
}
|
|
|
|
/// Evaluates a `guess_weekly_move` rule against the weekly movers.
///
/// Reports the rule as not fired with [RuleSkipReason.insufficientBars] when
/// no history query is configured or when the query returns no movers.
/// Otherwise the first mover (in query order) whose symbol is not on
/// cooldown for [firebaseUid] is handed to the rule engine together with its
/// obfuscation token. If every mover is on cooldown the engine is called
/// with a null mover and a null token.
Future<RuleEvaluation> _evaluateGuessRule({
  required String firebaseUid,
  required TradingRuleConfig rule,
  required DateTime? lastFiredAt,
  required DateTime now,
}) async {
  RuleEvaluation notEnoughData() {
    return RuleEvaluation(
      rule: rule,
      fired: false,
      skipReason: RuleSkipReason.insufficientBars,
    );
  }

  final MarketHistoryQuery? history = _marketHistoryQuery;
  if (history == null) {
    return notEnoughData();
  }

  final List<WeeklyMover> movers = await history.weeklyMovers(
    asOf: now,
    minBars: _marketHistoryEnv.minBarsForGuess,
    windowDays: _marketHistoryEnv.windowDays,
  );
  if (movers.isEmpty) {
    return notEnoughData();
  }

  // Every mover symbol is passed to the obfuscator alongside the pick.
  final List<String> universe = <String>[
    for (final WeeklyMover entry in movers) entry.symbol,
  ];

  WeeklyMover? chosen;
  String? chosenToken;
  for (final WeeklyMover candidate in movers) {
    final bool coolingDown = await _tradingStateDb.isGuessSymbolOnCooldown(
      firebaseUid: firebaseUid,
      symbol: candidate.symbol,
      now: now,
      cooldownHours: _marketHistoryEnv.guessCooldownHours,
    );
    if (!coolingDown) {
      chosen = candidate;
      chosenToken = _symbolObfuscator.tokenFor(candidate.symbol, universe);
      break;
    }
  }

  return _ruleEngine.evaluateGuessWeeklyMove(
    rule: rule,
    mover: chosen,
    symbolToken: chosenToken,
    lastFiredAt: lastFiredAt,
    now: now,
  );
}
|
|
|
|
/// Grades a `guess_weekly_move` answer and closes out the rule's state.
///
/// The grade is recorded against the symbol stored in [priorState] (falling
/// back to the rule's own symbol), then the rule state is rewritten with
/// phase [TradingPhases.done] and the grade's components. Keys already in
/// [priorState] are kept unless overwritten by the fields set here.
Future<void> _handleGuessAnswer({
  required String firebaseUid,
  required TradingRuleConfig rule,
  required String questionId,
  required num userResponse,
  required num correctAnswer,
  required Map<String, dynamic>? priorState,
  required DateTime now,
}) async {
  final ProspectiveAnswerGrade grade = gradeProspectiveAnswer(
    userResponse: userResponse,
    correctAnswer: correctAnswer,
  );

  final String? storedSymbol = priorState?['symbol'] as String?;
  final String symbol = storedSymbol ?? rule.symbol;

  await recordProspectiveGuessScore(
    tradingStateDb: _tradingStateDb,
    firebaseUid: firebaseUid,
    grade: grade,
    symbol: symbol,
    at: now,
  );

  final String verdict = grade.sameDirection ? 'match' : 'miss';
  final Map<String, dynamic> closedState = <String, dynamic>{
    ...?priorState,
    'phase': TradingPhases.done,
    'question_id': questionId,
    'answer': verdict,
    // The same score is stored under both keys.
    'score_delta': grade.answerScore,
    'direction_point': grade.directionPoint,
    'closeness_point': grade.closenessPoint,
    'answer_score': grade.answerScore,
    'error_abs': grade.absError,
    'answered_at': now.toIso8601String(),
  };

  await _tradingStateDb.setRuleState(
    firebaseUid: firebaseUid,
    ruleId: rule.id,
    state: closedState,
  );
}
|
|
|
|
/// Fetches the latest snapshots [rule] needs, keyed by metric name.
///
/// Looks up `last_trade` and the rule's reference metric for the rule's
/// symbol. A metric with no stored snapshot is simply absent from the
/// returned map.
Future<Map<String, MarketDataSnapshot>> _loadSnapshotsForRule(
  TradingRuleConfig rule,
) async {
  final Map<String, MarketDataSnapshot> byMetric =
      <String, MarketDataSnapshot>{};

  // The set literal collapses the two names into one lookup when the
  // reference metric is itself `last_trade`.
  for (final String metric in <String>{'last_trade', rule.refMetric}) {
    final MarketDataSnapshot? latest =
        await _marketDataDb.latestForSymbol(rule.symbol, metric);
    if (latest == null) {
      continue;
    }
    byMetric[metric] = latest;
  }

  return byMetric;
}
|
|
|
|
/// Whether [firebaseUid] still has an unanswered trading question whose
/// `pipelineStep` starts with `<ruleId>:`.
Future<bool> _ruleHasOpenQuestion(
  String firebaseUid,
  String ruleId,
) async {
  final List<Map<String, dynamic>> unanswered =
      await _questionsDb.listUnansweredQuestions(firebaseUid);

  final String stepPrefix = '$ruleId:';
  for (final Map<String, dynamic> question in unanswered) {
    // Questions from other pipelines are ignored before the step is read.
    if (question['pipelineKey'] != PipelineKeys.trading) {
      continue;
    }
    final String step = question['pipelineStep'] as String? ?? '';
    if (step.startsWith(stepPrefix)) {
      return true;
    }
  }
  return false;
}
|
|
|
|
/// The [Guardrails] instance held by this pipeline: the one passed to the
/// constructor, or the default created there.
///
/// Exposed for the actuator (Step 9) and tests.
Guardrails get guardrails => _guardrails;
|
|
}
|