PQAP Strategy Development Guide

Table of Contents

  1. Strategy Architecture
  2. Creating Your First Strategy
  3. Strategy Interface
  4. Signal Generation
  5. Entry and Exit Logic
  6. Configuration in YAML
  7. Testing Strategies
  8. Best Practices
  9. Example: Creating a Volume Spike Strategy

Strategy Architecture

PQAP uses a plug-and-play strategy system where strategies are independent modules that: 1. Subscribe to market data updates 2. Generate trading signals based on their logic 3. Respond to trade execution feedback 4. Manage internal state (positions, entry prices, etc.)

BaseStrategy Class

All strategies inherit from BaseStrategy in src/strategies/base.py.

Key Components:

from src.strategies.base import BaseStrategy, Signal, SignalType
from decimal import Decimal

class MyStrategy(BaseStrategy):
    @property
    def strategy_id(self) -> str:
        return "my_strategy_v1"  # Unique identifier

    @property
    def name(self) -> str:
        return "My Custom Strategy"  # Human-readable name

    def on_orderbook_update(self, market: Market) -> Signal | None:
        """Called when market data updates - generate signals here"""
        pass

    def on_trade_executed(self, trade: Trade) -> None:
        """Called when your trade executes - update state here"""
        pass

Signal Dataclass

Signals are the output of strategies - they tell the execution engine what to trade.

@dataclass
class Signal:
    strategy_id: str           # Which strategy generated this
    market_id: str             # Which market to trade
    outcome_id: str            # Which outcome (YES/NO token ID)

    signal_type: SignalType    # BUY, SELL, HOLD, CLOSE
    confidence: float          # 0.0 to 1.0 (how confident you are)

    size_suggestion: Decimal   # Fraction of capital (0.0 to 1.0)

    max_price: Decimal | None  # Maximum price willing to pay (BUY orders)
    min_price: Decimal | None  # Minimum price to accept (SELL orders)

    stop_loss: Decimal | None  # Optional stop-loss price
    take_profit: Decimal | None # Optional take-profit price

    reason: str                # Human-readable explanation
    metadata: dict             # Any custom data for tracking

Creating Your First Strategy

Step 1: Create Strategy Directory

mkdir -p src/strategies/my_strategy
touch src/strategies/my_strategy/__init__.py
touch src/strategies/my_strategy/strategy.py

Step 2: Implement Strategy Class

Edit src/strategies/my_strategy/strategy.py:

"""
My Custom Strategy

Brief description of what this strategy does and why.
"""
import logging
from decimal import Decimal
from datetime import datetime, timedelta

from ..base import BaseStrategy, StrategyConfig, Signal, SignalType
from ...core.models import Market, Trade

logger = logging.getLogger(__name__)


