import 'package:postgres/postgres.dart';

import 'market_history_config.dart';
import 'market_history_prospective_questions.dart';
import 'market_history_session_slot.dart';
import 'sync_run_recorder.dart';

/// Outcome of a [MarketDataRetention] cleanup pass.
class MarketDataRetentionResult {
  MarketDataRetentionResult({
    required this.rowsRemoved,
    required this.startedAt,
    required this.finishedAt,
    this.error,
  });

  /// Total number of rows the pass reported as removed.
  final int rowsRemoved;

  /// Start time of the pass, as reported by the [SyncRunOutcome].
  final DateTime startedAt;

  /// Finish time of the pass, as reported by the [SyncRunOutcome].
  final DateTime finishedAt;

  /// Error reported for the pass, or `null` when there was none.
  final String? error;

  /// Whether the pass finished without an [error].
  bool get succeeded => error == null;
}

/// Prunes `market_data_snapshots` rows older than the rolling window.
///
/// Phase 1 ([runCleanup]): hard-delete in batches.
/// Phase 2 ([runArchiveAndCleanup]): copy expired rows into
/// `market_data_archive` inside the same transaction, then delete.
class MarketDataRetention {
  /// Creates a retention job that runs its statements on [connection].
  ///
  /// [batchSize] is bound to the `LIMIT` of every batch statement and must be
  /// positive: with `0` each batch matches no rows, so the batch loop would
  /// stop immediately and the pass would report success without pruning any
  /// snapshots.
  ///
  /// [onExecute], when given, is invoked with the SQL text right before each
  /// batch statement is executed.
  MarketDataRetention({
    required Connection connection,
    this.windowDays = MarketHistoryConfig.windowDays,
    this.batchSize = MarketHistoryConfig.retentionBatchSize,
    void Function(String sql)? onExecute,
  })  : assert(batchSize > 0, 'batchSize must be positive'),
        _connection = connection,
        _recorder = SyncRunRecorder(connection),
        _onExecute = onExecute;

  final Connection _connection;
  final SyncRunRecorder _recorder;
  final void Function(String sql)? _onExecute;

  /// Default size of the retention window, in days.
  final int windowDays;

  /// Maximum number of snapshot rows removed per batch statement.
  final int batchSize;

  /// Run kind passed to the [SyncRunRecorder] for every cleanup pass.
  static const String kind = 'cleanup';

  /// Hard-delete rows older than the [windowDays] trading-day sync window.
  Future<MarketDataRetentionResult> runCleanup({DateTime? now}) {
    return run(archive: false, now: now);
  }

  /// Archive-then-delete for rows older than the [windowDays] trading-day
  /// window.
  Future<MarketDataRetentionResult> runArchiveAndCleanup({DateTime? now}) {
    return run(archive: true, now: now);
  }

  /// Dispatches to hard-delete or archive mode.
  ///
  /// [now] defaults to the current time and is converted to UTC before use.
  /// [windowDays] overrides the instance-level [MarketDataRetention.windowDays]
  /// for this pass only. The pass is executed through the [SyncRunRecorder],
  /// and the recorded outcome is returned as a [MarketDataRetentionResult].
  Future<MarketDataRetentionResult> run({
    bool archive = false,
    DateTime? now,
    int? windowDays,
  }) async {
    final DateTime tick = (now ?? DateTime.now()).toUtc();
    // The parameter shadows the field, hence the explicit `this.`.
    final int effectiveWindow = windowDays ?? this.windowDays;

    final SyncRunOutcome outcome = await _recorder.record(
      kind,
      () => _cleanupBody(
        now: tick,
        windowDays: effectiveWindow,
        archive: archive,
      ),
      now: tick,
    );

    return MarketDataRetentionResult(
      rowsRemoved: outcome.rowsRemoved,
      startedAt: outcome.startedAt,
      finishedAt: outcome.finishedAt,
      error: outcome.error,
    );
  }

  /// Removes expired snapshots batch by batch, then expired prospective
  /// questions, and returns the combined row count.
  Future<SyncRunCounts> _cleanupBody({
    required DateTime now,
    required int windowDays,
    required bool archive,
  }) async {
    final DateTime cutoff =
        MarketHistorySessionSlot.windowFirstSlotStart(now, windowDays);

    int totalRemoved = 0;
    // Keep issuing batches until one comes back empty.
    while (true) {
      final int removed = archive
          ? await _archiveAndDeleteBatch(cutoff)
          : await _deleteBatch(cutoff);
      if (removed == 0) {
        break;
      }
      totalRemoved += removed;
    }

    totalRemoved += await MarketHistoryProspectiveQuestions(
      connection: _connection,
    ).deleteExpiredBefore(cutoff);

    return SyncRunCounts(rowsRemoved: totalRemoved);
  }

  /// Deletes up to [batchSize] snapshots with `as_of` before [cutoff] and
  /// returns how many rows were deleted.
  Future<int> _deleteBatch(DateTime cutoff) async {
    // Adjacent literals concatenate into a single statement.
    const String sql = ' WITH doomed AS ( '
        'SELECT id FROM market_data_snapshots '
        'WHERE as_of < @cutoff '
        'LIMIT @batch_size '
        ') '
        'DELETE FROM market_data_snapshots '
        'WHERE id IN (SELECT id FROM doomed) '
        'RETURNING id ';

    _onExecute?.call(sql);
    final Result result = await _connection.execute(
      Sql.named(sql),
      parameters: {
        'cutoff': cutoff,
        'batch_size': batchSize,
      },
    );
    // One returned row per deleted snapshot.
    return result.length;
  }

  /// Copies up to [batchSize] snapshots with `as_of` before [cutoff] into
  /// `market_data_archive` and deletes them, all in one statement run inside
  /// a transaction. Returns how many snapshot rows were deleted.
  Future<int> _archiveAndDeleteBatch(DateTime cutoff) {
    // Adjacent literals concatenate into a single statement.
    const String sql = ' WITH doomed AS ( '
        'SELECT id, symbol, asset_class, feed, metric, timeframe, '
        'price, volume, as_of, raw '
        'FROM market_data_snapshots '
        'WHERE as_of < @cutoff '
        'LIMIT @batch_size '
        '), inserted AS ( '
        'INSERT INTO market_data_archive ( '
        'symbol, asset_class, feed, metric, timeframe, '
        'price, volume, as_of, raw, archived_at '
        ') '
        'SELECT symbol, asset_class, feed, metric, timeframe, '
        'price, volume, as_of, raw, now() '
        'FROM doomed '
        'RETURNING id '
        ') '
        'DELETE FROM market_data_snapshots m '
        'USING doomed d '
        'WHERE m.id = d.id '
        'RETURNING m.id ';

    // Return the count through the transaction result instead of mutating a
    // captured local from inside the callback.
    return _connection.runTx<int>((TxSession tx) async {
      _onExecute?.call(sql);
      final Result archived = await tx.execute(
        Sql.named(sql),
        parameters: {
          'cutoff': cutoff,
          'batch_size': batchSize,
        },
      );
      // The outer DELETE's RETURNING yields one row per removed snapshot.
      return archived.length;
    });
  }
}