cyberhybridhub/server/lib/trading/tradable_assets_db.dart
2026-05-31 11:17:12 -05:00

164 lines
5.1 KiB
Dart

import 'dart:convert';
import 'package:postgres/postgres.dart';
import '../alpaca/alpaca_models.dart';
/// Immutable value object for one row read from the `tradable_assets` table.
///
/// Each field mirrors the column of the same (snake_case) name. Instances
/// carry no behaviour of their own.
class TradableAssetRow {
  /// Creates a row value.
  ///
  /// [exchange], [name] and [raw] are optional and stay `null` when omitted;
  /// every other field must be supplied.
  TradableAssetRow({
    required this.symbol,
    required this.assetClass,
    required this.tradable,
    required this.fractionable,
    required this.status,
    required this.refreshedAt,
    this.exchange,
    this.name,
    this.raw,
  });

  // --- Always-present columns ---------------------------------------------

  /// Value of the `symbol` column (the table's conflict/primary key).
  final String symbol;

  /// Value of the `asset_class` column.
  final String assetClass;

  /// Value of the `tradable` column.
  final bool tradable;

  /// Value of the `fractionable` column.
  final bool fractionable;

  /// Value of the `status` column; queries in this file use `'active'` and
  /// `'inactive'`.
  final String status;

  /// Value of the `refreshed_at` column.
  final DateTime refreshedAt;

  // --- Nullable columns ---------------------------------------------------

  /// Value of the `exchange` column, or `null` when not set.
  final String? exchange;

  /// Value of the `name` column, or `null` when not set.
  final String? name;

  /// Decoded content of the `raw` JSON column, or `null` when not set.
  final Map<String, dynamic>? raw;
}
/// Data-access object for the `tradable_assets` table (the daily Alpaca
/// asset universe).
///
/// Writers: only [TradableAssetsSync] (§2.2). Readers: the historical
/// backfill in §3 plus operator/query tooling.
class TradableAssetsDb {
  /// Wraps an existing [Connection]; every query in this class runs on it.
  TradableAssetsDb(this._connection);

  // Statement text lives in constants so the method bodies below read as
  // control flow only.

  /// Insert-or-update of a single asset, keyed on `symbol`.
  static const String _upsertSql = '''
INSERT INTO tradable_assets (
symbol, asset_class, exchange, name, tradable,
fractionable, status, raw, refreshed_at
) VALUES (
@symbol, @asset_class, @exchange, @name, @tradable,
@fractionable, @status, @raw::jsonb, @refreshed_at
)
ON CONFLICT (symbol) DO UPDATE SET
asset_class = EXCLUDED.asset_class,
exchange = EXCLUDED.exchange,
name = EXCLUDED.name,
tradable = EXCLUDED.tradable,
fractionable = EXCLUDED.fractionable,
status = EXCLUDED.status,
raw = EXCLUDED.raw,
refreshed_at = EXCLUDED.refreshed_at
''';

  /// Flags rows older than `@now` as inactive / not tradable, skipping rows
  /// that are already in that state.
  static const String _deactivateStaleSql = '''
UPDATE tradable_assets
SET status = 'inactive', tradable = false
WHERE refreshed_at < @now
AND (status <> 'inactive' OR tradable = true)
''';

  /// Symbols that are both `status = 'active'` and `tradable`, sorted.
  static const String _activeSymbolsSql = '''
SELECT symbol
FROM tradable_assets
WHERE status = 'active' AND tradable = true
ORDER BY symbol ASC
''';

  /// Full-row lookup by `symbol`; column order is relied on by
  /// [_rowToModel].
  static const String _selectBySymbolSql = '''
SELECT symbol, asset_class, exchange, name, tradable,
fractionable, status, raw, refreshed_at
FROM tradable_assets
WHERE symbol = @symbol
''';

  final Connection _connection;

