import 'dart:async';
import 'dart:io';

import '../alpaca/alpaca_models.dart';
import '../alpaca/alpaca_trading_client.dart';
import '../questions_db.dart';
import 'guardrails.dart';
import 'trade_orders_db.dart';
import 'trading_config.dart';
import 'trading_config_db.dart';
import 'user_trading_state_db.dart';

/// Outcome of one [TradeActuator.processPendingOrders] run for a user.
class TradeActuatorResult {
  TradeActuatorResult({
    required this.submitted,
    required this.rejected,
    required this.errors,
  });

  /// `client_order_id`s for orders successfully POSTed (or test-mode shortcut).
  ///
  /// Also contains orders that already had a `trade_orders` row and were
  /// therefore skipped by the idempotency check.
  final List<String> submitted;

  /// Orders blocked by guardrails. Same `client_order_id` removed from pending.
  final List<TradeActuatorRejection> rejected;

  /// Free-form error notes (Alpaca 5xx, DB issues, …). Pending row left in place
  /// so the next worker tick can retry.
  final List<String> errors;
}

/// A pending order that [Guardrails] refused, with the reason it was refused.
class TradeActuatorRejection {
  TradeActuatorRejection({
    required this.clientOrderId,
    required this.reason,
    this.detail,
  });

  /// The `client_order_id` of the rejected pending order.
  final String clientOrderId;

  /// The guardrail reason reported by [Guardrails.check].
  final GuardrailRejectionReason reason;

  /// Optional extra detail reported alongside [reason].
  final String? detail;
}

/// Drains `pending_orders` from `user_trading_state.context`, applies pre-trade
/// [Guardrails], submits to Alpaca (paper) or short-circuits in test mode, and
/// persists the resulting `trade_orders` row.
///
/// **Test mode**: when `alpacaClient` is null, no HTTP is performed; a row is
/// still inserted with `alpaca_order_id = 'test-<client_order_id>'` and
/// `status = 'test_accepted'`. This is what the worker uses when
/// `QUESTION_PIPELINE_TEST_MODE=true`.
class TradeActuator {
  TradeActuator({
    required TradingConfigDb tradingConfigDb,
    required UserTradingStateDb tradingStateDb,
    required TradeOrdersDb tradeOrdersDb,
    required QuestionsDb questionsDb,
    required Guardrails guardrails,
    AlpacaTradingClient? alpacaClient,
    DateTime Function()? clock,
  })  : _tradingConfigDb = tradingConfigDb,
        _tradingStateDb = tradingStateDb,
        _tradeOrdersDb = tradeOrdersDb,
        _questionsDb = questionsDb,
        _guardrails = guardrails,
        _alpacaClient = alpacaClient,
        _clock = clock ?? DateTime.now;

  final TradingConfigDb _tradingConfigDb;
  final UserTradingStateDb _tradingStateDb;
  final TradeOrdersDb _tradeOrdersDb;
  final QuestionsDb _questionsDb;
  final Guardrails _guardrails;
  final AlpacaTradingClient? _alpacaClient;
  final DateTime Function() _clock;

  /// Whether orders are recorded without contacting Alpaca.
  bool get isTestMode => _alpacaClient == null;

  /// Processes every pending order staged for [firebaseUid].
  ///
  /// Returns an empty result when no effective trading config resolves for the
  /// user. Otherwise each pending order is handled independently:
  ///
  /// * accepted (or already recorded) orders are listed in
  ///   [TradeActuatorResult.submitted] and removed from pending;
  /// * guardrail rejections are listed in [TradeActuatorResult.rejected] and
  ///   removed from pending;
  /// * failures are listed in [TradeActuatorResult.errors] and the pending
  ///   entry is left in place.
  ///
  /// A failure on one order (including a malformed entry) never prevents the
  /// remaining orders from being processed.
  Future<TradeActuatorResult> processPendingOrders(String firebaseUid) async {
    final submitted = <String>[];
    final rejected = <TradeActuatorRejection>[];
    final errors = <String>[];
    final result = TradeActuatorResult(
      submitted: submitted,
      rejected: rejected,
      errors: errors,
    );

    final EffectiveTradingConfig? config =
        await _tradingConfigDb.resolveEffectiveConfig(firebaseUid);
    if (config == null) {
      return result;
    }

    final pending = await _tradingStateDb.listPendingOrders(firebaseUid);
    for (final order in pending) {
      // Validate the id per entry: a single malformed row must not abort the
      // whole drain and starve every order queued behind it.
      final Object? rawId = order['client_order_id'];
      if (rawId is! String) {
        errors.add(
          '<missing client_order_id>: pending order has no string '
          'client_order_id',
        );
        stderr.writeln(
          'TradeActuator.processPendingOrders uid=$firebaseUid '
          'skipping pending order without a string client_order_id',
        );
        continue;
      }
      final String clientOrderId = rawId;

      try {
        final _OrderProcessing outcome = await _processOne(
          firebaseUid: firebaseUid,
          config: config,
          order: order,
        );
        final TradeActuatorRejection? rejection = outcome.rejection;
        final String? error = outcome.error;
        if (outcome.success) {
          submitted.add(clientOrderId);
          await _tradingStateDb.removePendingOrder(
            firebaseUid: firebaseUid,
            clientOrderId: clientOrderId,
          );
        } else if (rejection != null) {
          rejected.add(rejection);
          await _tradingStateDb.removePendingOrder(
            firebaseUid: firebaseUid,
            clientOrderId: clientOrderId,
          );
        } else if (error != null) {
          // Pending entry intentionally kept so a later run can retry.
          errors.add('$clientOrderId: $error');
        }
      } catch (e, st) {
        // Deliberately broad: this is the per-order isolation boundary. The
        // failure is recorded and logged, then the loop moves on.
        errors.add('$clientOrderId: $e');
        stderr.writeln(
          'TradeActuator.processPendingOrders uid=$firebaseUid '
          'client_order_id=$clientOrderId: $e\n$st',
        );
      }
    }

    return result;
  }

