import 'dart:convert'; import 'package:postgres/postgres.dart'; import 'market_history_bar_placeholder.dart'; import 'market_history_four_hour_slot.dart'; /// Normalized market data row persisted for rule evaluation. class MarketDataSnapshot { MarketDataSnapshot({ required this.symbol, required this.metric, required this.asOf, this.id, this.assetClass = 'us_equity', this.feed = 'iex', this.price, this.volume, this.raw, this.createdAt, }); final int? id; final String symbol; final String assetClass; final String feed; final String metric; final num? price; final num? volume; final DateTime asOf; final Map? raw; final DateTime? createdAt; } /// Postgres access for [market_data_snapshots]. class MarketDataDb { MarketDataDb(this._connection); final Connection _connection; Future insertSnapshot({ required String symbol, required String metric, required DateTime asOf, String assetClass = 'us_equity', String feed = 'iex', String timeframe = 'tick', num? price, num? volume, Map? raw, }) { return upsertSnapshot( symbol: symbol, metric: metric, asOf: asOf, assetClass: assetClass, feed: feed, timeframe: timeframe, price: price, volume: volume, raw: raw, ); } /// Idempotent write keyed by `(symbol, metric, timeframe, as_of)`. Future upsertSnapshot({ required String symbol, required String metric, required DateTime asOf, String assetClass = 'us_equity', String feed = 'iex', String timeframe = 'tick', num? price, num? volume, Map? raw, }) async { final Result result = await _connection.execute( Sql.named( ''' INSERT INTO market_data_snapshots ( symbol, asset_class, feed, metric, timeframe, price, volume, as_of, raw ) VALUES ( @symbol, @asset_class, @feed, @metric, @timeframe, @price, @volume, @as_of, @raw::jsonb ) ON CONFLICT (symbol, metric, timeframe, as_of) DO UPDATE SET price = EXCLUDED.price, volume = EXCLUDED.volume, raw = EXCLUDED.raw RETURNING id, symbol, asset_class, feed, metric, price, volume, as_of, raw, created_at ''', ), parameters: { 'symbol': symbol, 'asset_class': assetClass, 'feed': feed, 'metric': metric, 'timeframe': timeframe, 'price': price, 'volume': volume, 'as_of': asOf.toUtc(), 'raw': raw == null ? null : jsonEncode(raw), }, ); return _rowToSnapshot(result.first); } /// Tombstone when Alpaca has no 4Hour bar for [symbol] at [slotStart]. /// /// Counts toward backfill gap checks but not game/calendar bar coverage. Future upsertNoDataBarPlaceholder({ required String symbol, required DateTime slotStart, required String timeframe, required DateTime checkedAt, String assetClass = 'us_equity', String feed = 'iex', String source = MarketHistoryBarPlaceholder.sourceAlpacaEmpty, }) async { final DateTime slot = MarketHistoryFourHourSlot.slotStartContaining(slotStart); final String slotWire = MarketHistoryFourHourSlot.slotStartWire(slot); return upsertSnapshot( symbol: symbol, metric: 'bar', timeframe: timeframe, asOf: slot, assetClass: assetClass, feed: feed, raw: { 'slot_start': slotWire, MarketHistoryBarPlaceholder.rawKey: true, 'source': source, 'checked_at': MarketHistoryFourHourSlot.wireUtc(checkedAt), }, ); } /// Daily (or intraday) bars for [symbol] in [`since`, `until`). Future> barsForSymbol({ required String symbol, required String timeframe, required DateTime since, required DateTime until, String metric = 'bar', }) async { final Result result = await _connection.execute( Sql.named( ''' SELECT id, symbol, asset_class, feed, metric, price, volume, as_of, raw, created_at FROM market_data_snapshots WHERE symbol = @symbol AND metric = @metric AND timeframe = @timeframe AND as_of >= @since AND as_of < @until ORDER BY as_of ASC ''', ), parameters: { 'symbol': symbol, 'metric': metric, 'timeframe': timeframe, 'since': since.toUtc(), 'until': until.toUtc(), }, ); if (result.isEmpty) { return []; } return result.map(_rowToSnapshot).toList(growable: false); } /// Newest `as_of` for historical bars, or `null` on cold start. Future latestSyncedAsOf( String symbol, String timeframe, { String metric = 'bar', }) async { final Result result = await _connection.execute( Sql.named( ''' SELECT as_of FROM market_data_snapshots WHERE symbol = @symbol AND metric = @metric AND timeframe = @timeframe ORDER BY as_of DESC LIMIT 1 ''', ), parameters: { 'symbol': symbol, 'metric': metric, 'timeframe': timeframe, }, ); if (result.isEmpty) { return null; } return (result.first[0]! as DateTime).toUtc(); } /// Symbols from [symbols] that already have a bar for the UTC slot [slotStart]. /// /// A row counts when [raw.slot_start] matches the canonical wire form, when /// [raw.slot_start] or [as_of] bucket to the same UTC 4-hour boundary as /// [slotStart] (same rule as [MarketHistoryFourHourSlot.slotStartContaining]). Future> symbolsWithBarForSlot({ required List symbols, required DateTime slotStart, required String timeframe, String metric = 'bar', }) async { if (symbols.isEmpty) { return {}; } final DateTime start = MarketHistoryFourHourSlot.slotStartContaining(slotStart); final String slotStartWire = MarketHistoryFourHourSlot.slotStartWire(start); final Result result = await _connection.execute( Sql.named( ''' SELECT DISTINCT symbol FROM market_data_snapshots WHERE metric = @metric AND timeframe = @timeframe AND symbol = ANY(@symbols) AND ( raw->>'slot_start' = @slot_start_wire OR ( raw->>'slot_start' IS NOT NULL AND ${_slotStartBucketSql('(raw->>\'slot_start\')::timestamptz')} = @slot_start ) OR ${_slotStartBucketSql('as_of')} = @slot_start ) ''', ), parameters: { 'metric': metric, 'timeframe': timeframe, 'symbols': symbols, 'slot_start_wire': slotStartWire, 'slot_start': start, }, ); return result .map((ResultRow row) => row[0]! as String) .toSet(); } /// UTC 4-hour slot left edge for [timestampExpr] (timestamptz SQL expression). static String _slotStartBucketSql(String timestampExpr) { return ''' ( date_trunc('day', $timestampExpr AT TIME ZONE 'UTC') + (div(extract(hour from $timestampExpr AT TIME ZONE 'UTC')::int, 4) * 4) * interval '1 hour' ) AT TIME ZONE 'UTC' '''; } /// Newest snapshot for [symbol] and [metric] by [as_of]. Future latestForSymbol( String symbol, String metric, ) async { final Result result = await _connection.execute( Sql.named( ''' SELECT id, symbol, asset_class, feed, metric, price, volume, as_of, raw, created_at FROM market_data_snapshots WHERE symbol = @symbol AND metric = @metric ORDER BY as_of DESC LIMIT 1 ''', ), parameters: { 'symbol': symbol, 'metric': metric, }, ); if (result.isEmpty) { return null; } return _rowToSnapshot(result.first); } MarketDataSnapshot _rowToSnapshot(ResultRow row) { final Object? rawValue = row[8]; Map? raw; if (rawValue is Map) { raw = rawValue; } else if (rawValue != null) { raw = jsonDecode(rawValue.toString()) as Map; } return MarketDataSnapshot( id: (row[0]! as num).toInt(), symbol: row[1]! as String, assetClass: row[2]! as String, feed: row[3]! as String, metric: row[4]! as String, price: readOptionalNumeric(row[5]), volume: readOptionalNumeric(row[6]), asOf: (row[7]! as DateTime).toUtc(), raw: raw, createdAt: (row[9]! as DateTime).toUtc(), ); } static num? readOptionalNumeric(Object? value) { if (value == null) { return null; } if (value is num) { return value; } if (value is String) { return num.parse(value); } return num.parse(value.toString()); } }