diff --git a/TRADING_DEVELOPMENT_PLAN.md b/TRADING_DEVELOPMENT_PLAN.md new file mode 100644 index 0000000..eca7b7e --- /dev/null +++ b/TRADING_DEVELOPMENT_PLAN.md @@ -0,0 +1,540 @@ +# Trading Development Plan — Alpaca Data & Execution + +This document defines how Cyber Hybrid Hub will add **Alpaca market data gathering**, **Alpaca trade actuation**, and a **dynamically configurable data-input layer** that feeds the existing **interval worker** (`QuestionBackgroundWorker`), **Postgres** persistence, **user questions** (SignalR), and **automated trade decisions**. + +It extends the current architecture documented in `server/README.md` and implemented in `server/lib/pipeline/question_pipeline.dart` — not replace it. + +--- + +## 1. Goals & constraints + +| Goal | Detail | +|------|--------| +| **Data** | Ingest US equity (and optional crypto) prices from Alpaca Market Data; store normalized snapshots in Postgres for the worker to read. | +| **Questions** | Worker evaluates stored data + pipeline config → enqueues swipe/numeric questions via `QuestionService` → Flutter receives `ReceiveQuestion` over SignalR. | +| **Trades** | After rules + optional user confirmation via answers, place orders through Alpaca Trading API (paper first). | +| **Configuration** | Watchlists, thresholds, and pipeline steps are **DB-driven** (and env defaults), not hard-coded in Dart. | +| **Security** | Alpaca secret keys live **only** on the Dart API server; Flutter never holds trading credentials. | +| **Cost** | Target Alpaca **Basic** market data (IEX, ~30 WS symbols) + **paper** trading until strategies are validated. | + +**Non-goals (initial phases):** sub-second HFT, full SIP market data, client-side WebSockets to Alpaca, or multi-broker routing. + +--- + +## 2. Fit with the current system + +```mermaid +flowchart TB + subgraph flutter [Flutter app] + UI[HomeScreen / SwipeQuestionTile] + Hub[QuestionsHubService SignalR] + end + + subgraph server [Dart API server] + API[Shelf handlers] + Worker[QuestionBackgroundWorker interval] + MDP[MarketDataPipeline NEW] + TP[TradingPipeline NEW] + QP[QuestionPipeline existing] + QS[QuestionService] + AlpacaMD[Alpaca Market Data client] + AlpacaTR[Alpaca Trading client] + end + + subgraph pg [Postgres] + MD[market_data_snapshots] + CFG[trading_config / user_trading_state] + Q[questions] + UPS[user_pipeline_state] + ORD[trade_orders] + end + + AlpacaMD -->|REST poll or WS| MDP + MDP --> MD + Worker --> MDP + Worker --> TP + Worker --> QP + TP --> MD + TP --> CFG + TP --> ORD + TP --> AlpacaTR + QP --> QS + QS --> Q + QS --> Hub + API --> QS + Hub --> UI + API --> UI +``` + +**Existing touchpoints to extend:** + +| Component | Path | Role | +|-----------|------|------| +| Interval worker | `server/lib/workers/question_background_worker.dart` | Orchestrates tick; will call market-data ingest + trading evaluation before/alongside `QuestionPipeline.runMaintenanceCycle`. | +| Question pipeline | `server/lib/pipeline/question_pipeline.dart` | Add `PipelineKeys.trading` branch; reuse `BranchDecision` for threshold-style answers. | +| External fetcher pattern | `server/lib/pipeline/external_data_fetcher.dart` | Precedent for HTTP clients; Alpaca clients live in `server/lib/alpaca/`. | +| Env loading | `server/lib/env.dart` | Add Alpaca + trading feature flags. | +| Flutter | `lib/services/questions_hub_service.dart`, `lib/widgets/swipe_question_tile.dart` | Unchanged contract: questions in, numeric answers out. | + +--- + +## 3. Alpaca accounts & APIs + +Use **one Alpaca account** with two API surfaces: + +| Surface | Base URL (paper) | Purpose | +|---------|------------------|---------| +| **Market Data** | `https://data.alpaca.markets` | Snapshots, bars, trades, quotes; WS `wss://stream.data.alpaca.markets/v2/iex` (Basic). | +| **Trading** | `https://paper-api.alpaca.markets` | Orders, positions, account; switch to live URL only after explicit env flip. | + +**Authentication:** Headers `APCA-API-KEY-ID` and `APCA-API-SECRET-KEY` on every request. + +**Free-tier limits to design around:** + +- Market Data Basic: **IEX real-time**, **~30 WebSocket subscriptions**, historical REST capped (use DB as source of truth for worker ticks). +- Trading: paper account; rate limits documented by Alpaca; idempotent `client_order_id` required. + +--- + +## 4. Dynamically configurable data input design + +Configuration is split into **global templates** (reusable) and **per-user overrides** (Firebase UID). The worker always resolves effective config before reading market data or emitting questions. + +### 4.1 Configuration layers + +```text +trading_config_templates (optional defaults by name) + ↓ merge +user_trading_config (per user: watchlist, rules, enabled flag) + ↓ resolved at tick +EffectiveTradingConfig (in-memory for one worker cycle) +``` + +### 4.2 Config document shape (JSONB) + +Store in Postgres as `JSONB`; validate on write via API or seed migrations. + +```json +{ + "version": 1, + "enabled": true, + "mode": "paper", + "data_inputs": [ + { + "id": "primary_watchlist", + "source": "alpaca", + "asset_class": "us_equity", + "symbols": ["AAPL", "MSFT", "SPY"], + "feed": "iex", + "poll_interval_seconds": 60, + "metrics": ["last_trade", "daily_bar", "prev_close"] + } + ], + "rules": [ + { + "id": "dip_confirm", + "type": "price_below_pct_of_ref", + "symbol": "SPY", + "ref_metric": "prev_close", + "threshold_pct": -1.5, + "question_template": "SPY is down {{pct}}% vs yesterday. Swipe +10 to approve a small buy, -10 to skip.", + "on_answer_match": { "action": "propose_order", "side": "buy", "notional_usd": 10 } + } + ], + "guardrails": { + "max_orders_per_day": 3, + "max_notional_usd_per_day": 100, + "require_question_before_order": true, + "symbols_blocklist": [] + } +} +``` + +**Design principles:** + +- **`data_inputs[]`** — what to fetch and how often; worker matches `poll_interval_seconds` against `QUESTION_WORKER_INTERVAL_SECONDS` or stores last fetch time per input id in `user_trading_state.context`. +- **`rules[]`** — declarative triggers evaluated against **Postgres snapshots**, not live Alpaca on every rule check (decouples question logic from API rate limits). +- **`question_template`** — supports `{{symbol}}`, `{{price}}`, `{{pct}}` substitution when creating `question_text`. +- **User answers** map to actions via existing `pipeline_key` / `pipeline_step` / `source_tag` conventions (e.g. `trading:dip_confirm:await_answer`). + +### 4.3 Config administration + +| Method | Phase | +|--------|-------| +| SQL seed / migration | Phase 1 — dev templates | +| `PUT /v1/me/trading/config` | Phase 2 — authenticated user override | +| Admin script | Optional bulk updates | + +Flutter does **not** edit raw JSON initially; a settings screen can come later. Phase 1 uses server-side defaults keyed by `firebase_uid`. + +--- + +## 5. Postgres schema (new migration `004_trading.sql`) + +### 5.1 `market_data_snapshots` + +Append-only (or upsert latest) normalized facts per symbol. + +```sql +CREATE TABLE market_data_snapshots ( + id BIGSERIAL PRIMARY KEY, + symbol TEXT NOT NULL, + asset_class TEXT NOT NULL DEFAULT 'us_equity', + feed TEXT NOT NULL DEFAULT 'iex', + metric TEXT NOT NULL, -- last_trade | daily_bar | prev_close + price NUMERIC, + volume NUMERIC, + as_of TIMESTAMPTZ NOT NULL, + raw JSONB, -- optional Alpaca payload fragment + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE INDEX market_data_snapshots_symbol_as_of_idx + ON market_data_snapshots (symbol, as_of DESC); +``` + +Worker queries: “latest row per `(symbol, metric)`” via `DISTINCT ON` or a materialized view `market_data_latest` (Phase 2 optimization). + +### 5.2 `trading_config_templates` + +```sql +CREATE TABLE trading_config_templates ( + name TEXT PRIMARY KEY, + config JSONB NOT NULL, + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); +``` + +### 5.3 `user_trading_config` + +```sql +CREATE TABLE user_trading_config ( + firebase_uid TEXT PRIMARY KEY REFERENCES users (firebase_uid) ON DELETE CASCADE, + template_name TEXT REFERENCES trading_config_templates (name), + config JSONB NOT NULL DEFAULT '{}', + enabled BOOLEAN NOT NULL DEFAULT false, + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); +``` + +### 5.4 `user_trading_state` + +Extends the pipeline state pattern in `user_pipeline_state` for trading-specific cursor. + +```sql +CREATE TABLE user_trading_state ( + firebase_uid TEXT PRIMARY KEY REFERENCES users (firebase_uid) ON DELETE CASCADE, + last_ingest_at TIMESTAMPTZ, + last_eval_at TIMESTAMPTZ, + context JSONB NOT NULL DEFAULT '{}', + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); +``` + +`context` holds: last fired rule ids, pending order proposal ids, daily order counts, last fetch times per `data_inputs[].id`. + +### 5.5 `trade_orders` + +Audit trail and idempotency. + +```sql +CREATE TABLE trade_orders ( + id UUID PRIMARY KEY, + firebase_uid TEXT NOT NULL REFERENCES users (firebase_uid) ON DELETE CASCADE, + client_order_id TEXT NOT NULL UNIQUE, + alpaca_order_id TEXT, + symbol TEXT NOT NULL, + side TEXT NOT NULL, + order_type TEXT NOT NULL, + notional_usd NUMERIC, + qty NUMERIC, + status TEXT NOT NULL, + question_id UUID REFERENCES questions (id), + rule_id TEXT, + submitted_at TIMESTAMPTZ, + filled_at TIMESTAMPTZ, + raw JSONB, + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +); +``` + +### 5.6 Questions table linkage + +Reuse existing columns: + +- `source_tag` — e.g. `trading:rule:dip_confirm` +- `pipeline_key` — `trading` +- `pipeline_step` — rule id + phase (`propose`, `confirm`, `idle`) + +Optional Phase 2: `questions.metadata JSONB` for symbol/price at question time. + +--- + +## 6. Server modules (proposed layout) + +```text +server/lib/ + alpaca/ + alpaca_env.dart # keys, paper vs live, data host + alpaca_market_data_client.dart + alpaca_trading_client.dart + alpaca_models.dart + trading/ + trading_config.dart # parse/merge JSONB → EffectiveTradingConfig + trading_config_db.dart + market_data_ingest.dart # data_inputs → snapshots + market_data_db.dart + rule_engine.dart # rules × snapshots → RuleEvaluation + trading_pipeline.dart # questions + order proposals + trade_orders_db.dart + pipeline/ + question_pipeline.dart # wire trading branch in onAnswerSubmitted + workers/ + question_background_worker.dart # invoke trading tick +``` + +--- + +## 7. Worker tick sequence (single interval) + +On each `QuestionBackgroundWorker._tick()` when `TRADING_ENABLED=true`: + +```text +1. For each user with user_trading_config.enabled: + a. Resolve EffectiveTradingConfig (template + user JSON merge). + b. MarketDataIngest.runIfDue(user, config.data_inputs) + → Alpaca REST (or WS service) → INSERT market_data_snapshots + c. TradingPipeline.evaluate(user, config, snapshots from DB) + → if rule fires and guardrails OK: + - maybe create question (respect maxQueuedQuestions) + - or stage pending order in user_trading_state.context + d. QuestionPipeline.runMaintenanceCycle() for same user (existing geography/weather/trading branches) + +2. For users with pending order + confirmed answer (see §8): + TradeActuator.submitOrder() → Alpaca Trading → UPDATE trade_orders +``` + +**Concurrency:** keep `_running` guard; ingest and evaluate per user sequentially first; optimize later. + +**Alignment with `QUESTION_WORKER_INTERVAL_SECONDS`:** default 60s matches Alpaca Basic REST polling for watchlists ≤30 symbols. A dedicated **long-lived WS ingest task** (optional Phase 3) writes snapshots on every tick and reduces REST calls. + +--- + +## 8. Question ↔ trade decision flow + +### 8.1 Phases per rule + +| Phase | `pipeline_step` | Worker behavior | +|-------|-----------------|-----------------| +| `idle` | — | Evaluate rules against DB snapshots only. | +| `await_confirm` | rule id | Question delivered; `correct_answer` encodes expected confirmation (+10 / -10). | +| `submit_order` | rule id | User answered; if match and guardrails pass → POST Alpaca order. | +| `done` | rule id | Log outcome; return to `idle`. | + +### 8.2 Branching (reuse `BranchDecision`) + +| User swipe | Meaning | Trade action | +|------------|---------|--------------| +| `+10` (yes) | Approve proposed action | `TradeActuator` places configured order | +| `-10` (no) | Reject | No order; log skip in `user_trading_state.context` | +| near 0 | Ambiguous | Optional follow-up question (same pattern as weather `close`) | + +### 8.3 Guardrails (always before Alpaca POST) + +- `require_question_before_order` and unanswered question must be resolved. +- `max_orders_per_day` / `max_notional_usd_per_day` from config. +- `mode == paper` unless `ALPACA_ALLOW_LIVE=true`. +- Symbol in user watchlist and not in `symbols_blocklist`. +- Idempotent `client_order_id` = `uuid` or `{uid}-{rule_id}-{question_id}`. + +--- + +## 9. Alpaca data gathering (implementation notes) + +### 9.1 Phase 1 — REST polling (recommended start) + +For each symbol in `data_inputs[].symbols`: + +| Metric | Alpaca endpoint (v2) | +|--------|----------------------| +| `last_trade` | `GET /v2/stocks/{symbol}/trades/latest` | +| `daily_bar` | `GET /v2/stocks/bars?symbols=...&timeframe=1Day&limit=1` | +| `prev_close` | Derived from previous daily bar or snapshot | + +Batch symbols where possible (`bars` multi-symbol) to minimize calls. + +### 9.2 Phase 2 — WebSocket ingest service + +- Connect to `wss://stream.data.alpaca.markets/v2/iex`. +- Subscribe to union of all enabled users’ symbols (cap 30 on Basic — enforce in config validator). +- On trade/quote events → upsert `market_data_snapshots` with `as_of = now()`. +- Worker reads DB only; WS process runs independently inside server isolate or secondary entrypoint. + +### 9.3 Normalization + +All ingested values stored as `NUMERIC` in USD where applicable; `raw` JSONB for debugging. Rule engine never parses Alpaca JSON directly — only snapshot rows. + +--- + +## 10. Alpaca trade actuation (implementation notes) + +### 10.1 Order types (initial) + +| Config | Alpaca order | +|--------|--------------| +| `notional_usd` | `POST /v2/orders` with `notional`, `side`, `type: market`, `time_in_force: day` | +| Future: qty | `qty` instead of `notional` | + +### 10.2 Paper vs live + +| Env var | Default | +|---------|---------| +| `ALPACA_TRADING_BASE_URL` | `https://paper-api.alpaca.markets` | +| `ALPACA_ALLOW_LIVE` | `false` | + +Server refuses live URL unless `ALPACA_ALLOW_LIVE=true` and `TRADING_MODE=live` in user config. + +### 10.3 Post-submit + +- Poll order status or use Alpaca trade updates (Phase 3). +- Update `trade_orders.status` and notify user via optional future SignalR event `ReceiveTradeUpdate` (not required for MVP). + +--- + +## 11. Environment variables + +Add to `server/.env.example` (document only in plan until implemented): + +```bash +# Trading feature gate +TRADING_ENABLED=false +TRADING_WORKER_INGEST_ENABLED=true +TRADING_WORKER_EVAL_ENABLED=true + +# Alpaca (server only — never in Flutter) +ALPACA_API_KEY_ID= +ALPACA_API_SECRET_KEY= +ALPACA_TRADING_BASE_URL=https://paper-api.alpaca.markets +ALPACA_DATA_BASE_URL=https://data.alpaca.markets +ALPACA_DATA_FEED=iex +ALPACA_ALLOW_LIVE=false +``` + +Existing worker flags remain: + +- `QUESTION_WORKER_ENABLED` — master switch for interval timer. +- `QUESTION_WORKER_INTERVAL_SECONDS` — drives ingest due-times and evaluation frequency. +- `QUESTION_PIPELINE_TEST_MODE` — when true, skip real Alpaca calls; use fixture snapshots and fake order ids. + +--- + +## 12. Flutter app responsibilities + +| Responsibility | Owner | +|----------------|-------| +| Display questions, collect swipe/numeric answer | Flutter (`SwipeQuestionTile`) | +| Submit answer `POST /v1/me/questions/{id}/answer` | Flutter | +| Real-time question delivery | SignalR `ReceiveQuestion` | +| Alpaca credentials | **Never on device** | +| Market data storage | Postgres via server | +| Trade execution | Server only | + +Optional later: read-only `GET /v1/me/trading/status` for positions and recent `trade_orders` (no secrets). + +--- + +## 13. HTTP API additions (Phase 2+) + +| Method | Path | Purpose | +|--------|------|---------| +| `GET` | `/v1/me/trading/config` | Return merged effective config (sanitized). | +| `PUT` | `/v1/me/trading/config` | Update user JSONB override. | +| `GET` | `/v1/me/trading/orders` | List `trade_orders` for user. | +| `GET` | `/v1/me/trading/snapshots?symbol=SPY` | Debug/latest metrics (auth required). | + +Phase 1 can omit public endpoints and use SQL seeds for test users. + +--- + +## 14. Implementation phases + +### Phase 1 — Foundation (MVP) + +- [ ] Migration `004_trading.sql` +- [ ] `AlpacaMarketDataClient` + `AlpacaTradingClient` (paper, REST only) +- [ ] `MarketDataIngest` + `market_data_snapshots` writes +- [ ] `TradingConfig` resolver + seed template `default_paper_watchlist` +- [ ] `RuleEngine` with one rule type: `price_below_pct_of_ref` +- [ ] `TradingPipeline.evaluate` → create question via `QuestionService` +- [ ] Extend `QuestionPipeline.onAnswerSubmitted` for `PipelineKeys.trading` +- [ ] `TradeActuator` on confirm → `trade_orders` + Alpaca POST +- [ ] Wire into `QuestionBackgroundWorker` tick behind `TRADING_ENABLED` +- [ ] `QUESTION_PIPELINE_TEST_MODE` fixtures for CI/local without Alpaca keys + +### Phase 2 — Configuration API & hardening + +- [ ] `PUT/GET /v1/me/trading/config` +- [ ] Config validation (symbol cap 30, required fields) +- [ ] `market_data_latest` view or cached latest query +- [ ] Daily guardrail counters in `user_trading_state.context` +- [ ] Order status polling + failed order user question + +### Phase 3 — Streaming & observability + +- [ ] Alpaca IEX WebSocket ingest sidecar +- [ ] `ReceiveTradeUpdate` SignalR event (optional) +- [ ] Metrics logging (ingest lag, rule fires, order latency) + +--- + +## 15. Testing strategy + +| Layer | Approach | +|-------|----------| +| Unit | `RuleEngine` with fixture snapshot rows; no network. | +| Integration | Alpaca paper account; verify order appears in Alpaca dashboard. | +| Worker | Run server with `QUESTION_PIPELINE_TEST_MODE=true` and `TRADING_ENABLED=true` using seeded snapshots. | +| Flutter | Existing question flow; manual swipe on trading copy. | + +**Safety checklist before live:** + +- [ ] `ALPACA_ALLOW_LIVE=false` in production until explicit review +- [ ] Guardrail notional limits enforced in code, not only config +- [ ] Every order traceable to `question_id` + `rule_id` + +--- + +## 16. Risks & mitigations + +| Risk | Mitigation | +|------|------------| +| Stale DB snapshots | Store `as_of`; rules ignore data older than `max_staleness_seconds` in config. | +| Alpaca rate limits | Batch REST; cap symbols; WS in Phase 3. | +| Duplicate orders | `client_order_id` UNIQUE; check `trade_orders` before POST. | +| User confusion on auto-trades | Clear `question_text`; default `require_question_before_order: true`. | +| Regulatory / suitability | Paper only initially; disclaimers in app; no investment advice in question copy. | + +--- + +## 17. Example end-to-end scenario + +1. **Seed** `user_trading_config` for user U: enabled, `SPY` in watchlist, `dip_confirm` rule at -1.5% vs `prev_close`. +2. **Worker tick (T0):** ingest writes `prev_close=500`, `last_trade=492` → rule fires. +3. **Worker:** queue question: “SPY is down 1.6% … +10 approve buy $10, -10 skip” with `pipeline_key=trading`, `pipeline_step=dip_confirm:await_confirm`. +4. **Flutter:** user swipes +10 → `POST .../answer`. +5. **Pipeline:** `onAnswerSubmitted` → `TradingPipeline` sees match → `TradeActuator` POST paper market order $10 SPY → row in `trade_orders`. +6. **Next tick:** rule not re-fired (cooldown in `user_trading_state.context`). + +--- + +## 18. References + +- [Alpaca Market Data API](https://docs.alpaca.markets/docs/market-data-api) +- [Alpaca Trading API](https://docs.alpaca.markets/docs/trading-api) +- [Alpaca WebSocket streaming](https://docs.alpaca.markets/docs/streaming-market-data) +- Existing pipeline: `server/lib/pipeline/question_pipeline.dart` +- Existing worker: `server/lib/workers/question_background_worker.dart` + +--- + +*Document version: 1.0 — Alpaca unified data + execution, Postgres-backed configurable inputs, interval worker integration.* diff --git a/lib/config/api_config.dart b/lib/config/api_config.dart index d3da3af..3eb4767 100644 --- a/lib/config/api_config.dart +++ b/lib/config/api_config.dart @@ -6,3 +6,6 @@ const String apiBaseUrl = String.fromEnvironment( 'API_BASE_URL', defaultValue: 'http://localhost:3000', ); + +/// SignalR hub for real-time incoming questions. +String get questionsHubUrl => '$apiBaseUrl/hubs/questions'; diff --git a/lib/guid/guid_glyph_shape.dart b/lib/guid/guid_glyph_shape.dart new file mode 100644 index 0000000..44dc8a5 --- /dev/null +++ b/lib/guid/guid_glyph_shape.dart @@ -0,0 +1,236 @@ +import 'dart:math' as math; +import 'dart:typed_data'; + +import 'package:flutter/material.dart'; + +/// Deterministic irregular polygon + palette derived from a question UUID. +/// +/// Parses the canonical 16-byte UUID (hyphens optional) and maps byte pairs to +/// polar vertices so the same id always produces the same glyph. +class GuidGlyphShape { + GuidGlyphShape._(this.bytes); + + final Uint8List bytes; + + /// Builds a shape from [guid]. Non-UUID strings fall back to a stable hash. + factory GuidGlyphShape.fromGuid(String guid) { + final Uint8List? parsed = tryParseUuidBytes(guid); + if (parsed != null) { + return GuidGlyphShape._(parsed); + } + return GuidGlyphShape._(_hashTo16Bytes(guid)); + } + + /// Vertex count in [5, 10], from the first UUID byte. + int get vertexCount => 5 + (bytes[0] % 6); + + /// Unit-circle offsets (rough polygon); multiply by size when painting. + List unitVertices() { + final int n = vertexCount; + final List points = []; + double angle = (bytes[1] / 255) * math.pi * 2; + + for (int i = 0; i < n; i++) { + final int rIndex = (i * 2) % 16; + final int aIndex = (i * 2 + 1) % 16; + final double radius = 0.42 + (bytes[rIndex] / 255) * 0.52; + angle += (math.pi * 2 / n) + ((bytes[aIndex] / 255) - 0.5) * 0.95; + points.add( + Offset(math.cos(angle) * radius, math.sin(angle) * radius), + ); + } + return points; + } + + /// Fill color: warm accent band so glyphs stay on-brand but distinct. + Color fillColor() { + final double hue = 8 + ((bytes[2] << 8 | bytes[3]) / 65535) * 42; + return HSLColor.fromAHSL(1, hue, 0.72, 0.52).toColor(); + } + + Color strokeColor() => HSLColor.fromColor(fillColor()).withLightness(0.38).toColor(); + + /// Normalized slider position in [-1, 1] for [displayValue] in [-10, 10]. + static double displayT(num displayValue) => + (displayValue / 10).clamp(-1.0, 1.0).toDouble(); + + /// Display-only vertex warp; same [guid] + [displayValue] always matches. + List displayUnitVertices(num displayValue) { + final double t = displayT(displayValue); + final List base = unitVertices(); + final int n = base.length; + + final double spinSign = (bytes[9] & 1) == 0 ? 1.0 : -1.0; + final double spin = t * + math.pi * + (0.28 + (bytes[4] / 255) * 0.22) * + spinSign; + final double cosR = math.cos(spin); + final double sinR = math.sin(spin); + + final double stretchY = 1 + t * (0.38 + (bytes[5] / 255) * 0.2); + final double stretchX = 1 - t * 0.14 * (bytes[6] / 255); + + final List out = []; + for (int i = 0; i < n; i++) { + final Offset p = base[i]; + final double bulge = + 1 + t * ((bytes[(i * 3 + 7) % 16] / 255) - 0.5) * 0.55; + final double x = p.dx * stretchX * bulge; + final double y = p.dy * stretchY * bulge; + out.add(Offset(x * cosR - y * sinR, x * sinR + y * cosR)); + } + return out; + } + + /// Display-only fill; base identity color shifts with slider value. + Color displayFillColor(num displayValue) { + final double t = displayT(displayValue); + final HSLColor hsl = HSLColor.fromColor(fillColor()); + final double hueShift = t * (12 + (bytes[7] % 20)); + final double lightness = (hsl.lightness + t * 0.12).clamp(0.35, 0.68); + final double saturation = + (hsl.saturation + t.abs() * 0.1).clamp(0.5, 0.85); + return hsl + .withHue((hsl.hue + hueShift) % 360) + .withLightness(lightness) + .withSaturation(saturation) + .toColor(); + } + + Color displayStrokeColor(num displayValue) => + HSLColor.fromColor(displayFillColor(displayValue)) + .withLightness(0.38) + .toColor(); + + double displayStrokeWidth(num displayValue) { + final double t = displayT(displayValue); + return (2.5 + t.abs() * 1.2 + (bytes[8] / 255) * 0.4).clamp(2.0, 4.0); + } + + static Uint8List? tryParseUuidBytes(String guid) { + final String hex = guid.replaceAll('-', '').trim().toLowerCase(); + if (hex.length != 32 || !RegExp(r'^[0-9a-f]{32}$').hasMatch(hex)) { + return null; + } + final Uint8List out = Uint8List(16); + for (int i = 0; i < 16; i++) { + out[i] = int.parse(hex.substring(i * 2, i * 2 + 2), radix: 16); + } + return out; + } + + static Uint8List _hashTo16Bytes(String input) { + var h1 = 0x811c9dc5; + var h2 = 0x01000193; + for (final int codeUnit in input.codeUnits) { + h1 = (h1 ^ codeUnit) * 0x01000193; + h2 = (h2 + codeUnit) * 0x85ebca6b; + } + final Uint8List out = Uint8List(16); + for (int i = 0; i < 16; i++) { + final int mix = (h1 >> (i % 16)) ^ (h2 >> ((i + 3) % 16)) ^ (i * 31); + out[i] = mix & 0xff; + } + return out; + } +} + +/// Painted glyph for a question id (replaces the fixed red dot). +class QuestionGuidGlyph extends StatelessWidget { + const QuestionGuidGlyph({ + super.key, + required this.guid, + this.size = 80, + this.displayValue = 0, + }); + + final String guid; + final double size; + + /// Slider value in [-10, 10]; warps the painted glyph for display only. + final num displayValue; + + @override + Widget build(BuildContext context) { + final GuidGlyphShape shape = GuidGlyphShape.fromGuid(guid); + final Color glow = shape.displayFillColor(displayValue); + + return SizedBox( + width: size, + height: size, + child: DecoratedBox( + decoration: BoxDecoration( + shape: BoxShape.circle, + boxShadow: [ + BoxShadow( + color: glow.withValues( + alpha: 0.32 + GuidGlyphShape.displayT(displayValue).abs() * 0.18, + ), + blurRadius: 16 + GuidGlyphShape.displayT(displayValue).abs() * 6, + spreadRadius: 2, + ), + ], + ), + child: CustomPaint( + painter: _GuidGlyphPainter( + shape: shape, + displayValue: displayValue, + ), + ), + ), + ); + } +} + +class _GuidGlyphPainter extends CustomPainter { + _GuidGlyphPainter({ + required this.shape, + required this.displayValue, + }); + + final GuidGlyphShape shape; + final num displayValue; + + @override + void paint(Canvas canvas, Size size) { + final Offset center = Offset(size.width / 2, size.height / 2); + final double scale = size.shortestSide * 0.42; + final List unit = shape.displayUnitVertices(displayValue); + + final Path path = Path(); + for (int i = 0; i < unit.length; i++) { + final Offset p = center + Offset(unit[i].dx * scale, unit[i].dy * scale); + if (i == 0) { + path.moveTo(p.dx, p.dy); + } else { + path.lineTo(p.dx, p.dy); + } + } + path.close(); + + final Color fill = shape.displayFillColor(displayValue); + final Color stroke = shape.displayStrokeColor(displayValue); + final double strokeWidth = shape.displayStrokeWidth(displayValue); + + canvas.drawPath( + path, + Paint() + ..color = fill + ..style = PaintingStyle.fill, + ); + canvas.drawPath( + path, + Paint() + ..color = stroke + ..style = PaintingStyle.stroke + ..strokeWidth = strokeWidth + ..strokeJoin = StrokeJoin.round, + ); + } + + @override + bool shouldRepaint(covariant _GuidGlyphPainter oldDelegate) => + oldDelegate.shape.bytes != shape.bytes || + oldDelegate.displayValue != displayValue; +} diff --git a/lib/main.dart b/lib/main.dart index acb6d14..5fdd7c5 100644 --- a/lib/main.dart +++ b/lib/main.dart @@ -21,6 +21,7 @@ class CyberHybridHubApp extends StatelessWidget { Widget build(BuildContext context) { return MaterialApp( title: 'Cyber Hybrid Hub', + debugShowCheckedModeBanner: false, theme: buildAppTheme(), home: const AuthGate(), ); diff --git a/lib/models/incoming_question.dart b/lib/models/incoming_question.dart new file mode 100644 index 0000000..b1acfcc --- /dev/null +++ b/lib/models/incoming_question.dart @@ -0,0 +1,25 @@ +/// Question pushed from the API over the SignalR questions hub. +class IncomingQuestion { + const IncomingQuestion({ + required this.id, + required this.text, + required this.sentAt, + this.unansweredCount = 1, + }); + + final String id; + final String text; + final DateTime sentAt; + + /// Unanswered questions in the database for this user (from API). + final int unansweredCount; + + factory IncomingQuestion.fromJson(Map json) { + return IncomingQuestion( + id: json['id'] as String, + text: json['text'] as String, + sentAt: DateTime.parse(json['sentAt'] as String).toUtc(), + unansweredCount: (json['unansweredCount'] as num?)?.toInt() ?? 1, + ); + } +} diff --git a/lib/screens/home_screen.dart b/lib/screens/home_screen.dart index 94c1a80..b934d1e 100644 --- a/lib/screens/home_screen.dart +++ b/lib/screens/home_screen.dart @@ -1,11 +1,14 @@ import 'package:flutter/material.dart'; import '../models/app_user.dart'; +import '../models/incoming_question.dart'; import '../models/sync_result.dart'; import '../models/user_profile.dart'; import '../repositories/user_profile_repository.dart'; import '../services/auth_service.dart'; +import '../services/questions_hub_service.dart'; import '../theme/app_theme.dart'; +import '../widgets/swipe_question_tile.dart'; class HomeScreen extends StatelessWidget { const HomeScreen({ @@ -25,16 +28,45 @@ class HomeScreen extends StatelessWidget { final String displayName = profile?.displayName ?? user.displayName ?? 'there'; final String? email = profile?.email ?? user.email; + final Color syncIconColor = switch (syncStatus) { + ProfileSyncStatus.syncing => Colors.white, + ProfileSyncStatus.synced => AppColors.success, + ProfileSyncStatus.error => Colors.redAccent, + ProfileSyncStatus.offline => Colors.orange, + ProfileSyncStatus.idle => AppColors.textSecondary, + }; return Scaffold( appBar: AppBar( backgroundColor: Colors.transparent, - title: const Text('Cyber Hybrid Hub'), + centerTitle: true, + title: ListenableBuilder( + listenable: Listenable.merge([ + QuestionsHubService.instance.hasPendingQuestion, + QuestionsHubService.instance.pendingQuestion, + QuestionsHubService.instance.pendingQuestionCount, + ]), + builder: (BuildContext context, Widget? child) { + final int count = + QuestionsHubService.instance.pendingQuestionCount.value; + final bool hasPending = + QuestionsHubService.instance.hasPendingQuestion.value; + if (!hasPending || count < 1) { + return const SizedBox.shrink(); + } + final int displayCount = count; + return _QuestionEnvelopeButton( + count: displayCount, + onPressed: () => + QuestionsHubService.instance.openQuestionPanel(), + ); + }, + ), actions: [ IconButton( onPressed: () => UserProfileRepository.instance.sync(), tooltip: 'Sync profile', - icon: const Icon(Icons.sync), + icon: Icon(Icons.sync, color: syncIconColor), ), IconButton( onPressed: () => AuthService.instance.signOut(), @@ -52,102 +84,166 @@ class HomeScreen extends StatelessWidget { ), ), child: SafeArea( - child: Padding( - padding: const EdgeInsets.all(24), - child: Column( - crossAxisAlignment: CrossAxisAlignment.stretch, - children: [ - Container( - padding: const EdgeInsets.all(24), - decoration: BoxDecoration( - color: AppColors.surfaceElevated, - borderRadius: BorderRadius.circular(16), - border: Border.all( - color: AppColors.accent.withValues(alpha: 0.2), - ), - ), - child: Column( - children: [ - if (photoUrl != null) - CircleAvatar( - radius: 36, - backgroundImage: NetworkImage(photoUrl), - ) - else - const CircleAvatar( - radius: 36, - child: Icon(Icons.person, size: 36), - ), - const SizedBox(height: 16), - Text( - 'Welcome, $displayName', - style: Theme.of(context).textTheme.headlineMedium, - textAlign: TextAlign.center, - ), - if (email != null) ...[ - const SizedBox(height: 8), - Text( - email, - style: Theme.of(context).textTheme.bodyLarge, - textAlign: TextAlign.center, - ), - ], - const SizedBox(height: 12), - _SyncStatusChip(status: syncStatus), - const SizedBox(height: 20), - const Row( - mainAxisAlignment: MainAxisAlignment.center, - children: [ - Icon( - Icons.check_circle, - color: AppColors.success, - size: 20, - ), - SizedBox(width: 8), - Text( - 'You\'re signed in', - style: TextStyle( - color: AppColors.success, - fontWeight: FontWeight.w600, + child: ListenableBuilder( + listenable: Listenable.merge([ + QuestionsHubService.instance.questionPanelOpen, + QuestionsHubService.instance.hasPendingQuestion, + QuestionsHubService.instance.questionQueue, + QuestionsHubService.instance.pendingQuestion, + QuestionsHubService.instance.questionActionBusy, + QuestionsHubService.instance.pendingQuestionCount, + ]), + builder: (BuildContext context, Widget? child) { + final QuestionsHubService hub = QuestionsHubService.instance; + final bool panelOpen = hub.questionPanelOpen.value; + final bool hasPending = hub.hasPendingQuestion.value; + final IncomingQuestion? question = hub.currentQuestion; + final bool showQuestionPanel = + panelOpen && hasPending && question != null; + + return Column( + crossAxisAlignment: CrossAxisAlignment.stretch, + children: [ + if (showQuestionPanel) + Expanded( + child: Padding( + padding: const EdgeInsets.fromLTRB(8, 4, 8, 8), + child: Column( + crossAxisAlignment: CrossAxisAlignment.stretch, + children: [ + Row( + children: [ + const Spacer(), + IconButton( + onPressed: hub.closeQuestionPanel, + tooltip: 'Close', + icon: const Icon(Icons.close), + ), + ], ), - ), - ], - ), - ], - ), - ), - const SizedBox(height: 24), - Text( - profile?.dirty == true - ? 'Profile saved locally. Will sync when online.' - : 'Profile synced with your account.', - style: Theme.of(context).textTheme.bodyLarge, - textAlign: TextAlign.center, - ), - if (UserProfileRepository.instance.usesLocalStore) ...[ - const SizedBox(height: 16), - OutlinedButton.icon( - onPressed: profile == null - ? null - : () async { - final UserProfile current = profile!; - await UserProfileRepository.instance.updateProfile( - current.copyWith( - onboardingCompleted: - !current.onboardingCompleted, + const SizedBox(height: 4), + Expanded( + child: SwipeQuestionTile( + key: ValueKey(question.id), + questionId: question.id, + busy: hub.questionActionBusy.value, + onSwipeRight: (num answer) => + hub.submitCurrentAnswer(answer: answer), + onSwipeLeft: hub.deferCurrentQuestion, ), - ); - }, - icon: const Icon(Icons.toggle_on_outlined), - label: Text( - profile?.onboardingCompleted == true - ? 'Mark onboarding incomplete' - : 'Mark onboarding complete', + ), + ], + ), + ), + ), + if (!showQuestionPanel) + Expanded( + child: Padding( + padding: const EdgeInsets.all(24), + child: Column( + crossAxisAlignment: CrossAxisAlignment.stretch, + children: [ + Container( + padding: const EdgeInsets.all(24), + decoration: BoxDecoration( + color: AppColors.surfaceElevated, + borderRadius: BorderRadius.circular(16), + border: Border.all( + color: AppColors.accent.withValues(alpha: 0.2), + ), + ), + child: Column( + children: [ + if (photoUrl != null) + CircleAvatar( + radius: 36, + backgroundImage: NetworkImage(photoUrl), + ) + else + const CircleAvatar( + radius: 36, + child: Icon(Icons.person, size: 36), + ), + const SizedBox(height: 16), + Text( + 'Welcome, $displayName', + style: Theme.of(context) + .textTheme + .headlineMedium, + textAlign: TextAlign.center, + ), + if (email != null) ...[ + const SizedBox(height: 8), + Text( + email, + style: + Theme.of(context).textTheme.bodyLarge, + textAlign: TextAlign.center, + ), + ], + const SizedBox(height: 12), + _SyncStatusChip(status: syncStatus), + const SizedBox(height: 20), + const Row( + mainAxisAlignment: MainAxisAlignment.center, + children: [ + Icon( + Icons.check_circle, + color: AppColors.success, + size: 20, + ), + SizedBox(width: 8), + Text( + 'You\'re signed in', + style: TextStyle( + color: AppColors.success, + fontWeight: FontWeight.w600, + ), + ), + ], + ), + ], + ), + ), + const SizedBox(height: 24), + Text( + profile?.dirty == true + ? 'Profile saved locally. Will sync when online.' + : 'Profile synced with your account.', + style: Theme.of(context).textTheme.bodyLarge, + textAlign: TextAlign.center, + ), + if (UserProfileRepository.instance.usesLocalStore) + ...[ + const SizedBox(height: 16), + OutlinedButton.icon( + onPressed: profile == null + ? null + : () async { + final UserProfile current = profile!; + await UserProfileRepository.instance + .updateProfile( + current.copyWith( + onboardingCompleted: !current + .onboardingCompleted, + ), + ); + }, + icon: const Icon(Icons.toggle_on_outlined), + label: Text( + profile?.onboardingCompleted == true + ? 'Mark onboarding incomplete' + : 'Mark onboarding complete', + ), + ), + ], + ], + ), + ), ), - ), ], - ], - ), + ); + }, ), ), ), @@ -155,6 +251,62 @@ class HomeScreen extends StatelessWidget { } } +class _QuestionEnvelopeButton extends StatelessWidget { + const _QuestionEnvelopeButton({ + required this.count, + required this.onPressed, + }); + + final int count; + final VoidCallback onPressed; + + @override + Widget build(BuildContext context) { + return Stack( + clipBehavior: Clip.none, + alignment: Alignment.center, + children: [ + IconButton( + onPressed: onPressed, + tooltip: count > 1 ? '$count questions' : 'New question', + icon: const Icon(Icons.mail_outline, size: 22), + style: IconButton.styleFrom( + backgroundColor: AppColors.accent.withValues(alpha: 0.15), + foregroundColor: AppColors.accent, + padding: const EdgeInsets.all(8), + minimumSize: const Size(36, 36), + tapTargetSize: MaterialTapTargetSize.shrinkWrap, + ), + ), + if (count >= 2) + Positioned( + top: 4, + right: 0, + child: Container( + padding: const EdgeInsets.symmetric(horizontal: 5, vertical: 2), + decoration: BoxDecoration( + color: AppColors.accent, + borderRadius: BorderRadius.circular(10), + border: Border.all(color: AppColors.surface, width: 1.5), + ), + constraints: const BoxConstraints(minWidth: 18, minHeight: 18), + child: Text( + '$count', + textAlign: TextAlign.center, + style: const TextStyle( + color: AppColors.background, + fontSize: 11, + fontWeight: FontWeight.w700, + height: 1.1, + ), + ), + ), + ), + ], + ); + } +} + class _SyncStatusChip extends StatelessWidget { const _SyncStatusChip({required this.status}); diff --git a/lib/services/questions_api_service.dart b/lib/services/questions_api_service.dart new file mode 100644 index 0000000..535cb56 --- /dev/null +++ b/lib/services/questions_api_service.dart @@ -0,0 +1,127 @@ +import 'dart:convert'; + +import 'package:flutter/foundation.dart'; +import 'package:http/http.dart' as http; + +import '../config/api_config.dart'; +import '../models/incoming_question.dart'; +import 'auth_service.dart'; + +/// HTTP client for question queue, answers, and deferrals. +class QuestionsApiService { + QuestionsApiService({http.Client? client}) : _client = client ?? http.Client(); + + final http.Client _client; + + /// Ensures a starter question exists for the signed-in user (login only). + Future bootstrapOnLogin() async { + final String? token = await AuthService.instance.getIdToken(); + if (token == null) { + return null; + } + + final http.Response response = await _client.post( + Uri.parse('$apiBaseUrl/v1/me/questions/bootstrap'), + headers: _authHeaders(token), + ); + if (response.statusCode != 200) { + debugPrint( + 'bootstrapOnLogin failed: ${response.statusCode} ${response.body}', + ); + return null; + } + + final Map body = + jsonDecode(response.body) as Map; + final Map? questionJson = + body['question'] as Map?; + if (questionJson == null) { + return null; + } + return IncomingQuestion.fromJson(questionJson); + } + + Future> fetchUnanswered() async { + final String? token = await AuthService.instance.getIdToken(); + if (token == null) { + return []; + } + + final http.Response response = await _client.get( + Uri.parse('$apiBaseUrl/v1/me/questions'), + headers: _authHeaders(token), + ); + if (response.statusCode != 200) { + debugPrint( + 'fetchUnanswered failed: ${response.statusCode} ${response.body}', + ); + return []; + } + + final Map body = + jsonDecode(response.body) as Map; + final List raw = body['questions'] as List? ?? []; + return raw + .map( + (dynamic item) => + IncomingQuestion.fromJson(item as Map), + ) + .toList(); + } + + Future submitAnswer({ + required String questionId, + num answer = 0, + }) async { + final String? token = await AuthService.instance.getIdToken(); + if (token == null) { + return null; + } + + final http.Response response = await _client.post( + Uri.parse('$apiBaseUrl/v1/me/questions/$questionId/answer'), + headers: _authHeaders(token), + body: jsonEncode({'answer': answer}), + ); + if (response.statusCode != 200) { + debugPrint( + 'submitAnswer failed: ${response.statusCode} ${response.body}', + ); + return null; + } + + final Map body = + jsonDecode(response.body) as Map; + return (body['unansweredCount'] as num?)?.toInt(); + } + + Future deferQuestion({required String questionId}) async { + final String? token = await AuthService.instance.getIdToken(); + if (token == null) { + return null; + } + + final http.Response response = await _client.post( + Uri.parse('$apiBaseUrl/v1/me/questions/$questionId/defer'), + headers: _authHeaders(token), + ); + if (response.statusCode != 200) { + debugPrint( + 'deferQuestion failed: ${response.statusCode} ${response.body}', + ); + return null; + } + + final Map body = + jsonDecode(response.body) as Map; + return (body['unansweredCount'] as num?)?.toInt(); + } + + Map _authHeaders(String token) { + return { + 'Authorization': 'Bearer $token', + 'Content-Type': 'application/json', + 'Accept': 'application/json', + }; + } +} diff --git a/lib/services/questions_hub_service.dart b/lib/services/questions_hub_service.dart new file mode 100644 index 0000000..7d6b9c3 --- /dev/null +++ b/lib/services/questions_hub_service.dart @@ -0,0 +1,256 @@ +import 'dart:async'; + +import 'package:flutter/foundation.dart'; +import 'package:signalr_netcore/signalr_client.dart'; + +import '../config/api_config.dart'; +import '../models/incoming_question.dart'; +import 'auth_service.dart'; +import 'questions_api_service.dart'; + +/// Maintains a SignalR connection to receive incoming questions from the API. +class QuestionsHubService { + QuestionsHubService._(); + + static final QuestionsHubService instance = QuestionsHubService._(); + + final QuestionsApiService _api = QuestionsApiService(); + + final ValueNotifier hasPendingQuestion = ValueNotifier(false); + final ValueNotifier pendingQuestion = + ValueNotifier(null); + final ValueNotifier pendingQuestionCount = ValueNotifier(0); + final ValueNotifier questionPanelOpen = ValueNotifier(false); + final ValueNotifier> questionQueue = + ValueNotifier>([]); + final ValueNotifier questionActionBusy = ValueNotifier(false); + + HubConnection? _connection; + bool _connecting = false; + + IncomingQuestion? get currentQuestion { + final List queue = questionQueue.value; + if (queue.isNotEmpty) { + return queue.first; + } + return pendingQuestion.value; + } + + /// Login hook: create starter question if needed, then open SignalR. + Future onLogin() async { + final IncomingQuestion? bootstrapped = await _api.bootstrapOnLogin(); + if (bootstrapped != null) { + _applyIncoming(bootstrapped); + } + await connect(); + } + + Future connect() async { + if (_connecting) { + return; + } + if (_connection?.state == HubConnectionState.Connected) { + return; + } + + final String? token = await AuthService.instance.getIdToken(); + if (token == null) { + return; + } + + _connecting = true; + try { + await disconnect(); + + final HubConnection connection = HubConnectionBuilder() + .withUrl( + questionsHubUrl, + options: HttpConnectionOptions( + accessTokenFactory: () async => + await AuthService.instance.getIdToken() ?? '', + requestTimeout: 30000, + ), + ) + .withAutomaticReconnect( + retryDelays: [0, 2000, 5000, 10000], + ) + .build(); + + connection.on('ReceiveQuestion', _onReceiveQuestion); + _connection = connection; + await connection.start(); + } catch (e, st) { + debugPrint('Questions hub connect failed: $e\n$st'); + await disconnect(); + } finally { + _connecting = false; + } + } + + void _onReceiveQuestion(List? arguments) { + if (arguments == null || arguments.isEmpty) { + return; + } + final Object? raw = arguments.first; + final Map json; + if (raw is Map) { + json = raw; + } else if (raw is Map) { + json = Map.from(raw); + } else { + debugPrint('ReceiveQuestion: unexpected payload type ${raw.runtimeType}'); + return; + } + final IncomingQuestion question = IncomingQuestion.fromJson(json); + _applyIncoming(question); + } + + void _applyIncoming(IncomingQuestion question) { + final int count = + question.unansweredCount < 1 ? 1 : question.unansweredCount; + pendingQuestion.value = question; + pendingQuestionCount.value = count; + hasPendingQuestion.value = count >= 1; + + if (questionPanelOpen.value) { + final List queue = List.from( + questionQueue.value, + ); + if (!queue.any((IncomingQuestion q) => q.id == question.id)) { + queue.add(question); + questionQueue.value = queue; + } + } + } + + /// Opens the inline question panel and loads the full pending queue. + Future openQuestionPanel() async { + questionActionBusy.value = true; + try { + final List fetched = await _api.fetchUnanswered(); + if (fetched.isNotEmpty) { + questionQueue.value = fetched; + pendingQuestion.value = fetched.first; + pendingQuestionCount.value = fetched.length; + hasPendingQuestion.value = true; + } else if (pendingQuestion.value != null) { + questionQueue.value = [pendingQuestion.value!]; + } else { + questionQueue.value = []; + } + questionPanelOpen.value = hasPendingQuestion.value; + } finally { + questionActionBusy.value = false; + } + } + + void closeQuestionPanel() { + questionPanelOpen.value = false; + } + + /// Swipe left: move current question to end of queue without answering. + Future deferCurrentQuestion() async { + final IncomingQuestion? question = currentQuestion; + if (question == null || questionActionBusy.value) { + return; + } + + questionActionBusy.value = true; + try { + final List queue = List.from( + questionQueue.value, + ); + if (queue.isEmpty) { + return; + } + final IncomingQuestion current = queue.removeAt(0); + queue.add(current); + + final int? serverCount = await _api.deferQuestion(questionId: current.id); + if (serverCount != null) { + final List refreshed = await _api.fetchUnanswered(); + questionQueue.value = refreshed.isNotEmpty ? refreshed : queue; + _syncPendingFromQueue(serverCount); + } else { + questionQueue.value = queue; + _syncPendingFromQueue(queue.length); + } + } finally { + questionActionBusy.value = false; + } + } + + /// Swipe right: submit default answer (0). + Future submitCurrentAnswer({num answer = 0}) async { + final IncomingQuestion? question = currentQuestion; + if (question == null || questionActionBusy.value) { + return; + } + + questionActionBusy.value = true; + try { + final int? serverCount = await _api.submitAnswer( + questionId: question.id, + answer: answer, + ); + if (serverCount == null) { + return; + } + + if (serverCount == 0) { + _clearPendingUi(); + return; + } + + final List refreshed = await _api.fetchUnanswered(); + if (refreshed.isEmpty) { + _clearPendingUi(); + return; + } + + questionQueue.value = refreshed; + pendingQuestion.value = refreshed.first; + pendingQuestionCount.value = serverCount; + hasPendingQuestion.value = true; + } finally { + questionActionBusy.value = false; + } + } + + void _syncPendingFromQueue(int count) { + final List queue = questionQueue.value; + if (queue.isEmpty || count == 0) { + _clearPendingUi(); + return; + } + pendingQuestion.value = queue.first; + pendingQuestionCount.value = count; + hasPendingQuestion.value = true; + } + + /// Clears pending question UI; keeps the SignalR connection alive. + void _clearPendingUi() { + hasPendingQuestion.value = false; + pendingQuestion.value = null; + pendingQuestionCount.value = 0; + questionQueue.value = []; + questionPanelOpen.value = false; + } + + void dismissPending() { + _clearPendingUi(); + } + + Future disconnect() async { + final HubConnection? connection = _connection; + _connection = null; + if (connection != null) { + try { + await connection.stop(); + } catch (_) { + // Ignore shutdown errors. + } + } + _clearPendingUi(); + } +} diff --git a/lib/widgets/profile_session.dart b/lib/widgets/profile_session.dart index b5012ed..9c08195 100644 --- a/lib/widgets/profile_session.dart +++ b/lib/widgets/profile_session.dart @@ -6,6 +6,7 @@ import '../models/app_user.dart'; import '../models/sync_result.dart'; import '../models/user_profile.dart'; import '../repositories/user_profile_repository.dart'; +import '../services/questions_hub_service.dart'; import '../theme/app_theme.dart'; /// Starts a profile sync session for [user] and exposes profile + sync state. @@ -35,7 +36,8 @@ class _ProfileSessionState extends State { @override void initState() { super.initState(); - _sessionReady = UserProfileRepository.instance.startSession(widget.user); + _sessionReady = UserProfileRepository.instance.startSession(widget.user) + ..then((_) => QuestionsHubService.instance.onLogin()); _syncStatusSubscription = UserProfileRepository.instance.syncStatusStream.listen(( ProfileSyncStatus status, @@ -49,6 +51,7 @@ class _ProfileSessionState extends State { @override void dispose() { _syncStatusSubscription?.cancel(); + unawaited(QuestionsHubService.instance.disconnect()); UserProfileRepository.instance.endSession(); super.dispose(); } diff --git a/lib/widgets/swipe_question_tile.dart b/lib/widgets/swipe_question_tile.dart new file mode 100644 index 0000000..dd4668a --- /dev/null +++ b/lib/widgets/swipe_question_tile.dart @@ -0,0 +1,198 @@ +import 'dart:async'; + +import 'package:flutter/material.dart'; + +import '../guid/guid_glyph_shape.dart'; +import '../theme/app_theme.dart'; + +/// Swipeable question card: right submits, left defers (does not resolve). +/// +/// The center band supports vertical drag as a slider from -10 (down) to 10 (up). +class SwipeQuestionTile extends StatefulWidget { + const SwipeQuestionTile({ + super.key, + required this.questionId, + required this.onSwipeRight, + required this.onSwipeLeft, + this.busy = false, + }); + + final String questionId; + final Future Function(num answer) onSwipeRight; + final Future Function() onSwipeLeft; + final bool busy; + + @override + State createState() => _SwipeQuestionTileState(); +} + +class _SwipeQuestionTileState extends State { + double _dragOffset = 0; + double _verticalOffset = 0; + bool _acting = false; + + static const double _swipeThreshold = 96; + static const double _maxVerticalDrag = 120; + static const double _sliderMin = -10; + static const double _sliderMax = 10; + + /// Swipe up → +10, swipe down → -10 (linear between). + double get _sliderValue => + (_verticalOffset / _maxVerticalDrag * _sliderMax).clamp(_sliderMin, _sliderMax); + + Future _releaseDrag() async { + if (_acting || widget.busy) { + setState(() => _dragOffset = 0); + return; + } + + if (_dragOffset > _swipeThreshold) { + setState(() { + _acting = true; + _dragOffset = MediaQuery.sizeOf(context).width; + }); + await widget.onSwipeRight(_sliderValue); + } else if (_dragOffset < -_swipeThreshold) { + setState(() { + _acting = true; + _dragOffset = -MediaQuery.sizeOf(context).width; + }); + await widget.onSwipeLeft(); + } else if (mounted) { + setState(() => _dragOffset = 0); + } + if (mounted) { + setState(() { + _acting = false; + _dragOffset = 0; + }); + } + } + + @override + Widget build(BuildContext context) { + final double width = MediaQuery.sizeOf(context).width; + final double progress = (_dragOffset / _swipeThreshold).clamp(-1.0, 1.0); + + return LayoutBuilder( + builder: (BuildContext context, BoxConstraints constraints) { + return Stack( + alignment: Alignment.center, + children: [ + Positioned.fill( + child: DecoratedBox( + decoration: BoxDecoration( + borderRadius: BorderRadius.circular(16), + gradient: LinearGradient( + begin: Alignment.centerLeft, + end: Alignment.centerRight, + colors: [ + Colors.redAccent.withValues( + alpha: 0.15 + 0.35 * (-progress).clamp(0.0, 1.0), + ), + AppColors.surfaceElevated, + AppColors.success.withValues( + alpha: 0.15 + 0.35 * progress.clamp(0.0, 1.0), + ), + ], + ), + ), + ), + ), + Transform.translate( + offset: Offset(_dragOffset, 0), + child: GestureDetector( + onHorizontalDragUpdate: widget.busy || _acting + ? null + : (DragUpdateDetails details) { + setState(() { + _dragOffset += details.delta.dx; + _dragOffset = _dragOffset.clamp(-width * 0.55, width * 0.55); + }); + }, + onHorizontalDragEnd: widget.busy || _acting + ? null + : (_) => unawaited(_releaseDrag()), + child: Material( + color: AppColors.surfaceElevated, + elevation: 4, + shadowColor: Colors.black45, + borderRadius: BorderRadius.circular(16), + child: Container( + width: constraints.maxWidth, + constraints: const BoxConstraints(minHeight: 220), + padding: const EdgeInsets.symmetric( + horizontal: 16, + vertical: 24, + ), + decoration: BoxDecoration( + borderRadius: BorderRadius.circular(16), + ), + child: Stack( + alignment: Alignment.center, + children: [ + Positioned( + top: 20, + bottom: 20, + left: constraints.maxWidth * 0.22, + right: constraints.maxWidth * 0.22, + child: DecoratedBox( + decoration: BoxDecoration( + borderRadius: BorderRadius.circular(14), + color: Color.lerp( + AppColors.surfaceElevated, + AppColors.surface, + 0.45, + ), + ), + ), + ), + Positioned( + top: 24, + bottom: 24, + left: constraints.maxWidth * 0.22, + right: constraints.maxWidth * 0.22, + child: GestureDetector( + behavior: HitTestBehavior.opaque, + onVerticalDragUpdate: widget.busy || _acting + ? null + : (DragUpdateDetails details) { + setState(() { + _verticalOffset -= details.delta.dy; + _verticalOffset = _verticalOffset.clamp( + -_maxVerticalDrag, + _maxVerticalDrag, + ); + }); + }, + child: Center( + child: Transform.translate( + offset: Offset(0, -_verticalOffset), + child: QuestionGuidGlyph( + guid: widget.questionId, + displayValue: _sliderValue, + ), + ), + ), + ), + ), + ], + ), + ), + ), + ), + ), + if (widget.busy) + const Positioned.fill( + child: ColoredBox( + color: Color(0x66000000), + child: Center(child: CircularProgressIndicator()), + ), + ), + ], + ); + }, + ); + } +} + diff --git a/pubspec.lock b/pubspec.lock index d67b05b..3eb23d2 100644 --- a/pubspec.lock +++ b/pubspec.lock @@ -585,7 +585,7 @@ packages: source: hosted version: "6.1.0" logging: - dependency: transitive + dependency: "direct main" description: name: logging sha256: c8245ada5f1717ed44271ed1c26b8ce85ca3228fd2ffdb75468ab01979309d61 @@ -608,6 +608,14 @@ packages: url: "https://pub.dev" source: hosted version: "0.13.0" + message_pack_dart: + dependency: transitive + description: + name: message_pack_dart + sha256: "71b9f0ff60e5896e60b337960bb535380d7dba3297b457ac763ccae807385b59" + url: "https://pub.dev" + source: hosted + version: "2.0.1" meta: dependency: transitive description: @@ -848,6 +856,14 @@ packages: url: "https://pub.dev" source: hosted version: "3.0.0" + signalr_netcore: + dependency: "direct main" + description: + name: signalr_netcore + sha256: "8d59dc61284c5ff8aa27c4e3e802fcf782367f06cf42b39d5ded81680b72f8b8" + url: "https://pub.dev" + source: hosted + version: "1.4.4" sky_engine: dependency: transitive description: flutter @@ -893,6 +909,22 @@ packages: url: "https://pub.dev" source: hosted version: "0.43.1" + sse: + dependency: transitive + description: + name: sse + sha256: a9a804dbde8bfd369da3b4aa241d44d63a6486a97388c54ec166073d88c52302 + url: "https://pub.dev" + source: hosted + version: "4.2.0" + sse_channel: + dependency: transitive + description: + name: sse_channel + sha256: "9aad5d4eef63faf6ecdefb636c0f857bd6f74146d2196087dcf4b17ab5b49b1b" + url: "https://pub.dev" + source: hosted + version: "0.1.1" stack_trace: dependency: transitive description: @@ -941,6 +973,14 @@ packages: url: "https://pub.dev" source: hosted version: "0.7.11" + tuple: + dependency: transitive + description: + name: tuple + sha256: a97ce2013f240b2f3807bcbaf218765b6f301c3eff91092bcfa23a039e7dd151 + url: "https://pub.dev" + source: hosted + version: "2.0.2" typed_data: dependency: transitive description: @@ -1013,6 +1053,14 @@ packages: url: "https://pub.dev" source: hosted version: "3.1.5" + uuid: + dependency: transitive + description: + name: uuid + sha256: "1fef9e8e11e2991bb773070d4656b7bd5d850967a2456cfc83cf47925ba79489" + url: "https://pub.dev" + source: hosted + version: "4.5.3" vector_math: dependency: transitive description: diff --git a/pubspec.yaml b/pubspec.yaml index 651eaf3..04c8554 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -45,6 +45,8 @@ dependencies: path: ^1.9.1 path_provider: ^2.1.5 connectivity_plus: ^6.1.4 + signalr_netcore: ^1.4.4 + logging: ^1.3.0 dev_dependencies: flutter_test: diff --git a/server/README.md b/server/README.md index 06489ba..27ff75a 100644 --- a/server/README.md +++ b/server/README.md @@ -31,6 +31,80 @@ The API listens on `http://localhost:3000` by default (`PORT` in `.env`). |--------|------|------| | `GET` | `/v1/me/profile` | `Authorization: Bearer ` | | `PUT` | `/v1/me/profile` | same | +| `POST` | `/v1/me/incoming-question` | same — pushes a question to the client via SignalR | +| `POST` | `/v1/me/questions/bootstrap` | ensure starter question at login | +| `GET` | `/v1/me/questions` | list unanswered questions (queue order) | +| `POST` | `/v1/me/questions/{id}/answer` | submit answer (`{"answer": 0}` default) | +| `POST` | `/v1/me/questions/{id}/defer` | move question to end of queue | + +## SignalR — incoming questions + +Hub URL: `http://localhost:3000/hubs/questions` + +The Flutter app calls `POST /v1/me/questions/bootstrap` once at login to ensure a +starter question exists (random correct answer from -10 to 10) when the user has none. +After sign-in it connects to SignalR and listens for `ReceiveQuestion`. On each new +WebSocket connection the API only delivers an existing unanswered question — it does +not create new rows. + +Client payload (correct answer is not sent): + +```json +{ + "id": "uuid", + "assignedUserId": "firebase-uid", + "text": "...", + "sentAt": "...", + "unansweredCount": 2 +} +``` + +`unansweredCount` is the number of unanswered rows for that user (shown in the app when greater than 1). + +`questions` table: `id` (UUID), `assigned_user_id`, `question_text`, `user_response` +(nullable), `correct_answer`, `created_at`, `modified_at`. + +## Background question pipeline + +A background worker runs inside the API process (enabled by default). On each +interval it walks registered users, fetches data from public web APIs, and +enqueues pipeline questions when the user's queue is not full. + +| Env var | Default | Purpose | +|---------|---------|---------| +| `QUESTION_WORKER_ENABLED` | `true` | Set to `false` to disable the worker | +| `QUESTION_WORKER_INTERVAL_SECONDS` | `60` | Seconds between maintenance cycles | +| `QUESTION_PIPELINE_TEST_MODE` | `false` | Use random -10..10 starter-style questions instead of API copy | + +**External APIs used** + +- [REST Countries](https://restcountries.com/) — population and capital facts +- [Open-Meteo](https://open-meteo.com/) — current temperature by city + +**Branching flow** + +1. **Track choice** — user swipes toward +10 (weather) or -10 (geography). +2. **Geography** — yes/no population threshold, then capital confirmation; + wrong population guess triggers a recovery question. +3. **Weather** — yes/no warm/cool for a random European city, then a follow-up + to continue weather or switch to geography. + +When a user submits an answer (`POST .../answer`), the pipeline evaluates the +response and may immediately create the next branched question and push it over +SignalR. + +Pipeline state is stored in `user_pipeline_state`; questions may include +`source_tag`, `pipeline_key`, and `pipeline_step` columns (migration +`003_question_pipeline.sql`). + +Test from the shell (replace `ID_TOKEN`): + +```bash +curl -s -X POST http://localhost:3000/v1/me/incoming-question \ + -H "Authorization: Bearer ID_TOKEN" \ + -H "Content-Type: application/json" \ + -d '{"text":"What is your preferred contact method?"}' +``` ## Flutter client diff --git a/server/bin/server.dart b/server/bin/server.dart index 5c95ca0..cda3ac4 100644 --- a/server/bin/server.dart +++ b/server/bin/server.dart @@ -6,7 +6,14 @@ import 'package:shelf/shelf_io.dart' as shelf_io; import '../lib/db.dart'; import '../lib/env.dart'; import '../lib/firebase_auth.dart'; +import '../lib/handlers/incoming_question_handler.dart'; import '../lib/handlers/profile_handler.dart'; +import '../lib/handlers/questions_handler.dart'; +import '../lib/handlers/questions_hub_handler.dart'; +import '../lib/pipeline/question_pipeline.dart'; +import '../lib/question_service.dart'; +import '../lib/questions_db.dart'; +import '../lib/workers/question_background_worker.dart'; Future main() async { final Directory serverRoot = Directory.current; @@ -24,9 +31,55 @@ Future main() async { await db.migrate(); final FirebaseAuthVerifier auth = FirebaseAuthVerifier(env.firebaseWebApiKey); + final QuestionsDb questionsDb = QuestionsDb(db.connection); + final QuestionService questionService = QuestionService( + questionsDb: questionsDb, + hubConnections: questionsHubConnections, + ); + final QuestionPipeline questionPipeline = QuestionPipeline( + questionsDb: questionsDb, + questionService: questionService, + testMode: env.questionPipelineTestMode, + ); + QuestionBackgroundWorker? backgroundWorker; + if (env.questionWorkerEnabled) { + backgroundWorker = QuestionBackgroundWorker( + pipeline: questionPipeline, + interval: Duration(seconds: env.questionWorkerIntervalSeconds), + ); + backgroundWorker.start(); + } + final Handler profile = profileHandler(db: db, auth: auth); + final Handler questionsHub = questionsHubHandler( + auth: auth, + questionService: questionService, + ); + final Handler incomingQuestion = incomingQuestionHandler( + auth: auth, + questionsDb: questionsDb, + ); + final Handler questions = questionsHandler( + auth: auth, + questionsDb: questionsDb, + questionService: questionService, + questionPipeline: questionPipeline, + ); + final Handler handler = Pipeline() .addMiddleware(logRequests()) - .addHandler(profileHandler(db: db, auth: auth)); + .addHandler((Request request) { + final String path = request.requestedUri.path; + if (path.startsWith(questionsHubPath)) { + return questionsHub(request); + } + if (path == '/v1/me/incoming-question') { + return incomingQuestion(request); + } + if (path.startsWith(questionsBasePath)) { + return questions(request); + } + return profile(request); + }); final HttpServer server = await shelf_io.serve( handler, diff --git a/server/lib/cors_headers.dart b/server/lib/cors_headers.dart new file mode 100644 index 0000000..32d68fa --- /dev/null +++ b/server/lib/cors_headers.dart @@ -0,0 +1,9 @@ +/// CORS headers for API and SignalR (Flutter web sends X-Requested-With). +Map apiCorsHeaders({String methods = 'GET, PUT, POST, OPTIONS'}) { + return { + 'Access-Control-Allow-Origin': '*', + 'Access-Control-Allow-Methods': methods, + 'Access-Control-Allow-Headers': + 'Authorization, Content-Type, X-Requested-With', + }; +} diff --git a/server/lib/db.dart b/server/lib/db.dart index 53f238a..fc8d584 100644 --- a/server/lib/db.dart +++ b/server/lib/db.dart @@ -24,16 +24,27 @@ class ProfileDb { return ProfileDb(connection); } - Future migrate() async { - final String sql = await File('migrations/001_users.sql').readAsString(); - final List statements = sql - .split(';') - .map((String s) => s.trim()) - .where((String s) => s.isNotEmpty) - .toList(); + Connection get connection => _connection; - for (final String statement in statements) { - await _connection.execute(statement); + Future migrate() async { + final List files = Directory('migrations') + .listSync() + .whereType() + .where((File f) => f.path.endsWith('.sql')) + .toList() + ..sort((File a, File b) => a.path.compareTo(b.path)); + + for (final File file in files) { + final String sql = await file.readAsString(); + final List statements = sql + .split(';') + .map((String s) => s.trim()) + .where((String s) => s.isNotEmpty) + .toList(); + + for (final String statement in statements) { + await _connection.execute(statement); + } } } diff --git a/server/lib/env.dart b/server/lib/env.dart index 4a93d4c..d1a59c7 100644 --- a/server/lib/env.dart +++ b/server/lib/env.dart @@ -3,11 +3,21 @@ import 'dart:io'; import 'package:dotenv/dotenv.dart'; class ServerEnv { - ServerEnv._(this.databaseUrl, this.port, this.firebaseWebApiKey); + ServerEnv._({ + required this.databaseUrl, + required this.port, + required this.firebaseWebApiKey, + required this.questionWorkerEnabled, + required this.questionWorkerIntervalSeconds, + required this.questionPipelineTestMode, + }); final String databaseUrl; final int port; final String firebaseWebApiKey; + final bool questionWorkerEnabled; + final int questionWorkerIntervalSeconds; + final bool questionPipelineTestMode; static ServerEnv load() { final DotEnv env = DotEnv(includePlatformEnvironment: true) @@ -26,7 +36,20 @@ class ServerEnv { } final int port = int.tryParse(env['PORT'] ?? '3000') ?? 3000; + final bool workerEnabled = + (env['QUESTION_WORKER_ENABLED'] ?? 'true').toLowerCase() != 'false'; + final int workerIntervalSeconds = + int.tryParse(env['QUESTION_WORKER_INTERVAL_SECONDS'] ?? '60') ?? 60; + final bool pipelineTestMode = + (env['QUESTION_PIPELINE_TEST_MODE'] ?? 'false').toLowerCase() == 'true'; - return ServerEnv._(databaseUrl, port, apiKey); + return ServerEnv._( + databaseUrl: databaseUrl, + port: port, + firebaseWebApiKey: apiKey, + questionWorkerEnabled: workerEnabled, + questionWorkerIntervalSeconds: workerIntervalSeconds, + questionPipelineTestMode: pipelineTestMode, + ); } } diff --git a/server/lib/handlers/incoming_question_handler.dart b/server/lib/handlers/incoming_question_handler.dart new file mode 100644 index 0000000..1ee0834 --- /dev/null +++ b/server/lib/handlers/incoming_question_handler.dart @@ -0,0 +1,82 @@ +import 'dart:convert'; +import 'dart:io'; + +import 'package:shelf/shelf.dart'; +import '../cors_headers.dart'; +import '../firebase_auth.dart'; +import '../questions_db.dart'; +import 'questions_hub_handler.dart'; + +/// REST helper to push a question to the signed-in user over SignalR. +Handler incomingQuestionHandler({ + required FirebaseAuthVerifier auth, + required QuestionsDb questionsDb, +}) { + return (Request request) async { + if (request.method == 'OPTIONS') { + return Response.ok('', headers: apiCorsHeaders()); + } + + final String? firebaseUid = await auth.verifyBearerToken( + request.headers['Authorization'] ?? request.headers['authorization'], + ); + if (firebaseUid == null) { + return _jsonResponse(401, {'error': 'Unauthorized'}); + } + + if (request.requestedUri.path != '/v1/me/incoming-question') { + return _jsonResponse(404, {'error': 'Not found'}); + } + + if (request.method != 'POST') { + return _jsonResponse(405, {'error': 'Method not allowed'}); + } + + try { + final String body = await request.readAsString(); + final Map json = body.isEmpty + ? {} + : jsonDecode(body) as Map; + final String text = (json['text'] as String?)?.trim().isNotEmpty == true + ? (json['text'] as String).trim() + : 'You have a new question.'; + final num correctAnswer = (json['correctAnswer'] as num?) ?? 0; + + final Map question = await questionsDb.createQuestion( + assignedUserId: firebaseUid, + questionText: text, + correctAnswer: correctAnswer, + ); + final int unansweredCount = + await questionsDb.countUnansweredQuestions(firebaseUid); + final Map payload = questionsDb.toClientPayload( + question, + unansweredCount: unansweredCount, + ); + + final int delivered = await questionsHubConnections.pushQuestion( + firebaseUid, + payload, + ); + + return _jsonResponse(200, { + 'question': question, + 'deliveredToConnections': delivered, + }); + } catch (e, st) { + stderr.writeln('Incoming question handler error: $e\n$st'); + return _jsonResponse(500, {'error': 'Internal error'}); + } + }; +} + +Response _jsonResponse(int status, Map body) { + return Response( + status, + body: jsonEncode(body), + headers: { + ...apiCorsHeaders(), + 'Content-Type': 'application/json', + }, + ); +} diff --git a/server/lib/handlers/profile_handler.dart b/server/lib/handlers/profile_handler.dart index 95b915f..bbc6665 100644 --- a/server/lib/handlers/profile_handler.dart +++ b/server/lib/handlers/profile_handler.dart @@ -3,6 +3,7 @@ import 'dart:io'; import 'package:shelf/shelf.dart'; +import '../cors_headers.dart'; import '../db.dart'; import '../firebase_auth.dart'; @@ -12,7 +13,7 @@ Handler profileHandler({ }) { return (Request request) async { if (request.method == 'OPTIONS') { - return Response.ok('', headers: _corsHeaders); + return Response.ok('', headers: apiCorsHeaders(methods: 'GET, PUT, OPTIONS')); } final String? firebaseUid = await auth.verifyBearerToken( @@ -64,18 +65,12 @@ Handler profileHandler({ }; } -Map get _corsHeaders => { - 'Access-Control-Allow-Origin': '*', - 'Access-Control-Allow-Methods': 'GET, PUT, OPTIONS', - 'Access-Control-Allow-Headers': 'Authorization, Content-Type', -}; - Response _jsonResponse(int status, Map body) { return Response( status, body: jsonEncode(body), headers: { - ..._corsHeaders, + ...apiCorsHeaders(methods: 'GET, PUT, OPTIONS'), 'Content-Type': 'application/json', }, ); diff --git a/server/lib/handlers/questions_handler.dart b/server/lib/handlers/questions_handler.dart new file mode 100644 index 0000000..3ec1115 --- /dev/null +++ b/server/lib/handlers/questions_handler.dart @@ -0,0 +1,173 @@ +import 'dart:convert'; +import 'dart:io'; + +import 'package:shelf/shelf.dart'; +import 'package:shelf_router/shelf_router.dart'; + +import '../cors_headers.dart'; +import '../firebase_auth.dart'; +import '../pipeline/question_pipeline.dart'; +import '../question_service.dart'; +import '../questions_db.dart'; + +const String questionsBasePath = '/v1/me/questions'; + +Handler questionsHandler({ + required FirebaseAuthVerifier auth, + required QuestionsDb questionsDb, + required QuestionService questionService, + QuestionPipeline? questionPipeline, +}) { + final Router router = Router(); + + router.post('$questionsBasePath/bootstrap', (Request request) async { + final String? firebaseUid = await _verify(auth, request); + if (firebaseUid == null) { + return _jsonResponse(401, {'error': 'Unauthorized'}); + } + try { + final Map question = + await questionService.ensureStarterQuestionOnLogin(firebaseUid); + final int unansweredCount = + await questionsDb.countUnansweredQuestions(firebaseUid); + return _jsonResponse(200, { + 'question': question, + 'unansweredCount': unansweredCount, + }); + } catch (e, st) { + stderr.writeln('Bootstrap questions error: $e\n$st'); + return _jsonResponse(500, {'error': 'Internal error'}); + } + }); + + router.get(questionsBasePath, (Request request) async { + final String? firebaseUid = await _verify(auth, request); + if (firebaseUid == null) { + return _jsonResponse(401, {'error': 'Unauthorized'}); + } + try { + final List> rows = + await questionsDb.listUnansweredQuestions(firebaseUid); + final List> questions = rows + .map( + (Map row) => questionsDb.toClientPayload( + row, + unansweredCount: rows.length, + ), + ) + .toList(); + return _jsonResponse(200, { + 'questions': questions, + 'unansweredCount': questions.length, + }); + } catch (e, st) { + stderr.writeln('List questions error: $e\n$st'); + return _jsonResponse(500, {'error': 'Internal error'}); + } + }); + + router.post( + '$questionsBasePath//answer', + (Request request, String id) async { + final String? firebaseUid = await _verify(auth, request); + if (firebaseUid == null) { + return _jsonResponse(401, {'error': 'Unauthorized'}); + } + try { + final String body = await request.readAsString(); + final Map json = body.isEmpty + ? {} + : jsonDecode(body) as Map; + final num answer = (json['answer'] as num?) ?? 0; + + final Map? updated = await questionsDb.submitAnswer( + questionId: id, + assignedUserId: firebaseUid, + userResponse: answer, + ); + if (updated == null) { + return _jsonResponse(404, {'error': 'Not found'}); + } + if (questionPipeline != null) { + await questionPipeline.onAnswerSubmitted( + firebaseUid: firebaseUid, + answeredQuestion: updated, + userResponse: answer, + ); + } + final int unansweredCount = + await questionsDb.countUnansweredQuestions(firebaseUid); + return _jsonResponse(200, { + 'question': updated, + 'unansweredCount': unansweredCount, + }); + } catch (e, st) { + stderr.writeln('Answer question error: $e\n$st'); + return _jsonResponse(500, {'error': 'Internal error'}); + } + }, + ); + + router.post( + '$questionsBasePath//defer', + (Request request, String id) async { + final String? firebaseUid = await _verify(auth, request); + if (firebaseUid == null) { + return _jsonResponse(401, {'error': 'Unauthorized'}); + } + try { + final Map? updated = await questionsDb.deferQuestion( + questionId: id, + assignedUserId: firebaseUid, + ); + if (updated == null) { + return _jsonResponse(404, {'error': 'Not found'}); + } + final int unansweredCount = + await questionsDb.countUnansweredQuestions(firebaseUid); + return _jsonResponse(200, { + 'question': updated, + 'unansweredCount': unansweredCount, + }); + } catch (e, st) { + stderr.writeln('Defer question error: $e\n$st'); + return _jsonResponse(500, {'error': 'Internal error'}); + } + }, + ); + + return (Request request) async { + if (request.method == 'OPTIONS' && + request.requestedUri.path.startsWith(questionsBasePath)) { + return Response.ok('', headers: apiCorsHeaders()); + } + + final String? firebaseUid = await _verify(auth, request); + if (firebaseUid == null) { + return _jsonResponse(401, {'error': 'Unauthorized'}); + } + + final Response response = await router.call(request); + if (response.statusCode != 404) { + return response; + } + return _jsonResponse(404, {'error': 'Not found'}); + }; +} + +Future _verify(FirebaseAuthVerifier auth, Request request) { + return auth.verifyBearerToken( + request.headers['Authorization'] ?? request.headers['authorization'], + ); +} + +Response _jsonResponse(int status, Map body) { + return Response( + status, + body: jsonEncode(body), + headers: { + ...apiCorsHeaders(), + 'Content-Type': 'application/json', + }, + ); +} diff --git a/server/lib/handlers/questions_hub_handler.dart b/server/lib/handlers/questions_hub_handler.dart new file mode 100644 index 0000000..93b8d77 --- /dev/null +++ b/server/lib/handlers/questions_hub_handler.dart @@ -0,0 +1,200 @@ +import 'dart:async'; +import 'dart:convert'; + +import 'package:shelf/shelf.dart'; +import 'package:shelf_web_socket/shelf_web_socket.dart'; +import 'package:uuid/uuid.dart'; +import 'package:web_socket_channel/web_socket_channel.dart'; + +import '../cors_headers.dart'; +import '../firebase_auth.dart'; +import '../question_service.dart'; +import '../signalr/questions_hub_connections.dart'; +import '../signalr/signalr_protocol.dart'; +import '../signalr/text_message_format.dart'; + +const String questionsHubPath = '/hubs/questions'; + +final QuestionsHubConnections questionsHubConnections = QuestionsHubConnections(); + +Handler questionsHubHandler({ + required FirebaseAuthVerifier auth, + required QuestionService questionService, +}) { + return (Request request) async { + if (request.method == 'OPTIONS') { + return Response.ok('', headers: apiCorsHeaders()); + } + + final String path = request.requestedUri.path; + if (path.endsWith('/negotiate') && request.method == 'POST') { + return _handleNegotiate(request, auth); + } + + if (_isWebSocketUpgrade(request)) { + final String? firebaseUid = await auth.verifyBearerToken( + _tokenFromRequest(request), + ); + if (firebaseUid == null) { + return Response.forbidden('Unauthorized', headers: apiCorsHeaders()); + } + + final String? connectionToken = request.url.queryParameters['id']; + if (connectionToken == null || connectionToken.isEmpty) { + return Response.badRequest( + body: 'Missing connection id', + headers: apiCorsHeaders(), + ); + } + + final Handler wsHandler = webSocketHandler( + (WebSocketChannel channel, String? subprotocol) { + _handleWebSocket( + channel: channel, + connectionToken: connectionToken, + firebaseUid: firebaseUid, + questionService: questionService, + ); + }, + ); + return wsHandler(request); + } + + return Response.notFound('Not found', headers: apiCorsHeaders()); + }; +} + +Future _handleNegotiate( + Request request, + FirebaseAuthVerifier auth, +) async { + final String? firebaseUid = await auth.verifyBearerToken( + request.headers['Authorization'] ?? request.headers['authorization'], + ); + if (firebaseUid == null) { + return _jsonResponse(401, {'error': 'Unauthorized'}); + } + + const Uuid uuid = Uuid(); + final String connectionToken = uuid.v4(); + + return _jsonResponse(200, { + 'negotiateVersion': 1, + 'connectionId': connectionToken, + 'connectionToken': connectionToken, + 'availableTransports': >[ + { + 'transport': 'WebSockets', + 'transferFormats': ['Text'], + }, + ], + }); +} + +void _handleWebSocket({ + required WebSocketChannel channel, + required String connectionToken, + required String firebaseUid, + required QuestionService questionService, +}) { + final QuestionsHubConnection connection = QuestionsHubConnection( + connectionToken: connectionToken, + firebaseUid: firebaseUid, + channel: channel, + ); + questionsHubConnections.register(connection); + + connection.listen( + (String message) => _onSocketMessage( + connection, + message, + questionService: questionService, + ), + onDone: () => questionsHubConnections.unregister(connectionToken), + ); +} + +void _onSocketMessage( + QuestionsHubConnection connection, + String payload, { + required QuestionService questionService, +}) { + if (!connection.handshakeComplete) { + try { + final List messages = TextMessageFormat.parse(payload); + final Map handshake = + jsonDecode(messages.first) as Map; + if (handshake['protocol'] != 'json') { + unawaited(connection.sendRaw( + SignalrProtocol.handshakeResponse(error: 'Unsupported protocol'), + )); + return; + } + connection.handshakeComplete = true; + unawaited(_completeHandshakeAndDeliverPending( + connection: connection, + questionService: questionService, + )); + if (messages.length > 1) { + final String remaining = messages + .sublist(1) + .join(TextMessageFormat.recordSeparator); + questionsHubConnections.handleClientMessage( + connection, + '$remaining${TextMessageFormat.recordSeparator}', + ); + } + } catch (_) { + unawaited(connection.sendRaw( + SignalrProtocol.handshakeResponse(error: 'Invalid handshake'), + )); + } + return; + } + + questionsHubConnections.handleClientMessage(connection, payload); +} + +Future _completeHandshakeAndDeliverPending({ + required QuestionsHubConnection connection, + required QuestionService questionService, +}) async { + await connection.sendRaw(SignalrProtocol.handshakeResponse()); + await questionService.deliverPendingQuestionOnConnect(connection); +} + +bool _isWebSocketUpgrade(Request request) { + if (request.method != 'GET') { + return false; + } + final String? connection = request.headers['Connection']; + if (connection == null || + !connection.toLowerCase().split(',').map((t) => t.trim()).contains('upgrade')) { + return false; + } + return request.headers['Upgrade']?.toLowerCase() == 'websocket'; +} + +String? _tokenFromRequest(Request request) { + final String? authHeader = + request.headers['Authorization'] ?? request.headers['authorization']; + if (authHeader != null && authHeader.startsWith('Bearer ')) { + return authHeader; + } + final String? accessToken = request.url.queryParameters['access_token']; + if (accessToken != null && accessToken.isNotEmpty) { + return 'Bearer $accessToken'; + } + return null; +} + +Response _jsonResponse(int status, Map body) { + return Response( + status, + body: jsonEncode(body), + headers: { + ...apiCorsHeaders(), + 'Content-Type': 'application/json', + }, + ); +} diff --git a/server/lib/pipeline/branch_decision.dart b/server/lib/pipeline/branch_decision.dart new file mode 100644 index 0000000..b6725ba --- /dev/null +++ b/server/lib/pipeline/branch_decision.dart @@ -0,0 +1,72 @@ +/// Outcome of evaluating a user's numeric answer against pipeline rules. +enum BranchOutcome { + /// Answer matched the expected value (within tolerance when applicable). + match, + + /// Answer was on the positive side of the scale (e.g. preference toward +10). + preferHigh, + + /// Answer was on the negative side of the scale. + preferLow, + + /// Numeric answer was close but not exact. + close, + + /// Answer was far from the target. + miss, +} + +class BranchDecision { + const BranchDecision._(); + + /// Exact match for discrete choices encoded as integers. + static BranchOutcome discreteChoice({ + required num userResponse, + required num correctAnswer, + }) { + if (userResponse.round() == correctAnswer.round()) { + return BranchOutcome.match; + } + return userResponse.sign >= 0 ? BranchOutcome.preferHigh : BranchOutcome.preferLow; + } + + /// Compares a population or temperature estimate with tolerance. + static BranchOutcome numericEstimate({ + required num userResponse, + required num correctAnswer, + int tolerance = 2, + }) { + final int delta = (userResponse.round() - correctAnswer.round()).abs(); + if (delta == 0) { + return BranchOutcome.match; + } + if (delta <= tolerance) { + return BranchOutcome.close; + } + return BranchOutcome.miss; + } + + /// Yes/no questions encoded as +10 (yes) vs -10 (no) on the slider. + static BranchOutcome yesNo({ + required num userResponse, + required num correctAnswer, + }) { + final bool userYes = userResponse > 0; + final bool correctYes = correctAnswer > 0; + if (userYes == correctYes) { + return BranchOutcome.match; + } + return BranchOutcome.miss; + } + + /// Interprets slider position as a preference (geography vs weather track). + static BranchOutcome trackPreference(num userResponse) { + if (userResponse >= 3) { + return BranchOutcome.preferHigh; + } + if (userResponse <= -3) { + return BranchOutcome.preferLow; + } + return BranchOutcome.close; + } +} diff --git a/server/lib/pipeline/external_data_fetcher.dart b/server/lib/pipeline/external_data_fetcher.dart new file mode 100644 index 0000000..6348526 --- /dev/null +++ b/server/lib/pipeline/external_data_fetcher.dart @@ -0,0 +1,111 @@ +import 'dart:convert'; +import 'dart:math'; + +import 'package:http/http.dart' as http; + +/// Fetches public web API data used to build question text and answers. +class ExternalDataFetcher { + ExternalDataFetcher({http.Client? client}) : _client = client ?? http.Client(); + + final http.Client _client; + final Random _random = Random(); + + /// Random country with population (millions) and capital for trivia questions. + Future fetchRandomCountry() async { + final http.Response response = await _client.get( + Uri.parse( + 'https://restcountries.com/v3.1/all?fields=name,population,capital', + ), + ); + if (response.statusCode != 200) { + return null; + } + final List countries = + jsonDecode(response.body) as List; + if (countries.isEmpty) { + return null; + } + + for (int attempt = 0; attempt < 8; attempt++) { + final Map raw = + countries[_random.nextInt(countries.length)] as Map; + final int? population = raw['population'] as int?; + final List? capitals = raw['capital'] as List?; + final Map? nameMap = + raw['name'] as Map?; + final String? commonName = nameMap?['common'] as String?; + if (population == null || + population < 500_000 || + capitals == null || + capitals.isEmpty || + commonName == null) { + continue; + } + return CountryFacts( + name: commonName, + populationMillions: (population / 1_000_000).round(), + capital: capitals.first as String, + ); + } + return null; + } + + /// Current temperature (°C, rounded) for a European city via Open-Meteo. + Future fetchRandomCityWeather() async { + const List<({String city, double lat, double lon})> cities = + <({String city, double lat, double lon})>[ + (city: 'Berlin', lat: 52.52, lon: 13.41), + (city: 'Paris', lat: 48.86, lon: 2.35), + (city: 'Madrid', lat: 40.42, lon: -3.70), + (city: 'Rome', lat: 41.90, lon: 12.50), + (city: 'Amsterdam', lat: 52.37, lon: 4.90), + ]; + final ({String city, double lat, double lon}) pick = + cities[_random.nextInt(cities.length)]; + final Uri uri = Uri.parse( + 'https://api.open-meteo.com/v1/forecast' + '?latitude=${pick.lat}&longitude=${pick.lon}' + '¤t=temperature_2m&timezone=auto', + ); + final http.Response response = await _client.get(uri); + if (response.statusCode != 200) { + return null; + } + final Map json = + jsonDecode(response.body) as Map; + final Map? current = + json['current'] as Map?; + final num? temp = current?['temperature_2m'] as num?; + if (temp == null) { + return null; + } + return WeatherFacts( + city: pick.city, + temperatureCelsius: temp.round(), + ); + } + + void close() => _client.close(); +} + +class CountryFacts { + CountryFacts({ + required this.name, + required this.populationMillions, + required this.capital, + }); + + final String name; + final int populationMillions; + final String capital; +} + +class WeatherFacts { + WeatherFacts({ + required this.city, + required this.temperatureCelsius, + }); + + final String city; + final int temperatureCelsius; +} diff --git a/server/lib/pipeline/question_pipeline.dart b/server/lib/pipeline/question_pipeline.dart new file mode 100644 index 0000000..2203210 --- /dev/null +++ b/server/lib/pipeline/question_pipeline.dart @@ -0,0 +1,445 @@ +import 'dart:async'; +import 'dart:io'; +import 'dart:math'; + +import '../question_service.dart'; +import '../questions_db.dart'; +import 'branch_decision.dart'; +import 'external_data_fetcher.dart'; + +/// Same format as [QuestionsDb.createStarterQuestion] for local pipeline testing. +abstract final class PipelineTestQuestions { + static const String text = + 'Starter question: enter a whole number between -10 and 10.'; + + static int randomCorrectAnswer() => Random().nextInt(21) - 10; +} + +/// Pipeline keys stored in [user_pipeline_state] and question rows. +abstract final class PipelineKeys { + static const String root = 'root'; + static const String geography = 'geography'; + static const String weather = 'weather'; +} + +/// Steps within each pipeline branch. +abstract final class PipelineSteps { + static const String chooseTrack = 'choose_track'; + static const String populationQuiz = 'population_quiz'; + static const String capitalFollowUp = 'capital_follow_up'; + static const String capitalRecovery = 'capital_recovery'; + static const String temperatureQuiz = 'temperature_quiz'; + static const String temperatureFollowUp = 'temperature_follow_up'; + static const String idle = 'idle'; +} + +/// Orchestrates API-driven question creation and branches on user answers. +class QuestionPipeline { + QuestionPipeline({ + required QuestionsDb questionsDb, + required QuestionService questionService, + ExternalDataFetcher? fetcher, + this.maxQueuedQuestions = 3, + this.testMode = false, + }) : _questionsDb = questionsDb, + _questionService = questionService, + _fetcher = fetcher ?? ExternalDataFetcher(); + + final QuestionsDb _questionsDb; + final QuestionService _questionService; + final ExternalDataFetcher _fetcher; + final int maxQueuedQuestions; + final bool testMode; + + /// Periodic tick: start pipelines for idle users and top up shallow queues. + Future runMaintenanceCycle() async { + final List userIds = await _questionsDb.listAllUserFirebaseUids(); + for (final String uid in userIds) { + try { + await _maintainUser(uid); + } catch (e, st) { + stderr.writeln('Pipeline maintenance failed for $uid: $e\n$st'); + } + } + } + + Future _maintainUser(String firebaseUid) async { + final int queued = await _questionsDb.countUnansweredQuestions(firebaseUid); + if (queued >= maxQueuedQuestions) { + return; + } + + final Map? state = + await _questionsDb.getPipelineState(firebaseUid); + if (state == null) { + if (queued == 0) { + await _startRootPipeline(firebaseUid); + } + return; + } + + final String step = state['step'] as String; + if (step == PipelineSteps.idle && queued == 0) { + await _advanceFromIdle(firebaseUid, state); + } + } + + Future _deliverPipelineQuestion({ + required String firebaseUid, + required String questionText, + required num correctAnswer, + required String sourceTag, + required String pipelineKey, + required String pipelineStep, + }) async { + await _questionService.createAndDeliverQuestion( + assignedUserId: firebaseUid, + questionText: + testMode ? PipelineTestQuestions.text : questionText, + correctAnswer: + testMode ? PipelineTestQuestions.randomCorrectAnswer() : correctAnswer, + sourceTag: sourceTag, + pipelineKey: pipelineKey, + pipelineStep: pipelineStep, + ); + } + + BranchOutcome _evaluateAnswer({ + required num userResponse, + required num correctAnswer, + required BranchOutcome Function() production, + }) { + if (testMode) { + return userResponse.round() == correctAnswer.round() + ? BranchOutcome.match + : BranchOutcome.miss; + } + return production(); + } + + Future _canEnqueue(String firebaseUid) async { + final int queued = await _questionsDb.countUnansweredQuestions(firebaseUid); + return queued < maxQueuedQuestions; + } + + /// Called after a question is answered; may enqueue the next branched question. + Future onAnswerSubmitted({ + required String firebaseUid, + required Map answeredQuestion, + required num userResponse, + }) async { + final String? pipelineKey = answeredQuestion['pipelineKey'] as String?; + final String? pipelineStep = answeredQuestion['pipelineStep'] as String?; + if (pipelineKey == null || pipelineStep == null) { + return; + } + if (!await _canEnqueue(firebaseUid)) { + return; + } + + final num correctAnswer = answeredQuestion['correctAnswer'] as num; + final Map context = + await _loadContext(firebaseUid, pipelineKey); + + switch (pipelineKey) { + case PipelineKeys.root: + await _handleRootAnswer( + firebaseUid: firebaseUid, + step: pipelineStep, + userResponse: userResponse, + ); + case PipelineKeys.geography: + await _handleGeographyAnswer( + firebaseUid: firebaseUid, + step: pipelineStep, + userResponse: userResponse, + correctAnswer: correctAnswer, + context: context, + ); + case PipelineKeys.weather: + await _handleWeatherAnswer( + firebaseUid: firebaseUid, + step: pipelineStep, + userResponse: userResponse, + correctAnswer: correctAnswer, + context: context, + ); + } + } + + Future> _loadContext( + String firebaseUid, + String pipelineKey, + ) async { + final Map? state = + await _questionsDb.getPipelineState(firebaseUid); + if (state == null || state['pipelineKey'] != pipelineKey) { + return {}; + } + return Map.from( + state['context'] as Map? ?? {}, + ); + } + + Future _startRootPipeline(String firebaseUid) async { + if (!await _canEnqueue(firebaseUid)) { + return; + } + await _questionsDb.upsertPipelineState( + assignedUserId: firebaseUid, + pipelineKey: PipelineKeys.root, + step: PipelineSteps.chooseTrack, + context: {}, + ); + await _deliverPipelineQuestion( + firebaseUid: firebaseUid, + questionText: + 'Which topics interest you? Swipe toward +10 for weather questions, ' + 'or toward -10 for geography. Submit your preference.', + correctAnswer: 0, + sourceTag: 'pipeline:root:choose_track', + pipelineKey: PipelineKeys.root, + pipelineStep: PipelineSteps.chooseTrack, + ); + } + + Future _handleRootAnswer({ + required String firebaseUid, + required String step, + required num userResponse, + }) async { + if (step != PipelineSteps.chooseTrack) { + return; + } + + final BranchOutcome outcome = BranchDecision.trackPreference(userResponse); + if (outcome == BranchOutcome.preferHigh) { + await _startWeatherPipeline(firebaseUid); + } else { + await _startGeographyPipeline(firebaseUid); + } + } + + Future _startGeographyPipeline(String firebaseUid) async { + if (!await _canEnqueue(firebaseUid)) { + return; + } + final CountryFacts? country = + testMode ? null : await _fetcher.fetchRandomCountry(); + if (!testMode && country == null) { + await _setIdle(firebaseUid, PipelineKeys.geography); + return; + } + + final Map context = testMode + ? { + 'country': 'Testland', + 'populationMillions': 40, + 'capital': 'Test City', + } + : { + 'country': country!.name, + 'populationMillions': country.populationMillions, + 'capital': country.capital, + }; + await _questionsDb.upsertPipelineState( + assignedUserId: firebaseUid, + pipelineKey: PipelineKeys.geography, + step: PipelineSteps.populationQuiz, + context: context, + ); + const int thresholdMillions = 30; + final bool overThreshold = (context['populationMillions'] as int) >= + thresholdMillions; + await _deliverPipelineQuestion( + firebaseUid: firebaseUid, + questionText: + 'Does ${context['country']} have a population over $thresholdMillions million? ' + 'Swipe toward +10 for yes, toward -10 for no.', + correctAnswer: overThreshold ? 10 : -10, + sourceTag: 'pipeline:geography:population', + pipelineKey: PipelineKeys.geography, + pipelineStep: PipelineSteps.populationQuiz, + ); + } + + Future _handleGeographyAnswer({ + required String firebaseUid, + required String step, + required num userResponse, + required num correctAnswer, + required Map context, + }) async { + final String country = context['country'] as String? ?? 'this country'; + final String capital = context['capital'] as String? ?? ''; + + switch (step) { + case PipelineSteps.populationQuiz: + final BranchOutcome outcome = _evaluateAnswer( + userResponse: userResponse, + correctAnswer: correctAnswer, + production: () => BranchDecision.yesNo( + userResponse: userResponse, + correctAnswer: correctAnswer, + ), + ); + if (outcome == BranchOutcome.match) { + await _questionsDb.upsertPipelineState( + assignedUserId: firebaseUid, + pipelineKey: PipelineKeys.geography, + step: PipelineSteps.capitalFollowUp, + context: context, + ); + await _deliverPipelineQuestion( + firebaseUid: firebaseUid, + questionText: + 'Nice estimate! What is the capital of $country? ' + 'Enter 1 if it is $capital, or -1 if you are unsure.', + correctAnswer: 1, + sourceTag: 'pipeline:geography:capital', + pipelineKey: PipelineKeys.geography, + pipelineStep: PipelineSteps.capitalFollowUp, + ); + } else { + await _questionsDb.upsertPipelineState( + assignedUserId: firebaseUid, + pipelineKey: PipelineKeys.geography, + step: PipelineSteps.capitalRecovery, + context: context, + ); + await _deliverPipelineQuestion( + firebaseUid: firebaseUid, + questionText: + 'The population of $country is about ' + '${context['populationMillions']} million. ' + 'Try again: enter 1 if the capital is $capital, -1 otherwise.', + correctAnswer: 1, + sourceTag: 'pipeline:geography:recovery', + pipelineKey: PipelineKeys.geography, + pipelineStep: PipelineSteps.capitalRecovery, + ); + } + case PipelineSteps.capitalFollowUp: + case PipelineSteps.capitalRecovery: + await _setIdle(firebaseUid, PipelineKeys.geography); + } + } + + Future _startWeatherPipeline(String firebaseUid) async { + if (!await _canEnqueue(firebaseUid)) { + return; + } + final WeatherFacts? weather = + testMode ? null : await _fetcher.fetchRandomCityWeather(); + if (!testMode && weather == null) { + await _setIdle(firebaseUid, PipelineKeys.weather); + return; + } + + final Map context = testMode + ? {'city': 'Test City', 'temperatureCelsius': 20} + : { + 'city': weather!.city, + 'temperatureCelsius': weather.temperatureCelsius, + }; + await _questionsDb.upsertPipelineState( + assignedUserId: firebaseUid, + pipelineKey: PipelineKeys.weather, + step: PipelineSteps.temperatureQuiz, + context: context, + ); + const int warmThresholdC = 15; + final bool isWarm = + (context['temperatureCelsius'] as int) >= warmThresholdC; + await _deliverPipelineQuestion( + firebaseUid: firebaseUid, + questionText: + 'Is it $warmThresholdC°C or warmer in ${context['city']} right now? ' + 'Swipe toward +10 for yes, toward -10 for no.', + correctAnswer: isWarm ? 10 : -10, + sourceTag: 'pipeline:weather:temperature', + pipelineKey: PipelineKeys.weather, + pipelineStep: PipelineSteps.temperatureQuiz, + ); + } + + Future _handleWeatherAnswer({ + required String firebaseUid, + required String step, + required num userResponse, + required num correctAnswer, + required Map context, + }) async { + final String city = context['city'] as String? ?? 'the city'; + final int actual = (context['temperatureCelsius'] as num?)?.round() ?? 0; + + switch (step) { + case PipelineSteps.temperatureQuiz: + final BranchOutcome outcome = _evaluateAnswer( + userResponse: userResponse, + correctAnswer: correctAnswer, + production: () => BranchDecision.yesNo( + userResponse: userResponse, + correctAnswer: correctAnswer, + ), + ); + final String followUp = switch (outcome) { + BranchOutcome.match => + 'Spot on! It is about $actual°C in $city. ' + 'Enter +1 to get another weather question, -1 to switch to geography.', + BranchOutcome.close => + 'Close — it is about $actual°C in $city. ' + 'Enter +1 for another weather round, -1 for geography instead.', + _ => + 'It is about $actual°C in $city right now. ' + 'Enter +1 to try another city, -1 to try geography questions.', + }; + await _questionsDb.upsertPipelineState( + assignedUserId: firebaseUid, + pipelineKey: PipelineKeys.weather, + step: PipelineSteps.temperatureFollowUp, + context: context, + ); + await _deliverPipelineQuestion( + firebaseUid: firebaseUid, + questionText: followUp, + correctAnswer: 1, + sourceTag: 'pipeline:weather:follow_up', + pipelineKey: PipelineKeys.weather, + pipelineStep: PipelineSteps.temperatureFollowUp, + ); + case PipelineSteps.temperatureFollowUp: + if (userResponse > 0) { + await _startWeatherPipeline(firebaseUid); + } else { + await _startGeographyPipeline(firebaseUid); + } + } + } + + Future _advanceFromIdle( + String firebaseUid, + Map state, + ) async { + final String pipelineKey = state['pipelineKey'] as String; + switch (pipelineKey) { + case PipelineKeys.geography: + await _startGeographyPipeline(firebaseUid); + case PipelineKeys.weather: + await _startWeatherPipeline(firebaseUid); + default: + await _startRootPipeline(firebaseUid); + } + } + + Future _setIdle(String firebaseUid, String pipelineKey) async { + await _questionsDb.upsertPipelineState( + assignedUserId: firebaseUid, + pipelineKey: pipelineKey, + step: PipelineSteps.idle, + context: await _loadContext(firebaseUid, pipelineKey), + ); + } + + void close() => _fetcher.close(); +} diff --git a/server/lib/question_service.dart b/server/lib/question_service.dart new file mode 100644 index 0000000..28d03b7 --- /dev/null +++ b/server/lib/question_service.dart @@ -0,0 +1,85 @@ +import 'dart:async'; +import 'dart:io'; + +import 'questions_db.dart'; +import 'signalr/questions_hub_connections.dart'; + +/// Creates questions in Postgres and delivers them over SignalR. +class QuestionService { + QuestionService({ + required QuestionsDb questionsDb, + required QuestionsHubConnections hubConnections, + }) : _questionsDb = questionsDb, + _hubConnections = hubConnections; + + final QuestionsDb _questionsDb; + final QuestionsHubConnections _hubConnections; + + /// Called at login: ensures a starter question exists when the user has none. + Future> ensureStarterQuestionOnLogin( + String firebaseUid, + ) async { + final Map question = + await _questionsDb.getOrCreateStarterQuestion(firebaseUid); + final int unansweredCount = + await _questionsDb.countUnansweredQuestions(firebaseUid); + final Map payload = _questionsDb.toClientPayload( + question, + unansweredCount: unansweredCount, + ); + await _hubConnections.pushQuestion(firebaseUid, payload); + return payload; + } + + /// Inserts a question and pushes it to connected SignalR clients. + Future> createAndDeliverQuestion({ + required String assignedUserId, + required String questionText, + required num correctAnswer, + String? sourceTag, + String? pipelineKey, + String? pipelineStep, + }) async { + final Map question = await _questionsDb.createQuestion( + assignedUserId: assignedUserId, + questionText: questionText, + correctAnswer: correctAnswer, + sourceTag: sourceTag, + pipelineKey: pipelineKey, + pipelineStep: pipelineStep, + ); + final int unansweredCount = + await _questionsDb.countUnansweredQuestions(assignedUserId); + final Map payload = _questionsDb.toClientPayload( + question, + unansweredCount: unansweredCount, + ); + await _hubConnections.pushQuestion(assignedUserId, payload); + return payload; + } + + /// On SignalR connect: deliver an existing unanswered question only (no create). + Future deliverPendingQuestionOnConnect( + QuestionsHubConnection connection, + ) async { + try { + final String uid = connection.firebaseUid; + final Map? question = + await _questionsDb.findUnansweredQuestion(uid); + if (question == null) { + return; + } + final int unansweredCount = + await _questionsDb.countUnansweredQuestions(uid); + final Map payload = _questionsDb.toClientPayload( + question, + unansweredCount: unansweredCount, + ); + await _hubConnections.pushQuestionToConnection(connection, payload); + } catch (e, st) { + stderr.writeln( + 'Failed to deliver pending question for ${connection.firebaseUid}: $e\n$st', + ); + } + } +} diff --git a/server/lib/questions_db.dart b/server/lib/questions_db.dart new file mode 100644 index 0000000..5b78d11 --- /dev/null +++ b/server/lib/questions_db.dart @@ -0,0 +1,409 @@ +import 'dart:convert'; +import 'dart:math'; + +import 'package:postgres/postgres.dart'; +import 'package:uuid/uuid.dart'; + +/// Postgres access for the questions table. +class QuestionsDb { + QuestionsDb(this._connection); + + final Connection _connection; + + static const Uuid _uuid = Uuid(); + + Future ensureUserExists(String firebaseUid) async { + await _connection.execute( + Sql.named( + ''' + INSERT INTO users (firebase_uid) + VALUES (@uid) + ON CONFLICT (firebase_uid) DO NOTHING + ''', + ), + parameters: {'uid': firebaseUid}, + ); + } + + /// Latest unanswered question for [assignedUserId], or null if none. + Future?> findUnansweredQuestion( + String assignedUserId, + ) async { + final Result result = await _connection.execute( + Sql.named( + ''' + SELECT id, assigned_user_id, question_text, user_response, correct_answer, + created_at, modified_at, source_tag, pipeline_key, pipeline_step + FROM questions + WHERE assigned_user_id = @uid AND user_response IS NULL + ORDER BY created_at ASC + LIMIT 1 + ''', + ), + parameters: {'uid': assignedUserId}, + ); + if (result.isEmpty) { + return null; + } + return _rowFromResult(result.first); + } + + /// All unanswered questions for [assignedUserId], oldest first. + Future>> listUnansweredQuestions( + String assignedUserId, + ) async { + final Result result = await _connection.execute( + Sql.named( + ''' + SELECT id, assigned_user_id, question_text, user_response, correct_answer, + created_at, modified_at, source_tag, pipeline_key, pipeline_step + FROM questions + WHERE assigned_user_id = @uid AND user_response IS NULL + ORDER BY created_at ASC + ''', + ), + parameters: {'uid': assignedUserId}, + ); + return result.map(_rowFromResult).toList(); + } + + /// Records [userResponse] for an unanswered question owned by [assignedUserId]. + Future?> submitAnswer({ + required String questionId, + required String assignedUserId, + required num userResponse, + }) async { + final DateTime now = DateTime.now().toUtc(); + final Result result = await _connection.execute( + Sql.named( + ''' + UPDATE questions + SET user_response = @user_response, modified_at = @modified_at + WHERE id = @id::uuid + AND assigned_user_id = @uid + AND user_response IS NULL + RETURNING id, assigned_user_id, question_text, user_response, correct_answer, + created_at, modified_at, source_tag, pipeline_key, pipeline_step + ''', + ), + parameters: { + 'id': questionId, + 'uid': assignedUserId, + 'user_response': userResponse, + 'modified_at': now, + }, + ); + if (result.isEmpty) { + return null; + } + return _rowFromResult(result.first); + } + + /// All Firebase UIDs that have a profile row (pipeline targets). + Future> listAllUserFirebaseUids() async { + final Result result = await _connection.execute( + 'SELECT firebase_uid FROM users ORDER BY updated_at DESC', + ); + return result + .map((ResultRow row) => row[0]! as String) + .toList(); + } + + Future?> getPipelineState(String assignedUserId) async { + final Result result = await _connection.execute( + Sql.named( + ''' + SELECT pipeline_key, step, context, updated_at + FROM user_pipeline_state + WHERE assigned_user_id = @uid + ''', + ), + parameters: {'uid': assignedUserId}, + ); + if (result.isEmpty) { + return null; + } + final ResultRow row = result.first; + final Object? contextRaw = row[2]; + final Map context = contextRaw is Map + ? contextRaw + : jsonDecode(contextRaw.toString()) as Map; + return { + 'assignedUserId': assignedUserId, + 'pipelineKey': row[0]! as String, + 'step': row[1]! as String, + 'context': context, + 'updatedAt': (row[3]! as DateTime).toIso8601String(), + }; + } + + Future upsertPipelineState({ + required String assignedUserId, + required String pipelineKey, + required String step, + required Map context, + }) async { + await ensureUserExists(assignedUserId); + final DateTime now = DateTime.now().toUtc(); + await _connection.execute( + Sql.named( + ''' + INSERT INTO user_pipeline_state ( + assigned_user_id, pipeline_key, step, context, updated_at + ) VALUES ( + @uid, @pipeline_key, @step, @context::jsonb, @updated_at + ) + ON CONFLICT (assigned_user_id) DO UPDATE SET + pipeline_key = EXCLUDED.pipeline_key, + step = EXCLUDED.step, + context = EXCLUDED.context, + updated_at = EXCLUDED.updated_at + ''', + ), + parameters: { + 'uid': assignedUserId, + 'pipeline_key': pipelineKey, + 'step': step, + 'context': jsonEncode(context), + 'updated_at': now, + }, + ); + } + + /// Moves an unanswered question to the end of the user's queue. + Future?> deferQuestion({ + required String questionId, + required String assignedUserId, + }) async { + final DateTime now = DateTime.now().toUtc(); + final Result result = await _connection.execute( + Sql.named( + ''' + UPDATE questions q + SET created_at = sub.max_ts + INTERVAL '1 millisecond', + modified_at = @modified_at + FROM ( + SELECT COALESCE(MAX(created_at), @modified_at) AS max_ts + FROM questions + WHERE assigned_user_id = @uid AND user_response IS NULL + ) sub + WHERE q.id = @id::uuid + AND q.assigned_user_id = @uid + AND q.user_response IS NULL + RETURNING q.id, q.assigned_user_id, q.question_text, q.user_response, + q.correct_answer, q.created_at, q.modified_at, + q.source_tag, q.pipeline_key, q.pipeline_step + ''', + ), + parameters: { + 'id': questionId, + 'uid': assignedUserId, + 'modified_at': now, + }, + ); + if (result.isEmpty) { + return null; + } + return _rowFromResult(result.first); + } + + /// Count of unanswered questions assigned to [assignedUserId]. + Future countUnansweredQuestions(String assignedUserId) async { + final Result result = await _connection.execute( + Sql.named( + ''' + SELECT COUNT(*)::int AS count + FROM questions + WHERE assigned_user_id = @uid AND user_response IS NULL + ''', + ), + parameters: {'uid': assignedUserId}, + ); + return (result.first[0]! as num).toInt(); + } + + /// Returns an existing unanswered question or creates the starter question. + Future> getOrCreateStarterQuestion( + String assignedUserId, + ) async { + final Map? existing = + await findUnansweredQuestion(assignedUserId); + if (existing != null) { + return existing; + } + return createStarterQuestion(assignedUserId); + } + + /// Creates the starter test question with a random correct answer in [-10, 10]. + Future> createStarterQuestion(String assignedUserId) async { + await ensureUserExists(assignedUserId); + final int correctAnswer = Random().nextInt(21) - 10; + final String id = _uuid.v4(); + final DateTime now = DateTime.now().toUtc(); + const String questionText = + 'Starter question: enter a whole number between -10 and 10.'; + + await _connection.execute( + Sql.named( + ''' + INSERT INTO questions ( + id, assigned_user_id, question_text, user_response, correct_answer, + created_at, modified_at + ) VALUES ( + @id::uuid, @assigned_user_id, @question_text, NULL, @correct_answer, + @created_at, @modified_at + ) + ''', + ), + parameters: { + 'id': id, + 'assigned_user_id': assignedUserId, + 'question_text': questionText, + 'correct_answer': correctAnswer, + 'created_at': now, + 'modified_at': now, + }, + ); + + return { + 'id': id, + 'assignedUserId': assignedUserId, + 'text': questionText, + 'userResponse': null, + 'correctAnswer': correctAnswer, + 'createdAt': now.toIso8601String(), + 'modifiedAt': now.toIso8601String(), + }; + } + + Future> createQuestion({ + required String assignedUserId, + required String questionText, + required num correctAnswer, + String? sourceTag, + String? pipelineKey, + String? pipelineStep, + }) async { + await ensureUserExists(assignedUserId); + final String id = _uuid.v4(); + final DateTime now = DateTime.now().toUtc(); + + await _connection.execute( + Sql.named( + ''' + INSERT INTO questions ( + id, assigned_user_id, question_text, user_response, correct_answer, + created_at, modified_at, source_tag, pipeline_key, pipeline_step + ) VALUES ( + @id::uuid, @assigned_user_id, @question_text, NULL, @correct_answer, + @created_at, @modified_at, @source_tag, @pipeline_key, @pipeline_step + ) + ''', + ), + parameters: { + 'id': id, + 'assigned_user_id': assignedUserId, + 'question_text': questionText, + 'correct_answer': correctAnswer, + 'created_at': now, + 'modified_at': now, + 'source_tag': sourceTag, + 'pipeline_key': pipelineKey, + 'pipeline_step': pipelineStep, + }, + ); + + return _rowToJson( + id: id, + assignedUserId: assignedUserId, + questionText: questionText, + userResponse: null, + correctAnswer: correctAnswer, + createdAt: now, + modifiedAt: now, + sourceTag: sourceTag, + pipelineKey: pipelineKey, + pipelineStep: pipelineStep, + ); + } + + /// Payload sent to the Flutter client over SignalR (excludes correct answer). + Map toClientPayload( + Map question, { + required int unansweredCount, + }) { + return { + 'id': question['id'], + 'assignedUserId': question['assignedUserId'], + 'text': question['text'], + 'sentAt': question['createdAt'], + 'unansweredCount': unansweredCount, + }; + } + + Map _rowFromResult(ResultRow row) { + final Object idValue = row[0]!; + final String id = idValue is String ? idValue : idValue.toString(); + final DateTime createdAt = row[5]! as DateTime; + final DateTime modifiedAt = row[6]! as DateTime; + return _rowToJson( + id: id, + assignedUserId: row[1]! as String, + questionText: row[2]! as String, + userResponse: _readOptionalNumeric(row[3]), + correctAnswer: _readNumeric(row[4]), + createdAt: createdAt, + modifiedAt: modifiedAt, + sourceTag: row.length > 7 ? row[7] as String? : null, + pipelineKey: row.length > 8 ? row[8] as String? : null, + pipelineStep: row.length > 9 ? row[9] as String? : null, + ); + } + + /// Postgres NUMERIC columns may decode as [String] or [num]. + static num _readNumeric(Object? value) { + if (value == null) { + return 0; + } + if (value is num) { + return value; + } + if (value is String) { + return num.parse(value); + } + return num.parse(value.toString()); + } + + static num? _readOptionalNumeric(Object? value) { + if (value == null) { + return null; + } + return _readNumeric(value); + } + + Map _rowToJson({ + required String id, + required String assignedUserId, + required String questionText, + required Object? userResponse, + required num correctAnswer, + required DateTime createdAt, + required DateTime modifiedAt, + String? sourceTag, + String? pipelineKey, + String? pipelineStep, + }) { + return { + 'id': id, + 'assignedUserId': assignedUserId, + 'text': questionText, + 'userResponse': userResponse, + 'correctAnswer': correctAnswer, + 'createdAt': createdAt.toIso8601String(), + 'modifiedAt': modifiedAt.toIso8601String(), + if (sourceTag != null) 'sourceTag': sourceTag, + if (pipelineKey != null) 'pipelineKey': pipelineKey, + if (pipelineStep != null) 'pipelineStep': pipelineStep, + }; + } +} diff --git a/server/lib/signalr/questions_hub_connections.dart b/server/lib/signalr/questions_hub_connections.dart new file mode 100644 index 0000000..21d3cf7 --- /dev/null +++ b/server/lib/signalr/questions_hub_connections.dart @@ -0,0 +1,123 @@ +import 'dart:async'; +import 'dart:convert'; + +import 'package:web_socket_channel/web_socket_channel.dart'; + +import 'signalr_protocol.dart'; +import 'text_message_format.dart'; + +/// Active SignalR connection for the questions hub. +class QuestionsHubConnection { + QuestionsHubConnection({ + required this.connectionToken, + required this.firebaseUid, + required this.channel, + }); + + final String connectionToken; + final String firebaseUid; + final WebSocketChannel channel; + + bool handshakeComplete = false; + StreamSubscription? _subscription; + + void listen(void Function(String message) onMessage, {void Function()? onDone}) { + _subscription = channel.stream.listen( + (Object? message) { + if (message is String) { + onMessage(message); + } + }, + onDone: onDone, + cancelOnError: true, + ); + } + + Future sendRaw(String payload) async { + channel.sink.add(payload); + } + + Future sendInvocation(String target, List arguments) async { + await sendRaw(SignalrProtocol.invocation( + target: target, + arguments: arguments, + )); + } + + Future close() async { + await _subscription?.cancel(); + await channel.sink.close(); + } +} + +/// Tracks connected clients and delivers hub invocations by Firebase UID. +class QuestionsHubConnections { + final Map _byToken = + {}; + final Map> _tokensByUid = >{}; + + void register(QuestionsHubConnection connection) { + _byToken[connection.connectionToken] = connection; + _tokensByUid + .putIfAbsent(connection.firebaseUid, () => {}) + .add(connection.connectionToken); + } + + Future unregister(String connectionToken) async { + final QuestionsHubConnection? connection = _byToken.remove(connectionToken); + if (connection == null) { + return; + } + final Set? tokens = _tokensByUid[connection.firebaseUid]; + tokens?.remove(connectionToken); + if (tokens != null && tokens.isEmpty) { + _tokensByUid.remove(connection.firebaseUid); + } + await connection.close(); + } + + bool isConnected(String firebaseUid) => + (_tokensByUid[firebaseUid]?.isNotEmpty ?? false); + + Future pushQuestionToConnection( + QuestionsHubConnection connection, + Map question, + ) async { + if (!connection.handshakeComplete) { + return; + } + await connection.sendInvocation('ReceiveQuestion', [question]); + } + + Future pushQuestion( + String firebaseUid, + Map question, + ) async { + final Set tokens = _tokensByUid[firebaseUid] ?? {}; + var delivered = 0; + for (final String token in tokens) { + final QuestionsHubConnection? connection = _byToken[token]; + if (connection == null || !connection.handshakeComplete) { + continue; + } + await connection.sendInvocation('ReceiveQuestion', [question]); + delivered++; + } + return delivered; + } + + /// Parses inbound frames after the handshake. + void handleClientMessage( + QuestionsHubConnection connection, + String payload, + ) { + for (final String message in TextMessageFormat.parse(payload)) { + final Map json = + jsonDecode(message) as Map; + final int? type = json['type'] as int?; + if (type == 6) { + unawaited(connection.sendRaw(SignalrProtocol.ping())); + } + } + } +} diff --git a/server/lib/signalr/signalr_protocol.dart b/server/lib/signalr/signalr_protocol.dart new file mode 100644 index 0000000..404970c --- /dev/null +++ b/server/lib/signalr/signalr_protocol.dart @@ -0,0 +1,40 @@ +import 'dart:convert'; + +import 'text_message_format.dart'; + +/// Writes ASP.NET Core SignalR JSON hub messages. +class SignalrProtocol { + static String handshakeRequest() => + TextMessageFormat.write(jsonEncode({ + 'protocol': 'json', + 'version': 1, + })); + + static String handshakeResponse({String? error}) => + TextMessageFormat.write(jsonEncode({ + if (error != null) 'error': error, + })); + + static String invocation({ + required String target, + required List arguments, + String? invocationId, + }) { + return TextMessageFormat.write(jsonEncode({ + 'type': 1, + 'target': target, + 'arguments': arguments, + if (invocationId != null) 'invocationId': invocationId, + })); + } + + static String ping() => TextMessageFormat.write(jsonEncode({ + 'type': 6, + })); + + static String close({String? error}) => + TextMessageFormat.write(jsonEncode({ + 'type': 7, + if (error != null) 'error': error, + })); +} diff --git a/server/lib/signalr/text_message_format.dart b/server/lib/signalr/text_message_format.dart new file mode 100644 index 0000000..0af714d --- /dev/null +++ b/server/lib/signalr/text_message_format.dart @@ -0,0 +1,16 @@ +/// SignalR text framing (record separator 0x1E). +class TextMessageFormat { + static const int recordSeparatorCode = 0x1e; + static final String recordSeparator = String.fromCharCode(recordSeparatorCode); + + static String write(String output) => '$output$recordSeparator'; + + static List parse(String input) { + if (input.isEmpty || input.codeUnitAt(input.length - 1) != recordSeparatorCode) { + throw StateError('Message is incomplete.'); + } + final List messages = input.split(recordSeparator); + messages.removeLast(); + return messages; + } +} diff --git a/server/lib/workers/question_background_worker.dart b/server/lib/workers/question_background_worker.dart new file mode 100644 index 0000000..49486a5 --- /dev/null +++ b/server/lib/workers/question_background_worker.dart @@ -0,0 +1,49 @@ +import 'dart:async'; +import 'dart:io'; + +import '../pipeline/question_pipeline.dart'; + +/// Runs [QuestionPipeline.runMaintenanceCycle] on a fixed interval. +class QuestionBackgroundWorker { + QuestionBackgroundWorker({ + required QuestionPipeline pipeline, + required Duration interval, + }) : _pipeline = pipeline, + _interval = interval; + + final QuestionPipeline _pipeline; + final Duration _interval; + Timer? _timer; + bool _running = false; + + void start() { + if (_timer != null) { + return; + } + stdout.writeln( + 'Question background worker started (interval ${_interval.inSeconds}s)', + ); + _timer = Timer.periodic(_interval, (_) => _tick()); + unawaited(_tick()); + } + + void stop() { + _timer?.cancel(); + _timer = null; + _pipeline.close(); + } + + Future _tick() async { + if (_running) { + return; + } + _running = true; + try { + await _pipeline.runMaintenanceCycle(); + } catch (e, st) { + stderr.writeln('Question background worker tick failed: $e\n$st'); + } finally { + _running = false; + } + } +} diff --git a/server/migrations/002_questions.sql b/server/migrations/002_questions.sql new file mode 100644 index 0000000..be1c455 --- /dev/null +++ b/server/migrations/002_questions.sql @@ -0,0 +1,13 @@ +CREATE TABLE IF NOT EXISTS questions ( + id UUID PRIMARY KEY, + assigned_user_id TEXT NOT NULL REFERENCES users (firebase_uid) ON DELETE CASCADE, + question_text TEXT NOT NULL, + user_response NUMERIC, + correct_answer NUMERIC NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + modified_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE INDEX IF NOT EXISTS questions_assigned_user_id_idx ON questions (assigned_user_id); + +CREATE INDEX IF NOT EXISTS questions_created_at_idx ON questions (created_at); diff --git a/server/migrations/003_question_pipeline.sql b/server/migrations/003_question_pipeline.sql new file mode 100644 index 0000000..4b74ad7 --- /dev/null +++ b/server/migrations/003_question_pipeline.sql @@ -0,0 +1,15 @@ +ALTER TABLE questions + ADD COLUMN IF NOT EXISTS source_tag TEXT, + ADD COLUMN IF NOT EXISTS pipeline_key TEXT, + ADD COLUMN IF NOT EXISTS pipeline_step TEXT; + +CREATE INDEX IF NOT EXISTS questions_source_tag_idx ON questions (source_tag) + WHERE source_tag IS NOT NULL; + +CREATE TABLE IF NOT EXISTS user_pipeline_state ( + assigned_user_id TEXT PRIMARY KEY REFERENCES users (firebase_uid) ON DELETE CASCADE, + pipeline_key TEXT NOT NULL, + step TEXT NOT NULL, + context JSONB NOT NULL DEFAULT '{}', + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); diff --git a/server/pubspec.lock b/server/pubspec.lock index d03f3ab..e6ffe5d 100644 --- a/server/pubspec.lock +++ b/server/pubspec.lock @@ -57,6 +57,14 @@ packages: url: "https://pub.dev" source: hosted version: "4.2.0" + fixnum: + dependency: transitive + description: + name: fixnum + sha256: b6dc7065e46c974bc7c5f143080a6764ec7a4be6da1285ececdc37be96de53be + url: "https://pub.dev" + source: hosted + version: "1.1.1" http: dependency: "direct main" description: @@ -137,6 +145,14 @@ packages: url: "https://pub.dev" source: hosted version: "1.1.4" + shelf_web_socket: + dependency: "direct main" + description: + name: shelf_web_socket + sha256: "3632775c8e90d6c9712f883e633716432a27758216dfb61bd86a8321c0580925" + url: "https://pub.dev" + source: hosted + version: "3.0.0" source_span: dependency: transitive description: @@ -185,6 +201,14 @@ packages: url: "https://pub.dev" source: hosted version: "1.4.0" + uuid: + dependency: "direct main" + description: + name: uuid + sha256: "1fef9e8e11e2991bb773070d4656b7bd5d850967a2456cfc83cf47925ba79489" + url: "https://pub.dev" + source: hosted + version: "4.5.3" web: dependency: transitive description: @@ -193,5 +217,21 @@ packages: url: "https://pub.dev" source: hosted version: "1.1.1" + web_socket: + dependency: transitive + description: + name: web_socket + sha256: "34d64019aa8e36bf9842ac014bb5d2f5586ca73df5e4d9bf5c936975cae6982c" + url: "https://pub.dev" + source: hosted + version: "1.0.1" + web_socket_channel: + dependency: "direct main" + description: + name: web_socket_channel + sha256: d645757fb0f4773d602444000a8131ff5d48c9e47adfe9772652dd1a4f2d45c8 + url: "https://pub.dev" + source: hosted + version: "3.0.3" sdks: dart: ">=3.12.0 <4.0.0" diff --git a/server/pubspec.yaml b/server/pubspec.yaml index 3566f3f..a76a3ed 100644 --- a/server/pubspec.yaml +++ b/server/pubspec.yaml @@ -9,6 +9,9 @@ dependencies: shelf: ^1.4.2 shelf_router: ^1.1.4 shelf_cors_headers: ^0.1.5 + shelf_web_socket: ^3.0.0 postgres: ^3.5.6 dotenv: ^4.2.0 http: ^1.6.0 + uuid: ^4.5.3 + web_socket_channel: ^3.0.0 diff --git a/startup.sh b/startup.sh new file mode 100755 index 0000000..fc6f16c --- /dev/null +++ b/startup.sh @@ -0,0 +1,98 @@ +#!/usr/bin/env bash +# Start the API (port 3000) and Flutter web dev server (port 8080) with merged logs. +# Usage: ./startup.sh +set -euo pipefail + +ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +API_PORT=3000 +WEB_PORT=8080 + +if [ -f "$ROOT/scripts/flutter-env.sh" ]; then + # shellcheck source=/dev/null + source "$ROOT/scripts/flutter-env.sh" +fi + +log() { printf '%s\n' "$*"; } + +kill_port() { + local port=$1 + if command -v fuser >/dev/null 2>&1; then + if fuser "${port}/tcp" >/dev/null 2>&1; then + log "Stopping process(es) on port ${port}..." + fuser -k "${port}/tcp" >/dev/null 2>&1 || true + sleep 0.5 + fi + return + fi + + if ! command -v lsof >/dev/null 2>&1; then + warn "Cannot free port ${port}: install fuser or lsof" + return + fi + + local pids + pids="$(lsof -t -iTCP:"${port}" -sTCP:LISTEN 2>/dev/null || true)" + if [ -n "$pids" ]; then + log "Stopping process(es) on port ${port}: ${pids}" + # shellcheck disable=SC2086 + kill ${pids} 2>/dev/null || true + sleep 0.5 + # shellcheck disable=SC2086 + kill -9 ${pids} 2>/dev/null || true + fi +} + +warn() { printf 'WARN: %s\n' "$*" >&2; } + +prefix_lines() { + local tag=$1 + while IFS= read -r line || [ -n "${line:-}" ]; do + printf '[%s] %s\n' "$tag" "$line" + done +} + +cleanup() { + trap - INT TERM EXIT + log "Shutting down..." + if [ -n "${API_PID:-}" ]; then kill "$API_PID" 2>/dev/null || true; fi + if [ -n "${WEB_PID:-}" ]; then kill "$WEB_PID" 2>/dev/null || true; fi + kill_port "$API_PORT" + kill_port "$WEB_PORT" + wait 2>/dev/null || true +} + +trap cleanup INT TERM EXIT + +log "Freeing ports ${API_PORT} (API) and ${WEB_PORT} (web)..." +kill_port "$API_PORT" +kill_port "$WEB_PORT" + +if [ ! -f "$ROOT/server/.env" ]; then + warn "Missing server/.env — copy server/.env.example and configure it first." +fi + +log "Starting API on http://localhost:${API_PORT} ..." +( + cd "$ROOT/server" + dart run bin/server.dart 2>&1 | prefix_lines api +) & +API_PID=$! + +log "Starting web app on http://localhost:${WEB_PORT} ..." +( + cd "$ROOT" + flutter run -d web-server \ + --dart-define=API_BASE_URL="http://localhost:${API_PORT}" \ + 2>&1 | prefix_lines web +) & +WEB_PID=$! + +log "Both services running. Press Ctrl+C to stop." +log " API: http://localhost:${API_PORT}" +log " Web: http://localhost:${WEB_PORT}" +log "" + +wait -n 2>/dev/null || wait || true +exit_code=$? +cleanup +exit "$exit_code"