import 'dart:convert'; import 'package:postgres/postgres.dart'; import 'backfill_sync_item.dart'; /// Outcome of one [SyncRunRecorder.record] body. /// /// The recorder ALWAYS emits one of these regardless of whether the body /// threw, so callers can convert to a domain-specific result type without /// re-implementing the start/finish bookkeeping. class SyncRunOutcome { SyncRunOutcome({ required this.id, required this.kind, required this.startedAt, required this.finishedAt, required this.rowsWritten, required this.rowsRemoved, this.slotsSynced, this.error, }); final int id; final String kind; final DateTime startedAt; final DateTime finishedAt; final int rowsWritten; final int rowsRemoved; final int? slotsSynced; final String? error; bool get succeeded => error == null; } /// Counts the body of a sync run reports back to the recorder. class SyncRunCounts { const SyncRunCounts({ this.rowsWritten = 0, this.rowsRemoved = 0, this.slotsSynced, this.backfillItems, this.error, }); final int rowsWritten; final int rowsRemoved; /// Completed 4-hour slots written (market-history backfill only). final int? slotsSynced; /// Slot starts and symbol lists requested from Alpaca (backfill only). final List? backfillItems; /// Non-fatal partial failure (e.g. one Alpaca batch 500 while others /// succeeded). Recorded on the sync run without discarding [rowsWritten]. final String? error; } /// Wraps a closure with a `market_data_sync_runs` audit row. /// /// On entry, INSERTs a row with `kind`, `started_at` and returns its id. /// After the body completes (success or thrown exception), UPDATEs the /// row with `finished_at`, `rows_written`, `rows_removed`, and `error`. /// /// The body's exception is **swallowed** — the recorder records it and /// returns a [SyncRunOutcome] with `error` set instead. This is the /// "scheduler-friendly" contract every sync stage in §2/§3/§4 needs. /// /// **Do not change that contract for §3/§4:** `MarketHistoryScheduler` /// (§5) runs universe → backfill → cleanup in sequence and expects a /// thrown Alpaca 500 in backfill to be recorded here without aborting /// cleanup. Partial batch failures in `MarketDataHistorySync` rely on /// the same behaviour. class SyncRunRecorder { SyncRunRecorder(this._connection); final Connection _connection; static const String abortSupersededMessage = 'aborted: superseded by new worker sync'; /// Closes every run still marked in-progress (crashed or superseded worker). Future abortAllInProgressRuns({ DateTime? now, String? message, }) async { return _abortInProgress( now: now, olderThan: null, message: message ?? abortSupersededMessage, ); } /// Closes in-progress runs older than [olderThan] (hung without finishing). Future abortStaleInProgressRuns({ DateTime? now, Duration olderThan = const Duration(minutes: 30), String? message, }) async { return _abortInProgress( now: now, olderThan: olderThan, message: message ?? 'aborted: stale in-progress sync run', ); } Future _abortInProgress({ required DateTime? now, required Duration? olderThan, required String message, }) async { final DateTime tick = (now ?? DateTime.now()).toUtc(); final Result rows; if (olderThan == null) { rows = await _connection.execute( Sql.named( ''' UPDATE market_data_sync_runs SET finished_at = @finished_at, error = COALESCE(error, @message) WHERE finished_at IS NULL ''', ), parameters: { 'finished_at': tick, 'message': message, }, ); } else { rows = await _connection.execute( Sql.named( ''' UPDATE market_data_sync_runs SET finished_at = @finished_at, error = COALESCE(error, @message) WHERE finished_at IS NULL AND started_at < @cutoff ''', ), parameters: { 'finished_at': tick, 'message': message, 'cutoff': tick.subtract(olderThan), }, ); } return rows.affectedRows; } Future record( String kind, Future Function() body, { DateTime? now, }) async { final DateTime startedAt = (now ?? DateTime.now()).toUtc(); final Result inserted = await _connection.execute( Sql.named( ''' INSERT INTO market_data_sync_runs (kind, started_at) VALUES (@kind, @started_at) RETURNING id ''', ), parameters: { 'kind': kind, 'started_at': startedAt, }, ); final int id = (inserted.first[0]! as num).toInt(); int rowsWritten = 0; int rowsRemoved = 0; int? slotsSynced; List? backfillItems; String? error; try { final SyncRunCounts counts = await body(); rowsWritten = counts.rowsWritten; rowsRemoved = counts.rowsRemoved; slotsSynced = counts.slotsSynced; backfillItems = counts.backfillItems; error = counts.error; } on Object catch (e) { // Always recorded, never rethrown — the scheduler in §5 expects // each stage to fail in isolation, not to bubble up. error = e.toString(); } final DateTime finishedAt = (now ?? DateTime.now()).toUtc(); await _connection.execute( Sql.named( ''' UPDATE market_data_sync_runs SET finished_at = @finished_at, rows_written = @rows_written, rows_removed = @rows_removed, slots_synced = @slots_synced, backfill_items = @backfill_items::jsonb, error = @error WHERE id = @id ''', ), parameters: { 'id': id, 'finished_at': finishedAt, 'rows_written': rowsWritten, 'rows_removed': rowsRemoved, 'slots_synced': slotsSynced ?? 0, 'backfill_items': backfillItems == null || backfillItems.isEmpty ? null : jsonEncode(BackfillSyncItem.encodeList(backfillItems)), 'error': error, }, ); return SyncRunOutcome( id: id, kind: kind, startedAt: startedAt, finishedAt: finishedAt, rowsWritten: rowsWritten, rowsRemoved: rowsRemoved, slotsSynced: slotsSynced, error: error, ); } }