cyberhybridhub/server/lib/trading/market_data_history.dart
2026-05-31 12:40:54 -05:00

519 lines
16 KiB
Dart
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import 'package:postgres/postgres.dart';
import '../alpaca/alpaca_market_data_client.dart';
import '../alpaca/alpaca_models.dart';
import 'backfill_sync_item.dart';
import 'market_data_db.dart';
import 'market_history_api_rate_limiter.dart';
import 'market_history_bar_placeholder.dart';
import 'market_history_config.dart';
import 'market_history_minute_aggregate.dart';
import 'market_history_session_slot.dart';
import 'market_history_trading_calendar.dart';
import 'sync_run_recorder.dart';
import 'tradable_assets_db.dart';
/// Outcome of storing the bars of one Alpaca response for a single slot.
class PersistBarsResult {
  const PersistBarsResult({
    required this.written,
    required this.symbolsWritten,
    required this.notInResponse,
    required this.emptyInResponse,
    required this.wrongSlotBarTimes,
  });

  /// Number of rows stored.
  final int written;

  /// Symbols for which a row was stored.
  final Set<String> symbolsWritten;

  /// Requested symbols that were absent from the response.
  final List<String> notInResponse;

  /// Requested symbols whose bar series in the response was empty.
  final List<String> emptyInResponse;

  /// Symbol → comma-separated bar [t] values outside the requested slot.
  final Map<String, String> wrongSlotBarTimes;

  /// Describes why some of the [requested] symbols ended up without a row.
  ///
  /// Returns `null` when [requested] is empty, when every requested symbol is
  /// in [symbolsWritten], or when [placeholdersWritten] is at least the number
  /// of requested symbols that are not in [symbolsWritten]. Otherwise returns
  /// a `; `-separated diagnostic naming the [timeframe], the slot, the
  /// requested and unpersisted symbols, and whichever of [notInResponse],
  /// [emptyInResponse] and [wrongSlotBarTimes] are non-empty.
  String? errorIfIncomplete({
    required List<String> requested,
    required DateTime slotStart,
    required String timeframe,
    int placeholdersWritten = 0,
  }) {
    // An empty request yields an empty list here, so it needs no own guard.
    final List<String> unpersisted = <String>[
      for (final String symbol in requested)
        if (!symbolsWritten.contains(symbol)) symbol,
    ];
    if (unpersisted.isEmpty || placeholdersWritten >= unpersisted.length) {
      return null;
    }

    final String slotWire = MarketHistorySessionSlot.slotStartWire(slotStart);
    return <String>[
      'Alpaca returned no persistable $timeframe bars',
      'slot=$slotWire',
      'requested=${requested.join(",")}',
      'unpersisted=${unpersisted.join(",")}',
      if (notInResponse.isNotEmpty)
        'missing_from_response=${notInResponse.join(",")}',
      if (emptyInResponse.isNotEmpty)
        'empty_bar_series=${emptyInResponse.join(",")}',
      if (wrongSlotBarTimes.isNotEmpty)
        'wrong_slot_bars=${wrongSlotBarTimes.entries.map((MapEntry<String, String> e) => '${e.key}:${e.value}').join("|")}',
      if (written == 0) 'rows_written=0',
    ].join('; ');
  }
}
/// Summary returned by a single [MarketDataHistorySync.runOnce] call.
class MarketDataHistorySyncResult {
  MarketDataHistorySyncResult({
    required this.rowsWritten,
    required this.startedAt,
    required this.finishedAt,
    this.error,
    this.slotsSynced = 0,
  });

  /// Rows written during the run.
  final int rowsWritten;

  /// When the run started.
  final DateTime startedAt;

  /// When the run finished.
  final DateTime finishedAt;

  /// Error text for the run, or `null` when there was none.
  final String? error;

  /// Number of completed session-half slots written in this run.
  final int slotsSynced;

