309 lines
8.4 KiB
Dart
309 lines
8.4 KiB
Dart
import 'dart:convert';
|
|
|
|
import 'package:postgres/postgres.dart';
|
|
|
|
import 'market_history_bar_placeholder.dart';
|
|
import 'market_history_session_slot.dart';
|
|
|
|
/// Normalized market data row persisted for rule evaluation.
///
/// A plain value holder: every field is `final` and the class has no
/// behavior of its own. Field names mirror the columns that [MarketDataDb]
/// reads from `market_data_snapshots`.
class MarketDataSnapshot {
  /// Builds a snapshot from already-normalized values.
  ///
  /// [symbol], [metric] and [asOf] are mandatory. [assetClass] falls back to
  /// `'us_equity'` and [feed] to `'iex'`; every other field defaults to
  /// `null`.
  MarketDataSnapshot({
    required this.symbol,
    required this.metric,
    required this.asOf,
    this.id,
    this.assetClass = 'us_equity',
    this.feed = 'iex',
    this.price,
    this.volume,
    this.raw,
    this.createdAt,
  });

  // --- Identity -----------------------------------------------------------

  /// Value of the `id` column, or `null` when none was supplied.
  final int? id;

  /// Value of the `symbol` column.
  final String symbol;

  /// Value of the `metric` column (for example `'bar'`).
  final String metric;

  // --- Provenance ---------------------------------------------------------

  /// Value of the `asset_class` column; `'us_equity'` unless overridden.
  final String assetClass;

  /// Value of the `feed` column; `'iex'` unless overridden.
  final String feed;

  // --- Measurements -------------------------------------------------------

  /// Value of the `price` column, when present.
  final num? price;

  /// Value of the `volume` column, when present.
  final num? volume;

  /// Decoded JSON object from the `raw` column, when present.
  final Map<String, dynamic>? raw;

  // --- Timestamps ---------------------------------------------------------

  /// Value of the `as_of` column.
  final DateTime asOf;

  /// Value of the `created_at` column, when known.
  final DateTime? createdAt;
}
|
|
|
|
/// Postgres access for [market_data_snapshots].
///
/// All statements are issued on the single [Connection] passed to the
/// constructor. Timestamps are converted to UTC both when they are bound as
/// query parameters and when they are read back from result rows.
class MarketDataDb {
  /// Creates a data-access object that runs every query on the given
  /// connection.
  MarketDataDb(this._connection);

  final Connection _connection;

  /// Writes one snapshot row and returns it as stored.
  ///
  /// This is a thin alias for [upsertSnapshot]: despite the name, a row that
  /// already exists for the same `(symbol, metric, timeframe, as_of)` key is
  /// updated rather than rejected.
  Future<MarketDataSnapshot> insertSnapshot({
    required String symbol,
    required String metric,
    required DateTime asOf,
    String assetClass = 'us_equity',
    String feed = 'iex',
    String timeframe = 'tick',
    num? price,
    num? volume,
    Map<String, dynamic>? raw,
  }) {
    return upsertSnapshot(
      symbol: symbol,
      metric: metric,
      asOf: asOf,
      assetClass: assetClass,
      feed: feed,
      timeframe: timeframe,
      price: price,
      volume: volume,
      raw: raw,
    );
  }

  /// Idempotent write keyed by `(symbol, metric, timeframe, as_of)`.
  ///
  /// [asOf] is converted to UTC before it is bound. [raw] is JSON-encoded and
  /// cast to `jsonb`; a `null` map is stored as SQL `NULL`. On a key conflict
  /// only `price`, `volume` and `raw` are overwritten.
  ///
  /// Returns the row as reported by the statement's `RETURNING` clause.
  ///
  /// Throws a [StateError] if the statement unexpectedly returns no row.
  Future<MarketDataSnapshot> upsertSnapshot({
    required String symbol,
    required String metric,
    required DateTime asOf,
    String assetClass = 'us_equity',
    String feed = 'iex',
    String timeframe = 'tick',
    num? price,
    num? volume,
    Map<String, dynamic>? raw,
  }) async {
    // NOTE(review): `asset_class` and `feed` are not part of the DO UPDATE
    // list, so a conflicting write keeps the values of the existing row even
    // when the caller passes different ones — confirm this is intended.
    final Result result = await _connection.execute(
      Sql.named(
        '''
        INSERT INTO market_data_snapshots (
          symbol, asset_class, feed, metric, timeframe, price, volume, as_of, raw
        ) VALUES (
          @symbol, @asset_class, @feed, @metric, @timeframe,
          @price, @volume, @as_of, @raw::jsonb
        )
        ON CONFLICT (symbol, metric, timeframe, as_of) DO UPDATE SET
          price = EXCLUDED.price,
          volume = EXCLUDED.volume,
          raw = EXCLUDED.raw
        RETURNING id, symbol, asset_class, feed, metric, price, volume, as_of, raw, created_at
        ''',
      ),
      parameters: <String, dynamic>{
        'symbol': symbol,
        'asset_class': assetClass,
        'feed': feed,
        'metric': metric,
        'timeframe': timeframe,
        'price': price,
        'volume': volume,
        'as_of': asOf.toUtc(),
        'raw': raw == null ? null : jsonEncode(raw),
      },
    );
    // `result.first` would also throw a StateError here, but with the opaque
    // message "No element"; fail with something a log reader can act on.
    if (result.isEmpty) {
      throw StateError(
        'Upsert into market_data_snapshots returned no row for '
        '$symbol/$metric/$timeframe.',
      );
    }
    return _rowToSnapshot(result.first);
  }