class MyCustomStrategy(BaseStrategy):
    """
    One-line description of strategy.

    Detailed explanation:
    - What edge this exploits
    - How entry signals work
    - How exit signals work
    - Expected win rate / return
    """

    @property
    def strategy_id(self) -> str:
        return "my_strategy_v1"

    @property
    def name(self) -> str:
        return "My Custom Strategy"

    @property
    def description(self) -> str:
        return "Exploits [specific market inefficiency]"

    @property
    def version(self) -> str:
        return "1.0.0"

    def __init__(self):
        super().__init__()
        # Initialize strategy-specific state
        self._entry_threshold = Decimal("0.05")
        self._positions = {}  # Track open positions
        self._signal_count = 0

    def configure(self, config: StrategyConfig) -> None:
        """Load configuration parameters."""
        super().configure(config)

        # Extract strategy-specific parameters from config
        params = config.parameters if config else {}
        self._entry_threshold = Decimal(str(params.get("entry_threshold", "0.05")))
        self._order_size_pct = Decimal(str(params.get("order_size_pct", "0.02")))

        logger.info(f"Configured {self.strategy_id}: threshold={self._entry_threshold}")

    def validate(self) -> list[str]:
        """Validate configuration - return list of errors."""
        errors = super().validate()

        if self._entry_threshold <= 0:
            errors.append("entry_threshold must be positive")

        return errors

    def on_orderbook_update(self, market: Market) -> Signal | None:
        """
        Main strategy logic - analyze market and generate signal.

        This is called whenever market data updates.
        Return a Signal to trade, or None to do nothing.
        """
        if not self._is_active:
            return None

        # Example: Only trade binary markets
        yes_outcome = market.get_outcome("YES")
        if not yes_outcome or yes_outcome.price is None:
            return None

        # Your strategy logic here
        # Example: Buy if price drops below threshold
        if yes_outcome.price < self._entry_threshold:
            self._signal_count += 1

            return Signal(
                strategy_id=self.strategy_id,
                market_id=market.market_id,
                outcome_id=yes_outcome.outcome_id,
                signal_type=SignalType.BUY,
                confidence=0.7,  # 70% confident
                size_suggestion=self._order_size_pct,
                max_price=self._entry_threshold,
                reason=f"Price {yes_outcome.price} below threshold {self._entry_threshold}",
                metadata={
                    "signal_number": self._signal_count,
                    "market_volume": float(market.volume_24h or 0)
                }
            )

        return None

    def on_trade_executed(self, trade: Trade) -> None:
        """
        Handle trade execution notification.

        Update internal state, position tracking, etc.
        """
        logger.info(f"Trade executed: {trade.trade_id}, P&L: {trade.pnl}")

        # Track position
        if trade.market_id not in self._positions:
            self._positions[trade.market_id] = {
                "shares": Decimal("0"),
                "entry_price": Decimal("0")
            }

        position = self._positions[trade.market_id]

        # Update position based on trade side
        if str(trade.side).upper() == "BUY":
            position["shares"] += trade.size
            # Update weighted average entry price
            if position["shares"] > 0:
                position["entry_price"] = (
                    (position["entry_price"] * (position["shares"] - trade.size) +
                     trade.price * trade.size) / position["shares"]
                )
        else:  # SELL
            position["shares"] -= trade.size

    def on_order_failed(self, order, reason: str) -> None:
        """Handle order failure - optional to override."""
        logger.warning(f"Order failed: {order.order_id}, reason: {reason}")

    def get_stats(self) -> dict:
        """Return strategy statistics for monitoring."""
        return {
            "strategy_id": self.strategy_id,
            "signals_generated": self._signal_count,
            "positions_open": len([p for p in self._positions.values() if p["shares"] > 0])
        }

Step 3: Register Strategy

Edit src/strategies/my_strategy/__init__.py:

"""My Custom Strategy Package"""
from .strategy import MyCustomStrategy

__all__ = ["MyCustomStrategy"]

Step 4: Enable in Configuration

Edit configs/dev.yaml:

enabled_strategies:
  - dual_arb_v1
  - my_strategy_v1  # Add your strategy

strategies:
  my_strategy_v1:
    enabled: true
    allocated_capital: 50
    parameters:
      entry_threshold: "0.05"
      order_size_pct: "0.02"

Step 5: Test

python -m src.main configs/dev.yaml

Watch logs for signals from your strategy.

Strategy Interface

Required Methods

1. strategy_id (property)

@property
def strategy_id(self) -> str:
    return "my_strategy_v1"

Purpose: Unique identifier for this strategy Convention: lowercase_with_underscores_v{version} Used for: Configuration lookup, P&L attribution, logging

2. name (property)

@property
def name(self) -> str:
    return "My Readable Strategy Name"

Purpose: Human-readable name for display in UI Used for: Dashboard, reports, alerts

3. on_orderbook_update(market: Market) -> Signal | None

def on_orderbook_update(self, market: Market) -> Signal | None:
    # Analyze market data
    # Return Signal if you want to trade
    # Return None if no action
    pass

Called when: Market orderbook updates (WebSocket) OR during polling loop Frequency: Can be very frequent (multiple times per second) Return: Signal object to trade, or None to do nothing

Available Market Data: - market.market_id - Unique market identifier - market.question - Market question text - market.outcomes - List of outcomes (YES/NO) - market.volume_24h - 24-hour trading volume - market.get_outcome("YES") - Get YES outcome

Available Outcome Data: - outcome.price - Current market price - outcome.best_bid - Best bid price (orderbook) - outcome.best_ask - Best ask price (orderbook) - outcome.bid_depth - Total size at best bid - outcome.ask_depth - Total size at best ask - outcome.imbalance - Bid/ask imbalance ratio

4. on_trade_executed(trade: Trade) -> None

def on_trade_executed(self, trade: Trade) -> None:
    # Update internal state
    # Track positions
    # Calculate P&L
    pass

Called when: Your trade completes Purpose: Update strategy state, position tracking Available Trade Data: - trade.trade_id - Unique trade identifier - trade.market_id - Which market - trade.outcome_id - Which outcome - trade.side - BUY or SELL - trade.price - Execution price - trade.size - Number of shares - trade.pnl - Profit/loss on this trade

