Starting test driven development

This commit is contained in:
Nathan Anderson 2026-05-23 21:12:58 -05:00
parent 651779b1cb
commit 33bde0dc82
35 changed files with 3846 additions and 116 deletions

540
TRADING_DEVELOPMENT_PLAN.md Normal file
View File

@ -0,0 +1,540 @@
# Trading Development Plan — Alpaca Data & Execution
This document defines how Cyber Hybrid Hub will add **Alpaca market data gathering**, **Alpaca trade actuation**, and a **dynamically configurable data-input layer** that feeds the existing **interval worker** (`QuestionBackgroundWorker`), **Postgres** persistence, **user questions** (SignalR), and **automated trade decisions**.
It extends the current architecture documented in `server/README.md` and implemented in `server/lib/pipeline/question_pipeline.dart` — not replace it.
---
## 1. Goals & constraints
| Goal | Detail |
|------|--------|
| **Data** | Ingest US equity (and optional crypto) prices from Alpaca Market Data; store normalized snapshots in Postgres for the worker to read. |
| **Questions** | Worker evaluates stored data + pipeline config → enqueues swipe/numeric questions via `QuestionService` → Flutter receives `ReceiveQuestion` over SignalR. |
| **Trades** | After rules + optional user confirmation via answers, place orders through Alpaca Trading API (paper first). |
| **Configuration** | Watchlists, thresholds, and pipeline steps are **DB-driven** (and env defaults), not hard-coded in Dart. |
| **Security** | Alpaca secret keys live **only** on the Dart API server; Flutter never holds trading credentials. |
| **Cost** | Target Alpaca **Basic** market data (IEX, ~30 WS symbols) + **paper** trading until strategies are validated. |
**Non-goals (initial phases):** sub-second HFT, full SIP market data, client-side WebSockets to Alpaca, or multi-broker routing.
---
## 2. Fit with the current system
```mermaid
flowchart TB
subgraph flutter [Flutter app]
UI[HomeScreen / SwipeQuestionTile]
Hub[QuestionsHubService SignalR]
end
subgraph server [Dart API server]
API[Shelf handlers]
Worker[QuestionBackgroundWorker interval]
MDP[MarketDataPipeline NEW]
TP[TradingPipeline NEW]
QP[QuestionPipeline existing]
QS[QuestionService]
AlpacaMD[Alpaca Market Data client]
AlpacaTR[Alpaca Trading client]
end
subgraph pg [Postgres]
MD[market_data_snapshots]
CFG[trading_config / user_trading_state]
Q[questions]
UPS[user_pipeline_state]
ORD[trade_orders]
end
AlpacaMD -->|REST poll or WS| MDP
MDP --> MD
Worker --> MDP
Worker --> TP
Worker --> QP
TP --> MD
TP --> CFG
TP --> ORD
TP --> AlpacaTR
QP --> QS
QS --> Q
QS --> Hub
API --> QS
Hub --> UI
API --> UI
```
**Existing touchpoints to extend:**
| Component | Path | Role |
|-----------|------|------|
| Interval worker | `server/lib/workers/question_background_worker.dart` | Orchestrates tick; will call market-data ingest + trading evaluation before/alongside `QuestionPipeline.runMaintenanceCycle`. |
| Question pipeline | `server/lib/pipeline/question_pipeline.dart` | Add `PipelineKeys.trading` branch; reuse `BranchDecision` for threshold-style answers. |
| External fetcher pattern | `server/lib/pipeline/external_data_fetcher.dart` | Precedent for HTTP clients; Alpaca clients live in `server/lib/alpaca/`. |
| Env loading | `server/lib/env.dart` | Add Alpaca + trading feature flags. |
| Flutter | `lib/services/questions_hub_service.dart`, `lib/widgets/swipe_question_tile.dart` | Unchanged contract: questions in, numeric answers out. |
---
## 3. Alpaca accounts & APIs
Use **one Alpaca account** with two API surfaces:
| Surface | Base URL (paper) | Purpose |
|---------|------------------|---------|
| **Market Data** | `https://data.alpaca.markets` | Snapshots, bars, trades, quotes; WS `wss://stream.data.alpaca.markets/v2/iex` (Basic). |
| **Trading** | `https://paper-api.alpaca.markets` | Orders, positions, account; switch to live URL only after explicit env flip. |
**Authentication:** Headers `APCA-API-KEY-ID` and `APCA-API-SECRET-KEY` on every request.
**Free-tier limits to design around:**
- Market Data Basic: **IEX real-time**, **~30 WebSocket subscriptions**, historical REST capped (use DB as source of truth for worker ticks).
- Trading: paper account; rate limits documented by Alpaca; idempotent `client_order_id` required.
---
## 4. Dynamically configurable data input design
Configuration is split into **global templates** (reusable) and **per-user overrides** (Firebase UID). The worker always resolves effective config before reading market data or emitting questions.
### 4.1 Configuration layers
```text
trading_config_templates (optional defaults by name)
↓ merge
user_trading_config (per user: watchlist, rules, enabled flag)
↓ resolved at tick
EffectiveTradingConfig (in-memory for one worker cycle)
```
### 4.2 Config document shape (JSONB)
Store in Postgres as `JSONB`; validate on write via API or seed migrations.
```json
{
"version": 1,
"enabled": true,
"mode": "paper",
"data_inputs": [
{
"id": "primary_watchlist",
"source": "alpaca",
"asset_class": "us_equity",
"symbols": ["AAPL", "MSFT", "SPY"],
"feed": "iex",
"poll_interval_seconds": 60,
"metrics": ["last_trade", "daily_bar", "prev_close"]
}
],
"rules": [
{
"id": "dip_confirm",
"type": "price_below_pct_of_ref",
"symbol": "SPY",
"ref_metric": "prev_close",
"threshold_pct": -1.5,
"question_template": "SPY is down {{pct}}% vs yesterday. Swipe +10 to approve a small buy, -10 to skip.",
"on_answer_match": { "action": "propose_order", "side": "buy", "notional_usd": 10 }
}
],
"guardrails": {
"max_orders_per_day": 3,
"max_notional_usd_per_day": 100,
"require_question_before_order": true,
"symbols_blocklist": []
}
}
```
**Design principles:**
- **`data_inputs[]`** — what to fetch and how often; worker matches `poll_interval_seconds` against `QUESTION_WORKER_INTERVAL_SECONDS` or stores last fetch time per input id in `user_trading_state.context`.
- **`rules[]`** — declarative triggers evaluated against **Postgres snapshots**, not live Alpaca on every rule check (decouples question logic from API rate limits).
- **`question_template`** — supports `{{symbol}}`, `{{price}}`, `{{pct}}` substitution when creating `question_text`.
- **User answers** map to actions via existing `pipeline_key` / `pipeline_step` / `source_tag` conventions (e.g. `trading:dip_confirm:await_answer`).
### 4.3 Config administration
| Method | Phase |
|--------|-------|
| SQL seed / migration | Phase 1 — dev templates |
| `PUT /v1/me/trading/config` | Phase 2 — authenticated user override |
| Admin script | Optional bulk updates |
Flutter does **not** edit raw JSON initially; a settings screen can come later. Phase 1 uses server-side defaults keyed by `firebase_uid`.
---
## 5. Postgres schema (new migration `004_trading.sql`)
### 5.1 `market_data_snapshots`
Append-only (or upsert latest) normalized facts per symbol.
```sql
CREATE TABLE market_data_snapshots (
id BIGSERIAL PRIMARY KEY,
symbol TEXT NOT NULL,
asset_class TEXT NOT NULL DEFAULT 'us_equity',
feed TEXT NOT NULL DEFAULT 'iex',
metric TEXT NOT NULL, -- last_trade | daily_bar | prev_close
price NUMERIC,
volume NUMERIC,
as_of TIMESTAMPTZ NOT NULL,
raw JSONB, -- optional Alpaca payload fragment
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX market_data_snapshots_symbol_as_of_idx
ON market_data_snapshots (symbol, as_of DESC);
```
Worker queries: “latest row per `(symbol, metric)`” via `DISTINCT ON` or a materialized view `market_data_latest` (Phase 2 optimization).
### 5.2 `trading_config_templates`
```sql
CREATE TABLE trading_config_templates (
name TEXT PRIMARY KEY,
config JSONB NOT NULL,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
```
### 5.3 `user_trading_config`
```sql
CREATE TABLE user_trading_config (
firebase_uid TEXT PRIMARY KEY REFERENCES users (firebase_uid) ON DELETE CASCADE,
template_name TEXT REFERENCES trading_config_templates (name),
config JSONB NOT NULL DEFAULT '{}',
enabled BOOLEAN NOT NULL DEFAULT false,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
```
### 5.4 `user_trading_state`
Extends the pipeline state pattern in `user_pipeline_state` for trading-specific cursor.
```sql
CREATE TABLE user_trading_state (
firebase_uid TEXT PRIMARY KEY REFERENCES users (firebase_uid) ON DELETE CASCADE,
last_ingest_at TIMESTAMPTZ,
last_eval_at TIMESTAMPTZ,
context JSONB NOT NULL DEFAULT '{}',
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
```
`context` holds: last fired rule ids, pending order proposal ids, daily order counts, last fetch times per `data_inputs[].id`.
### 5.5 `trade_orders`
Audit trail and idempotency.
```sql
CREATE TABLE trade_orders (
id UUID PRIMARY KEY,
firebase_uid TEXT NOT NULL REFERENCES users (firebase_uid) ON DELETE CASCADE,
client_order_id TEXT NOT NULL UNIQUE,
alpaca_order_id TEXT,
symbol TEXT NOT NULL,
side TEXT NOT NULL,
order_type TEXT NOT NULL,
notional_usd NUMERIC,
qty NUMERIC,
status TEXT NOT NULL,
question_id UUID REFERENCES questions (id),
rule_id TEXT,
submitted_at TIMESTAMPTZ,
filled_at TIMESTAMPTZ,
raw JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
```
### 5.6 Questions table linkage
Reuse existing columns:
- `source_tag` — e.g. `trading:rule:dip_confirm`
- `pipeline_key``trading`
- `pipeline_step` — rule id + phase (`propose`, `confirm`, `idle`)
Optional Phase 2: `questions.metadata JSONB` for symbol/price at question time.
---
## 6. Server modules (proposed layout)
```text
server/lib/
alpaca/
alpaca_env.dart # keys, paper vs live, data host
alpaca_market_data_client.dart
alpaca_trading_client.dart
alpaca_models.dart
trading/
trading_config.dart # parse/merge JSONB → EffectiveTradingConfig
trading_config_db.dart
market_data_ingest.dart # data_inputs → snapshots
market_data_db.dart
rule_engine.dart # rules × snapshots → RuleEvaluation
trading_pipeline.dart # questions + order proposals
trade_orders_db.dart
pipeline/
question_pipeline.dart # wire trading branch in onAnswerSubmitted
workers/
question_background_worker.dart # invoke trading tick
```
---
## 7. Worker tick sequence (single interval)
On each `QuestionBackgroundWorker._tick()` when `TRADING_ENABLED=true`:
```text
1. For each user with user_trading_config.enabled:
a. Resolve EffectiveTradingConfig (template + user JSON merge).
b. MarketDataIngest.runIfDue(user, config.data_inputs)
→ Alpaca REST (or WS service) → INSERT market_data_snapshots
c. TradingPipeline.evaluate(user, config, snapshots from DB)
→ if rule fires and guardrails OK:
- maybe create question (respect maxQueuedQuestions)
- or stage pending order in user_trading_state.context
d. QuestionPipeline.runMaintenanceCycle() for same user (existing geography/weather/trading branches)
2. For users with pending order + confirmed answer (see §8):
TradeActuator.submitOrder() → Alpaca Trading → UPDATE trade_orders
```
**Concurrency:** keep `_running` guard; ingest and evaluate per user sequentially first; optimize later.
**Alignment with `QUESTION_WORKER_INTERVAL_SECONDS`:** default 60s matches Alpaca Basic REST polling for watchlists ≤30 symbols. A dedicated **long-lived WS ingest task** (optional Phase 3) writes snapshots on every tick and reduces REST calls.
---
## 8. Question ↔ trade decision flow
### 8.1 Phases per rule
| Phase | `pipeline_step` | Worker behavior |
|-------|-----------------|-----------------|
| `idle` | — | Evaluate rules against DB snapshots only. |
| `await_confirm` | rule id | Question delivered; `correct_answer` encodes expected confirmation (+10 / -10). |
| `submit_order` | rule id | User answered; if match and guardrails pass → POST Alpaca order. |
| `done` | rule id | Log outcome; return to `idle`. |
### 8.2 Branching (reuse `BranchDecision`)
| User swipe | Meaning | Trade action |
|------------|---------|--------------|
| `+10` (yes) | Approve proposed action | `TradeActuator` places configured order |
| `-10` (no) | Reject | No order; log skip in `user_trading_state.context` |
| near 0 | Ambiguous | Optional follow-up question (same pattern as weather `close`) |
### 8.3 Guardrails (always before Alpaca POST)
- `require_question_before_order` and unanswered question must be resolved.
- `max_orders_per_day` / `max_notional_usd_per_day` from config.
- `mode == paper` unless `ALPACA_ALLOW_LIVE=true`.
- Symbol in user watchlist and not in `symbols_blocklist`.
- Idempotent `client_order_id` = `uuid` or `{uid}-{rule_id}-{question_id}`.
---
## 9. Alpaca data gathering (implementation notes)
### 9.1 Phase 1 — REST polling (recommended start)
For each symbol in `data_inputs[].symbols`:
| Metric | Alpaca endpoint (v2) |
|--------|----------------------|
| `last_trade` | `GET /v2/stocks/{symbol}/trades/latest` |
| `daily_bar` | `GET /v2/stocks/bars?symbols=...&timeframe=1Day&limit=1` |
| `prev_close` | Derived from previous daily bar or snapshot |
Batch symbols where possible (`bars` multi-symbol) to minimize calls.
### 9.2 Phase 2 — WebSocket ingest service
- Connect to `wss://stream.data.alpaca.markets/v2/iex`.
- Subscribe to union of all enabled users symbols (cap 30 on Basic — enforce in config validator).
- On trade/quote events → upsert `market_data_snapshots` with `as_of = now()`.
- Worker reads DB only; WS process runs independently inside server isolate or secondary entrypoint.
### 9.3 Normalization
All ingested values stored as `NUMERIC` in USD where applicable; `raw` JSONB for debugging. Rule engine never parses Alpaca JSON directly — only snapshot rows.
---
## 10. Alpaca trade actuation (implementation notes)
### 10.1 Order types (initial)
| Config | Alpaca order |
|--------|--------------|
| `notional_usd` | `POST /v2/orders` with `notional`, `side`, `type: market`, `time_in_force: day` |
| Future: qty | `qty` instead of `notional` |
### 10.2 Paper vs live
| Env var | Default |
|---------|---------|
| `ALPACA_TRADING_BASE_URL` | `https://paper-api.alpaca.markets` |
| `ALPACA_ALLOW_LIVE` | `false` |
Server refuses live URL unless `ALPACA_ALLOW_LIVE=true` and `TRADING_MODE=live` in user config.
### 10.3 Post-submit
- Poll order status or use Alpaca trade updates (Phase 3).
- Update `trade_orders.status` and notify user via optional future SignalR event `ReceiveTradeUpdate` (not required for MVP).
---
## 11. Environment variables
Add to `server/.env.example` (document only in plan until implemented):
```bash
# Trading feature gate
TRADING_ENABLED=false
TRADING_WORKER_INGEST_ENABLED=true
TRADING_WORKER_EVAL_ENABLED=true
# Alpaca (server only — never in Flutter)
ALPACA_API_KEY_ID=
ALPACA_API_SECRET_KEY=
ALPACA_TRADING_BASE_URL=https://paper-api.alpaca.markets
ALPACA_DATA_BASE_URL=https://data.alpaca.markets
ALPACA_DATA_FEED=iex
ALPACA_ALLOW_LIVE=false
```
Existing worker flags remain:
- `QUESTION_WORKER_ENABLED` — master switch for interval timer.
- `QUESTION_WORKER_INTERVAL_SECONDS` — drives ingest due-times and evaluation frequency.
- `QUESTION_PIPELINE_TEST_MODE` — when true, skip real Alpaca calls; use fixture snapshots and fake order ids.
---
## 12. Flutter app responsibilities
| Responsibility | Owner |
|----------------|-------|
| Display questions, collect swipe/numeric answer | Flutter (`SwipeQuestionTile`) |
| Submit answer `POST /v1/me/questions/{id}/answer` | Flutter |
| Real-time question delivery | SignalR `ReceiveQuestion` |
| Alpaca credentials | **Never on device** |
| Market data storage | Postgres via server |
| Trade execution | Server only |
Optional later: read-only `GET /v1/me/trading/status` for positions and recent `trade_orders` (no secrets).
---
## 13. HTTP API additions (Phase 2+)
| Method | Path | Purpose |
|--------|------|---------|
| `GET` | `/v1/me/trading/config` | Return merged effective config (sanitized). |
| `PUT` | `/v1/me/trading/config` | Update user JSONB override. |
| `GET` | `/v1/me/trading/orders` | List `trade_orders` for user. |
| `GET` | `/v1/me/trading/snapshots?symbol=SPY` | Debug/latest metrics (auth required). |
Phase 1 can omit public endpoints and use SQL seeds for test users.
---
## 14. Implementation phases
### Phase 1 — Foundation (MVP)
- [ ] Migration `004_trading.sql`
- [ ] `AlpacaMarketDataClient` + `AlpacaTradingClient` (paper, REST only)
- [ ] `MarketDataIngest` + `market_data_snapshots` writes
- [ ] `TradingConfig` resolver + seed template `default_paper_watchlist`
- [ ] `RuleEngine` with one rule type: `price_below_pct_of_ref`
- [ ] `TradingPipeline.evaluate` → create question via `QuestionService`
- [ ] Extend `QuestionPipeline.onAnswerSubmitted` for `PipelineKeys.trading`
- [ ] `TradeActuator` on confirm → `trade_orders` + Alpaca POST
- [ ] Wire into `QuestionBackgroundWorker` tick behind `TRADING_ENABLED`
- [ ] `QUESTION_PIPELINE_TEST_MODE` fixtures for CI/local without Alpaca keys
### Phase 2 — Configuration API & hardening
- [ ] `PUT/GET /v1/me/trading/config`
- [ ] Config validation (symbol cap 30, required fields)
- [ ] `market_data_latest` view or cached latest query
- [ ] Daily guardrail counters in `user_trading_state.context`
- [ ] Order status polling + failed order user question
### Phase 3 — Streaming & observability
- [ ] Alpaca IEX WebSocket ingest sidecar
- [ ] `ReceiveTradeUpdate` SignalR event (optional)
- [ ] Metrics logging (ingest lag, rule fires, order latency)
---
## 15. Testing strategy
| Layer | Approach |
|-------|----------|
| Unit | `RuleEngine` with fixture snapshot rows; no network. |
| Integration | Alpaca paper account; verify order appears in Alpaca dashboard. |
| Worker | Run server with `QUESTION_PIPELINE_TEST_MODE=true` and `TRADING_ENABLED=true` using seeded snapshots. |
| Flutter | Existing question flow; manual swipe on trading copy. |
**Safety checklist before live:**
- [ ] `ALPACA_ALLOW_LIVE=false` in production until explicit review
- [ ] Guardrail notional limits enforced in code, not only config
- [ ] Every order traceable to `question_id` + `rule_id`
---
## 16. Risks & mitigations
| Risk | Mitigation |
|------|------------|
| Stale DB snapshots | Store `as_of`; rules ignore data older than `max_staleness_seconds` in config. |
| Alpaca rate limits | Batch REST; cap symbols; WS in Phase 3. |
| Duplicate orders | `client_order_id` UNIQUE; check `trade_orders` before POST. |
| User confusion on auto-trades | Clear `question_text`; default `require_question_before_order: true`. |
| Regulatory / suitability | Paper only initially; disclaimers in app; no investment advice in question copy. |
---
## 17. Example end-to-end scenario
1. **Seed** `user_trading_config` for user U: enabled, `SPY` in watchlist, `dip_confirm` rule at -1.5% vs `prev_close`.
2. **Worker tick (T0):** ingest writes `prev_close=500`, `last_trade=492` → rule fires.
3. **Worker:** queue question: “SPY is down 1.6% … +10 approve buy $10, -10 skip” with `pipeline_key=trading`, `pipeline_step=dip_confirm:await_confirm`.
4. **Flutter:** user swipes +10 → `POST .../answer`.
5. **Pipeline:** `onAnswerSubmitted``TradingPipeline` sees match → `TradeActuator` POST paper market order $10 SPY → row in `trade_orders`.
6. **Next tick:** rule not re-fired (cooldown in `user_trading_state.context`).
---
## 18. References
- [Alpaca Market Data API](https://docs.alpaca.markets/docs/market-data-api)
- [Alpaca Trading API](https://docs.alpaca.markets/docs/trading-api)
- [Alpaca WebSocket streaming](https://docs.alpaca.markets/docs/streaming-market-data)
- Existing pipeline: `server/lib/pipeline/question_pipeline.dart`
- Existing worker: `server/lib/workers/question_background_worker.dart`
---
*Document version: 1.0 — Alpaca unified data + execution, Postgres-backed configurable inputs, interval worker integration.*

View File

@ -6,3 +6,6 @@ const String apiBaseUrl = String.fromEnvironment(
'API_BASE_URL', 'API_BASE_URL',
defaultValue: 'http://localhost:3000', defaultValue: 'http://localhost:3000',
); );
/// SignalR hub for real-time incoming questions.
String get questionsHubUrl => '$apiBaseUrl/hubs/questions';

View File

@ -0,0 +1,236 @@
import 'dart:math' as math;
import 'dart:typed_data';
import 'package:flutter/material.dart';
/// Deterministic irregular polygon + palette derived from a question UUID.
///
/// Parses the canonical 16-byte UUID (hyphens optional) and maps byte pairs to
/// polar vertices so the same id always produces the same glyph.
class GuidGlyphShape {
GuidGlyphShape._(this.bytes);
final Uint8List bytes;
/// Builds a shape from [guid]. Non-UUID strings fall back to a stable hash.
factory GuidGlyphShape.fromGuid(String guid) {
final Uint8List? parsed = tryParseUuidBytes(guid);
if (parsed != null) {
return GuidGlyphShape._(parsed);
}
return GuidGlyphShape._(_hashTo16Bytes(guid));
}
/// Vertex count in [5, 10], from the first UUID byte.
int get vertexCount => 5 + (bytes[0] % 6);
/// Unit-circle offsets (rough polygon); multiply by size when painting.
List<Offset> unitVertices() {
final int n = vertexCount;
final List<Offset> points = <Offset>[];
double angle = (bytes[1] / 255) * math.pi * 2;
for (int i = 0; i < n; i++) {
final int rIndex = (i * 2) % 16;
final int aIndex = (i * 2 + 1) % 16;
final double radius = 0.42 + (bytes[rIndex] / 255) * 0.52;
angle += (math.pi * 2 / n) + ((bytes[aIndex] / 255) - 0.5) * 0.95;
points.add(
Offset(math.cos(angle) * radius, math.sin(angle) * radius),
);
}
return points;
}
/// Fill color: warm accent band so glyphs stay on-brand but distinct.
Color fillColor() {
final double hue = 8 + ((bytes[2] << 8 | bytes[3]) / 65535) * 42;
return HSLColor.fromAHSL(1, hue, 0.72, 0.52).toColor();
}
Color strokeColor() => HSLColor.fromColor(fillColor()).withLightness(0.38).toColor();
/// Normalized slider position in [-1, 1] for [displayValue] in [-10, 10].
static double displayT(num displayValue) =>
(displayValue / 10).clamp(-1.0, 1.0).toDouble();
/// Display-only vertex warp; same [guid] + [displayValue] always matches.
List<Offset> displayUnitVertices(num displayValue) {
final double t = displayT(displayValue);
final List<Offset> base = unitVertices();
final int n = base.length;
final double spinSign = (bytes[9] & 1) == 0 ? 1.0 : -1.0;
final double spin = t *
math.pi *
(0.28 + (bytes[4] / 255) * 0.22) *
spinSign;
final double cosR = math.cos(spin);
final double sinR = math.sin(spin);
final double stretchY = 1 + t * (0.38 + (bytes[5] / 255) * 0.2);
final double stretchX = 1 - t * 0.14 * (bytes[6] / 255);
final List<Offset> out = <Offset>[];
for (int i = 0; i < n; i++) {
final Offset p = base[i];
final double bulge =
1 + t * ((bytes[(i * 3 + 7) % 16] / 255) - 0.5) * 0.55;
final double x = p.dx * stretchX * bulge;
final double y = p.dy * stretchY * bulge;
out.add(Offset(x * cosR - y * sinR, x * sinR + y * cosR));
}
return out;
}
/// Display-only fill; base identity color shifts with slider value.
Color displayFillColor(num displayValue) {
final double t = displayT(displayValue);
final HSLColor hsl = HSLColor.fromColor(fillColor());
final double hueShift = t * (12 + (bytes[7] % 20));
final double lightness = (hsl.lightness + t * 0.12).clamp(0.35, 0.68);
final double saturation =
(hsl.saturation + t.abs() * 0.1).clamp(0.5, 0.85);
return hsl
.withHue((hsl.hue + hueShift) % 360)
.withLightness(lightness)
.withSaturation(saturation)
.toColor();
}
Color displayStrokeColor(num displayValue) =>
HSLColor.fromColor(displayFillColor(displayValue))
.withLightness(0.38)
.toColor();
double displayStrokeWidth(num displayValue) {
final double t = displayT(displayValue);
return (2.5 + t.abs() * 1.2 + (bytes[8] / 255) * 0.4).clamp(2.0, 4.0);
}
static Uint8List? tryParseUuidBytes(String guid) {
final String hex = guid.replaceAll('-', '').trim().toLowerCase();
if (hex.length != 32 || !RegExp(r'^[0-9a-f]{32}$').hasMatch(hex)) {
return null;
}
final Uint8List out = Uint8List(16);
for (int i = 0; i < 16; i++) {
out[i] = int.parse(hex.substring(i * 2, i * 2 + 2), radix: 16);
}
return out;
}
static Uint8List _hashTo16Bytes(String input) {
var h1 = 0x811c9dc5;
var h2 = 0x01000193;
for (final int codeUnit in input.codeUnits) {
h1 = (h1 ^ codeUnit) * 0x01000193;
h2 = (h2 + codeUnit) * 0x85ebca6b;
}
final Uint8List out = Uint8List(16);
for (int i = 0; i < 16; i++) {
final int mix = (h1 >> (i % 16)) ^ (h2 >> ((i + 3) % 16)) ^ (i * 31);
out[i] = mix & 0xff;
}
return out;
}
}
/// Painted glyph for a question id (replaces the fixed red dot).
class QuestionGuidGlyph extends StatelessWidget {
const QuestionGuidGlyph({
super.key,
required this.guid,
this.size = 80,
this.displayValue = 0,
});
final String guid;
final double size;
/// Slider value in [-10, 10]; warps the painted glyph for display only.
final num displayValue;
@override
Widget build(BuildContext context) {
final GuidGlyphShape shape = GuidGlyphShape.fromGuid(guid);
final Color glow = shape.displayFillColor(displayValue);
return SizedBox(
width: size,
height: size,
child: DecoratedBox(
decoration: BoxDecoration(
shape: BoxShape.circle,
boxShadow: <BoxShadow>[
BoxShadow(
color: glow.withValues(
alpha: 0.32 + GuidGlyphShape.displayT(displayValue).abs() * 0.18,
),
blurRadius: 16 + GuidGlyphShape.displayT(displayValue).abs() * 6,
spreadRadius: 2,
),
],
),
child: CustomPaint(
painter: _GuidGlyphPainter(
shape: shape,
displayValue: displayValue,
),
),
),
);
}
}
class _GuidGlyphPainter extends CustomPainter {
_GuidGlyphPainter({
required this.shape,
required this.displayValue,
});
final GuidGlyphShape shape;
final num displayValue;
@override
void paint(Canvas canvas, Size size) {
final Offset center = Offset(size.width / 2, size.height / 2);
final double scale = size.shortestSide * 0.42;
final List<Offset> unit = shape.displayUnitVertices(displayValue);
final Path path = Path();
for (int i = 0; i < unit.length; i++) {
final Offset p = center + Offset(unit[i].dx * scale, unit[i].dy * scale);
if (i == 0) {
path.moveTo(p.dx, p.dy);
} else {
path.lineTo(p.dx, p.dy);
}
}
path.close();
final Color fill = shape.displayFillColor(displayValue);
final Color stroke = shape.displayStrokeColor(displayValue);
final double strokeWidth = shape.displayStrokeWidth(displayValue);
canvas.drawPath(
path,
Paint()
..color = fill
..style = PaintingStyle.fill,
);
canvas.drawPath(
path,
Paint()
..color = stroke
..style = PaintingStyle.stroke
..strokeWidth = strokeWidth
..strokeJoin = StrokeJoin.round,
);
}
@override
bool shouldRepaint(covariant _GuidGlyphPainter oldDelegate) =>
oldDelegate.shape.bytes != shape.bytes ||
oldDelegate.displayValue != displayValue;
}

View File

@ -21,6 +21,7 @@ class CyberHybridHubApp extends StatelessWidget {
Widget build(BuildContext context) { Widget build(BuildContext context) {
return MaterialApp( return MaterialApp(
title: 'Cyber Hybrid Hub', title: 'Cyber Hybrid Hub',
debugShowCheckedModeBanner: false,
theme: buildAppTheme(), theme: buildAppTheme(),
home: const AuthGate(), home: const AuthGate(),
); );

View File

@ -0,0 +1,25 @@
/// Question pushed from the API over the SignalR questions hub.
class IncomingQuestion {
const IncomingQuestion({
required this.id,
required this.text,
required this.sentAt,
this.unansweredCount = 1,
});
final String id;
final String text;
final DateTime sentAt;
/// Unanswered questions in the database for this user (from API).
final int unansweredCount;
factory IncomingQuestion.fromJson(Map<String, dynamic> json) {
return IncomingQuestion(
id: json['id'] as String,
text: json['text'] as String,
sentAt: DateTime.parse(json['sentAt'] as String).toUtc(),
unansweredCount: (json['unansweredCount'] as num?)?.toInt() ?? 1,
);
}
}

View File

@ -1,11 +1,14 @@
import 'package:flutter/material.dart'; import 'package:flutter/material.dart';
import '../models/app_user.dart'; import '../models/app_user.dart';
import '../models/incoming_question.dart';
import '../models/sync_result.dart'; import '../models/sync_result.dart';
import '../models/user_profile.dart'; import '../models/user_profile.dart';
import '../repositories/user_profile_repository.dart'; import '../repositories/user_profile_repository.dart';
import '../services/auth_service.dart'; import '../services/auth_service.dart';
import '../services/questions_hub_service.dart';
import '../theme/app_theme.dart'; import '../theme/app_theme.dart';
import '../widgets/swipe_question_tile.dart';
class HomeScreen extends StatelessWidget { class HomeScreen extends StatelessWidget {
const HomeScreen({ const HomeScreen({
@ -25,16 +28,45 @@ class HomeScreen extends StatelessWidget {
final String displayName = final String displayName =
profile?.displayName ?? user.displayName ?? 'there'; profile?.displayName ?? user.displayName ?? 'there';
final String? email = profile?.email ?? user.email; final String? email = profile?.email ?? user.email;
final Color syncIconColor = switch (syncStatus) {
ProfileSyncStatus.syncing => Colors.white,
ProfileSyncStatus.synced => AppColors.success,
ProfileSyncStatus.error => Colors.redAccent,
ProfileSyncStatus.offline => Colors.orange,
ProfileSyncStatus.idle => AppColors.textSecondary,
};
return Scaffold( return Scaffold(
appBar: AppBar( appBar: AppBar(
backgroundColor: Colors.transparent, backgroundColor: Colors.transparent,
title: const Text('Cyber Hybrid Hub'), centerTitle: true,
title: ListenableBuilder(
listenable: Listenable.merge(<Listenable>[
QuestionsHubService.instance.hasPendingQuestion,
QuestionsHubService.instance.pendingQuestion,
QuestionsHubService.instance.pendingQuestionCount,
]),
builder: (BuildContext context, Widget? child) {
final int count =
QuestionsHubService.instance.pendingQuestionCount.value;
final bool hasPending =
QuestionsHubService.instance.hasPendingQuestion.value;
if (!hasPending || count < 1) {
return const SizedBox.shrink();
}
final int displayCount = count;
return _QuestionEnvelopeButton(
count: displayCount,
onPressed: () =>
QuestionsHubService.instance.openQuestionPanel(),
);
},
),
actions: <Widget>[ actions: <Widget>[
IconButton( IconButton(
onPressed: () => UserProfileRepository.instance.sync(), onPressed: () => UserProfileRepository.instance.sync(),
tooltip: 'Sync profile', tooltip: 'Sync profile',
icon: const Icon(Icons.sync), icon: Icon(Icons.sync, color: syncIconColor),
), ),
IconButton( IconButton(
onPressed: () => AuthService.instance.signOut(), onPressed: () => AuthService.instance.signOut(),
@ -52,102 +84,166 @@ class HomeScreen extends StatelessWidget {
), ),
), ),
child: SafeArea( child: SafeArea(
child: Padding( child: ListenableBuilder(
padding: const EdgeInsets.all(24), listenable: Listenable.merge(<Listenable>[
child: Column( QuestionsHubService.instance.questionPanelOpen,
crossAxisAlignment: CrossAxisAlignment.stretch, QuestionsHubService.instance.hasPendingQuestion,
children: <Widget>[ QuestionsHubService.instance.questionQueue,
Container( QuestionsHubService.instance.pendingQuestion,
padding: const EdgeInsets.all(24), QuestionsHubService.instance.questionActionBusy,
decoration: BoxDecoration( QuestionsHubService.instance.pendingQuestionCount,
color: AppColors.surfaceElevated, ]),
borderRadius: BorderRadius.circular(16), builder: (BuildContext context, Widget? child) {
border: Border.all( final QuestionsHubService hub = QuestionsHubService.instance;
color: AppColors.accent.withValues(alpha: 0.2), final bool panelOpen = hub.questionPanelOpen.value;
), final bool hasPending = hub.hasPendingQuestion.value;
), final IncomingQuestion? question = hub.currentQuestion;
child: Column( final bool showQuestionPanel =
children: <Widget>[ panelOpen && hasPending && question != null;
if (photoUrl != null)
CircleAvatar( return Column(
radius: 36, crossAxisAlignment: CrossAxisAlignment.stretch,
backgroundImage: NetworkImage(photoUrl), children: <Widget>[
) if (showQuestionPanel)
else Expanded(
const CircleAvatar( child: Padding(
radius: 36, padding: const EdgeInsets.fromLTRB(8, 4, 8, 8),
child: Icon(Icons.person, size: 36), child: Column(
), crossAxisAlignment: CrossAxisAlignment.stretch,
const SizedBox(height: 16), children: <Widget>[
Text( Row(
'Welcome, $displayName', children: <Widget>[
style: Theme.of(context).textTheme.headlineMedium, const Spacer(),
textAlign: TextAlign.center, IconButton(
), onPressed: hub.closeQuestionPanel,
if (email != null) ...<Widget>[ tooltip: 'Close',
const SizedBox(height: 8), icon: const Icon(Icons.close),
Text( ),
email, ],
style: Theme.of(context).textTheme.bodyLarge,
textAlign: TextAlign.center,
),
],
const SizedBox(height: 12),
_SyncStatusChip(status: syncStatus),
const SizedBox(height: 20),
const Row(
mainAxisAlignment: MainAxisAlignment.center,
children: <Widget>[
Icon(
Icons.check_circle,
color: AppColors.success,
size: 20,
),
SizedBox(width: 8),
Text(
'You\'re signed in',
style: TextStyle(
color: AppColors.success,
fontWeight: FontWeight.w600,
), ),
), const SizedBox(height: 4),
], Expanded(
), child: SwipeQuestionTile(
], key: ValueKey<String>(question.id),
), questionId: question.id,
), busy: hub.questionActionBusy.value,
const SizedBox(height: 24), onSwipeRight: (num answer) =>
Text( hub.submitCurrentAnswer(answer: answer),
profile?.dirty == true onSwipeLeft: hub.deferCurrentQuestion,
? 'Profile saved locally. Will sync when online.'
: 'Profile synced with your account.',
style: Theme.of(context).textTheme.bodyLarge,
textAlign: TextAlign.center,
),
if (UserProfileRepository.instance.usesLocalStore) ...<Widget>[
const SizedBox(height: 16),
OutlinedButton.icon(
onPressed: profile == null
? null
: () async {
final UserProfile current = profile!;
await UserProfileRepository.instance.updateProfile(
current.copyWith(
onboardingCompleted:
!current.onboardingCompleted,
), ),
); ),
}, ],
icon: const Icon(Icons.toggle_on_outlined), ),
label: Text( ),
profile?.onboardingCompleted == true ),
? 'Mark onboarding incomplete' if (!showQuestionPanel)
: 'Mark onboarding complete', Expanded(
child: Padding(
padding: const EdgeInsets.all(24),
child: Column(
crossAxisAlignment: CrossAxisAlignment.stretch,
children: <Widget>[
Container(
padding: const EdgeInsets.all(24),
decoration: BoxDecoration(
color: AppColors.surfaceElevated,
borderRadius: BorderRadius.circular(16),
border: Border.all(
color: AppColors.accent.withValues(alpha: 0.2),
),
),
child: Column(
children: <Widget>[
if (photoUrl != null)
CircleAvatar(
radius: 36,
backgroundImage: NetworkImage(photoUrl),
)
else
const CircleAvatar(
radius: 36,
child: Icon(Icons.person, size: 36),
),
const SizedBox(height: 16),
Text(
'Welcome, $displayName',
style: Theme.of(context)
.textTheme
.headlineMedium,
textAlign: TextAlign.center,
),
if (email != null) ...<Widget>[
const SizedBox(height: 8),
Text(
email,
style:
Theme.of(context).textTheme.bodyLarge,
textAlign: TextAlign.center,
),
],
const SizedBox(height: 12),
_SyncStatusChip(status: syncStatus),
const SizedBox(height: 20),
const Row(
mainAxisAlignment: MainAxisAlignment.center,
children: <Widget>[
Icon(
Icons.check_circle,
color: AppColors.success,
size: 20,
),
SizedBox(width: 8),
Text(
'You\'re signed in',
style: TextStyle(
color: AppColors.success,
fontWeight: FontWeight.w600,
),
),
],
),
],
),
),
const SizedBox(height: 24),
Text(
profile?.dirty == true
? 'Profile saved locally. Will sync when online.'
: 'Profile synced with your account.',
style: Theme.of(context).textTheme.bodyLarge,
textAlign: TextAlign.center,
),
if (UserProfileRepository.instance.usesLocalStore)
...<Widget>[
const SizedBox(height: 16),
OutlinedButton.icon(
onPressed: profile == null
? null
: () async {
final UserProfile current = profile!;
await UserProfileRepository.instance
.updateProfile(
current.copyWith(
onboardingCompleted: !current
.onboardingCompleted,
),
);
},
icon: const Icon(Icons.toggle_on_outlined),
label: Text(
profile?.onboardingCompleted == true
? 'Mark onboarding incomplete'
: 'Mark onboarding complete',
),
),
],
],
),
),
), ),
),
], ],
], );
), },
), ),
), ),
), ),
@ -155,6 +251,62 @@ class HomeScreen extends StatelessWidget {
} }
} }
class _QuestionEnvelopeButton extends StatelessWidget {
const _QuestionEnvelopeButton({
required this.count,
required this.onPressed,
});
final int count;
final VoidCallback onPressed;
@override
Widget build(BuildContext context) {
return Stack(
clipBehavior: Clip.none,
alignment: Alignment.center,
children: <Widget>[
IconButton(
onPressed: onPressed,
tooltip: count > 1 ? '$count questions' : 'New question',
icon: const Icon(Icons.mail_outline, size: 22),
style: IconButton.styleFrom(
backgroundColor: AppColors.accent.withValues(alpha: 0.15),
foregroundColor: AppColors.accent,
padding: const EdgeInsets.all(8),
minimumSize: const Size(36, 36),
tapTargetSize: MaterialTapTargetSize.shrinkWrap,
),
),
if (count >= 2)
Positioned(
top: 4,
right: 0,
child: Container(
padding: const EdgeInsets.symmetric(horizontal: 5, vertical: 2),
decoration: BoxDecoration(
color: AppColors.accent,
borderRadius: BorderRadius.circular(10),
border: Border.all(color: AppColors.surface, width: 1.5),
),
constraints: const BoxConstraints(minWidth: 18, minHeight: 18),
child: Text(
'$count',
textAlign: TextAlign.center,
style: const TextStyle(
color: AppColors.background,
fontSize: 11,
fontWeight: FontWeight.w700,
height: 1.1,
),
),
),
),
],
);
}
}
class _SyncStatusChip extends StatelessWidget { class _SyncStatusChip extends StatelessWidget {
const _SyncStatusChip({required this.status}); const _SyncStatusChip({required this.status});

View File

@ -0,0 +1,127 @@
import 'dart:convert';
import 'package:flutter/foundation.dart';
import 'package:http/http.dart' as http;
import '../config/api_config.dart';
import '../models/incoming_question.dart';
import 'auth_service.dart';
/// HTTP client for question queue, answers, and deferrals.
class QuestionsApiService {
QuestionsApiService({http.Client? client}) : _client = client ?? http.Client();
final http.Client _client;
/// Ensures a starter question exists for the signed-in user (login only).
Future<IncomingQuestion?> bootstrapOnLogin() async {
final String? token = await AuthService.instance.getIdToken();
if (token == null) {
return null;
}
final http.Response response = await _client.post(
Uri.parse('$apiBaseUrl/v1/me/questions/bootstrap'),
headers: _authHeaders(token),
);
if (response.statusCode != 200) {
debugPrint(
'bootstrapOnLogin failed: ${response.statusCode} ${response.body}',
);
return null;
}
final Map<String, dynamic> body =
jsonDecode(response.body) as Map<String, dynamic>;
final Map<String, dynamic>? questionJson =
body['question'] as Map<String, dynamic>?;
if (questionJson == null) {
return null;
}
return IncomingQuestion.fromJson(questionJson);
}
Future<List<IncomingQuestion>> fetchUnanswered() async {
final String? token = await AuthService.instance.getIdToken();
if (token == null) {
return <IncomingQuestion>[];
}
final http.Response response = await _client.get(
Uri.parse('$apiBaseUrl/v1/me/questions'),
headers: _authHeaders(token),
);
if (response.statusCode != 200) {
debugPrint(
'fetchUnanswered failed: ${response.statusCode} ${response.body}',
);
return <IncomingQuestion>[];
}
final Map<String, dynamic> body =
jsonDecode(response.body) as Map<String, dynamic>;
final List<dynamic> raw = body['questions'] as List<dynamic>? ?? <dynamic>[];
return raw
.map(
(dynamic item) =>
IncomingQuestion.fromJson(item as Map<String, dynamic>),
)
.toList();
}
Future<int?> submitAnswer({
required String questionId,
num answer = 0,
}) async {
final String? token = await AuthService.instance.getIdToken();
if (token == null) {
return null;
}
final http.Response response = await _client.post(
Uri.parse('$apiBaseUrl/v1/me/questions/$questionId/answer'),
headers: _authHeaders(token),
body: jsonEncode(<String, num>{'answer': answer}),
);
if (response.statusCode != 200) {
debugPrint(
'submitAnswer failed: ${response.statusCode} ${response.body}',
);
return null;
}
final Map<String, dynamic> body =
jsonDecode(response.body) as Map<String, dynamic>;
return (body['unansweredCount'] as num?)?.toInt();
}
Future<int?> deferQuestion({required String questionId}) async {
final String? token = await AuthService.instance.getIdToken();
if (token == null) {
return null;
}
final http.Response response = await _client.post(
Uri.parse('$apiBaseUrl/v1/me/questions/$questionId/defer'),
headers: _authHeaders(token),
);
if (response.statusCode != 200) {
debugPrint(
'deferQuestion failed: ${response.statusCode} ${response.body}',
);
return null;
}
final Map<String, dynamic> body =
jsonDecode(response.body) as Map<String, dynamic>;
return (body['unansweredCount'] as num?)?.toInt();
}
Map<String, String> _authHeaders(String token) {
return <String, String>{
'Authorization': 'Bearer $token',
'Content-Type': 'application/json',
'Accept': 'application/json',
};
}
}

View File

@ -0,0 +1,256 @@
import 'dart:async';
import 'package:flutter/foundation.dart';
import 'package:signalr_netcore/signalr_client.dart';
import '../config/api_config.dart';
import '../models/incoming_question.dart';
import 'auth_service.dart';
import 'questions_api_service.dart';
/// Maintains a SignalR connection to receive incoming questions from the API.
class QuestionsHubService {
QuestionsHubService._();
static final QuestionsHubService instance = QuestionsHubService._();
final QuestionsApiService _api = QuestionsApiService();
final ValueNotifier<bool> hasPendingQuestion = ValueNotifier<bool>(false);
final ValueNotifier<IncomingQuestion?> pendingQuestion =
ValueNotifier<IncomingQuestion?>(null);
final ValueNotifier<int> pendingQuestionCount = ValueNotifier<int>(0);
final ValueNotifier<bool> questionPanelOpen = ValueNotifier<bool>(false);
final ValueNotifier<List<IncomingQuestion>> questionQueue =
ValueNotifier<List<IncomingQuestion>>(<IncomingQuestion>[]);
final ValueNotifier<bool> questionActionBusy = ValueNotifier<bool>(false);
HubConnection? _connection;
bool _connecting = false;
IncomingQuestion? get currentQuestion {
final List<IncomingQuestion> queue = questionQueue.value;
if (queue.isNotEmpty) {
return queue.first;
}
return pendingQuestion.value;
}
/// Login hook: create starter question if needed, then open SignalR.
Future<void> onLogin() async {
final IncomingQuestion? bootstrapped = await _api.bootstrapOnLogin();
if (bootstrapped != null) {
_applyIncoming(bootstrapped);
}
await connect();
}
Future<void> connect() async {
if (_connecting) {
return;
}
if (_connection?.state == HubConnectionState.Connected) {
return;
}
final String? token = await AuthService.instance.getIdToken();
if (token == null) {
return;
}
_connecting = true;
try {
await disconnect();
final HubConnection connection = HubConnectionBuilder()
.withUrl(
questionsHubUrl,
options: HttpConnectionOptions(
accessTokenFactory: () async =>
await AuthService.instance.getIdToken() ?? '',
requestTimeout: 30000,
),
)
.withAutomaticReconnect(
retryDelays: <int>[0, 2000, 5000, 10000],
)
.build();
connection.on('ReceiveQuestion', _onReceiveQuestion);
_connection = connection;
await connection.start();
} catch (e, st) {
debugPrint('Questions hub connect failed: $e\n$st');
await disconnect();
} finally {
_connecting = false;
}
}
void _onReceiveQuestion(List<Object?>? arguments) {
if (arguments == null || arguments.isEmpty) {
return;
}
final Object? raw = arguments.first;
final Map<String, dynamic> json;
if (raw is Map<String, dynamic>) {
json = raw;
} else if (raw is Map) {
json = Map<String, dynamic>.from(raw);
} else {
debugPrint('ReceiveQuestion: unexpected payload type ${raw.runtimeType}');
return;
}
final IncomingQuestion question = IncomingQuestion.fromJson(json);
_applyIncoming(question);
}
void _applyIncoming(IncomingQuestion question) {
final int count =
question.unansweredCount < 1 ? 1 : question.unansweredCount;
pendingQuestion.value = question;
pendingQuestionCount.value = count;
hasPendingQuestion.value = count >= 1;
if (questionPanelOpen.value) {
final List<IncomingQuestion> queue = List<IncomingQuestion>.from(
questionQueue.value,
);
if (!queue.any((IncomingQuestion q) => q.id == question.id)) {
queue.add(question);
questionQueue.value = queue;
}
}
}
/// Opens the inline question panel and loads the full pending queue.
Future<void> openQuestionPanel() async {
questionActionBusy.value = true;
try {
final List<IncomingQuestion> fetched = await _api.fetchUnanswered();
if (fetched.isNotEmpty) {
questionQueue.value = fetched;
pendingQuestion.value = fetched.first;
pendingQuestionCount.value = fetched.length;
hasPendingQuestion.value = true;
} else if (pendingQuestion.value != null) {
questionQueue.value = <IncomingQuestion>[pendingQuestion.value!];
} else {
questionQueue.value = <IncomingQuestion>[];
}
questionPanelOpen.value = hasPendingQuestion.value;
} finally {
questionActionBusy.value = false;
}
}
void closeQuestionPanel() {
questionPanelOpen.value = false;
}
/// Swipe left: move current question to end of queue without answering.
Future<void> deferCurrentQuestion() async {
final IncomingQuestion? question = currentQuestion;
if (question == null || questionActionBusy.value) {
return;
}
questionActionBusy.value = true;
try {
final List<IncomingQuestion> queue = List<IncomingQuestion>.from(
questionQueue.value,
);
if (queue.isEmpty) {
return;
}
final IncomingQuestion current = queue.removeAt(0);
queue.add(current);
final int? serverCount = await _api.deferQuestion(questionId: current.id);
if (serverCount != null) {
final List<IncomingQuestion> refreshed = await _api.fetchUnanswered();
questionQueue.value = refreshed.isNotEmpty ? refreshed : queue;
_syncPendingFromQueue(serverCount);
} else {
questionQueue.value = queue;
_syncPendingFromQueue(queue.length);
}
} finally {
questionActionBusy.value = false;
}
}
/// Swipe right: submit default answer (0).
Future<void> submitCurrentAnswer({num answer = 0}) async {
final IncomingQuestion? question = currentQuestion;
if (question == null || questionActionBusy.value) {
return;
}
questionActionBusy.value = true;
try {
final int? serverCount = await _api.submitAnswer(
questionId: question.id,
answer: answer,
);
if (serverCount == null) {
return;
}
if (serverCount == 0) {
_clearPendingUi();
return;
}
final List<IncomingQuestion> refreshed = await _api.fetchUnanswered();
if (refreshed.isEmpty) {
_clearPendingUi();
return;
}
questionQueue.value = refreshed;
pendingQuestion.value = refreshed.first;
pendingQuestionCount.value = serverCount;
hasPendingQuestion.value = true;
} finally {
questionActionBusy.value = false;
}
}
void _syncPendingFromQueue(int count) {
final List<IncomingQuestion> queue = questionQueue.value;
if (queue.isEmpty || count == 0) {
_clearPendingUi();
return;
}
pendingQuestion.value = queue.first;
pendingQuestionCount.value = count;
hasPendingQuestion.value = true;
}
/// Clears pending question UI; keeps the SignalR connection alive.
void _clearPendingUi() {
hasPendingQuestion.value = false;
pendingQuestion.value = null;
pendingQuestionCount.value = 0;
questionQueue.value = <IncomingQuestion>[];
questionPanelOpen.value = false;
}
void dismissPending() {
_clearPendingUi();
}
Future<void> disconnect() async {
final HubConnection? connection = _connection;
_connection = null;
if (connection != null) {
try {
await connection.stop();
} catch (_) {
// Ignore shutdown errors.
}
}
_clearPendingUi();
}
}

View File

@ -6,6 +6,7 @@ import '../models/app_user.dart';
import '../models/sync_result.dart'; import '../models/sync_result.dart';
import '../models/user_profile.dart'; import '../models/user_profile.dart';
import '../repositories/user_profile_repository.dart'; import '../repositories/user_profile_repository.dart';
import '../services/questions_hub_service.dart';
import '../theme/app_theme.dart'; import '../theme/app_theme.dart';
/// Starts a profile sync session for [user] and exposes profile + sync state. /// Starts a profile sync session for [user] and exposes profile + sync state.
@ -35,7 +36,8 @@ class _ProfileSessionState extends State<ProfileSession> {
@override @override
void initState() { void initState() {
super.initState(); super.initState();
_sessionReady = UserProfileRepository.instance.startSession(widget.user); _sessionReady = UserProfileRepository.instance.startSession(widget.user)
..then((_) => QuestionsHubService.instance.onLogin());
_syncStatusSubscription = _syncStatusSubscription =
UserProfileRepository.instance.syncStatusStream.listen(( UserProfileRepository.instance.syncStatusStream.listen((
ProfileSyncStatus status, ProfileSyncStatus status,
@ -49,6 +51,7 @@ class _ProfileSessionState extends State<ProfileSession> {
@override @override
void dispose() { void dispose() {
_syncStatusSubscription?.cancel(); _syncStatusSubscription?.cancel();
unawaited(QuestionsHubService.instance.disconnect());
UserProfileRepository.instance.endSession(); UserProfileRepository.instance.endSession();
super.dispose(); super.dispose();
} }

View File

@ -0,0 +1,198 @@
import 'dart:async';
import 'package:flutter/material.dart';
import '../guid/guid_glyph_shape.dart';
import '../theme/app_theme.dart';
/// Swipeable question card: right submits, left defers (does not resolve).
///
/// The center band supports vertical drag as a slider from -10 (down) to 10 (up).
class SwipeQuestionTile extends StatefulWidget {
const SwipeQuestionTile({
super.key,
required this.questionId,
required this.onSwipeRight,
required this.onSwipeLeft,
this.busy = false,
});
final String questionId;
final Future<void> Function(num answer) onSwipeRight;
final Future<void> Function() onSwipeLeft;
final bool busy;
@override
State<SwipeQuestionTile> createState() => _SwipeQuestionTileState();
}
class _SwipeQuestionTileState extends State<SwipeQuestionTile> {
double _dragOffset = 0;
double _verticalOffset = 0;
bool _acting = false;
static const double _swipeThreshold = 96;
static const double _maxVerticalDrag = 120;
static const double _sliderMin = -10;
static const double _sliderMax = 10;
/// Swipe up +10, swipe down -10 (linear between).
double get _sliderValue =>
(_verticalOffset / _maxVerticalDrag * _sliderMax).clamp(_sliderMin, _sliderMax);
Future<void> _releaseDrag() async {
if (_acting || widget.busy) {
setState(() => _dragOffset = 0);
return;
}
if (_dragOffset > _swipeThreshold) {
setState(() {
_acting = true;
_dragOffset = MediaQuery.sizeOf(context).width;
});
await widget.onSwipeRight(_sliderValue);
} else if (_dragOffset < -_swipeThreshold) {
setState(() {
_acting = true;
_dragOffset = -MediaQuery.sizeOf(context).width;
});
await widget.onSwipeLeft();
} else if (mounted) {
setState(() => _dragOffset = 0);
}
if (mounted) {
setState(() {
_acting = false;
_dragOffset = 0;
});
}
}
@override
Widget build(BuildContext context) {
final double width = MediaQuery.sizeOf(context).width;
final double progress = (_dragOffset / _swipeThreshold).clamp(-1.0, 1.0);
return LayoutBuilder(
builder: (BuildContext context, BoxConstraints constraints) {
return Stack(
alignment: Alignment.center,
children: <Widget>[
Positioned.fill(
child: DecoratedBox(
decoration: BoxDecoration(
borderRadius: BorderRadius.circular(16),
gradient: LinearGradient(
begin: Alignment.centerLeft,
end: Alignment.centerRight,
colors: <Color>[
Colors.redAccent.withValues(
alpha: 0.15 + 0.35 * (-progress).clamp(0.0, 1.0),
),
AppColors.surfaceElevated,
AppColors.success.withValues(
alpha: 0.15 + 0.35 * progress.clamp(0.0, 1.0),
),
],
),
),
),
),
Transform.translate(
offset: Offset(_dragOffset, 0),
child: GestureDetector(
onHorizontalDragUpdate: widget.busy || _acting
? null
: (DragUpdateDetails details) {
setState(() {
_dragOffset += details.delta.dx;
_dragOffset = _dragOffset.clamp(-width * 0.55, width * 0.55);
});
},
onHorizontalDragEnd: widget.busy || _acting
? null
: (_) => unawaited(_releaseDrag()),
child: Material(
color: AppColors.surfaceElevated,
elevation: 4,
shadowColor: Colors.black45,
borderRadius: BorderRadius.circular(16),
child: Container(
width: constraints.maxWidth,
constraints: const BoxConstraints(minHeight: 220),
padding: const EdgeInsets.symmetric(
horizontal: 16,
vertical: 24,
),
decoration: BoxDecoration(
borderRadius: BorderRadius.circular(16),
),
child: Stack(
alignment: Alignment.center,
children: <Widget>[
Positioned(
top: 20,
bottom: 20,
left: constraints.maxWidth * 0.22,
right: constraints.maxWidth * 0.22,
child: DecoratedBox(
decoration: BoxDecoration(
borderRadius: BorderRadius.circular(14),
color: Color.lerp(
AppColors.surfaceElevated,
AppColors.surface,
0.45,
),
),
),
),
Positioned(
top: 24,
bottom: 24,
left: constraints.maxWidth * 0.22,
right: constraints.maxWidth * 0.22,
child: GestureDetector(
behavior: HitTestBehavior.opaque,
onVerticalDragUpdate: widget.busy || _acting
? null
: (DragUpdateDetails details) {
setState(() {
_verticalOffset -= details.delta.dy;
_verticalOffset = _verticalOffset.clamp(
-_maxVerticalDrag,
_maxVerticalDrag,
);
});
},
child: Center(
child: Transform.translate(
offset: Offset(0, -_verticalOffset),
child: QuestionGuidGlyph(
guid: widget.questionId,
displayValue: _sliderValue,
),
),
),
),
),
],
),
),
),
),
),
if (widget.busy)
const Positioned.fill(
child: ColoredBox(
color: Color(0x66000000),
child: Center(child: CircularProgressIndicator()),
),
),
],
);
},
);
}
}

View File

@ -585,7 +585,7 @@ packages:
source: hosted source: hosted
version: "6.1.0" version: "6.1.0"
logging: logging:
dependency: transitive dependency: "direct main"
description: description:
name: logging name: logging
sha256: c8245ada5f1717ed44271ed1c26b8ce85ca3228fd2ffdb75468ab01979309d61 sha256: c8245ada5f1717ed44271ed1c26b8ce85ca3228fd2ffdb75468ab01979309d61
@ -608,6 +608,14 @@ packages:
url: "https://pub.dev" url: "https://pub.dev"
source: hosted source: hosted
version: "0.13.0" version: "0.13.0"
message_pack_dart:
dependency: transitive
description:
name: message_pack_dart
sha256: "71b9f0ff60e5896e60b337960bb535380d7dba3297b457ac763ccae807385b59"
url: "https://pub.dev"
source: hosted
version: "2.0.1"
meta: meta:
dependency: transitive dependency: transitive
description: description:
@ -848,6 +856,14 @@ packages:
url: "https://pub.dev" url: "https://pub.dev"
source: hosted source: hosted
version: "3.0.0" version: "3.0.0"
signalr_netcore:
dependency: "direct main"
description:
name: signalr_netcore
sha256: "8d59dc61284c5ff8aa27c4e3e802fcf782367f06cf42b39d5ded81680b72f8b8"
url: "https://pub.dev"
source: hosted
version: "1.4.4"
sky_engine: sky_engine:
dependency: transitive dependency: transitive
description: flutter description: flutter
@ -893,6 +909,22 @@ packages:
url: "https://pub.dev" url: "https://pub.dev"
source: hosted source: hosted
version: "0.43.1" version: "0.43.1"
sse:
dependency: transitive
description:
name: sse
sha256: a9a804dbde8bfd369da3b4aa241d44d63a6486a97388c54ec166073d88c52302
url: "https://pub.dev"
source: hosted
version: "4.2.0"
sse_channel:
dependency: transitive
description:
name: sse_channel
sha256: "9aad5d4eef63faf6ecdefb636c0f857bd6f74146d2196087dcf4b17ab5b49b1b"
url: "https://pub.dev"
source: hosted
version: "0.1.1"
stack_trace: stack_trace:
dependency: transitive dependency: transitive
description: description:
@ -941,6 +973,14 @@ packages:
url: "https://pub.dev" url: "https://pub.dev"
source: hosted source: hosted
version: "0.7.11" version: "0.7.11"
tuple:
dependency: transitive
description:
name: tuple
sha256: a97ce2013f240b2f3807bcbaf218765b6f301c3eff91092bcfa23a039e7dd151
url: "https://pub.dev"
source: hosted
version: "2.0.2"
typed_data: typed_data:
dependency: transitive dependency: transitive
description: description:
@ -1013,6 +1053,14 @@ packages:
url: "https://pub.dev" url: "https://pub.dev"
source: hosted source: hosted
version: "3.1.5" version: "3.1.5"
uuid:
dependency: transitive
description:
name: uuid
sha256: "1fef9e8e11e2991bb773070d4656b7bd5d850967a2456cfc83cf47925ba79489"
url: "https://pub.dev"
source: hosted
version: "4.5.3"
vector_math: vector_math:
dependency: transitive dependency: transitive
description: description:

View File

@ -45,6 +45,8 @@ dependencies:
path: ^1.9.1 path: ^1.9.1
path_provider: ^2.1.5 path_provider: ^2.1.5
connectivity_plus: ^6.1.4 connectivity_plus: ^6.1.4
signalr_netcore: ^1.4.4
logging: ^1.3.0
dev_dependencies: dev_dependencies:
flutter_test: flutter_test:

View File

@ -31,6 +31,80 @@ The API listens on `http://localhost:3000` by default (`PORT` in `.env`).
|--------|------|------| |--------|------|------|
| `GET` | `/v1/me/profile` | `Authorization: Bearer <Firebase ID token>` | | `GET` | `/v1/me/profile` | `Authorization: Bearer <Firebase ID token>` |
| `PUT` | `/v1/me/profile` | same | | `PUT` | `/v1/me/profile` | same |
| `POST` | `/v1/me/incoming-question` | same — pushes a question to the client via SignalR |
| `POST` | `/v1/me/questions/bootstrap` | ensure starter question at login |
| `GET` | `/v1/me/questions` | list unanswered questions (queue order) |
| `POST` | `/v1/me/questions/{id}/answer` | submit answer (`{"answer": 0}` default) |
| `POST` | `/v1/me/questions/{id}/defer` | move question to end of queue |
## SignalR — incoming questions
Hub URL: `http://localhost:3000/hubs/questions`
The Flutter app calls `POST /v1/me/questions/bootstrap` once at login to ensure a
starter question exists (random correct answer from -10 to 10) when the user has none.
After sign-in it connects to SignalR and listens for `ReceiveQuestion`. On each new
WebSocket connection the API only delivers an existing unanswered question — it does
not create new rows.
Client payload (correct answer is not sent):
```json
{
"id": "uuid",
"assignedUserId": "firebase-uid",
"text": "...",
"sentAt": "...",
"unansweredCount": 2
}
```
`unansweredCount` is the number of unanswered rows for that user (shown in the app when greater than 1).
`questions` table: `id` (UUID), `assigned_user_id`, `question_text`, `user_response`
(nullable), `correct_answer`, `created_at`, `modified_at`.
## Background question pipeline
A background worker runs inside the API process (enabled by default). On each
interval it walks registered users, fetches data from public web APIs, and
enqueues pipeline questions when the user's queue is not full.
| Env var | Default | Purpose |
|---------|---------|---------|
| `QUESTION_WORKER_ENABLED` | `true` | Set to `false` to disable the worker |
| `QUESTION_WORKER_INTERVAL_SECONDS` | `60` | Seconds between maintenance cycles |
| `QUESTION_PIPELINE_TEST_MODE` | `false` | Use random -10..10 starter-style questions instead of API copy |
**External APIs used**
- [REST Countries](https://restcountries.com/) — population and capital facts
- [Open-Meteo](https://open-meteo.com/) — current temperature by city
**Branching flow**
1. **Track choice** — user swipes toward +10 (weather) or -10 (geography).
2. **Geography** — yes/no population threshold, then capital confirmation;
wrong population guess triggers a recovery question.
3. **Weather** — yes/no warm/cool for a random European city, then a follow-up
to continue weather or switch to geography.
When a user submits an answer (`POST .../answer`), the pipeline evaluates the
response and may immediately create the next branched question and push it over
SignalR.
Pipeline state is stored in `user_pipeline_state`; questions may include
`source_tag`, `pipeline_key`, and `pipeline_step` columns (migration
`003_question_pipeline.sql`).
Test from the shell (replace `ID_TOKEN`):
```bash
curl -s -X POST http://localhost:3000/v1/me/incoming-question \
-H "Authorization: Bearer ID_TOKEN" \
-H "Content-Type: application/json" \
-d '{"text":"What is your preferred contact method?"}'
```
## Flutter client ## Flutter client

View File

@ -6,7 +6,14 @@ import 'package:shelf/shelf_io.dart' as shelf_io;
import '../lib/db.dart'; import '../lib/db.dart';
import '../lib/env.dart'; import '../lib/env.dart';
import '../lib/firebase_auth.dart'; import '../lib/firebase_auth.dart';
import '../lib/handlers/incoming_question_handler.dart';
import '../lib/handlers/profile_handler.dart'; import '../lib/handlers/profile_handler.dart';
import '../lib/handlers/questions_handler.dart';
import '../lib/handlers/questions_hub_handler.dart';
import '../lib/pipeline/question_pipeline.dart';
import '../lib/question_service.dart';
import '../lib/questions_db.dart';
import '../lib/workers/question_background_worker.dart';
Future<void> main() async { Future<void> main() async {
final Directory serverRoot = Directory.current; final Directory serverRoot = Directory.current;
@ -24,9 +31,55 @@ Future<void> main() async {
await db.migrate(); await db.migrate();
final FirebaseAuthVerifier auth = FirebaseAuthVerifier(env.firebaseWebApiKey); final FirebaseAuthVerifier auth = FirebaseAuthVerifier(env.firebaseWebApiKey);
final QuestionsDb questionsDb = QuestionsDb(db.connection);
final QuestionService questionService = QuestionService(
questionsDb: questionsDb,
hubConnections: questionsHubConnections,
);
final QuestionPipeline questionPipeline = QuestionPipeline(
questionsDb: questionsDb,
questionService: questionService,
testMode: env.questionPipelineTestMode,
);
QuestionBackgroundWorker? backgroundWorker;
if (env.questionWorkerEnabled) {
backgroundWorker = QuestionBackgroundWorker(
pipeline: questionPipeline,
interval: Duration(seconds: env.questionWorkerIntervalSeconds),
);
backgroundWorker.start();
}
final Handler profile = profileHandler(db: db, auth: auth);
final Handler questionsHub = questionsHubHandler(
auth: auth,
questionService: questionService,
);
final Handler incomingQuestion = incomingQuestionHandler(
auth: auth,
questionsDb: questionsDb,
);
final Handler questions = questionsHandler(
auth: auth,
questionsDb: questionsDb,
questionService: questionService,
questionPipeline: questionPipeline,
);
final Handler handler = Pipeline() final Handler handler = Pipeline()
.addMiddleware(logRequests()) .addMiddleware(logRequests())
.addHandler(profileHandler(db: db, auth: auth)); .addHandler((Request request) {
final String path = request.requestedUri.path;
if (path.startsWith(questionsHubPath)) {
return questionsHub(request);
}
if (path == '/v1/me/incoming-question') {
return incomingQuestion(request);
}
if (path.startsWith(questionsBasePath)) {
return questions(request);
}
return profile(request);
});
final HttpServer server = await shelf_io.serve( final HttpServer server = await shelf_io.serve(
handler, handler,

View File

@ -0,0 +1,9 @@
/// CORS headers for API and SignalR (Flutter web sends X-Requested-With).
Map<String, String> apiCorsHeaders({String methods = 'GET, PUT, POST, OPTIONS'}) {
return <String, String>{
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': methods,
'Access-Control-Allow-Headers':
'Authorization, Content-Type, X-Requested-With',
};
}

View File

@ -24,16 +24,27 @@ class ProfileDb {
return ProfileDb(connection); return ProfileDb(connection);
} }
Future<void> migrate() async { Connection get connection => _connection;
final String sql = await File('migrations/001_users.sql').readAsString();
final List<String> statements = sql
.split(';')
.map((String s) => s.trim())
.where((String s) => s.isNotEmpty)
.toList();
for (final String statement in statements) { Future<void> migrate() async {
await _connection.execute(statement); final List<File> files = Directory('migrations')
.listSync()
.whereType<File>()
.where((File f) => f.path.endsWith('.sql'))
.toList()
..sort((File a, File b) => a.path.compareTo(b.path));
for (final File file in files) {
final String sql = await file.readAsString();
final List<String> statements = sql
.split(';')
.map((String s) => s.trim())
.where((String s) => s.isNotEmpty)
.toList();
for (final String statement in statements) {
await _connection.execute(statement);
}
} }
} }

View File

@ -3,11 +3,21 @@ import 'dart:io';
import 'package:dotenv/dotenv.dart'; import 'package:dotenv/dotenv.dart';
class ServerEnv { class ServerEnv {
ServerEnv._(this.databaseUrl, this.port, this.firebaseWebApiKey); ServerEnv._({
required this.databaseUrl,
required this.port,
required this.firebaseWebApiKey,
required this.questionWorkerEnabled,
required this.questionWorkerIntervalSeconds,
required this.questionPipelineTestMode,
});
final String databaseUrl; final String databaseUrl;
final int port; final int port;
final String firebaseWebApiKey; final String firebaseWebApiKey;
final bool questionWorkerEnabled;
final int questionWorkerIntervalSeconds;
final bool questionPipelineTestMode;
static ServerEnv load() { static ServerEnv load() {
final DotEnv env = DotEnv(includePlatformEnvironment: true) final DotEnv env = DotEnv(includePlatformEnvironment: true)
@ -26,7 +36,20 @@ class ServerEnv {
} }
final int port = int.tryParse(env['PORT'] ?? '3000') ?? 3000; final int port = int.tryParse(env['PORT'] ?? '3000') ?? 3000;
final bool workerEnabled =
(env['QUESTION_WORKER_ENABLED'] ?? 'true').toLowerCase() != 'false';
final int workerIntervalSeconds =
int.tryParse(env['QUESTION_WORKER_INTERVAL_SECONDS'] ?? '60') ?? 60;
final bool pipelineTestMode =
(env['QUESTION_PIPELINE_TEST_MODE'] ?? 'false').toLowerCase() == 'true';
return ServerEnv._(databaseUrl, port, apiKey); return ServerEnv._(
databaseUrl: databaseUrl,
port: port,
firebaseWebApiKey: apiKey,
questionWorkerEnabled: workerEnabled,
questionWorkerIntervalSeconds: workerIntervalSeconds,
questionPipelineTestMode: pipelineTestMode,
);
} }
} }

View File

@ -0,0 +1,82 @@
import 'dart:convert';
import 'dart:io';
import 'package:shelf/shelf.dart';
import '../cors_headers.dart';
import '../firebase_auth.dart';
import '../questions_db.dart';
import 'questions_hub_handler.dart';
/// REST helper to push a question to the signed-in user over SignalR.
Handler incomingQuestionHandler({
required FirebaseAuthVerifier auth,
required QuestionsDb questionsDb,
}) {
return (Request request) async {
if (request.method == 'OPTIONS') {
return Response.ok('', headers: apiCorsHeaders());
}
final String? firebaseUid = await auth.verifyBearerToken(
request.headers['Authorization'] ?? request.headers['authorization'],
);
if (firebaseUid == null) {
return _jsonResponse(401, <String, dynamic>{'error': 'Unauthorized'});
}
if (request.requestedUri.path != '/v1/me/incoming-question') {
return _jsonResponse(404, <String, dynamic>{'error': 'Not found'});
}
if (request.method != 'POST') {
return _jsonResponse(405, <String, dynamic>{'error': 'Method not allowed'});
}
try {
final String body = await request.readAsString();
final Map<String, dynamic> json = body.isEmpty
? <String, dynamic>{}
: jsonDecode(body) as Map<String, dynamic>;
final String text = (json['text'] as String?)?.trim().isNotEmpty == true
? (json['text'] as String).trim()
: 'You have a new question.';
final num correctAnswer = (json['correctAnswer'] as num?) ?? 0;
final Map<String, dynamic> question = await questionsDb.createQuestion(
assignedUserId: firebaseUid,
questionText: text,
correctAnswer: correctAnswer,
);
final int unansweredCount =
await questionsDb.countUnansweredQuestions(firebaseUid);
final Map<String, dynamic> payload = questionsDb.toClientPayload(
question,
unansweredCount: unansweredCount,
);
final int delivered = await questionsHubConnections.pushQuestion(
firebaseUid,
payload,
);
return _jsonResponse(200, <String, dynamic>{
'question': question,
'deliveredToConnections': delivered,
});
} catch (e, st) {
stderr.writeln('Incoming question handler error: $e\n$st');
return _jsonResponse(500, <String, dynamic>{'error': 'Internal error'});
}
};
}
Response _jsonResponse(int status, Map<String, dynamic> body) {
return Response(
status,
body: jsonEncode(body),
headers: <String, String>{
...apiCorsHeaders(),
'Content-Type': 'application/json',
},
);
}

View File

@ -3,6 +3,7 @@ import 'dart:io';
import 'package:shelf/shelf.dart'; import 'package:shelf/shelf.dart';
import '../cors_headers.dart';
import '../db.dart'; import '../db.dart';
import '../firebase_auth.dart'; import '../firebase_auth.dart';
@ -12,7 +13,7 @@ Handler profileHandler({
}) { }) {
return (Request request) async { return (Request request) async {
if (request.method == 'OPTIONS') { if (request.method == 'OPTIONS') {
return Response.ok('', headers: _corsHeaders); return Response.ok('', headers: apiCorsHeaders(methods: 'GET, PUT, OPTIONS'));
} }
final String? firebaseUid = await auth.verifyBearerToken( final String? firebaseUid = await auth.verifyBearerToken(
@ -64,18 +65,12 @@ Handler profileHandler({
}; };
} }
Map<String, String> get _corsHeaders => <String, String>{
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': 'GET, PUT, OPTIONS',
'Access-Control-Allow-Headers': 'Authorization, Content-Type',
};
Response _jsonResponse(int status, Map<String, dynamic> body) { Response _jsonResponse(int status, Map<String, dynamic> body) {
return Response( return Response(
status, status,
body: jsonEncode(body), body: jsonEncode(body),
headers: <String, String>{ headers: <String, String>{
..._corsHeaders, ...apiCorsHeaders(methods: 'GET, PUT, OPTIONS'),
'Content-Type': 'application/json', 'Content-Type': 'application/json',
}, },
); );

View File

@ -0,0 +1,173 @@
import 'dart:convert';
import 'dart:io';
import 'package:shelf/shelf.dart';
import 'package:shelf_router/shelf_router.dart';
import '../cors_headers.dart';
import '../firebase_auth.dart';
import '../pipeline/question_pipeline.dart';
import '../question_service.dart';
import '../questions_db.dart';
const String questionsBasePath = '/v1/me/questions';
Handler questionsHandler({
required FirebaseAuthVerifier auth,
required QuestionsDb questionsDb,
required QuestionService questionService,
QuestionPipeline? questionPipeline,
}) {
final Router router = Router();
router.post('$questionsBasePath/bootstrap', (Request request) async {
final String? firebaseUid = await _verify(auth, request);
if (firebaseUid == null) {
return _jsonResponse(401, <String, dynamic>{'error': 'Unauthorized'});
}
try {
final Map<String, dynamic> question =
await questionService.ensureStarterQuestionOnLogin(firebaseUid);
final int unansweredCount =
await questionsDb.countUnansweredQuestions(firebaseUid);
return _jsonResponse(200, <String, dynamic>{
'question': question,
'unansweredCount': unansweredCount,
});
} catch (e, st) {
stderr.writeln('Bootstrap questions error: $e\n$st');
return _jsonResponse(500, <String, dynamic>{'error': 'Internal error'});
}
});
router.get(questionsBasePath, (Request request) async {
final String? firebaseUid = await _verify(auth, request);
if (firebaseUid == null) {
return _jsonResponse(401, <String, dynamic>{'error': 'Unauthorized'});
}
try {
final List<Map<String, dynamic>> rows =
await questionsDb.listUnansweredQuestions(firebaseUid);
final List<Map<String, dynamic>> questions = rows
.map(
(Map<String, dynamic> row) => questionsDb.toClientPayload(
row,
unansweredCount: rows.length,
),
)
.toList();
return _jsonResponse(200, <String, dynamic>{
'questions': questions,
'unansweredCount': questions.length,
});
} catch (e, st) {
stderr.writeln('List questions error: $e\n$st');
return _jsonResponse(500, <String, dynamic>{'error': 'Internal error'});
}
});
router.post(
'$questionsBasePath/<id>/answer',
(Request request, String id) async {
final String? firebaseUid = await _verify(auth, request);
if (firebaseUid == null) {
return _jsonResponse(401, <String, dynamic>{'error': 'Unauthorized'});
}
try {
final String body = await request.readAsString();
final Map<String, dynamic> json = body.isEmpty
? <String, dynamic>{}
: jsonDecode(body) as Map<String, dynamic>;
final num answer = (json['answer'] as num?) ?? 0;
final Map<String, dynamic>? updated = await questionsDb.submitAnswer(
questionId: id,
assignedUserId: firebaseUid,
userResponse: answer,
);
if (updated == null) {
return _jsonResponse(404, <String, dynamic>{'error': 'Not found'});
}
if (questionPipeline != null) {
await questionPipeline.onAnswerSubmitted(
firebaseUid: firebaseUid,
answeredQuestion: updated,
userResponse: answer,
);
}
final int unansweredCount =
await questionsDb.countUnansweredQuestions(firebaseUid);
return _jsonResponse(200, <String, dynamic>{
'question': updated,
'unansweredCount': unansweredCount,
});
} catch (e, st) {
stderr.writeln('Answer question error: $e\n$st');
return _jsonResponse(500, <String, dynamic>{'error': 'Internal error'});
}
},
);
router.post(
'$questionsBasePath/<id>/defer',
(Request request, String id) async {
final String? firebaseUid = await _verify(auth, request);
if (firebaseUid == null) {
return _jsonResponse(401, <String, dynamic>{'error': 'Unauthorized'});
}
try {
final Map<String, dynamic>? updated = await questionsDb.deferQuestion(
questionId: id,
assignedUserId: firebaseUid,
);
if (updated == null) {
return _jsonResponse(404, <String, dynamic>{'error': 'Not found'});
}
final int unansweredCount =
await questionsDb.countUnansweredQuestions(firebaseUid);
return _jsonResponse(200, <String, dynamic>{
'question': updated,
'unansweredCount': unansweredCount,
});
} catch (e, st) {
stderr.writeln('Defer question error: $e\n$st');
return _jsonResponse(500, <String, dynamic>{'error': 'Internal error'});
}
},
);
return (Request request) async {
if (request.method == 'OPTIONS' &&
request.requestedUri.path.startsWith(questionsBasePath)) {
return Response.ok('', headers: apiCorsHeaders());
}
final String? firebaseUid = await _verify(auth, request);
if (firebaseUid == null) {
return _jsonResponse(401, <String, dynamic>{'error': 'Unauthorized'});
}
final Response response = await router.call(request);
if (response.statusCode != 404) {
return response;
}
return _jsonResponse(404, <String, dynamic>{'error': 'Not found'});
};
}
Future<String?> _verify(FirebaseAuthVerifier auth, Request request) {
return auth.verifyBearerToken(
request.headers['Authorization'] ?? request.headers['authorization'],
);
}
Response _jsonResponse(int status, Map<String, dynamic> body) {
return Response(
status,
body: jsonEncode(body),
headers: <String, String>{
...apiCorsHeaders(),
'Content-Type': 'application/json',
},
);
}

View File

@ -0,0 +1,200 @@
import 'dart:async';
import 'dart:convert';
import 'package:shelf/shelf.dart';
import 'package:shelf_web_socket/shelf_web_socket.dart';
import 'package:uuid/uuid.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
import '../cors_headers.dart';
import '../firebase_auth.dart';
import '../question_service.dart';
import '../signalr/questions_hub_connections.dart';
import '../signalr/signalr_protocol.dart';
import '../signalr/text_message_format.dart';
const String questionsHubPath = '/hubs/questions';
final QuestionsHubConnections questionsHubConnections = QuestionsHubConnections();
Handler questionsHubHandler({
required FirebaseAuthVerifier auth,
required QuestionService questionService,
}) {
return (Request request) async {
if (request.method == 'OPTIONS') {
return Response.ok('', headers: apiCorsHeaders());
}
final String path = request.requestedUri.path;
if (path.endsWith('/negotiate') && request.method == 'POST') {
return _handleNegotiate(request, auth);
}
if (_isWebSocketUpgrade(request)) {
final String? firebaseUid = await auth.verifyBearerToken(
_tokenFromRequest(request),
);
if (firebaseUid == null) {
return Response.forbidden('Unauthorized', headers: apiCorsHeaders());
}
final String? connectionToken = request.url.queryParameters['id'];
if (connectionToken == null || connectionToken.isEmpty) {
return Response.badRequest(
body: 'Missing connection id',
headers: apiCorsHeaders(),
);
}
final Handler wsHandler = webSocketHandler(
(WebSocketChannel channel, String? subprotocol) {
_handleWebSocket(
channel: channel,
connectionToken: connectionToken,
firebaseUid: firebaseUid,
questionService: questionService,
);
},
);
return wsHandler(request);
}
return Response.notFound('Not found', headers: apiCorsHeaders());
};
}
Future<Response> _handleNegotiate(
Request request,
FirebaseAuthVerifier auth,
) async {
final String? firebaseUid = await auth.verifyBearerToken(
request.headers['Authorization'] ?? request.headers['authorization'],
);
if (firebaseUid == null) {
return _jsonResponse(401, <String, dynamic>{'error': 'Unauthorized'});
}
const Uuid uuid = Uuid();
final String connectionToken = uuid.v4();
return _jsonResponse(200, <String, dynamic>{
'negotiateVersion': 1,
'connectionId': connectionToken,
'connectionToken': connectionToken,
'availableTransports': <Map<String, dynamic>>[
<String, dynamic>{
'transport': 'WebSockets',
'transferFormats': <String>['Text'],
},
],
});
}
void _handleWebSocket({
required WebSocketChannel channel,
required String connectionToken,
required String firebaseUid,
required QuestionService questionService,
}) {
final QuestionsHubConnection connection = QuestionsHubConnection(
connectionToken: connectionToken,
firebaseUid: firebaseUid,
channel: channel,
);
questionsHubConnections.register(connection);
connection.listen(
(String message) => _onSocketMessage(
connection,
message,
questionService: questionService,
),
onDone: () => questionsHubConnections.unregister(connectionToken),
);
}
void _onSocketMessage(
QuestionsHubConnection connection,
String payload, {
required QuestionService questionService,
}) {
if (!connection.handshakeComplete) {
try {
final List<String> messages = TextMessageFormat.parse(payload);
final Map<String, dynamic> handshake =
jsonDecode(messages.first) as Map<String, dynamic>;
if (handshake['protocol'] != 'json') {
unawaited(connection.sendRaw(
SignalrProtocol.handshakeResponse(error: 'Unsupported protocol'),
));
return;
}
connection.handshakeComplete = true;
unawaited(_completeHandshakeAndDeliverPending(
connection: connection,
questionService: questionService,
));
if (messages.length > 1) {
final String remaining = messages
.sublist(1)
.join(TextMessageFormat.recordSeparator);
questionsHubConnections.handleClientMessage(
connection,
'$remaining${TextMessageFormat.recordSeparator}',
);
}
} catch (_) {
unawaited(connection.sendRaw(
SignalrProtocol.handshakeResponse(error: 'Invalid handshake'),
));
}
return;
}
questionsHubConnections.handleClientMessage(connection, payload);
}
Future<void> _completeHandshakeAndDeliverPending({
required QuestionsHubConnection connection,
required QuestionService questionService,
}) async {
await connection.sendRaw(SignalrProtocol.handshakeResponse());
await questionService.deliverPendingQuestionOnConnect(connection);
}
bool _isWebSocketUpgrade(Request request) {
if (request.method != 'GET') {
return false;
}
final String? connection = request.headers['Connection'];
if (connection == null ||
!connection.toLowerCase().split(',').map((t) => t.trim()).contains('upgrade')) {
return false;
}
return request.headers['Upgrade']?.toLowerCase() == 'websocket';
}
String? _tokenFromRequest(Request request) {
final String? authHeader =
request.headers['Authorization'] ?? request.headers['authorization'];
if (authHeader != null && authHeader.startsWith('Bearer ')) {
return authHeader;
}
final String? accessToken = request.url.queryParameters['access_token'];
if (accessToken != null && accessToken.isNotEmpty) {
return 'Bearer $accessToken';
}
return null;
}
Response _jsonResponse(int status, Map<String, dynamic> body) {
return Response(
status,
body: jsonEncode(body),
headers: <String, String>{
...apiCorsHeaders(),
'Content-Type': 'application/json',
},
);
}

View File

@ -0,0 +1,72 @@
/// Outcome of evaluating a user's numeric answer against pipeline rules.
enum BranchOutcome {
/// Answer matched the expected value (within tolerance when applicable).
match,
/// Answer was on the positive side of the scale (e.g. preference toward +10).
preferHigh,
/// Answer was on the negative side of the scale.
preferLow,
/// Numeric answer was close but not exact.
close,
/// Answer was far from the target.
miss,
}
class BranchDecision {
const BranchDecision._();
/// Exact match for discrete choices encoded as integers.
static BranchOutcome discreteChoice({
required num userResponse,
required num correctAnswer,
}) {
if (userResponse.round() == correctAnswer.round()) {
return BranchOutcome.match;
}
return userResponse.sign >= 0 ? BranchOutcome.preferHigh : BranchOutcome.preferLow;
}
/// Compares a population or temperature estimate with tolerance.
static BranchOutcome numericEstimate({
required num userResponse,
required num correctAnswer,
int tolerance = 2,
}) {
final int delta = (userResponse.round() - correctAnswer.round()).abs();
if (delta == 0) {
return BranchOutcome.match;
}
if (delta <= tolerance) {
return BranchOutcome.close;
}
return BranchOutcome.miss;
}
/// Yes/no questions encoded as +10 (yes) vs -10 (no) on the slider.
static BranchOutcome yesNo({
required num userResponse,
required num correctAnswer,
}) {
final bool userYes = userResponse > 0;
final bool correctYes = correctAnswer > 0;
if (userYes == correctYes) {
return BranchOutcome.match;
}
return BranchOutcome.miss;
}
/// Interprets slider position as a preference (geography vs weather track).
static BranchOutcome trackPreference(num userResponse) {
if (userResponse >= 3) {
return BranchOutcome.preferHigh;
}
if (userResponse <= -3) {
return BranchOutcome.preferLow;
}
return BranchOutcome.close;
}
}

View File

@ -0,0 +1,111 @@
import 'dart:convert';
import 'dart:math';
import 'package:http/http.dart' as http;
/// Fetches public web API data used to build question text and answers.
class ExternalDataFetcher {
ExternalDataFetcher({http.Client? client}) : _client = client ?? http.Client();
final http.Client _client;
final Random _random = Random();
/// Random country with population (millions) and capital for trivia questions.
Future<CountryFacts?> fetchRandomCountry() async {
final http.Response response = await _client.get(
Uri.parse(
'https://restcountries.com/v3.1/all?fields=name,population,capital',
),
);
if (response.statusCode != 200) {
return null;
}
final List<dynamic> countries =
jsonDecode(response.body) as List<dynamic>;
if (countries.isEmpty) {
return null;
}
for (int attempt = 0; attempt < 8; attempt++) {
final Map<String, dynamic> raw =
countries[_random.nextInt(countries.length)] as Map<String, dynamic>;
final int? population = raw['population'] as int?;
final List<dynamic>? capitals = raw['capital'] as List<dynamic>?;
final Map<String, dynamic>? nameMap =
raw['name'] as Map<String, dynamic>?;
final String? commonName = nameMap?['common'] as String?;
if (population == null ||
population < 500_000 ||
capitals == null ||
capitals.isEmpty ||
commonName == null) {
continue;
}
return CountryFacts(
name: commonName,
populationMillions: (population / 1_000_000).round(),
capital: capitals.first as String,
);
}
return null;
}
/// Current temperature (°C, rounded) for a European city via Open-Meteo.
Future<WeatherFacts?> fetchRandomCityWeather() async {
const List<({String city, double lat, double lon})> cities =
<({String city, double lat, double lon})>[
(city: 'Berlin', lat: 52.52, lon: 13.41),
(city: 'Paris', lat: 48.86, lon: 2.35),
(city: 'Madrid', lat: 40.42, lon: -3.70),
(city: 'Rome', lat: 41.90, lon: 12.50),
(city: 'Amsterdam', lat: 52.37, lon: 4.90),
];
final ({String city, double lat, double lon}) pick =
cities[_random.nextInt(cities.length)];
final Uri uri = Uri.parse(
'https://api.open-meteo.com/v1/forecast'
'?latitude=${pick.lat}&longitude=${pick.lon}'
'&current=temperature_2m&timezone=auto',
);
final http.Response response = await _client.get(uri);
if (response.statusCode != 200) {
return null;
}
final Map<String, dynamic> json =
jsonDecode(response.body) as Map<String, dynamic>;
final Map<String, dynamic>? current =
json['current'] as Map<String, dynamic>?;
final num? temp = current?['temperature_2m'] as num?;
if (temp == null) {
return null;
}
return WeatherFacts(
city: pick.city,
temperatureCelsius: temp.round(),
);
}
void close() => _client.close();
}
class CountryFacts {
CountryFacts({
required this.name,
required this.populationMillions,
required this.capital,
});
final String name;
final int populationMillions;
final String capital;
}
class WeatherFacts {
WeatherFacts({
required this.city,
required this.temperatureCelsius,
});
final String city;
final int temperatureCelsius;
}

View File

@ -0,0 +1,445 @@
import 'dart:async';
import 'dart:io';
import 'dart:math';
import '../question_service.dart';
import '../questions_db.dart';
import 'branch_decision.dart';
import 'external_data_fetcher.dart';
/// Same format as [QuestionsDb.createStarterQuestion] for local pipeline testing.
abstract final class PipelineTestQuestions {
static const String text =
'Starter question: enter a whole number between -10 and 10.';
static int randomCorrectAnswer() => Random().nextInt(21) - 10;
}
/// Pipeline keys stored in [user_pipeline_state] and question rows.
abstract final class PipelineKeys {
static const String root = 'root';
static const String geography = 'geography';
static const String weather = 'weather';
}
/// Steps within each pipeline branch.
abstract final class PipelineSteps {
static const String chooseTrack = 'choose_track';
static const String populationQuiz = 'population_quiz';
static const String capitalFollowUp = 'capital_follow_up';
static const String capitalRecovery = 'capital_recovery';
static const String temperatureQuiz = 'temperature_quiz';
static const String temperatureFollowUp = 'temperature_follow_up';
static const String idle = 'idle';
}
/// Orchestrates API-driven question creation and branches on user answers.
class QuestionPipeline {
QuestionPipeline({
required QuestionsDb questionsDb,
required QuestionService questionService,
ExternalDataFetcher? fetcher,
this.maxQueuedQuestions = 3,
this.testMode = false,
}) : _questionsDb = questionsDb,
_questionService = questionService,
_fetcher = fetcher ?? ExternalDataFetcher();
final QuestionsDb _questionsDb;
final QuestionService _questionService;
final ExternalDataFetcher _fetcher;
final int maxQueuedQuestions;
final bool testMode;
/// Periodic tick: start pipelines for idle users and top up shallow queues.
Future<void> runMaintenanceCycle() async {
final List<String> userIds = await _questionsDb.listAllUserFirebaseUids();
for (final String uid in userIds) {
try {
await _maintainUser(uid);
} catch (e, st) {
stderr.writeln('Pipeline maintenance failed for $uid: $e\n$st');
}
}
}
Future<void> _maintainUser(String firebaseUid) async {
final int queued = await _questionsDb.countUnansweredQuestions(firebaseUid);
if (queued >= maxQueuedQuestions) {
return;
}
final Map<String, dynamic>? state =
await _questionsDb.getPipelineState(firebaseUid);
if (state == null) {
if (queued == 0) {
await _startRootPipeline(firebaseUid);
}
return;
}
final String step = state['step'] as String;
if (step == PipelineSteps.idle && queued == 0) {
await _advanceFromIdle(firebaseUid, state);
}
}
Future<void> _deliverPipelineQuestion({
required String firebaseUid,
required String questionText,
required num correctAnswer,
required String sourceTag,
required String pipelineKey,
required String pipelineStep,
}) async {
await _questionService.createAndDeliverQuestion(
assignedUserId: firebaseUid,
questionText:
testMode ? PipelineTestQuestions.text : questionText,
correctAnswer:
testMode ? PipelineTestQuestions.randomCorrectAnswer() : correctAnswer,
sourceTag: sourceTag,
pipelineKey: pipelineKey,
pipelineStep: pipelineStep,
);
}
BranchOutcome _evaluateAnswer({
required num userResponse,
required num correctAnswer,
required BranchOutcome Function() production,
}) {
if (testMode) {
return userResponse.round() == correctAnswer.round()
? BranchOutcome.match
: BranchOutcome.miss;
}
return production();
}
Future<bool> _canEnqueue(String firebaseUid) async {
final int queued = await _questionsDb.countUnansweredQuestions(firebaseUid);
return queued < maxQueuedQuestions;
}
/// Called after a question is answered; may enqueue the next branched question.
Future<void> onAnswerSubmitted({
required String firebaseUid,
required Map<String, dynamic> answeredQuestion,
required num userResponse,
}) async {
final String? pipelineKey = answeredQuestion['pipelineKey'] as String?;
final String? pipelineStep = answeredQuestion['pipelineStep'] as String?;
if (pipelineKey == null || pipelineStep == null) {
return;
}
if (!await _canEnqueue(firebaseUid)) {
return;
}
final num correctAnswer = answeredQuestion['correctAnswer'] as num;
final Map<String, dynamic> context =
await _loadContext(firebaseUid, pipelineKey);
switch (pipelineKey) {
case PipelineKeys.root:
await _handleRootAnswer(
firebaseUid: firebaseUid,
step: pipelineStep,
userResponse: userResponse,
);
case PipelineKeys.geography:
await _handleGeographyAnswer(
firebaseUid: firebaseUid,
step: pipelineStep,
userResponse: userResponse,
correctAnswer: correctAnswer,
context: context,
);
case PipelineKeys.weather:
await _handleWeatherAnswer(
firebaseUid: firebaseUid,
step: pipelineStep,
userResponse: userResponse,
correctAnswer: correctAnswer,
context: context,
);
}
}
Future<Map<String, dynamic>> _loadContext(
String firebaseUid,
String pipelineKey,
) async {
final Map<String, dynamic>? state =
await _questionsDb.getPipelineState(firebaseUid);
if (state == null || state['pipelineKey'] != pipelineKey) {
return <String, dynamic>{};
}
return Map<String, dynamic>.from(
state['context'] as Map<String, dynamic>? ?? <String, dynamic>{},
);
}
Future<void> _startRootPipeline(String firebaseUid) async {
if (!await _canEnqueue(firebaseUid)) {
return;
}
await _questionsDb.upsertPipelineState(
assignedUserId: firebaseUid,
pipelineKey: PipelineKeys.root,
step: PipelineSteps.chooseTrack,
context: <String, dynamic>{},
);
await _deliverPipelineQuestion(
firebaseUid: firebaseUid,
questionText:
'Which topics interest you? Swipe toward +10 for weather questions, '
'or toward -10 for geography. Submit your preference.',
correctAnswer: 0,
sourceTag: 'pipeline:root:choose_track',
pipelineKey: PipelineKeys.root,
pipelineStep: PipelineSteps.chooseTrack,
);
}
Future<void> _handleRootAnswer({
required String firebaseUid,
required String step,
required num userResponse,
}) async {
if (step != PipelineSteps.chooseTrack) {
return;
}
final BranchOutcome outcome = BranchDecision.trackPreference(userResponse);
if (outcome == BranchOutcome.preferHigh) {
await _startWeatherPipeline(firebaseUid);
} else {
await _startGeographyPipeline(firebaseUid);
}
}
Future<void> _startGeographyPipeline(String firebaseUid) async {
if (!await _canEnqueue(firebaseUid)) {
return;
}
final CountryFacts? country =
testMode ? null : await _fetcher.fetchRandomCountry();
if (!testMode && country == null) {
await _setIdle(firebaseUid, PipelineKeys.geography);
return;
}
final Map<String, dynamic> context = testMode
? <String, dynamic>{
'country': 'Testland',
'populationMillions': 40,
'capital': 'Test City',
}
: <String, dynamic>{
'country': country!.name,
'populationMillions': country.populationMillions,
'capital': country.capital,
};
await _questionsDb.upsertPipelineState(
assignedUserId: firebaseUid,
pipelineKey: PipelineKeys.geography,
step: PipelineSteps.populationQuiz,
context: context,
);
const int thresholdMillions = 30;
final bool overThreshold = (context['populationMillions'] as int) >=
thresholdMillions;
await _deliverPipelineQuestion(
firebaseUid: firebaseUid,
questionText:
'Does ${context['country']} have a population over $thresholdMillions million? '
'Swipe toward +10 for yes, toward -10 for no.',
correctAnswer: overThreshold ? 10 : -10,
sourceTag: 'pipeline:geography:population',
pipelineKey: PipelineKeys.geography,
pipelineStep: PipelineSteps.populationQuiz,
);
}
Future<void> _handleGeographyAnswer({
required String firebaseUid,
required String step,
required num userResponse,
required num correctAnswer,
required Map<String, dynamic> context,
}) async {
final String country = context['country'] as String? ?? 'this country';
final String capital = context['capital'] as String? ?? '';
switch (step) {
case PipelineSteps.populationQuiz:
final BranchOutcome outcome = _evaluateAnswer(
userResponse: userResponse,
correctAnswer: correctAnswer,
production: () => BranchDecision.yesNo(
userResponse: userResponse,
correctAnswer: correctAnswer,
),
);
if (outcome == BranchOutcome.match) {
await _questionsDb.upsertPipelineState(
assignedUserId: firebaseUid,
pipelineKey: PipelineKeys.geography,
step: PipelineSteps.capitalFollowUp,
context: context,
);
await _deliverPipelineQuestion(
firebaseUid: firebaseUid,
questionText:
'Nice estimate! What is the capital of $country? '
'Enter 1 if it is $capital, or -1 if you are unsure.',
correctAnswer: 1,
sourceTag: 'pipeline:geography:capital',
pipelineKey: PipelineKeys.geography,
pipelineStep: PipelineSteps.capitalFollowUp,
);
} else {
await _questionsDb.upsertPipelineState(
assignedUserId: firebaseUid,
pipelineKey: PipelineKeys.geography,
step: PipelineSteps.capitalRecovery,
context: context,
);
await _deliverPipelineQuestion(
firebaseUid: firebaseUid,
questionText:
'The population of $country is about '
'${context['populationMillions']} million. '
'Try again: enter 1 if the capital is $capital, -1 otherwise.',
correctAnswer: 1,
sourceTag: 'pipeline:geography:recovery',
pipelineKey: PipelineKeys.geography,
pipelineStep: PipelineSteps.capitalRecovery,
);
}
case PipelineSteps.capitalFollowUp:
case PipelineSteps.capitalRecovery:
await _setIdle(firebaseUid, PipelineKeys.geography);
}
}
Future<void> _startWeatherPipeline(String firebaseUid) async {
if (!await _canEnqueue(firebaseUid)) {
return;
}
final WeatherFacts? weather =
testMode ? null : await _fetcher.fetchRandomCityWeather();
if (!testMode && weather == null) {
await _setIdle(firebaseUid, PipelineKeys.weather);
return;
}
final Map<String, dynamic> context = testMode
? <String, dynamic>{'city': 'Test City', 'temperatureCelsius': 20}
: <String, dynamic>{
'city': weather!.city,
'temperatureCelsius': weather.temperatureCelsius,
};
await _questionsDb.upsertPipelineState(
assignedUserId: firebaseUid,
pipelineKey: PipelineKeys.weather,
step: PipelineSteps.temperatureQuiz,
context: context,
);
const int warmThresholdC = 15;
final bool isWarm =
(context['temperatureCelsius'] as int) >= warmThresholdC;
await _deliverPipelineQuestion(
firebaseUid: firebaseUid,
questionText:
'Is it $warmThresholdC°C or warmer in ${context['city']} right now? '
'Swipe toward +10 for yes, toward -10 for no.',
correctAnswer: isWarm ? 10 : -10,
sourceTag: 'pipeline:weather:temperature',
pipelineKey: PipelineKeys.weather,
pipelineStep: PipelineSteps.temperatureQuiz,
);
}
Future<void> _handleWeatherAnswer({
required String firebaseUid,
required String step,
required num userResponse,
required num correctAnswer,
required Map<String, dynamic> context,
}) async {
final String city = context['city'] as String? ?? 'the city';
final int actual = (context['temperatureCelsius'] as num?)?.round() ?? 0;
switch (step) {
case PipelineSteps.temperatureQuiz:
final BranchOutcome outcome = _evaluateAnswer(
userResponse: userResponse,
correctAnswer: correctAnswer,
production: () => BranchDecision.yesNo(
userResponse: userResponse,
correctAnswer: correctAnswer,
),
);
final String followUp = switch (outcome) {
BranchOutcome.match =>
'Spot on! It is about $actual°C in $city. '
'Enter +1 to get another weather question, -1 to switch to geography.',
BranchOutcome.close =>
'Close — it is about $actual°C in $city. '
'Enter +1 for another weather round, -1 for geography instead.',
_ =>
'It is about $actual°C in $city right now. '
'Enter +1 to try another city, -1 to try geography questions.',
};
await _questionsDb.upsertPipelineState(
assignedUserId: firebaseUid,
pipelineKey: PipelineKeys.weather,
step: PipelineSteps.temperatureFollowUp,
context: context,
);
await _deliverPipelineQuestion(
firebaseUid: firebaseUid,
questionText: followUp,
correctAnswer: 1,
sourceTag: 'pipeline:weather:follow_up',
pipelineKey: PipelineKeys.weather,
pipelineStep: PipelineSteps.temperatureFollowUp,
);
case PipelineSteps.temperatureFollowUp:
if (userResponse > 0) {
await _startWeatherPipeline(firebaseUid);
} else {
await _startGeographyPipeline(firebaseUid);
}
}
}
Future<void> _advanceFromIdle(
String firebaseUid,
Map<String, dynamic> state,
) async {
final String pipelineKey = state['pipelineKey'] as String;
switch (pipelineKey) {
case PipelineKeys.geography:
await _startGeographyPipeline(firebaseUid);
case PipelineKeys.weather:
await _startWeatherPipeline(firebaseUid);
default:
await _startRootPipeline(firebaseUid);
}
}
Future<void> _setIdle(String firebaseUid, String pipelineKey) async {
await _questionsDb.upsertPipelineState(
assignedUserId: firebaseUid,
pipelineKey: pipelineKey,
step: PipelineSteps.idle,
context: await _loadContext(firebaseUid, pipelineKey),
);
}
void close() => _fetcher.close();
}

View File

@ -0,0 +1,85 @@
import 'dart:async';
import 'dart:io';
import 'questions_db.dart';
import 'signalr/questions_hub_connections.dart';
/// Creates questions in Postgres and delivers them over SignalR.
class QuestionService {
QuestionService({
required QuestionsDb questionsDb,
required QuestionsHubConnections hubConnections,
}) : _questionsDb = questionsDb,
_hubConnections = hubConnections;
final QuestionsDb _questionsDb;
final QuestionsHubConnections _hubConnections;
/// Called at login: ensures a starter question exists when the user has none.
Future<Map<String, dynamic>> ensureStarterQuestionOnLogin(
String firebaseUid,
) async {
final Map<String, dynamic> question =
await _questionsDb.getOrCreateStarterQuestion(firebaseUid);
final int unansweredCount =
await _questionsDb.countUnansweredQuestions(firebaseUid);
final Map<String, dynamic> payload = _questionsDb.toClientPayload(
question,
unansweredCount: unansweredCount,
);
await _hubConnections.pushQuestion(firebaseUid, payload);
return payload;
}
/// Inserts a question and pushes it to connected SignalR clients.
Future<Map<String, dynamic>> createAndDeliverQuestion({
required String assignedUserId,
required String questionText,
required num correctAnswer,
String? sourceTag,
String? pipelineKey,
String? pipelineStep,
}) async {
final Map<String, dynamic> question = await _questionsDb.createQuestion(
assignedUserId: assignedUserId,
questionText: questionText,
correctAnswer: correctAnswer,
sourceTag: sourceTag,
pipelineKey: pipelineKey,
pipelineStep: pipelineStep,
);
final int unansweredCount =
await _questionsDb.countUnansweredQuestions(assignedUserId);
final Map<String, dynamic> payload = _questionsDb.toClientPayload(
question,
unansweredCount: unansweredCount,
);
await _hubConnections.pushQuestion(assignedUserId, payload);
return payload;
}
/// On SignalR connect: deliver an existing unanswered question only (no create).
Future<void> deliverPendingQuestionOnConnect(
QuestionsHubConnection connection,
) async {
try {
final String uid = connection.firebaseUid;
final Map<String, dynamic>? question =
await _questionsDb.findUnansweredQuestion(uid);
if (question == null) {
return;
}
final int unansweredCount =
await _questionsDb.countUnansweredQuestions(uid);
final Map<String, dynamic> payload = _questionsDb.toClientPayload(
question,
unansweredCount: unansweredCount,
);
await _hubConnections.pushQuestionToConnection(connection, payload);
} catch (e, st) {
stderr.writeln(
'Failed to deliver pending question for ${connection.firebaseUid}: $e\n$st',
);
}
}
}

View File

@ -0,0 +1,409 @@
import 'dart:convert';
import 'dart:math';
import 'package:postgres/postgres.dart';
import 'package:uuid/uuid.dart';
/// Postgres access for the questions table.
class QuestionsDb {
QuestionsDb(this._connection);
final Connection _connection;
static const Uuid _uuid = Uuid();
Future<void> ensureUserExists(String firebaseUid) async {
await _connection.execute(
Sql.named(
'''
INSERT INTO users (firebase_uid)
VALUES (@uid)
ON CONFLICT (firebase_uid) DO NOTHING
''',
),
parameters: <String, dynamic>{'uid': firebaseUid},
);
}
/// Latest unanswered question for [assignedUserId], or null if none.
Future<Map<String, dynamic>?> findUnansweredQuestion(
String assignedUserId,
) async {
final Result result = await _connection.execute(
Sql.named(
'''
SELECT id, assigned_user_id, question_text, user_response, correct_answer,
created_at, modified_at, source_tag, pipeline_key, pipeline_step
FROM questions
WHERE assigned_user_id = @uid AND user_response IS NULL
ORDER BY created_at ASC
LIMIT 1
''',
),
parameters: <String, dynamic>{'uid': assignedUserId},
);
if (result.isEmpty) {
return null;
}
return _rowFromResult(result.first);
}
/// All unanswered questions for [assignedUserId], oldest first.
Future<List<Map<String, dynamic>>> listUnansweredQuestions(
String assignedUserId,
) async {
final Result result = await _connection.execute(
Sql.named(
'''
SELECT id, assigned_user_id, question_text, user_response, correct_answer,
created_at, modified_at, source_tag, pipeline_key, pipeline_step
FROM questions
WHERE assigned_user_id = @uid AND user_response IS NULL
ORDER BY created_at ASC
''',
),
parameters: <String, dynamic>{'uid': assignedUserId},
);
return result.map(_rowFromResult).toList();
}
/// Records [userResponse] for an unanswered question owned by [assignedUserId].
Future<Map<String, dynamic>?> submitAnswer({
required String questionId,
required String assignedUserId,
required num userResponse,
}) async {
final DateTime now = DateTime.now().toUtc();
final Result result = await _connection.execute(
Sql.named(
'''
UPDATE questions
SET user_response = @user_response, modified_at = @modified_at
WHERE id = @id::uuid
AND assigned_user_id = @uid
AND user_response IS NULL
RETURNING id, assigned_user_id, question_text, user_response, correct_answer,
created_at, modified_at, source_tag, pipeline_key, pipeline_step
''',
),
parameters: <String, dynamic>{
'id': questionId,
'uid': assignedUserId,
'user_response': userResponse,
'modified_at': now,
},
);
if (result.isEmpty) {
return null;
}
return _rowFromResult(result.first);
}
/// All Firebase UIDs that have a profile row (pipeline targets).
Future<List<String>> listAllUserFirebaseUids() async {
final Result result = await _connection.execute(
'SELECT firebase_uid FROM users ORDER BY updated_at DESC',
);
return result
.map((ResultRow row) => row[0]! as String)
.toList();
}
Future<Map<String, dynamic>?> getPipelineState(String assignedUserId) async {
final Result result = await _connection.execute(
Sql.named(
'''
SELECT pipeline_key, step, context, updated_at
FROM user_pipeline_state
WHERE assigned_user_id = @uid
''',
),
parameters: <String, dynamic>{'uid': assignedUserId},
);
if (result.isEmpty) {
return null;
}
final ResultRow row = result.first;
final Object? contextRaw = row[2];
final Map<String, dynamic> context = contextRaw is Map<String, dynamic>
? contextRaw
: jsonDecode(contextRaw.toString()) as Map<String, dynamic>;
return <String, dynamic>{
'assignedUserId': assignedUserId,
'pipelineKey': row[0]! as String,
'step': row[1]! as String,
'context': context,
'updatedAt': (row[3]! as DateTime).toIso8601String(),
};
}
Future<void> upsertPipelineState({
required String assignedUserId,
required String pipelineKey,
required String step,
required Map<String, dynamic> context,
}) async {
await ensureUserExists(assignedUserId);
final DateTime now = DateTime.now().toUtc();
await _connection.execute(
Sql.named(
'''
INSERT INTO user_pipeline_state (
assigned_user_id, pipeline_key, step, context, updated_at
) VALUES (
@uid, @pipeline_key, @step, @context::jsonb, @updated_at
)
ON CONFLICT (assigned_user_id) DO UPDATE SET
pipeline_key = EXCLUDED.pipeline_key,
step = EXCLUDED.step,
context = EXCLUDED.context,
updated_at = EXCLUDED.updated_at
''',
),
parameters: <String, dynamic>{
'uid': assignedUserId,
'pipeline_key': pipelineKey,
'step': step,
'context': jsonEncode(context),
'updated_at': now,
},
);
}
/// Moves an unanswered question to the end of the user's queue.
Future<Map<String, dynamic>?> deferQuestion({
required String questionId,
required String assignedUserId,
}) async {
final DateTime now = DateTime.now().toUtc();
final Result result = await _connection.execute(
Sql.named(
'''
UPDATE questions q
SET created_at = sub.max_ts + INTERVAL '1 millisecond',
modified_at = @modified_at
FROM (
SELECT COALESCE(MAX(created_at), @modified_at) AS max_ts
FROM questions
WHERE assigned_user_id = @uid AND user_response IS NULL
) sub
WHERE q.id = @id::uuid
AND q.assigned_user_id = @uid
AND q.user_response IS NULL
RETURNING q.id, q.assigned_user_id, q.question_text, q.user_response,
q.correct_answer, q.created_at, q.modified_at,
q.source_tag, q.pipeline_key, q.pipeline_step
''',
),
parameters: <String, dynamic>{
'id': questionId,
'uid': assignedUserId,
'modified_at': now,
},
);
if (result.isEmpty) {
return null;
}
return _rowFromResult(result.first);
}
/// Count of unanswered questions assigned to [assignedUserId].
Future<int> countUnansweredQuestions(String assignedUserId) async {
final Result result = await _connection.execute(
Sql.named(
'''
SELECT COUNT(*)::int AS count
FROM questions
WHERE assigned_user_id = @uid AND user_response IS NULL
''',
),
parameters: <String, dynamic>{'uid': assignedUserId},
);
return (result.first[0]! as num).toInt();
}
/// Returns an existing unanswered question or creates the starter question.
Future<Map<String, dynamic>> getOrCreateStarterQuestion(
String assignedUserId,
) async {
final Map<String, dynamic>? existing =
await findUnansweredQuestion(assignedUserId);
if (existing != null) {
return existing;
}
return createStarterQuestion(assignedUserId);
}
/// Creates the starter test question with a random correct answer in [-10, 10].
Future<Map<String, dynamic>> createStarterQuestion(String assignedUserId) async {
await ensureUserExists(assignedUserId);
final int correctAnswer = Random().nextInt(21) - 10;
final String id = _uuid.v4();
final DateTime now = DateTime.now().toUtc();
const String questionText =
'Starter question: enter a whole number between -10 and 10.';
await _connection.execute(
Sql.named(
'''
INSERT INTO questions (
id, assigned_user_id, question_text, user_response, correct_answer,
created_at, modified_at
) VALUES (
@id::uuid, @assigned_user_id, @question_text, NULL, @correct_answer,
@created_at, @modified_at
)
''',
),
parameters: <String, dynamic>{
'id': id,
'assigned_user_id': assignedUserId,
'question_text': questionText,
'correct_answer': correctAnswer,
'created_at': now,
'modified_at': now,
},
);
return <String, dynamic>{
'id': id,
'assignedUserId': assignedUserId,
'text': questionText,
'userResponse': null,
'correctAnswer': correctAnswer,
'createdAt': now.toIso8601String(),
'modifiedAt': now.toIso8601String(),
};
}
Future<Map<String, dynamic>> createQuestion({
required String assignedUserId,
required String questionText,
required num correctAnswer,
String? sourceTag,
String? pipelineKey,
String? pipelineStep,
}) async {
await ensureUserExists(assignedUserId);
final String id = _uuid.v4();
final DateTime now = DateTime.now().toUtc();
await _connection.execute(
Sql.named(
'''
INSERT INTO questions (
id, assigned_user_id, question_text, user_response, correct_answer,
created_at, modified_at, source_tag, pipeline_key, pipeline_step
) VALUES (
@id::uuid, @assigned_user_id, @question_text, NULL, @correct_answer,
@created_at, @modified_at, @source_tag, @pipeline_key, @pipeline_step
)
''',
),
parameters: <String, dynamic>{
'id': id,
'assigned_user_id': assignedUserId,
'question_text': questionText,
'correct_answer': correctAnswer,
'created_at': now,
'modified_at': now,
'source_tag': sourceTag,
'pipeline_key': pipelineKey,
'pipeline_step': pipelineStep,
},
);
return _rowToJson(
id: id,
assignedUserId: assignedUserId,
questionText: questionText,
userResponse: null,
correctAnswer: correctAnswer,
createdAt: now,
modifiedAt: now,
sourceTag: sourceTag,
pipelineKey: pipelineKey,
pipelineStep: pipelineStep,
);
}
/// Payload sent to the Flutter client over SignalR (excludes correct answer).
Map<String, dynamic> toClientPayload(
Map<String, dynamic> question, {
required int unansweredCount,
}) {
return <String, dynamic>{
'id': question['id'],
'assignedUserId': question['assignedUserId'],
'text': question['text'],
'sentAt': question['createdAt'],
'unansweredCount': unansweredCount,
};
}
Map<String, dynamic> _rowFromResult(ResultRow row) {
final Object idValue = row[0]!;
final String id = idValue is String ? idValue : idValue.toString();
final DateTime createdAt = row[5]! as DateTime;
final DateTime modifiedAt = row[6]! as DateTime;
return _rowToJson(
id: id,
assignedUserId: row[1]! as String,
questionText: row[2]! as String,
userResponse: _readOptionalNumeric(row[3]),
correctAnswer: _readNumeric(row[4]),
createdAt: createdAt,
modifiedAt: modifiedAt,
sourceTag: row.length > 7 ? row[7] as String? : null,
pipelineKey: row.length > 8 ? row[8] as String? : null,
pipelineStep: row.length > 9 ? row[9] as String? : null,
);
}
/// Postgres NUMERIC columns may decode as [String] or [num].
static num _readNumeric(Object? value) {
if (value == null) {
return 0;
}
if (value is num) {
return value;
}
if (value is String) {
return num.parse(value);
}
return num.parse(value.toString());
}
static num? _readOptionalNumeric(Object? value) {
if (value == null) {
return null;
}
return _readNumeric(value);
}
Map<String, dynamic> _rowToJson({
required String id,
required String assignedUserId,
required String questionText,
required Object? userResponse,
required num correctAnswer,
required DateTime createdAt,
required DateTime modifiedAt,
String? sourceTag,
String? pipelineKey,
String? pipelineStep,
}) {
return <String, dynamic>{
'id': id,
'assignedUserId': assignedUserId,
'text': questionText,
'userResponse': userResponse,
'correctAnswer': correctAnswer,
'createdAt': createdAt.toIso8601String(),
'modifiedAt': modifiedAt.toIso8601String(),
if (sourceTag != null) 'sourceTag': sourceTag,
if (pipelineKey != null) 'pipelineKey': pipelineKey,
if (pipelineStep != null) 'pipelineStep': pipelineStep,
};
}
}

View File

@ -0,0 +1,123 @@
import 'dart:async';
import 'dart:convert';
import 'package:web_socket_channel/web_socket_channel.dart';
import 'signalr_protocol.dart';
import 'text_message_format.dart';
/// Active SignalR connection for the questions hub.
class QuestionsHubConnection {
QuestionsHubConnection({
required this.connectionToken,
required this.firebaseUid,
required this.channel,
});
final String connectionToken;
final String firebaseUid;
final WebSocketChannel channel;
bool handshakeComplete = false;
StreamSubscription<Object?>? _subscription;
void listen(void Function(String message) onMessage, {void Function()? onDone}) {
_subscription = channel.stream.listen(
(Object? message) {
if (message is String) {
onMessage(message);
}
},
onDone: onDone,
cancelOnError: true,
);
}
Future<void> sendRaw(String payload) async {
channel.sink.add(payload);
}
Future<void> sendInvocation(String target, List<Object?> arguments) async {
await sendRaw(SignalrProtocol.invocation(
target: target,
arguments: arguments,
));
}
Future<void> close() async {
await _subscription?.cancel();
await channel.sink.close();
}
}
/// Tracks connected clients and delivers hub invocations by Firebase UID.
class QuestionsHubConnections {
final Map<String, QuestionsHubConnection> _byToken =
<String, QuestionsHubConnection>{};
final Map<String, Set<String>> _tokensByUid = <String, Set<String>>{};
void register(QuestionsHubConnection connection) {
_byToken[connection.connectionToken] = connection;
_tokensByUid
.putIfAbsent(connection.firebaseUid, () => <String>{})
.add(connection.connectionToken);
}
Future<void> unregister(String connectionToken) async {
final QuestionsHubConnection? connection = _byToken.remove(connectionToken);
if (connection == null) {
return;
}
final Set<String>? tokens = _tokensByUid[connection.firebaseUid];
tokens?.remove(connectionToken);
if (tokens != null && tokens.isEmpty) {
_tokensByUid.remove(connection.firebaseUid);
}
await connection.close();
}
bool isConnected(String firebaseUid) =>
(_tokensByUid[firebaseUid]?.isNotEmpty ?? false);
Future<void> pushQuestionToConnection(
QuestionsHubConnection connection,
Map<String, dynamic> question,
) async {
if (!connection.handshakeComplete) {
return;
}
await connection.sendInvocation('ReceiveQuestion', <Object?>[question]);
}
Future<int> pushQuestion(
String firebaseUid,
Map<String, dynamic> question,
) async {
final Set<String> tokens = _tokensByUid[firebaseUid] ?? <String>{};
var delivered = 0;
for (final String token in tokens) {
final QuestionsHubConnection? connection = _byToken[token];
if (connection == null || !connection.handshakeComplete) {
continue;
}
await connection.sendInvocation('ReceiveQuestion', <Object?>[question]);
delivered++;
}
return delivered;
}
/// Parses inbound frames after the handshake.
void handleClientMessage(
QuestionsHubConnection connection,
String payload,
) {
for (final String message in TextMessageFormat.parse(payload)) {
final Map<String, dynamic> json =
jsonDecode(message) as Map<String, dynamic>;
final int? type = json['type'] as int?;
if (type == 6) {
unawaited(connection.sendRaw(SignalrProtocol.ping()));
}
}
}
}

View File

@ -0,0 +1,40 @@
import 'dart:convert';
import 'text_message_format.dart';
/// Writes ASP.NET Core SignalR JSON hub messages.
class SignalrProtocol {
static String handshakeRequest() =>
TextMessageFormat.write(jsonEncode(<String, dynamic>{
'protocol': 'json',
'version': 1,
}));
static String handshakeResponse({String? error}) =>
TextMessageFormat.write(jsonEncode(<String, dynamic>{
if (error != null) 'error': error,
}));
static String invocation({
required String target,
required List<Object?> arguments,
String? invocationId,
}) {
return TextMessageFormat.write(jsonEncode(<String, dynamic>{
'type': 1,
'target': target,
'arguments': arguments,
if (invocationId != null) 'invocationId': invocationId,
}));
}
static String ping() => TextMessageFormat.write(jsonEncode(<String, dynamic>{
'type': 6,
}));
static String close({String? error}) =>
TextMessageFormat.write(jsonEncode(<String, dynamic>{
'type': 7,
if (error != null) 'error': error,
}));
}

View File

@ -0,0 +1,16 @@
/// SignalR text framing (record separator 0x1E).
class TextMessageFormat {
static const int recordSeparatorCode = 0x1e;
static final String recordSeparator = String.fromCharCode(recordSeparatorCode);
static String write(String output) => '$output$recordSeparator';
static List<String> parse(String input) {
if (input.isEmpty || input.codeUnitAt(input.length - 1) != recordSeparatorCode) {
throw StateError('Message is incomplete.');
}
final List<String> messages = input.split(recordSeparator);
messages.removeLast();
return messages;
}
}

View File

@ -0,0 +1,49 @@
import 'dart:async';
import 'dart:io';
import '../pipeline/question_pipeline.dart';
/// Runs [QuestionPipeline.runMaintenanceCycle] on a fixed interval.
class QuestionBackgroundWorker {
QuestionBackgroundWorker({
required QuestionPipeline pipeline,
required Duration interval,
}) : _pipeline = pipeline,
_interval = interval;
final QuestionPipeline _pipeline;
final Duration _interval;
Timer? _timer;
bool _running = false;
void start() {
if (_timer != null) {
return;
}
stdout.writeln(
'Question background worker started (interval ${_interval.inSeconds}s)',
);
_timer = Timer.periodic(_interval, (_) => _tick());
unawaited(_tick());
}
void stop() {
_timer?.cancel();
_timer = null;
_pipeline.close();
}
Future<void> _tick() async {
if (_running) {
return;
}
_running = true;
try {
await _pipeline.runMaintenanceCycle();
} catch (e, st) {
stderr.writeln('Question background worker tick failed: $e\n$st');
} finally {
_running = false;
}
}
}

View File

@ -0,0 +1,13 @@
CREATE TABLE IF NOT EXISTS questions (
id UUID PRIMARY KEY,
assigned_user_id TEXT NOT NULL REFERENCES users (firebase_uid) ON DELETE CASCADE,
question_text TEXT NOT NULL,
user_response NUMERIC,
correct_answer NUMERIC NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
modified_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX IF NOT EXISTS questions_assigned_user_id_idx ON questions (assigned_user_id);
CREATE INDEX IF NOT EXISTS questions_created_at_idx ON questions (created_at);

View File

@ -0,0 +1,15 @@
ALTER TABLE questions
ADD COLUMN IF NOT EXISTS source_tag TEXT,
ADD COLUMN IF NOT EXISTS pipeline_key TEXT,
ADD COLUMN IF NOT EXISTS pipeline_step TEXT;
CREATE INDEX IF NOT EXISTS questions_source_tag_idx ON questions (source_tag)
WHERE source_tag IS NOT NULL;
CREATE TABLE IF NOT EXISTS user_pipeline_state (
assigned_user_id TEXT PRIMARY KEY REFERENCES users (firebase_uid) ON DELETE CASCADE,
pipeline_key TEXT NOT NULL,
step TEXT NOT NULL,
context JSONB NOT NULL DEFAULT '{}',
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

View File

@ -57,6 +57,14 @@ packages:
url: "https://pub.dev" url: "https://pub.dev"
source: hosted source: hosted
version: "4.2.0" version: "4.2.0"
fixnum:
dependency: transitive
description:
name: fixnum
sha256: b6dc7065e46c974bc7c5f143080a6764ec7a4be6da1285ececdc37be96de53be
url: "https://pub.dev"
source: hosted
version: "1.1.1"
http: http:
dependency: "direct main" dependency: "direct main"
description: description:
@ -137,6 +145,14 @@ packages:
url: "https://pub.dev" url: "https://pub.dev"
source: hosted source: hosted
version: "1.1.4" version: "1.1.4"
shelf_web_socket:
dependency: "direct main"
description:
name: shelf_web_socket
sha256: "3632775c8e90d6c9712f883e633716432a27758216dfb61bd86a8321c0580925"
url: "https://pub.dev"
source: hosted
version: "3.0.0"
source_span: source_span:
dependency: transitive dependency: transitive
description: description:
@ -185,6 +201,14 @@ packages:
url: "https://pub.dev" url: "https://pub.dev"
source: hosted source: hosted
version: "1.4.0" version: "1.4.0"
uuid:
dependency: "direct main"
description:
name: uuid
sha256: "1fef9e8e11e2991bb773070d4656b7bd5d850967a2456cfc83cf47925ba79489"
url: "https://pub.dev"
source: hosted
version: "4.5.3"
web: web:
dependency: transitive dependency: transitive
description: description:
@ -193,5 +217,21 @@ packages:
url: "https://pub.dev" url: "https://pub.dev"
source: hosted source: hosted
version: "1.1.1" version: "1.1.1"
web_socket:
dependency: transitive
description:
name: web_socket
sha256: "34d64019aa8e36bf9842ac014bb5d2f5586ca73df5e4d9bf5c936975cae6982c"
url: "https://pub.dev"
source: hosted
version: "1.0.1"
web_socket_channel:
dependency: "direct main"
description:
name: web_socket_channel
sha256: d645757fb0f4773d602444000a8131ff5d48c9e47adfe9772652dd1a4f2d45c8
url: "https://pub.dev"
source: hosted
version: "3.0.3"
sdks: sdks:
dart: ">=3.12.0 <4.0.0" dart: ">=3.12.0 <4.0.0"

View File

@ -9,6 +9,9 @@ dependencies:
shelf: ^1.4.2 shelf: ^1.4.2
shelf_router: ^1.1.4 shelf_router: ^1.1.4
shelf_cors_headers: ^0.1.5 shelf_cors_headers: ^0.1.5
shelf_web_socket: ^3.0.0
postgres: ^3.5.6 postgres: ^3.5.6
dotenv: ^4.2.0 dotenv: ^4.2.0
http: ^1.6.0 http: ^1.6.0
uuid: ^4.5.3
web_socket_channel: ^3.0.0

98
startup.sh Executable file
View File

@ -0,0 +1,98 @@
#!/usr/bin/env bash
# Start the API (port 3000) and Flutter web dev server (port 8080) with merged logs.
# Usage: ./startup.sh
set -euo pipefail
ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
API_PORT=3000
WEB_PORT=8080
if [ -f "$ROOT/scripts/flutter-env.sh" ]; then
# shellcheck source=/dev/null
source "$ROOT/scripts/flutter-env.sh"
fi
log() { printf '%s\n' "$*"; }
kill_port() {
local port=$1
if command -v fuser >/dev/null 2>&1; then
if fuser "${port}/tcp" >/dev/null 2>&1; then
log "Stopping process(es) on port ${port}..."
fuser -k "${port}/tcp" >/dev/null 2>&1 || true
sleep 0.5
fi
return
fi
if ! command -v lsof >/dev/null 2>&1; then
warn "Cannot free port ${port}: install fuser or lsof"
return
fi
local pids
pids="$(lsof -t -iTCP:"${port}" -sTCP:LISTEN 2>/dev/null || true)"
if [ -n "$pids" ]; then
log "Stopping process(es) on port ${port}: ${pids}"
# shellcheck disable=SC2086
kill ${pids} 2>/dev/null || true
sleep 0.5
# shellcheck disable=SC2086
kill -9 ${pids} 2>/dev/null || true
fi
}
warn() { printf 'WARN: %s\n' "$*" >&2; }
prefix_lines() {
local tag=$1
while IFS= read -r line || [ -n "${line:-}" ]; do
printf '[%s] %s\n' "$tag" "$line"
done
}
cleanup() {
trap - INT TERM EXIT
log "Shutting down..."
if [ -n "${API_PID:-}" ]; then kill "$API_PID" 2>/dev/null || true; fi
if [ -n "${WEB_PID:-}" ]; then kill "$WEB_PID" 2>/dev/null || true; fi
kill_port "$API_PORT"
kill_port "$WEB_PORT"
wait 2>/dev/null || true
}
trap cleanup INT TERM EXIT
log "Freeing ports ${API_PORT} (API) and ${WEB_PORT} (web)..."
kill_port "$API_PORT"
kill_port "$WEB_PORT"
if [ ! -f "$ROOT/server/.env" ]; then
warn "Missing server/.env — copy server/.env.example and configure it first."
fi
log "Starting API on http://localhost:${API_PORT} ..."
(
cd "$ROOT/server"
dart run bin/server.dart 2>&1 | prefix_lines api
) &
API_PID=$!
log "Starting web app on http://localhost:${WEB_PORT} ..."
(
cd "$ROOT"
flutter run -d web-server \
--dart-define=API_BASE_URL="http://localhost:${API_PORT}" \
2>&1 | prefix_lines web
) &
WEB_PID=$!
log "Both services running. Press Ctrl+C to stop."
log " API: http://localhost:${API_PORT}"
log " Web: http://localhost:${WEB_PORT}"
log ""
wait -n 2>/dev/null || wait || true
exit_code=$?
cleanup
exit "$exit_code"