import 'package:postgres/postgres.dart'; import '../alpaca/alpaca_market_data_client.dart'; import '../alpaca/alpaca_models.dart'; import 'backfill_sync_item.dart'; import 'market_data_db.dart'; import 'market_history_api_rate_limiter.dart'; import 'market_history_bar_placeholder.dart'; import 'market_history_config.dart'; import 'market_history_minute_aggregate.dart'; import 'market_history_session_slot.dart'; import 'market_history_trading_calendar.dart'; import 'sync_run_recorder.dart'; import 'tradable_assets_db.dart'; /// Result of persisting one Alpaca bars batch for a single slot. class PersistBarsResult { const PersistBarsResult({ required this.written, required this.symbolsWritten, required this.notInResponse, required this.emptyInResponse, required this.wrongSlotBarTimes, }); final int written; final Set symbolsWritten; final List notInResponse; final List emptyInResponse; /// Symbol → comma-separated bar [t] values outside the requested slot. final Map wrongSlotBarTimes; /// Full error when [requested] symbols were fetched but not all persisted /// and [placeholdersWritten] did not cover the remainder. String? errorIfIncomplete({ required List requested, required DateTime slotStart, required String timeframe, int placeholdersWritten = 0, }) { if (requested.isEmpty) { return null; } final List unpersisted = requested .where((String symbol) => !symbolsWritten.contains(symbol)) .toList(growable: false); if (unpersisted.isEmpty) { return null; } if (placeholdersWritten >= unpersisted.length) { return null; } final String slotWire = MarketHistorySessionSlot.slotStartWire(slotStart); final List parts = [ 'Alpaca returned no persistable $timeframe bars', 'slot=$slotWire', 'requested=${requested.join(",")}', 'unpersisted=${unpersisted.join(",")}', ]; if (notInResponse.isNotEmpty) { parts.add('missing_from_response=${notInResponse.join(",")}'); } if (emptyInResponse.isNotEmpty) { parts.add('empty_bar_series=${emptyInResponse.join(",")}'); } if (wrongSlotBarTimes.isNotEmpty) { parts.add( 'wrong_slot_bars=${wrongSlotBarTimes.entries.map((MapEntry e) => '${e.key}:${e.value}').join("|")}', ); } if (written == 0) { parts.add('rows_written=0'); } return parts.join('; '); } } /// Outcome of one [MarketDataHistorySync.runOnce] invocation. class MarketDataHistorySyncResult { MarketDataHistorySyncResult({ required this.rowsWritten, required this.startedAt, required this.finishedAt, this.error, this.slotsSynced = 0, }); final int rowsWritten; final DateTime startedAt; final DateTime finishedAt; final String? error; /// Number of completed session-half slots written in this run. final int slotsSynced; bool get succeeded => error == null; } /// One ended UTC slot and the symbols still missing a bar for that slot. class MarketHistorySlotFetchPlan { const MarketHistorySlotFetchPlan({ required this.slotStart, required this.symbols, }); final DateTime slotStart; final List symbols; } /// Backfill: one Alpaca `1Min` range per ended session slot × symbol batch, /// aggregated into `sessionHalf` rows. /// /// Chooses work from the most recently completed slot backward until the rolling /// window is full. Only symbols missing that specific slot are requested. /// /// Throttles to [apiRequestsPerMinute]. On HTTP 429, waits [rateLimitCooldown], /// retries once, then stops the run so partial rows are kept for the next tick. class MarketDataHistorySync { MarketDataHistorySync({ required AlpacaMarketDataClient marketDataClient, required TradableAssetsDb tradableAssetsDb, required MarketDataDb marketDataDb, required Connection connection, this.batchSize = MarketHistoryConfig.historySyncBatchSize, this.maxSymbols = MarketHistoryConfig.historySyncMaxSymbols, this.windowDays = MarketHistoryConfig.windowDays, this.timeframe = MarketHistoryConfig.barTimeframe, this.feed = 'iex', int? apiRequestsPerMinute, MarketHistoryApiRateLimiter? rateLimiter, Duration rateLimitCooldown = MarketHistoryConfig.rateLimitCooldown, Future Function(Duration delay)? sleep, }) : _marketDataClient = marketDataClient, _tradableAssetsDb = tradableAssetsDb, _marketDataDb = marketDataDb, _recorder = SyncRunRecorder(connection), _rateLimiter = rateLimiter ?? MarketHistoryApiRateLimiter( requestsPerMinute: apiRequestsPerMinute ?? MarketHistoryConfig.apiRequestsPerMinute, ), _rateLimitCooldown = rateLimitCooldown, _sleep = sleep ?? Future.delayed; final AlpacaMarketDataClient _marketDataClient; final TradableAssetsDb _tradableAssetsDb; final MarketDataDb _marketDataDb; final SyncRunRecorder _recorder; final MarketHistoryApiRateLimiter _rateLimiter; final Duration _rateLimitCooldown; final Future Function(Duration delay) _sleep; final int batchSize; final int maxSymbols; final int windowDays; final String timeframe; final String feed; static const String kind = 'backfill'; Future runOnce({DateTime? now}) async { final DateTime tick = (now ?? DateTime.now()).toUtc(); final SyncRunOutcome outcome = await _recorder.record( kind, () => _syncBody(tick), now: tick, ); return MarketDataHistorySyncResult( rowsWritten: outcome.rowsWritten, startedAt: outcome.startedAt, finishedAt: outcome.finishedAt, error: outcome.error, slotsSynced: outcome.slotsSynced ?? 0, ); } /// Whether any symbol is missing a completed slot in the rolling window. Future hasPendingSlots(DateTime now) async { final List symbols = await _activeSymbols(); if (symbols.isEmpty) { return false; } final List plans = await _pendingSlotFetchPlans(now, symbols); return plans.isNotEmpty; } Future _syncBody(DateTime now) async { List symbols = await _activeSymbols(); if (symbols.isEmpty) { return const SyncRunCounts(); } final List fetchPlans = await _pendingSlotFetchPlans(now, symbols); if (fetchPlans.isEmpty) { return const SyncRunCounts(); } int rowsWritten = 0; int slotsSynced = 0; final List batchErrors = []; final Map> backfillItemsBySlot = >{}; bool stopForRateLimit = false; for (final MarketHistorySlotFetchPlan plan in fetchPlans) { if (stopForRateLimit) { break; } final DateTime slotStart = plan.slotStart; // Alpaca `end` is exclusive; use slot close, not last second of the window. final DateTime slotEnd = MarketHistorySessionSlot.endExclusive(slotStart); bool slotWrote = false; for (final List batch in chunkList(plan.symbols, batchSize)) { if (stopForRateLimit) { break; } final Set alreadySynced = await _marketDataDb.symbolsWithBarForSlot( symbols: batch, slotStart: slotStart, timeframe: timeframe, ); final List toFetch = batch .where((String symbol) => !alreadySynced.contains(symbol)) .toList(growable: false); if (toFetch.isEmpty) { continue; } backfillItemsBySlot .putIfAbsent(slotStart, () => {}) .addAll(toFetch); try { final AlpacaBarsResponse response = await _fetchBarsWithRateLimitRetry( symbols: toFetch, slotStart: slotStart, slotEnd: slotEnd, ); final PersistBarsResult persisted = await _persistBars( response: response, batch: toFetch, slotStart: slotStart, ); rowsWritten += persisted.written; final int placeholders = await _writeNoDataPlaceholders( symbols: toFetch, persisted: persisted, slotStart: slotStart, checkedAt: now, ); rowsWritten += placeholders; if (persisted.written > 0 || placeholders > 0) { slotWrote = true; } final String? emptyDataError = persisted.errorIfIncomplete( requested: toFetch, slotStart: slotStart, timeframe: timeframe, placeholdersWritten: placeholders, ); if (emptyDataError != null && !_suppressEmptyMarketError( slotStart: slotStart, placeholdersWritten: placeholders, )) { batchErrors.add(emptyDataError); } } on AlpacaMarketDataException catch (e) { if (e.isRateLimited) { batchErrors.add( 'rate limited after ${_rateLimitCooldown.inSeconds}s cooldown; ' 'partial sync saved ($rowsWritten rows); resume on next run', ); stopForRateLimit = true; break; } batchErrors.add( 'slot ${slotStart.toIso8601String()} batch ${toFetch.join(",")}: $e', ); } on Object catch (e) { batchErrors.add( 'slot ${slotStart.toIso8601String()} batch ${toFetch.join(",")}: $e', ); } } if (slotWrote) { slotsSynced++; } } return SyncRunCounts( rowsWritten: rowsWritten, error: batchErrors.isEmpty ? null : batchErrors.join('; '), slotsSynced: slotsSynced, backfillItems: _backfillItemsFromMap(backfillItemsBySlot), ); } static List _backfillItemsFromMap( Map> itemsBySlot, ) { if (itemsBySlot.isEmpty) { return []; } final List slotStarts = itemsBySlot.keys.toList() ..sort((DateTime a, DateTime b) => b.compareTo(a)); return slotStarts .map( (DateTime slotStart) => BackfillSyncItem( slotStart: slotStart, symbols: itemsBySlot[slotStart]!.toList()..sort(), ), ) .toList(growable: false); } Future _fetchBarsWithRateLimitRetry({ required List symbols, required DateTime slotStart, required DateTime slotEnd, }) async { for (int attempt = 0; attempt < 2; attempt++) { await _rateLimiter.acquire(); try { return await _marketDataClient.getBarsRange( symbols: symbols, timeframe: MarketHistoryConfig.alpacaFetchTimeframe, start: slotStart, end: slotEnd, ); } on AlpacaMarketDataException catch (e) { if (!e.isRateLimited || attempt == 1) { rethrow; } await _sleep(_rateLimitCooldown); } } throw StateError('unreachable fetch retry loop'); } Future _persistBars({ required AlpacaBarsResponse response, required List batch, required DateTime slotStart, }) async { int written = 0; final Set batchSymbols = batch.toSet(); final Set symbolsWritten = {}; final List notInResponse = []; final List emptyInResponse = []; final Map wrongSlotBarTimes = {}; final DateTime plannedSlot = MarketHistorySessionSlot.slotStartContaining(slotStart); final String slotWire = MarketHistorySessionSlot.slotStartWire(plannedSlot); for (final String symbol in batch) { if (!response.barsBySymbol.containsKey(symbol)) { notInResponse.add(symbol); continue; } final List bars = response.barsBySymbol[symbol]!; if (bars.isEmpty) { emptyInResponse.add(symbol); continue; } } for (final MapEntry> entry in response.barsBySymbol.entries) { if (!batchSymbols.contains(entry.key)) { continue; } final SessionHalfBarAggregate? aggregate = aggregateMinuteBarsForSlot( bars: entry.value, slotStart: plannedSlot, ); if (aggregate == null) { wrongSlotBarTimes[entry.key] = entry.value .map((AlpacaBar b) => MarketHistorySessionSlot.wireUtc(b.timestamp)) .join(','); continue; } await _marketDataDb.upsertSnapshot( symbol: entry.key, metric: 'bar', timeframe: timeframe, feed: feed, price: aggregate.close, volume: aggregate.volume, asOf: plannedSlot, raw: { 'o': aggregate.open, 'h': aggregate.high, 'l': aggregate.low, 'c': aggregate.close, 'v': aggregate.volume, 't': MarketHistorySessionSlot.wireUtc(aggregate.lastBarAt), 'slot_start': slotWire, 'minute_bars_count': aggregate.minuteBarCount, }, ); written++; symbolsWritten.add(entry.key); } return PersistBarsResult( written: written, symbolsWritten: symbolsWritten, notInResponse: notInResponse, emptyInResponse: emptyInResponse, wrongSlotBarTimes: wrongSlotBarTimes, ); } Future _writeNoDataPlaceholders({ required List symbols, required PersistBarsResult persisted, required DateTime slotStart, required DateTime checkedAt, }) async { final List needPlaceholder = symbols .where((String symbol) => !persisted.symbolsWritten.contains(symbol)) .toList(growable: false); if (needPlaceholder.isEmpty) { return 0; } for (final String symbol in needPlaceholder) { await _marketDataDb.upsertNoDataBarPlaceholder( symbol: symbol, slotStart: slotStart, timeframe: timeframe, feed: feed, checkedAt: checkedAt, source: MarketHistoryTradingCalendar.isLikelyNoRegularSession(slotStart) ? MarketHistoryBarPlaceholder.sourceMarketClosed : MarketHistoryBarPlaceholder.sourceAlpacaEmpty, ); } return needPlaceholder.length; } /// Suppress error on weekends/holidays when no-data placeholders were stored. static bool _suppressEmptyMarketError({ required DateTime slotStart, required int placeholdersWritten, }) { return placeholdersWritten > 0 && MarketHistoryTradingCalendar.isLikelyNoRegularSession(slotStart); } Future> _activeSymbols() async { List symbols = await _tradableAssetsDb.listActiveTradableSymbols(); if (symbols.length > maxSymbols) { symbols = symbols.sublist(0, maxSymbols); } return symbols; } /// Completed slots newest-first; each plan lists symbols missing that slot. Future> _pendingSlotFetchPlans( DateTime now, List symbols, ) async { final List completed = MarketHistorySessionSlot.completedSlotStartsInWindow(now, windowDays); if (completed.isEmpty) { return []; } final List plans = []; for (final DateTime slotStart in completed.reversed) { final Set synced = await _marketDataDb.symbolsWithBarForSlot( symbols: symbols, slotStart: slotStart, timeframe: timeframe, ); final List missing = symbols .where((String symbol) => !synced.contains(symbol)) .toList(growable: false); if (missing.isNotEmpty) { plans.add( MarketHistorySlotFetchPlan( slotStart: slotStart, symbols: missing, ), ); } } return plans; } } /// Splits [items] into consecutive chunks of at most [size]. List> chunkList(List items, int size) { if (size <= 0) { throw ArgumentError.value(size, 'size', 'must be positive'); } if (items.isEmpty) { return >[]; } final List> chunks = >[]; for (int i = 0; i < items.length; i += size) { final int end = i + size > items.length ? items.length : i + size; chunks.add(items.sublist(i, end)); } return chunks; }