Optional Methods

configure(config: StrategyConfig) -> None

def configure(self, config: StrategyConfig) -> None:
    super().configure(config)

    params = config.parameters
    self._my_param = Decimal(str(params.get("my_param", "0.5")))

Purpose: Load strategy-specific parameters from YAML config When called: Once during strategy initialization Best practice: Extract parameters, set defaults, validate ranges

validate() -> list[str]

def validate(self) -> list[str]:
    errors = super().validate()

    if self._my_param < 0:
        errors.append("my_param must be positive")

    return errors

Purpose: Validate configuration before trading Return: List of error messages (empty = valid) When called: After configuration, before strategy starts

on_order_failed(order: Order, reason: str) -> None

def on_order_failed(self, order: Order, reason: str) -> None:
    logger.warning(f"Order failed: {reason}")
    # Optionally retry or cleanup

Purpose: Handle order failures Use cases: Implement retry logic, cleanup state, alert

on_market_resolved(market: Market, winning_outcome: str) -> None

def on_market_resolved(self, market: Market, winning_outcome: str) -> None:
    # Calculate final P&L
    # Update win rate statistics
    pass

Purpose: Handle market resolution Use cases: Calculate final P&L, update calibration stats

Signal Generation

Signal Types

class SignalType(Enum):
    BUY = "buy"      # Open long position or increase position
    SELL = "sell"    # Open short position or decrease position
    HOLD = "hold"    # No action
    CLOSE = "close"  # Close existing position

Confidence Levels

Confidence (0.0 to 1.0) indicates how certain you are:

  • 0.0 - 0.3: Low confidence (risky trades)
  • 0.3 - 0.7: Medium confidence (typical trades)
  • 0.7 - 1.0: High confidence (strong signals)

Use confidence for: - Position sizing (higher confidence = larger position) - Signal filtering (reject low confidence signals) - Performance tracking (calibration analysis)

Example:

# Calculate confidence based on signal strength
edge = abs(expected_value - current_price)
confidence = min(0.95, 0.5 + edge * 10)  # Cap at 95%

Position Sizing

size_suggestion is a fraction of allocated capital, not dollar amount.

# Conservative sizing
size_suggestion=Decimal("0.01")  # 1% of allocated capital

# Moderate sizing
size_suggestion=Decimal("0.05")  # 5% of allocated capital

# Aggressive sizing
size_suggestion=Decimal("0.10")  # 10% of allocated capital

Example calculation:

allocated_capital = Decimal("1000")  # From config
size_suggestion = Decimal("0.05")     # 5%
position_size = allocated_capital * size_suggestion  # $50

Price Limits

BUY Orders: Set max_price

Signal(
    signal_type=SignalType.BUY,
    max_price=Decimal("0.55"),  # Won't pay more than $0.55
    ...
)

Use cases: - Prevent slippage on large orders - Ensure minimum expected profit - Arbitrage strategies (exact price required)

SELL Orders: Set min_price

Signal(
    signal_type=SignalType.SELL,
    min_price=Decimal("0.45"),  # Won't sell for less than $0.45
    ...
)

Risk Parameters

Signal(
    ...,
    stop_loss=Decimal("0.40"),      # Exit if price hits $0.40
    take_profit=Decimal("0.60"),    # Exit if price hits $0.60
)

Note: Stop-loss and take-profit are advisory - execution engine may or may not implement them. For guaranteed exit logic, implement in on_orderbook_update().

Signal Metadata

Use metadata dict to store any custom data:

Signal(
    ...,
    reason="Strong momentum detected",
    metadata={
        "signal_number": 42,
        "z_score": -2.5,
        "volume_spike": True,
        "related_markets": ["market_id_1", "market_id_2"]
    }
)

Metadata is saved to database and available in backtest reports.

Entry and Exit Logic

Entry Logic

Entry signals open new positions or increase existing positions.

def on_orderbook_update(self, market: Market) -> Signal | None:
    # Check if we should enter
    if self._should_enter(market):
        return Signal(
            signal_type=SignalType.BUY,
            # ... other fields
        )

    return None

def _should_enter(self, market: Market) -> bool:
    """Entry criteria - customize for your strategy."""
    yes = market.get_outcome("YES")

    # Example criteria:
    # 1. Price in acceptable range
    if yes.price < Decimal("0.05") or yes.price > Decimal("0.95"):
        return False

    # 2. Sufficient volume
    if market.volume_24h < Decimal("1000"):
        return False

    # 3. Tight spread (good liquidity)
    spread = yes.best_ask - yes.best_bid
    if spread > Decimal("0.05"):  # > 5% spread
        return False

    # 4. Your signal condition
    if yes.price < self._entry_threshold:
        return True

    return False

