import 'dart:io'; import 'package:shelf/shelf.dart'; import 'package:shelf/shelf_io.dart' as shelf_io; import '../lib/alpaca/alpaca_assets_client.dart'; import '../lib/alpaca/alpaca_market_data_client.dart'; import '../lib/alpaca/alpaca_trading_client.dart'; import '../lib/db.dart'; import '../lib/env.dart'; import '../lib/market_history_env.dart'; import '../lib/firebase_auth.dart'; import '../lib/handlers/incoming_question_handler.dart'; import '../lib/handlers/market_history_admin_handler.dart'; import '../lib/handlers/profile_handler.dart'; import '../lib/handlers/questions_handler.dart'; import '../lib/handlers/questions_hub_handler.dart'; import '../lib/handlers/trading_dev_handler.dart'; import '../lib/pipeline/question_pipeline.dart'; import '../lib/question_service.dart'; import '../lib/questions_db.dart'; import '../lib/trading/prospective_answer_scoring.dart'; import '../lib/trading/guardrails.dart'; import '../lib/trading/market_data_db.dart'; import '../lib/trading/market_data_history.dart'; import '../lib/trading/market_history_prospective_questions.dart'; import '../lib/trading/market_history_api_rate_limiter.dart'; import '../lib/trading/market_history_query.dart'; import '../lib/trading/market_data_ingest.dart'; import '../lib/trading/market_data_retention.dart'; import '../lib/trading/market_history_admin_actions.dart'; import '../lib/trading/tradable_assets_db.dart'; import '../lib/trading/tradable_assets_sync.dart'; import '../lib/trading/trade_actuator.dart'; import '../lib/trading/trade_orders_db.dart'; import '../lib/trading/trading_config_db.dart'; import '../lib/trading/trading_dev_actions.dart'; import '../lib/trading/trading_orchestrator.dart'; import '../lib/trading/trading_pipeline.dart'; import '../lib/trading/user_trading_state_db.dart'; import '../lib/workers/market_history_scheduler.dart'; import '../lib/workers/market_history_scheduler_config.dart'; import '../lib/workers/question_background_worker.dart'; Future main() async { final Directory serverRoot = Directory.current; if (!File('migrations/001_users.sql').existsSync()) { final String cwd = serverRoot.path; stderr.writeln( 'Run the API from the server/ directory (cd server && dart run bin/server.dart). ' 'Current directory: $cwd', ); exit(1); } final ServerEnv env = ServerEnv.load(); ProspectiveAnswerScoring.closenessExtraPointsEnabled = env.prospectiveAnswerClosenessEnabled; final ProfileDb db = await ProfileDb.connect(env.databaseUrl); await db.migrate(); final FirebaseAuthVerifier auth = FirebaseAuthVerifier(env.firebaseWebApiKey); final QuestionsDb questionsDb = QuestionsDb(db.connection); final QuestionService questionService = QuestionService( questionsDb: questionsDb, hubConnections: questionsHubConnections, ); // Trading wiring (gated by TRADING_ENABLED). The trading pipeline plugs // into QuestionPipeline.onAnswerSubmitted so +10/-10 answers stage orders. // When QUESTION_PIPELINE_TEST_MODE=true, the actuator runs without the // Alpaca trading client (test_accepted rows) and ingest is skipped. TradingPipeline? tradingPipeline; TradingOrchestrator? tradingOrchestrator; TradingDevActions? tradingDevActions; MarketHistoryScheduler? marketHistoryScheduler; MarketHistoryAdminActions? marketHistoryAdminActions; AlpacaMarketDataClient? alpacaMarketDataClient; AlpacaTradingClient? alpacaTradingClient; if (env.tradingEnabled) { final MarketDataDb marketDataDb = MarketDataDb(db.connection); final TradingConfigDb tradingConfigDb = TradingConfigDb(db.connection); final TradeOrdersDb tradeOrdersDb = TradeOrdersDb(db.connection); final UserTradingStateDb tradingStateDb = UserTradingStateDb(db.connection); final MarketHistoryEnv mh = env.marketHistory; tradingPipeline = TradingPipeline( questionsDb: questionsDb, questionService: questionService, marketDataDb: marketDataDb, tradingConfigDb: tradingConfigDb, tradingStateDb: tradingStateDb, marketHistoryQuery: MarketHistoryQuery(connection: db.connection), marketHistoryEnv: mh, guardrails: Guardrails(allowLive: env.alpaca.allowLive), ); final bool useRealAlpaca = !env.questionPipelineTestMode && env.alpaca.hasCredentials; MarketDataIngest? marketDataIngest; if (useRealAlpaca && env.tradingWorkerIngestEnabled) { alpacaMarketDataClient = AlpacaMarketDataClient(env: env.alpaca); marketDataIngest = MarketDataIngest( marketDataDb: marketDataDb, tradingStateDb: tradingStateDb, alpacaClient: alpacaMarketDataClient, ); } if (useRealAlpaca) { alpacaTradingClient = AlpacaTradingClient(env: env.alpaca); } final TradeActuator tradeActuator = TradeActuator( tradingConfigDb: tradingConfigDb, tradingStateDb: tradingStateDb, tradeOrdersDb: tradeOrdersDb, questionsDb: questionsDb, guardrails: Guardrails(allowLive: env.alpaca.allowLive), alpacaClient: alpacaTradingClient, ); tradingOrchestrator = TradingOrchestrator( questionsDb: questionsDb, tradingConfigDb: tradingConfigDb, pipeline: tradingPipeline, actuator: tradeActuator, ingest: marketDataIngest, ingestEnabled: env.tradingWorkerIngestEnabled, evalEnabled: env.tradingWorkerEvalEnabled, ); if (env.tradingDevEndpointsEnabled) { tradingDevActions = TradingDevActions( questionsDb: questionsDb, marketDataDb: marketDataDb, tradingConfigDb: tradingConfigDb, tradingPipeline: tradingPipeline, ); } if (useRealAlpaca && (env.marketHistorySyncEnabled || env.adminPortalEnabled)) { final TradableAssetsDb tradableAssetsDb = TradableAssetsDb(db.connection); final AlpacaAssetsClient assetsClient = AlpacaAssetsClient(env: env.alpaca); final MarketHistoryApiRateLimiter historyRateLimiter = MarketHistoryApiRateLimiter( requestsPerMinute: mh.apiRequestsPerMinute, ); final AlpacaMarketDataClient historyMarketDataClient = AlpacaMarketDataClient(env: env.alpaca); final TradableAssetsSync tradableAssetsSync = TradableAssetsSync( assetsClient: assetsClient, assetsDb: tradableAssetsDb, connection: db.connection, ); final MarketDataHistorySync historySync = MarketDataHistorySync( marketDataClient: historyMarketDataClient, apiRequestsPerMinute: mh.apiRequestsPerMinute, tradableAssetsDb: tradableAssetsDb, marketDataDb: marketDataDb, connection: db.connection, batchSize: mh.historySyncBatchSize, maxSymbols: mh.historySyncMaxSymbols, windowDays: mh.windowDays, ); final MarketDataRetention retention = MarketDataRetention( connection: db.connection, windowDays: mh.retentionDays, ); final MarketHistoryProspectiveQuestions prospectiveQuestions = MarketHistoryProspectiveQuestions(connection: db.connection); if (env.marketHistorySyncEnabled) { marketHistoryScheduler = MarketHistoryScheduler( connection: db.connection, config: MarketHistorySchedulerConfig( universeRefreshHours: mh.universeRefreshHours, historySyncHours: mh.historySyncHours, cleanupHours: mh.cleanupHours, syncHourUtc: mh.syncHourUtc, staleSyncRunMinutes: mh.staleSyncRunMinutes, ), runUniverse: (DateTime now) => tradableAssetsSync.runOnce(now: now), runBackfill: (DateTime now) => historySync.runOnce(now: now), backfillIsDue: historySync.hasPendingSlots, runCleanup: (DateTime now) => retention.run(archive: mh.archiveEnabled, now: now), runProspectiveQuestions: (DateTime now) => prospectiveQuestions.refresh( now: now, windowDays: mh.windowDays, retentionDays: mh.retentionDays, ), ); } if (env.adminPortalEnabled) { if (env.marketHistorySyncEnabled) { marketHistoryAdminActions = MarketHistoryAdminActions( connection: db.connection, runUniverse: (DateTime now) => tradableAssetsSync.runOnce(now: now), runBackfill: (DateTime now) => historySync.runOnce(now: now), runCleanup: (DateTime now, bool archive, int windowDays) => retention.run(archive: archive, now: now, windowDays: windowDays), defaultArchiveEnabled: mh.archiveEnabled, defaultWindowDays: mh.windowDays, ); } else { stderr.writeln( 'Admin portal on-demand sync/cleanup disabled: ' 'MARKET_HISTORY_SYNC_ENABLED=false.', ); } } } else if (env.adminPortalEnabled) { stderr.writeln( 'Admin portal on-demand sync/cleanup unavailable: requires ' 'TRADING_ENABLED=true, Alpaca credentials, and ' 'QUESTION_PIPELINE_TEST_MODE=false.', ); } } final QuestionPipeline questionPipeline = QuestionPipeline( questionsDb: questionsDb, questionService: questionService, tradingPipeline: tradingPipeline, testMode: env.questionPipelineTestMode, ); QuestionBackgroundWorker? backgroundWorker; if (env.questionWorkerEnabled) { backgroundWorker = QuestionBackgroundWorker( pipeline: questionPipeline, interval: Duration(seconds: env.questionWorkerIntervalSeconds), tradingOrchestrator: tradingOrchestrator, marketHistoryScheduler: marketHistoryScheduler, ); backgroundWorker.start(); } final Handler profile = profileHandler(db: db, auth: auth); final Handler questionsHub = questionsHubHandler( auth: auth, questionService: questionService, ); final Handler incomingQuestion = incomingQuestionHandler( auth: auth, questionsDb: questionsDb, ); final Handler questions = questionsHandler( auth: auth, questionsDb: questionsDb, questionService: questionService, questionPipeline: questionPipeline, ); final Handler? tradingDev = tradingDevActions == null ? null : tradingDevHandler(auth: auth, devActions: tradingDevActions); final Handler? marketHistoryAdmin = env.adminPortalEnabled ? marketHistoryAdminHandler( auth: auth, connection: db.connection, adminFirebaseUids: env.adminFirebaseUids, actions: marketHistoryAdminActions, portalConfig: MarketHistoryAdminPortalConfig( archiveEnabled: env.marketHistory.archiveEnabled, windowDays: env.marketHistory.windowDays, retentionDays: env.marketHistory.retentionDays, syncEnabled: env.marketHistorySyncEnabled, ), ) : null; final Handler handler = Pipeline() .addMiddleware(logRequests()) .addHandler((Request request) { final String path = request.requestedUri.path; if (path.startsWith(questionsHubPath)) { return questionsHub(request); } if (path == '/v1/me/incoming-question') { return incomingQuestion(request); } if (tradingDev != null && path.startsWith(tradingDevBasePath)) { return tradingDev(request); } if (marketHistoryAdmin != null && path.startsWith('/v1/admin')) { return marketHistoryAdmin(request); } if (path.startsWith(questionsBasePath)) { return questions(request); } return profile(request); }); final HttpServer server = await shelf_io.serve( handler, InternetAddress.anyIPv4, env.port, ); stdout.writeln( 'Cyber Hybrid Hub API listening on http://localhost:${server.port}', ); }