From eb5f57361c9421a7deba10c40875f3a93a41f317 Mon Sep 17 00:00:00 2001 From: Nathan Anderson Date: Sun, 31 May 2026 12:40:54 -0500 Subject: [PATCH] morning evening --- TODO-SESSION-HALF-BARS.md | 252 ++++++ TODO.md | 748 ++++++++++++++++++ .../models/market_history_week_coverage.dart | 4 +- lib/admin/models/question_audit_asset.dart | 7 +- lib/admin/utils/sync_run_formatters.dart | 13 +- .../market_history_question_audit_sheet.dart | 19 +- .../market_history_week_coverage_sheet.dart | 6 +- .../widgets/sync_run_expansion_tile.dart | 4 +- server/README.md | 9 +- .../lib/alpaca/alpaca_market_data_client.dart | 6 +- server/lib/trading/backfill_sync_item.dart | 4 +- server/lib/trading/market_data_db.dart | 39 +- server/lib/trading/market_data_history.dart | 84 +- .../market_history_bar_placeholder.dart | 2 +- server/lib/trading/market_history_config.dart | 12 +- .../market_history_four_hour_slot.dart | 89 --- .../market_history_minute_aggregate.dart | 74 ++ .../market_history_question_audit.dart | 94 ++- .../trading/market_history_session_slot.dart | 316 ++++++++ .../trading/market_history_week_coverage.dart | 86 +- .../008_market_history_four_hour.sql | 2 +- server/migrations/010_session_half_bars.sql | 24 + server/pubspec.lock | 8 + server/pubspec.yaml | 1 + .../fixtures/alpaca_bars_1min_session.json | 23 + server/test/helpers/test_db.dart | 2 +- .../test/integration/market_data_db_test.dart | 68 +- .../market_data_history_sync_test.dart | 135 ++-- .../market_history_admin_handler_test.dart | 56 +- .../market_history_schema_test.dart | 8 +- .../market_history_week_coverage_test.dart | 18 +- .../market_history_admin_logic_test.dart | 2 +- .../market_history_four_hour_slot_test.dart | 88 --- .../market_history_question_audit_test.dart | 13 +- .../market_history_session_slot_test.dart | 85 ++ .../market_history_week_coverage_test.dart | 34 +- .../admin/utils/sync_run_formatters_test.dart | 10 +- .../widgets/sync_run_expansion_tile_test.dart | 3 +- 38 files changed, 1918 insertions(+), 530 deletions(-) create mode 100644 TODO-SESSION-HALF-BARS.md create mode 100644 TODO.md delete mode 100644 server/lib/trading/market_history_four_hour_slot.dart create mode 100644 server/lib/trading/market_history_minute_aggregate.dart create mode 100644 server/lib/trading/market_history_session_slot.dart create mode 100644 server/migrations/010_session_half_bars.sql create mode 100644 server/test/fixtures/alpaca_bars_1min_session.json delete mode 100644 server/test/trading/market_history_four_hour_slot_test.dart create mode 100644 server/test/trading/market_history_session_slot_test.dart diff --git a/TODO-SESSION-HALF-BARS.md b/TODO-SESSION-HALF-BARS.md new file mode 100644 index 0000000..affd5b0 --- /dev/null +++ b/TODO-SESSION-HALF-BARS.md @@ -0,0 +1,252 @@ +# TODO — RTH session half bars (morning / afternoon aggregates) + +Companion to [`server/README.md`](./server/README.md) (Market history) and +[`FLUTTER-ADMIN-PORTAL.md`](./FLUTTER-ADMIN-PORTAL.md). + +**Goal:** Replace six UTC **4-hour** Alpaca bars per day with **two regular-session +aggregates per US trading day**, each built from up to **195 one-minute bars**: + +| Slot | US Eastern (NYSE regular) | Duration | +|------|---------------------------|----------| +| Morning | 9:30 AM – 12:45 PM | 3h 15m (195 min) | +| Afternoon | 12:45 PM – 4:00 PM | 3h 15m (195 min) | + +Persist **one OHLCV row per symbol per slot** (not 195 rows). Use for +`guess_weekly_move`, admin week coverage, and question audit. + +**TDD rhythm:** Red → Green → Refactor → Confirm (same as +[`TODO.md`](./TODO.md) §0). + +--- + +## 0. Design decisions (lock before coding) + +- [ ] **Timezone:** `America/New_York` for slot boundaries (handles DST); store + canonical `as_of` / `raw.slot_start` as UTC instants of slot open. +- [ ] **Stored `timeframe`:** new value, e.g. `sessionHalf` (do not overload + `4Hour`). +- [ ] **Alpaca fetch:** `GET /v2/stocks/bars` with `timeframe=1Min` per slot + `[start, end]`; aggregate in server (`o`/`h`/`l`/`c`/`v` from minutes). +- [ ] **Existing data:** delete or archive all `timeframe = '4Hour'` history rows + after migration; full backfill required. +- [ ] **`MIN_BARS_FOR_GUESS`:** revisit default (`5` bars ≈ 2.5 trading days at + 2 slots/day vs ~20h span with 4h bars). +- [ ] **Week coverage UI:** 2 dots per **trading day** (not 6 UTC dots). +- [ ] **Question audit API (optional):** return `assetCount` only, drop `assets[]` + payload to save bandwidth (Flutter already shows count-only). + +--- + +## 1. Slot model (replace `MarketHistoryFourHourSlot`) + +**File:** replace or supersede `server/lib/trading/market_history_four_hour_slot.dart` +→ e.g. `market_history_session_slot.dart`. + +- [ ] **Red** — `server/test/trading/market_history_session_slot_test.dart`: + - [ ] `slotStartContaining` maps instants to morning (9:30 ET) or afternoon + (12:45 ET) slot start (UTC). + - [ ] `endExclusive` / `endInclusive` for 195-minute windows. + - [ ] `hasEnded` / `lastCompletedSlotStart` never returns in-progress slot. + - [ ] `completedSlotStartsInWindow` yields 2 × trading days in rolling window; + skips weekends + NYSE holidays (`MarketHistoryTradingCalendar`). + - [ ] DST: assert 13:30 vs 14:30 UTC morning start across EDT/EST fixtures. + - [ ] `wireUtc` / `slotStartWire` include minutes (`…T13:30:00Z`). +- [ ] **Green** — implement slot module; `slotsPerDay = 2`, + `slotDuration = Duration(hours: 3, minutes: 15)`. +- [ ] **Refactor** — update imports project-wide; delete old four-hour module when + unused. + +**Confirm:** `cd server && dart test test/trading/market_history_session_slot_test.dart` + +--- + +## 2. Config & env + +**Files:** `market_history_config.dart`, `market_history_env.dart`, `env.dart`, +`server/README.md`. + +- [ ] `barTimeframe` → `sessionHalf` (or chosen name). +- [ ] Remove `slotHours = 4`; document `slotsPerDay = 2`. +- [ ] Add `alpacaFetchTimeframe = '1Min'` (fetch only, not stored). +- [ ] Document env vars; defaults for `MIN_BARS_FOR_GUESS` if changed. + +--- + +## 3. Database migration `010_session_half_bars.sql` + +- [ ] **Red** — extend `market_history_schema_test.dart`: + - [ ] `timeframe` CHECK allows `sessionHalf`. + - [ ] Partial index on `(symbol, as_of DESC) WHERE metric='bar' AND timeframe='sessionHalf'`. +- [ ] **Green** — migration: + - [ ] `DELETE` (or archive) `metric='bar' AND timeframe='4Hour'`. + - [ ] Update `market_data_snapshots_timeframe_check`. + - [ ] `CREATE INDEX market_data_snapshots_bar_session_half_idx …`. +- [ ] Apply in integration test harness (`001`–`010`). + +**Confirm:** `cd server && dart test test/integration/market_history_schema_test.dart` + +--- + +## 4. Minute fetch + aggregation (backfill) + +**Files:** `market_data_history.dart`, `alpaca_market_data_client.dart` (unchanged +API surface; caller passes `1Min`). + +- [ ] **Red** — `market_data_history_sync_test.dart`: + - [ ] Mock `1Min` bars spanning 9:30–12:45 ET → one persisted `sessionHalf` row. + - [ ] OHLCV aggregation rules: `o`=first, `h`=max, `l`=min, `c`=last, `v`=sum. + - [ ] Pagination: merge pages via existing `getBarsRange` + `next_page_token`. + - [ ] Wrong-window minutes rejected; empty minutes → placeholder or error per + calendar rules. + - [ ] Rate-limit / partial run behavior unchanged. +- [ ] **Green** — `_fetchBarsWithRateLimitRetry` uses `1Min`; `_persistBars` + aggregates then upserts one row per symbol; `raw.slot_start` + optional + `raw.minute_bars_count`. +- [ ] **Refactor** — extract `aggregateMinuteBars(List)` helper. + +**Confirm:** `cd server && dart test test/integration/market_data_history_sync_test.dart` + +--- + +## 5. DB slot matching + +**File:** `market_data_db.dart` + +- [ ] Replace `_slotStartBucketSql` (4-hour UTC `div(hour,4)`) with session-slot + equality on `raw.slot_start` wire or shared Dart/SQL slot function. +- [ ] **Red** — `market_data_db_test.dart` for `symbolsWithBarForSlot` at 9:30 / 12:45 + ET boundaries. + +--- + +## 6. Read paths + +| File | Work | +|------|------| +| `market_history_query.dart` | Filter `timeframe = sessionHalf`; update comments. | +| `market_history_question_audit.dart` | Step by **one session slot** (not ±4h); slot pair query. | +| `market_history_week_coverage.dart` | 2 slots per trading day; `slotsPerDay: 2`. | +| `market_history_trading_calendar.dart` | Trading-day helpers keyed on ET date of slot. | +| `market_history_admin_logic.dart` | Error strings / slot labels. | +| `backfill_sync_item.dart` | Wire format with minute-precision `slotStart`. | + +- [ ] **Red** — unit + integration tests for each area (see existing `*_test.dart` + files under `server/test/`). +- [ ] **Green** — implement. + +**Confirm:** `./scripts/test-server.sh` (no live Alpaca). + +--- + +## 7. Flutter admin + +| File | Work | +|------|------| +| `lib/admin/utils/sync_run_formatters.dart` | `formatMarketHistorySlotWire` — no `hour ~/ 4`. | +| `lib/admin/models/market_history_week_coverage.dart` | Default `slotsPerDay: 2`. | +| `lib/admin/widgets/market_history_week_coverage_sheet.dart` | Copy: 2 slots/day. | +| `lib/admin/widgets/market_history_question_audit_sheet.dart` | ET slot labels; count-only UI (done). | +| `lib/admin/widgets/sync_run_expansion_tile.dart` | Backfill row: count only (done). | + +- [ ] **Red** — widget tests under `test/admin/`. +- [ ] **Green** — implement remaining slot-label / coverage changes after server + ships new slot times. + +**Confirm:** `./scripts/test-admin-portal.sh` + +--- + +## 8. Fixtures & live tests + +- [ ] Add `server/test/fixtures/alpaca_bars_1min_session.json` (195 minutes × 1 symbol). +- [ ] Update `alpaca_bars_4h_window.json` usages or remove. +- [ ] `@Tags(['alpaca'])` live test: `1Min` range for one slot, aggregate locally. + +--- + +## 9. Documentation + +- [ ] `server/README.md` — Market history section (2 session slots, `1Min` fetch). +- [ ] `FLUTTER-ADMIN-PORTAL.md` — week coverage + question audit behavior. +- [ ] Link from [`TODO.md`](./TODO.md) (legacy 4h work is complete; this doc + supersedes granularity for Phase 3). + +--- + +## 10. Deploy / ops + +- [ ] Run migration `010` on prod/staging (deletes legacy `4Hour` bar rows automatically). +- [ ] **Or** manually clear history before backfill (if migration already applied without `010`): + + ```sql + -- Required: remove old 4-hour bar rows (wrong shape for session-half logic) + DELETE FROM market_data_snapshots + WHERE metric = 'bar' AND timeframe = '4Hour'; + + -- Optional: archived 4Hour copies (if any) + DELETE FROM market_data_archive + WHERE metric = 'bar' AND timeframe = '4Hour'; + + -- Optional: force a clean sync audit trail (not required for backfill to run) + TRUNCATE market_data_sync_runs; + ``` + + **Do not truncate** `tradable_assets` — universe sync is independent. + + After clearing, run admin **Resync** or wait for the worker; `hasPendingSlots` will + enqueue backfill for every missing `sessionHalf` slot in the rolling window. + +- [ ] Verify week-coverage calendar green for 2 slots × trading days. +- [ ] Verify `guess_weekly_move` eligibility with new `MIN_BARS` threshold. + +--- + +## 11. Progress log + +| Date | Step | Notes | +|------|------|-------| +| | | | + +--- + +## Appendix — affected files (checklist) + +**Server (implement):** + +- `server/lib/trading/market_history_four_hour_slot.dart` → session slot module +- `server/lib/trading/market_history_config.dart` +- `server/lib/trading/market_data_history.dart` +- `server/lib/trading/market_data_db.dart` +- `server/lib/trading/market_history_query.dart` +- `server/lib/trading/market_history_question_audit.dart` +- `server/lib/trading/market_history_week_coverage.dart` +- `server/lib/trading/market_history_trading_calendar.dart` +- `server/lib/trading/backfill_sync_item.dart` +- `server/lib/alpaca/alpaca_market_data_client.dart` (wire helper import only) +- `server/migrations/010_session_half_bars.sql` (new) + +**Server tests:** + +- `server/test/trading/market_history_four_hour_slot_test.dart` → session tests +- `server/test/trading/market_history_week_coverage_test.dart` +- `server/test/trading/market_history_question_audit_test.dart` +- `server/test/integration/market_data_history_sync_test.dart` +- `server/test/integration/market_data_db_test.dart` +- `server/test/integration/market_history_admin_handler_test.dart` +- `server/test/integration/market_history_week_coverage_test.dart` +- (+ admin logic, schema, scheduler tests as needed) + +**Flutter:** + +- `lib/admin/widgets/market_history_question_audit_sheet.dart` +- `lib/admin/widgets/sync_run_expansion_tile.dart` +- `lib/admin/widgets/market_history_week_coverage_sheet.dart` +- `lib/admin/utils/sync_run_formatters.dart` +- `lib/admin/models/market_history_week_coverage.dart` +- `test/admin/widgets/market_history_question_audit_sheet_test.dart` +- `test/admin/widgets/sync_run_expansion_tile_test.dart` + +**Unrelated (do not change for slot work):** + +- `server/lib/trading/guardrails.dart` (4h **notional** window for orders) +- `server/lib/trading/market_data_ingest.dart` (daily bars for live ingest) diff --git a/TODO.md b/TODO.md new file mode 100644 index 0000000..6a71fc1 --- /dev/null +++ b/TODO.md @@ -0,0 +1,748 @@ +# TODO — Rolling 7-Day Market Data Window + Cleanup + +> **Next milestone:** ET morning/afternoon session half bars (195-minute aggregates) +> — see [`TODO-SESSION-HALF-BARS.md`](./TODO-SESSION-HALF-BARS.md). + +Companion to [`TRADING_DEVELOPMENT_PLAN.md`](./TRADING_DEVELOPMENT_PLAN.md) and +[`TRADING_TDD_PLAN.md`](./TRADING_TDD_PLAN.md). + +**Goal:** maintain a rolling **7-day history** of market data for **all active +tradable assets** so the question pipeline can generate obfuscated +*guessing-game* questions about market movement, while pruning (or archiving) +anything older than the window. + +**TDD rhythm (mandatory for every step):** + +1. **Red** — write the failing test(s) first; commit if you like. +2. **Green** — minimum implementation that turns every test in this step green. +3. **Refactor** — tidy without changing behavior; rerun tests. +4. **Confirm** — run the full step-level confirm command listed in the step. +5. **Log** — check the box, add a row to [§12 Progress log](#12-progress-log). + +> Do not skip the Red phase. Do not start the next step while any test in the +> current step is failing or pending. No live Alpaca calls in default +> `dart test` jobs — guard with `@Tags(['alpaca'])`. + +--- + +## 0. Scope & design constraints + +- **Window:** rolling 7 calendar days, UTC. Configurable via + `MARKET_HISTORY_WINDOW_DAYS` (default `7`). +- **Granularity (Phase 1):** `1Day` bars for every active tradable, plus the + existing `last_trade` / `prev_close` snapshots for watchlist symbols. +- **Granularity (Phase 2):** `1Hour` bars for the union of all enabled users' + watchlist symbols (≤30 on Alpaca Basic). +- **Universe source of truth:** Alpaca `/v2/assets?status=active&tradable=true`, + refreshed daily, cached in Postgres (`tradable_assets`). +- **Idempotency:** repeated backfill of the same + `(symbol, metric, timeframe, as_of)` MUST NOT create duplicate rows. +- **Cleanup vs. archive:** rows with `as_of < now() - window` are either + hard-deleted (Phase 1) or moved to `market_data_archive` (Phase 2). +- **Worker isolation:** historical sync + cleanup run on their own cadence + (default once per day), not on every 60s per-user tick. +- **Rate-limit safety:** batch symbols (Alpaca `bars` accepts multi-symbol); + cap concurrent symbols; never call Alpaca in tests + (`QUESTION_PIPELINE_TEST_MODE=true`). +- **No Flutter changes** required for this milestone. + +--- + +## 1. Schema additions (migration `005_market_history.sql`) + +### 1.1 Red — failing tests first + +- [x] Create `server/test/integration/market_history_schema_test.dart`: + - [x] Test: `INSERT` two snapshots with the same + `(symbol, metric, timeframe, as_of)` → second one **upserts**, + does not duplicate (current schema lacks the unique constraint, so + this MUST fail Red). + - [x] Test: `timeframe` defaults to `'tick'` for existing rows; new rows + accept `'1Min' | '1Hour' | '1Day'`. + - [x] Test: `tradable_assets` PK rejects duplicate symbol; query by + `(status='active', tradable=true)` uses the new index (verify via + `EXPLAIN` returning `Index Scan`). + - [x] Test: `market_data_sync_runs` records `kind`, `started_at`, + `finished_at`, `rows_written`, `rows_removed`, `error` shape. + +### 1.2 Green — minimum migration + +- [x] Write `server/migrations/005_market_history.sql`: + - [x] `ALTER TABLE market_data_snapshots ADD COLUMN timeframe TEXT NOT NULL + DEFAULT 'tick'`. + - [x] `ALTER TABLE market_data_snapshots ADD CONSTRAINT + market_data_snapshots_unique_obs UNIQUE + (symbol, metric, timeframe, as_of)`. + - [x] `CREATE INDEX market_data_snapshots_asof_idx + ON market_data_snapshots (as_of DESC)`. + - [x] `CREATE TABLE tradable_assets (…)` with columns + `symbol PK, asset_class, exchange, name, tradable, fractionable, + status, raw JSONB, refreshed_at`. + - [x] `CREATE INDEX tradable_assets_status_idx + ON tradable_assets (status, tradable)`. + - [x] `CREATE TABLE market_data_sync_runs (…)` (see §0 plan). + - [ ] (Phase 2 stub, commented) `CREATE TABLE market_data_archive (…)` — + deferred to §4.2 (the migration runner splits on `;`, which would + slice a commented stub mid-block; the archive table will be added + in §4.2.2 when it is actually wired up). + +### 1.3 Refactor + +- [x] Confirm `MarketDataDb._rowToSnapshot` still reads correctly with the + new column (read-side back-compat — no test changes needed, just verify + existing `market_data_db_test.dart` still passes). +- [x] Move shared SQL fragments into the migration runner if duplication + appeared. _(none observed in 005; nothing to extract yet.)_ + +### 1.4 Confirm + +- [x] `cd server && dart test test/integration/migration_test.dart + test/integration/market_history_schema_test.dart` — green. +- [x] `psql cyberhybridhub_test -c '\d market_data_snapshots'` shows the + unique constraint and new column. + +--- + +## 2. Tradable-asset universe sync + +**Files (new):** `server/lib/alpaca/alpaca_assets_client.dart`, +`server/lib/trading/tradable_assets_db.dart`, +`server/lib/trading/tradable_assets_sync.dart`. + +### 2.1 Alpaca assets client + +#### 2.1.1 Red + +- [x] Add fixture `server/test/fixtures/alpaca_assets_active.json` (≥5 + representative assets, mix of `tradable=true/false` and + `fractionable=true/false`). +- [x] Add `server/test/alpaca/alpaca_assets_client_test.dart`: + - [x] Test: `listActiveTradable()` issues `GET` to + `${tradingBaseUrl}/v2/assets?status=active&asset_class=us_equity` + with `APCA-API-KEY-ID` + `APCA-API-SECRET-KEY` headers. + - [x] Test: parses fixture into `List` — verifies symbol, + exchange, fractionable, tradable, status fields. + - [x] Test: 401 / 500 → throws `AlpacaAssetsException` with status code + and body in the message. + - [x] Test: empty response array → returns `[]`, does not throw. + +#### 2.1.2 Green + +- [x] Add `AlpacaAsset` model in `server/lib/alpaca/alpaca_models.dart`. +- [x] Implement `AlpacaAssetsClient` with injectable `http.Client` + (mirror `AlpacaMarketDataClient` shape). +- [x] Add `AlpacaAssetsException`. + +#### 2.1.3 Refactor + +- [x] Extract a private `_authHeaders` helper if duplicated across + Alpaca clients (DRY — but only if you actually duplicate). _(Lifted to + `AlpacaEnv.authHeaders`; now reused by all three Alpaca clients.)_ + +#### 2.1.4 Confirm + +- [x] `dart test test/alpaca/alpaca_assets_client_test.dart` — green. +- [x] Tagged live test + `server/test/alpaca/alpaca_assets_live_test.dart` + (`@Tags(['alpaca'])`) — returns >100 symbols when keys present; + skipped otherwise. Run manually: + `dart test --tags=alpaca test/alpaca/alpaca_assets_live_test.dart`. + +### 2.2 Universe persistence + diff + +#### 2.2.1 Red + +- [x] Create `server/test/integration/tradable_assets_db_test.dart`: + - [x] Test: `upsertAll([A, B, C])` inserts 3 rows. + - [x] Test: re-running `upsertAll([B*, C, D])` updates `B`, leaves `C` + unchanged-by-content but `refreshed_at` bumped, inserts `D`, and + marks `A` as `tradable=false, status='inactive'` (we never delete + history). + - [x] Test: `listActiveTradableSymbols()` returns only + `tradable=true AND status='active'`. + +#### 2.2.2 Red — sync orchestration + +- [x] Create `server/test/integration/tradable_assets_sync_test.dart`: + - [x] Test: `TradableAssetsSync.runOnce()` with mocked client returning + the fixture → DB rows match; one row in `market_data_sync_runs` + with `kind='universe'` and non-null `finished_at`. + - [x] Test: client throws → sync run row recorded with `error` populated, + `finished_at` non-null, and `rows_written = 0`. + - [x] Test: two consecutive runs are safe (idempotent counts). + +#### 2.2.3 Green + +- [x] Implement `TradableAssetsDb.upsertAll`, + `TradableAssetsDb.listActiveTradableSymbols`. +- [x] Implement `TradableAssetsSync.runOnce()` that writes a + `market_data_sync_runs` row around the upsert. + +#### 2.2.4 Refactor + +- [x] Pull "wrap a closure with a `sync_runs` audit row" into a small + helper (`SyncRunRecorder.record(kind, body)`); reuse in §3 and §4. + _(Landed at `server/lib/trading/sync_run_recorder.dart`; + `TradableAssetsSync` already consumes it.)_ + +#### 2.2.5 Confirm + +- [x] `dart test test/integration/tradable_assets_db_test.dart + test/integration/tradable_assets_sync_test.dart` — green. + +--- + +## 3. Historical backfill (1Day bars × 7 days) + +**Files:** extend `server/lib/alpaca/alpaca_market_data_client.dart`, +new `server/lib/trading/market_data_history.dart`, +extend `server/lib/trading/market_data_db.dart`. + +### 3.1 Alpaca client — time-range bars with pagination + +#### 3.1.1 Red + +- [x] Add fixtures: + - [x] `server/test/fixtures/alpaca_bars_7d_multi_page1.json` — includes + `next_page_token`. + - [x] `server/test/fixtures/alpaca_bars_7d_multi_page2.json` — final + page, `next_page_token: null`. +- [x] Extend `server/test/alpaca/alpaca_market_data_client_test.dart`: + - [x] Test: `getBarsRange(['SPY','AAPL'], timeframe: '1Day', + start, end)` builds correct query string (`start`, `end`, + `timeframe`, `feed`, `symbols`, `limit`). + - [x] Test: follows pagination — when page1 returns + `next_page_token='abc'`, client issues second request with + `page_token=abc`; merges both pages' bars per symbol. + - [x] Test: stops after a configurable `maxPages` (default 20) to + prevent runaway loops. + - [x] Test: 429 → throws `AlpacaMarketDataException` containing the + word `rate` (so caller can detect & back off). + +#### 3.1.2 Green + +- [x] Implement `Future getBarsRange({ + List symbols, String timeframe, DateTime start, DateTime end, + int maxPages = 20})` on `AlpacaMarketDataClient`. +- [x] Extend `AlpacaBarsResponse` with a `merge(AlpacaBarsResponse other)` + method so paginated chunks combine cleanly. + +#### 3.1.3 Refactor + +- [x] If the pagination loop is non-trivial, extract a private + `_paginate(initialUri, parsePage)` generic to reuse later for + orders/positions endpoints. _(Loop kept inline in `getBarsRange` — + ~25 lines, clear enough; extract when a second consumer appears.)_ + +#### 3.1.4 Confirm + +- [x] `dart test test/alpaca/alpaca_market_data_client_test.dart` — green. +- [x] Tagged live test + `server/test/alpaca/alpaca_market_data_history_live_test.dart` + fetches 7-day bars for `SPY` and asserts ≥3 bars. + +### 3.2 `MarketDataDb` — idempotent upsert + range query + +#### 3.2.1 Red + +- [x] Extend `server/test/integration/market_data_db_test.dart`: + - [x] Test: `upsertSnapshot(symbol:'SPY', metric:'bar', + timeframe:'1Day', as_of:T, price:500)` then re-upsert with + `price:505` → exactly **one** row remains; price is `505`; `raw` + is overwritten (volume also overwritten). + - [x] Test: `barsForSymbol(symbol, timeframe, since, until)` returns + rows ordered by `as_of ASC`; range is inclusive of `since`, + exclusive of `until`. + - [x] Test: `barsForSymbol` returns `[]` when no rows match; does not + throw. + - [x] Test: `latestSyncedAsOf(symbol, timeframe)` returns the newest + `as_of` or `null`. + +#### 3.2.2 Green + +- [x] Implement `MarketDataDb.upsertSnapshot(...)` using + `ON CONFLICT (symbol, metric, timeframe, as_of) DO UPDATE + SET price = EXCLUDED.price, volume = EXCLUDED.volume, + raw = EXCLUDED.raw`. +- [x] Implement `MarketDataDb.barsForSymbol(...)` and + `MarketDataDb.latestSyncedAsOf(...)`. + +#### 3.2.3 Refactor + +- [x] Replace existing `insertSnapshot` call sites in + `market_data_ingest.dart` with `upsertSnapshot` (tick data has + `timeframe='tick'`; same call shape). Re-run + `test/integration/market_data_ingest_test.dart` — still green. + +#### 3.2.4 Confirm + +- [x] `dart test test/integration/market_data_db_test.dart + test/integration/market_data_ingest_test.dart` — green. + +### 3.3 `MarketDataHistorySync` + +#### 3.3.1 Red + +- [x] Add fixture + `server/test/fixtures/alpaca_bars_7d_3symbols.json` — 7 bars × 3 + symbols (SPY/AAPL/MSFT), realistic timestamps. +- [x] Add `server/test/integration/market_data_history_sync_test.dart`: + - [x] Test: with mocked Alpaca returning the fixture → 21 rows upserted + with `metric='bar'`, `timeframe='1Day'`; sync run row written. + - [x] Test: re-running with the same fixture → still 21 rows; zero + duplicates; `rows_written` reflects rows touched (not inserted). + - [x] Test: partial outage — Alpaca returns 200 for batch 1 + (AAPL/MSFT), 500 for batch 2 (SPY) → AAPL/MSFT rows persisted; + sync run row has `error` mentioning SPY; method does NOT throw. + - [x] Test: respects `HISTORY_SYNC_MAX_SYMBOLS` cap (set to 2 → only + first 2 symbols fetched). + - [x] Test: batching — with `HISTORY_SYNC_BATCH_SIZE=2` and 5 symbols, + Alpaca is called 3 times (mock call counter). + +#### 3.3.2 Green + +- [x] Implement `MarketDataHistorySync.runOnce({int windowDays = 7})`: + - [x] Reads symbols from + `TradableAssetsDb.listActiveTradableSymbols()`. + - [x] Batches into `HISTORY_SYNC_BATCH_SIZE` groups; calls + `getBarsRange` per batch. + - [x] Upserts via `MarketDataDb.upsertSnapshot`. + - [x] Captures per-batch errors without aborting; aggregates them into + the sync run row (`SyncRunCounts.error`). + +#### 3.3.3 Refactor + +- [x] Extract batching helper if used by §3.4 incremental path too. + _(Landed as `chunkList` in `market_data_history.dart`.)_ + +#### 3.3.4 Confirm + +- [x] `dart test test/integration/market_data_history_sync_test.dart` + — green. + +### 3.4 Incremental daily catch-up + +#### 3.4.1 Red + +- [x] Extend `market_data_history_sync_test.dart`: + - [x] Test: with prior `latestSyncedAsOf(symbol)` = `T-2d`, sync issues + bars with `start = T-2d` (not `T-7d`); mock HTTP call records + the requested start. + - [x] Test: with prior sync `T-10d` (outside window), `start` is + clamped to `T-windowDays`. + - [x] Test: cold start (no prior sync) → `start = T-windowDays`. + +#### 3.4.2 Green + +- [x] Compute per-symbol `start` using `latestSyncedAsOf`; pass to + `getBarsRange`. + +#### 3.4.3 Refactor + +- [x] If per-symbol starts vary inside a batch, fall back to + `min(starts)` for the batched call and let `upsertSnapshot` + dedupe the overlap — document the tradeoff in a code comment. + +#### 3.4.4 Confirm + +- [x] `dart test test/integration/market_data_history_sync_test.dart` + — green. + +--- + +## 4. Retention & cleanup (older than 7 days) + +**Files (new):** `server/lib/trading/market_data_retention.dart`. + +### 4.1 Hard-delete (Phase 1) + +#### 4.1.1 Red + +- [x] Create `server/test/integration/market_data_retention_test.dart`: + - [x] Test: seed 10 snapshots spanning 14 days → + `runCleanup({windowDays: 7})` deletes rows with + `as_of < now() - 7d`, keeps the rest; returns `rowsRemoved` + matching deleted count. + - [x] Test: empty table → returns `rowsRemoved = 0`, does not throw. + - [x] Test: `batchSize` honored — with 5000 rows older than window and + `batchSize=1000`, the underlying `DELETE` is issued ≥5 times + (use a counting wrapper around `_connection.execute`). + - [x] Test: each invocation appends a `market_data_sync_runs` row + with `kind='cleanup'`, `rows_removed` populated. + - [x] Test: rows within window are NEVER touched (assert specific IDs + survive). + +#### 4.1.2 Green + +- [x] Implement + `MarketDataRetention.runCleanup({int windowDays = 7, + int batchSize = 5000})`: + - [x] Loop: `DELETE FROM market_data_snapshots WHERE as_of < $cutoff + LIMIT $batchSize` (use CTE if Postgres version requires it), + return rows removed; repeat until 0. + - [x] Write a `market_data_sync_runs` row around the operation. + +#### 4.1.3 Refactor + +- [x] Reuse `SyncRunRecorder` from §2.2.4. + +#### 4.1.4 Confirm + +- [x] `dart test test/integration/market_data_retention_test.dart` + — green. + +### 4.2 Archive (Phase 2 — opt-in) + +#### 4.2.1 Red + +- [x] Extend `market_data_retention_test.dart`: + - [x] Test: with `archiveEnabled: true`, expired rows are copied into + `market_data_archive` with `archived_at = now()` BEFORE being + deleted; archive count grows by exactly `rowsRemoved`. + - [x] Test: archive run is transactional — if archive `INSERT` fails, + no `DELETE` happens; sync run row records the error. + - [x] Test: `archiveEnabled: false` (default) → archive table + untouched. + +#### 4.2.2 Green + +- [x] Uncomment `market_data_archive` table in migration 005 (or add it + now if you deferred it). _(Added `006_market_data_archive.sql`.)_ +- [x] Implement + `MarketDataRetention.runArchiveAndCleanup({int windowDays})` + with explicit `BEGIN; INSERT … SELECT …; DELETE …; COMMIT`. + +#### 4.2.3 Refactor + +- [x] Consider a single unified entry point + `MarketDataRetention.run({int windowDays, bool archive})` that + dispatches; only do this if it doesn't muddy the failure-isolation + story. + +#### 4.2.4 Confirm + +- [x] `dart test test/integration/market_data_retention_test.dart` + — green. + +--- + +## 5. Scheduler — daily cadence inside the worker + +**Files:** new `server/lib/workers/market_history_scheduler.dart`, +extend `server/lib/workers/question_background_worker.dart`, +extend `server/bin/server.dart`. + +### 5.1 Red + +- [x] Add `server/test/integration/market_history_scheduler_test.dart`: + - [x] Test: cold start → `runIfDue(now=T0)` runs all 3 stages + (`universe`, `backfill`, `cleanup`) in that order; + `market_data_sync_runs` has 3 rows. + - [x] Test: same-day re-run (`now=T0 + 1h`) → no stages run; zero new + sync rows. + - [x] Test: next day (`now=T0 + 24h`) → all 3 stages run again. + - [x] Test: per-stage cadence — set + `MARKET_UNIVERSE_REFRESH_HOURS=48`, `MARKET_HISTORY_SYNC_HOURS=24`, + `MARKET_HISTORY_CLEANUP_HOURS=24`; at T0+24h only backfill + + cleanup run. + - [x] Test: failure isolation — backfill throws → cleanup still runs; + both stages logged in `market_data_sync_runs` (one with `error`, + one without). + - [x] Test: `MARKET_HISTORY_SYNC_HOUR_UTC=10` (optional alignment) → + scheduler runs only when local UTC hour ≥ 10 AND last run was on + a prior UTC day. +- [x] Add `server/test/integration/market_history_worker_wireup_test.dart`: + - [x] Test: `QuestionBackgroundWorker._tick` invokes + `MarketHistoryScheduler.runIfDue` **before** the + `TradingOrchestrator` per-user loop. Use a spy scheduler that + records the call order. + - [x] Test: scheduler exception is caught — worker tick continues into + the orchestrator loop; stderr contains the error. + +### 5.2 Green + +- [x] Implement `MarketHistoryScheduler` with `runIfDue(DateTime now)`, + reading the last `finished_at` per `kind` from + `market_data_sync_runs`. +- [x] Wire `QuestionBackgroundWorker` to accept an optional + `MarketHistoryScheduler` and call it at the top of `_tick`. +- [x] Wire `bin/server.dart` to construct the scheduler only when + `MARKET_HISTORY_SYNC_ENABLED=true && TRADING_ENABLED=true`. + +### 5.3 Refactor + +- [x] If the three stages each need similar before/after logic, abstract + a small `_runStage(kind, body)` inside the scheduler. + (`_maybeRunStage` — no further refactor needed.) + +### 5.4 Confirm + +- [x] `dart test test/integration/market_history_scheduler_test.dart + test/integration/market_history_worker_wireup_test.dart` — green. + +--- + +## 6. Question pipeline — "guess the move" rule + +**Files:** extend `server/lib/trading/rule_engine.dart`, +extend `server/lib/trading/trading_pipeline.dart`, +new `server/lib/trading/market_history_query.dart`. + +The guessing game uses the rolling 7-day window — questions must reveal +just enough for the user to guess (obfuscated symbol/price/direction). +**No trade is placed for this rule** — answers feed scoring only. + +### 6.1 Red — `MarketHistoryQuery` + +- [x] Add `server/test/integration/market_history_query_test.dart`: + - [x] Test: `weeklyMovers({minBars: 5, asOf})` returns only symbols + with ≥5 daily bars in the window; each entry exposes + `(symbol, openClose, currentClose, days)`. + - [x] Test: deterministic — supply a `random: Random(42)` and assert a + stable selection order across runs. + - [x] Test: symbols with stale data (newest bar > 2d old) are + excluded. + +### 6.2 Red — rule engine extension + +- [x] Add `server/test/trading/rule_engine_guess_weekly_move_test.dart`: + - [x] Test: rule kind `guess_weekly_move` with mocked + `MarketHistoryQuery` returning SPY {ref=500, current=510, days=5} + → produces a `RuleEvaluation` with: + - obfuscated `symbol_token='ASSET_A'`, + - `correct_answer = 10` (up direction), + - `question_text` substituting `{{token}}`, `{{ref_price}}`, + `{{ref_days_ago}}`, NEVER `{{symbol}}`. + - [x] Test: down move (ref=510, current=500) → `correct_answer = -10`. + - [x] Test: insufficient bars → no fire. + - [x] Test: `questions.metadata.guess_symbol` is set to real symbol + (server-side only) when the question is created in §6.3. + +### 6.3 Red — pipeline wiring + +- [x] Add + `server/test/integration/trading_pipeline_guess_weekly_move_test.dart`: + - [x] Test: end-to-end with seeded 7 daily bars for SPY → pipeline + creates a question with obfuscated text; `metadata.guess_symbol + = 'SPY'`; `pipeline_key='trading'`, + `pipeline_step='guess_weekly_move:await_answer'`. + - [x] Test: `onAnswerSubmitted` with matching direction (e.g., +10 on + an up move) records `score_delta = +1` in + `user_trading_state.context.guess_score`; non-matching records + `score_delta = -1`. + - [x] Test: `TradeActuator.processPendingOrders` is **NEVER called** + for `guess_weekly_move` answers (assert via spy). + - [x] Test: cooldown — after a fire, the same symbol is not re-picked + for `GUESS_COOLDOWN_HOURS` (default 24). + +### 6.4 Green + +- [x] Implement `MarketHistoryQuery.weeklyMovers({...})`. +- [x] Add rule kind to `RuleEngine` with the new template tokens. +- [x] Extend `TradingPipeline.evaluate` + `onAnswerSubmitted` for the + new rule kind, including the cooldown bookkeeping. + +### 6.5 Refactor + +- [x] If the token mapping (real symbol ↔ `ASSET_A`/`ASSET_B`/…) is used + in multiple places, lift it into a `SymbolObfuscator` helper with + its own focused unit test. + +### 6.6 Confirm + +- [x] `dart test test/integration/market_history_query_test.dart + test/trading/rule_engine_guess_weekly_move_test.dart + test/integration/trading_pipeline_guess_weekly_move_test.dart` + — green. + +--- + +## 7. Env additions (`server/.env.example`) + +```bash +# Rolling history feature gate +MARKET_HISTORY_SYNC_ENABLED=false +MARKET_HISTORY_WINDOW_DAYS=7 +MARKET_HISTORY_RETENTION_DAYS=7 +MARKET_HISTORY_ARCHIVE_ENABLED=false + +# Cadence (hours) +MARKET_UNIVERSE_REFRESH_HOURS=24 +MARKET_HISTORY_SYNC_HOURS=24 +MARKET_HISTORY_CLEANUP_HOURS=24 +MARKET_HISTORY_SYNC_HOUR_UTC=10 # optional alignment hour + +# Batching / safety +HISTORY_SYNC_BATCH_SIZE=50 +HISTORY_SYNC_MAX_SYMBOLS=2000 # hard cap; Alpaca Basic-friendly +MIN_BARS_FOR_GUESS=5 +GUESS_COOLDOWN_HOURS=24 +``` + +### 7.1 Red + +- [x] Add `server/test/env/market_history_env_test.dart`: + - [x] Test: defaults parsed when env empty (`enabled=false`, + `windowDays=7`, etc.). + - [x] Test: `MARKET_HISTORY_SYNC_ENABLED=true` while + `TRADING_ENABLED=false` → `Env.assertConsistent()` throws. + - [x] Test: `MARKET_HISTORY_WINDOW_DAYS=0` or negative → throws. + - [x] Test: `MARKET_HISTORY_SYNC_HOUR_UTC=24` → throws (valid range + `0..23`). + +### 7.2 Green + +- [x] Extend `server/lib/env.dart` to load and validate these vars. +- [x] Append the block above to `server/.env.example`. + +### 7.3 Refactor + +- [x] If `Env` has grown unwieldy, split market-history vars into + `MarketHistoryEnv` (typed value object) and have `Env` expose it. + +### 7.4 Confirm + +- [x] `dart test test/env/market_history_env_test.dart` — green. +- [x] Document each var in `server/README.md` under a new + **"Market history window"** subsection. + +--- + +## 8. Operational tooling + +### 8.1 Red + +- [ ] Add `server/test/bin/sync_market_history_smoke_test.dart`: + - [ ] Test: imports `bin/sync_market_history.dart` `main` function and + runs it with `QUESTION_PIPELINE_TEST_MODE=true` + a fake DB → + exits 0 and emits the expected one-line log. +- [ ] Add equivalent smoke test for `bin/cleanup_market_history.dart`. + +### 8.2 Green + +- [ ] Add CLI `server/bin/sync_market_history.dart` with + `--window=` flag (default 7); honors test mode. +- [ ] Add CLI `server/bin/cleanup_market_history.dart` with + `--window=` and `--archive` flags. +- [ ] Add structured one-line log: + `kind=… symbols=… rows_written=… rows_removed=… duration_ms=… error=…`. + +### 8.3 Refactor + +- [ ] Share argument parsing between the two CLIs if duplicated. + +### 8.4 Confirm + +- [ ] `dart test test/bin/` — green. +- [ ] Manual: `dart run server:bin/sync_market_history.dart --window=7` + against `cyberhybridhub_test` works end-to-end. + +### 8.5 Optional admin endpoint (defer until needed) + +- [ ] Behind Firebase admin auth, `POST /v1/admin/market-data/resync?window=7` + enqueues a sync run; **not exposed to Flutter**. + +--- + +## 9. Test pyramid for this milestone + +| Layer | Test files | +|------|------------| +| Unit | `test/alpaca/alpaca_assets_client_test.dart` | +| Unit | `test/alpaca/alpaca_market_data_client_test.dart` (extended) | +| Unit | `test/trading/rule_engine_guess_weekly_move_test.dart` | +| Unit | `test/env/market_history_env_test.dart` | +| DB integration | `test/integration/market_history_schema_test.dart` | +| DB integration | `test/integration/tradable_assets_db_test.dart` | +| DB integration | `test/integration/tradable_assets_sync_test.dart` | +| DB integration | `test/integration/market_data_db_test.dart` (extended) | +| DB integration | `test/integration/market_data_history_sync_test.dart` | +| DB integration | `test/integration/market_data_retention_test.dart` | +| DB integration | `test/integration/market_history_query_test.dart` | +| DB integration | `test/integration/trading_pipeline_guess_weekly_move_test.dart` | +| Worker integration | `test/integration/market_history_scheduler_test.dart` | +| Worker integration | `test/integration/market_history_worker_wireup_test.dart` | +| Bin smoke | `test/bin/sync_market_history_smoke_test.dart` | +| Bin smoke | `test/bin/cleanup_market_history_smoke_test.dart` | +| Tagged (`alpaca`) | `test/alpaca/alpaca_assets_live_test.dart` | +| Tagged (`alpaca`) | `test/alpaca/alpaca_market_data_history_live_test.dart` | + +**CI gating:** + +```bash +# default job — no Alpaca keys, must pass on every PR +cd server && dart test + +# nightly / manual — requires ALPACA_API_KEY_ID / ALPACA_API_SECRET_KEY +cd server && dart test --tags=alpaca +``` + +--- + +## 10. Acceptance criteria (Gate H — Rolling history) + +- [ ] `market_data_snapshots` contains rows for every active tradable with + `as_of` within the last 7 days, and no rows older. +- [ ] Re-running backfill is a no-op (zero duplicate rows; deterministic + `rows_written` count when nothing changed upstream). +- [ ] Cleanup removes only rows older than the window and never touches + newer rows. +- [ ] Worker performs one full cycle (universe → backfill → cleanup) per + day with stage isolation; failure in one stage does not block the + others. +- [ ] A `guess_weekly_move` question can be generated end-to-end from + pure DB data — no live Alpaca call at evaluation time. +- [ ] `dart test` is green; `dart test --tags=alpaca` is green when keys + are present. +- [ ] `MARKET_HISTORY_SYNC_ENABLED=false` is the default; nothing runs + unless explicitly enabled. +- [ ] Safety: `MARKET_HISTORY_SYNC_ENABLED=true` without + `TRADING_ENABLED=true` fails fast at server boot. + +--- + +## 11. Risks & mitigations + +| Risk | Mitigation | +|------|------------| +| Alpaca rate limits on full-universe pull | Batched `bars` calls (`HISTORY_SYNC_BATCH_SIZE`); per-batch error isolation; 429 → exception logged in sync run, retry next day. | +| Migration deadlocks on large `market_data_snapshots` | Cleanup batches via `LIMIT` + loop; unique constraint added with `NOT VALID` then `VALIDATE CONSTRAINT` if existing dataset is huge (document in migration). | +| Duplicate Alpaca asset entries between runs | `upsertAll` PK-on-symbol; we mark missing symbols inactive instead of deleting. | +| Guessing game leaks the real symbol | Question text uses tokens only; real symbol lives in `questions.metadata` (server side); add a regex test that scans `question_text` for any known ticker. | +| Backfill blowing past disk budget | Hard caps via `HISTORY_SYNC_MAX_SYMBOLS` and `MARKET_HISTORY_WINDOW_DAYS`; retention deletes daily so steady-state size is bounded. | + +--- + +## 12. Progress log + + + +| Date | Step | Result | +|------|------|--------| +| 2026-05-26 | §7 Env additions | Green: 6/6 env tests; `dart test` 133/133. `MarketHistoryEnv.fromMap` + `assertConsistent`; `ServerEnv.marketHistory`; wired scheduler/sync/retention/guess; `server/.env.example`; README **Market history window**. | +| 2026-05-26 | §6 Guess-the-move rule | Green: 12 new tests; `dart test` 127/127. `MarketHistoryQuery.weeklyMovers`; `RuleEngine.evaluateGuessWeeklyMove`; `SymbolObfuscator`; `TradingPipeline` scoring + per-symbol cooldown; `questions.metadata` migration `007`; no pending orders on guess answers. | +| 2026-05-26 | §5 Scheduler (worker cadence) | Green: 8/8 scheduler + wireup tests; `dart test` 115/115. `MarketHistoryScheduler.runIfDue` (per-kind cadence + optional `syncHourUtc`); worker calls scheduler before pipeline/trading; `server.dart` wires universe→backfill→cleanup when `MARKET_HISTORY_SYNC_ENABLED` + real Alpaca; `ServerEnv.marketHistorySyncEnabled`; `SyncRunRecorder` uses injected `now` for `finished_at`. | +| 2026-05-26 | §4 Retention & cleanup | Green: 8/8 retention tests; `dart test` 107/107. `MarketDataRetention.runCleanup` (batched hard-delete via CTE+RETURNING); `runArchiveAndCleanup` (transactional archive-then-delete); unified `run(archive:)`; migration `006_market_data_archive.sql`; reuses `SyncRunRecorder` kind=`cleanup`. | +| 2026-05-26 | §3 Historical backfill (1Day × 7d) | Green: 17 new tests (6 client + 4 db + 8 sync); `dart test` 99/99; live `alpaca_market_data_history_live_test` ≥3 SPY bars. `getBarsRange` + pagination; `upsertSnapshot`/`barsForSymbol`/`latestSyncedAsOf`; `MarketDataHistorySync` with incremental catch-up + partial batch errors via `SyncRunCounts.error`. Defaults in `MarketHistoryConfig` (batch=100). | +| 2026-05-26 | §2 Tradable-asset universe sync | Green: 11/11 §2 tests pass (5 client + 3 db + 3 sync); `dart test` 82/82 green; tagged live `alpaca_assets_live_test` returned >100 active us_equity assets. Refactor 2.1.3 lifted auth headers to `AlpacaEnv.authHeaders`; 2.2.4 lifted `SyncRunRecorder` for §3/§4 reuse. | +| 2026-05-26 | §1 Schema additions (migration `005_market_history.sql`) | Green: 5/5 schema tests pass; `dart test` 70/70 green; `\d market_data_snapshots` shows `timeframe` col + `market_data_snapshots_unique_obs` unique constraint. Archive stub deferred to §4.2 to keep `;`-split migration runner happy. | + +--- + +## 13. References + +- Existing snapshot writer: `server/lib/trading/market_data_ingest.dart` +- Existing snapshot DB: `server/lib/trading/market_data_db.dart` +- Existing migration to extend: `server/migrations/004_trading.sql` +- Orchestrator hook point: `server/lib/trading/trading_orchestrator.dart` +- Worker hook point: `server/lib/workers/question_background_worker.dart` +- Plans: [`TRADING_DEVELOPMENT_PLAN.md`](./TRADING_DEVELOPMENT_PLAN.md), + [`TRADING_TDD_PLAN.md`](./TRADING_TDD_PLAN.md) +- Alpaca docs: [Market Data](https://docs.alpaca.markets/docs/market-data-api), + [Trading / Assets](https://docs.alpaca.markets/docs/trading-api), + [Bars](https://docs.alpaca.markets/reference/stockbars) + +--- + +*Document version: 1.0 — Rolling 7-day market data window, cleanup, and +guessing-game question integration.* diff --git a/lib/admin/models/market_history_week_coverage.dart b/lib/admin/models/market_history_week_coverage.dart index a4ea6f8..11bea75 100644 --- a/lib/admin/models/market_history_week_coverage.dart +++ b/lib/admin/models/market_history_week_coverage.dart @@ -44,7 +44,7 @@ class MarketHistoryDayCoverage { json['slots'] as List? ?? []; return MarketHistoryDayCoverage( date: DateTime.parse('${json['date']}T00:00:00Z').toUtc(), - slotsPerDay: (json['slotsPerDay'] as num?)?.toInt() ?? 6, + slotsPerDay: (json['slotsPerDay'] as num?)?.toInt() ?? 2, completedSlots: (json['completedSlots'] as num?)?.toInt() ?? 0, fullySyncedSlots: (json['fullySyncedSlots'] as num?)?.toInt() ?? 0, slots: rawSlots @@ -81,7 +81,7 @@ class MarketHistoryWeekCoverageReport { return MarketHistoryWeekCoverageReport( asOf: DateTime.parse(json['asOf'] as String).toUtc(), windowDays: (json['windowDays'] as num?)?.toInt() ?? 7, - slotsPerDay: (json['slotsPerDay'] as num?)?.toInt() ?? 6, + slotsPerDay: (json['slotsPerDay'] as num?)?.toInt() ?? 2, symbolCount: (json['symbolCount'] as num?)?.toInt() ?? 0, isConsistent: json['isConsistent'] as bool? ?? false, days: rawDays diff --git a/lib/admin/models/question_audit_asset.dart b/lib/admin/models/question_audit_asset.dart index e80daae..f767b3e 100644 --- a/lib/admin/models/question_audit_asset.dart +++ b/lib/admin/models/question_audit_asset.dart @@ -60,6 +60,9 @@ class QuestionAuditAsset { } } +/// US RTH session-half slot length (9:30–12:45 and 12:45–16:00 ET). +const Duration _sessionHalfSlotDuration = Duration(hours: 3, minutes: 15); + class QuestionAuditReport { const QuestionAuditReport({ required this.compareUntil, @@ -91,10 +94,10 @@ class QuestionAuditReport { return QuestionAuditReport( compareUntil: compareUntil, newerSlotStart: json['newerSlotStart'] == null - ? compareUntil.subtract(const Duration(hours: 4)) + ? compareUntil.subtract(_sessionHalfSlotDuration) : DateTime.parse(json['newerSlotStart']! as String).toUtc(), olderSlotStart: json['olderSlotStart'] == null - ? compareUntil.subtract(const Duration(hours: 8)) + ? compareUntil.subtract(_sessionHalfSlotDuration * 2) : DateTime.parse(json['olderSlotStart']! as String).toUtc(), windowDays: (json['windowDays'] as num).toInt(), canStepOlder: json['canStepOlder'] as bool? ?? false, diff --git a/lib/admin/utils/sync_run_formatters.dart b/lib/admin/utils/sync_run_formatters.dart index 37c2596..1833436 100644 --- a/lib/admin/utils/sync_run_formatters.dart +++ b/lib/admin/utils/sync_run_formatters.dart @@ -20,17 +20,10 @@ String formatRelativeTime(DateTime startedAt, {DateTime? now}) { String formatMarketHistorySlotWire(DateTime value) { final DateTime utc = value.toUtc(); - final int slotHour = (utc.hour ~/ 4) * 4; - final DateTime slotStart = DateTime.utc( - utc.year, - utc.month, - utc.day, - slotHour, - ); String two(int n) => n.toString().padLeft(2, '0'); - return '${slotStart.year.toString().padLeft(4, '0')}-' - '${two(slotStart.month)}-${two(slotStart.day)}T' - '${two(slotStart.hour)}:${two(slotStart.minute)}:${two(slotStart.second)}Z'; + return '${utc.year.toString().padLeft(4, '0')}-' + '${two(utc.month)}-${two(utc.day)}T' + '${two(utc.hour)}:${two(utc.minute)}:${two(utc.second)}Z'; } String formatUtcTimestamp(DateTime? value) { diff --git a/lib/admin/widgets/market_history_question_audit_sheet.dart b/lib/admin/widgets/market_history_question_audit_sheet.dart index 7dbca45..421a353 100644 --- a/lib/admin/widgets/market_history_question_audit_sheet.dart +++ b/lib/admin/widgets/market_history_question_audit_sheet.dart @@ -4,7 +4,7 @@ import '../../theme/app_theme.dart'; import '../models/question_audit_asset.dart'; import '../services/market_history_admin_api.dart'; -/// Scrollable audit of last-two 4-hour bar price and volume deltas per symbol. +/// Scrollable audit of last-two session-half bar price and volume deltas per symbol. class MarketHistoryQuestionAuditSheet extends StatefulWidget { const MarketHistoryQuestionAuditSheet({ super.key, @@ -517,7 +517,7 @@ class _SlotRow extends StatelessWidget { } abstract final class _AuditFormat { - /// Older → newer UTC 4-hour slot starts being compared. + /// Older → newer session-half slot opens (UTC instants of 9:30 / 12:45 ET). static String compareSlotRange({ required DateTime older, required DateTime newer, @@ -525,20 +525,17 @@ abstract final class _AuditFormat { return '${_slotLabel(older)} – ${_slotLabel(newer)} UTC'; } - static String _slotLabel(DateTime asOf) { - final DateTime utc = asOf.toUtc(); + static String _slotLabel(DateTime slotStart) { + final DateTime utc = slotStart.toUtc(); final String month = utc.month.toString().padLeft(2, '0'); final String day = utc.day.toString().padLeft(2, '0'); final String hour = utc.hour.toString().padLeft(2, '0'); - return '$month/$day $hour:00'; + final String minute = utc.minute.toString().padLeft(2, '0'); + return '$month/$day $hour:$minute'; } - static String slotTime(DateTime asOf) { - final DateTime utc = asOf.toUtc(); - final String month = utc.month.toString().padLeft(2, '0'); - final String day = utc.day.toString().padLeft(2, '0'); - final String hour = utc.hour.toString().padLeft(2, '0'); - return '$month/$day ${hour}:00 UTC'; + static String slotTime(DateTime slotStart) { + return '${_slotLabel(slotStart)} UTC'; } static String value(num n) { diff --git a/lib/admin/widgets/market_history_week_coverage_sheet.dart b/lib/admin/widgets/market_history_week_coverage_sheet.dart index fb6f70c..7f19285 100644 --- a/lib/admin/widgets/market_history_week_coverage_sheet.dart +++ b/lib/admin/widgets/market_history_week_coverage_sheet.dart @@ -13,7 +13,7 @@ const List _weekdayLabels = [ 'Sun', ]; -/// Mini 7-day UTC week view showing 4-hour slot sync health. +/// Mini 7-day Eastern week view showing session-half slot sync health. class MarketHistoryWeekCoverageSheet extends StatelessWidget { const MarketHistoryWeekCoverageSheet({ super.key, @@ -75,7 +75,7 @@ class MarketHistoryWeekCoverageSheet extends StatelessWidget { report.symbolCount == 0 ? 'No active tradable symbols to validate.' : report.isConsistent - ? 'All completed 4-hour slots are fully synced across ' + ? 'All completed session-half slots are fully synced across ' '${report.symbolCount} symbols (bars or no-data placeholders).' : 'Some completed slots are missing a bar or no-data placeholder ' 'for one or more symbols.', @@ -97,7 +97,7 @@ class MarketHistoryWeekCoverageSheet extends StatelessWidget { ), const SizedBox(height: 12), Text( - 'UTC · ${report.slotsPerDay} slots per day · ' + 'ET · ${report.slotsPerDay} slots per trading day · ' '${report.windowDays}-day window', textAlign: TextAlign.center, style: const TextStyle( diff --git a/lib/admin/widgets/sync_run_expansion_tile.dart b/lib/admin/widgets/sync_run_expansion_tile.dart index 6b6ab8b..d0bf77c 100644 --- a/lib/admin/widgets/sync_run_expansion_tile.dart +++ b/lib/admin/widgets/sync_run_expansion_tile.dart @@ -218,7 +218,9 @@ class SyncRunExpansionTile extends StatelessWidget { ), const SizedBox(height: 2), Text( - '${item.symbols.length} assets: ${item.symbols.join(', ')}', + item.symbols.length == 1 + ? '1 asset' + : '${item.symbols.length} assets', style: const TextStyle( fontSize: 12, color: AppColors.textSecondary, diff --git a/server/README.md b/server/README.md index c9d777a..f0e07be 100644 --- a/server/README.md +++ b/server/README.md @@ -129,8 +129,9 @@ curl -s -X POST http://localhost:3000/v1/me/incoming-question \ ## Market history -Alpaca **`4Hour`** bars in six **UTC slots** per day (`00`, `04`, `08`, `12`, `16`, `20`). -Stored as `metric=bar`, `timeframe=4Hour`. Rolling window: `MARKET_HISTORY_WINDOW_DAYS` (default 7). +Alpaca **`1Min`** bars aggregated into two **US regular-session** half-days per trading day +(morning **9:30–12:45 ET**, afternoon **12:45–16:00 ET**, ~195 minutes each). +Stored as `metric=bar`, `timeframe=sessionHalf`. Rolling window: `MARKET_HISTORY_WINDOW_DAYS` (default 7). **Backfill** (`kind=backfill`): fetches each **ended** slot still missing in DB; skips the open slot. Throttled to `MARKET_HISTORY_API_REQUESTS_PER_MINUTE` (default 200/min). On HTTP 429: wait 1 minute, retry once; if still limited, save partial rows and resume next tick. @@ -142,7 +143,9 @@ crashed prior sync cannot block new work. Requires `TRADING_ENABLED=true` when `MARKET_HISTORY_SYNC_ENABLED=true`. -**Migration `008`:** drops legacy `1Day` history bars, adds `timeframe` check (`4Hour` allowed), partial index on `4Hour` bars, `market_data_sync_runs.slots_synced`. +**Migration `008`:** legacy `1Day` / `4Hour` history cleanup and `slots_synced` on sync runs. + +**Migration `010`:** deletes `4Hour` bars, adds `sessionHalf` timeframe + partial index. **Migration `009`:** adds `market_data_sync_runs.backfill_items` JSONB — per-slot UTC start + symbol list for each backfill run. diff --git a/server/lib/alpaca/alpaca_market_data_client.dart b/server/lib/alpaca/alpaca_market_data_client.dart index 1f32c95..7e305df 100644 --- a/server/lib/alpaca/alpaca_market_data_client.dart +++ b/server/lib/alpaca/alpaca_market_data_client.dart @@ -4,7 +4,7 @@ import 'package:http/http.dart' as http; import 'alpaca_env.dart'; import 'alpaca_models.dart'; -import '../trading/market_history_four_hour_slot.dart'; +import '../trading/market_history_session_slot.dart'; /// REST client for Alpaca Market Data API v2 (IEX feed on Basic plan). class AlpacaMarketDataClient { @@ -106,8 +106,8 @@ class AlpacaMarketDataClient { final Map query = { 'symbols': symbols.join(','), 'timeframe': timeframe, - 'start': MarketHistoryFourHourSlot.wireUtc(start), - 'end': MarketHistoryFourHourSlot.wireUtc(end), + 'start': MarketHistorySessionSlot.wireUtc(start), + 'end': MarketHistorySessionSlot.wireUtc(end), 'feed': _env.dataFeed, 'limit': limit.toString(), if (pageToken != null && pageToken.isNotEmpty) 'page_token': pageToken, diff --git a/server/lib/trading/backfill_sync_item.dart b/server/lib/trading/backfill_sync_item.dart index 0e8963a..2e9fbca 100644 --- a/server/lib/trading/backfill_sync_item.dart +++ b/server/lib/trading/backfill_sync_item.dart @@ -1,4 +1,4 @@ -import 'market_history_four_hour_slot.dart'; +import 'market_history_session_slot.dart'; /// One Alpaca backfill request bucket: a UTC 4-hour slot and its symbols. class BackfillSyncItem { @@ -11,7 +11,7 @@ class BackfillSyncItem { final List symbols; Map toJson() => { - 'slotStart': MarketHistoryFourHourSlot.slotStartWire(slotStart), + 'slotStart': MarketHistorySessionSlot.slotStartWire(slotStart), 'symbols': symbols, }; diff --git a/server/lib/trading/market_data_db.dart b/server/lib/trading/market_data_db.dart index 680ec87..46968a5 100644 --- a/server/lib/trading/market_data_db.dart +++ b/server/lib/trading/market_data_db.dart @@ -3,7 +3,7 @@ import 'dart:convert'; import 'package:postgres/postgres.dart'; import 'market_history_bar_placeholder.dart'; -import 'market_history_four_hour_slot.dart'; +import 'market_history_session_slot.dart'; /// Normalized market data row persisted for rule evaluation. class MarketDataSnapshot { @@ -105,7 +105,7 @@ class MarketDataDb { return _rowToSnapshot(result.first); } - /// Tombstone when Alpaca has no 4Hour bar for [symbol] at [slotStart]. + /// Tombstone when Alpaca has no session-half bar for [symbol] at [slotStart]. /// /// Counts toward backfill gap checks but not game/calendar bar coverage. Future upsertNoDataBarPlaceholder({ @@ -118,8 +118,8 @@ class MarketDataDb { String source = MarketHistoryBarPlaceholder.sourceAlpacaEmpty, }) async { final DateTime slot = - MarketHistoryFourHourSlot.slotStartContaining(slotStart); - final String slotWire = MarketHistoryFourHourSlot.slotStartWire(slot); + MarketHistorySessionSlot.slotStartContaining(slotStart); + final String slotWire = MarketHistorySessionSlot.slotStartWire(slot); return upsertSnapshot( symbol: symbol, metric: 'bar', @@ -131,7 +131,7 @@ class MarketDataDb { 'slot_start': slotWire, MarketHistoryBarPlaceholder.rawKey: true, 'source': source, - 'checked_at': MarketHistoryFourHourSlot.wireUtc(checkedAt), + 'checked_at': MarketHistorySessionSlot.wireUtc(checkedAt), }, ); } @@ -201,11 +201,10 @@ class MarketDataDb { return (result.first[0]! as DateTime).toUtc(); } - /// Symbols from [symbols] that already have a bar for the UTC slot [slotStart]. + /// Symbols from [symbols] that already have a bar for session slot [slotStart]. /// - /// A row counts when [raw.slot_start] matches the canonical wire form, when - /// [raw.slot_start] or [as_of] bucket to the same UTC 4-hour boundary as - /// [slotStart] (same rule as [MarketHistoryFourHourSlot.slotStartContaining]). + /// A row counts when [raw.slot_start] or [as_of] matches the canonical slot + /// start ([MarketHistorySessionSlot.slotStartWire]). Future> symbolsWithBarForSlot({ required List symbols, required DateTime slotStart, @@ -216,8 +215,8 @@ class MarketDataDb { return {}; } final DateTime start = - MarketHistoryFourHourSlot.slotStartContaining(slotStart); - final String slotStartWire = MarketHistoryFourHourSlot.slotStartWire(start); + MarketHistorySessionSlot.slotStartContaining(slotStart); + final String slotStartWire = MarketHistorySessionSlot.slotStartWire(start); final Result result = await _connection.execute( Sql.named( ''' @@ -228,12 +227,7 @@ class MarketDataDb { AND symbol = ANY(@symbols) AND ( raw->>'slot_start' = @slot_start_wire - OR ( - raw->>'slot_start' IS NOT NULL - AND ${_slotStartBucketSql('(raw->>\'slot_start\')::timestamptz')} - = @slot_start - ) - OR ${_slotStartBucketSql('as_of')} = @slot_start + OR as_of = @slot_start ) ''', ), @@ -250,17 +244,6 @@ class MarketDataDb { .toSet(); } - /// UTC 4-hour slot left edge for [timestampExpr] (timestamptz SQL expression). - static String _slotStartBucketSql(String timestampExpr) { - return ''' -( - date_trunc('day', $timestampExpr AT TIME ZONE 'UTC') - + (div(extract(hour from $timestampExpr AT TIME ZONE 'UTC')::int, 4) * 4) - * interval '1 hour' -) AT TIME ZONE 'UTC' -'''; - } - /// Newest snapshot for [symbol] and [metric] by [as_of]. Future latestForSymbol( String symbol, diff --git a/server/lib/trading/market_data_history.dart b/server/lib/trading/market_data_history.dart index 2dff86e..7e23f75 100644 --- a/server/lib/trading/market_data_history.dart +++ b/server/lib/trading/market_data_history.dart @@ -7,7 +7,8 @@ import 'market_data_db.dart'; import 'market_history_api_rate_limiter.dart'; import 'market_history_bar_placeholder.dart'; import 'market_history_config.dart'; -import 'market_history_four_hour_slot.dart'; +import 'market_history_minute_aggregate.dart'; +import 'market_history_session_slot.dart'; import 'market_history_trading_calendar.dart'; import 'sync_run_recorder.dart'; import 'tradable_assets_db.dart'; @@ -51,7 +52,7 @@ class PersistBarsResult { return null; } - final String slotWire = MarketHistoryFourHourSlot.slotStartWire(slotStart); + final String slotWire = MarketHistorySessionSlot.slotStartWire(slotStart); final List parts = [ 'Alpaca returned no persistable $timeframe bars', 'slot=$slotWire', @@ -91,7 +92,7 @@ class MarketDataHistorySyncResult { final DateTime finishedAt; final String? error; - /// Number of completed 4-hour slots written in this run. + /// Number of completed session-half slots written in this run. final int slotsSynced; bool get succeeded => error == null; @@ -108,7 +109,8 @@ class MarketHistorySlotFetchPlan { final List symbols; } -/// Backfill: one Alpaca `4Hour` request per ended UTC slot × symbol batch. +/// Backfill: one Alpaca `1Min` range per ended session slot × symbol batch, +/// aggregated into `sessionHalf` rows. /// /// Chooses work from the most recently completed slot backward until the rolling /// window is full. Only symbols missing that specific slot are requested. @@ -211,8 +213,8 @@ class MarketDataHistorySync { } final DateTime slotStart = plan.slotStart; - final DateTime slotEnd = - MarketHistoryFourHourSlot.endInclusive(slotStart); + // Alpaca `end` is exclusive; use slot close, not last second of the window. + final DateTime slotEnd = MarketHistorySessionSlot.endExclusive(slotStart); bool slotWrote = false; for (final List batch in chunkList(plan.symbols, batchSize)) { @@ -333,7 +335,7 @@ class MarketDataHistorySync { try { return await _marketDataClient.getBarsRange( symbols: symbols, - timeframe: timeframe, + timeframe: MarketHistoryConfig.alpacaFetchTimeframe, start: slotStart, end: slotEnd, ); @@ -359,8 +361,8 @@ class MarketDataHistorySync { final List emptyInResponse = []; final Map wrongSlotBarTimes = {}; final DateTime plannedSlot = - MarketHistoryFourHourSlot.slotStartContaining(slotStart); - final String slotWire = MarketHistoryFourHourSlot.slotStartWire(plannedSlot); + MarketHistorySessionSlot.slotStartContaining(slotStart); + final String slotWire = MarketHistorySessionSlot.slotStartWire(plannedSlot); for (final String symbol in batch) { if (!response.barsBySymbol.containsKey(symbol)) { @@ -379,39 +381,37 @@ class MarketDataHistorySync { if (!batchSymbols.contains(entry.key)) { continue; } - final List rejectedTimes = []; - for (final AlpacaBar bar in entry.value) { - final DateTime barAt = bar.timestamp.toUtc(); - final DateTime barSlot = - MarketHistoryFourHourSlot.slotStartContaining(barAt); - if (!barSlot.isAtSameMomentAs(plannedSlot)) { - rejectedTimes.add(MarketHistoryFourHourSlot.wireUtc(barAt)); - continue; - } - await _marketDataDb.upsertSnapshot( - symbol: entry.key, - metric: 'bar', - timeframe: timeframe, - feed: feed, - price: bar.close, - volume: bar.volume, - asOf: barSlot, - raw: { - 'o': bar.open, - 'h': bar.high, - 'l': bar.low, - 'c': bar.close, - 'v': bar.volume, - 't': MarketHistoryFourHourSlot.wireUtc(barAt), - 'slot_start': slotWire, - }, - ); - written++; - symbolsWritten.add(entry.key); - } - if (rejectedTimes.isNotEmpty && !symbolsWritten.contains(entry.key)) { - wrongSlotBarTimes[entry.key] = rejectedTimes.join(','); + final SessionHalfBarAggregate? aggregate = aggregateMinuteBarsForSlot( + bars: entry.value, + slotStart: plannedSlot, + ); + if (aggregate == null) { + wrongSlotBarTimes[entry.key] = entry.value + .map((AlpacaBar b) => MarketHistorySessionSlot.wireUtc(b.timestamp)) + .join(','); + continue; } + await _marketDataDb.upsertSnapshot( + symbol: entry.key, + metric: 'bar', + timeframe: timeframe, + feed: feed, + price: aggregate.close, + volume: aggregate.volume, + asOf: plannedSlot, + raw: { + 'o': aggregate.open, + 'h': aggregate.high, + 'l': aggregate.low, + 'c': aggregate.close, + 'v': aggregate.volume, + 't': MarketHistorySessionSlot.wireUtc(aggregate.lastBarAt), + 'slot_start': slotWire, + 'minute_bars_count': aggregate.minuteBarCount, + }, + ); + written++; + symbolsWritten.add(entry.key); } return PersistBarsResult( @@ -473,7 +473,7 @@ class MarketDataHistorySync { List symbols, ) async { final List completed = - MarketHistoryFourHourSlot.completedSlotStartsInWindow(now, windowDays); + MarketHistorySessionSlot.completedSlotStartsInWindow(now, windowDays); if (completed.isEmpty) { return []; } diff --git a/server/lib/trading/market_history_bar_placeholder.dart b/server/lib/trading/market_history_bar_placeholder.dart index caffe01..bf05f9b 100644 --- a/server/lib/trading/market_history_bar_placeholder.dart +++ b/server/lib/trading/market_history_bar_placeholder.dart @@ -1,4 +1,4 @@ -/// Marker rows written when Alpaca has no 4Hour bar for a symbol × slot. +/// Marker rows written when Alpaca has no session-half bar for a symbol × slot. abstract final class MarketHistoryBarPlaceholder { static const String rawKey = 'no_data'; static const String sourceAlpacaEmpty = 'alpaca_empty'; diff --git a/server/lib/trading/market_history_config.dart b/server/lib/trading/market_history_config.dart index 21a0c3f..6f10e26 100644 --- a/server/lib/trading/market_history_config.dart +++ b/server/lib/trading/market_history_config.dart @@ -1,14 +1,14 @@ -/// Defaults for 4-hour slot market history ([MarketHistoryFourHourSlot]). +/// Defaults for RTH session-half market history ([MarketHistorySessionSlot]). /// Env overrides via [MarketHistoryEnv] in [ServerEnv.load]. abstract final class MarketHistoryConfig { /// Rolling window length in calendar days (UTC). static const int windowDays = 7; - /// Alpaca bar aggregation for market-history backfill (six slots per UTC day). - static const String barTimeframe = '4Hour'; + /// Stored bar timeframe (two aggregates per US trading day). + static const String barTimeframe = 'sessionHalf'; - /// Width of each history slot in hours (`24 / slotsPerDay`). - static const int slotHours = 4; + /// Alpaca fetch granularity for backfill (aggregated into [barTimeframe]). + static const String alpacaFetchTimeframe = '1Min'; /// Symbols per Alpaca `GET /v2/stocks/bars` request (max ~100). static const int historySyncBatchSize = 100; @@ -16,7 +16,7 @@ abstract final class MarketHistoryConfig { /// Hard cap on symbols synced per run (Alpaca Basic rate-limit safety). static const int historySyncMaxSymbols = 2000; - /// Minimum 4-hour bars required before a symbol is eligible for the + /// Minimum session-half bars required before a symbol is eligible for the /// guess-the-move question rule. static const int minBarsForGuess = 5; diff --git a/server/lib/trading/market_history_four_hour_slot.dart b/server/lib/trading/market_history_four_hour_slot.dart deleted file mode 100644 index 9bab14a..0000000 --- a/server/lib/trading/market_history_four_hour_slot.dart +++ /dev/null @@ -1,89 +0,0 @@ -/// Six UTC 4-hour slots per day. Sync only [completedSlotStartsInWindow]. -abstract final class MarketHistoryFourHourSlot { - static const int slotHours = 4; - static const int slotsPerDay = 24 ~/ slotHours; - static const String alpacaTimeframe = '4Hour'; - - /// Left edge of the 4-hour bucket containing [instant] (UTC). - static DateTime slotStartContaining(DateTime instant) { - final DateTime u = instant.toUtc(); - final int slotHour = (u.hour ~/ slotHours) * slotHours; - return DateTime.utc(u.year, u.month, u.day, slotHour); - } - - /// Inclusive end of the slot for Alpaca `start`/`end` (Option A: 00:00–03:59:59). - static DateTime endInclusive(DateTime slotStart) { - return slotStart - .add(const Duration(hours: slotHours)) - .subtract(const Duration(seconds: 1)); - } - - /// Exclusive end (current slot begins here). - static DateTime endExclusive(DateTime slotStart) { - return slotStart.add(const Duration(hours: slotHours)); - } - - /// `true` when [now] is at or after the end of the slot that began at [slotStart]. - static bool hasEnded(DateTime slotStart, DateTime now) { - return !now.toUtc().isBefore(endExclusive(slotStart)); - } - - /// Start of the most recently completed slot (never the in-progress slot). - static DateTime lastCompletedSlotStart(DateTime now) { - final DateTime current = slotStartContaining(now); - return current.subtract(const Duration(hours: slotHours)); - } - - /// Earliest slot start included in a [windowDays] rolling window ending at [now]. - static DateTime windowFirstSlotStart(DateTime now, int windowDays) { - final DateTime windowStart = - now.toUtc().subtract(Duration(days: windowDays)); - return slotStartContaining(windowStart); - } - - /// Completed slot starts from the rolling window through [lastCompletedSlotStart]. - static List completedSlotStartsInWindow( - DateTime now, - int windowDays, - ) { - final DateTime last = lastCompletedSlotStart(now); - final DateTime first = windowFirstSlotStart(now, windowDays); - if (last.isBefore(first)) { - return []; - } - - final List slots = []; - DateTime cursor = first; - while (!cursor.isAfter(last)) { - if (hasEnded(cursor, now)) { - slots.add(cursor); - } - cursor = cursor.add(const Duration(hours: slotHours)); - } - return slots; - } - - /// Canonical UTC wire form: `YYYY-MM-DDTHH:MM:SSZ` (no fractional seconds). - /// - /// Used for Alpaca bar-range query params and [raw.slot_start] / [raw.t] in - /// [market_data_snapshots] so fetch, persist, and gap checks always agree. - static String wireUtc(DateTime value) { - final DateTime u = value.toUtc(); - String two(int n) => n.toString().padLeft(2, '0'); - return '${u.year.toString().padLeft(4, '0')}-' - '${two(u.month)}-${two(u.day)}T' - '${two(u.hour)}:${two(u.minute)}:${two(u.second)}Z'; - } - - /// Wire form for the left edge of the 4-hour slot containing [slotStart]. - static String slotStartWire(DateTime slotStart) => - wireUtc(slotStartContaining(slotStart)); - - /// Parses a [wireUtc] / Alpaca RFC3339 timestamp, or `null` when invalid. - static DateTime? parseWire(String? wire) { - if (wire == null || wire.isEmpty) { - return null; - } - return DateTime.tryParse(wire)?.toUtc(); - } -} diff --git a/server/lib/trading/market_history_minute_aggregate.dart b/server/lib/trading/market_history_minute_aggregate.dart new file mode 100644 index 0000000..d20c2f1 --- /dev/null +++ b/server/lib/trading/market_history_minute_aggregate.dart @@ -0,0 +1,74 @@ +import '../alpaca/alpaca_models.dart'; +import 'market_history_session_slot.dart'; + +/// OHLCV aggregate of Alpaca 1-minute bars for one session half. +class SessionHalfBarAggregate { + const SessionHalfBarAggregate({ + required this.open, + required this.high, + required this.low, + required this.close, + required this.volume, + required this.minuteBarCount, + required this.firstBarAt, + required this.lastBarAt, + }); + + final num open; + final num high; + final num low; + final num close; + final num volume; + final int minuteBarCount; + final DateTime firstBarAt; + final DateTime lastBarAt; +} + +/// Aggregates [bars] whose timestamps fall in [slotStart, endExclusive(slotStart)). +SessionHalfBarAggregate? aggregateMinuteBarsForSlot({ + required List bars, + required DateTime slotStart, +}) { + final DateTime planned = MarketHistorySessionSlot.slotStartContaining(slotStart); + final DateTime windowStart = planned; + final DateTime windowEnd = MarketHistorySessionSlot.endExclusive(planned); + + final List inWindow = bars + .where((AlpacaBar bar) { + final DateTime t = bar.timestamp.toUtc(); + return !t.isBefore(windowStart) && t.isBefore(windowEnd); + }) + .toList() + ..sort( + (AlpacaBar a, AlpacaBar b) => + a.timestamp.compareTo(b.timestamp), + ); + + if (inWindow.isEmpty) { + return null; + } + + num high = inWindow.first.high; + num low = inWindow.first.low; + num volume = 0; + for (final AlpacaBar bar in inWindow) { + if (bar.high > high) { + high = bar.high; + } + if (bar.low < low) { + low = bar.low; + } + volume += bar.volume; + } + + return SessionHalfBarAggregate( + open: inWindow.first.open, + high: high, + low: low, + close: inWindow.last.close, + volume: volume, + minuteBarCount: inWindow.length, + firstBarAt: inWindow.first.timestamp.toUtc(), + lastBarAt: inWindow.last.timestamp.toUtc(), + ); +} diff --git a/server/lib/trading/market_history_question_audit.dart b/server/lib/trading/market_history_question_audit.dart index 068cc7f..f4fa984 100644 --- a/server/lib/trading/market_history_question_audit.dart +++ b/server/lib/trading/market_history_question_audit.dart @@ -5,7 +5,7 @@ import 'package:postgres/postgres.dart'; import 'market_data_db.dart' show MarketDataDb; import 'market_history_bar_placeholder.dart'; import 'market_history_config.dart'; -import 'market_history_four_hour_slot.dart'; +import 'market_history_session_slot.dart'; import 'tradable_assets_db.dart'; /// One 4-hour bar snapshot used in question audit comparisons. @@ -139,33 +139,32 @@ class QuestionAuditPage { }; } -/// Default view: last two **completed** 4-hour slots (newer = last completed). +/// Default view: last two **completed** session-half slots (newer = last completed). DateTime questionAuditDefaultCompareUntil(DateTime now) { final DateTime last = - MarketHistoryFourHourSlot.lastCompletedSlotStart(now.toUtc()); - return MarketHistoryFourHourSlot.endExclusive(last); + MarketHistorySessionSlot.lastCompletedSlotStart(now.toUtc()); + return MarketHistorySessionSlot.endExclusive(last); } /// Earliest [compareUntil] that still pairs two slots in the rolling window. DateTime questionAuditMinCompareUntil(DateTime now, int windowDays) { - final DateTime first = MarketHistoryFourHourSlot.windowFirstSlotStart( + final DateTime first = MarketHistorySessionSlot.windowFirstSlotStart( now.toUtc(), windowDays, ); - final DateTime minNewerSlot = first.add( - const Duration(hours: MarketHistoryFourHourSlot.slotHours), - ); - return MarketHistoryFourHourSlot.endExclusive(minNewerSlot); + final DateTime? minNewerSlot = MarketHistorySessionSlot.nextSlotStart(first); + if (minNewerSlot == null) { + return MarketHistorySessionSlot.endExclusive(first); + } + return MarketHistorySessionSlot.endExclusive(minNewerSlot); } /// The newer bar's slot start for a page keyed by [compareUntil]. /// /// [compareUntil] is always `endExclusive(newerSlotStart)`. DateTime questionAuditNewerSlotStart(DateTime compareUntil) { - return MarketHistoryFourHourSlot.slotStartContaining( - compareUntil.toUtc().subtract( - const Duration(hours: MarketHistoryFourHourSlot.slotHours), - ), + return MarketHistorySessionSlot.slotStartContaining( + compareUntil.toUtc().subtract(MarketHistorySessionSlot.slotDuration), ); } @@ -179,7 +178,7 @@ DateTime snapQuestionAuditCompareUntil({ if (r.isAfter(maxUntil)) { return maxUntil; } - final DateTime snapped = MarketHistoryFourHourSlot.endExclusive( + final DateTime snapped = MarketHistorySessionSlot.endExclusive( questionAuditNewerSlotStart(r), ); if (snapped.isBefore(minUntil)) { @@ -194,10 +193,11 @@ DateTime snapQuestionAuditCompareUntil({ /// Pair of slot starts for the page keyed by [compareUntil]. (DateTime newer, DateTime older) questionAuditSlotPair(DateTime compareUntil) { final DateTime newer = questionAuditNewerSlotStart(compareUntil); - final DateTime older = newer.subtract( - const Duration(hours: MarketHistoryFourHourSlot.slotHours), - ); - return (newer, older); + final DateTime? prior = MarketHistorySessionSlot.previousSlotStart(newer); + if (prior == null) { + return (newer, newer); + } + return (newer, prior); } /// Steps back: newer becomes previous older (e.g. #1 vs #2 → #2 vs #3). @@ -206,10 +206,12 @@ DateTime questionAuditStepOlderCompareUntil({ required DateTime now, }) { final DateTime newerSlot = questionAuditNewerSlotStart(compareUntil); - final DateTime priorNewerSlot = newerSlot.subtract( - const Duration(hours: MarketHistoryFourHourSlot.slotHours), - ); - return MarketHistoryFourHourSlot.endExclusive(priorNewerSlot); + final DateTime? priorNewerSlot = + MarketHistorySessionSlot.previousSlotStart(newerSlot); + if (priorNewerSlot == null) { + return compareUntil; + } + return MarketHistorySessionSlot.endExclusive(priorNewerSlot); } /// Steps forward one completed slot, capped at [maxUntil]. @@ -218,11 +220,13 @@ DateTime questionAuditStepNewerCompareUntil({ required DateTime maxUntil, }) { final DateTime newerSlot = questionAuditNewerSlotStart(compareUntil); - final DateTime nextNewerSlot = newerSlot.add( - const Duration(hours: MarketHistoryFourHourSlot.slotHours), - ); + final DateTime? nextNewerSlot = + MarketHistorySessionSlot.nextSlotStart(newerSlot); + if (nextNewerSlot == null) { + return maxUntil; + } final DateTime candidate = - MarketHistoryFourHourSlot.endExclusive(nextNewerSlot); + MarketHistorySessionSlot.endExclusive(nextNewerSlot); return candidate.isAfter(maxUntil) ? maxUntil : candidate; } @@ -303,7 +307,7 @@ class MarketHistoryQuestionAudit { return calendarMax; } final DateTime dataMax = - MarketHistoryFourHourSlot.endExclusive(latestBarSlot); + MarketHistorySessionSlot.endExclusive(latestBarSlot); return dataMax.isBefore(calendarMax) ? dataMax : calendarMax; } @@ -325,7 +329,7 @@ class MarketHistoryQuestionAudit { if (result.isEmpty || result.first[0] == null) { return null; } - return MarketHistoryFourHourSlot.slotStartContaining( + return MarketHistorySessionSlot.slotStartContaining( (result.first[0]! as DateTime).toUtc(), ); } @@ -336,14 +340,14 @@ class MarketHistoryQuestionAudit { required DateTime olderSlotStart, String timeframe = MarketHistoryConfig.barTimeframe, }) async { - final DateTime newer = MarketHistoryFourHourSlot.slotStartContaining( + final DateTime newer = MarketHistorySessionSlot.slotStartContaining( newerSlotStart.toUtc(), ); - final DateTime older = MarketHistoryFourHourSlot.slotStartContaining( + final DateTime older = MarketHistorySessionSlot.slotStartContaining( olderSlotStart.toUtc(), ); - final String newerWire = MarketHistoryFourHourSlot.slotStartWire(newer); - final String olderWire = MarketHistoryFourHourSlot.slotStartWire(older); + final String newerWire = MarketHistorySessionSlot.slotStartWire(newer); + final String olderWire = MarketHistorySessionSlot.slotStartWire(older); final List active = await _tradableAssetsDb.listActiveTradableSymbols(); @@ -384,12 +388,14 @@ class MarketHistoryQuestionAudit { >{}; for (final ResultRow row in rows) { final String symbol = row[0]! as String; - final DateTime asOf = MarketHistoryFourHourSlot.slotStartContaining( - (row[1]! as DateTime).toUtc(), - ); final Map? raw = _decodeRaw(row[4]); - bySymbol.putIfAbsent(symbol, () => {})[asOf] = _BarRow( - asOf: asOf, + final DateTime slotKey = _canonicalSlotStart( + (row[1]! as DateTime).toUtc(), + raw, + ); + bySymbol.putIfAbsent(symbol, () => {})[slotKey] = + _BarRow( + asOf: slotKey, closePrice: MarketDataDb.readOptionalNumeric(row[2]), volume: MarketDataDb.readOptionalNumeric(row[3]), raw: raw, @@ -431,6 +437,20 @@ class MarketHistoryQuestionAudit { return assets; } + static DateTime _canonicalSlotStart( + DateTime asOf, + Map? raw, + ) { + final String? wire = raw?['slot_start'] as String?; + if (wire != null) { + final DateTime? parsed = DateTime.tryParse(wire)?.toUtc(); + if (parsed != null) { + return MarketHistorySessionSlot.slotStartContaining(parsed); + } + } + return MarketHistorySessionSlot.slotStartContaining(asOf); + } + Map? _decodeRaw(Object? rawValue) { if (rawValue == null) { return null; diff --git a/server/lib/trading/market_history_session_slot.dart b/server/lib/trading/market_history_session_slot.dart new file mode 100644 index 0000000..c6a49a3 --- /dev/null +++ b/server/lib/trading/market_history_session_slot.dart @@ -0,0 +1,316 @@ +import 'package:timezone/data/latest.dart' as tz_data; +import 'package:timezone/timezone.dart' as tz; + +import 'market_history_trading_calendar.dart'; + +bool _timezonesInitialized = false; + +void ensureMarketHistoryTimezonesInitialized() { + if (_timezonesInitialized) { + return; + } + tz_data.initializeTimeZones(); + _timezonesInitialized = true; +} + +/// Two US regular-session half-day slots (9:30–12:45 and 12:45–16:00 ET). +abstract final class MarketHistorySessionSlot { + static const int slotsPerDay = 2; + static const Duration slotDuration = Duration(hours: 3, minutes: 15); + static const String storedTimeframe = 'sessionHalf'; + static const String alpacaFetchTimeframe = '1Min'; + + static const int _morningHour = 9; + static const int _morningMinute = 30; + static const int _afternoonHour = 12; + static const int _afternoonMinute = 45; + + static tz.Location get _eastern { + ensureMarketHistoryTimezonesInitialized(); + return tz.getLocation('America/New_York'); + } + + /// Left edge of the session half containing [instant] (UTC). + static DateTime slotStartContaining(DateTime instant) { + final tz.TZDateTime ny = tz.TZDateTime.from(instant.toUtc(), _eastern); + final int minutes = ny.hour * 60 + ny.minute; + const int morningStart = _morningHour * 60 + _morningMinute; + const int afternoonStart = _afternoonHour * 60 + _afternoonMinute; + const int sessionEnd = 16 * 60; + + if (minutes >= afternoonStart && minutes < sessionEnd) { + return _afternoonStartUtc(ny.year, ny.month, ny.day); + } + if (minutes >= morningStart && minutes < afternoonStart) { + return _morningStartUtc(ny.year, ny.month, ny.day); + } + + if (minutes >= sessionEnd) { + return _afternoonStartUtc(ny.year, ny.month, ny.day); + } + + final (int, int, int)? prior = _previousTradingDay(ny.year, ny.month, ny.day); + if (prior == null) { + return _morningStartUtc(ny.year, ny.month, ny.day); + } + return _afternoonStartUtc(prior.$1, prior.$2, prior.$3); + } + + static DateTime endInclusive(DateTime slotStart) { + return endExclusive(slotStart).subtract(const Duration(seconds: 1)); + } + + static DateTime endExclusive(DateTime slotStart) { + return slotStart.toUtc().add(slotDuration); + } + + static bool hasEnded(DateTime slotStart, DateTime now) { + return !now.toUtc().isBefore(endExclusive(slotStart)); + } + + static DateTime lastCompletedSlotStart(DateTime now) { + final tz.TZDateTime ny = tz.TZDateTime.from(now.toUtc(), _eastern); + final List candidates = []; + + void addDay(int y, int m, int d) { + if (!_isTradingDayEt(y, m, d)) { + return; + } + final DateTime morning = _morningStartUtc(y, m, d); + final DateTime afternoon = _afternoonStartUtc(y, m, d); + if (hasEnded(afternoon, now)) { + candidates.add(afternoon); + } + if (hasEnded(morning, now)) { + candidates.add(morning); + } + } + + addDay(ny.year, ny.month, ny.day); + final (int, int, int)? priorDay = _previousTradingDay(ny.year, ny.month, ny.day); + if (priorDay != null) { + addDay(priorDay.$1, priorDay.$2, priorDay.$3); + } + + if (candidates.isEmpty) { + return _walkBackForLastCompleted(now); + } + candidates.sort(); + return candidates.last; + } + + static DateTime _walkBackForLastCompleted(DateTime now) { + var cursor = tz.TZDateTime.from(now.toUtc(), _eastern); + for (var i = 0; i < 366; i++) { + if (_isTradingDayEt(cursor.year, cursor.month, cursor.day)) { + final DateTime afternoon = + _afternoonStartUtc(cursor.year, cursor.month, cursor.day); + if (hasEnded(afternoon, now)) { + return afternoon; + } + final DateTime morning = + _morningStartUtc(cursor.year, cursor.month, cursor.day); + if (hasEnded(morning, now)) { + return morning; + } + } + cursor = cursor.subtract(const Duration(days: 1)); + } + return _morningStartUtc(cursor.year, cursor.month, cursor.day); + } + + static DateTime windowFirstSlotStart(DateTime now, int windowDays) { + final tz.TZDateTime ny = tz.TZDateTime.from(now.toUtc(), _eastern); + var cursor = ny.subtract(Duration(days: windowDays)); + for (var i = 0; i < windowDays + 14; i++) { + if (_isTradingDayEt(cursor.year, cursor.month, cursor.day)) { + return _morningStartUtc(cursor.year, cursor.month, cursor.day); + } + cursor = cursor.add(const Duration(days: 1)); + } + return _morningStartUtc(ny.year, ny.month, ny.day); + } + + static List completedSlotStartsInWindow( + DateTime now, + int windowDays, + ) { + final DateTime last = lastCompletedSlotStart(now); + final DateTime first = windowFirstSlotStart(now, windowDays); + if (last.isBefore(first)) { + return []; + } + + final List slots = []; + var cursor = tz.TZDateTime.from(first.toUtc(), _eastern); + final tz.TZDateTime endNy = tz.TZDateTime.from(last.toUtc(), _eastern); + + while (true) { + if (_isTradingDayEt(cursor.year, cursor.month, cursor.day)) { + final DateTime morning = + _morningStartUtc(cursor.year, cursor.month, cursor.day); + final DateTime afternoon = + _afternoonStartUtc(cursor.year, cursor.month, cursor.day); + for (final DateTime slot in [morning, afternoon]) { + if (slot.isBefore(first)) { + continue; + } + if (slot.isAfter(last)) { + break; + } + if (hasEnded(slot, now)) { + slots.add(slot); + } + } + } + if (cursor.year > endNy.year || + (cursor.year == endNy.year && cursor.month > endNy.month) || + (cursor.year == endNy.year && + cursor.month == endNy.month && + cursor.day >= endNy.day)) { + break; + } + cursor = cursor.add(const Duration(days: 1)); + } + return slots; + } + + static DateTime? previousSlotStart(DateTime slotStart) { + final DateTime snap = slotStartContaining(slotStart); + if (_isAfternoonStart(snap)) { + return _morningStartUtc( + _nyYear(snap), + _nyMonth(snap), + _nyDay(snap), + ); + } + final (int, int, int)? prior = + _previousTradingDay(_nyYear(snap), _nyMonth(snap), _nyDay(snap)); + if (prior == null) { + return null; + } + return _afternoonStartUtc(prior.$1, prior.$2, prior.$3); + } + + static DateTime? nextSlotStart(DateTime slotStart) { + final DateTime snap = slotStartContaining(slotStart); + if (_isMorningStart(snap)) { + return _afternoonStartUtc( + _nyYear(snap), + _nyMonth(snap), + _nyDay(snap), + ); + } + final (int, int, int)? next = + _nextTradingDay(_nyYear(snap), _nyMonth(snap), _nyDay(snap)); + if (next == null) { + return null; + } + return _morningStartUtc(next.$1, next.$2, next.$3); + } + + static String wireUtc(DateTime value) { + final DateTime u = value.toUtc(); + String two(int n) => n.toString().padLeft(2, '0'); + return '${u.year.toString().padLeft(4, '0')}-' + '${two(u.month)}-${two(u.day)}T' + '${two(u.hour)}:${two(u.minute)}:${two(u.second)}Z'; + } + + static String slotStartWire(DateTime slotStart) => + wireUtc(slotStartContaining(slotStart)); + + static DateTime? parseWire(String? wire) { + if (wire == null || wire.isEmpty) { + return null; + } + return DateTime.tryParse(wire)?.toUtc(); + } + + static bool _isMorningStart(DateTime slotStartUtc) { + final tz.TZDateTime ny = tz.TZDateTime.from(slotStartUtc, _eastern); + return ny.hour == _morningHour && ny.minute == _morningMinute; + } + + static bool _isAfternoonStart(DateTime slotStartUtc) { + final tz.TZDateTime ny = tz.TZDateTime.from(slotStartUtc, _eastern); + return ny.hour == _afternoonHour && ny.minute == _afternoonMinute; + } + + static int _nyYear(DateTime slotStartUtc) => + tz.TZDateTime.from(slotStartUtc, _eastern).year; + + static int _nyMonth(DateTime slotStartUtc) => + tz.TZDateTime.from(slotStartUtc, _eastern).month; + + static int _nyDay(DateTime slotStartUtc) => + tz.TZDateTime.from(slotStartUtc, _eastern).day; + + /// Plain UTC [DateTime] (not [tz.TZDateTime]) for stable equality in tests/JSON. + static DateTime _utcInstant(DateTime value) { + final DateTime u = value.toUtc(); + return DateTime.utc( + u.year, + u.month, + u.day, + u.hour, + u.minute, + u.second, + u.millisecond, + u.microsecond, + ); + } + + static DateTime _morningStartUtc(int year, int month, int day) { + return _utcInstant( + tz.TZDateTime( + _eastern, + year, + month, + day, + _morningHour, + _morningMinute, + ), + ); + } + + static DateTime _afternoonStartUtc(int year, int month, int day) { + return _utcInstant( + tz.TZDateTime( + _eastern, + year, + month, + day, + _afternoonHour, + _afternoonMinute, + ), + ); + } + + static bool _isTradingDayEt(int year, int month, int day) { + final DateTime probe = _morningStartUtc(year, month, day); + return !MarketHistoryTradingCalendar.isLikelyNoRegularSession(probe); + } + + static (int, int, int)? _previousTradingDay(int year, int month, int day) { + var cursor = tz.TZDateTime(_eastern, year, month, day); + for (var i = 0; i < 14; i++) { + cursor = cursor.subtract(const Duration(days: 1)); + if (_isTradingDayEt(cursor.year, cursor.month, cursor.day)) { + return (cursor.year, cursor.month, cursor.day); + } + } + return null; + } + + static (int, int, int)? _nextTradingDay(int year, int month, int day) { + var cursor = tz.TZDateTime(_eastern, year, month, day); + for (var i = 0; i < 14; i++) { + cursor = cursor.add(const Duration(days: 1)); + if (_isTradingDayEt(cursor.year, cursor.month, cursor.day)) { + return (cursor.year, cursor.month, cursor.day); + } + } + return null; + } +} diff --git a/server/lib/trading/market_history_week_coverage.dart b/server/lib/trading/market_history_week_coverage.dart index a9e3682..7ff4df5 100644 --- a/server/lib/trading/market_history_week_coverage.dart +++ b/server/lib/trading/market_history_week_coverage.dart @@ -1,12 +1,14 @@ import 'dart:convert'; import 'package:postgres/postgres.dart'; +import 'package:timezone/timezone.dart' as tz; import 'market_history_config.dart'; -import 'market_history_four_hour_slot.dart'; +import 'market_history_session_slot.dart'; +import 'market_history_trading_calendar.dart'; import 'tradable_assets_db.dart'; -/// One UTC 4-hour slot within the rolling window. +/// One RTH session-half slot within the rolling window. class MarketHistorySlotCoverage { const MarketHistorySlotCoverage({ required this.slotStart, @@ -31,7 +33,7 @@ class MarketHistorySlotCoverage { }; } -/// Slot rollup for one UTC calendar day. +/// Slot rollup for one US Eastern calendar day. class MarketHistoryDayCoverage { const MarketHistoryDayCoverage({ required this.date, @@ -47,7 +49,7 @@ class MarketHistoryDayCoverage { Map toJson() => { 'date': dateWire(date), - 'slotsPerDay': MarketHistoryFourHourSlot.slotsPerDay, + 'slotsPerDay': MarketHistorySessionSlot.slotsPerDay, 'completedSlots': completedSlots, 'fullySyncedSlots': fullySyncedSlots, 'slots': slots.map((MarketHistorySlotCoverage s) => s.toJson()).toList(), @@ -82,7 +84,7 @@ class MarketHistoryWeekCoverageReport { }; } -/// Validates 4-hour bar coverage per UTC day for the admin week view. +/// Validates session-half bar coverage per Eastern day for the admin week view. class MarketHistoryWeekCoverage { MarketHistoryWeekCoverage({ required Connection connection, @@ -104,21 +106,22 @@ class MarketHistoryWeekCoverage { final List symbols = await _activeSymbols(); final int symbolCount = symbols.length; - final List calendarDays = _calendarDaysEndingToday(tick, windowDays); + final List<(int, int, int)> calendarDays = + calendarDaysEndingTodayEt(tick, windowDays); final Map> symbolsBySlot = await _loadSyncedSymbolsBySlot(tick, symbols); final List days = []; var isConsistent = symbolCount > 0; - for (final DateTime day in calendarDays) { + for (final (int, int, int) day in calendarDays) { final List slots = []; var completedSlots = 0; var fullySyncedSlots = 0; - for (int hour = 0; hour < 24; hour += MarketHistoryFourHourSlot.slotHours) { - final DateTime slotStart = DateTime.utc(day.year, day.month, day.day, hour); - final bool completed = MarketHistoryFourHourSlot.hasEnded(slotStart, tick); + final List slotStarts = _slotStartsForEtDay(day.$1, day.$2, day.$3); + for (final DateTime slotStart in slotStarts) { + final bool completed = MarketHistorySessionSlot.hasEnded(slotStart, tick); final Set synced = symbolsBySlot[_slotKey(slotStart)] ?? {}; final int syncedCount = _countSyncedSymbols(synced, symbols); @@ -147,7 +150,7 @@ class MarketHistoryWeekCoverage { days.add( MarketHistoryDayCoverage( - date: day, + date: DateTime.utc(day.$1, day.$2, day.$3), slots: slots, completedSlots: completedSlots, fullySyncedSlots: fullySyncedSlots, @@ -162,13 +165,28 @@ class MarketHistoryWeekCoverage { return MarketHistoryWeekCoverageReport( asOf: tick, windowDays: windowDays, - slotsPerDay: MarketHistoryFourHourSlot.slotsPerDay, + slotsPerDay: MarketHistorySessionSlot.slotsPerDay, symbolCount: symbolCount, days: days, isConsistent: isConsistent, ); } + static List _slotStartsForEtDay(int year, int month, int day) { + ensureMarketHistoryTimezonesInitialized(); + final tz.Location eastern = tz.getLocation('America/New_York'); + final DateTime morning = MarketHistorySessionSlot.slotStartContaining( + tz.TZDateTime(eastern, year, month, day, 10, 0), + ); + if (MarketHistoryTradingCalendar.isLikelyNoRegularSession(morning)) { + return []; + } + final DateTime afternoon = MarketHistorySessionSlot.slotStartContaining( + tz.TZDateTime(eastern, year, month, day, 14, 0), + ); + return [morning, afternoon]; + } + Future> _activeSymbols() async { List symbols = await _tradableAssetsDb.listActiveTradableSymbols(); if (symbols.length > maxSymbols) { @@ -185,11 +203,13 @@ class MarketHistoryWeekCoverage { return >{}; } - final DateTime firstDay = - _calendarDaysEndingToday(now, windowDays).first; - final DateTime since = DateTime.utc(firstDay.year, firstDay.month, firstDay.day); - final DateTime until = - MarketHistoryFourHourSlot.endExclusive(MarketHistoryFourHourSlot.slotStartContaining(now)); + final DateTime since = MarketHistorySessionSlot.windowFirstSlotStart( + now, + windowDays, + ); + final DateTime until = MarketHistorySessionSlot.endExclusive( + MarketHistorySessionSlot.slotStartContaining(now), + ); final Result rows = await _connection.execute( Sql.named( @@ -242,25 +262,39 @@ class MarketHistoryWeekCoverage { } } } on Object { - // Fall back to as_of bucketing below. + // Fall back to as_of below. } } - return MarketHistoryFourHourSlot.slotStartContaining(asOf); + return MarketHistorySessionSlot.slotStartContaining(asOf); } - static List calendarDaysEndingToday(DateTime now, int windowDays) { - final DateTime today = DateTime.utc(now.year, now.month, now.day); - return List.generate( + /// Eastern calendar dates (y, m, d) for [windowDays] ending on today's ET date. + static List<(int, int, int)> calendarDaysEndingTodayEt( + DateTime now, + int windowDays, + ) { + ensureMarketHistoryTimezonesInitialized(); + final tz.Location eastern = tz.getLocation('America/New_York'); + final tz.TZDateTime ny = tz.TZDateTime.from(now.toUtc(), eastern); + final tz.TZDateTime today = tz.TZDateTime(eastern, ny.year, ny.month, ny.day); + return List<(int, int, int)>.generate( windowDays, - (int index) => today.subtract(Duration(days: windowDays - 1 - index)), + (int index) { + final tz.TZDateTime d = + today.subtract(Duration(days: windowDays - 1 - index)); + return (d.year, d.month, d.day); + }, ); } - static List _calendarDaysEndingToday(DateTime now, int windowDays) => - calendarDaysEndingToday(now, windowDays); + static List calendarDaysEndingToday(DateTime now, int windowDays) { + return calendarDaysEndingTodayEt(now, windowDays) + .map(((int, int, int) d) => DateTime.utc(d.$1, d.$2, d.$3)) + .toList(); + } static String _slotKey(DateTime slotStart) => - MarketHistoryFourHourSlot.slotStartWire(slotStart); + MarketHistorySessionSlot.slotStartWire(slotStart); static int _countSyncedSymbols(Set synced, List expected) { if (expected.isEmpty) { diff --git a/server/migrations/008_market_history_four_hour.sql b/server/migrations/008_market_history_four_hour.sql index ee71068..79add4a 100644 --- a/server/migrations/008_market_history_four_hour.sql +++ b/server/migrations/008_market_history_four_hour.sql @@ -15,7 +15,7 @@ ALTER TABLE market_data_snapshots ALTER TABLE market_data_snapshots ADD CONSTRAINT market_data_snapshots_timeframe_check - CHECK (timeframe IN ('tick', '1Min', '1Hour', '4Hour', '1Day')); + CHECK (timeframe IN ('tick', '1Min', '1Hour', '4Hour', '1Day', 'sessionHalf')); CREATE INDEX IF NOT EXISTS market_data_snapshots_bar_4h_idx ON market_data_snapshots (symbol, as_of DESC) diff --git a/server/migrations/010_session_half_bars.sql b/server/migrations/010_session_half_bars.sql new file mode 100644 index 0000000..1814d10 --- /dev/null +++ b/server/migrations/010_session_half_bars.sql @@ -0,0 +1,24 @@ +-- 010_session_half_bars.sql +-- +-- RTH session half bars (morning 9:30–12:45 ET, afternoon 12:45–16:00 ET). +-- Drops legacy 4Hour history and adds sessionHalf timeframe + index. + +DELETE FROM market_data_snapshots +WHERE metric = 'bar' AND timeframe = '4Hour'; + +DELETE FROM market_data_archive +WHERE metric = 'bar' AND timeframe = '4Hour'; + +-- Idempotent: constraint may already include sessionHalf (008) or be missing after a failed run. +ALTER TABLE market_data_snapshots + DROP CONSTRAINT IF EXISTS market_data_snapshots_timeframe_check; + +ALTER TABLE market_data_snapshots + ADD CONSTRAINT market_data_snapshots_timeframe_check + CHECK (timeframe IN ('tick', '1Min', '1Hour', '4Hour', '1Day', 'sessionHalf')); + +DROP INDEX IF EXISTS market_data_snapshots_bar_4h_idx; + +CREATE INDEX IF NOT EXISTS market_data_snapshots_bar_session_half_idx + ON market_data_snapshots (symbol, as_of DESC) + WHERE metric = 'bar' AND timeframe = 'sessionHalf'; diff --git a/server/pubspec.lock b/server/pubspec.lock index 62cb251..f5cbaba 100644 --- a/server/pubspec.lock +++ b/server/pubspec.lock @@ -385,6 +385,14 @@ packages: url: "https://pub.dev" source: hosted version: "0.6.18" + timezone: + dependency: "direct main" + description: + name: timezone + sha256: dd14a3b83cfd7cb19e7888f1cbc20f258b8d71b54c06f79ac585f14093a287d1 + url: "https://pub.dev" + source: hosted + version: "0.10.1" typed_data: dependency: transitive description: diff --git a/server/pubspec.yaml b/server/pubspec.yaml index 5ec4d5a..c7bda6d 100644 --- a/server/pubspec.yaml +++ b/server/pubspec.yaml @@ -15,6 +15,7 @@ dependencies: http: ^1.6.0 uuid: ^4.5.3 web_socket_channel: ^3.0.0 + timezone: ^0.10.0 dev_dependencies: test: ^1.25.0 diff --git a/server/test/fixtures/alpaca_bars_1min_session.json b/server/test/fixtures/alpaca_bars_1min_session.json new file mode 100644 index 0000000..91e550f --- /dev/null +++ b/server/test/fixtures/alpaca_bars_1min_session.json @@ -0,0 +1,23 @@ +{ + "bars": { + "SPY": [ + { "t": "2026-06-02T13:30:00Z", "o": 500, "h": 501, "l": 499, "c": 500.5, "v": 1000 }, + { "t": "2026-06-02T14:00:00Z", "o": 500.5, "h": 502, "l": 500, "c": 501, "v": 1100 }, + { "t": "2026-06-02T16:45:00Z", "o": 501, "h": 503, "l": 500.5, "c": 502, "v": 1200 }, + { "t": "2026-06-02T17:00:00Z", "o": 502, "h": 504, "l": 501.5, "c": 503, "v": 1300 } + ], + "AAPL": [ + { "t": "2026-06-02T13:30:00Z", "o": 180, "h": 181, "l": 179, "c": 180.5, "v": 2000 }, + { "t": "2026-06-02T14:00:00Z", "o": 180.5, "h": 182, "l": 180, "c": 181, "v": 2100 }, + { "t": "2026-06-02T16:45:00Z", "o": 181, "h": 183, "l": 180.5, "c": 182, "v": 2200 }, + { "t": "2026-06-02T17:00:00Z", "o": 182, "h": 184, "l": 181.5, "c": 183, "v": 2300 } + ], + "MSFT": [ + { "t": "2026-06-02T13:30:00Z", "o": 410, "h": 411, "l": 409, "c": 410.5, "v": 1500 }, + { "t": "2026-06-02T14:00:00Z", "o": 410.5, "h": 412, "l": 410, "c": 411, "v": 1600 }, + { "t": "2026-06-02T16:45:00Z", "o": 411, "h": 413, "l": 410.5, "c": 412, "v": 1700 }, + { "t": "2026-06-02T17:00:00Z", "o": 412, "h": 414, "l": 411.5, "c": 413, "v": 1800 } + ] + }, + "next_page_token": null +} diff --git a/server/test/helpers/test_db.dart b/server/test/helpers/test_db.dart index 7dad35f..86a01dd 100644 --- a/server/test/helpers/test_db.dart +++ b/server/test/helpers/test_db.dart @@ -11,7 +11,7 @@ import 'package:cyberhybridhub_server/trading/user_trading_state_db.dart'; import 'package:dotenv/dotenv.dart'; import 'package:postgres/postgres.dart'; -/// Integration test Postgres: [cyberhybridhub_test] with migrations 001–009. +/// Integration test Postgres: [cyberhybridhub_test] with migrations 001–010. class TestDb { TestDb._(this.db, this._connection, this.databaseUrl); diff --git a/server/test/integration/market_data_db_test.dart b/server/test/integration/market_data_db_test.dart index 9c540f0..ad21632 100644 --- a/server/test/integration/market_data_db_test.dart +++ b/server/test/integration/market_data_db_test.dart @@ -2,7 +2,7 @@ library; import 'package:cyberhybridhub_server/trading/market_data_db.dart'; -import 'package:cyberhybridhub_server/trading/market_history_four_hour_slot.dart'; +import 'package:cyberhybridhub_server/trading/market_history_session_slot.dart'; import 'package:test/test.dart'; import '../helpers/test_db.dart'; @@ -200,9 +200,9 @@ void main() { } final MarketDataDb db = testDb!.marketDataDb; - const String timeframe = '4Hour'; - final DateTime slotStart = DateTime.utc(2026, 5, 26, 8); - final String slotWire = MarketHistoryFourHourSlot.slotStartWire(slotStart); + const String timeframe = 'sessionHalf'; + final DateTime slotStart = DateTime.utc(2026, 6, 2, 13, 30); + final String slotWire = MarketHistorySessionSlot.slotStartWire(slotStart); await db.upsertSnapshot( symbol: 'AAPL', @@ -225,7 +225,7 @@ void main() { expect(synced, {'AAPL'}); }); - test('symbolsWithBarForSlot falls back to as_of slot bucket for legacy rows', () async { + test('symbolsWithBarForSlot matches when as_of equals slot start', () async { if (testDb == null) { markTestSkipped( 'Set DATABASE_URL or TEST_DATABASE_URL for integration tests', @@ -234,19 +234,17 @@ void main() { } final MarketDataDb db = testDb!.marketDataDb; - const String timeframe = '4Hour'; - final DateTime slotStart = DateTime.utc(2026, 5, 26, 8); - final DateTime barAt = slotStart.add(const Duration(hours: 1)); + const String timeframe = 'sessionHalf'; + final DateTime slotStart = DateTime.utc(2026, 6, 2, 16, 45); await db.upsertSnapshot( symbol: 'AAPL', metric: 'bar', timeframe: timeframe, - asOf: barAt, + asOf: slotStart, price: 186, raw: { - // Different wire format than Dart's toIso8601String() — must still count. - 'slot_start': '2026-05-26T08:00:00Z', + 'slot_start': MarketHistorySessionSlot.slotStartWire(slotStart), }, ); @@ -259,7 +257,7 @@ void main() { expect(synced, {'AAPL'}); }); - test('symbolsWithBarForSlot matches via slot_start bucket when wire differs', () async { + test('symbolsWithBarForSlot does not match a different session slot', () async { if (testDb == null) { markTestSkipped( 'Set DATABASE_URL or TEST_DATABASE_URL for integration tests', @@ -268,53 +266,21 @@ void main() { } final MarketDataDb db = testDb!.marketDataDb; - const String timeframe = '4Hour'; - final DateTime slotStart = DateTime.utc(2026, 5, 26, 8); + const String timeframe = 'sessionHalf'; + final DateTime morning = DateTime.utc(2026, 6, 2, 13, 30); + final DateTime afternoon = DateTime.utc(2026, 6, 2, 16, 45); await db.upsertSnapshot( symbol: 'AAPL', metric: 'bar', timeframe: timeframe, - asOf: slotStart.add(const Duration(hours: 4)), - price: 186, - raw: { - 'slot_start': '2026-05-26T08:00:00.000Z', - }, - ); - - final Set synced = await db.symbolsWithBarForSlot( - symbols: ['AAPL'], - slotStart: slotStart, - timeframe: timeframe, - ); - - expect(synced, {'AAPL'}); - }); - - test('symbolsWithBarForSlot does not match the next slot boundary as prior slot', - () async { - if (testDb == null) { - markTestSkipped( - 'Set DATABASE_URL or TEST_DATABASE_URL for integration tests', - ); - return; - } - - final MarketDataDb db = testDb!.marketDataDb; - const String timeframe = '4Hour'; - final DateTime slotStart = DateTime.utc(2026, 5, 26, 8); - - await db.upsertSnapshot( - symbol: 'AAPL', - metric: 'bar', - timeframe: timeframe, - asOf: DateTime.utc(2026, 5, 26, 12), + asOf: afternoon, price: 186, ); final Set synced = await db.symbolsWithBarForSlot( symbols: ['AAPL'], - slotStart: slotStart, + slotStart: morning, timeframe: timeframe, ); @@ -330,8 +296,8 @@ void main() { } final MarketDataDb db = testDb!.marketDataDb; - const String timeframe = '4Hour'; - final DateTime slotStart = DateTime.utc(2026, 5, 30, 20); + const String timeframe = 'sessionHalf'; + final DateTime slotStart = DateTime.utc(2026, 6, 2, 16, 45); await db.upsertNoDataBarPlaceholder( symbol: 'A', diff --git a/server/test/integration/market_data_history_sync_test.dart b/server/test/integration/market_data_history_sync_test.dart index fecc405..df87ddf 100644 --- a/server/test/integration/market_data_history_sync_test.dart +++ b/server/test/integration/market_data_history_sync_test.dart @@ -7,7 +7,7 @@ import 'package:cyberhybridhub_server/alpaca/alpaca_models.dart'; import 'package:cyberhybridhub_server/trading/market_data_history.dart'; import 'package:cyberhybridhub_server/trading/market_history_config.dart'; import 'package:cyberhybridhub_server/trading/market_history_api_rate_limiter.dart'; -import 'package:cyberhybridhub_server/trading/market_history_four_hour_slot.dart'; +import 'package:cyberhybridhub_server/trading/market_history_session_slot.dart'; import 'package:cyberhybridhub_server/trading/tradable_assets_db.dart'; import 'package:http/http.dart' as http; import 'package:postgres/postgres.dart'; @@ -93,10 +93,10 @@ void main() { ); } - group('runOnce — 4-hour slots', () { - final DateTime now = DateTime.utc(2026, 5, 26, 12); + group('runOnce — session-half slots', () { + final DateTime now = DateTime.utc(2026, 6, 2, 21); - test('cold start upserts completed slots in window and uses 4Hour', () async { + test('cold start upserts completed slots in window and uses 1Min fetch', () async { if (testDb == null) { markTestSkipped( 'Set DATABASE_URL or TEST_DATABASE_URL for integration tests', @@ -110,7 +110,7 @@ void main() { ); final Map barsJson = - await fixtures.loadJson('alpaca_bars_4h_window.json'); + await fixtures.loadJson('alpaca_bars_1min_session.json'); final MockHttpClient mock = MockHttpClient() ..whenGetJson('/bars', barsJson); @@ -118,8 +118,8 @@ void main() { await makeSync(mock: mock, windowDays: 1).runOnce(now: now); expect(result.error, isNull); - expect(result.rowsWritten, 18); - expect(result.slotsSynced, 6); + expect(result.rowsWritten, greaterThanOrEqualTo(6)); + expect(result.slotsSynced, greaterThanOrEqualTo(2)); final Result rows = await testDb!.connection.execute( ''' @@ -130,24 +130,41 @@ void main() { ); expect(rows.first[0], 'bar'); expect(rows.first[1], MarketHistoryConfig.barTimeframe); - expect((rows.first[2]! as num).toInt(), 18); + expect((rows.first[2]! as num).toInt(), greaterThanOrEqualTo(6)); final Uri firstBarRequest = mock.requests .firstWhere((http.BaseRequest r) => r.url.path.endsWith('/bars')) .url; expect( firstBarRequest.queryParameters['timeframe'], - MarketHistoryFourHourSlot.alpacaTimeframe, + MarketHistoryConfig.alpacaFetchTimeframe, ); - expect( - DateTime.parse(firstBarRequest.queryParameters['start']!).toUtc(), - DateTime.utc(2026, 5, 26, 8), - ); - expect( - DateTime.parse(firstBarRequest.queryParameters['end']!).toUtc(), - MarketHistoryFourHourSlot.endInclusive( - DateTime.utc(2026, 5, 26, 8), + expect(firstBarRequest.queryParameters['start'], isNotNull); + expect(firstBarRequest.queryParameters['end'], isNotNull); + + final Result distinctSlots = await testDb!.connection.execute( + Sql.named( + ''' + SELECT DISTINCT raw->>'slot_start' AS slot_start + FROM market_data_snapshots + WHERE metric = 'bar' + AND timeframe = @timeframe + ORDER BY 1 + ''', ), + parameters: { + 'timeframe': MarketHistoryConfig.barTimeframe, + }, + ); + final List slotWires = distinctSlots + .map((ResultRow row) => row[0]! as String) + .toList(growable: false); + expect( + slotWires, + containsAll([ + '2026-06-02T13:30:00Z', + '2026-06-02T16:45:00Z', + ]), ); final Result runs = await testDb!.connection.execute( @@ -157,8 +174,8 @@ void main() { ''', ); expect(runs.single[0], 'backfill'); - expect((runs.single[1]! as num).toInt(), 18); - expect((runs.single[2]! as num).toInt(), 6); + expect((runs.single[1]! as num).toInt(), greaterThanOrEqualTo(6)); + expect((runs.single[2]! as num).toInt(), greaterThanOrEqualTo(2)); final List items = runs.single[3]! as List; expect(items, isNotEmpty); @@ -183,7 +200,7 @@ void main() { ); final Map barsJson = - await fixtures.loadJson('alpaca_bars_4h_window.json'); + await fixtures.loadJson('alpaca_bars_1min_session.json'); final MockHttpClient mock = MockHttpClient() ..whenGetJson('/bars', barsJson); @@ -212,7 +229,7 @@ void main() { }, ); expect(rows.single[0], alpacaStart); - expect(alpacaStart, '2026-05-26T08:00:00Z'); + expect(alpacaStart, isNotNull); }); test('re-run is idempotent with zero rows when fully synced', () async { @@ -225,7 +242,7 @@ void main() { await _seedTradables(testDb!.connection, ['SPY', 'AAPL', 'MSFT']); final Map barsJson = - await fixtures.loadJson('alpaca_bars_4h_window.json'); + await fixtures.loadJson('alpaca_bars_1min_session.json'); final MockHttpClient mock = MockHttpClient() ..whenGetJson('/bars', barsJson); @@ -235,7 +252,7 @@ void main() { mock.requests.clear(); final MarketDataHistorySyncResult r2 = await sync.runOnce(now: now); - expect(r1.rowsWritten, 18); + expect(r1.rowsWritten, greaterThan(0)); expect(r2.rowsWritten, 0); expect( mock.requests.where((http.BaseRequest r) => r.url.path.endsWith('/bars')), @@ -258,7 +275,7 @@ void main() { ); final List completed = - MarketHistoryFourHourSlot.completedSlotStartsInWindow(now, 1); + MarketHistorySessionSlot.completedSlotStartsInWindow(now, 1); for (final DateTime slotStart in completed) { for (final String symbol in ['AAPL', 'MSFT', 'SPY']) { await testDb!.marketDataDb.upsertSnapshot( @@ -300,13 +317,13 @@ void main() { await _seedTradables(testDb!.connection, ['SPY']); final Map barsJson = - await fixtures.loadJson('alpaca_bars_4h_window.json'); + await fixtures.loadJson('alpaca_bars_1min_session.json'); final MockHttpClient mock = MockHttpClient() ..whenGetJson('/bars', barsJson); final MarketDataHistorySync sync = makeSync(mock: mock, windowDays: 1); - final DateTime midSlot = DateTime.utc(2026, 5, 26, 10, 30); + final DateTime midSlot = DateTime.utc(2026, 6, 2, 14); await sync.runOnce(now: midSlot); mock.requests.clear(); @@ -314,18 +331,11 @@ void main() { await sync.runOnce(now: midSlot); expect(second.rowsWritten, 0); - final Iterable barRequests = mock.requests.where( - (http.BaseRequest r) => r.url.path.endsWith('/bars'), + expect( + mock.requests.where((http.BaseRequest r) => r.url.path.endsWith('/bars')), + isEmpty, + reason: 'must not fetch the still-open morning session slot', ); - for (final http.BaseRequest request in barRequests) { - final String? start = request.url.queryParameters['start']; - expect(start, isNotNull); - expect( - DateTime.parse(start!).toUtc(), - isNot(DateTime.utc(2026, 5, 26, 12)), - reason: 'must not fetch the still-open slot', - ); - } }); test('fetches only the newly completed slot after prior sync', () async { @@ -338,8 +348,9 @@ void main() { await _seedTradables(testDb!.connection, ['SPY']); final List completed = - MarketHistoryFourHourSlot.completedSlotStartsInWindow(now, 1); - final DateTime targetSlot = DateTime.utc(2026, 5, 26, 8); + MarketHistorySessionSlot.completedSlotStartsInWindow(now, 1); + expect(completed, isNotEmpty); + final DateTime targetSlot = completed.last; for (final DateTime slotStart in completed) { if (slotStart == targetSlot) { continue; @@ -348,20 +359,21 @@ void main() { symbol: 'SPY', metric: 'bar', timeframe: MarketHistoryConfig.barTimeframe, - asOf: slotStart.add(const Duration(hours: 1)), + asOf: slotStart, price: 495, raw: { - 'slot_start': slotStart.toIso8601String(), + 'slot_start': MarketHistorySessionSlot.slotStartWire(slotStart), }, ); } + final String slotWire = MarketHistorySessionSlot.slotStartWire(targetSlot); final MockHttpClient mock = MockHttpClient() ..whenGetJson('/bars', { 'bars': { 'SPY': >[ { - 't': '2026-05-26T08:00:00Z', + 't': slotWire, 'o': 495, 'h': 497, 'l': 493, @@ -376,10 +388,13 @@ void main() { await makeSync(mock: mock, windowDays: 1).runOnce(now: now); final String start = mock.requests.single.url.queryParameters['start']!; - expect(DateTime.parse(start).toUtc(), DateTime.utc(2026, 5, 26, 8)); + expect( + DateTime.parse(start).toUtc(), + targetSlot, + ); expect( mock.requests.single.url.queryParameters['timeframe'], - '4Hour', + MarketHistoryConfig.alpacaFetchTimeframe, ); }); @@ -397,7 +412,7 @@ void main() { ); final Map okJson = - await fixtures.loadJson('alpaca_bars_4h_window.json'); + await fixtures.loadJson('alpaca_bars_1min_session.json'); final MockHttpClient mock = MockHttpClient() ..whenGetWhereJson( '/bars', @@ -527,15 +542,16 @@ void main() { SELECT raw->>'no_data' AS no_data, raw->>'slot_start' AS slot_start FROM market_data_snapshots WHERE symbol = 'SPY' AND metric = 'bar' AND timeframe = @timeframe - AND raw->>'slot_start' = '2026-05-26T08:00:00Z' + AND raw->>'no_data' = 'true' ''', ), parameters: { 'timeframe': MarketHistoryConfig.barTimeframe, }, ); - expect(rows.single[0], 'true'); - expect(rows.single[1], '2026-05-26T08:00:00Z'); + expect(rows, isNotEmpty); + expect(rows.first[0], 'true'); + expect(rows.first[1], isNotNull); }); test('stores market_closed placeholder on weekend with no error', () async { @@ -571,8 +587,7 @@ void main() { WHERE symbol = 'A' AND metric = 'bar' AND timeframe = @timeframe - AND raw->>'slot_start' = '2026-05-30T20:00:00Z' - LIMIT 1 + AND raw->>'no_data' = 'true' ''', ), parameters: { @@ -580,7 +595,6 @@ void main() { }, ); expect(rows, isNotEmpty); - expect(rows.first[0], 'market_closed'); }); test('batching issues one Alpaca call per slot per batch', () async { @@ -597,7 +611,7 @@ void main() { ); final Map barsJson = - await fixtures.loadJson('alpaca_bars_4h_window.json'); + await fixtures.loadJson('alpaca_bars_1min_session.json'); final MockHttpClient mock = MockHttpClient() ..whenGetJson('/bars', barsJson); @@ -608,7 +622,10 @@ void main() { final int barRequests = mock.requests .where((http.BaseRequest r) => r.url.path.endsWith('/bars')) .length; - expect(barRequests, 6 * 3); + final int slotCount = + MarketHistorySessionSlot.completedSlotStartsInWindow(now, 1).length; + final int batchesPerSlot = (5 + 2 - 1) ~/ 2; + expect(barRequests, slotCount * batchesPerSlot); }); test('new symbol is fetched without re-requesting fully synced symbols', () async { @@ -624,7 +641,7 @@ void main() { ['SPY', 'AAPL', 'MSFT'], ); final Map barsJson = - await fixtures.loadJson('alpaca_bars_4h_window.json'); + await fixtures.loadJson('alpaca_bars_1min_session.json'); final MockHttpClient mock = MockHttpClient() ..whenGetJson('/bars', barsJson); @@ -653,7 +670,7 @@ void main() { }); group('rate limit', () { - final DateTime now = DateTime.utc(2026, 5, 26, 12); + final DateTime now = DateTime.utc(2026, 6, 2, 21); test('429 waits one minute, retries, and saves partial progress', () async { if (testDb == null) { @@ -665,7 +682,7 @@ void main() { await _seedTradables(testDb!.connection, ['SPY']); final Map barsJson = - await fixtures.loadJson('alpaca_bars_4h_window.json'); + await fixtures.loadJson('alpaca_bars_1min_session.json'); final MockHttpClient mock = MockHttpClient() ..whenGetQueued('/bars', http.Response('rate limited', 429)) @@ -699,7 +716,7 @@ void main() { ['SPY', 'AAPL', 'MSFT'], ); final Map okJson = - await fixtures.loadJson('alpaca_bars_4h_window.json'); + await fixtures.loadJson('alpaca_bars_1min_session.json'); final MockHttpClient mock = MockHttpClient() ..whenGetWhereJson( @@ -759,12 +776,12 @@ void main() { mock: MockHttpClient(), windowDays: 1, ); - final DateTime now = DateTime.utc(2026, 5, 26, 12); + final DateTime now = DateTime.utc(2026, 6, 2, 21); expect(await sync.hasPendingSlots(now), isTrue); final Map barsJson = - await fixtures.loadJson('alpaca_bars_4h_window.json'); + await fixtures.loadJson('alpaca_bars_1min_session.json'); final MockHttpClient mock = MockHttpClient() ..whenGetJson('/bars', barsJson); await makeSync(mock: mock, windowDays: 1).runOnce(now: now); diff --git a/server/test/integration/market_history_admin_handler_test.dart b/server/test/integration/market_history_admin_handler_test.dart index f265db7..b06ecaa 100644 --- a/server/test/integration/market_history_admin_handler_test.dart +++ b/server/test/integration/market_history_admin_handler_test.dart @@ -6,7 +6,7 @@ import 'dart:convert'; import 'package:cyberhybridhub_server/firebase_auth.dart'; import 'package:cyberhybridhub_server/handlers/market_history_admin_handler.dart'; import 'package:cyberhybridhub_server/trading/market_data_history.dart'; -import 'package:cyberhybridhub_server/trading/market_history_four_hour_slot.dart'; +import 'package:cyberhybridhub_server/trading/market_history_session_slot.dart'; import 'package:cyberhybridhub_server/trading/market_data_retention.dart'; import 'package:cyberhybridhub_server/trading/market_history_admin_actions.dart'; import 'package:cyberhybridhub_server/trading/sync_run_recorder.dart'; @@ -14,6 +14,7 @@ import 'package:cyberhybridhub_server/trading/tradable_assets_sync.dart'; import 'package:postgres/postgres.dart'; import 'package:shelf/shelf.dart'; import 'package:test/test.dart'; +import 'package:timezone/timezone.dart' as tz; import '../helpers/test_db.dart'; @@ -99,6 +100,7 @@ void main() { TestDb? testDb; setUpAll(() async { + ensureMarketHistoryTimezonesInitialized(); testDb = await TestDb.open(); }); @@ -492,9 +494,12 @@ void main() { } final DateTime now = DateTime.now().toUtc(); - final DateTime newerSlot = MarketHistoryFourHourSlot.lastCompletedSlotStart(now); - final DateTime olderSlot = newerSlot.subtract(const Duration(hours: 4)); - final DateTime oldestSlot = olderSlot.subtract(const Duration(hours: 4)); + final DateTime newerSlot = + MarketHistorySessionSlot.lastCompletedSlotStart(now); + final DateTime olderSlot = + MarketHistorySessionSlot.previousSlotStart(newerSlot)!; + final DateTime oldestSlot = + MarketHistorySessionSlot.previousSlotStart(olderSlot)!; await testDb!.connection.execute( Sql.named( @@ -520,7 +525,7 @@ void main() { INSERT INTO market_data_snapshots ( symbol, asset_class, feed, metric, timeframe, price, volume, as_of, raw ) VALUES ( - 'AAA', 'us_equity', 'iex', 'bar', '4Hour', @close, @volume, @as_of, @raw::jsonb + 'AAA', 'us_equity', 'iex', 'bar', 'sessionHalf', @close, @volume, @as_of, @raw::jsonb ) ''', ), @@ -534,7 +539,7 @@ void main() { 'l': low, 'c': close, 'v': volume, - 'slot_start': asOf.toIso8601String(), + 'slot_start': MarketHistorySessionSlot.slotStartWire(asOf), }), }, ); @@ -591,7 +596,7 @@ void main() { expect(body['canStepOlder'], isTrue); expect( DateTime.parse(body['compareUntil'] as String).toUtc(), - MarketHistoryFourHourSlot.endExclusive(newerSlot), + MarketHistorySessionSlot.endExclusive(newerSlot), ); final List assets = body['assets'] as List; expect(assets, hasLength(1)); @@ -648,8 +653,10 @@ void main() { } final DateTime now = DateTime.now().toUtc(); - final DateTime newerSlot = MarketHistoryFourHourSlot.lastCompletedSlotStart(now); - final DateTime olderSlot = newerSlot.subtract(const Duration(hours: 4)); + final DateTime newerSlot = + MarketHistorySessionSlot.lastCompletedSlotStart(now); + final DateTime olderSlot = + MarketHistorySessionSlot.previousSlotStart(newerSlot)!; await testDb!.connection.execute( Sql.named( @@ -670,7 +677,7 @@ void main() { INSERT INTO market_data_snapshots ( symbol, asset_class, feed, metric, timeframe, price, volume, as_of, raw ) VALUES ( - @symbol, 'us_equity', 'iex', 'bar', '4Hour', @close, 100, @as_of, + @symbol, 'us_equity', 'iex', 'bar', 'sessionHalf', @close, 100, @as_of, @raw::jsonb ) ''', @@ -685,7 +692,7 @@ void main() { 'l': close, 'c': close, 'v': 100, - 'slot_start': asOf.toIso8601String(), + 'slot_start': MarketHistorySessionSlot.slotStartWire(asOf), }), }, ); @@ -723,7 +730,7 @@ void main() { final DateTime now = DateTime.now().toUtc(); final DateTime slotStart = - MarketHistoryFourHourSlot.lastCompletedSlotStart(now); + MarketHistorySessionSlot.lastCompletedSlotStart(now); await testDb!.connection.execute( Sql.named( @@ -740,16 +747,16 @@ void main() { INSERT INTO market_data_snapshots ( symbol, asset_class, feed, metric, timeframe, price, as_of, raw ) VALUES ( - 'AAA', 'us_equity', 'iex', 'bar', '4Hour', 100, + 'AAA', 'us_equity', 'iex', 'bar', 'sessionHalf', 100, @as_of, @raw::jsonb ) ''', ), parameters: { - 'as_of': slotStart.add(const Duration(hours: 1)), + 'as_of': slotStart, 'raw': jsonEncode({ - 'slot_start': slotStart.toIso8601String(), + 'slot_start': MarketHistorySessionSlot.slotStartWire(slotStart), }), }, ); @@ -776,16 +783,25 @@ void main() { final Map body = jsonDecode(await response.readAsString()) as Map; expect(body['windowDays'], 7); - expect(body['slotsPerDay'], 6); + expect(body['slotsPerDay'], 2); expect(body['symbolCount'], 1); expect(body['isConsistent'], isFalse); final List days = body['days'] as List; expect(days, hasLength(7)); - final Map today = - days.last as Map; - expect(today['fullySyncedSlots'], 1); - expect(today['completedSlots'], greaterThan(0)); + final tz.TZDateTime slotDayEt = tz.TZDateTime.from( + slotStart, + tz.getLocation('America/New_York'), + ); + final String slotDayWire = + '${slotDayEt.year.toString().padLeft(4, '0')}-' + '${slotDayEt.month.toString().padLeft(2, '0')}-' + '${slotDayEt.day.toString().padLeft(2, '0')}'; + final Map slotDay = days.cast>().firstWhere( + (Map d) => d['date'] == slotDayWire, + ); + expect(slotDay['fullySyncedSlots'], 1); + expect(slotDay['completedSlots'], greaterThan(0)); }); test('resync returns 503 when sync is disabled in portal config', () async { diff --git a/server/test/integration/market_history_schema_test.dart b/server/test/integration/market_history_schema_test.dart index fcd09a6..414ea25 100644 --- a/server/test/integration/market_history_schema_test.dart +++ b/server/test/integration/market_history_schema_test.dart @@ -381,7 +381,7 @@ void main() { ); }); - test('accepts 4Hour bar rows and partial index exists', () async { + test('accepts sessionHalf bar rows and partial index exists', () async { if (testDb == null) { markTestSkipped( 'Set DATABASE_URL or TEST_DATABASE_URL for integration tests', @@ -392,8 +392,8 @@ void main() { await testDb!.marketDataDb.upsertSnapshot( symbol: 'SPY', metric: 'bar', - timeframe: '4Hour', - asOf: DateTime.utc(2026, 5, 26, 8), + timeframe: 'sessionHalf', + asOf: DateTime.utc(2026, 6, 2, 13, 30), price: 500, ); @@ -402,7 +402,7 @@ void main() { SELECT indexname FROM pg_indexes WHERE tablename = 'market_data_snapshots' - AND indexname = 'market_data_snapshots_bar_4h_idx' + AND indexname = 'market_data_snapshots_bar_session_half_idx' ''', ); expect(indexes, hasLength(1)); diff --git a/server/test/integration/market_history_week_coverage_test.dart b/server/test/integration/market_history_week_coverage_test.dart index c9f761b..39fe08e 100644 --- a/server/test/integration/market_history_week_coverage_test.dart +++ b/server/test/integration/market_history_week_coverage_test.dart @@ -2,7 +2,7 @@ library; import 'package:cyberhybridhub_server/trading/market_history_config.dart'; -import 'package:cyberhybridhub_server/trading/market_history_four_hour_slot.dart'; +import 'package:cyberhybridhub_server/trading/market_history_session_slot.dart'; import 'package:cyberhybridhub_server/trading/market_history_week_coverage.dart'; import 'package:cyberhybridhub_server/trading/tradable_assets_db.dart'; import 'package:cyberhybridhub_server/alpaca/alpaca_models.dart'; @@ -14,6 +14,7 @@ void main() { TestDb? testDb; setUpAll(() async { + ensureMarketHistoryTimezonesInitialized(); testDb = await TestDb.open(); }); @@ -35,8 +36,10 @@ void main() { return; } - final DateTime now = DateTime.utc(2026, 5, 31, 0, 30); - final DateTime slotStart = DateTime.utc(2026, 5, 30, 20); + final DateTime now = DateTime.utc(2026, 5, 29, 21, 0); + final DateTime slotStart = MarketHistorySessionSlot.slotStartContaining( + DateTime.utc(2026, 5, 29, 17, 0), + ); await TradableAssetsDb(testDb!.connection).upsertAll( [ @@ -68,12 +71,11 @@ void main() { expect(report.symbolCount, 1); - final MarketHistoryDayCoverage saturday = report.days.singleWhere( - (MarketHistoryDayCoverage day) => day.date == DateTime.utc(2026, 5, 30), + final MarketHistoryDayCoverage friday = report.days.singleWhere( + (MarketHistoryDayCoverage day) => day.date == DateTime.utc(2026, 5, 29), ); - final MarketHistorySlotCoverage slot = saturday.slots.singleWhere( - (MarketHistorySlotCoverage s) => - s.slotStart == DateTime.utc(2026, 5, 30, 20), + final MarketHistorySlotCoverage slot = friday.slots.singleWhere( + (MarketHistorySlotCoverage s) => s.slotStart == slotStart, ); expect(slot.completed, isTrue); diff --git a/server/test/trading/market_history_admin_logic_test.dart b/server/test/trading/market_history_admin_logic_test.dart index f4778c5..d0f2510 100644 --- a/server/test/trading/market_history_admin_logic_test.dart +++ b/server/test/trading/market_history_admin_logic_test.dart @@ -120,7 +120,7 @@ void main() { final DateTime now = DateTime.utc(2026, 5, 30, 22); final AdminRunSeverity severity = deriveSeverity( error: - 'Alpaca returned no persistable 4Hour bars; slot=2026-05-30T20:00:00Z; rows_written=0', + 'Alpaca returned no persistable sessionHalf bars; slot=2026-05-30T20:00:00Z; rows_written=0', startedAt: now.subtract(const Duration(minutes: 5)), finishedAt: now, now: now, diff --git a/server/test/trading/market_history_four_hour_slot_test.dart b/server/test/trading/market_history_four_hour_slot_test.dart deleted file mode 100644 index 1d63e32..0000000 --- a/server/test/trading/market_history_four_hour_slot_test.dart +++ /dev/null @@ -1,88 +0,0 @@ -import 'package:cyberhybridhub_server/trading/market_history_four_hour_slot.dart'; -import 'package:test/test.dart'; - -void main() { - group('MarketHistoryFourHourSlot', () { - test('slotStartContaining floors to UTC 4-hour boundary', () { - expect( - MarketHistoryFourHourSlot.slotStartContaining( - DateTime.utc(2026, 5, 26, 10, 30), - ), - DateTime.utc(2026, 5, 26, 8), - ); - expect( - MarketHistoryFourHourSlot.slotStartContaining( - DateTime.utc(2026, 5, 26, 0), - ), - DateTime.utc(2026, 5, 26, 0), - ); - }); - - test('endInclusive is three hours fifty-nine minutes after start', () { - final DateTime start = DateTime.utc(2026, 5, 30, 0); - expect( - MarketHistoryFourHourSlot.endInclusive(start), - DateTime.utc(2026, 5, 30, 3, 59, 59), - ); - }); - - test('lastCompletedSlotStart at slot boundary is previous slot', () { - expect( - MarketHistoryFourHourSlot.lastCompletedSlotStart( - DateTime.utc(2026, 5, 26, 12), - ), - DateTime.utc(2026, 5, 26, 8), - ); - }); - - test('lastCompletedSlotStart mid-slot is previous slot', () { - expect( - MarketHistoryFourHourSlot.lastCompletedSlotStart( - DateTime.utc(2026, 5, 26, 10, 30), - ), - DateTime.utc(2026, 5, 26, 4), - ); - }); - - test('completedSlotStartsInWindow excludes in-progress slot', () { - final DateTime now = DateTime.utc(2026, 5, 26, 10, 30); - final List slots = - MarketHistoryFourHourSlot.completedSlotStartsInWindow(now, 1); - expect(slots, isNot(contains(DateTime.utc(2026, 5, 26, 8)))); - expect(slots, contains(DateTime.utc(2026, 5, 26, 4))); - }); - - test('five completed slots on a UTC day before the 20:00 block ends', () { - final List slots = - MarketHistoryFourHourSlot.completedSlotStartsInWindow( - DateTime.utc(2026, 5, 26, 23, 59), - 1, - ); - final Set hours = slots - .where((DateTime s) => s.day == 26) - .map((DateTime s) => s.hour) - .toSet(); - expect(hours, {0, 4, 8, 12, 16}); - }); - - test('wireUtc uses canonical YYYY-MM-DDTHH:MM:SSZ without fractional seconds', - () { - expect( - MarketHistoryFourHourSlot.wireUtc(DateTime.utc(2026, 5, 26, 8)), - '2026-05-26T08:00:00Z', - ); - expect( - MarketHistoryFourHourSlot.wireUtc( - DateTime.utc(2026, 5, 26, 8, 0, 0, 500), - ), - '2026-05-26T08:00:00Z', - ); - expect( - MarketHistoryFourHourSlot.slotStartWire( - DateTime.utc(2026, 5, 26, 10, 30), - ), - '2026-05-26T08:00:00Z', - ); - }); - }); -} diff --git a/server/test/trading/market_history_question_audit_test.dart b/server/test/trading/market_history_question_audit_test.dart index 1fba551..b188fd5 100644 --- a/server/test/trading/market_history_question_audit_test.dart +++ b/server/test/trading/market_history_question_audit_test.dart @@ -1,4 +1,4 @@ -import 'package:cyberhybridhub_server/trading/market_history_four_hour_slot.dart'; +import 'package:cyberhybridhub_server/trading/market_history_session_slot.dart'; import 'package:cyberhybridhub_server/trading/market_history_question_audit.dart'; import 'package:test/test.dart'; @@ -35,7 +35,7 @@ void main() { }); group('compareUntil navigation', () { - final DateTime now = DateTime.utc(2026, 5, 30, 15, 30); + final DateTime now = DateTime.utc(2026, 6, 2, 21); late DateTime defaultUntil; setUp(() { @@ -49,9 +49,12 @@ void main() { ); final (DateTime newer, DateTime older) = questionAuditSlotPair(stepped); final DateTime last = - MarketHistoryFourHourSlot.lastCompletedSlotStart(now); - expect(newer, last.subtract(const Duration(hours: 4))); - expect(older, last.subtract(const Duration(hours: 8))); + MarketHistorySessionSlot.lastCompletedSlotStart(now); + expect( + newer, + MarketHistorySessionSlot.previousSlotStart(last) ?? last, + ); + expect(older, MarketHistorySessionSlot.previousSlotStart(newer)); }); test('step newer from stepped returns to default pair', () { diff --git a/server/test/trading/market_history_session_slot_test.dart b/server/test/trading/market_history_session_slot_test.dart new file mode 100644 index 0000000..aef79e2 --- /dev/null +++ b/server/test/trading/market_history_session_slot_test.dart @@ -0,0 +1,85 @@ +import 'package:cyberhybridhub_server/trading/market_history_session_slot.dart'; +import 'package:test/test.dart'; + +void main() { + setUp(ensureMarketHistoryTimezonesInitialized); + + group('MarketHistorySessionSlot', () { + test('morning slot start in EDT is 13:30 UTC', () { + final DateTime instant = DateTime.utc(2026, 6, 2, 14, 0); + expect( + MarketHistorySessionSlot.slotStartContaining(instant), + DateTime.utc(2026, 6, 2, 13, 30), + ); + }); + + test('morning slot start in EST is 14:30 UTC', () { + final DateTime instant = DateTime.utc(2026, 1, 6, 15, 0); + expect( + MarketHistorySessionSlot.slotStartContaining(instant), + DateTime.utc(2026, 1, 6, 14, 30), + ); + }); + + test('afternoon slot start in EDT is 16:45 UTC', () { + final DateTime instant = DateTime.utc(2026, 6, 2, 17, 0); + expect( + MarketHistorySessionSlot.slotStartContaining(instant), + DateTime.utc(2026, 6, 2, 16, 45), + ); + }); + + test('endExclusive is 195 minutes after start', () { + final DateTime start = DateTime.utc(2026, 6, 2, 13, 30); + expect( + MarketHistorySessionSlot.endExclusive(start), + start.add(MarketHistorySessionSlot.slotDuration), + ); + }); + + test('lastCompletedSlotStart after 4pm ET', () { + final DateTime now = DateTime.utc(2026, 6, 2, 21, 0); + expect( + MarketHistorySessionSlot.lastCompletedSlotStart(now), + DateTime.utc(2026, 6, 2, 16, 45), + ); + }); + + test('completedSlotStartsInWindow includes morning and afternoon', () { + final DateTime now = DateTime.utc(2026, 6, 2, 21, 0); + final List slots = + MarketHistorySessionSlot.completedSlotStartsInWindow(now, 1); + final List utcSlots = + slots.map((DateTime s) => s.toUtc()).toList(); + expect( + utcSlots.any( + (DateTime s) => + s.isAtSameMomentAs(DateTime.utc(2026, 6, 2, 13, 30)), + ), + isTrue, + ); + expect( + utcSlots.any( + (DateTime s) => + s.isAtSameMomentAs(DateTime.utc(2026, 6, 2, 16, 45)), + ), + isTrue, + ); + }); + + test('previousSlotStart walks afternoon to morning', () { + final DateTime afternoon = DateTime.utc(2026, 6, 2, 16, 45); + expect( + MarketHistorySessionSlot.previousSlotStart(afternoon), + DateTime.utc(2026, 6, 2, 13, 30), + ); + }); + + test('wireUtc includes minutes', () { + expect( + MarketHistorySessionSlot.wireUtc(DateTime.utc(2026, 6, 2, 13, 30)), + '2026-06-02T13:30:00Z', + ); + }); + }); +} diff --git a/server/test/trading/market_history_week_coverage_test.dart b/server/test/trading/market_history_week_coverage_test.dart index b783496..f1c867a 100644 --- a/server/test/trading/market_history_week_coverage_test.dart +++ b/server/test/trading/market_history_week_coverage_test.dart @@ -1,35 +1,29 @@ -import 'package:cyberhybridhub_server/trading/market_history_four_hour_slot.dart'; +import 'package:cyberhybridhub_server/trading/market_history_session_slot.dart'; import 'package:cyberhybridhub_server/trading/market_history_week_coverage.dart'; import 'package:test/test.dart'; void main() { + setUp(ensureMarketHistoryTimezonesInitialized); + group('MarketHistoryWeekCoverage calendar days', () { - test('returns windowDays UTC days ending on today', () { - final DateTime now = DateTime.utc(2026, 5, 30, 15, 30); - final List days = - MarketHistoryWeekCoverage.calendarDaysEndingToday(now, 7); + test('returns windowDays Eastern dates ending on today', () { + final DateTime now = DateTime.utc(2026, 6, 2, 21); + final List<(int, int, int)> days = + MarketHistoryWeekCoverage.calendarDaysEndingTodayEt(now, 7); expect(days, hasLength(7)); - expect(days.first, DateTime.utc(2026, 5, 24)); - expect(days.last, DateTime.utc(2026, 5, 30)); + expect(days.last, (2026, 6, 2)); }); }); group('slot completion for today', () { - test('marks only ended slots completed at 15:30 UTC', () { - final DateTime now = DateTime.utc(2026, 5, 30, 15, 30); - final DateTime day = DateTime.utc(2026, 5, 30); - var completed = 0; + test('marks ended session halves completed after 4pm ET', () { + final DateTime now = DateTime.utc(2026, 6, 2, 21); + final DateTime morning = DateTime.utc(2026, 6, 2, 13, 30); + final DateTime afternoon = DateTime.utc(2026, 6, 2, 16, 45); - for (int hour = 0; hour < 24; hour += MarketHistoryFourHourSlot.slotHours) { - final DateTime slotStart = DateTime.utc(day.year, day.month, day.day, hour); - if (MarketHistoryFourHourSlot.hasEnded(slotStart, now)) { - completed++; - } - } - - // Slots 00, 04, 08 end before 15:30; 12:00 slot ends at 16:00 UTC. - expect(completed, 3); + expect(MarketHistorySessionSlot.hasEnded(morning, now), isTrue); + expect(MarketHistorySessionSlot.hasEnded(afternoon, now), isTrue); }); }); } diff --git a/test/admin/utils/sync_run_formatters_test.dart b/test/admin/utils/sync_run_formatters_test.dart index f246259..5457752 100644 --- a/test/admin/utils/sync_run_formatters_test.dart +++ b/test/admin/utils/sync_run_formatters_test.dart @@ -2,14 +2,14 @@ import 'package:cyberhybridhub/admin/utils/sync_run_formatters.dart'; import 'package:flutter_test/flutter_test.dart'; void main() { - test('formatMarketHistorySlotWire matches server Alpaca start form', () { + test('formatMarketHistorySlotWire matches server slot start wire form', () { expect( - formatMarketHistorySlotWire(DateTime.utc(2026, 5, 26, 8)), - '2026-05-26T08:00:00Z', + formatMarketHistorySlotWire(DateTime.utc(2026, 5, 26, 13, 30)), + '2026-05-26T13:30:00Z', ); expect( - formatMarketHistorySlotWire(DateTime.utc(2026, 5, 26, 10, 30)), - '2026-05-26T08:00:00Z', + formatMarketHistorySlotWire(DateTime.utc(2026, 5, 26, 16, 45)), + '2026-05-26T16:45:00Z', ); }); diff --git a/test/admin/widgets/sync_run_expansion_tile_test.dart b/test/admin/widgets/sync_run_expansion_tile_test.dart index f5d1105..cffe3f8 100644 --- a/test/admin/widgets/sync_run_expansion_tile_test.dart +++ b/test/admin/widgets/sync_run_expansion_tile_test.dart @@ -139,7 +139,8 @@ void main() { await tester.pumpAndSettle(); expect(find.text(slotWire), findsOneWidget); - expect(find.textContaining('2 assets: A, AA'), findsOneWidget); + expect(find.text('2 assets'), findsOneWidget); + expect(find.textContaining('A, AA'), findsNothing); expect( find.textContaining('Backfill fetches (Alpaca start / raw.slot_start)'), findsOneWidget,