Exit Logic

CRITICAL: Strategies MUST implement exit logic to avoid exhausting capital.

Pattern 1: Exit on Opposite Signal

def on_orderbook_update(self, market: Market) -> Signal | None:
    state = self._get_position_state(market.market_id)

    # If we have a long position and signal flips, exit
    if state.position == 1 and self._should_sell(market):
        return Signal(
            signal_type=SignalType.SELL,
            size_suggestion=Decimal("1.0"),  # Close entire position
            reason="Exit: Signal reversed"
        )

    # If flat and signal is buy, enter
    if state.position == 0 and self._should_buy(market):
        return Signal(signal_type=SignalType.BUY, ...)

    return None

Pattern 2: Exit on Price Target

def on_orderbook_update(self, market: Market) -> Signal | None:
    state = self._positions.get(market.market_id)

    if state and state.position > 0:
        current_price = market.get_outcome("YES").price

        # Take profit
        if current_price >= state.entry_price * Decimal("1.10"):  # 10% profit
            return Signal(
                signal_type=SignalType.SELL,
                size_suggestion=Decimal("1.0"),
                reason=f"Exit: Take profit at {current_price}"
            )

        # Stop loss
        if current_price <= state.entry_price * Decimal("0.90"):  # 10% loss
            return Signal(
                signal_type=SignalType.SELL,
                size_suggestion=Decimal("1.0"),
                reason=f"Exit: Stop loss at {current_price}"
            )

    return None

Pattern 3: Exit on Time

from datetime import datetime, timedelta

def on_orderbook_update(self, market: Market) -> Signal | None:
    state = self._positions.get(market.market_id)

    if state and state.position > 0:
        # Force exit after max hold time
        hold_time = datetime.now(timezone.utc) - state.entry_time
        max_hold = timedelta(hours=24)

        if hold_time > max_hold:
            return Signal(
                signal_type=SignalType.SELL,
                size_suggestion=Decimal("1.0"),
                reason=f"Exit: Max hold time ({hold_time}) exceeded"
            )

    return None

Pattern 4: Exit on Mean Reversion

def on_orderbook_update(self, market: Market) -> Signal | None:
    state = self._market_states.get(market.market_id)

    if state and state.position != 0:
        current_price = market.get_outcome("YES").price
        zscore = state.zscore(current_price)

        # Exit when price reverts to mean (z-score near 0)
        if abs(zscore) < self._exit_zscore:
            signal_type = SignalType.SELL if state.position > 0 else SignalType.BUY
            return Signal(
                signal_type=signal_type,
                size_suggestion=Decimal("1.0"),
                reason=f"Exit: Mean reversion (z={zscore:.2f})"
            )

    return None

State Management

Track positions to implement exit logic:

from dataclasses import dataclass
from datetime import datetime

@dataclass
class PositionState:
    market_id: str
    position: int           # 1 = long, -1 = short, 0 = flat
    entry_price: Decimal
    entry_time: datetime
    shares: Decimal

class MyStrategy(BaseStrategy):
    def __init__(self):
        super().__init__()
        self._positions: dict[str, PositionState] = {}

    def on_trade_executed(self, trade: Trade) -> None:
        # Update position state
        if trade.market_id not in self._positions:
            self._positions[trade.market_id] = PositionState(
                market_id=trade.market_id,
                position=0,
                entry_price=Decimal("0"),
                entry_time=datetime.now(timezone.utc),
                shares=Decimal("0")
            )

        state = self._positions[trade.market_id]

        if str(trade.side).upper() == "BUY":
            state.position = 1
            state.entry_price = trade.price
            state.entry_time = trade.executed_at
            state.shares += trade.size
        else:  # SELL
            state.shares -= trade.size
            if state.shares <= 0:
                state.position = 0  # Flat

Configuration in YAML

Basic Configuration

Edit configs/dev.yaml or configs/prod.yaml:

# List of enabled strategies
enabled_strategies:
  - my_strategy_v1

# Strategy-specific configuration
strategies:
  my_strategy_v1:
    enabled: true
    allocated_capital: 100  # Dollars allocated to this strategy
    parameters:
      # Your custom parameters
      entry_threshold: "0.05"
      exit_threshold: "0.10"
      order_size_pct: "0.02"
      max_positions: 5