  /// Whether the run finished without an [error].
  bool get succeeded {
    return error == null;
  }
}
/// Work item for one ended UTC slot: the slot start plus the symbols that
/// still have no bar stored for it.
class MarketHistorySlotFetchPlan {
  const MarketHistorySlotFetchPlan({
    required this.slotStart,
    required this.symbols,
  });

  /// Start of the slot this plan covers.
  final DateTime slotStart;

  /// Symbols still missing a bar for [slotStart].
  final List<String> symbols;
}
/// Backfill: one Alpaca `1Min` range per ended session slot × symbol batch,
/// aggregated into `sessionHalf` rows.
///
/// Chooses work from the most recently completed slot backward until the rolling
/// window is full. Only symbols missing that specific slot are requested.
///
/// Throttles to [apiRequestsPerMinute]. On HTTP 429, waits [rateLimitCooldown],
/// retries once, then stops the run so partial rows are kept for the next tick.
class MarketDataHistorySync {
/// Builds a backfill sync over the given client and databases.
///
/// [connection] is handed to a [SyncRunRecorder]. When [rateLimiter] is
/// omitted, a [MarketHistoryApiRateLimiter] is created from
/// [apiRequestsPerMinute], falling back to
/// [MarketHistoryConfig.apiRequestsPerMinute]. [sleep] defaults to
/// [Future.delayed] and is what the rate-limit cooldown waits on.
MarketDataHistorySync({
  required AlpacaMarketDataClient marketDataClient,
  required TradableAssetsDb tradableAssetsDb,
  required MarketDataDb marketDataDb,
  required Connection connection,
  this.batchSize = MarketHistoryConfig.historySyncBatchSize,
  this.maxSymbols = MarketHistoryConfig.historySyncMaxSymbols,
  this.windowDays = MarketHistoryConfig.windowDays,
  this.timeframe = MarketHistoryConfig.barTimeframe,
  this.feed = 'iex',
  int? apiRequestsPerMinute,
  MarketHistoryApiRateLimiter? rateLimiter,
  Duration rateLimitCooldown = MarketHistoryConfig.rateLimitCooldown,
  Future<void> Function(Duration delay)? sleep,
})  : _marketDataClient = marketDataClient,
      _tradableAssetsDb = tradableAssetsDb,
      _marketDataDb = marketDataDb,
      _recorder = SyncRunRecorder(connection),
      _rateLimiter = rateLimiter ??
          MarketHistoryApiRateLimiter(
            requestsPerMinute: apiRequestsPerMinute ??
                MarketHistoryConfig.apiRequestsPerMinute,
          ),
      _rateLimitCooldown = rateLimitCooldown,
      _sleep = sleep ?? Future<void>.delayed;

  /// Run kind passed to the recorder for every run of this sync.
  static const String kind = 'backfill';

  /// Maximum number of symbols requested in one bars call.
  final int batchSize;

  /// Upper bound on how many active symbols a run considers.
  final int maxSymbols;

  /// Size, in days, of the rolling window of completed slots.
  final int windowDays;

  /// Timeframe label used when storing and looking up slot rows.
  final String timeframe;

  /// Feed label stored with each row.
  // NOTE(review): this value is not forwarded to the bars request in this
  // class — confirm the client fetches from the same feed.
  final String feed;