  /// Runs the idempotency check, guardrails, and submission for one [order].
  ///
  /// Throws if [order] is missing a required field; the caller records that as
  /// an error for the order.
  Future<_OrderProcessing> _processOne({
    required String firebaseUid,
    required EffectiveTradingConfig config,
    required Map<String, Object?> order,
  }) async {
    final _StagedOrder staged = _StagedOrder.fromMap(order);

    // Idempotency: existing trade_orders row → skip POST, count as submitted.
    final TradeOrder? existing =
        await _tradeOrdersDb.findByClientOrderId(staged.clientOrderId);
    if (existing != null) {
      return _OrderProcessing.success();
    }

    // Guardrail aggregates from already-submitted orders.
    final DateTime now = _clock().toUtc();
    final DateTime startOfDayUtc = DateTime.utc(now.year, now.month, now.day);
    final DateTime windowStart = now.subtract(_guardrails.windowDuration);
    final int dailyOrderCount =
        await _tradeOrdersDb.countOrdersSince(firebaseUid, startOfDayUtc);
    final num notionalInWindow =
        await _tradeOrdersDb.notionalUsdInWindow(firebaseUid, windowStart);

    // Question gating: we treat this order as "answered" because TradingPipeline
    // only stages an order after the user matches the confirming answer. The
    // remaining check is whether any *other* trading question is still open.
    final String? questionId = staged.questionId;
    final bool hasOtherUnanswered = questionId != null &&
        await _hasOtherUnansweredTradingQuestion(firebaseUid, questionId);

    final GuardrailDecision decision = _guardrails.check(
      config: config,
      symbol: staged.symbol,
      notionalUsd: staged.notionalUsd,
      dailyOrderCount: dailyOrderCount,
      notionalUsdInWindow: notionalInWindow,
      hasUnansweredQuestion: hasOtherUnanswered,
      questionAnswered: questionId != null,
    );
    if (!decision.allowed) {
      return _OrderProcessing.rejected(
        TradeActuatorRejection(
          clientOrderId: staged.clientOrderId,
          reason: decision.reason!,
          detail: decision.detail,
        ),
      );
    }

    if (isTestMode) {
      // No HTTP in test mode: record a synthetic accepted order instead.
      await _tradeOrdersDb.insertOrder(
        firebaseUid: firebaseUid,
        clientOrderId: staged.clientOrderId,
        symbol: staged.symbol,
        side: staged.side,
        orderType: staged.orderType,
        status: 'test_accepted',
        alpacaOrderId: 'test-${staged.clientOrderId}',
        notionalUsd: staged.notionalUsd,
        questionId: questionId,
        ruleId: staged.ruleId,
        raw: {
          'mode': 'test',
          'config_mode': config.mode,
          'submitted_at': now.toIso8601String(),
        },
      );
      return _OrderProcessing.success();
    }

    return _submitToAlpaca(
      firebaseUid: firebaseUid,
      staged: staged,
      config: config,
      now: now,
    );
  }