Parameter Types

Decimal Values (Prices, Thresholds)

ALWAYS use strings for Decimal parameters:

parameters:
  entry_threshold: "0.05"      # ✅ Correct
  # entry_threshold: 0.05      # ❌ Wrong - floating point imprecision

In code:

self._threshold = Decimal(str(params.get("entry_threshold", "0.05")))

Integer Values

parameters:
  max_positions: 5
  lookback_periods: 20

In code:

self._max_positions = int(params.get("max_positions", 5))

Boolean Values

parameters:
  use_stop_loss: true
  aggressive_mode: false

In code:

self._use_stop_loss = bool(params.get("use_stop_loss", True))

Example: Mean Reversion Configuration

strategies:
  mean_reversion_v1:
    enabled: true
    allocated_capital: 200
    parameters:
      # Entry/exit thresholds
      entry_zscore: "1.5"        # Enter when |z-score| > 1.5
      exit_zscore: "0.5"         # Exit when |z-score| < 0.5

      # Market selection
      min_price_range: "0.02"    # Min 2% historical volatility
      min_volume_24h: "1000"     # Min $1k daily volume

      # Position sizing
      order_size_pct: "0.03"     # 3% of capital per trade

      # Risk controls
      max_hold_periods: 30       # Force exit after 30 periods
      stop_loss_pct: "0.10"      # 10% stop loss

Testing Strategies

1. Unit Testing

Create tests/test_my_strategy.py:

import pytest
from decimal import Decimal
from src.strategies.my_strategy import MyCustomStrategy
from src.strategies.base import StrategyConfig
from src.core.models import Market, Outcome

def test_strategy_signal_generation():
    strategy = MyCustomStrategy()

    # Configure
    config = StrategyConfig(
        strategy_id="my_strategy_v1",
        allocated_capital=Decimal("1000"),
        parameters={"entry_threshold": "0.05"}
    )
    strategy.configure(config)

    # Create mock market
    market = Market(
        market_id="test_market",
        question="Will it rain tomorrow?",
        outcomes=[
            Outcome(outcome_id="yes_id", name="YES", price=Decimal("0.04")),
            Outcome(outcome_id="no_id", name="NO", price=Decimal("0.96"))
        ]
    )

    # Test signal generation
    signal = strategy.on_orderbook_update(market)

    assert signal is not None
    assert signal.signal_type == SignalType.BUY
    assert signal.confidence > 0

Run tests:

pytest tests/test_my_strategy.py -v

2. Paper Trading

Enable your strategy with small capital in dev config:

enabled_strategies:
  - my_strategy_v1

strategies:
  my_strategy_v1:
    allocated_capital: 10  # Small amount for testing
    parameters:
      entry_threshold: "0.05"

Run PQAP:

python -m src.main configs/dev.yaml

Monitor: - Dashboard for signals generated - Paper trading page for simulated P&L - Logs for strategy behavior

3. Backtesting

Create a backtest script scripts/backtest_my_strategy.py:

"""Backtest my custom strategy."""
from src.backtest.engine import BacktestEngine
from src.strategies.my_strategy import MyCustomStrategy
from datetime import datetime

# Initialize strategy
strategy = MyCustomStrategy()

# Create backtest engine
engine = BacktestEngine("data/market_history.db")

# Run backtest
result = engine.run(
    strategy=strategy,
    start_date=datetime(2024, 1, 1),
    end_date=datetime(2024, 12, 1)
)

# Print results
print(result.summary())
print(f"Total P&L: ${result.total_pnl:.2f}")
print(f"Win Rate: {result.win_rate:.1%}")
print(f"Sharpe Ratio: {result.sharpe_ratio:.2f}")

Run:

python scripts/backtest_my_strategy.py

Best Practices

1. Always Implement Exit Logic

Problem: Strategies that only buy eventually exhaust all capital.

Solution: Implement one or more exit conditions: - Time-based (max hold periods) - Price-based (stop loss / take profit) - Signal-based (opposite signal triggers exit) - Mean reversion (return to average)

2. Validate Configuration

def validate(self) -> list[str]:
    errors = super().validate()

    if self._entry_threshold <= 0:
        errors.append("entry_threshold must be positive")

    if self._entry_threshold >= Decimal("1.0"):
        errors.append("entry_threshold must be less than 1.0")

    if self._order_size_pct > Decimal("0.20"):
        errors.append("order_size_pct too large (max 20%)")

    return errors

3. Use Logging Effectively

