# Trading Development Plan — Alpaca Data & Execution
This document defines how Cyber Hybrid Hub will add **Alpaca market data gathering**, **Alpaca trade actuation**, and a **dynamically configurable data-input layer** that feeds the existing **interval worker** (`QuestionBackgroundWorker`), **Postgres** persistence, **user questions** (SignalR), and **automated trade decisions**.
It extends, rather than replaces, the current architecture documented in `server/README.md` and implemented in `server/lib/pipeline/question_pipeline.dart`.

---
## 1. Goals & constraints
| Goal | Detail |
|------|--------|
| **Data** | Ingest US equity (and optional crypto) prices from Alpaca Market Data; store normalized snapshots in Postgres for the worker to read. |
| **Questions** | Worker evaluates stored data + pipeline config → enqueues swipe/numeric questions via `QuestionService` → Flutter receives `ReceiveQuestion` over SignalR. |
| **Trades** | After rules + optional user confirmation via answers, place orders through Alpaca Trading API (paper first). |
| **Configuration** | Watchlists, thresholds, and pipeline steps are **DB-driven** (and env defaults), not hard-coded in Dart. |
| **Security** | Alpaca secret keys live **only** on the Dart API server; Flutter never holds trading credentials. |
| **Cost** | Target Alpaca **Basic** market data (IEX, ~30 WS symbols) + **paper** trading until strategies are validated. |
**Non-goals (initial phases):** sub-second HFT, full SIP market data, client-side WebSockets to Alpaca, or multi-broker routing.
---
## 2. Fit with the current system
```mermaid
flowchart TB
subgraph flutter [Flutter app]
UI[HomeScreen / SwipeQuestionTile]
Hub[QuestionsHubService SignalR]
end
subgraph server [Dart API server]
API[Shelf handlers]
Worker[QuestionBackgroundWorker interval]
MDP[MarketDataPipeline NEW]
TP[TradingPipeline NEW]
QP[QuestionPipeline existing]
QS[QuestionService]
AlpacaMD[Alpaca Market Data client]
AlpacaTR[Alpaca Trading client]
end
subgraph pg [Postgres]
MD[market_data_snapshots]
CFG[trading_config / user_trading_state]
Q[questions]
UPS[user_pipeline_state]
ORD[trade_orders]
end
AlpacaMD -->|REST poll or WS| MDP
MDP --> MD
Worker --> MDP
Worker --> TP
Worker --> QP
TP --> MD
TP --> CFG
TP --> ORD
TP --> AlpacaTR
QP --> QS
QS --> Q
QS --> Hub
API --> QS
Hub --> UI
API --> UI
```
**Existing touchpoints to extend:**
| Component | Path | Role |
|-----------|------|------|
| Interval worker | `server/lib/workers/question_background_worker.dart` | Orchestrates tick; will call market-data ingest + trading evaluation before/alongside `QuestionPipeline.runMaintenanceCycle`. |
| Question pipeline | `server/lib/pipeline/question_pipeline.dart` | Add `PipelineKeys.trading` branch; reuse `BranchDecision` for threshold-style answers. |
| External fetcher pattern | `server/lib/pipeline/external_data_fetcher.dart` | Precedent for HTTP clients; Alpaca clients live in `server/lib/alpaca/`. |
| Env loading | `server/lib/env.dart` | Add Alpaca + trading feature flags. |
| Flutter | `lib/services/questions_hub_service.dart`, `lib/widgets/swipe_question_tile.dart` | Unchanged contract: questions in, numeric answers out. |
---
## 3. Alpaca accounts & APIs
Use **one Alpaca account** with two API surfaces:
| Surface | Base URL (paper) | Purpose |
|---------|------------------|---------|
| **Market Data** | `https://data.alpaca.markets` | Snapshots, bars, trades, quotes; WS `wss://stream.data.alpaca.markets/v2/iex` (Basic). |
| **Trading** | `https://paper-api.alpaca.markets` | Orders, positions, account; switch to live URL only after explicit env flip. |
**Authentication:** Headers `APCA-API-KEY-ID` and `APCA-API-SECRET-KEY` on every request.
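
As a minimal sketch of the header rule above, a helper on the Dart server could build the header map once and reuse it for both API surfaces. The function name `alpacaAuthHeaders` and the placeholder credentials are illustrative assumptions, not existing code.

```dart
/// Builds the two authentication headers sent on every Alpaca request.
/// (Hypothetical helper; the name is an assumption for illustration.)
Map<String, String> alpacaAuthHeaders({
  required String keyId,
  required String secretKey,
}) {
  if (keyId.isEmpty || secretKey.isEmpty) {
    throw ArgumentError('Alpaca credentials must not be empty');
  }
  return {
    'APCA-API-KEY-ID': keyId,
    'APCA-API-SECRET-KEY': secretKey,
  };
}

void main() {
  // Placeholder values; real keys would come from server-side env loading.
  final headers = alpacaAuthHeaders(keyId: 'PK_TEST', secretKey: 'secret');
  print(headers.keys.join(', '));
}
```

Failing fast on empty credentials keeps a misconfigured server from sending unauthenticated requests on every tick.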
**Free-tier limits to design around:**
- Market Data Basic: **IEX real-time**, **~30 WebSocket subscriptions**, historical REST capped (use DB as source of truth for worker ticks).
- Trading: paper account; rate limits are documented by Alpaca; this design sets a unique `client_order_id` on every order so that submissions are idempotent.
---
## 4. Dynamically configurable data input design
Configuration is split into **global templates** (reusable) and **per-user overrides** (Firebase UID). The worker always resolves effective config before reading market data or emitting questions.
### 4.1 Configuration layers
```text
trading_config_templates (optional defaults by name)
↓ merge
user_trading_config (per user: watchlist, rules, enabled flag)
↓ resolved at tick
EffectiveTradingConfig (in-memory for one worker cycle)
```
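
The merge step in the layering above could be sketched as a recursive map merge. The merge semantics here are an assumption (nested objects merge key by key, while lists and scalars in the user config replace the template value); the plan does not yet fix them, and `mergeConfig` is a hypothetical name.

```dart
/// Recursively merges [override] onto [base]. Nested maps merge key by key;
/// any other value (including lists) in [override] replaces the base value.
Map<String, dynamic> mergeConfig(
  Map<String, dynamic> base,
  Map<String, dynamic> override,
) {
  final result = Map<String, dynamic>.from(base);
  override.forEach((key, value) {
    final existing = result[key];
    if (existing is Map<String, dynamic> && value is Map<String, dynamic>) {
      result[key] = mergeConfig(existing, value);
    } else {
      result[key] = value;
    }
  });
  return result;
}

void main() {
  final template = <String, dynamic>{
    'mode': 'paper',
    'guardrails': <String, dynamic>{
      'max_orders_per_day': 3,
      'max_notional_usd_per_4h': 100,
    },
  };
  final userOverride = <String, dynamic>{
    'guardrails': <String, dynamic>{'max_orders_per_day': 1},
  };
  // The user tightens one guardrail; everything else comes from the template.
  print(mergeConfig(template, userOverride));
}
```

Replacing lists wholesale (rather than concatenating them) keeps a user's `symbols` or `rules` override predictable, at the cost of requiring the full list in the override.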
### 4.2 Config document shape (JSONB)
Store in Postgres as `JSONB`; validate on write via API or seed migrations.
```json
{
  "version": 1,
  "enabled": true,
  "mode": "paper",
  "data_inputs": [
    {
      "id": "primary_watchlist",
      "source": "alpaca",
      "asset_class": "us_equity",
      "symbols": ["AAPL", "MSFT", "SPY"],
      "feed": "iex",
      "poll_interval_seconds": 60,
      "metrics": ["last_trade", "daily_bar", "prev_close"]
    }
  ],
  "rules": [
    {
      "id": "dip_confirm",
      "type": "price_below_pct_of_ref",
      "symbol": "SPY",
      "ref_metric": "prev_close",
      "threshold_pct": -1.5,
      "question_template": "SPY is down {{pct}}% vs yesterday. Swipe +10 to approve a small buy, -10 to skip.",
      "on_answer_match": { "action": "propose_order", "side": "buy", "notional_usd": 10 }
    }
  ],
  "guardrails": {
    "max_orders_per_day": 3,
    "max_notional_usd_per_4h": 100,
    "require_question_before_order": true,
    "symbols_blocklist": []
  }
}
```
**Design principles:**
- **`data_inputs[]`** — what to fetch and how often; worker matches `poll_interval_seconds` against `QUESTION_WORKER_INTERVAL_SECONDS` or stores last fetch time per input id in `user_trading_state.context`.
- **`rules[]`** — declarative triggers evaluated against **Postgres snapshots**, not live Alpaca on every rule check (decouples question logic from API rate limits).
- **`question_template`** — supports `{{symbol}}`, `{{price}}`, `{{pct}}` substitution when creating `question_text`.
- **User answers** map to actions via existing `pipeline_key` / `pipeline_step` / `source_tag` conventions (e.g. `trading:dip_confirm:await_confirm`).
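
The `question_template` substitution described in the list above could look like the following sketch. The function name `renderQuestionTemplate` is hypothetical, and leaving unknown placeholders untouched is an assumed policy, chosen so that missing data stays visible in review.

```dart
/// Replaces `{{name}}` placeholders with values from [values].
/// Placeholders without a value are left as-is (assumed policy).
String renderQuestionTemplate(String template, Map<String, String> values) {
  final placeholder = RegExp(r'\{\{(\w+)\}\}');
  return template.replaceAllMapped(placeholder, (match) {
    final name = match.group(1)!;
    return values[name] ?? match.group(0)!;
  });
}

void main() {
  const template =
      'SPY is down {{pct}}% vs yesterday. Swipe +10 to approve a small buy, -10 to skip.';
  // Prints the template with {{pct}} replaced by 1.6.
  print(renderQuestionTemplate(template, {'symbol': 'SPY', 'pct': '1.6'}));
}
```

The caller is expected to format numbers (sign, decimals) before substitution, so the template stays a plain string with no formatting logic.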
### 4.3 Config administration
| Method | Phase |
|--------|-------|
| SQL seed / migration | Phase 1 — dev templates |
| `PUT /v1/me/trading/config` | Phase 2 — authenticated user override |
| Admin script | Optional bulk updates |
Flutter does **not** edit raw JSON initially; a settings screen can come later. Phase 1 uses server-side defaults keyed by `firebase_uid`.
---
## 5. Postgres schema (new migration `004_trading.sql`)
### 5.1 `market_data_snapshots`
Append-only (or upsert latest) normalized facts per symbol.
```sql
CREATE TABLE market_data_snapshots (
id BIGSERIAL PRIMARY KEY,
symbol TEXT NOT NULL,
asset_class TEXT NOT NULL DEFAULT 'us_equity',
feed TEXT NOT NULL DEFAULT 'iex',
metric TEXT NOT NULL, -- last_trade | daily_bar | prev_close
price NUMERIC,
volume NUMERIC,
as_of TIMESTAMPTZ NOT NULL,
raw JSONB, -- optional Alpaca payload fragment
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX market_data_snapshots_symbol_as_of_idx
ON market_data_snapshots (symbol, as_of DESC);
```
Worker queries: “latest row per `(symbol, metric)`” via `DISTINCT ON` or a materialized view `market_data_latest` (Phase 2 optimization).
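
One way to express the "latest row per `(symbol, metric)`" read is sketched below: the query text as a Dart constant, plus an in-memory equivalent that fixtures could use in test mode. The `$1` placeholder style depends on the Postgres driver and is an assumption, as are the names `latestSnapshotsSql`, `Snapshot`, and `latestPerSymbolMetric`.

```dart
/// Query text only. The `$1` placeholder style is an assumption and
/// depends on the Postgres driver in use.
const latestSnapshotsSql = r'''
SELECT DISTINCT ON (symbol, metric)
       symbol, metric, price, volume, as_of
FROM market_data_snapshots
WHERE symbol = ANY($1)
ORDER BY symbol, metric, as_of DESC
''';

/// Hypothetical in-memory mirror of one snapshot row.
class Snapshot {
  final String symbol;
  final String metric;
  final double price;
  final DateTime asOf;
  const Snapshot(this.symbol, this.metric, this.price, this.asOf);
}

/// In-memory equivalent for fixtures: keeps the newest row per (symbol, metric).
Map<String, Snapshot> latestPerSymbolMetric(Iterable<Snapshot> rows) {
  final latest = <String, Snapshot>{};
  for (final row in rows) {
    final key = '${row.symbol}|${row.metric}';
    final current = latest[key];
    if (current == null || row.asOf.isAfter(current.asOf)) {
      latest[key] = row;
    }
  }
  return latest;
}

void main() {
  final rows = [
    Snapshot('SPY', 'last_trade', 495, DateTime.utc(2024, 1, 2, 14)),
    Snapshot('SPY', 'last_trade', 492, DateTime.utc(2024, 1, 2, 15)),
    Snapshot('SPY', 'prev_close', 500, DateTime.utc(2024, 1, 1, 21)),
  ];
  final latest = latestPerSymbolMetric(rows);
  print(latest['SPY|last_trade']!.price);
}
```

Because the `ORDER BY` leads with `symbol, metric`, an index that also covers `metric` may serve this query better than the `(symbol, as_of DESC)` index alone; that is worth checking with `EXPLAIN` once real data exists.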
### 5.2 `trading_config_templates`
```sql
CREATE TABLE trading_config_templates (
name TEXT PRIMARY KEY,
config JSONB NOT NULL,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
```
### 5.3 `user_trading_config`
```sql
CREATE TABLE user_trading_config (
firebase_uid TEXT PRIMARY KEY REFERENCES users (firebase_uid) ON DELETE CASCADE,
template_name TEXT REFERENCES trading_config_templates (name),
config JSONB NOT NULL DEFAULT '{}',
enabled BOOLEAN NOT NULL DEFAULT false,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
```
### 5.4 `user_trading_state`
Extends the pipeline state pattern in `user_pipeline_state` with a trading-specific cursor.
```sql
CREATE TABLE user_trading_state (
firebase_uid TEXT PRIMARY KEY REFERENCES users (firebase_uid) ON DELETE CASCADE,
last_ingest_at TIMESTAMPTZ,
last_eval_at TIMESTAMPTZ,
context JSONB NOT NULL DEFAULT '{}',
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
```
`context` holds: last fired rule ids, pending order proposal ids, daily order counts, last fetch times per `data_inputs[].id`.
### 5.5 `trade_orders`
Audit trail and idempotency.
```sql
CREATE TABLE trade_orders (
id UUID PRIMARY KEY,
firebase_uid TEXT NOT NULL REFERENCES users (firebase_uid) ON DELETE CASCADE,
client_order_id TEXT NOT NULL UNIQUE,
alpaca_order_id TEXT,
symbol TEXT NOT NULL,
side TEXT NOT NULL,
order_type TEXT NOT NULL,
notional_usd NUMERIC,
qty NUMERIC,
status TEXT NOT NULL,
question_id UUID REFERENCES questions (id),
rule_id TEXT,
submitted_at TIMESTAMPTZ,
filled_at TIMESTAMPTZ,
raw JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
```
### 5.6 Questions table linkage
Reuse existing columns:
- `source_tag` — e.g. `trading:rule:dip_confirm`
- `pipeline_key` — `trading`
- `pipeline_step` — rule id + phase (`idle`, `await_confirm`, `submit_order`, `done`; see §8.1)
Optional Phase 2: `questions.metadata JSONB` for symbol/price at question time.
---
## 6. Server modules (proposed layout)
```text
server/lib/
  alpaca/
    alpaca_env.dart                  # keys, paper vs live, data host
    alpaca_market_data_client.dart
    alpaca_trading_client.dart
    alpaca_models.dart
  trading/
    trading_config.dart              # parse/merge JSONB → EffectiveTradingConfig
    trading_config_db.dart
    market_data_ingest.dart          # data_inputs → snapshots
    market_data_db.dart
    rule_engine.dart                 # rules × snapshots → RuleEvaluation
    trading_pipeline.dart            # questions + order proposals
    trade_orders_db.dart
  pipeline/
    question_pipeline.dart           # wire trading branch in onAnswerSubmitted
  workers/
    question_background_worker.dart  # invoke trading tick
```
---
## 7. Worker tick sequence (single interval)
On each `QuestionBackgroundWorker._tick()` when `TRADING_ENABLED=true`:
```text
1. For each user with user_trading_config.enabled:
   a. Resolve EffectiveTradingConfig (template + user JSON merge).
   b. MarketDataIngest.runIfDue(user, config.data_inputs)
      → Alpaca REST (or WS service) → INSERT market_data_snapshots
   c. TradingPipeline.evaluate(user, config, snapshots from DB)
      → if rule fires and guardrails OK:
         - maybe create question (respect maxQueuedQuestions)
         - or stage pending order in user_trading_state.context
   d. QuestionPipeline.runMaintenanceCycle() for same user (existing geography/weather/trading branches)
2. For users with pending order + confirmed answer (see §8):
   TradeActuator.submitOrder() → Alpaca Trading → UPDATE trade_orders
```
**Concurrency:** keep `_running` guard; ingest and evaluate per user sequentially first; optimize later.
**Alignment with `QUESTION_WORKER_INTERVAL_SECONDS`:** the default of 60s suits REST polling on Alpaca Basic for watchlists of up to 30 symbols. A dedicated **long-lived WS ingest task** (optional, Phase 3) would write snapshots as events arrive, independently of the worker tick, and reduce REST calls.

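
The "run if due" decision in step 1b can be sketched as a pure function over the last fetch time stored per `data_inputs[].id`. The name `isIngestDue` and its parameters are assumptions for illustration; passing `now` in explicitly keeps the check easy to test.

```dart
/// Returns true when a data input should be fetched on this tick.
/// [lastFetchedAt] is null when the input has never been fetched.
bool isIngestDue({
  required DateTime? lastFetchedAt,
  required int pollIntervalSeconds,
  required DateTime now,
}) {
  if (lastFetchedAt == null) return true; // never fetched: fetch now
  return now.difference(lastFetchedAt) >=
      Duration(seconds: pollIntervalSeconds);
}

void main() {
  final now = DateTime.utc(2024, 1, 2, 15, 0, 0);
  final last = DateTime.utc(2024, 1, 2, 14, 59, 30); // 30 seconds earlier
  print(isIngestDue(lastFetchedAt: last, pollIntervalSeconds: 60, now: now)); // false
  print(isIngestDue(lastFetchedAt: last, pollIntervalSeconds: 30, now: now)); // true
}
```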
---
## 8. Question ↔ trade decision flow
### 8.1 Phases per rule
| Phase | `pipeline_step` | Worker behavior |
|-------|-----------------|-----------------|
| `idle` | — | Evaluate rules against DB snapshots only. |
| `await_confirm` | rule id | Question delivered; `correct_answer` encodes expected confirmation (+10 / -10). |
| `submit_order` | rule id | User answered; if match and guardrails pass → POST Alpaca order. |
| `done` | rule id | Log outcome; return to `idle`. |
### 8.2 Branching (reuse `BranchDecision`)
| User swipe | Meaning | Trade action |
|------------|---------|--------------|
| `+10` (yes) | Approve proposed action | `TradeActuator` places configured order |
| `-10` (no) | Reject | No order; log skip in `user_trading_state.context` |
| near 0 | Ambiguous | Optional follow-up question (same pattern as weather `close`) |
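
The branching table above could be sketched as a small classifier. The shape of the existing `BranchDecision` type is not shown in this plan, so the sketch uses a separate, hypothetical `SwipeDecision` enum, and the dead-zone width of 3 is an assumed value for "near 0".

```dart
/// Hypothetical stand-in for the trade-side outcome of a swipe answer.
enum SwipeDecision { approve, reject, ambiguous }

/// Classifies a numeric swipe in the range -10..+10.
/// [deadZone] is an assumed threshold: answers strictly inside
/// (-deadZone, +deadZone) count as ambiguous.
SwipeDecision classifySwipe(num answer, {num deadZone = 3}) {
  if (answer >= deadZone) return SwipeDecision.approve;
  if (answer <= -deadZone) return SwipeDecision.reject;
  return SwipeDecision.ambiguous;
}

void main() {
  for (final answer in [10, -10, 1]) {
    print('$answer -> ${classifySwipe(answer)}');
  }
}
```

Treating the middle band as ambiguous rather than as a rejection matches the optional follow-up question in the table; a stricter deployment could set `deadZone` to 10 so that only a full swipe approves an order.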
### 8.3 Guardrails (always before Alpaca POST)
- When `require_question_before_order` is true, the linked question must be answered before any order is submitted.
- `max_orders_per_day` (calendar-day) and `max_notional_usd_per_4h` (rolling 4-hour window) from config. The 4-hour notional window bounds runaway-rule blast radius while still allowing intraday "double-down" sizing as a thesis strengthens.
- `mode == paper` unless `ALPACA_ALLOW_LIVE=true`.
- Symbol in user watchlist and not in `symbols_blocklist`.
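- Idempotent `client_order_id`: either a `uuid` persisted in `trade_orders` before the POST, or the deterministic form `{uid}-{rule_id}-{question_id}`, so that a retry reuses the same id.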
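
A pre-POST guardrail check along the lines of the list above might look like this sketch. All names (`PastOrder`, `checkGuardrails`) are hypothetical, the calendar day is assumed to roll over at UTC midnight, and `now` is expected to be a UTC timestamp; the plan does not specify a time zone for `max_orders_per_day`.

```dart
/// Hypothetical view of one previously submitted order.
class PastOrder {
  final DateTime submittedAt;
  final double notionalUsd;
  const PastOrder(this.submittedAt, this.notionalUsd);
}

/// Returns null when the order may proceed, otherwise the reason it is blocked.
/// [now] is expected to be in UTC.
String? checkGuardrails({
  required String symbol,
  required double notionalUsd,
  required DateTime now,
  required List<PastOrder> history,
  required List<String> watchlist,
  required List<String> blocklist,
  required int maxOrdersPerDay,
  required double maxNotionalUsdPer4h,
}) {
  if (!watchlist.contains(symbol)) return 'symbol not in watchlist';
  if (blocklist.contains(symbol)) return 'symbol is blocklisted';

  // Calendar-day count, with UTC midnight as the day boundary (assumption).
  final dayStart = DateTime.utc(now.year, now.month, now.day);
  final ordersToday =
      history.where((o) => !o.submittedAt.isBefore(dayStart)).length;
  if (ordersToday >= maxOrdersPerDay) return 'max_orders_per_day reached';

  // Rolling four-hour notional window, including the proposed order.
  final windowStart = now.subtract(const Duration(hours: 4));
  final recentNotional = history
      .where((o) => o.submittedAt.isAfter(windowStart))
      .fold<double>(0.0, (sum, o) => sum + o.notionalUsd);
  if (recentNotional + notionalUsd > maxNotionalUsdPer4h) {
    return 'max_notional_usd_per_4h exceeded';
  }
  return null;
}

void main() {
  final now = DateTime.utc(2024, 1, 2, 15);
  final history = [PastOrder(now.subtract(const Duration(hours: 1)), 95.0)];
  final reason = checkGuardrails(
    symbol: 'SPY',
    notionalUsd: 10,
    now: now,
    history: history,
    watchlist: ['SPY'],
    blocklist: [],
    maxOrdersPerDay: 3,
    maxNotionalUsdPer4h: 100,
  );
  print(reason ?? 'ok'); // 95 + 10 exceeds the 100 USD window
}
```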
---
## 9. Alpaca data gathering (implementation notes)
### 9.1 Phase 1 — REST polling (recommended start)
For each symbol in `data_inputs[].symbols`:
| Metric | Alpaca endpoint (v2) |
|--------|----------------------|
| `last_trade` | `GET /v2/stocks/{symbol}/trades/latest` |
| `daily_bar` | `GET /v2/stocks/bars?symbols=...&timeframe=1Day&limit=1` |
| `prev_close` | Derived from previous daily bar or snapshot |
Batch symbols where possible (`bars` multi-symbol) to minimize calls.
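
The two endpoints in the table could be addressed with URI builders like the ones sketched here. The helper names are hypothetical. Whether `limit` applies per symbol or to the whole response page should be confirmed against Alpaca's documentation before relying on `limit=1` for multi-symbol batches.

```dart
const dataHost = 'data.alpaca.markets';

/// URI for the latest trade of one symbol.
Uri latestTradeUri(String symbol, {String feed = 'iex'}) =>
    Uri.https(dataHost, '/v2/stocks/$symbol/trades/latest', {'feed': feed});

/// URI for daily bars of several symbols in one call.
Uri dailyBarsUri(List<String> symbols, {String feed = 'iex'}) =>
    Uri.https(dataHost, '/v2/stocks/bars', {
      'symbols': symbols.join(','),
      'timeframe': '1Day',
      'limit': '1',
      'feed': feed,
    });

void main() {
  print(latestTradeUri('SPY'));
  print(dailyBarsUri(['AAPL', 'MSFT', 'SPY']));
}
```

Building URIs with `Uri.https` rather than string concatenation lets Dart handle query encoding, which matters for the comma-separated `symbols` value.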
### 9.2 Phase 3 — WebSocket ingest service
- Connect to `wss://stream.data.alpaca.markets/v2/iex`.
- Subscribe to the union of all enabled users' symbols (cap 30 on Basic; enforce in the config validator).
- On trade/quote events → upsert `market_data_snapshots` with `as_of = now()`.
- Worker reads DB only; the WS process runs independently, either in a server isolate or as a secondary entrypoint.
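
The subscription step above could be sketched as two helpers: one that builds the de-duplicated symbol union and enforces the cap, and one that encodes the subscribe message. Helper names are hypothetical; the cap of 30 is the figure this plan uses for the Basic tier.

```dart
import 'dart:convert';

/// Symbol cap this plan assumes for the Basic WebSocket feed.
const basicFeedSymbolCap = 30;

/// Returns the sorted, de-duplicated union of all watchlists,
/// or throws when the union exceeds the cap.
List<String> unionOfSymbols(Iterable<List<String>> watchlists) {
  final symbols = <String>{};
  for (final list in watchlists) {
    symbols.addAll(list.map((s) => s.trim().toUpperCase()));
  }
  if (symbols.length > basicFeedSymbolCap) {
    throw StateError(
        '${symbols.length} symbols exceed the cap of $basicFeedSymbolCap');
  }
  return symbols.toList()..sort();
}

/// Encodes a trade-channel subscribe message for the stream.
String subscribeTradesMessage(List<String> symbols) =>
    jsonEncode({'action': 'subscribe', 'trades': symbols});

void main() {
  final symbols = unionOfSymbols([
    ['spy', 'AAPL'],
    ['SPY', 'MSFT'],
  ]);
  print(subscribeTradesMessage(symbols));
  // {"action":"subscribe","trades":["AAPL","MSFT","SPY"]}
}
```

Enforcing the cap on the union rather than per user means one user's long watchlist can block another's symbols, so the config validator may also want a per-user limit.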
### 9.3 Normalization
All ingested values stored as `NUMERIC` in USD where applicable; `raw` JSONB for debugging. Rule engine never parses Alpaca JSON directly — only snapshot rows.
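
Normalizing one latest-trade payload into a snapshot row could look like the sketch below. The payload shape (`symbol`, and a `trade` object with `t`, `p`, `s`) follows Alpaca's latest-trade response as the author of this sketch understands it and should be checked against the current API reference. `SnapshotRow` and `normalizeLatestTrade` are hypothetical names, and `double` is used here only for brevity; a decimal type may be preferable for values stored as `NUMERIC`.

```dart
import 'dart:convert';

/// Hypothetical in-memory form of one market_data_snapshots row.
class SnapshotRow {
  final String symbol;
  final String metric;
  final double price;
  final double volume;
  final DateTime asOf;
  const SnapshotRow({
    required this.symbol,
    required this.metric,
    required this.price,
    required this.volume,
    required this.asOf,
  });
}

/// Parses a latest-trade response body into a `last_trade` snapshot row.
SnapshotRow normalizeLatestTrade(String body) {
  final json = jsonDecode(body) as Map<String, dynamic>;
  final trade = json['trade'] as Map<String, dynamic>;
  return SnapshotRow(
    symbol: json['symbol'] as String,
    metric: 'last_trade',
    price: (trade['p'] as num).toDouble(),
    volume: (trade['s'] as num).toDouble(),
    asOf: DateTime.parse(trade['t'] as String).toUtc(),
  );
}

void main() {
  // Hand-written fixture, not a captured API response.
  const body =
      '{"symbol":"SPY","trade":{"t":"2024-01-02T15:00:00.123Z","p":492.0,"s":100}}';
  final row = normalizeLatestTrade(body);
  print('${row.symbol} ${row.metric} ${row.price} ${row.asOf.toIso8601String()}');
}
```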
---
## 10. Alpaca trade actuation (implementation notes)
### 10.1 Order types (initial)
| Config | Alpaca order |
|--------|--------------|
| `notional_usd` | `POST /v2/orders` with `notional`, `side`, `type: market`, `time_in_force: day` |
| Future: qty | `qty` instead of `notional` |
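
The request body for the notional market order in the first row could be built as sketched here. The helper names are hypothetical; the field names follow the table above plus `symbol` and `client_order_id`, and sending `notional` as a two-decimal string is an assumption to confirm against Alpaca's order reference.

```dart
import 'dart:convert';

/// Deterministic id in the `{uid}-{rule_id}-{question_id}` form, so a retry
/// for the same question reuses the same id.
String clientOrderIdFor(String uid, String ruleId, String questionId) =>
    '$uid-$ruleId-$questionId';

/// Encodes the JSON body for a notional market order with day time-in-force.
String buildNotionalMarketOrderBody({
  required String symbol,
  required String side,
  required double notionalUsd,
  required String clientOrderId,
}) {
  if (side != 'buy' && side != 'sell') {
    throw ArgumentError.value(side, 'side', 'must be "buy" or "sell"');
  }
  if (notionalUsd <= 0) {
    throw ArgumentError.value(notionalUsd, 'notionalUsd', 'must be positive');
  }
  return jsonEncode({
    'symbol': symbol,
    'notional': notionalUsd.toStringAsFixed(2),
    'side': side,
    'type': 'market',
    'time_in_force': 'day',
    'client_order_id': clientOrderId,
  });
}

void main() {
  final body = buildNotionalMarketOrderBody(
    symbol: 'SPY',
    side: 'buy',
    notionalUsd: 10,
    clientOrderId: clientOrderIdFor('uid123', 'dip_confirm', 'q-42'),
  );
  print(body);
}
```

Validating `side` and `notionalUsd` before encoding keeps malformed config values from reaching Alpaca, where they would surface only as an HTTP error.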
### 10.2 Paper vs live
| Env var | Default |
|---------|---------|
| `ALPACA_TRADING_BASE_URL` | `https://paper-api.alpaca.markets` |
| `ALPACA_ALLOW_LIVE` | `false` |
The server refuses a live URL unless `ALPACA_ALLOW_LIVE=true` and `mode` is `live` in the user's config (the `mode` field from §4.2).
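
The refusal rule could be enforced at client construction time, as in this sketch. `resolveTradingBaseUrl` is a hypothetical helper; `userMode` stands for the mode value from the user's effective config.

```dart
const paperTradingHost = 'paper-api.alpaca.markets';

/// Returns the trading base URL, or throws when a non-paper URL is
/// configured without both live switches being on.
Uri resolveTradingBaseUrl({
  required String configuredUrl,
  required bool allowLive,
  required String userMode,
}) {
  final uri = Uri.parse(configuredUrl);
  if (uri.host == paperTradingHost) return uri;
  if (allowLive && userMode == 'live') return uri;
  throw StateError(
      'Refusing non-paper trading URL ${uri.host}: live trading is not enabled');
}

void main() {
  print(resolveTradingBaseUrl(
    configuredUrl: 'https://paper-api.alpaca.markets',
    allowLive: false,
    userMode: 'paper',
  ));
  try {
    resolveTradingBaseUrl(
      configuredUrl: 'https://api.alpaca.markets',
      allowLive: false,
      userMode: 'live',
    );
  } on StateError catch (e) {
    print(e.message);
  }
}
```

Treating every host other than the paper host as live is deliberately conservative: a typo in `ALPACA_TRADING_BASE_URL` fails closed instead of sending orders somewhere unexpected.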
### 10.3 Post-submit
- Poll order status or use Alpaca trade updates (Phase 3).
- Update `trade_orders.status` and notify user via optional future SignalR event `ReceiveTradeUpdate` (not required for MVP).
---
## 11. Environment variables
Add to `server/.env.example` (document only in plan until implemented):
```bash
# Trading feature gate
TRADING_ENABLED=false
TRADING_WORKER_INGEST_ENABLED=true
TRADING_WORKER_EVAL_ENABLED=true
# Alpaca (server only — never in Flutter)
ALPACA_API_KEY_ID=
ALPACA_API_SECRET_KEY=
ALPACA_TRADING_BASE_URL=https://paper-api.alpaca.markets
ALPACA_DATA_BASE_URL=https://data.alpaca.markets
ALPACA_DATA_FEED=iex
ALPACA_ALLOW_LIVE=false
```
Existing worker flags remain:
- `QUESTION_WORKER_ENABLED` — master switch for interval timer.
- `QUESTION_WORKER_INTERVAL_SECONDS` — drives ingest due-times and evaluation frequency.
- `QUESTION_PIPELINE_TEST_MODE` — when true, skip real Alpaca calls; use fixture snapshots and fake order ids.
---
## 12. Flutter app responsibilities
| Responsibility | Owner |
|----------------|-------|
| Display questions, collect swipe/numeric answer | Flutter (`SwipeQuestionTile`) |
| Submit answer `POST /v1/me/questions/{id}/answer` | Flutter |
| Real-time question delivery | SignalR `ReceiveQuestion` |
| Alpaca credentials | **Never on device** |
| Market data storage | Postgres via server |
| Trade execution | Server only |
Optional later: read-only `GET /v1/me/trading/status` for positions and recent `trade_orders` (no secrets).
---
## 13. HTTP API additions (Phase 2+)
| Method | Path | Purpose |
|--------|------|---------|
| `GET` | `/v1/me/trading/config` | Return merged effective config (sanitized). |
| `PUT` | `/v1/me/trading/config` | Update user JSONB override. |
| `GET` | `/v1/me/trading/orders` | List `trade_orders` for user. |
| `GET` | `/v1/me/trading/snapshots?symbol=SPY` | Debug/latest metrics (auth required). |
Phase 1 can omit public endpoints and use SQL seeds for test users.
---
## 14. Implementation phases
### Phase 1 — Foundation (MVP)
- [ ] Migration `004_trading.sql`
- [ ] `AlpacaMarketDataClient` + `AlpacaTradingClient` (paper, REST only)
- [ ] `MarketDataIngest` + `market_data_snapshots` writes
- [ ] `TradingConfig` resolver + seed template `default_paper_watchlist`
- [ ] `RuleEngine` with one rule type: `price_below_pct_of_ref`
- [ ] `TradingPipeline.evaluate` → create question via `QuestionService`
- [ ] Extend `QuestionPipeline.onAnswerSubmitted` for `PipelineKeys.trading`
- [ ] `TradeActuator` on confirm → `trade_orders` + Alpaca POST
- [ ] Wire into `QuestionBackgroundWorker` tick behind `TRADING_ENABLED`
- [ ] `QUESTION_PIPELINE_TEST_MODE` fixtures for CI/local without Alpaca keys
### Phase 2 — Configuration API & hardening
- [ ] `PUT/GET /v1/me/trading/config`
- [ ] Config validation (symbol cap 30, required fields)
- [ ] `market_data_latest` view or cached latest query
- [ ] Daily guardrail counters in `user_trading_state.context`
- [ ] Order status polling + failed order user question
### Phase 3 — Streaming & observability
- [ ] Alpaca IEX WebSocket ingest sidecar
- [ ] `ReceiveTradeUpdate` SignalR event (optional)
- [ ] Metrics logging (ingest lag, rule fires, order latency)
---
## 15. Testing strategy
| Layer | Approach |
|-------|----------|
| Unit | `RuleEngine` with fixture snapshot rows; no network. |
| Integration | Alpaca paper account; verify order appears in Alpaca dashboard. |
| Worker | Run server with `QUESTION_PIPELINE_TEST_MODE=true` and `TRADING_ENABLED=true` using seeded snapshots. |
| Flutter | Existing question flow; manual swipe on trading copy. |
**Safety checklist before live:**
- [ ] `ALPACA_ALLOW_LIVE=false` in production until explicit review
- [ ] Guardrail notional limits enforced in code, not only config
- [ ] Every order traceable to `question_id` + `rule_id`
---
## 16. Risks & mitigations
| Risk | Mitigation |
|------|------------|
| Stale DB snapshots | Store `as_of`; rules ignore data older than `max_staleness_seconds` in config. |
| Alpaca rate limits | Batch REST; cap symbols; WS in Phase 3. |
| Duplicate orders | `client_order_id` UNIQUE; check `trade_orders` before POST. |
| User confusion on auto-trades | Clear `question_text`; default `require_question_before_order: true`. |
| Regulatory / suitability | Paper only initially; disclaimers in app; no investment advice in question copy. |
---
## 17. Example end-to-end scenario
1. **Seed** `user_trading_config` for user U: enabled, `SPY` in watchlist, `dip_confirm` rule at -1.5% vs `prev_close`.
2. **Worker tick (T0):** ingest writes `prev_close=500`, `last_trade=492` → rule fires.
3. **Worker:** queue question: “SPY is down 1.6% … +10 approve buy $10, -10 skip” with `pipeline_key=trading`, `pipeline_step=dip_confirm:await_confirm`.
4. **Flutter:** user swipes +10 → `POST .../answer`.
5. **Pipeline:** `onAnswerSubmitted` → `TradingPipeline` sees match → `TradeActuator` POST paper market order $10 SPY → row in `trade_orders`.
6. **Next tick:** rule not re-fired (cooldown in `user_trading_state.context`).
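
The rule check in step 2 can be worked through with the scenario's numbers: (492 - 500) / 500 × 100 = -1.6%, which is at or below the -1.5% threshold, so the rule fires. A minimal sketch of that evaluation follows; `RuleResult` and `evaluatePriceBelowPctOfRef` are hypothetical names.

```dart
/// Outcome of evaluating one `price_below_pct_of_ref` rule.
class RuleResult {
  final bool fired;
  final double pct; // signed percentage change versus the reference price
  const RuleResult({required this.fired, required this.pct});
}

/// Fires when the last trade is at or below [thresholdPct] percent
/// relative to [refPrice] (for example, prev_close).
RuleResult evaluatePriceBelowPctOfRef({
  required double lastTrade,
  required double refPrice,
  required double thresholdPct,
}) {
  if (refPrice <= 0) {
    throw ArgumentError.value(refPrice, 'refPrice', 'must be positive');
  }
  final pct = (lastTrade - refPrice) * 100 / refPrice;
  return RuleResult(fired: pct <= thresholdPct, pct: pct);
}

void main() {
  final result = evaluatePriceBelowPctOfRef(
    lastTrade: 492,
    refPrice: 500,
    thresholdPct: -1.5,
  );
  print('fired=${result.fired} pct=${result.pct.toStringAsFixed(1)}');
  // fired=true pct=-1.6
}
```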
---
## 18. References
- [Alpaca Market Data API](https://docs.alpaca.markets/docs/market-data-api)
- [Alpaca Trading API](https://docs.alpaca.markets/docs/trading-api)
- [Alpaca WebSocket streaming](https://docs.alpaca.markets/docs/streaming-market-data)
- Existing pipeline: `server/lib/pipeline/question_pipeline.dart`
- Existing worker: `server/lib/workers/question_background_worker.dart`
---
*Document version: 1.0 — Alpaca unified data + execution, Postgres-backed configurable inputs, interval worker integration.*