cyberhybridhub/server/lib/trading/trading_pipeline.dart

292 lines
9.4 KiB
Dart

import 'dart:async';
import 'dart:io';
import '../pipeline/branch_decision.dart';
import '../pipeline/question_pipeline.dart' show PipelineKeys, TradingPhases;
import '../question_service.dart';
import '../questions_db.dart';
import 'guardrails.dart';
import 'market_data_db.dart';
import 'rule_engine.dart';
import 'trading_config.dart';
import 'trading_config_db.dart';
import 'user_trading_state_db.dart';
/// Result of one [TradingPipeline.evaluate] cycle for a single user.
class TradingEvaluationResult {
TradingEvaluationResult({
required this.questionsCreated,
required this.rulesFired,
required this.rulesSkipped,
});
final int questionsCreated;
final List<String> rulesFired;
final List<String> rulesSkipped;
}
/// Bridges the rule engine, market-data snapshots, and the existing
/// question delivery pipeline. Lives next to other pipeline branches in
/// [QuestionPipeline.onAnswerSubmitted] under `pipeline_key=trading`.
class TradingPipeline {
TradingPipeline({
required QuestionsDb questionsDb,
required QuestionService questionService,
required MarketDataDb marketDataDb,
required TradingConfigDb tradingConfigDb,
required UserTradingStateDb tradingStateDb,
RuleEngine? ruleEngine,
Guardrails? guardrails,
int maxQueuedQuestions = 3,
DateTime Function()? clock,
}) : _questionsDb = questionsDb,
_questionService = questionService,
_marketDataDb = marketDataDb,
_tradingConfigDb = tradingConfigDb,
_tradingStateDb = tradingStateDb,
_ruleEngine = ruleEngine ?? RuleEngine(),
_guardrails = guardrails ?? Guardrails(),
_maxQueuedQuestions = maxQueuedQuestions,
_clock = clock ?? DateTime.now;
final QuestionsDb _questionsDb;
final QuestionService _questionService;
final MarketDataDb _marketDataDb;
final TradingConfigDb _tradingConfigDb;
final UserTradingStateDb _tradingStateDb;
final RuleEngine _ruleEngine;
final Guardrails _guardrails;
final int _maxQueuedQuestions;
final DateTime Function() _clock;
/// Runs all enabled rules for [firebaseUid] against the latest snapshots.
///
/// For each rule that fires and passes the "queue room + cooldown" checks,
/// creates a `pipeline_key=trading` question via [QuestionService] and
/// records the rule's `await_confirm` state in `user_trading_state.context`.
///
/// Pre-trade [Guardrails] are NOT enforced here — those run in [handleAnswer]
/// and again in the actuator. The only "block" at this stage is the queue
/// limit and the per-rule daily cooldown.
Future<TradingEvaluationResult> evaluate(String firebaseUid) async {
final List<String> fired = <String>[];
final List<String> skipped = <String>[];
final EffectiveTradingConfig? config =
await _tradingConfigDb.resolveEffectiveConfig(firebaseUid);
if (config == null || !config.enabled) {
return TradingEvaluationResult(
questionsCreated: 0,
rulesFired: fired,
rulesSkipped: skipped,
);
}
final DateTime now = _clock().toUtc();
int questionsCreated = 0;
for (final TradingRuleConfig rule in config.rules) {
try {
if (await _ruleHasOpenQuestion(firebaseUid, rule.id)) {
skipped.add('${rule.id}(open_question)');
continue;
}
final int queued =
await _questionsDb.countUnansweredQuestions(firebaseUid);
if (queued >= _maxQueuedQuestions) {
skipped.add('${rule.id}(queue_full)');
continue;
}
final Map<String, MarketDataSnapshot> snapshots =
await _loadSnapshotsForRule(rule);
final DateTime? lastFiredAt =
await _tradingStateDb.getRuleLastFiredAt(firebaseUid, rule.id);
final RuleEvaluation result = _ruleEngine.evaluate(
rule: rule,
snapshots: snapshots,
lastFiredAt: lastFiredAt,
now: now,
);
if (!result.fired) {
skipped.add('${rule.id}(${result.skipReason?.name ?? 'no_fire'})');
continue;
}
final Map<String, dynamic> question =
await _questionService.createAndDeliverQuestion(
assignedUserId: firebaseUid,
questionText: result.questionText!,
correctAnswer: 10,
sourceTag: 'trading:rule:${rule.id}',
pipelineKey: PipelineKeys.trading,
pipelineStep: '${rule.id}:${TradingPhases.awaitConfirm}',
);
questionsCreated++;
fired.add(rule.id);
await _tradingStateDb.setRuleState(
firebaseUid: firebaseUid,
ruleId: rule.id,
state: <String, dynamic>{
'phase': TradingPhases.awaitConfirm,
'last_fired_at': now.toIso8601String(),
'question_id': question['id'],
'symbol': rule.symbol,
'observed_price': result.observedPrice,
'ref_price': result.refPrice,
'pct': result.pricePct,
},
);
} catch (e, st) {
stderr.writeln(
'TradingPipeline.evaluate rule=${rule.id} uid=$firebaseUid: $e\n$st',
);
skipped.add('${rule.id}(error)');
}
}
return TradingEvaluationResult(
questionsCreated: questionsCreated,
rulesFired: fired,
rulesSkipped: skipped,
);
}
/// Handles an answered `pipeline_key=trading` question.
///
/// `+10` (yes) → stages a pending order (no Alpaca POST yet — Step 9).
/// Anything else → logs a skip and clears the rule's `await_confirm` state.
Future<void> handleAnswer({
required String firebaseUid,
required Map<String, dynamic> answeredQuestion,
required num userResponse,
}) async {
final String? pipelineStep =
answeredQuestion['pipelineStep'] as String?;
if (pipelineStep == null) {
return;
}
final List<String> parts = pipelineStep.split(':');
if (parts.length < 2 || parts[1] != TradingPhases.awaitConfirm) {
return;
}
final String ruleId = parts.first;
final num correctAnswer = answeredQuestion['correctAnswer'] as num? ?? 10;
final String questionId = answeredQuestion['id']! as String;
final DateTime now = _clock().toUtc();
final EffectiveTradingConfig? config =
await _tradingConfigDb.resolveEffectiveConfig(firebaseUid);
if (config == null) {
return;
}
TradingRuleConfig? rule;
for (final TradingRuleConfig r in config.rules) {
if (r.id == ruleId) {
rule = r;
break;
}
}
if (rule == null) {
return;
}
final BranchOutcome outcome = BranchDecision.yesNo(
userResponse: userResponse,
correctAnswer: correctAnswer,
);
final Map<String, dynamic>? priorState =
await _tradingStateDb.getRuleState(firebaseUid, ruleId);
final Map<String, dynamic> baseState = <String, dynamic>{
...?priorState,
};
if (outcome == BranchOutcome.match) {
final Map<String, dynamic>? match =
rule.onAnswerMatch is Map<String, dynamic>
? rule.onAnswerMatch
: null;
final String side = (match?['side'] as String?) ?? 'buy';
final num notional =
(match?['notional_usd'] as num?) ?? 10;
final String clientOrderId = '$firebaseUid-$ruleId-$questionId';
await _tradingStateDb.addPendingOrder(
firebaseUid: firebaseUid,
order: <String, dynamic>{
'rule_id': ruleId,
'question_id': questionId,
'symbol': rule.symbol,
'side': side,
'order_type': 'market',
'notional_usd': notional,
'client_order_id': clientOrderId,
'staged_at': now.toIso8601String(),
},
);
baseState['phase'] = TradingPhases.submitOrder;
baseState['question_id'] = questionId;
baseState['answer'] = 'yes';
baseState['answered_at'] = now.toIso8601String();
await _tradingStateDb.setRuleState(
firebaseUid: firebaseUid,
ruleId: ruleId,
state: baseState,
);
} else {
await _tradingStateDb.recordSkip(
firebaseUid: firebaseUid,
ruleId: ruleId,
questionId: questionId,
at: now,
);
baseState['phase'] = TradingPhases.done;
baseState['question_id'] = questionId;
baseState['answer'] = 'no';
baseState['answered_at'] = now.toIso8601String();
await _tradingStateDb.setRuleState(
firebaseUid: firebaseUid,
ruleId: ruleId,
state: baseState,
);
}
}
Future<Map<String, MarketDataSnapshot>> _loadSnapshotsForRule(
TradingRuleConfig rule,
) async {
final List<String> metrics = <String>{'last_trade', rule.refMetric}.toList();
final Map<String, MarketDataSnapshot> result =
<String, MarketDataSnapshot>{};
for (final String metric in metrics) {
final MarketDataSnapshot? snap =
await _marketDataDb.latestForSymbol(rule.symbol, metric);
if (snap != null) {
result[metric] = snap;
}
}
return result;
}
Future<bool> _ruleHasOpenQuestion(
String firebaseUid,
String ruleId,
) async {
final List<Map<String, dynamic>> open =
await _questionsDb.listUnansweredQuestions(firebaseUid);
return open.any((Map<String, dynamic> q) =>
q['pipelineKey'] == PipelineKeys.trading &&
(q['pipelineStep'] as String? ?? '').startsWith('$ruleId:'));
}
/// Exposed for the actuator (Step 9) and tests.
Guardrails get guardrails => _guardrails;
}