import logging
logger = logging.getLogger(__name__)

# Info: Major events
logger.info(f"Strategy initialized: {self.strategy_id}")

# Debug: Detailed logic flow
logger.debug(f"Checking entry criteria: price={price}, threshold={self._threshold}")

# Warning: Unexpected but recoverable
logger.warning(f"No orderbook data for market {market_id}")

# Error: Something went wrong
logger.error(f"Failed to process market: {e}")

4. Handle Missing Data Gracefully

def on_orderbook_update(self, market: Market) -> Signal | None:
    yes = market.get_outcome("YES")

    # Check for None values
    if not yes or yes.price is None:
        return None  # Skip this update

    # Check for invalid values
    if yes.price < 0 or yes.price > 1:
        logger.warning(f"Invalid price: {yes.price}")
        return None

    # Proceed with strategy logic
    ...

5. Use Decimal for Money

Never use float for prices or money:

# ❌ Wrong - floating point errors
threshold = 0.05
price = market.price
if price < threshold:  # Dangerous comparison

# ✅ Correct - exact decimal arithmetic
threshold = Decimal("0.05")
price = Decimal(str(market.price))
if price < threshold:  # Safe comparison

6. Implement Cooldown Periods

Prevent signal spam:

from datetime import datetime, timedelta, timezone

class MyStrategy(BaseStrategy):
    def __init__(self):
        super().__init__()
        self._last_signal_time = {}  # market_id -> datetime

    def on_orderbook_update(self, market: Market) -> Signal | None:
        # Check cooldown
        last_signal = self._last_signal_time.get(market.market_id)
        if last_signal:
            cooldown = timedelta(seconds=60)  # 1 minute cooldown
            if datetime.now(timezone.utc) - last_signal < cooldown:
                return None  # Skip, too soon

        # Generate signal
        signal = ...

        # Update last signal time
        self._last_signal_time[market.market_id] = datetime.now(timezone.utc)

        return signal

7. Document Expected Performance

In strategy docstring:

class MyStrategy(BaseStrategy):
    """
    My Custom Strategy

    Edge: Exploits [specific market inefficiency]

    Entry: [Entry condition]
    Exit: [Exit condition]

    Expected Performance (based on backtest):
    - Win Rate: 55-60%
    - Average P&L: $0.50 per trade
    - Sharpe Ratio: 1.2
    - Max Drawdown: -15%

    Best Markets: High volume (> $10k/day), moderate volatility
    Worst Markets: Low liquidity, extreme prices (< 5% or > 95%)
    """

8. Use Metadata for Debugging

Signal(
    ...,
    metadata={
        "entry_reason": "momentum",
        "z_score": float(zscore),
        "volume_24h": float(market.volume_24h),
        "imbalance": imbalance,
        "similar_markets": similar_market_ids,
        # Include any data useful for analysis
    }
)

Access in backtest reports to understand why signals were generated.

Example: Creating a Volume Spike Strategy

Let's create a complete strategy that trades on volume spikes.

Strategy Logic

Edge: Sudden volume increases often precede price movements Entry: Buy when volume spikes > 2x average Exit: After 1 hour or 5% profit/loss

Implementation

"""
Volume Spike Strategy

Detects abnormal volume increases and enters positions expecting
price movement to follow.
"""
import logging
from collections import deque
from dataclasses import dataclass
from datetime import datetime, timedelta
from decimal import Decimal
from typing import Optional

from ..base import BaseStrategy, StrategyConfig, Signal, SignalType
from ...core.models import Market, Trade

logger = logging.getLogger(__name__)


@dataclass
class VolumeState:
    """Track volume history for a market."""
    market_id: str
    volume_history: deque  # Last N volume observations
    position: int  # 1 = long, -1 = short, 0 = flat
    entry_price: Decimal = Decimal("0")
    entry_time: Optional[datetime] = None
    last_signal_time: Optional[datetime] = None

    @property
    def avg_volume(self) -> Decimal:
        if not self.volume_history:
            return Decimal("0")
        return Decimal(str(sum(self.volume_history) / len(self.volume_history)))


