
Trading Development Plan — Alpaca Data & Execution

This document defines how Cyber Hybrid Hub will add Alpaca market data gathering, Alpaca trade actuation, and a dynamically configurable data-input layer that feeds the existing interval worker (QuestionBackgroundWorker), Postgres persistence, user questions (SignalR), and automated trade decisions.

It extends, rather than replaces, the current architecture documented in server/README.md and implemented in server/lib/pipeline/question_pipeline.dart.


1. Goals & constraints

| Goal | Detail |
|---|---|
| Data | Ingest US equity (and optional crypto) prices from Alpaca Market Data; store normalized snapshots in Postgres for the worker to read. |
| Questions | Worker evaluates stored data + pipeline config → enqueues swipe/numeric questions via QuestionService → Flutter receives ReceiveQuestion over SignalR. |
| Trades | After rules + optional user confirmation via answers, place orders through Alpaca Trading API (paper first). |
| Configuration | Watchlists, thresholds, and pipeline steps are DB-driven (with env defaults), not hard-coded in Dart. |
| Security | Alpaca secret keys live only on the Dart API server; Flutter never holds trading credentials. |
| Cost | Target Alpaca Basic market data (IEX, ~30 WS symbols) + paper trading until strategies are validated. |

Non-goals (initial phases): sub-second HFT, full SIP market data, client-side WebSockets to Alpaca, or multi-broker routing.


2. Fit with the current system

```mermaid
flowchart TB
  subgraph flutter [Flutter app]
    UI[HomeScreen / SwipeQuestionTile]
    Hub[QuestionsHubService SignalR]
  end

  subgraph server [Dart API server]
    API[Shelf handlers]
    Worker[QuestionBackgroundWorker interval]
    MDP[MarketDataPipeline NEW]
    TP[TradingPipeline NEW]
    QP[QuestionPipeline existing]
    QS[QuestionService]
    AlpacaMD[Alpaca Market Data client]
    AlpacaTR[Alpaca Trading client]
  end

  subgraph pg [Postgres]
    MD[market_data_snapshots]
    CFG[trading_config / user_trading_state]
    Q[questions]
    UPS[user_pipeline_state]
    ORD[trade_orders]
  end

  AlpacaMD -->|REST poll or WS| MDP
  MDP --> MD
  Worker --> MDP
  Worker --> TP
  Worker --> QP
  TP --> MD
  TP --> CFG
  TP --> ORD
  TP --> AlpacaTR
  TP --> QS
  QP --> QS
  QS --> Q
  QS --> Hub
  API --> QS
  Hub --> UI
  API --> UI
```

Existing touchpoints to extend:

| Component | Path | Role |
|---|---|---|
| Interval worker | server/lib/workers/question_background_worker.dart | Orchestrates the tick; will call market-data ingest + trading evaluation before/alongside QuestionPipeline.runMaintenanceCycle. |
| Question pipeline | server/lib/pipeline/question_pipeline.dart | Add PipelineKeys.trading branch; reuse BranchDecision for threshold-style answers. |
| External fetcher pattern | server/lib/pipeline/external_data_fetcher.dart | Precedent for HTTP clients; Alpaca clients live in server/lib/alpaca/. |
| Env loading | server/lib/env.dart | Add Alpaca + trading feature flags. |
| Flutter | lib/services/questions_hub_service.dart, lib/widgets/swipe_question_tile.dart | Unchanged contract: questions in, numeric answers out. |

3. Alpaca accounts & APIs

Use one Alpaca account with two API surfaces:

| Surface | Base URL | Purpose |
|---|---|---|
| Market Data | https://data.alpaca.markets (same host for paper and live accounts) | Snapshots, bars, trades, quotes; WS wss://stream.data.alpaca.markets/v2/iex (Basic). |
| Trading | https://paper-api.alpaca.markets (paper) | Orders, positions, account; switch to the live URL (https://api.alpaca.markets) only after an explicit env flip. |

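Authentication: send the APCA-API-KEY-ID and APCA-API-SECRET-KEY headers on every REST request; the WebSocket stream instead expects an auth message carrying the same key pair right after connecting.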
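A minimal sketch of the request shape follows. It is written in Python for brevity; the real client would live in server/lib/alpaca/alpaca_market_data_client.dart. The helper names alpaca_headers and snapshots_url are hypothetical. The code only builds the URL and headers and sends nothing. It targets the multi-symbol snapshots endpoint, which returns several metrics per symbol in one call.

```python
from urllib.parse import urlencode

DATA_BASE_URL = "https://data.alpaca.markets"


def alpaca_headers(key_id: str, secret_key: str) -> dict[str, str]:
    """Headers Alpaca expects on every REST call (Market Data and Trading)."""
    return {"APCA-API-KEY-ID": key_id, "APCA-API-SECRET-KEY": secret_key}


def snapshots_url(symbols: list[str], feed: str = "iex") -> str:
    """Batched snapshots URL; commas stay unescaped in the symbols list."""
    query = urlencode({"symbols": ",".join(symbols), "feed": feed}, safe=",")
    return f"{DATA_BASE_URL}/v2/stocks/snapshots?{query}"
```

To send the request, pass both values to urllib.request.Request(snapshots_url(["SPY"]), headers=alpaca_headers(key_id, secret_key)), with the keys read from ALPACA_API_KEY_ID and ALPACA_API_SECRET_KEY.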

Free-tier limits to design around:

  • Market Data Basic: IEX real-time, ~30 WebSocket subscriptions, historical REST capped (use DB as source of truth for worker ticks).
  • Trading: paper account; rate limits documented by Alpaca. Every order carries a unique client_order_id (Alpaca rejects duplicates), and this plan relies on that for idempotent retries.

4. Dynamically configurable data input design

Configuration is split into global templates (reusable) and per-user overrides (Firebase UID). The worker always resolves effective config before reading market data or emitting questions.

4.1 Configuration layers

trading_config_templates (optional defaults by name)
        ↓ merge
user_trading_config (per user: watchlist, rules, enabled flag)
        ↓ resolved at tick
EffectiveTradingConfig (in-memory for one worker cycle)
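The merge step in the layering above could look like the following Python sketch. merge_config is a hypothetical helper name. The plan does not say how lists merge. This sketch assumes objects merge key by key while arrays such as data_inputs and rules are replaced wholesale. That way a user override can shrink a watchlist, not only append to it.

```python
import copy
from typing import Any


def merge_config(template: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]:
    """Deep-merge a user override onto a template without mutating either input.

    Nested objects (e.g. guardrails) merge key by key; lists (data_inputs,
    rules, symbols) and scalars from the override replace the template value.
    """
    merged = copy.deepcopy(template)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_config(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged
```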

4.2 Config document shape (JSONB)

Store in Postgres as JSONB; validate on write via API or seed migrations.

```json
{
  "version": 1,
  "enabled": true,
  "mode": "paper",
  "data_inputs": [
    {
      "id": "primary_watchlist",
      "source": "alpaca",
      "asset_class": "us_equity",
      "symbols": ["AAPL", "MSFT", "SPY"],
      "feed": "iex",
      "poll_interval_seconds": 60,
      "metrics": ["last_trade", "daily_bar", "prev_close"]
    }
  ],
  "rules": [
    {
      "id": "dip_confirm",
      "type": "price_below_pct_of_ref",
      "symbol": "SPY",
      "ref_metric": "prev_close",
      "threshold_pct": -1.5,
      "question_template": "SPY is down {{pct}}% vs yesterday. Swipe +10 to approve a small buy, -10 to skip.",
      "on_answer_match": { "action": "propose_order", "side": "buy", "notional_usd": 10 }
    }
  ],
  "guardrails": {
    "max_orders_per_day": 3,
    "max_notional_usd_per_4h": 100,
    "require_question_before_order": true,
    "symbols_blocklist": []
  }
}
```
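Validation on write might start as small as the following Python sketch. validate_config and its specific checks are assumptions drawn from the Phase 2 list (symbol cap 30, required fields). The 30-symbol WebSocket cap really applies to the union across all users, so a per-document check is only a first line of defence.

```python
from typing import Any

MAX_WS_SYMBOLS = 30  # Alpaca Basic WebSocket subscription cap cited in this plan


def validate_config(cfg: dict[str, Any]) -> list[str]:
    """Return human-readable problems; an empty list means the document is acceptable."""
    errors: list[str] = []
    if cfg.get("version") != 1:
        errors.append("version must be 1")
    if cfg.get("mode") not in ("paper", "live"):
        errors.append("mode must be 'paper' or 'live'")

    symbols: set[str] = set()
    for i, inp in enumerate(cfg.get("data_inputs", [])):
        for field in ("id", "source", "symbols", "poll_interval_seconds"):
            if field not in inp:
                errors.append(f"data_inputs[{i}] missing {field}")
        symbols.update(inp.get("symbols", []))
    if len(symbols) > MAX_WS_SYMBOLS:
        errors.append(f"{len(symbols)} symbols exceeds cap of {MAX_WS_SYMBOLS}")

    # Every rule must target a symbol that is actually ingested and not blocklisted.
    blocked = set(cfg.get("guardrails", {}).get("symbols_blocklist", []))
    for rule in cfg.get("rules", []):
        sym = rule.get("symbol")
        if sym not in symbols:
            errors.append(f"rule {rule.get('id')} references {sym}, not in any data input")
        elif sym in blocked:
            errors.append(f"rule {rule.get('id')} targets blocklisted {sym}")
    return errors
```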

Design principles:

  • data_inputs[]: what to fetch and how often. The worker stores the last fetch time per input id in user_trading_state.context and re-fetches once poll_interval_seconds has elapsed. Because the check runs once per tick, polling is never more frequent than QUESTION_WORKER_INTERVAL_SECONDS.
  • rules[] — declarative triggers evaluated against Postgres snapshots, not live Alpaca on every rule check (decouples question logic from API rate limits).
  • question_template — supports {{symbol}}, {{price}}, {{pct}} substitution when creating question_text.
  • User answers map to actions via the existing pipeline_key / pipeline_step / source_tag conventions (e.g. pipeline_key=trading, pipeline_step=dip_confirm:await_confirm).
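A sketch of question_template substitution, assuming placeholders take the form {{name}} with optional spaces inside the braces. render_question is a hypothetical name. Raising on an unknown placeholder, instead of leaving a literal {{pct}} in question_text, is a design choice the plan does not specify.

```python
import re

_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render_question(template: str, values: dict[str, object]) -> str:
    """Replace {{name}} placeholders; unknown names raise instead of leaking into question_text."""

    def substitute(match):
        name = match.group(1)
        if name not in values:
            raise KeyError(f"template placeholder {{{{{name}}}}} has no value")
        return str(values[name])

    return _PLACEHOLDER.sub(substitute, template)
```

Callers should pass abs(pct) formatted to one decimal place. The example template already says "down", so a -1.6% move should render as "down 1.6%", not "down -1.6%".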

4.3 Config administration

| Method | Phase |
|---|---|
| SQL seed / migration | Phase 1: dev templates |
| PUT /v1/me/trading/config | Phase 2: authenticated user override |
| Admin script | Optional bulk updates |

Flutter does not edit raw JSON initially; a settings screen can come later. Phase 1 uses server-side defaults keyed by firebase_uid.


5. Postgres schema (new migration 004_trading.sql)

5.1 market_data_snapshots

Append-only (or upsert latest) normalized facts per symbol.

```sql
CREATE TABLE market_data_snapshots (
  id              BIGSERIAL PRIMARY KEY,
  symbol          TEXT NOT NULL,
  asset_class     TEXT NOT NULL DEFAULT 'us_equity',
  feed            TEXT NOT NULL DEFAULT 'iex',
  metric          TEXT NOT NULL,          -- last_trade | daily_bar | prev_close
  price           NUMERIC,
  volume          NUMERIC,
  as_of           TIMESTAMPTZ NOT NULL,
  raw             JSONB,                  -- optional Alpaca payload fragment
  created_at      TIMESTAMPTZ NOT NULL DEFAULT now()
);

-- Covers the worker's "latest row per (symbol, metric)" lookup.
CREATE INDEX market_data_snapshots_symbol_metric_as_of_idx
  ON market_data_snapshots (symbol, metric, as_of DESC);
```

Worker queries: “latest row per (symbol, metric)” via DISTINCT ON or a materialized view market_data_latest (Phase 2 optimization).
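For the "latest row per (symbol, metric)" query, the Postgres DISTINCT ON form is shown in the comment below. The runnable part uses an equivalent ROW_NUMBER() window function so it can be exercised against SQLite from Python. Table and column names follow §5.1; the fixture rows are made up.

```python
import sqlite3

# Postgres form (DISTINCT ON is Postgres-specific):
#   SELECT DISTINCT ON (symbol, metric) symbol, metric, price, as_of
#   FROM market_data_snapshots
#   WHERE symbol = ANY($1)
#   ORDER BY symbol, metric, as_of DESC;
#
# Portable equivalent using a window function (works in Postgres and SQLite >= 3.25).
LATEST_SQL = """
SELECT symbol, metric, price, as_of FROM (
  SELECT symbol, metric, price, as_of,
         ROW_NUMBER() OVER (PARTITION BY symbol, metric ORDER BY as_of DESC) AS rn
  FROM market_data_snapshots
) AS ranked
WHERE rn = 1
ORDER BY symbol, metric
"""


def latest_snapshots(conn: sqlite3.Connection) -> list[tuple]:
    return conn.execute(LATEST_SQL).fetchall()


def demo() -> list[tuple]:
    conn = sqlite3.connect(":memory:")
    # ISO-8601 UTC strings sort chronologically as text, standing in for TIMESTAMPTZ.
    conn.execute("CREATE TABLE market_data_snapshots (symbol TEXT, metric TEXT, price REAL, as_of TEXT)")
    conn.executemany(
        "INSERT INTO market_data_snapshots VALUES (?, ?, ?, ?)",
        [
            ("SPY", "last_trade", 495.0, "2024-01-02T14:30:00Z"),
            ("SPY", "last_trade", 492.0, "2024-01-02T14:31:00Z"),
            ("SPY", "prev_close", 500.0, "2024-01-01T21:00:00Z"),
        ],
    )
    return latest_snapshots(conn)
```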

5.2 trading_config_templates

```sql
CREATE TABLE trading_config_templates (
  name        TEXT PRIMARY KEY,
  config      JSONB NOT NULL,
  updated_at  TIMESTAMPTZ NOT NULL DEFAULT now()
);
```

5.3 user_trading_config

```sql
CREATE TABLE user_trading_config (
  firebase_uid  TEXT PRIMARY KEY REFERENCES users (firebase_uid) ON DELETE CASCADE,
  template_name TEXT REFERENCES trading_config_templates (name),
  config        JSONB NOT NULL DEFAULT '{}',
  enabled       BOOLEAN NOT NULL DEFAULT false,
  updated_at    TIMESTAMPTZ NOT NULL DEFAULT now()
);
```

5.4 user_trading_state

Mirrors the user_pipeline_state pattern, holding trading-specific cursors.

```sql
CREATE TABLE user_trading_state (
  firebase_uid       TEXT PRIMARY KEY REFERENCES users (firebase_uid) ON DELETE CASCADE,
  last_ingest_at     TIMESTAMPTZ,
  last_eval_at       TIMESTAMPTZ,
  context            JSONB NOT NULL DEFAULT '{}',
  updated_at         TIMESTAMPTZ NOT NULL DEFAULT now()
);
```

context holds: last fired rule ids, pending order proposal ids, daily order counts, last fetch times per data_inputs[].id.

5.5 trade_orders

Audit trail and idempotency.

```sql
CREATE TABLE trade_orders (
  id                 UUID PRIMARY KEY,
  firebase_uid       TEXT NOT NULL REFERENCES users (firebase_uid) ON DELETE CASCADE,
  client_order_id    TEXT NOT NULL UNIQUE,
  alpaca_order_id    TEXT,
  symbol             TEXT NOT NULL,
  side               TEXT NOT NULL,
  order_type         TEXT NOT NULL,
  notional_usd       NUMERIC,
  qty                NUMERIC,
  status             TEXT NOT NULL,
  question_id        UUID REFERENCES questions (id),
  rule_id            TEXT,
  submitted_at       TIMESTAMPTZ,
  filled_at          TIMESTAMPTZ,
  raw                JSONB,
  created_at         TIMESTAMPTZ NOT NULL DEFAULT now()
);
```
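The sketch below reserves the trade_orders row before calling Alpaca. That way the UNIQUE constraint on client_order_id, not application logic, is what stops a duplicate POST. It uses SQLite from Python so the demo is self-contained; ON CONFLICT ... DO NOTHING means the same thing in Postgres. The 'pending_submit' status value and the make_db / reserve_order helpers are hypothetical. The 128-character check reflects Alpaca's documented maximum length for client_order_id.

```python
import sqlite3
import uuid


def client_order_id(uid: str, rule_id: str, question_id: str) -> str:
    """Deterministic id: retrying the same confirmed question always yields the same id."""
    cid = f"{uid}-{rule_id}-{question_id}"
    if len(cid) > 128:  # Alpaca's client_order_id length limit
        raise ValueError("client_order_id too long")
    return cid


def make_db() -> sqlite3.Connection:
    # Trimmed copy of trade_orders; foreign keys omitted for the demo.
    conn = sqlite3.connect(":memory:")
    conn.execute("""CREATE TABLE trade_orders (
        id TEXT PRIMARY KEY, firebase_uid TEXT NOT NULL,
        client_order_id TEXT NOT NULL UNIQUE, symbol TEXT NOT NULL,
        side TEXT NOT NULL, order_type TEXT NOT NULL, notional_usd NUMERIC,
        status TEXT NOT NULL, question_id TEXT, rule_id TEXT)""")
    return conn


def reserve_order(conn: sqlite3.Connection, uid: str, rule_id: str,
                  question_id: str, symbol: str, notional_usd: float) -> bool:
    """Insert a pending row; False means this question already produced an order."""
    before = conn.total_changes
    conn.execute(
        """INSERT INTO trade_orders
             (id, firebase_uid, client_order_id, symbol, side, order_type,
              notional_usd, status, question_id, rule_id)
           VALUES (?, ?, ?, ?, 'buy', 'market', ?, 'pending_submit', ?, ?)
           ON CONFLICT (client_order_id) DO NOTHING""",
        (str(uuid.uuid4()), uid, client_order_id(uid, rule_id, question_id),
         symbol, notional_usd, question_id, rule_id),
    )
    return conn.total_changes > before  # unchanged count means the row already existed
```

If reserve_order returns False, the worker skips the POST entirely. After a successful POST, the row is updated with alpaca_order_id and the returned status.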

5.6 Questions table linkage

Reuse existing columns:

  • source_tag: e.g. trading:rule:dip_confirm
  • pipeline_key: trading
  • pipeline_step: rule id + phase, e.g. dip_confirm:await_confirm (phases as in §8.1: idle, await_confirm, submit_order, done)

Optional Phase 2: questions.metadata JSONB for symbol/price at question time.


6. Server modules (proposed layout)

```text
server/lib/
  alpaca/
    alpaca_env.dart              # keys, paper vs live, data host
    alpaca_market_data_client.dart
    alpaca_trading_client.dart
    alpaca_models.dart
  trading/
    trading_config.dart          # parse/merge JSONB → EffectiveTradingConfig
    trading_config_db.dart
    market_data_ingest.dart      # data_inputs → snapshots
    market_data_db.dart
    rule_engine.dart             # rules × snapshots → RuleEvaluation
    trading_pipeline.dart        # questions + order proposals
    trade_actuator.dart          # guardrails + Alpaca order submission
    trade_orders_db.dart
  pipeline/
    question_pipeline.dart       # wire trading branch in onAnswerSubmitted
  workers/
    question_background_worker.dart  # invoke trading tick
```

7. Worker tick sequence (single interval)

On each QuestionBackgroundWorker._tick() when TRADING_ENABLED=true:

```text
1. For each user with user_trading_config.enabled:
   a. Resolve EffectiveTradingConfig (template + user JSON merge).
   b. MarketDataIngest.runIfDue(user, config.data_inputs)
      → Alpaca REST (or WS service) → INSERT market_data_snapshots
   c. TradingPipeline.evaluate(user, config, snapshots from DB)
      → if rule fires and guardrails OK:
         - maybe create question (respect maxQueuedQuestions)
         - or stage pending order in user_trading_state.context
   d. QuestionPipeline.runMaintenanceCycle() for same user (existing geography/weather/trading branches)

2. For users with pending order + confirmed answer (see §8):
   TradeActuator.submitOrder() → Alpaca Trading → UPDATE trade_orders
```

Concurrency: keep the existing _running guard; ingest and evaluate users sequentially at first and optimize later.

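Alignment with QUESTION_WORKER_INTERVAL_SECONDS: the default of 60 s suits Alpaca Basic REST polling, because batched multi-symbol requests keep the number of calls per tick small. The ~30-symbol cap applies to WebSocket subscriptions, not to REST. A dedicated long-lived WS ingest task (optional Phase 3) writes snapshots as trade events arrive, independent of worker ticks, and removes most REST polling.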
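The per-input due check behind MarketDataIngest.runIfDue might look like this Python sketch. It assumes user_trading_state.context stores ISO-8601 timestamps keyed by data_inputs[].id; due_inputs is a hypothetical name.

```python
from datetime import datetime, timedelta, timezone


def due_inputs(data_inputs: list[dict], last_fetch: dict[str, str], now: datetime) -> list[str]:
    """Ids of data_inputs whose poll interval has elapsed since their last fetch.

    last_fetch mirrors the per-input timestamps in user_trading_state.context.
    """
    due = []
    for inp in data_inputs:
        stamp = last_fetch.get(inp["id"])
        if stamp is None:
            due.append(inp["id"])  # never fetched: always due
            continue
        elapsed = now - datetime.fromisoformat(stamp)
        if elapsed >= timedelta(seconds=inp["poll_interval_seconds"]):
            due.append(inp["id"])
    return due
```

One caveat: with a 60 s tick and poll_interval_seconds of 60, a tick that fires a fraction of a second early skips the fetch until the following tick, which doubles the effective interval. Subtracting a small tolerance (a few seconds) from the interval before comparing avoids that.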


8. Question ↔ trade decision flow

8.1 Phases per rule

| Phase | pipeline_step | Worker behavior |
|---|---|---|
| idle | n/a | Evaluate rules against DB snapshots only. |
| await_confirm | {rule_id}:await_confirm | Question delivered; correct_answer encodes the expected confirmation (+10 / -10). |
| submit_order | {rule_id}:submit_order | User answered; if the answer matches and guardrails pass → POST Alpaca order. |
| done | {rule_id}:done | Log outcome; return to idle. |

8.2 Branching (reuse BranchDecision)

| User swipe | Meaning | Trade action |
|---|---|---|
| +10 (yes) | Approve proposed action | TradeActuator places configured order |
| -10 (no) | Reject | No order; log skip in user_trading_state.context |
| Near 0 | Ambiguous | Optional follow-up question (same pattern as weather close) |
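The swipe table maps onto a small decision function. BranchDecision's real API is not shown in this plan, so the sketch uses a stand-in Decision enum. The ±5 ambiguity band is a hypothetical value. next_step encodes the {rule_id}:{phase} pipeline_step convention used in the end-to-end scenario.

```python
from enum import Enum


class Decision(Enum):
    APPROVE = "approve"
    REJECT = "reject"
    AMBIGUOUS = "ambiguous"


AMBIGUITY_BAND = 5.0  # hypothetical: |answer| below this counts as "near 0"


def classify_swipe(answer: float, expected: float = 10.0) -> Decision:
    """Map a numeric swipe (-10..+10) to a trade decision."""
    if abs(answer) < AMBIGUITY_BAND:
        return Decision.AMBIGUOUS
    # Same sign as the expected confirmation value counts as approval.
    return Decision.APPROVE if (answer > 0) == (expected > 0) else Decision.REJECT


def next_step(rule_id: str, decision: Decision) -> str:
    """pipeline_step to store after the answer is processed."""
    if decision is Decision.APPROVE:
        return f"{rule_id}:submit_order"
    if decision is Decision.AMBIGUOUS:
        return f"{rule_id}:await_confirm"  # ask a follow-up question
    return f"{rule_id}:done"  # rejected: log the skip, then return to idle
```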

8.3 Guardrails (always before Alpaca POST)

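  • If require_question_before_order is true, the linked question must already be answered with a confirming swipe.
  • max_orders_per_day (calendar-day) and max_notional_usd_per_4h (rolling 4-hour window) from config. The 4-hour notional window bounds runaway-rule blast radius while still allowing intraday "double-down" sizing as a thesis strengthens.
  • mode must be paper unless ALPACA_ALLOW_LIVE=true (and the user config sets mode to live).
  • Symbol in user watchlist and not in symbols_blocklist.
  • Idempotent client_order_id: either {uid}-{rule_id}-{question_id}, which is retry-safe by construction, or a uuid generated once and stored in trade_orders before the POST. A fresh uuid on every attempt would defeat idempotency.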
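A Python sketch of the guardrail check, run immediately before any Alpaca POST. guardrail_violations and PlacedOrder are hypothetical names. The history list would come from trade_orders or from the counters in user_trading_state.context. The plan does not say which timezone defines a calendar day. This sketch uses whatever timezone now carries (UTC in the example); a US-equity deployment might prefer America/New_York.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class PlacedOrder:
    placed_at: datetime
    notional_usd: float


def guardrail_violations(*, symbol: str, notional_usd: float, now: datetime,
                         history: list[PlacedOrder], guardrails: dict,
                         watchlist: set[str], mode: str, allow_live: bool,
                         question_confirmed: bool) -> list[str]:
    """Return every failed check; submit the order only when the list is empty."""
    problems = []
    if guardrails.get("require_question_before_order", True) and not question_confirmed:
        problems.append("question not confirmed")
    if mode != "paper" and not allow_live:
        problems.append("live mode without ALPACA_ALLOW_LIVE=true")
    if symbol not in watchlist or symbol in guardrails.get("symbols_blocklist", []):
        problems.append(f"{symbol} not allowed")
    # Calendar day, in the timezone carried by `now`.
    today = [o for o in history if o.placed_at.date() == now.date()]
    if len(today) >= guardrails["max_orders_per_day"]:
        problems.append("max_orders_per_day reached")
    # Rolling window: everything placed in the last 4 hours, plus this order.
    window = [o for o in history if now - o.placed_at < timedelta(hours=4)]
    if sum(o.notional_usd for o in window) + notional_usd > guardrails["max_notional_usd_per_4h"]:
        problems.append("max_notional_usd_per_4h exceeded")
    return problems
```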

9. Alpaca data gathering (implementation notes)

9.1 Phase 1: REST polling

For the symbols in data_inputs[].symbols:

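| Metric | Alpaca endpoint (v2) |
|---|---|
| last_trade | GET /v2/stocks/{symbol}/trades/latest, or GET /v2/stocks/trades/latest?symbols=... to batch |
| daily_bar | GET /v2/stocks/bars?symbols=...&timeframe=1Day (on multi-symbol requests, limit caps the total number of bars rather than bars per symbol, so limit=1 would return a single bar) |
| prev_close | Derived from the previous daily bar, or read from prevDailyBar in GET /v2/stocks/snapshots?symbols=... |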

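Batch symbols where possible to minimize calls. The multi-symbol snapshots endpoint (GET /v2/stocks/snapshots?symbols=...) returns latestTrade, dailyBar, and prevDailyBar for every requested symbol in one call, which covers all three metrics.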

9.2 Phase 3: WebSocket ingest service

  • Connect to wss://stream.data.alpaca.markets/v2/iex.
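  • Subscribe to the union of all enabled users' symbols (cap 30 on Basic; enforce in the config validator).
  • On trade/quote events → upsert market_data_snapshots with as_of set to the event's own timestamp (the message's t field), so staleness checks reflect market time rather than receipt time.
  • Worker reads DB only; the WS process runs independently, in a separate server isolate or a secondary entrypoint.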
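A Python sketch of the subscription side follows. It computes the union of enabled users' symbols and enforces the 30-symbol Basic cap. It also builds the auth and subscribe messages Alpaca's stream expects after connecting to WS_URL. Finally, it converts a trade message (T = "t") into a snapshot row using the event's own timestamp. Connection handling and reconnects are omitted, and the function names are hypothetical.

```python
import json
from typing import Optional

WS_URL = "wss://stream.data.alpaca.markets/v2/iex"  # connect here, then auth, then subscribe
MAX_WS_SYMBOLS = 30  # Alpaca Basic subscription cap used throughout this plan


def subscription_symbols(user_configs: list[dict]) -> list[str]:
    """Union of Alpaca symbols across enabled users; refuse to exceed the Basic cap."""
    symbols = set()
    for cfg in user_configs:
        if not cfg.get("enabled"):
            continue
        for inp in cfg.get("data_inputs", []):
            if inp.get("source") == "alpaca":
                symbols.update(inp.get("symbols", []))
    if len(symbols) > MAX_WS_SYMBOLS:
        raise ValueError(f"{len(symbols)} symbols exceeds the {MAX_WS_SYMBOLS}-symbol cap")
    return sorted(symbols)


def auth_message(key_id: str, secret_key: str) -> str:
    return json.dumps({"action": "auth", "key": key_id, "secret": secret_key})


def subscribe_message(symbols: list[str]) -> str:
    # Trades suffice for last_trade snapshots; "quotes" or "bars" lists could be added.
    return json.dumps({"action": "subscribe", "trades": symbols})


def trade_to_snapshot(msg: dict) -> Optional[dict]:
    """Convert one stream message into a market_data_snapshots row (trades only)."""
    if msg.get("T") != "t":
        return None  # control, quote, and bar messages are ignored in this sketch
    return {"symbol": msg["S"], "metric": "last_trade", "price": msg["p"],
            "volume": msg["s"], "as_of": msg["t"], "raw": msg}
```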

9.3 Normalization

All ingested values are stored as NUMERIC (in USD where applicable), with the original payload kept in raw JSONB for debugging. The rule engine never parses Alpaca JSON directly; it reads only snapshot rows.
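Normalization of a batched snapshots response might look like the following Python sketch. It assumes a response keyed by symbol, with latestTrade, dailyBar, and prevDailyBar objects using the t/p/s/c/v field names; check that shape against Alpaca's current docs. normalize_snapshots is a hypothetical name. Prices pass through str() into Decimal so they reach NUMERIC columns without binary-float artifacts.

```python
from decimal import Decimal
from typing import Any


def _row(symbol: str, feed: str, metric: str, price: Any, volume: Any,
         as_of: str, raw: dict) -> dict[str, Any]:
    return {"symbol": symbol, "asset_class": "us_equity", "feed": feed,
            "metric": metric, "price": Decimal(str(price)), "volume": volume,
            "as_of": as_of, "raw": raw}


def normalize_snapshots(payload: dict[str, Any], feed: str = "iex") -> list[dict[str, Any]]:
    """Flatten {symbol: snapshot} into market_data_snapshots rows.

    Field names assumed: t = timestamp, p = trade price, s = trade size,
    c = bar close, v = bar volume.
    """
    rows = []
    for symbol, snap in payload.items():
        trade = snap.get("latestTrade")
        if trade:
            rows.append(_row(symbol, feed, "last_trade", trade["p"], trade.get("s"), trade["t"], trade))
        bar = snap.get("dailyBar")
        if bar:  # today's bar; its close is the latest price in the session so far
            rows.append(_row(symbol, feed, "daily_bar", bar["c"], bar.get("v"), bar["t"], bar))
        prev = snap.get("prevDailyBar")
        if prev:
            rows.append(_row(symbol, feed, "prev_close", prev["c"], prev.get("v"), prev["t"], prev))
    return rows
```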


10. Alpaca trade actuation (implementation notes)

10.1 Order types (initial)

| Config | Alpaca order |
|---|---|
| notional_usd | POST /v2/orders with notional, side, type: market, time_in_force: day |
| qty (future) | qty instead of notional |
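A sketch of the JSON body for POST /v2/orders, covering both rows of the order-type table. market_order_payload is a hypothetical name. Amounts are sent as decimal strings, as in Alpaca's documented examples, which avoids float rounding.

```python
from decimal import Decimal
from typing import Optional


def market_order_payload(symbol: str, side: str, client_order_id: str,
                         notional_usd: Optional[Decimal] = None,
                         qty: Optional[Decimal] = None) -> dict:
    """Body for a day market order; exactly one of notional_usd or qty must be set."""
    if (notional_usd is None) == (qty is None):
        raise ValueError("set exactly one of notional_usd or qty")
    if side not in ("buy", "sell"):
        raise ValueError("side must be 'buy' or 'sell'")
    body = {"symbol": symbol, "side": side, "type": "market",
            "time_in_force": "day", "client_order_id": client_order_id}
    if notional_usd is not None:
        body["notional"] = str(notional_usd)
    else:
        body["qty"] = str(qty)
    return body
```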

10.2 Paper vs live

| Env var | Default |
|---|---|
| ALPACA_TRADING_BASE_URL | https://paper-api.alpaca.markets |
| ALPACA_ALLOW_LIVE | false |

Server refuses the live URL unless ALPACA_ALLOW_LIVE=true and the user's config sets mode to live (see §4.2).
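The live/paper gate might be enforced when the trading client is constructed, as in this Python sketch. resolve_trading_base_url is a hypothetical name. It reads the env variables from §11 and the mode field of the resolved user config. Any host other than the paper URL is treated as live, so an unexpected host needs both gates open before it is used.

```python
PAPER_URL = "https://paper-api.alpaca.markets"


def resolve_trading_base_url(env: dict[str, str], user_mode: str) -> str:
    """Return the Trading API host, refusing a non-paper host unless both gates are open."""
    base = env.get("ALPACA_TRADING_BASE_URL", PAPER_URL).rstrip("/")
    if base == PAPER_URL:
        return base  # paper is always allowed, even for a user whose mode is "live"
    allow_live = env.get("ALPACA_ALLOW_LIVE", "false").strip().lower() == "true"
    if not (allow_live and user_mode == "live"):
        raise PermissionError("non-paper Trading API host requires ALPACA_ALLOW_LIVE=true and mode 'live'")
    return base
```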

10.3 Post-submit

  • Poll order status or use Alpaca trade updates (Phase 3).
  • Update trade_orders.status and notify user via optional future SignalR event ReceiveTradeUpdate (not required for MVP).

11. Environment variables

Add to server/.env.example (documented only in this plan until implemented):

```bash
# Trading feature gate
TRADING_ENABLED=false
TRADING_WORKER_INGEST_ENABLED=true
TRADING_WORKER_EVAL_ENABLED=true

# Alpaca (server only, never in Flutter)
ALPACA_API_KEY_ID=
ALPACA_API_SECRET_KEY=
ALPACA_TRADING_BASE_URL=https://paper-api.alpaca.markets
ALPACA_DATA_BASE_URL=https://data.alpaca.markets
ALPACA_DATA_FEED=iex
ALPACA_ALLOW_LIVE=false
```

Existing worker flags remain:

  • QUESTION_WORKER_ENABLED — master switch for interval timer.
  • QUESTION_WORKER_INTERVAL_SECONDS — drives ingest due-times and evaluation frequency.
  • QUESTION_PIPELINE_TEST_MODE — when true, skip real Alpaca calls; use fixture snapshots and fake order ids.

12. Flutter app responsibilities

| Responsibility | Owner |
|---|---|
| Display questions, collect swipe/numeric answer | Flutter (SwipeQuestionTile) |
| Submit answer (POST /v1/me/questions/{id}/answer) | Flutter |
| Real-time question delivery | SignalR ReceiveQuestion |
| Alpaca credentials | Never on device |
| Market data storage | Postgres via server |
| Trade execution | Server only |

Optional later: read-only GET /v1/me/trading/status for positions and recent trade_orders (no secrets).


13. HTTP API additions (Phase 2+)

| Method | Path | Purpose |
|---|---|---|
| GET | /v1/me/trading/config | Return merged effective config (sanitized). |
| PUT | /v1/me/trading/config | Update user JSONB override. |
| GET | /v1/me/trading/orders | List trade_orders for user. |
| GET | /v1/me/trading/snapshots?symbol=SPY | Debug/latest metrics (auth required). |

Phase 1 can omit public endpoints and use SQL seeds for test users.


14. Implementation phases

Phase 1 — Foundation (MVP)

  • Migration 004_trading.sql
  • AlpacaMarketDataClient + AlpacaTradingClient (paper, REST only)
  • MarketDataIngest + market_data_snapshots writes
  • TradingConfig resolver + seed template default_paper_watchlist
  • RuleEngine with one rule type: price_below_pct_of_ref
  • TradingPipeline.evaluate → create question via QuestionService
  • Extend QuestionPipeline.onAnswerSubmitted for PipelineKeys.trading
  • TradeActuator on confirm → trade_orders + Alpaca POST
  • Wire into QuestionBackgroundWorker tick behind TRADING_ENABLED
  • QUESTION_PIPELINE_TEST_MODE fixtures for CI/local without Alpaca keys

Phase 2 — Configuration API & hardening

  • PUT/GET /v1/me/trading/config
  • Config validation (symbol cap 30, required fields)
  • market_data_latest view or cached latest query
  • Daily guardrail counters in user_trading_state.context
  • Order status polling + failed order user question

Phase 3 — Streaming & observability

  • Alpaca IEX WebSocket ingest sidecar
  • ReceiveTradeUpdate SignalR event (optional)
  • Metrics logging (ingest lag, rule fires, order latency)

15. Testing strategy

| Layer | Approach |
|---|---|
| Unit | RuleEngine with fixture snapshot rows; no network. |
| Integration | Alpaca paper account; verify order appears in Alpaca dashboard. |
| Worker | Run server with QUESTION_PIPELINE_TEST_MODE=true and TRADING_ENABLED=true using seeded snapshots. |
| Flutter | Existing question flow; manual swipe on trading copy. |

Safety checklist before live:

  • ALPACA_ALLOW_LIVE=false in production until explicit review
  • Guardrail notional limits enforced in code, not only config
  • Every order traceable to question_id + rule_id

16. Risks & mitigations

| Risk | Mitigation |
|---|---|
| Stale DB snapshots | Store as_of; rules ignore data older than max_staleness_seconds in config. |
| Alpaca rate limits | Batch REST; cap symbols; WS in Phase 3. |
| Duplicate orders | client_order_id UNIQUE; check trade_orders before POST. |
| User confusion on auto-trades | Clear question_text; default require_question_before_order: true. |
| Regulatory / suitability | Paper only initially; disclaimers in app; no investment advice in question copy. |

17. Example end-to-end scenario

  1. Seed user_trading_config for user U: enabled, SPY in watchlist, dip_confirm rule at -1.5% vs prev_close.
  2. Worker tick (T0): ingest writes prev_close=500, last_trade=492 → rule fires.
  3. Worker: queue question: “SPY is down 1.6% … +10 approve buy $10, -10 skip” with pipeline_key=trading, pipeline_step=dip_confirm:await_confirm.
  4. Flutter: user swipes +10 → POST .../answer.
  5. Pipeline: onAnswerSubmitted → TradingPipeline sees a match → TradeActuator POSTs a paper market order for $10 of SPY → row in trade_orders.
  6. Next tick: rule not re-fired (cooldown in user_trading_state.context).
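The scenario's numbers can be checked with a sketch of the price_below_pct_of_ref rule. evaluate_price_below_pct_of_ref is a hypothetical name. It reads (symbol, metric) → (price, as_of) pairs from the latest-row query. It ignores a stale last_trade, following the max_staleness_seconds mitigation in §16; the 300 s default is an assumption. It returns the percentage move only when that move is at or below threshold_pct.

```python
from datetime import datetime, timedelta, timezone
from decimal import Decimal
from typing import Optional


def evaluate_price_below_pct_of_ref(rule: dict, latest: dict, now: datetime,
                                    max_staleness_seconds: int = 300) -> Optional[Decimal]:
    """Return the % change vs the reference metric if the rule fires, else None."""
    symbol = rule["symbol"]
    try:
        price, price_at = latest[(symbol, "last_trade")]
        ref, _ = latest[(symbol, rule["ref_metric"])]  # prev_close is naturally old; no staleness check
    except KeyError:
        return None  # missing data never fires a rule
    if now - price_at > timedelta(seconds=max_staleness_seconds):
        return None  # stale last trade
    pct = (price - ref) / ref * 100
    return pct if pct <= Decimal(str(rule["threshold_pct"])) else None
```

With prev_close 500 and last_trade 492, the move is (492 - 500) / 500 × 100 = -1.6%. That is at or below -1.5%, so the rule fires, and f"{abs(pct):.1f}" yields the 1.6 shown in the question text.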



Document version: 1.0 — Alpaca unified data + execution, Postgres-backed configurable inputs, interval worker integration.