20 KiB
Trading Development Plan — Alpaca Data & Execution
This document defines how Cyber Hybrid Hub will add Alpaca market data gathering, Alpaca trade actuation, and a dynamically configurable data-input layer that feeds the existing interval worker (QuestionBackgroundWorker), Postgres persistence, user questions (SignalR), and automated trade decisions.
It extends the current architecture documented in server/README.md and implemented in server/lib/pipeline/question_pipeline.dart — not replace it.
1. Goals & constraints
| Goal | Detail |
|---|---|
| Data | Ingest US equity (and optional crypto) prices from Alpaca Market Data; store normalized snapshots in Postgres for the worker to read. |
| Questions | Worker evaluates stored data + pipeline config → enqueues swipe/numeric questions via QuestionService → Flutter receives ReceiveQuestion over SignalR. |
| Trades | After rules + optional user confirmation via answers, place orders through Alpaca Trading API (paper first). |
| Configuration | Watchlists, thresholds, and pipeline steps are DB-driven (and env defaults), not hard-coded in Dart. |
| Security | Alpaca secret keys live only on the Dart API server; Flutter never holds trading credentials. |
| Cost | Target Alpaca Basic market data (IEX, ~30 WS symbols) + paper trading until strategies are validated. |
Non-goals (initial phases): sub-second HFT, full SIP market data, client-side WebSockets to Alpaca, or multi-broker routing.
2. Fit with the current system
flowchart TB
subgraph flutter [Flutter app]
UI[HomeScreen / SwipeQuestionTile]
Hub[QuestionsHubService SignalR]
end
subgraph server [Dart API server]
API[Shelf handlers]
Worker[QuestionBackgroundWorker interval]
MDP[MarketDataPipeline NEW]
TP[TradingPipeline NEW]
QP[QuestionPipeline existing]
QS[QuestionService]
AlpacaMD[Alpaca Market Data client]
AlpacaTR[Alpaca Trading client]
end
subgraph pg [Postgres]
MD[market_data_snapshots]
CFG[trading_config / user_trading_state]
Q[questions]
UPS[user_pipeline_state]
ORD[trade_orders]
end
AlpacaMD -->|REST poll or WS| MDP
MDP --> MD
Worker --> MDP
Worker --> TP
Worker --> QP
TP --> MD
TP --> CFG
TP --> ORD
TP --> AlpacaTR
QP --> QS
QS --> Q
QS --> Hub
API --> QS
Hub --> UI
API --> UI
Existing touchpoints to extend:
| Component | Path | Role |
|---|---|---|
| Interval worker | server/lib/workers/question_background_worker.dart |
Orchestrates tick; will call market-data ingest + trading evaluation before/alongside QuestionPipeline.runMaintenanceCycle. |
| Question pipeline | server/lib/pipeline/question_pipeline.dart |
Add PipelineKeys.trading branch; reuse BranchDecision for threshold-style answers. |
| External fetcher pattern | server/lib/pipeline/external_data_fetcher.dart |
Precedent for HTTP clients; Alpaca clients live in server/lib/alpaca/. |
| Env loading | server/lib/env.dart |
Add Alpaca + trading feature flags. |
| Flutter | lib/services/questions_hub_service.dart, lib/widgets/swipe_question_tile.dart |
Unchanged contract: questions in, numeric answers out. |
3. Alpaca accounts & APIs
Use one Alpaca account with two API surfaces:
| Surface | Base URL (paper) | Purpose |
|---|---|---|
| Market Data | https://data.alpaca.markets |
Snapshots, bars, trades, quotes; WS wss://stream.data.alpaca.markets/v2/iex (Basic). |
| Trading | https://paper-api.alpaca.markets |
Orders, positions, account; switch to live URL only after explicit env flip. |
Authentication: Headers APCA-API-KEY-ID and APCA-API-SECRET-KEY on every request.
Free-tier limits to design around:
- Market Data Basic: IEX real-time, ~30 WebSocket subscriptions, historical REST capped (use DB as source of truth for worker ticks).
- Trading: paper account; rate limits documented by Alpaca; idempotent
client_order_idrequired.
4. Dynamically configurable data input design
Configuration is split into global templates (reusable) and per-user overrides (Firebase UID). The worker always resolves effective config before reading market data or emitting questions.
4.1 Configuration layers
trading_config_templates (optional defaults by name)
↓ merge
user_trading_config (per user: watchlist, rules, enabled flag)
↓ resolved at tick
EffectiveTradingConfig (in-memory for one worker cycle)
4.2 Config document shape (JSONB)
Store in Postgres as JSONB; validate on write via API or seed migrations.
{
"version": 1,
"enabled": true,
"mode": "paper",
"data_inputs": [
{
"id": "primary_watchlist",
"source": "alpaca",
"asset_class": "us_equity",
"symbols": ["AAPL", "MSFT", "SPY"],
"feed": "iex",
"poll_interval_seconds": 60,
"metrics": ["last_trade", "daily_bar", "prev_close"]
}
],
"rules": [
{
"id": "dip_confirm",
"type": "price_below_pct_of_ref",
"symbol": "SPY",
"ref_metric": "prev_close",
"threshold_pct": -1.5,
"question_template": "SPY is down {{pct}}% vs yesterday. Swipe +10 to approve a small buy, -10 to skip.",
"on_answer_match": { "action": "propose_order", "side": "buy", "notional_usd": 10 }
}
],
"guardrails": {
"max_orders_per_day": 3,
"max_notional_usd_per_4h": 100,
"require_question_before_order": true,
"symbols_blocklist": []
}
}
Design principles:
data_inputs[]— what to fetch and how often; worker matchespoll_interval_secondsagainstQUESTION_WORKER_INTERVAL_SECONDSor stores last fetch time per input id inuser_trading_state.context.rules[]— declarative triggers evaluated against Postgres snapshots, not live Alpaca on every rule check (decouples question logic from API rate limits).question_template— supports{{symbol}},{{price}},{{pct}}substitution when creatingquestion_text.- User answers map to actions via existing
pipeline_key/pipeline_step/source_tagconventions (e.g.trading:dip_confirm:await_answer).
4.3 Config administration
| Method | Phase |
|---|---|
| SQL seed / migration | Phase 1 — dev templates |
PUT /v1/me/trading/config |
Phase 2 — authenticated user override |
| Admin script | Optional bulk updates |
Flutter does not edit raw JSON initially; a settings screen can come later. Phase 1 uses server-side defaults keyed by firebase_uid.
5. Postgres schema (new migration 004_trading.sql)
5.1 market_data_snapshots
Append-only (or upsert latest) normalized facts per symbol.
CREATE TABLE market_data_snapshots (
id BIGSERIAL PRIMARY KEY,
symbol TEXT NOT NULL,
asset_class TEXT NOT NULL DEFAULT 'us_equity',
feed TEXT NOT NULL DEFAULT 'iex',
metric TEXT NOT NULL, -- last_trade | daily_bar | prev_close
price NUMERIC,
volume NUMERIC,
as_of TIMESTAMPTZ NOT NULL,
raw JSONB, -- optional Alpaca payload fragment
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX market_data_snapshots_symbol_as_of_idx
ON market_data_snapshots (symbol, as_of DESC);
Worker queries: “latest row per (symbol, metric)” via DISTINCT ON or a materialized view market_data_latest (Phase 2 optimization).
5.2 trading_config_templates
CREATE TABLE trading_config_templates (
name TEXT PRIMARY KEY,
config JSONB NOT NULL,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
5.3 user_trading_config
CREATE TABLE user_trading_config (
firebase_uid TEXT PRIMARY KEY REFERENCES users (firebase_uid) ON DELETE CASCADE,
template_name TEXT REFERENCES trading_config_templates (name),
config JSONB NOT NULL DEFAULT '{}',
enabled BOOLEAN NOT NULL DEFAULT false,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
5.4 user_trading_state
Extends the pipeline state pattern in user_pipeline_state for trading-specific cursor.
CREATE TABLE user_trading_state (
firebase_uid TEXT PRIMARY KEY REFERENCES users (firebase_uid) ON DELETE CASCADE,
last_ingest_at TIMESTAMPTZ,
last_eval_at TIMESTAMPTZ,
context JSONB NOT NULL DEFAULT '{}',
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
context holds: last fired rule ids, pending order proposal ids, daily order counts, last fetch times per data_inputs[].id.
5.5 trade_orders
Audit trail and idempotency.
CREATE TABLE trade_orders (
id UUID PRIMARY KEY,
firebase_uid TEXT NOT NULL REFERENCES users (firebase_uid) ON DELETE CASCADE,
client_order_id TEXT NOT NULL UNIQUE,
alpaca_order_id TEXT,
symbol TEXT NOT NULL,
side TEXT NOT NULL,
order_type TEXT NOT NULL,
notional_usd NUMERIC,
qty NUMERIC,
status TEXT NOT NULL,
question_id UUID REFERENCES questions (id),
rule_id TEXT,
submitted_at TIMESTAMPTZ,
filled_at TIMESTAMPTZ,
raw JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
5.6 Questions table linkage
Reuse existing columns:
source_tag— e.g.trading:rule:dip_confirmpipeline_key—tradingpipeline_step— rule id + phase (propose,confirm,idle)
Optional Phase 2: questions.metadata JSONB for symbol/price at question time.
6. Server modules (proposed layout)
server/lib/
alpaca/
alpaca_env.dart # keys, paper vs live, data host
alpaca_market_data_client.dart
alpaca_trading_client.dart
alpaca_models.dart
trading/
trading_config.dart # parse/merge JSONB → EffectiveTradingConfig
trading_config_db.dart
market_data_ingest.dart # data_inputs → snapshots
market_data_db.dart
rule_engine.dart # rules × snapshots → RuleEvaluation
trading_pipeline.dart # questions + order proposals
trade_orders_db.dart
pipeline/
question_pipeline.dart # wire trading branch in onAnswerSubmitted
workers/
question_background_worker.dart # invoke trading tick
7. Worker tick sequence (single interval)
On each QuestionBackgroundWorker._tick() when TRADING_ENABLED=true:
1. For each user with user_trading_config.enabled:
a. Resolve EffectiveTradingConfig (template + user JSON merge).
b. MarketDataIngest.runIfDue(user, config.data_inputs)
→ Alpaca REST (or WS service) → INSERT market_data_snapshots
c. TradingPipeline.evaluate(user, config, snapshots from DB)
→ if rule fires and guardrails OK:
- maybe create question (respect maxQueuedQuestions)
- or stage pending order in user_trading_state.context
d. QuestionPipeline.runMaintenanceCycle() for same user (existing geography/weather/trading branches)
2. For users with pending order + confirmed answer (see §8):
TradeActuator.submitOrder() → Alpaca Trading → UPDATE trade_orders
Concurrency: keep _running guard; ingest and evaluate per user sequentially first; optimize later.
Alignment with QUESTION_WORKER_INTERVAL_SECONDS: default 60s matches Alpaca Basic REST polling for watchlists ≤30 symbols. A dedicated long-lived WS ingest task (optional Phase 3) writes snapshots on every tick and reduces REST calls.
8. Question ↔ trade decision flow
8.1 Phases per rule
| Phase | pipeline_step |
Worker behavior |
|---|---|---|
idle |
— | Evaluate rules against DB snapshots only. |
await_confirm |
rule id | Question delivered; correct_answer encodes expected confirmation (+10 / -10). |
submit_order |
rule id | User answered; if match and guardrails pass → POST Alpaca order. |
done |
rule id | Log outcome; return to idle. |
8.2 Branching (reuse BranchDecision)
| User swipe | Meaning | Trade action |
|---|---|---|
+10 (yes) |
Approve proposed action | TradeActuator places configured order |
-10 (no) |
Reject | No order; log skip in user_trading_state.context |
| near 0 | Ambiguous | Optional follow-up question (same pattern as weather close) |
8.3 Guardrails (always before Alpaca POST)
require_question_before_orderand unanswered question must be resolved.max_orders_per_day(calendar-day) andmax_notional_usd_per_4h(rolling 4-hour window) from config. The 4-hour notional window bounds runaway-rule blast radius while still allowing intraday "double-down" sizing as a thesis strengthens.mode == paperunlessALPACA_ALLOW_LIVE=true.- Symbol in user watchlist and not in
symbols_blocklist. - Idempotent
client_order_id=uuidor{uid}-{rule_id}-{question_id}.
9. Alpaca data gathering (implementation notes)
9.1 Phase 1 — REST polling (recommended start)
For each symbol in data_inputs[].symbols:
| Metric | Alpaca endpoint (v2) |
|---|---|
last_trade |
GET /v2/stocks/{symbol}/trades/latest |
daily_bar |
GET /v2/stocks/bars?symbols=...&timeframe=1Day&limit=1 |
prev_close |
Derived from previous daily bar or snapshot |
Batch symbols where possible (bars multi-symbol) to minimize calls.
9.2 Phase 2 — WebSocket ingest service
- Connect to
wss://stream.data.alpaca.markets/v2/iex. - Subscribe to union of all enabled users’ symbols (cap 30 on Basic — enforce in config validator).
- On trade/quote events → upsert
market_data_snapshotswithas_of = now(). - Worker reads DB only; WS process runs independently inside server isolate or secondary entrypoint.
9.3 Normalization
All ingested values stored as NUMERIC in USD where applicable; raw JSONB for debugging. Rule engine never parses Alpaca JSON directly — only snapshot rows.
10. Alpaca trade actuation (implementation notes)
10.1 Order types (initial)
| Config | Alpaca order |
|---|---|
notional_usd |
POST /v2/orders with notional, side, type: market, time_in_force: day |
| Future: qty | qty instead of notional |
10.2 Paper vs live
| Env var | Default |
|---|---|
ALPACA_TRADING_BASE_URL |
https://paper-api.alpaca.markets |
ALPACA_ALLOW_LIVE |
false |
Server refuses live URL unless ALPACA_ALLOW_LIVE=true and TRADING_MODE=live in user config.
10.3 Post-submit
- Poll order status or use Alpaca trade updates (Phase 3).
- Update
trade_orders.statusand notify user via optional future SignalR eventReceiveTradeUpdate(not required for MVP).
11. Environment variables
Add to server/.env.example (document only in plan until implemented):
# Trading feature gate
TRADING_ENABLED=false
TRADING_WORKER_INGEST_ENABLED=true
TRADING_WORKER_EVAL_ENABLED=true
# Alpaca (server only — never in Flutter)
ALPACA_API_KEY_ID=
ALPACA_API_SECRET_KEY=
ALPACA_TRADING_BASE_URL=https://paper-api.alpaca.markets
ALPACA_DATA_BASE_URL=https://data.alpaca.markets
ALPACA_DATA_FEED=iex
ALPACA_ALLOW_LIVE=false
Existing worker flags remain:
QUESTION_WORKER_ENABLED— master switch for interval timer.QUESTION_WORKER_INTERVAL_SECONDS— drives ingest due-times and evaluation frequency.QUESTION_PIPELINE_TEST_MODE— when true, skip real Alpaca calls; use fixture snapshots and fake order ids.
12. Flutter app responsibilities
| Responsibility | Owner |
|---|---|
| Display questions, collect swipe/numeric answer | Flutter (SwipeQuestionTile) |
Submit answer POST /v1/me/questions/{id}/answer |
Flutter |
| Real-time question delivery | SignalR ReceiveQuestion |
| Alpaca credentials | Never on device |
| Market data storage | Postgres via server |
| Trade execution | Server only |
Optional later: read-only GET /v1/me/trading/status for positions and recent trade_orders (no secrets).
13. HTTP API additions (Phase 2+)
| Method | Path | Purpose |
|---|---|---|
GET |
/v1/me/trading/config |
Return merged effective config (sanitized). |
PUT |
/v1/me/trading/config |
Update user JSONB override. |
GET |
/v1/me/trading/orders |
List trade_orders for user. |
GET |
/v1/me/trading/snapshots?symbol=SPY |
Debug/latest metrics (auth required). |
Phase 1 can omit public endpoints and use SQL seeds for test users.
14. Implementation phases
Phase 1 — Foundation (MVP)
- Migration
004_trading.sql AlpacaMarketDataClient+AlpacaTradingClient(paper, REST only)MarketDataIngest+market_data_snapshotswritesTradingConfigresolver + seed templatedefault_paper_watchlistRuleEnginewith one rule type:price_below_pct_of_refTradingPipeline.evaluate→ create question viaQuestionService- Extend
QuestionPipeline.onAnswerSubmittedforPipelineKeys.trading TradeActuatoron confirm →trade_orders+ Alpaca POST- Wire into
QuestionBackgroundWorkertick behindTRADING_ENABLED QUESTION_PIPELINE_TEST_MODEfixtures for CI/local without Alpaca keys
Phase 2 — Configuration API & hardening
PUT/GET /v1/me/trading/config- Config validation (symbol cap 30, required fields)
market_data_latestview or cached latest query- Daily guardrail counters in
user_trading_state.context - Order status polling + failed order user question
Phase 3 — Streaming & observability
- Alpaca IEX WebSocket ingest sidecar
ReceiveTradeUpdateSignalR event (optional)- Metrics logging (ingest lag, rule fires, order latency)
15. Testing strategy
| Layer | Approach |
|---|---|
| Unit | RuleEngine with fixture snapshot rows; no network. |
| Integration | Alpaca paper account; verify order appears in Alpaca dashboard. |
| Worker | Run server with QUESTION_PIPELINE_TEST_MODE=true and TRADING_ENABLED=true using seeded snapshots. |
| Flutter | Existing question flow; manual swipe on trading copy. |
Safety checklist before live:
ALPACA_ALLOW_LIVE=falsein production until explicit review- Guardrail notional limits enforced in code, not only config
- Every order traceable to
question_id+rule_id
16. Risks & mitigations
| Risk | Mitigation |
|---|---|
| Stale DB snapshots | Store as_of; rules ignore data older than max_staleness_seconds in config. |
| Alpaca rate limits | Batch REST; cap symbols; WS in Phase 3. |
| Duplicate orders | client_order_id UNIQUE; check trade_orders before POST. |
| User confusion on auto-trades | Clear question_text; default require_question_before_order: true. |
| Regulatory / suitability | Paper only initially; disclaimers in app; no investment advice in question copy. |
17. Example end-to-end scenario
- Seed
user_trading_configfor user U: enabled,SPYin watchlist,dip_confirmrule at -1.5% vsprev_close. - Worker tick (T0): ingest writes
prev_close=500,last_trade=492→ rule fires. - Worker: queue question: “SPY is down 1.6% … +10 approve buy $10, -10 skip” with
pipeline_key=trading,pipeline_step=dip_confirm:await_confirm. - Flutter: user swipes +10 →
POST .../answer. - Pipeline:
onAnswerSubmitted→TradingPipelinesees match →TradeActuatorPOST paper market order $10 SPY → row intrade_orders. - Next tick: rule not re-fired (cooldown in
user_trading_state.context).
18. References
- Alpaca Market Data API
- Alpaca Trading API
- Alpaca WebSocket streaming
- Existing pipeline:
server/lib/pipeline/question_pipeline.dart - Existing worker:
server/lib/workers/question_background_worker.dart
Document version: 1.0 — Alpaca unified data + execution, Postgres-backed configurable inputs, interval worker integration.