class VolumeSpikeStrategy(BaseStrategy):
    """
    Volume Spike Strategy

    Entry: Volume > 2x average
    Exit: After 1 hour or 5% profit/loss

    Expected Performance:
    - Win Rate: ~52-55%
    - Avg P&L per trade: $0.30
    """

    @property
    def strategy_id(self) -> str:
        return "volume_spike_v1"

    @property
    def name(self) -> str:
        return "Volume Spike"

    @property
    def description(self) -> str:
        return "Trades on abnormal volume increases"

    def __init__(self):
        super().__init__()
        self._states: dict[str, VolumeState] = {}
        self._lookback = 20  # Track last 20 volume observations
        self._spike_threshold = Decimal("2.0")  # 2x average
        self._order_size_pct = Decimal("0.03")
        self._max_hold_hours = 1
        self._stop_loss_pct = Decimal("0.05")
        self._take_profit_pct = Decimal("0.05")

    def configure(self, config: StrategyConfig) -> None:
        super().configure(config)

        params = config.parameters if config else {}
        self._spike_threshold = Decimal(str(params.get("spike_threshold", "2.0")))
        self._order_size_pct = Decimal(str(params.get("order_size_pct", "0.03")))
        self._max_hold_hours = int(params.get("max_hold_hours", 1))

        logger.info(
            f"Volume spike config: threshold={self._spike_threshold}x, "
            f"max_hold={self._max_hold_hours}h"
        )

    def _get_state(self, market_id: str) -> VolumeState:
        if market_id not in self._states:
            self._states[market_id] = VolumeState(
                market_id=market_id,
                volume_history=deque(maxlen=self._lookback),
                position=0
            )
        return self._states[market_id]

    def on_orderbook_update(self, market: Market) -> Signal | None:
        if not self._is_active:
            return None

        yes = market.get_outcome("YES")
        if not yes or yes.price is None:
            return None

        state = self._get_state(market.market_id)
        current_volume = Decimal(str(market.volume_24h or 0))

        # Update volume history
        state.volume_history.append(float(current_volume))

        # Need enough history
        if len(state.volume_history) < self._lookback:
            return None

        # Check exit conditions first
        exit_signal = self._check_exit(market, state)
        if exit_signal:
            return exit_signal

        # Only enter if flat
        if state.position != 0:
            return None

        # Check for volume spike
        avg_volume = state.avg_volume
        if avg_volume == 0:
            return None

        volume_ratio = current_volume / avg_volume

        if volume_ratio >= self._spike_threshold:
            # Volume spike detected - enter position

            # Check cooldown
            if state.last_signal_time:
                cooldown = timedelta(seconds=300)  # 5 min cooldown
                if datetime.now(timezone.utc) - state.last_signal_time < cooldown:
                    return None

            # Enter long position (expecting price to rise)
            state.position = 1
            state.entry_price = yes.price
            state.entry_time = datetime.now(timezone.utc)
            state.last_signal_time = datetime.now(timezone.utc)

            return Signal(
                strategy_id=self.strategy_id,
                market_id=market.market_id,
                outcome_id=yes.outcome_id,
                signal_type=SignalType.BUY,
                confidence=min(0.8, 0.5 + float(volume_ratio - self._spike_threshold) * 0.1),
                size_suggestion=self._order_size_pct,
                max_price=yes.price * Decimal("1.02"),  # 2% slippage tolerance
                reason=(
                    f"Volume spike: {float(volume_ratio):.1f}x avg "
                    f"(${float(current_volume):,.0f} vs ${float(avg_volume):,.0f})"
                ),
                metadata={
                    "volume_ratio": float(volume_ratio),
                    "current_volume": float(current_volume),
                    "avg_volume": float(avg_volume)
                }
            )

        return None

    def _check_exit(self, market: Market, state: VolumeState) -> Optional[Signal]:
        """Check if we should exit position."""
        if state.position == 0:
            return None

        yes = market.get_outcome("YES")
        current_price = yes.price

        # Time-based exit
        if state.entry_time:
            hold_time = datetime.now(timezone.utc) - state.entry_time
            max_hold = timedelta(hours=self._max_hold_hours)

            if hold_time > max_hold:
                state.position = 0
                return Signal(
                    strategy_id=self.strategy_id,
                    market_id=market.market_id,
                    outcome_id=yes.outcome_id,
                    signal_type=SignalType.SELL,
                    confidence=0.7,
                    size_suggestion=Decimal("1.0"),  # Close full position
                    reason=f"Exit: Max hold time ({self._max_hold_hours}h)"
                )

        # Profit/loss exit
        if state.entry_price > 0:
            pnl_pct = (current_price - state.entry_price) / state.entry_price

            # Take profit
            if pnl_pct >= self._take_profit_pct:
                state.position = 0
                return Signal(
                    strategy_id=self.strategy_id,
                    market_id=market.market_id,
                    outcome_id=yes.outcome_id,
                    signal_type=SignalType.SELL,
                    confidence=0.8,
                    size_suggestion=Decimal("1.0"),
                    reason=f"Exit: Take profit ({float(pnl_pct)*100:.1f}%)"
                )

            # Stop loss
            if pnl_pct <= -self._stop_loss_pct:
                state.position = 0
                return Signal(
                    strategy_id=self.strategy_id,
                    market_id=market.market_id,
                    outcome_id=yes.outcome_id,
                    signal_type=SignalType.SELL,
                    confidence=0.9,
                    size_suggestion=Decimal("1.0"),
                    reason=f"Exit: Stop loss ({float(pnl_pct)*100:.1f}%)"
                )

        return None

    def on_trade_executed(self, trade: Trade) -> None:
        logger.info(
            f"Volume spike trade: {trade.market_id}, "
            f"side={trade.side}, P&L={trade.pnl}"
        )

        # Reset position if this was an exit
        if trade.market_id in self._states:
            state = self._states[trade.market_id]
            if str(trade.side).upper() == "SELL" and state.position == 1:
                state.position = 0