  /// Tombstone when Alpaca has no session-half bar for [symbol] at [slotStart].
  ///
  /// Counts toward backfill gap checks but not game/calendar bar coverage.
  ///
  /// [slotStart] is first canonicalized with
  /// [MarketHistorySessionSlot.slotStartContaining]; the result becomes the
  /// row's `as_of`. The row is written with metric `'bar'`, no price or
  /// volume, and a `raw` payload holding the slot start in wire form, the
  /// [MarketHistoryBarPlaceholder.rawKey] marker set to `true`, the [source]
  /// label and [checkedAt] in wire form.
  Future<MarketDataSnapshot> upsertNoDataBarPlaceholder({
    required String symbol,
    required DateTime slotStart,
    required String timeframe,
    required DateTime checkedAt,
    String assetClass = 'us_equity',
    String feed = 'iex',
    String source = MarketHistoryBarPlaceholder.sourceAlpacaEmpty,
  }) async {
    final DateTime slot =
        MarketHistorySessionSlot.slotStartContaining(slotStart);
    final String slotWire = MarketHistorySessionSlot.slotStartWire(slot);
    return upsertSnapshot(
      symbol: symbol,
      metric: 'bar',
      timeframe: timeframe,
      asOf: slot,
      assetClass: assetClass,
      feed: feed,
      raw: <String, dynamic>{
        'slot_start': slotWire,
        MarketHistoryBarPlaceholder.rawKey: true,
        'source': source,
        'checked_at': MarketHistorySessionSlot.wireUtc(checkedAt),
      },
    );
  }

  /// Daily (or intraday) bars for [symbol] in [`since`, `until`).
  ///
  /// The lower bound is inclusive and the upper bound exclusive; both are
  /// converted to UTC before binding. Rows are returned in ascending `as_of`
  /// order, and an empty list is returned when nothing matches.
  Future<List<MarketDataSnapshot>> barsForSymbol({
    required String symbol,
    required String timeframe,
    required DateTime since,
    required DateTime until,
    String metric = 'bar',
  }) async {
    final Result result = await _connection.execute(
      Sql.named(
        '''
        SELECT id, symbol, asset_class, feed, metric, price, volume, as_of, raw, created_at
        FROM market_data_snapshots
        WHERE symbol = @symbol
          AND metric = @metric
          AND timeframe = @timeframe
          AND as_of >= @since
          AND as_of < @until
        ORDER BY as_of ASC
        ''',
      ),
      parameters: <String, dynamic>{
        'symbol': symbol,
        'metric': metric,
        'timeframe': timeframe,
        'since': since.toUtc(),
        'until': until.toUtc(),
      },
    );
    if (result.isEmpty) {
      return <MarketDataSnapshot>[];
    }
    return result.map(_rowToSnapshot).toList(growable: false);
  }

  /// Newest `as_of` for historical bars, or `null` on cold start.
  ///
  /// Only rows matching [symbol], [timeframe] and [metric] are considered.
  /// The returned timestamp is in UTC.
  Future<DateTime?> latestSyncedAsOf(
    String symbol,
    String timeframe, {
    String metric = 'bar',
  }) async {
    final Result result = await _connection.execute(
      Sql.named(
        '''
        SELECT as_of
        FROM market_data_snapshots
        WHERE symbol = @symbol
          AND metric = @metric
          AND timeframe = @timeframe
        ORDER BY as_of DESC
        LIMIT 1
        ''',
      ),
      parameters: <String, dynamic>{
        'symbol': symbol,
        'metric': metric,
        'timeframe': timeframe,
      },
    );
    if (result.isEmpty) {
      return null;
    }
    return (result.first[0]! as DateTime).toUtc();
  }