  /// Submits [staged] to Alpaca and persists the resulting `trade_orders` row.
  ///
  /// A duplicate-`client_order_id` response is treated as "already submitted":
  /// the existing Alpaca order is fetched and recorded instead. Any other
  /// [AlpacaTradingException] is returned as an error outcome so the pending
  /// entry stays queued.
  Future<_OrderProcessing> _submitToAlpaca({
    required String firebaseUid,
    required _StagedOrder staged,
    required EffectiveTradingConfig config,
    required DateTime now,
  }) async {
    // Only reached when not in test mode, i.e. the client is non-null.
    final AlpacaTradingClient client = _alpacaClient!;

    final AlpacaOrderRequest request = AlpacaOrderRequest(
      symbol: staged.symbol,
      side: staged.side,
      type: staged.orderType,
      timeInForce: 'day',
      clientOrderId: staged.clientOrderId,
      notional: staged.notionalUsd,
    );

    AlpacaOrderResponse? response;
    try {
      response = await client.submitOrder(request);
    } on AlpacaTradingDuplicateClientOrderIdException {
      response = await client.getOrderByClientOrderId(staged.clientOrderId);
      if (response == null) {
        return _OrderProcessing.error('duplicate id but no order on Alpaca');
      }
    } on AlpacaTradingException catch (e) {
      return _OrderProcessing.error(e.message);
    }

    await _tradeOrdersDb.insertOrder(
      firebaseUid: firebaseUid,
      clientOrderId: staged.clientOrderId,
      symbol: staged.symbol,
      side: staged.side,
      orderType: staged.orderType,
      status: response.status,
      alpacaOrderId: response.id,
      notionalUsd: staged.notionalUsd,
      questionId: staged.questionId,
      ruleId: staged.ruleId,
      raw: {
        'mode': config.mode,
        'alpaca': response.raw,
        'submitted_at': now.toIso8601String(),
      },
    );
    return _OrderProcessing.success();
  }

  /// Whether the user has an open question with `pipelineKey == 'trading'`
  /// whose `id` differs from [currentQuestionId].
  Future<bool> _hasOtherUnansweredTradingQuestion(
    String firebaseUid,
    String currentQuestionId,
  ) async {
    final open = await _questionsDb.listUnansweredQuestions(firebaseUid);
    return open.any(
      (q) => q['pipelineKey'] == 'trading' && q['id'] != currentQuestionId,
    );
  }
}

/// Typed view of one pending-order map, parsed once and shared by the
/// guardrail and submission steps.
class _StagedOrder {
  _StagedOrder._({
    required this.clientOrderId,
    required this.symbol,
    required this.side,
    required this.orderType,
    required this.notionalUsd,
    required this.questionId,
    required this.ruleId,
  });

  /// Parses [order].
  ///
  /// `client_order_id`, `symbol` and `side` are required strings and raise a
  /// [FormatException] when absent or of another type. `order_type` defaults
  /// to `'market'` and `notional_usd` defaults to `0` when absent.
  factory _StagedOrder.fromMap(Map<String, Object?> order) {
    return _StagedOrder._(
      clientOrderId: _requiredString(order, 'client_order_id'),
      symbol: _requiredString(order, 'symbol'),
      side: _requiredString(order, 'side'),
      orderType: order['order_type'] as String? ?? 'market',
      notionalUsd: (order['notional_usd'] as num?) ?? 0,
      questionId: order['question_id'] as String?,
      ruleId: order['rule_id'] as String?,
    );
  }

  final String clientOrderId;
  final String symbol;
  final String side;
  final String orderType;
  final num notionalUsd;
  final String? questionId;
  final String? ruleId;

  /// Returns `order[key]` when it is a string, otherwise throws a
  /// [FormatException] naming the offending field.
  static String _requiredString(Map<String, Object?> order, String key) {
    final Object? value = order[key];
    if (value is String) {
      return value;
    }
    throw FormatException(
      'pending order field "$key" must be a string, '
      'got ${value.runtimeType}',
    );
  }
}

/// Result of handling a single pending order: success, guardrail rejection, or
/// an error, as selected by the factory constructor used.
class _OrderProcessing {
  _OrderProcessing._({this.success = false, this.rejection, this.error});

  /// The order was recorded (or had already been recorded).
  factory _OrderProcessing.success() => _OrderProcessing._(success: true);

  /// The order was refused by guardrails.
  factory _OrderProcessing.rejected(TradeActuatorRejection rejection) =>
      _OrderProcessing._(rejection: rejection);

  /// The order could not be processed; [message] describes why.
  factory _OrderProcessing.error(String message) =>
      _OrderProcessing._(error: message);

  /// Whether the order should be reported as submitted.
  final bool success;

  /// Set when guardrails refused the order.
  final TradeActuatorRejection? rejection;

  /// Set when processing failed and the order should stay pending.
  final String? error;
}