# Rust — Orderbook

## Orderbook Streaming

Real-time orderbook data from `ob.polynode.dev`. Three levels of abstraction: raw stream, local state manager, or the fully managed engine.

### Raw Stream

For full control over message processing:

```rust,no_run theme={null}
use polynode::{PolyNodeClient, ObStreamOptions};
use polynode::types::orderbook::ObMessage;

#[tokio::main]
async fn main() -> polynode::Result<()> {
    let client = PolyNodeClient::new("pn_live_YOUR_KEY")?;

    let mut stream = client.orderbook_stream(ObStreamOptions::default()).await?;

    // Subscribe to specific tokens
    stream.subscribe(vec![
        "51037625779056581606819614184446816710505006861008496087735536016411882582167".into(),
    ]).await?;

    while let Some(msg) = stream.next().await {
        match msg {
            Ok(ObMessage::Update(update)) => {
                println!("{:?}", update);
            }
            Ok(ObMessage::Subscribed { markets }) => {
                println!("tracking {} markets", markets);
            }
            Ok(ObMessage::SnapshotsDone { total }) => {
                println!("all {} snapshots loaded", total);
            }
            Err(e) => eprintln!("error: {}", e),
            _ => {}
        }
    }

    Ok(())
}
```

### Local Orderbook State

Apply snapshots, deltas, and price changes to maintain a sorted, tick-accurate local copy of the book:

```rust,no_run theme={null}
use polynode::LocalOrderbook;
use polynode::types::orderbook::{ObMessage, OrderbookUpdate};

# async fn example(stream: &mut polynode::ObStream) -> polynode::Result<()> {
let mut book = LocalOrderbook::new();

while let Some(msg) = stream.next().await {
    if let Ok(ObMessage::Update(update)) = msg {
        match &update {
            OrderbookUpdate::Snapshot(snap)       => book.apply_snapshot(snap),
            OrderbookUpdate::Update(delta)        => book.apply_update(delta),
            OrderbookUpdate::PriceChange(pc)      => book.apply_price_change(pc),
            OrderbookUpdate::LastTradePrice(_)    => {}
        }
    }

    let token = "21742633...";
    if let Some(bid) = book.best_bid(token) {
        println!("best bid: {} x {}", bid.price, bid.size);
    }
    if let Some(ask) = book.best_ask(token) {
        println!("best ask: {} x {}", ask.price, ask.size);
    }
    if let Some(spread) = book.spread(token) {
        println!("spread: {:.4}", spread);
    }
}
# Ok(())
# }
```

<Tip>
  Requires polynode **v0.11.0+**. Earlier versions dropped `price_change` events before they reached your handler — upgrade if your local book drifts from the server.
</Tip>

#### Types

`best_bid` and `best_ask` return `Option<&OrderbookLevel>`. `book.get_book(token)` returns `Option<(&[OrderbookLevel], &[OrderbookLevel])>` — bids first (sorted descending), asks second (sorted ascending).

```rust,no_run theme={null}
pub struct OrderbookLevel {
    pub price: String,  // string-typed — parse to f64 for math
    pub size: String,
}
```

Prices and sizes are strings to preserve full Polymarket precision. Parse with `level.price.parse::<f64>()?` when you need numeric values.
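
As a minimal sketch of that parsing step, the block below mirrors `OrderbookLevel` locally so it compiles without the SDK. `parse_level` and `side_notional` are hypothetical helper names, not part of polynode.

```rust
use std::num::ParseFloatError;

// Local mirror of the SDK's `OrderbookLevel`, so this sketch compiles on its own.
pub struct OrderbookLevel {
    pub price: String,
    pub size: String,
}

/// Hypothetical helper: parse one level into `(price, size)` as `f64`.
pub fn parse_level(level: &OrderbookLevel) -> Result<(f64, f64), ParseFloatError> {
    let price = level.price.parse::<f64>()?;
    let size = level.size.parse::<f64>()?;
    Ok((price, size))
}

/// Hypothetical helper: sum of price * size over one side of the book.
/// Returns the first parse error it encounters.
pub fn side_notional(levels: &[OrderbookLevel]) -> Result<f64, ParseFloatError> {
    let mut total = 0.0;
    for level in levels {
        let (price, size) = parse_level(level)?;
        total += price * size;
    }
    Ok(total)
}
```

Converting to `f64` is lossy for some decimal values, so keep the original strings around if you need to echo exact prices back out.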

### Orderbook Engine

The highest-level abstraction. One shared WebSocket, automatic state management, and filtered views for different consumers:

```rust,no_run theme={null}
use polynode::{OrderbookEngine, EngineOptions};

#[tokio::main]
async fn main() -> polynode::Result<()> {
    let engine = OrderbookEngine::connect(
        "pn_live_YOUR_KEY",
        EngineOptions::default(),
    ).await?;

    // Subscribe to tokens
    engine.subscribe(vec![
        "token_a".into(),
        "token_b".into(),
    ]).await?;

    // Query the shared state directly
    if let Some(mid) = engine.midpoint("token_a").await {
        println!("midpoint: {:.4}", mid);
    }
    if let Some(spread) = engine.spread("token_a").await {
        println!("spread: {:.4}", spread);
    }
    if let Some((bids, asks)) = engine.book("token_a").await {
        println!("bids: {}, asks: {}", bids.len(), asks.len());
    }

    // Create a filtered view for a subset of tokens
    let mut view = engine.view(vec!["token_a".into()]);
    // Runs until the engine closes; `break` out when you are done with the view.
    while let Some(_update) = view.next().await {
        if let Some(mid) = view.midpoint("token_a").await {
            println!("token_a midpoint: {:.4}", mid);
        }
    }

    engine.close().await?;
    Ok(())
}
```

#### Method signatures

```rust,no_run theme={null}
# use polynode::OrderbookEngine;
# use polynode::types::orderbook::OrderbookLevel;
# async fn sigs(engine: &OrderbookEngine) {
let _: Option<f64>            = engine.midpoint("token").await;
let _: Option<f64>            = engine.spread("token").await;
let _: Option<OrderbookLevel> = engine.best_bid("token").await;
let _: Option<OrderbookLevel> = engine.best_ask("token").await;
let _: Option<(Vec<OrderbookLevel>, Vec<OrderbookLevel>)> = engine.book("token").await;
# }
```

`midpoint` and `spread` return `None` when either the bid OR ask side is empty — a one-sided book has no defined mid or spread. `best_bid` / `best_ask` only require their own side to be present. `book` returns whatever's there even if one side is empty.
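
The `None` rule can be modelled in a few lines. This is a sketch of the semantics described above, not the SDK's code: it assumes the conventional definitions (midpoint is the average of best bid and best ask, spread is ask minus bid) and takes already-parsed prices, with bids sorted descending and asks ascending.

```rust
/// Sketch of the documented semantics: `None` unless both sides have a level.
/// `bids` is sorted descending and `asks` ascending, so index 0 is the best price.
pub fn midpoint(bids: &[f64], asks: &[f64]) -> Option<f64> {
    let best_bid = bids.first()?;
    let best_ask = asks.first()?;
    Some((best_bid + best_ask) / 2.0)
}

/// Same rule for the spread: a one-sided book yields `None`.
pub fn spread(bids: &[f64], asks: &[f64]) -> Option<f64> {
    let best_bid = bids.first()?;
    let best_ask = asks.first()?;
    Some(best_ask - best_bid)
}
```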

#### EngineOptions

```rust,no_run theme={null}
pub struct EngineOptions {
    pub compress: bool,                       // default: true  — gzip on the wire
    pub auto_reconnect: bool,                 // default: true
    pub max_reconnect_attempts: Option<u32>,  // default: None  — reconnect forever
}
```

`EngineOptions::default()` is the right call for almost every use case. Override only if you need to disable compression for a CPU-constrained client or cap reconnect attempts in a one-shot script.
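
When you do need an override, struct-update syntax keeps the remaining defaults. The sketch below mirrors `EngineOptions` locally with the defaults documented above so it compiles without the SDK. It assumes the real struct can be built the same way (all fields public and not `#[non_exhaustive]`), and `one_shot_options` is a hypothetical name.

```rust
// Local mirror of the `EngineOptions` struct with its documented defaults.
#[derive(Debug, Clone, PartialEq)]
pub struct EngineOptions {
    pub compress: bool,
    pub auto_reconnect: bool,
    pub max_reconnect_attempts: Option<u32>,
}

impl Default for EngineOptions {
    fn default() -> Self {
        Self {
            compress: true,
            auto_reconnect: true,
            max_reconnect_attempts: None,
        }
    }
}

/// Hypothetical preset for a one-shot script: give up after three reconnects,
/// keep every other default.
pub fn one_shot_options() -> EngineOptions {
    EngineOptions {
        max_reconnect_attempts: Some(3),
        ..EngineOptions::default()
    }
}
```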

#### `subscribe()` returns immediately — snapshots are async

`engine.subscribe(...)` sends the subscribe command to the background WS task and returns as soon as the command is queued. **Snapshots arrive asynchronously** over the WS and populate local state some time later (typically 1–5 seconds, longer for many tokens). Calling `engine.midpoint(id)` the moment `subscribe()` returns will return `None` because the snapshot hasn't landed yet.

Two patterns work:

```rust,no_run theme={null}
# use std::time::Duration;
# use polynode::OrderbookEngine;
# async fn example(engine: &OrderbookEngine) -> polynode::Result<()> {
// (1) coarse wait — simplest, fine for scripts and examples
engine.subscribe(vec!["token_a".into()]).await?;
tokio::time::sleep(Duration::from_secs(3)).await;
let mid = engine.midpoint("token_a").await; // Some(..) if the snapshot has landed, None otherwise

// (2) event-driven — read updates from a view (production pattern)
let mut view = engine.view(vec!["token_a".into()]);
engine.subscribe(vec!["token_a".into()]).await?;
while let Some(_update) = view.next().await {
    if let Some(mid) = view.midpoint("token_a").await {
        println!("ready: midpoint={mid:.4}");
        break;
    }
}
# Ok(())
# }
```
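
Because snapshot latency varies, a fixed sleep can be too short for a large subscription. As a rough sketch of a bounded alternative, the helper below calls a probe repeatedly until it yields a value or a timeout elapses. `poll_until` is a hypothetical name, not part of polynode, and it blocks the thread so that it runs with only the standard library.

```rust
use std::thread::sleep;
use std::time::{Duration, Instant};

/// Hypothetical helper: call `probe` every `interval` until it returns `Some`
/// or `timeout` has elapsed. Blocking stand-in for an async polling loop.
pub fn poll_until<T>(
    timeout: Duration,
    interval: Duration,
    mut probe: impl FnMut() -> Option<T>,
) -> Option<T> {
    let deadline = Instant::now() + timeout;
    loop {
        // Probe first so an already-ready value returns without sleeping.
        if let Some(value) = probe() {
            return Some(value);
        }
        if Instant::now() >= deadline {
            return None;
        }
        sleep(interval);
    }
}
```

In async code the same shape would await the probe (for example `engine.midpoint(id)`) and use `tokio::time::sleep` between attempts, or wrap the event-driven loop in `tokio::time::timeout`.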

#### Errors `connect()` can return

`OrderbookEngine::connect` returns `polynode::Result<Self>`. On failure, the error is one of:

* `Error::Auth(...)` — API key is invalid, revoked, or not recognized
* `Error::RateLimited(...)` — too many concurrent connections for your tier
* `Error::WebSocket(...)` — TLS/handshake failure (network, DNS, TLS cert)
* `Error::Url(...)` — internal URL parse error (shouldn't happen; file a bug)

In long-running programs, the engine's internal task uses `EngineOptions::auto_reconnect` (default true) to recover from mid-session drops, so you generally only need to handle errors at initial `connect()` time.
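
One way to act on these variants at startup is to sort them into "retry" and "give up". The sketch below mirrors the four variants in a local enum so it compiles without the SDK. The `String` payloads and the `should_retry` policy are assumptions for illustration, and the real `polynode::Error` may carry other payload types or additional variants.

```rust
// Local mirror of the four variants listed above. The `String` payloads are
// an assumption for this sketch.
#[derive(Debug)]
pub enum Error {
    Auth(String),
    RateLimited(String),
    WebSocket(String),
    Url(String),
}

/// Hypothetical policy: is it worth calling `connect()` again after this error?
pub fn should_retry(err: &Error) -> bool {
    match err {
        // A bad or revoked key fails the same way on every attempt.
        Error::Auth(_) => false,
        // Connection limits can clear once other connections close.
        Error::RateLimited(_) => true,
        // Network, DNS, and TLS failures are often transient.
        Error::WebSocket(_) => true,
        // Internal URL parse error: retrying cannot help.
        Error::Url(_) => false,
    }
}
```

When matching on the SDK's own error type, add a wildcard arm so the code keeps compiling if more variants exist.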

### Batch Queries

Query many tokens in a single call. Each batch method takes a slice of token IDs and returns a `HashMap<String, T>` keyed by `asset_id`. Tokens that aren't in local state are silently omitted, so callers can pass mixed lists without pre-filtering.

```rust,no_run theme={null}
use polynode::{OrderbookEngine, EngineOptions};

#[tokio::main]
async fn main() -> polynode::Result<()> {
    let engine = OrderbookEngine::connect("pn_live_...", EngineOptions::default()).await?;
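    engine.subscribe(vec!["token_a".into(), "token_b".into(), "token_c".into()]).await?;

    // subscribe() returns before snapshots arrive, so give them time to land;
    // tokens without local state are omitted from the maps below.
    tokio::time::sleep(std::time::Duration::from_secs(3)).await;

    let ids = vec!["token_a".to_string(), "token_b".to_string(), "token_c".to_string()];

    // Each call takes one read lock and returns every value for `ids` at once.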
    let mids    = engine.midpoints(&ids).await;
    let spreads = engine.spreads(&ids).await;
    let bids    = engine.best_bids(&ids).await;
    let asks    = engine.best_asks(&ids).await;
    let books   = engine.books(&ids).await;

    for (id, m) in &mids {
        println!("{}  mid={:.4}", id, m);
    }
    Ok(())
}
```

Sample response shape (actual values from a live run):

```text theme={null}
midpoints(&ids)   -> { "77893140…19989216": 0.2350,
                       "22139576…97751695": 0.0085,
                       "10905153…10914081": 0.3050 }

spreads(&ids)     -> { "58343456…67891911": 0.0010,
                       "22139576…97751695": 0.0010 }

best_bids(&ids)   -> { "72100993…76306968": OrderbookLevel { price: "0.999", size: "154844.11" },
                       "10905153…10914081": OrderbookLevel { price: "0.3",   size: "473238.67" } }

books(&ids)       -> { "84387820…19854548": (
                         vec![],                                                // bids (empty — one-sided)
                         vec![OrderbookLevel { price: "0.001", size: "687883.2" }, /* +113 more */]
                       ) }
```

A token with a one-sided book has no midpoint or spread, so it is left out of those maps (see the `None` semantics under Method signatures above). `books` still returns it, as in the last entry of the sample.
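
Because missing tokens are dropped silently, it can help to check which requested IDs did not come back. A minimal sketch using only the standard library follows; `missing_ids` is a hypothetical helper, not an SDK method.

```rust
use std::collections::HashMap;

/// Hypothetical helper: which requested IDs are absent from a batch result?
/// Works for any of the batch maps, whatever the value type.
pub fn missing_ids<T>(requested: &[String], result: &HashMap<String, T>) -> Vec<String> {
    requested
        .iter()
        .filter(|id| !result.contains_key(id.as_str()))
        .cloned()
        .collect()
}
```

Calling it with `ids` and the map returned by `engine.midpoints(&ids)` would list the tokens that have no midpoint yet.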

### All Tracked Tokens

Skip the ID list and get every token currently in local state. Useful when you want a global snapshot of everything you're subscribed to.

```rust,no_run theme={null}
# use polynode::OrderbookEngine;
# async fn example(engine: &OrderbookEngine) -> polynode::Result<()> {
let tokens = engine.tracked_tokens().await;     // Vec<String>
let mids   = engine.midpoints_all().await;       // every midpoint
let spreads = engine.spreads_all().await;
let bids   = engine.best_bids_all().await;
let asks   = engine.best_asks_all().await;
let books  = engine.books_all().await;           // full L2 for everything
# Ok(())
# }
```

The same `_all` methods exist on `EngineView`.

### EngineView — filtered handle over shared state

`engine.view(token_ids)` returns an `EngineView` — a filtered receiver that only sees updates for the token IDs you pass in. All views share the engine's single WebSocket and single local state; they're cheap to create and you can have many in one process (e.g. one per consumer task).

```rust,no_run theme={null}
# use polynode::OrderbookEngine;
# async fn example(engine: &OrderbookEngine) {
let mut view = engine.view(vec!["token_a".into(), "token_b".into()]);

// Stream updates scoped to this view's tokens. next() returns the next
// OrderbookUpdate for token_a or token_b, and None only if the engine closes.
while let Some(update) = view.next().await {
    // update is an OrderbookUpdate::Snapshot | ::Update | ::PriceChange | ::LastTradePrice
    // query freshest state (same read lock as the engine):
    if let Some(mid) = view.midpoint("token_a").await {
        println!("token_a mid: {mid:.4}");
    }
}

// Change the filter at runtime — takes effect on the next incoming message:
view.set_tokens(vec!["token_c".into()]).await;
# }
```

Every batch and `_all()` method on `OrderbookEngine` also exists on `EngineView`. `engine.state()` is engine-only: views are filtered, so use the engine for the full state handle.

### Detecting Inactive Markets

Each token tracks the moment its local copy was last touched (snapshot or delta). Use this to detect markets that have stopped moving.

```rust,no_run theme={null}
use std::time::{Duration, Instant};
# use polynode::OrderbookEngine;

# async fn example(engine: &OrderbookEngine) -> polynode::Result<()> {
// Return type: Option<std::time::Instant>. Call .elapsed() for Duration since.
let ts: Option<Instant> = engine.last_change("token_a").await;
if let Some(ts) = ts {
    println!("last update was {:?} ago", ts.elapsed());
}

// Return type: Vec<String> — every tracked token whose last update is older than threshold.
let stale: Vec<String> = engine.inactive_since(Duration::from_secs(60)).await;
for token in stale {
    println!("inactive: {}", token);
}
# Ok(())
# }
```

Sample response (a tracked token and a stale filter):

```text theme={null}
last_change("77893140…")     -> Some(Instant { .. })    // .elapsed() => 3.910s
inactive_since(60s)          -> []                      // nothing stale right after subscribe
inactive_since(1ms)          -> ["77893140…", "22139576…", "10905153…"]
```

The timestamp is recorded client-side when the SDK applies the update, typically tens of milliseconds after the actual exchange event. That is accurate enough for inactive-market detection, but not for sub-second cross-market correlation.
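
If you also want to know how long each market has been idle, the per-token instants can be ranked yourself. The sketch below is a standard-library model of that idea. `stale_tokens` is a hypothetical helper, and the input map stands in for values collected from `last_change`.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical helper: given each token's last-change instant, return the
/// tokens idle for longer than `threshold` as of `now`, stalest first.
pub fn stale_tokens(
    last_changes: &HashMap<String, Instant>,
    now: Instant,
    threshold: Duration,
) -> Vec<(String, Duration)> {
    let mut stale: Vec<(String, Duration)> = last_changes
        .iter()
        // saturating_duration_since returns zero if `ts` is later than `now`.
        .map(|(token, ts)| (token.clone(), now.saturating_duration_since(*ts)))
        .filter(|(_, idle)| *idle > threshold)
        .collect();
    // Longest idle time first.
    stale.sort_by(|a, b| b.1.cmp(&a.1));
    stale
}
```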

### Direct State Access

For callers who want to hold the lock and walk the full state themselves, `engine.state()` returns the underlying `Arc<RwLock<LocalOrderbook>>`. Useful for custom batch logic, snapshotting all tokens at a single consistent moment, or building your own view.

```rust,no_run theme={null}
# use polynode::OrderbookEngine;
# async fn example(engine: &OrderbookEngine) -> polynode::Result<()> {
let state = engine.state();
let guard = state.read().await;

// Everything in one consistent view
let total = guard.len();
let mids  = guard.midpoints_all();
let books = guard.books_all();

for token in guard.tracked_tokens() {
    if let Some(ts) = guard.last_change(&token) {
        println!("{}: last change {:?} ago", token, ts.elapsed());
    }
}
# Ok(())
# }
```

The same methods are available on `LocalOrderbook` directly when used outside the engine.
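
A read guard held for a long time delays whoever needs the write lock, so a common pattern is to copy what you need under the lock and release it before doing slow work. The sketch below illustrates that pattern with `std::sync::RwLock` and a plain map as stand-ins. The real handle wraps a `LocalOrderbook` behind an async lock, and `SharedState` and `snapshot` are hypothetical names.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

// Stand-in for the shared state: token ID -> midpoint.
pub type SharedState = Arc<RwLock<HashMap<String, f64>>>;

/// Hypothetical helper: copy everything out under one read lock, so the
/// result reflects a single consistent moment.
pub fn snapshot(state: &SharedState) -> HashMap<String, f64> {
    let guard = state.read().expect("state lock poisoned");
    (*guard).clone()
    // `guard` is dropped when the function returns, releasing the read lock.
}
```

Later writes to the shared state do not affect the returned copy, which makes it safe to serialize or analyze at leisure.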