  /// Symbols from [symbols] that already have a bar for session slot [slotStart].
  ///
  /// A row counts when [raw.slot_start] or [as_of] matches the canonical slot
  /// start ([MarketHistorySessionSlot.slotStartWire]).
  ///
  /// Returns an empty set without querying when [symbols] is empty.
  Future<Set<String>> symbolsWithBarForSlot({
    required List<String> symbols,
    required DateTime slotStart,
    required String timeframe,
    String metric = 'bar',
  }) async {
    if (symbols.isEmpty) {
      return <String>{};
    }
    final DateTime start =
        MarketHistorySessionSlot.slotStartContaining(slotStart);
    final String slotStartWire = MarketHistorySessionSlot.slotStartWire(start);
    final Result result = await _connection.execute(
      Sql.named(
        '''
        SELECT DISTINCT symbol
        FROM market_data_snapshots
        WHERE metric = @metric
          AND timeframe = @timeframe
          AND symbol = ANY(@symbols)
          AND (
            raw->>'slot_start' = @slot_start_wire
            OR as_of = @slot_start
          )
        ''',
      ),
      parameters: <String, dynamic>{
        'metric': metric,
        'timeframe': timeframe,
        'symbols': symbols,
        'slot_start_wire': slotStartWire,
        // Bind in UTC like every other timestamp parameter in this class.
        'slot_start': start.toUtc(),
      },
    );
    return <String>{
      for (final ResultRow row in result) row[0]! as String,
    };
  }

  /// Newest snapshot for [symbol] and [metric] by [as_of].
  ///
  /// The lookup does not filter on timeframe. Returns `null` when no row
  /// matches.
  Future<MarketDataSnapshot?> latestForSymbol(
    String symbol,
    String metric,
  ) async {
    final Result result = await _connection.execute(
      Sql.named(
        '''
        SELECT id, symbol, asset_class, feed, metric, price, volume, as_of, raw, created_at
        FROM market_data_snapshots
        WHERE symbol = @symbol AND metric = @metric
        ORDER BY as_of DESC
        LIMIT 1
        ''',
      ),
      parameters: <String, dynamic>{
        'symbol': symbol,
        'metric': metric,
      },
    );
    if (result.isEmpty) {
      return null;
    }
    return _rowToSnapshot(result.first);
  }

  /// Maps one result row onto a [MarketDataSnapshot].
  ///
  /// Columns are read by position and must follow the order used by every
  /// query in this class: `id, symbol, asset_class, feed, metric, price,
  /// volume, as_of, raw, created_at`.
  MarketDataSnapshot _rowToSnapshot(ResultRow row) {
    return MarketDataSnapshot(
      id: (row[0]! as num).toInt(),
      symbol: row[1]! as String,
      assetClass: row[2]! as String,
      feed: row[3]! as String,
      metric: row[4]! as String,
      price: readOptionalNumeric(row[5]),
      volume: readOptionalNumeric(row[6]),
      asOf: (row[7]! as DateTime).toUtc(),
      raw: _decodeRaw(row[8]),
      // The model allows a missing creation time, so tolerate a NULL column
      // instead of force-unwrapping it.
      createdAt: (row[9] as DateTime?)?.toUtc(),
    );
  }

  /// Converts the `raw` column value into a JSON object map.
  ///
  /// Accepts an already-decoded map (of any key/value type arguments) or
  /// anything whose string form is a JSON object. Returns `null` for `null`.
  static Map<String, dynamic>? _decodeRaw(Object? value) {
    if (value == null) {
      return null;
    }
    if (value is Map<String, dynamic>) {
      return value;
    }
    if (value is Map) {
      // A map with other type arguments must not be stringified: Dart's
      // `Map.toString()` output is not JSON and would fail to decode.
      return Map<String, dynamic>.from(value);
    }
    return jsonDecode(value.toString()) as Map<String, dynamic>;
  }

  /// Reads a nullable numeric column value.
  ///
  /// Returns `null` for `null`, the value itself when it is already a [num],
  /// and otherwise the result of [num.parse] on the value's string form.
  ///
  /// Throws a [FormatException] when that string is not a valid number.
  static num? readOptionalNumeric(Object? value) {
    if (value == null) {
      return null;
    }
    if (value is num) {
      return value;
    }
    // `String.toString()` returns the string itself, so a single branch
    // covers both strings and driver-specific numeric wrappers.
    return num.parse(value.toString());
  }
}
|