  // Collaborators and timing hooks.
  final AlpacaMarketDataClient _marketDataClient;
  final TradableAssetsDb _tradableAssetsDb;
  final MarketDataDb _marketDataDb;
  final SyncRunRecorder _recorder;
  final MarketHistoryApiRateLimiter _rateLimiter;
  final Duration _rateLimitCooldown;
  final Future<void> Function(Duration delay) _sleep;
/// Runs one backfill pass through the recorder and returns its outcome.
///
/// [now] defaults to the current time; either way it is converted to UTC
/// and used both as the recorder timestamp and as the clock for the pass.
Future<MarketDataHistorySyncResult> runOnce({DateTime? now}) async {
  final DateTime tickUtc = (now ?? DateTime.now()).toUtc();

  Future<SyncRunCounts> body() => _syncBody(tickUtc);

  final SyncRunOutcome recorded = await _recorder.record(
    kind,
    body,
    now: tickUtc,
  );

  return MarketDataHistorySyncResult(
    rowsWritten: recorded.rowsWritten,
    startedAt: recorded.startedAt,
    finishedAt: recorded.finishedAt,
    error: recorded.error,
    // The recorder may not report a slot count; treat that as zero.
    slotsSynced: recorded.slotsSynced ?? 0,
  );
}
/// Whether any symbol is missing a completed slot in the rolling window.
///
/// Returns `false` when there are no active symbols. [now] is converted to
/// UTC before planning so this check uses the same clock as [runOnce], which
/// also normalizes its tick to UTC; the conversion keeps the instant
/// unchanged.
Future<bool> hasPendingSlots(DateTime now) async {
  final List<String> symbols = await _activeSymbols();
  if (symbols.isEmpty) {
    return false;
  }
  final List<MarketHistorySlotFetchPlan> plans =
      await _pendingSlotFetchPlans(now.toUtc(), symbols);
  return plans.isNotEmpty;
}
/// Performs one backfill pass and returns its counts.
///
/// Loads the active symbols, builds the pending slot plans for [now], and for
/// each plan walks its symbols in chunks of [batchSize]: bars are fetched,
/// persisted, and no-data placeholders are written for symbols that got no
/// bar. Failures are collected per batch into the returned error string
/// instead of aborting the pass; a rate-limit failure ends the pass early.
/// Returns an empty [SyncRunCounts] when there are no symbols or no plans.
Future<SyncRunCounts> _syncBody(DateTime now) async {
List<String> symbols = await _activeSymbols();
if (symbols.isEmpty) {
return const SyncRunCounts();
}
final List<MarketHistorySlotFetchPlan> fetchPlans =
await _pendingSlotFetchPlans(now, symbols);
if (fetchPlans.isEmpty) {
return const SyncRunCounts();
}
int rowsWritten = 0;
int slotsSynced = 0;
final List<String> batchErrors = <String>[];
// Symbols actually sent to the API, grouped by slot, for the run record.
final Map<DateTime, Set<String>> backfillItemsBySlot = <DateTime, Set<String>>{};
bool stopForRateLimit = false;
for (final MarketHistorySlotFetchPlan plan in fetchPlans) {
if (stopForRateLimit) {
break;
}
final DateTime slotStart = plan.slotStart;
// Alpaca `end` is exclusive; use slot close, not last second of the window.
final DateTime slotEnd = MarketHistorySessionSlot.endExclusive(slotStart);
// Set once any bar or placeholder row is written for this slot.
bool slotWrote = false;
for (final List<String> batch in chunkList(plan.symbols, batchSize)) {
if (stopForRateLimit) {
break;
}
// Re-check the database for this batch so symbols that already have a row
// for the slot are not requested again.
final Set<String> alreadySynced =
await _marketDataDb.symbolsWithBarForSlot(
symbols: batch,
slotStart: slotStart,
timeframe: timeframe,
);
final List<String> toFetch = batch
.where((String symbol) => !alreadySynced.contains(symbol))
.toList(growable: false);
if (toFetch.isEmpty) {
continue;
}
// Recorded before the fetch, so the symbols are listed even if it fails.
backfillItemsBySlot
.putIfAbsent(slotStart, () => <String>{})
.addAll(toFetch);
try {
final AlpacaBarsResponse response = await _fetchBarsWithRateLimitRetry(
symbols: toFetch,
slotStart: slotStart,
slotEnd: slotEnd,
);
final PersistBarsResult persisted = await _persistBars(
response: response,
batch: toFetch,
slotStart: slotStart,
);
rowsWritten += persisted.written;
// Symbols without a persisted bar get a placeholder row; those rows are
// included in the written-row total.
final int placeholders = await _writeNoDataPlaceholders(
symbols: toFetch,
persisted: persisted,
slotStart: slotStart,
checkedAt: now,
);
rowsWritten += placeholders;
if (persisted.written > 0 || placeholders > 0) {
slotWrote = true;
}
final String? emptyDataError = persisted.errorIfIncomplete(
requested: toFetch,
slotStart: slotStart,
timeframe: timeframe,
placeholdersWritten: placeholders,
);
// Report incomplete data unless placeholders were stored for a slot that
// likely had no regular session.
if (emptyDataError != null &&
!_suppressEmptyMarketError(
slotStart: slotStart,
placeholdersWritten: placeholders,
)) {
batchErrors.add(emptyDataError);
}
} on AlpacaMarketDataException catch (e) {
if (e.isRateLimited) {
// The fetch helper has already waited the cooldown and retried once by
// the time a rate-limited exception reaches here, so end the whole pass
// and keep what was written.
batchErrors.add(
'rate limited after ${_rateLimitCooldown.inSeconds}s cooldown; '
'partial sync saved ($rowsWritten rows); resume on next run',
);
stopForRateLimit = true;
break;
}
batchErrors.add(
'slot ${slotStart.toIso8601String()} batch ${toFetch.join(",")}: $e',
);
} on Object catch (e) {
// Any other failure is recorded for this batch and the loop continues
// with the next batch.
batchErrors.add(
'slot ${slotStart.toIso8601String()} batch ${toFetch.join(",")}: $e',
);
}
}
if (slotWrote) {
slotsSynced++;
}
}
return SyncRunCounts(
rowsWritten: rowsWritten,
error: batchErrors.isEmpty ? null : batchErrors.join('; '),
slotsSynced: slotsSynced,
backfillItems: _backfillItemsFromMap(backfillItemsBySlot),
);
}
/// Converts the slot → symbols map into [BackfillSyncItem]s.
///
/// Items are ordered by slot start, latest first, and each item's symbols
/// are sorted ascending. An empty map yields an empty list.
static List<BackfillSyncItem> _backfillItemsFromMap(
  Map<DateTime, Set<String>> itemsBySlot,
) {
  if (itemsBySlot.isEmpty) {
    return <BackfillSyncItem>[];
  }

  final List<MapEntry<DateTime, Set<String>>> latestFirst =
      itemsBySlot.entries.toList()
        ..sort(
          (
            MapEntry<DateTime, Set<String>> a,
            MapEntry<DateTime, Set<String>> b,
          ) =>
              b.key.compareTo(a.key),
        );

  return List<BackfillSyncItem>.generate(
    latestFirst.length,
    (int index) {
      final MapEntry<DateTime, Set<String>> entry = latestFirst[index];
      final List<String> sortedSymbols = entry.value.toList()..sort();
      return BackfillSyncItem(
        slotStart: entry.key,
        symbols: sortedSymbols,
      );
    },
    growable: false,
  );
}
/// Requests bars for [symbols] between [slotStart] and [slotEnd], retrying
/// once after a rate-limit failure.
///
/// Every attempt first acquires a permit from the rate limiter. If the first
/// attempt fails with a rate-limited [AlpacaMarketDataException], waits the
/// configured cooldown and tries again; any other failure, or a failure on
/// the second attempt, is rethrown to the caller.
Future<AlpacaBarsResponse> _fetchBarsWithRateLimitRetry({
  required List<String> symbols,
  required DateTime slotStart,
  required DateTime slotEnd,
}) async {
  Future<AlpacaBarsResponse> requestBars() => _marketDataClient.getBarsRange(
        symbols: symbols,
        timeframe: MarketHistoryConfig.alpacaFetchTimeframe,
        start: slotStart,
        end: slotEnd,
      );

  for (int attemptsLeft = 2; attemptsLeft > 0; attemptsLeft--) {
    await _rateLimiter.acquire();
    try {
      return await requestBars();
    } on AlpacaMarketDataException catch (e) {
      final bool retryable = e.isRateLimited && attemptsLeft > 1;
      if (!retryable) {
        rethrow;
      }
    }
    // Only reached after a rate-limited first attempt.
    await _sleep(_rateLimitCooldown);
  }
  throw StateError('unreachable fetch retry loop');
}
/// Aggregates the minute bars in [response] for each symbol of [batch] and
/// upserts one `bar` snapshot per symbol for the slot containing [slotStart].
///
/// The result records how many rows were written and for which symbols,
/// which batch symbols were absent from the response or had an empty series,
/// and, for symbols whose bars produced no aggregate, the comma-separated
/// timestamps of the bars that were returned.
Future<PersistBarsResult> _persistBars({
  required AlpacaBarsResponse response,
  required List<String> batch,
  required DateTime slotStart,
}) async {
  final Set<String> wanted = batch.toSet();
  final DateTime plannedSlot =
      MarketHistorySessionSlot.slotStartContaining(slotStart);
  final String slotWire = MarketHistorySessionSlot.slotStartWire(plannedSlot);

  // Pass 1: classify requested symbols the response has nothing for.
  final List<String> notInResponse = <String>[];
  final List<String> emptyInResponse = <String>[];
  for (final String symbol in batch) {
    final List<AlpacaBar>? series = response.barsBySymbol[symbol];
    if (series == null) {
      notInResponse.add(symbol);
    } else if (series.isEmpty) {
      emptyInResponse.add(symbol);
    }
  }

  // Pass 2: walk the response in its own order and store one aggregate per
  // requested symbol; symbols outside the batch are ignored.
  int written = 0;
  final Set<String> symbolsWritten = <String>{};
  final Map<String, String> wrongSlotBarTimes = <String, String>{};
  for (final MapEntry<String, List<AlpacaBar>> entry
      in response.barsBySymbol.entries) {
    final String symbol = entry.key;
    if (!wanted.contains(symbol)) {
      continue;
    }
    final SessionHalfBarAggregate? aggregate = aggregateMinuteBarsForSlot(
      bars: entry.value,
      slotStart: plannedSlot,
    );
    if (aggregate != null) {
      await _marketDataDb.upsertSnapshot(
        symbol: symbol,
        metric: 'bar',
        timeframe: timeframe,
        feed: feed,
        price: aggregate.close,
        volume: aggregate.volume,
        asOf: plannedSlot,
        raw: <String, dynamic>{
          'o': aggregate.open,
          'h': aggregate.high,
          'l': aggregate.low,
          'c': aggregate.close,
          'v': aggregate.volume,
          't': MarketHistorySessionSlot.wireUtc(aggregate.lastBarAt),
          'slot_start': slotWire,
          'minute_bars_count': aggregate.minuteBarCount,
        },
      );
      written++;
      symbolsWritten.add(symbol);
    } else {
      // No aggregate for the slot: keep the returned bar times for diagnosis.
      // NOTE(review): a symbol with an empty series presumably lands here too
      // with an empty value — confirm against aggregateMinuteBarsForSlot.
      wrongSlotBarTimes[symbol] = entry.value
          .map((AlpacaBar b) => MarketHistorySessionSlot.wireUtc(b.timestamp))
          .join(',');
    }
  }

  return PersistBarsResult(
    written: written,
    symbolsWritten: symbolsWritten,
    notInResponse: notInResponse,
    emptyInResponse: emptyInResponse,
    wrongSlotBarTimes: wrongSlotBarTimes,
  );
}
/// Stores a no-data placeholder for every symbol in [symbols] that
/// [persisted] did not write a bar for, and returns how many were stored.
///
/// Each placeholder is written for [slotStart] with [checkedAt] as its check
/// time. Its source is [MarketHistoryBarPlaceholder.sourceMarketClosed] when
/// [MarketHistoryTradingCalendar.isLikelyNoRegularSession] holds for
/// [slotStart], otherwise [MarketHistoryBarPlaceholder.sourceAlpacaEmpty].
/// Returns 0 without touching the database when every symbol was persisted.
Future<int> _writeNoDataPlaceholders({
  required List<String> symbols,
  required PersistBarsResult persisted,
  required DateTime slotStart,
  required DateTime checkedAt,
}) async {
  final List<String> needPlaceholder = symbols
      .where((String symbol) => !persisted.symbolsWritten.contains(symbol))
      .toList(growable: false);
  if (needPlaceholder.isEmpty) {
    return 0;
  }

  // The calendar check depends only on the slot, so evaluate it once rather
  // than once per symbol.
  final bool noRegularSession =
      MarketHistoryTradingCalendar.isLikelyNoRegularSession(slotStart);

  for (final String symbol in needPlaceholder) {
    await _marketDataDb.upsertNoDataBarPlaceholder(
      symbol: symbol,
      slotStart: slotStart,
      timeframe: timeframe,
      feed: feed,
      checkedAt: checkedAt,
      source: noRegularSession
          ? MarketHistoryBarPlaceholder.sourceMarketClosed
          : MarketHistoryBarPlaceholder.sourceAlpacaEmpty,
    );
  }
  return needPlaceholder.length;
}
/// Whether an incomplete-data error should be dropped for [slotStart].
///
/// True only when at least one placeholder was stored and the trading
/// calendar reports that the slot likely had no regular session.
static bool _suppressEmptyMarketError({
  required DateTime slotStart,
  required int placeholdersWritten,
}) {
  if (placeholdersWritten <= 0) {
    return false;
  }
  return MarketHistoryTradingCalendar.isLikelyNoRegularSession(slotStart);
}
/// Lists the active tradable symbols, keeping at most the first [maxSymbols].
Future<List<String>> _activeSymbols() async {
  final List<String> listed =
      await _tradableAssetsDb.listActiveTradableSymbols();
  return listed.length > maxSymbols ? listed.sublist(0, maxSymbols) : listed;
}
/// Builds one fetch plan per completed slot that still lacks a bar for at
/// least one of [symbols].
///
/// The completed slot starts for [now] and [windowDays] are walked from the
/// last element of the returned list to the first; slots where every symbol
/// already has a bar are left out. Returns an empty list when there are no
/// completed slots.
Future<List<MarketHistorySlotFetchPlan>> _pendingSlotFetchPlans(
  DateTime now,
  List<String> symbols,
) async {
  final List<MarketHistorySlotFetchPlan> pending =
      <MarketHistorySlotFetchPlan>[];
  final List<DateTime> completedSlots =
      MarketHistorySessionSlot.completedSlotStartsInWindow(now, windowDays);

  for (int index = completedSlots.length - 1; index >= 0; index--) {
    final DateTime slotStart = completedSlots[index];
    final Set<String> haveBar = await _marketDataDb.symbolsWithBarForSlot(
      symbols: symbols,
      slotStart: slotStart,
      timeframe: timeframe,
    );
    final List<String> lacking = List<String>.of(
      symbols.where((String symbol) => !haveBar.contains(symbol)),
      growable: false,
    );
    if (lacking.isEmpty) {
      continue;
    }
    pending.add(
      MarketHistorySlotFetchPlan(slotStart: slotStart, symbols: lacking),
    );
  }
  return pending;
}
}
/// Partitions [items] into consecutive sublists holding at most [size]
/// elements each; only the last sublist may be shorter.
///
/// Returns an empty list for empty [items]. Throws an [ArgumentError] when
/// [size] is zero or negative.
List<List<T>> chunkList<T>(List<T> items, int size) {
  if (size <= 0) {
    throw ArgumentError.value(size, 'size', 'must be positive');
  }
  final List<List<T>> result = <List<T>>[];
  int start = 0;
  while (start < items.length) {
    final int remaining = items.length - start;
    final int take = remaining < size ? remaining : size;
    result.add(items.sublist(start, start + take));
    start += take;
  }
  return result;
}