  /// Writes [assets] into the table and retires everything else.
  ///
  /// Phase one upserts each asset (conflict target `symbol`), stamping all
  /// of them with the same `refreshed_at`: [now] converted to UTC, or the
  /// current time when [now] is omitted. Phase two sets
  /// `status='inactive', tradable=false` on every row whose `refreshed_at`
  /// is strictly older than that stamp, leaving the row itself in place.
  ///
  /// Both phases share one transaction, so a failure part-way through does
  /// not leave the universe half-updated.
  ///
  /// Note that an empty [assets] list skips phase one but still runs phase
  /// two, so every row older than the stamp is flagged inactive.
  Future<void> upsertAll(
    List<AlpacaAsset> assets, {
    DateTime? now,
  }) async {
    final DateTime stamp = (now ?? DateTime.now()).toUtc();
    final Sql upsert = Sql.named(_upsertSql);

    await _connection.runTx((TxSession session) async {
      for (final AlpacaAsset asset in assets) {
        await session.execute(
          upsert,
          parameters: _upsertParameters(asset, stamp),
        );
      }

      // Rows that were not part of this batch still carry an older
      // refreshed_at. They are flipped to inactive WITHOUT touching
      // refreshed_at, so that column keeps recording when the asset was
      // last seen in the live universe.
      await session.execute(
        Sql.named(_deactivateStaleSql),
        parameters: <String, dynamic>{'now': stamp},
      );
    });
  }

  /// Returns the symbols of all rows with `status = 'active'` and
  /// `tradable = true`, in ascending symbol order, as a fixed-length list.
  Future<List<String>> listActiveTradableSymbols() async {
    final Result rows = await _connection.execute(_activeSymbolsSql);
    return List<String>.generate(
      rows.length,
      (int i) => rows[i][0]! as String,
      growable: false,
    );
  }

  /// Fetches the row for [symbol], or `null` when no such row exists.
  ///
  /// Primarily intended for tests and admin tooling.
  Future<TradableAssetRow?> getBySymbol(String symbol) async {
    final Result rows = await _connection.execute(
      Sql.named(_selectBySymbolSql),
      parameters: <String, dynamic>{'symbol': symbol},
    );
    return rows.isEmpty ? null : _rowToModel(rows.first);
  }

  /// Builds the named-parameter map for [_upsertSql] from [asset], using
  /// [stamp] as `refreshed_at`.
  ///
  /// The raw payload is sent as JSON text (the statement casts it to
  /// `jsonb`); a `null` payload is passed through as `null`.
  static Map<String, dynamic> _upsertParameters(
    AlpacaAsset asset,
    DateTime stamp,
  ) {
    final Object? payload = asset.raw;
    return <String, dynamic>{
      'symbol': asset.symbol,
      'asset_class': asset.assetClass,
      'exchange': asset.exchange,
      'name': asset.name,
      'tradable': asset.tradable,
      'fractionable': asset.fractionable,
      'status': asset.status,
      'raw': payload == null ? null : jsonEncode(payload),
      'refreshed_at': stamp,
    };
  }

  /// Normalises the value read from the `raw` column into a string-keyed
  /// map.
  ///
  /// `null` stays `null`; a correctly typed map is returned as-is; any other
  /// map is copied into a `Map<String, dynamic>`; anything else is
  /// stringified and parsed as JSON, which must yield a JSON object.
  static Map<String, dynamic>? _decodeRaw(Object? value) {
    if (value == null) {
      return null;
    }
    if (value is Map<String, dynamic>) {
      return value;
    }
    if (value is Map) {
      return Map<String, dynamic>.from(value);
    }
    return jsonDecode(value.toString()) as Map<String, dynamic>;
  }

  /// Converts a result row laid out like [_selectBySymbolSql] into a
  /// [TradableAssetRow].
  ///
  /// Columns are read by position: 0 symbol, 1 asset_class, 2 exchange,
  /// 3 name, 4 tradable, 5 fractionable, 6 status, 7 raw, 8 refreshed_at.
  /// `refreshed_at` is converted to UTC.
  TradableAssetRow _rowToModel(ResultRow row) {
    final Map<String, dynamic>? decodedRaw = _decodeRaw(row[7]);
    final DateTime refreshed = (row[8]! as DateTime).toUtc();

    return TradableAssetRow(
      symbol: row[0]! as String,
      assetClass: row[1]! as String,
      exchange: row[2] as String?,
      name: row[3] as String?,
      tradable: row[4]! as bool,
      fractionable: row[5]! as bool,
      status: row[6]! as String,
      raw: decodedRaw,
      refreshedAt: refreshed,
    );
  }
}