Configuration

Add to configs/dev.yaml:

enabled_strategies:
  - volume_spike_v1

strategies:
  volume_spike_v1:
    enabled: true
    allocated_capital: 100
    parameters:
      spike_threshold: "2.0"      # 2x average volume
      order_size_pct: "0.03"      # 3% per trade
      max_hold_hours: 1           # Exit after 1 hour
      stop_loss_pct: "0.05"       # 5% stop loss
      take_profit_pct: "0.05"     # 5% take profit

Testing

# Run with dev config
python -m src.main configs/dev.yaml

# Watch logs for volume spike signals
tail -f pqap.log | grep "Volume spike"

# Monitor paper trading P&L
# Navigate to http://localhost:8080/paper

Summary

To create a strategy: 1. Subclass BaseStrategy 2. Implement on_orderbook_update() for signal generation 3. Implement on_trade_executed() for state tracking 4. Add exit logic (time, price, or signal-based) 5. Configure in YAML 6. Test in paper trading 7. Backtest before live trading

Key principles: - Always implement exit logic - Handle missing data gracefully - Use Decimal for money - Validate configuration - Log important events - Document expected performance

System Overview

Polymarket API

Market data source

Data Collector

Every 5 minutes

SQLite Database

Price history + trades

Strategy Engine

Signal generation

ML Model

XGBoost (72% acc)

Execution Engine

Paper trading

Dashboard

You are here!

Telegram

Alerts & updates

Trading Strategies

Each strategy looks for different market inefficiencies:

Dual Arbitrage Active

Finds when YES + NO prices don't add to 100%. Risk-free profit.

Mean Reversion Active

Buys when price drops too far from average, sells when it recovers.

Market Maker Active

Places bid/ask orders to capture the spread.

Time Arbitrage Active

Exploits predictable price patterns at certain hours.

ML Prediction Active

Uses machine learning to predict 6-hour price direction.

Value Betting Disabled

Finds underpriced outcomes based on implied probability.

Data Storage (Single Source of Truth)

All data lives on EC2. Local machines are for development only. The EC2 instance is the authoritative source for all market data, trades, and positions.
Database Purpose Location
market_history.db Price snapshots every 5 minutes (8.2 MB) EC2 (primary)
pqap_prod.db Trades, positions, P&L history EC2 (primary)
paper_trading_state.json Current portfolio state EC2 (primary)

Environment Architecture

EC2 (Production)

  • Runs 24/7
  • All databases live here
  • Executes all trades
  • Single source of truth

Local (Development)

  • For code changes only
  • Syncs code to EC2
  • No production data
  • Can be turned off

Environment Details

Component Details
Dashboard URL https://pqap.tailwindtech.ai
Server AWS EC2 (us-east-1)
SSL Let's Encrypt via Traefik
Mode Paper Trading (simulated)

How It Works (Simple Version)

1. Data Collection: Every 5 minutes, we fetch prices from Polymarket for 50 markets and save them to our database.

2. Analysis: Our strategies analyze this data looking for patterns - like prices that moved too far from normal, or markets where the math doesn't add up.

3. Signals: When a strategy finds an opportunity, it generates a "signal" - a recommendation to buy or sell.

4. Execution: The execution engine takes these signals and simulates trades (paper trading). Eventually, this will place real orders.

5. Monitoring: This dashboard shows you what's happening. Telegram sends alerts for important events.