Orderbook Streaming
Stream real-time orderbook data from ob.polynode.dev. The SDK offers three levels of abstraction: a raw stream, a local state manager, and a fully managed engine.
Raw Stream
For full control over message processing:
use polynode::{PolyNodeClient, ObStreamOptions};
use polynode::types::orderbook::ObMessage;
#[tokio::main]
async fn main() -> polynode::Result<()> {
let client = PolyNodeClient::new("pn_live_YOUR_KEY")?;
let mut stream = client.orderbook_stream(ObStreamOptions::default()).await?;
// Subscribe to specific tokens
stream.subscribe(vec![
"51037625779056581606819614184446816710505006861008496087735536016411882582167".into(),
]).await?;
while let Some(msg) = stream.next().await {
match msg {
Ok(ObMessage::Update(update)) => {
println!("{:?}", update);
}
Ok(ObMessage::Subscribed { markets }) => {
println!("tracking {} markets", markets);
}
Ok(ObMessage::SnapshotsDone { total }) => {
println!("all {} snapshots loaded", total);
}
Err(e) => eprintln!("error: {}", e),
_ => {}
}
}
Ok(())
}
Local Orderbook State
Apply snapshots, deltas, and price changes to maintain a sorted, tick-accurate local copy of the book:
use polynode::LocalOrderbook;
use polynode::types::orderbook::{ObMessage, OrderbookUpdate};
# async fn example(stream: &mut polynode::ObStream) -> polynode::Result<()> {
let mut book = LocalOrderbook::new();
while let Some(msg) = stream.next().await {
if let Ok(ObMessage::Update(update)) = msg {
match &update {
OrderbookUpdate::Snapshot(snap) => book.apply_snapshot(snap),
OrderbookUpdate::Update(delta) => book.apply_update(delta),
OrderbookUpdate::PriceChange(pc) => book.apply_price_change(pc),
OrderbookUpdate::LastTradePrice(_) => {}
}
}
let token = "21742633...";
if let Some(bid) = book.best_bid(token) {
println!("best bid: {} x {}", bid.price, bid.size);
}
if let Some(ask) = book.best_ask(token) {
println!("best ask: {} x {}", ask.price, ask.size);
}
if let Some(spread) = book.spread(token) {
println!("spread: {:.4}", spread);
}
}
# Ok(())
# }
Requires polynode v0.11.0+. Earlier versions dropped price_change events before they reached your handler — upgrade if your local book drifts from the server.
Types
best_bid and best_ask return Option<&OrderbookLevel>. book.get_book(token) returns Option<(&[OrderbookLevel], &[OrderbookLevel])> — bids first (sorted descending), asks second (sorted ascending).
pub struct OrderbookLevel {
pub price: String, // string-typed — parse to f64 for math
pub size: String,
}
Prices and sizes are strings to preserve full Polymarket precision. Parse with level.price.parse::<f64>()? when you need numeric values.
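As a rough illustration of the parsing step, the sketch below converts string-typed levels to f64 and sums price × size over the top of one side. The OrderbookLevel here is a local stand-in copied from the struct above so the snippet compiles on its own, and parse_level and notional are hypothetical helper names, not part of polynode.

```rust
use std::num::ParseFloatError;

// Local stand-in mirroring the struct shown above (not imported from polynode).
#[derive(Debug, Clone)]
pub struct OrderbookLevel {
    pub price: String,
    pub size: String,
}

/// Parse one level into (price, size) as f64. Hypothetical helper.
pub fn parse_level(level: &OrderbookLevel) -> Result<(f64, f64), ParseFloatError> {
    let price = level.price.parse::<f64>()?;
    let size = level.size.parse::<f64>()?;
    Ok((price, size))
}

/// Sum price * size over the first `depth` levels of one side. Hypothetical helper.
pub fn notional(levels: &[OrderbookLevel], depth: usize) -> Result<f64, ParseFloatError> {
    let mut total = 0.0;
    for level in levels.iter().take(depth) {
        let (price, size) = parse_level(level)?;
        total += price * size;
    }
    Ok(total)
}
```

Because get_book returns bids sorted descending and asks sorted ascending, passing either slice with a small depth covers the levels nearest the touch. f64 is convenient but rounds; if exact arithmetic matters, a decimal type may be a better fit.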
Orderbook Engine
The highest-level abstraction. One shared WebSocket, automatic state management, and filtered views for different consumers:
use polynode::{OrderbookEngine, EngineOptions};
#[tokio::main]
async fn main() -> polynode::Result<()> {
let engine = OrderbookEngine::connect(
"pn_live_YOUR_KEY",
EngineOptions::default(),
).await?;
// Subscribe to tokens
engine.subscribe(vec![
"token_a".into(),
"token_b".into(),
]).await?;
// Query the shared state directly
if let Some(mid) = engine.midpoint("token_a").await {
println!("midpoint: {:.4}", mid);
}
if let Some(spread) = engine.spread("token_a").await {
println!("spread: {:.4}", spread);
}
if let Some((bids, asks)) = engine.book("token_a").await {
println!("bids: {}, asks: {}", bids.len(), asks.len());
}
// Create a filtered view for a subset of tokens
let mut view = engine.view(vec!["token_a".into()]);
while let Some(_update) = view.next().await {
if let Some(mid) = view.midpoint("token_a").await {
println!("token_a midpoint: {:.4}", mid);
}
}
engine.close().await?;
Ok(())
}
Method signatures
# use polynode::OrderbookEngine;
# use polynode::types::orderbook::OrderbookLevel;
# async fn sigs(engine: &OrderbookEngine) {
let _: Option<f64> = engine.midpoint("token").await;
let _: Option<f64> = engine.spread("token").await;
let _: Option<OrderbookLevel> = engine.best_bid("token").await;
let _: Option<OrderbookLevel> = engine.best_ask("token").await;
let _: Option<(Vec<OrderbookLevel>, Vec<OrderbookLevel>)> = engine.book("token").await;
# }
midpoint and spread return None when either the bid OR ask side is empty — a one-sided book has no defined mid or spread. best_bid / best_ask only require their own side to be present. book returns whatever’s there even if one side is empty.
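The None semantics can be pictured with a small standalone sketch. It assumes the conventional definitions, midpoint = (bid + ask) / 2 and spread = ask − bid; the source does not show how the engine computes them, so treat the function bodies as an illustration rather than the SDK's code.

```rust
/// Midpoint of the best bid and best ask; None if either side is missing.
/// Assumes the conventional (bid + ask) / 2 definition.
pub fn midpoint(best_bid: Option<f64>, best_ask: Option<f64>) -> Option<f64> {
    // `?` returns None early when a side is empty.
    let (bid, ask) = (best_bid?, best_ask?);
    Some((bid + ask) / 2.0)
}

/// Spread between best ask and best bid; None if either side is missing.
/// Assumes the conventional ask - bid definition.
pub fn spread(best_bid: Option<f64>, best_ask: Option<f64>) -> Option<f64> {
    let (bid, ask) = (best_bid?, best_ask?);
    Some(ask - bid)
}
```

A one-sided book, such as an empty bid side with a single ask at 0.001, yields None from both functions, so callers should handle None as a normal state rather than an error.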
EngineOptions
pub struct EngineOptions {
pub compress: bool, // default: true — gzip on the wire
pub auto_reconnect: bool, // default: true
pub max_reconnect_attempts: Option<u32>, // default: None — reconnect forever
}
EngineOptions::default() is the right call for almost every use case. Override only if you need to disable compression for a CPU-constrained client or cap reconnect attempts in a one-shot script.
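When an override is needed, struct update syntax keeps the remaining defaults. The sketch below uses a local stand-in for EngineOptions, with fields and defaults copied from the struct above, so it compiles without the crate. It assumes the real struct can be built with the same syntax, which holds only if all of its fields are public and it is not marked non_exhaustive. The function names are hypothetical.

```rust
// Local stand-in for polynode::EngineOptions; fields and defaults copied from above.
#[derive(Debug, Clone, PartialEq)]
pub struct EngineOptions {
    pub compress: bool,
    pub auto_reconnect: bool,
    pub max_reconnect_attempts: Option<u32>,
}

impl Default for EngineOptions {
    fn default() -> Self {
        Self { compress: true, auto_reconnect: true, max_reconnect_attempts: None }
    }
}

/// One-shot script: stop after three reconnect attempts, keep other defaults.
pub fn one_shot_options() -> EngineOptions {
    EngineOptions { max_reconnect_attempts: Some(3), ..EngineOptions::default() }
}

/// CPU-constrained client: disable compression, keep other defaults.
pub fn low_cpu_options() -> EngineOptions {
    EngineOptions { compress: false, ..EngineOptions::default() }
}
```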
engine.subscribe(...) sends the subscribe command to the background WS task and returns as soon as the command is queued. Snapshots arrive asynchronously over the WS and populate local state some time later (typically 1–5 seconds, longer for many tokens). Calling engine.midpoint(id) the moment subscribe() returns will return None because the snapshot hasn’t landed yet.
Two patterns work:
# use std::time::Duration;
# use polynode::OrderbookEngine;
# async fn example(engine: &OrderbookEngine) -> polynode::Result<()> {
// (1) coarse wait — simplest, fine for scripts and examples
engine.subscribe(vec!["token_a".into()]).await?;
tokio::time::sleep(Duration::from_secs(3)).await;
let mid = engine.midpoint("token_a").await; // Some(..) if the snapshot has landed by now, otherwise still None
// (2) event-driven — read updates from a view (production pattern)
let mut view = engine.view(vec!["token_a".into()]);
engine.subscribe(vec!["token_a".into()]).await?;
while let Some(_update) = view.next().await {
if let Some(mid) = view.midpoint("token_a").await {
println!("ready: midpoint={mid:.4}");
break;
}
}
# Ok(())
# }
Errors connect() can return
OrderbookEngine::connect returns polynode::Result<Self>. On failure, the error is one of:
Error::Auth(...) — API key is invalid, revoked, or not recognized
Error::RateLimited(...) — too many concurrent connections for your tier
Error::WebSocket(...) — TLS/handshake failure (network, DNS, TLS cert)
Error::Url(...) — internal URL parse error (shouldn’t happen; file a bug)
In long-running programs, the engine’s internal task uses EngineOptions::auto_reconnect (default true) to recover from mid-session drops, so you generally only need to handle errors at initial connect() time.
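One way to act on these variants at connect time is to split them into errors worth retrying and errors that need a human. The sketch below is a suggestion, not SDK behavior: the Error enum is a local stand-in with String payloads (the real payload types are not shown above), and ConnectAction and on_connect_error are hypothetical names.

```rust
// Local stand-in for polynode::Error; payload types are assumed to be String.
#[derive(Debug)]
pub enum Error {
    Auth(String),
    RateLimited(String),
    WebSocket(String),
    Url(String),
}

// Hypothetical policy type for the caller's retry loop.
#[derive(Debug, PartialEq)]
pub enum ConnectAction {
    Abort,
    RetryAfterBackoff,
}

/// Decide what to do with an error returned by the initial connect().
pub fn on_connect_error(err: &Error) -> ConnectAction {
    match err {
        // A bad key or an internal URL bug will not fix itself on retry.
        Error::Auth(_) | Error::Url(_) => ConnectAction::Abort,
        // Connection limits and network failures may clear up after a pause.
        Error::RateLimited(_) | Error::WebSocket(_) => ConnectAction::RetryAfterBackoff,
    }
}
```

If the real error enum has more variants than the four listed, the match needs a wildcard arm; aborting is the more conservative default for unknown errors.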
Batch Queries
Query many tokens in a single call. Each batch method takes a slice of token IDs and returns a HashMap<String, T> keyed by asset_id. Tokens that aren’t in local state are silently omitted, so callers can pass mixed lists without pre-filtering.
use polynode::{OrderbookEngine, EngineOptions};
#[tokio::main]
async fn main() -> polynode::Result<()> {
let engine = OrderbookEngine::connect("pn_live_...", EngineOptions::default()).await?;
engine.subscribe(vec!["token_a".into(), "token_b".into(), "token_c".into()]).await?;
let ids = vec!["token_a".to_string(), "token_b".to_string(), "token_c".to_string()];
// Each call takes one read lock on local state and returns every requested value at once.
let mids = engine.midpoints(&ids).await;
let spreads = engine.spreads(&ids).await;
let bids = engine.best_bids(&ids).await;
let asks = engine.best_asks(&ids).await;
let books = engine.books(&ids).await;
for (id, m) in &mids {
println!("{} mid={:.4}", id, m);
}
Ok(())
}
Sample response shape (actual values from a live run):
midpoints(&ids) -> { "77893140…19989216": 0.2350,
"22139576…97751695": 0.0085,
"10905153…10914081": 0.3050 }
spreads(&ids) -> { "58343456…67891911": 0.0010,
"22139576…97751695": 0.0010 }
best_bids(&ids) -> { "72100993…76306968": OrderbookLevel { price: "0.999", size: "154844.11" },
"10905153…10914081": OrderbookLevel { price: "0.3", size: "473238.67" } }
books(&ids) -> { "84387820…19854548": (
vec![], // bids (empty — one-sided)
vec![OrderbookLevel { price: "0.001", size: "687883.2" }, /* +113 more */]
) }
A result map can hold fewer entries than the ID list you passed in. Tokens that are not in local state are omitted, and a one-sided book has no defined midpoint or spread (see the None semantics under Method signatures above).
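Since absent tokens are dropped silently, it can help to check which requested IDs did not come back. The helper below is a minimal standalone sketch that works on any HashMap<String, T> shaped like the batch results; missing_ids is a hypothetical name, not an SDK method.

```rust
use std::collections::HashMap;

/// Return the requested IDs that are absent from a batch result, in request order.
/// Hypothetical helper; works for any value type T.
pub fn missing_ids<T>(requested: &[String], result: &HashMap<String, T>) -> Vec<String> {
    requested
        .iter()
        .filter(|id| !result.contains_key(id.as_str()))
        .cloned()
        .collect()
}
```

An ID can be missing either because its snapshot has not landed yet or, for midpoints and spreads, because its book is one-sided. Comparing the result against best_bids and best_asks for the same IDs tells the two cases apart.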
All Tracked Tokens
Skip the ID list and get every token currently in local state. Useful when you want a global snapshot of everything you’re subscribed to.
# use polynode::OrderbookEngine;
# async fn example(engine: &OrderbookEngine) -> polynode::Result<()> {
let tokens = engine.tracked_tokens().await; // Vec<String>
let mids = engine.midpoints_all().await; // every midpoint
let spreads = engine.spreads_all().await;
let bids = engine.best_bids_all().await;
let asks = engine.best_asks_all().await;
let books = engine.books_all().await; // full L2 for everything
# Ok(())
# }
The same _all methods exist on EngineView.
EngineView — filtered handle over shared state
engine.view(token_ids) returns an EngineView — a filtered receiver that only sees updates for the token IDs you pass in. All views share the engine’s single WebSocket and single local state; they’re cheap to create and you can have many in one process (e.g. one per consumer task).
# use polynode::OrderbookEngine;
# async fn example(engine: &OrderbookEngine) {
let mut view = engine.view(vec!["token_a".into(), "token_b".into()]);
// Stream updates scoped to this view's tokens. next() returns the next
// OrderbookUpdate for token_a or token_b, and None only if the engine closes.
while let Some(update) = view.next().await {
// update is an OrderbookUpdate::Snapshot | ::Update | ::PriceChange | ::LastTradePrice
// query freshest state (same read lock as the engine):
if let Some(mid) = view.midpoint("token_a").await {
println!("token_a mid: {mid:.4}");
}
}
// Change the filter at runtime — takes effect on the next incoming message:
view.set_tokens(vec!["token_c".into()]).await;
# }
Every batch and _all() method that exists on OrderbookEngine also exists on EngineView. engine.state() is engine-only: views are filtered, so use the engine when you need the full state handle.
Detecting Inactive Markets
Each token tracks the moment its local copy was last touched (snapshot or delta). Use this to detect markets that have stopped moving.
use std::time::{Duration, Instant};
# use polynode::OrderbookEngine;
# async fn example(engine: &OrderbookEngine) -> polynode::Result<()> {
// Return type: Option<std::time::Instant>. Call .elapsed() for Duration since.
let ts: Option<Instant> = engine.last_change("token_a").await;
if let Some(ts) = ts {
println!("last update was {:?} ago", ts.elapsed());
}
// Return type: Vec<String> — every tracked token whose last update is older than threshold.
let stale: Vec<String> = engine.inactive_since(Duration::from_secs(60)).await;
for token in stale {
println!("inactive: {}", token);
}
# Ok(())
# }
Sample response (a tracked token and a stale filter):
last_change("77893140…") -> Some(Instant { .. }) // .elapsed() => 3.910s
inactive_since(60s) -> [] // nothing stale right after subscribe
inactive_since(1ms) -> ["77893140…", "22139576…", "10905153…"]
The timestamp is stamped client-side at the moment your SDK applies the update, which is typically tens of milliseconds behind the actual exchange event — accurate enough for inactive-market detection, not for sub-second cross-market correlation.
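To make the staleness check concrete, here is a standalone sketch of how a per-token Instant map could be filtered by age. It is an illustration of the idea under stated assumptions, not the engine's implementation: the function takes `now` as a parameter so it can be tested deterministically, it treats "older than" as a strict comparison, and it sorts the result for stable output.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Tokens whose last update is more than `threshold` older than `now`.
/// Sketch only; the real engine method takes just the threshold.
pub fn inactive_since(
    last_change: &HashMap<String, Instant>,
    now: Instant,
    threshold: Duration,
) -> Vec<String> {
    let mut stale: Vec<String> = last_change
        .iter()
        // saturating_duration_since returns zero if `ts` is later than `now`.
        .filter(|(_, ts)| now.saturating_duration_since(**ts) > threshold)
        .map(|(token, _)| token.clone())
        .collect();
    stale.sort(); // HashMap iteration order is unspecified
    stale
}
```

Because Instant is monotonic, the comparison is unaffected by wall-clock adjustments on the client.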
Direct State Access
For callers who want to hold the lock and walk the full state themselves, engine.state() returns the underlying Arc<RwLock<LocalOrderbook>>. Useful for custom batch logic, snapshotting all tokens at a single consistent moment, or building your own view.
# use polynode::OrderbookEngine;
# async fn example(engine: &OrderbookEngine) -> polynode::Result<()> {
let state = engine.state();
let guard = state.read().await;
// Everything in one consistent view
let total = guard.len();
let mids = guard.midpoints_all();
let books = guard.books_all();
for token in guard.tracked_tokens() {
if let Some(ts) = guard.last_change(&token) {
println!("{}: last change {:?} ago", token, ts.elapsed());
}
}
# Ok(())
# }
The same methods are available on LocalOrderbook directly when used outside the engine.