Skip to main content

Install

pip install polynode
Requires Python 3.10+. For trading support (order placement on Polymarket):
pip install polynode[trading]

Quick Start

from polynode import PolyNode

pn = PolyNode(api_key="pn_live_...")

# Fetch top markets
markets = pn.markets(count=10)
print(f"{markets.count} markets, {markets.total} total")

# Search
results = pn.search("bitcoin")
print(results.results[0].question)

pn.close()

Context Manager

with PolyNode(api_key="pn_live_...") as pn:
    status = pn.status()
    print(f"Tracking {status.state.market_count} markets")

Async Client

Every method is available in both sync and async variants:
import asyncio
from polynode import AsyncPolyNode

async def main():
    async with AsyncPolyNode(api_key="pn_live_...") as pn:
        status = await pn.status()
        markets = await pn.markets(count=3)
        print(f"{status.state.market_count} markets, {status.ws_subscribers} ws subs")

asyncio.run(main())

REST Methods

Every REST endpoint has a typed method on the PolyNode client. All return Pydantic models with full IDE autocomplete.
# System
pn.healthz()                           # "ok"
pn.status()                            # StatusResponse
pn.create_key("my-bot")               # ApiKeyResponse

# Markets
pn.markets(count=10)                   # MarketsResponse
pn.market(token_id)                    # dict
pn.market_by_slug("bitcoin-100k")     # dict
pn.market_by_condition(condition_id)   # dict
pn.markets_list(count=20, sort="volume")  # MarketsListResponse
pn.search("ethereum", limit=5)        # SearchResponse

# Pricing
pn.candles(token_id, resolution="1h", limit=100)  # CandlesResponse
pn.stats(token_id)                     # dict

# Settlements
pn.recent_settlements(count=20)        # SettlementsResponse
pn.token_settlements(token_id, count=10)
pn.wallet_settlements(address, count=10)

# Wallets
pn.wallet(address)                     # WalletResponse
pn.wallet_trades(address, limit=50)    # dict
pn.wallet_positions(address, limit=50) # dict
pn.wallet_onchain_positions(address)   # dict

# Orderbook (REST)
pn.orderbook_rest(token_id)           # OrderbookResponse
pn.midpoint(token_id)                 # MidpointResponse
pn.spread(token_id)                   # SpreadResponse

# Enriched Data (1 req/sec rate limit)
pn.leaderboard(period="weekly", sort="profit")  # LeaderboardResponse
pn.trending()                          # TrendingResponse
pn.activity()                          # ActivityResponse
pn.movers()                           # MoversResponse
pn.trader_profile("0xabc...")         # TraderProfile
pn.trader_pnl("0xabc...", period="1W")  # TraderPnlResponse
pn.event("how-many-fed-rate-cuts-2026")  # EventDetailResponse
pn.search_events("recession", limit=5)  # EventSearchResponse
pn.markets_by_category("crypto")      # MarketsListResponse

# RPC (rpc.polynode.dev)
pn.rpc("eth_blockNumber")
pn.rpc("eth_getBlockByNumber", ["latest", False])

Example: Market Data

with PolyNode(api_key="pn_live_...") as pn:
    # Top 3 markets by volume
    markets = pn.markets(count=3)
    for m in markets.markets:
        print(f"{m.question} — ${m.volume_24h:,.0f} vol")

    # OHLCV candles
    candles = pn.candles(markets.markets[0].token_id, resolution="1h", limit=3)
    for c in candles.candles:
        print(f"  O={c.open} H={c.high} L={c.low} C={c.close} V={c.volume:.0f}")

Example: Wallet Activity

with PolyNode(api_key="pn_live_...") as pn:
    wallet = pn.wallet("0xB27BC932bf8110D8F78e55da7d5f0497A18B5b82")
    a = wallet.activity
    print(f"Trades: {a.trade_count}, Volume: ${a.trade_volume_usd:,.0f}")

    profile = pn.trader_profile(wallet.wallet)
    print(f"{profile.pseudonym}: PnL ${profile.totalPnl:,.0f}")

WebSocket Streaming

Subscribe to real-time events with a builder pattern. WebSocket is async-only (Python convention):
import asyncio
from polynode import AsyncPolyNode

async def main():
    async with AsyncPolyNode(api_key="pn_live_...") as pn:
        sub = await pn.ws.subscribe("settlements") \
            .min_size(100) \
            .status("pending") \
            .snapshot_count(20) \
            .send()

        async for event in sub:
            print(f"{event.taker_side} ${event.taker_size:.0f} on {event.market_title}")
            print(f"  status: {event.status}, tx: {event.tx_hash[:20]}...")

asyncio.run(main())

Event Callbacks

sub.on("settlement", lambda e: print(f"{e.taker_side} ${e.taker_size} on {e.market_title}"))
sub.on("status_update", lambda e: print(f"Confirmed in {e.latency_ms}ms"))
sub.on("*", lambda e: print(e.event_type))  # catch-all

Subscription Filters

All filters from the Subscriptions & Filters page are supported:
(
    pn.ws.subscribe("settlements")
    .wallets(["0xabc..."])          # by wallet
    .tokens(["21742633..."])        # by token ID
    .slugs(["bitcoin-100k"])        # by market slug
    .condition_ids(["0xabc..."])    # by condition ID
    .side("BUY")                    # BUY or SELL
    .status("pending")              # pending, confirmed, or all
    .min_size(100)                  # min USD size
    .max_size(10000)                # max USD size
    .event_types(["settlement"])    # override event types
    .snapshot_count(50)             # initial snapshot (max 200)
    .feeds(["BTC/USD"])             # chainlink feeds
    .send()
)

Subscription Types

pn.ws.subscribe("settlements")   # pending + confirmed settlements
pn.ws.subscribe("trades")        # all trade activity
pn.ws.subscribe("prices")        # price-moving events
pn.ws.subscribe("blocks")        # new Polygon blocks
pn.ws.subscribe("wallets")       # all wallet activity
pn.ws.subscribe("markets")       # all market activity
pn.ws.subscribe("large_trades")  # $1K+ trades
pn.ws.subscribe("oracle")        # UMA resolution events
pn.ws.subscribe("chainlink")     # real-time price feeds

Multiple Subscriptions

Subscriptions stack on the same connection:
whales = await pn.ws.subscribe("large_trades").min_size(5000).send()
my_wallet = await pn.ws.subscribe("wallets").wallets(["0xabc..."]).send()

# Both active simultaneously, events deduplicated

Context Manager

async with await pn.ws.subscribe("settlements").send() as sub:
    async for event in sub:
        print(event.market_title, event.taker_price)
        break
# Auto-unsubscribes on exit

Compression

Zlib compression is enabled by default for all WebSocket connections (~50% bandwidth savings). No configuration needed.

Auto-Reconnect

Enabled by default. The SDK reconnects with exponential backoff and replays all active subscriptions:
from polynode.ws import PolyNodeWS
from polynode.types.ws import WsOptions

ws = PolyNodeWS("pn_live_...", "wss://ws.polynode.dev/ws", WsOptions(
    compress=True,
    auto_reconnect=True,
    max_reconnect_attempts=0,       # 0 = unlimited
    reconnect_base_delay=1.0,       # seconds
    reconnect_max_delay=30.0,       # seconds
))

ws.on_connect(lambda: print("connected"))
ws.on_disconnect(lambda reason: print(f"disconnected: {reason}"))
ws.on_reconnect(lambda attempt: print(f"reconnected, attempt {attempt}"))
ws.on_error(lambda err: print(f"error: {err}"))

Cleanup

sub.unsubscribe()        # remove one subscription
pn.ws.unsubscribe_all()  # remove all
pn.ws.disconnect()       # close connection

Orderbook Streaming

The SDK includes a dedicated orderbook client for real-time book data from ob.polynode.dev. This is a separate WebSocket connection from the event stream.

Subscribe

# Lazy-initialized, connects on first subscribe
await pn.orderbook.subscribe(["token_id_1", "token_id_2"])

Event Handlers

pn.orderbook.on("snapshot", lambda snap: print(f"{snap.asset_id}: {len(snap.bids)} bids"))
pn.orderbook.on("update", lambda delta: print(f"{delta.asset_id} updated"))
pn.orderbook.on("price", lambda c: print(f"price: {c.assets[0].price}"))
pn.orderbook.on("snapshots_done", lambda msg: print(f"All {msg.total} snapshots received"))
pn.orderbook.on("*", lambda u: print(u.type))  # catch-all

LocalOrderbook

Maintain a sorted local copy of the book:
from polynode import LocalOrderbook

book = LocalOrderbook()

# Wire to orderbook WS events
pn.orderbook.on("snapshot", lambda snap: book.apply_snapshot(snap))
pn.orderbook.on("update", lambda delta: book.apply_update(delta))

# Query state
full_book = book.get_book(token_id)    # (bids, asks) or None
best_bid = book.get_best_bid(token_id)  # OrderbookLevel or None
best_ask = book.get_best_ask(token_id)  # OrderbookLevel or None
spread = book.get_spread(token_id)      # float or None

Cleanup

pn.orderbook.unsubscribe()    # unsubscribe from all markets
pn.orderbook.disconnect()     # close connection

OrderbookEngine

Higher-level wrapper that manages one connection, maintains local state, and routes updates to filtered views.

Create and Subscribe

from polynode import OrderbookEngine

engine = OrderbookEngine(api_key="pn_live_...")
await engine.subscribe([token_a, token_b, token_c])

engine.on("ready", lambda: print(f"{engine.size} books loaded"))

Query State

engine.midpoint(token_id)    # float | None
engine.spread(token_id)      # float | None
engine.best_bid(token_id)    # OrderbookLevel | None
engine.best_ask(token_id)    # OrderbookLevel | None
engine.book(token_id)        # (bids, asks) | None

Filtered Views

Create lightweight views that only receive updates for specific tokens:
trade_view = engine.view([token_a, token_b])
portfolio_view = engine.view(my_position_tokens)

trade_view.on("update", lambda u: print(f"{u.asset_id} changed"))
trade_view.on("price", lambda c: print(f"price: {c.assets}"))

trade_view.midpoint(token_a)
trade_view.spread(token_a)

# Swap to different tokens
trade_view.set_tokens([new_token_x, new_token_y])

# Or destroy entirely
trade_view.destroy()

Cleanup

engine.close()  # disconnects WS, destroys all views, clears state

Trading

Place orders on Polymarket with local credential custody and builder attribution. Supports both the current exchange and the Polymarket V2 exchange. See also: PolyUSD Guide for V2 collateral wrapping. Requires the trading extras:
pip install polynode[trading]

Generate a Wallet

import asyncio
from polynode.trading import PolyNodeTrader

async def main():
    wallet = await PolyNodeTrader.generate_wallet()
    print(f"Address: {wallet.address}")
    print(f"Private key: {wallet.private_key}")
    # BACK UP THE PRIVATE KEY — it cannot be recovered

asyncio.run(main())

One-Call Onboarding

from polynode.trading import PolyNodeTrader, TraderConfig

trader = PolyNodeTrader(TraderConfig(polynode_key="pn_live_..."))

# Auto-detects wallet type, deploys Safe if needed, sets approvals, creates CLOB credentials
status = await trader.ensure_ready("0xYourPrivateKey...")
print(f"Wallet: {status.wallet}")
print(f"Funder: {status.funder_address}")
print(f"Type: {status.signature_type.name}")  # POLY_GNOSIS_SAFE
print(f"Actions: {status.actions}")

Place Orders

from polynode.trading import OrderParams

result = await trader.order(OrderParams(
    token_id="51037625779056581606819614184446816710505006861008496087735536016411882582167",
    side="BUY",
    price=0.55,
    size=100,
))
print(f"Success: {result.success}, Order ID: {result.order_id}")

Cancel Orders

# Cancel one
cancel = await trader.cancel_order("order-id-here")

# Cancel all
cancel = await trader.cancel_all()

# Cancel all in a market
cancel = await trader.cancel_all(market="condition-id")

Open Orders

orders = await trader.get_open_orders()
for o in orders:
    print(f"{o.side} {o.original_size} @ {o.price}{o.status}")

Pre-Trade Checks

# Check token approvals
approvals = await trader.check_approvals()
print(f"All approved: {approvals.all_approved}")

# Check balances
balance = await trader.check_balance()
print(f"USDC: {balance.usdc}, MATIC: {balance.matic}")

Wallet Management

# Link existing credentials (no signing needed)
trader.link_credentials(
    wallet="0x...",
    api_key="...",
    api_secret="...",
    api_passphrase="...",
)

# Export for backup
exported = trader.export_wallet()
# Import on another machine
trader.import_wallet(exported)

# List all linked wallets
wallets = trader.get_linked_wallets()

Address Derivation

from polynode.trading import derive_safe_address, derive_proxy_address

eoa = "0xacd89cFCB82Ae1f843467D56b58796bb928C9E1A"
safe = derive_safe_address(eoa)    # 0xf75564cEe0ed847463E18D20B5005c9db245c374
proxy = derive_proxy_address(eoa)  # 0x65FA7F1aFB344A3F78a62D662798A32d850738c8

Polymarket V2 Exchange

To trade on the Polymarket V2 exchange, set exchange_version on your TraderConfig:
from polynode.trading import TraderConfig, ExchangeVersion

config = TraderConfig(
    polynode_key="pn_live_...",
    exchange_version=ExchangeVersion.V2,
)
trader = PolyNodeTrader(config)
V2 uses PolyUSD (a 1:1 USDC.e wrapper) as collateral. You must wrap USDC.e into PolyUSD before placing orders:
# Wrap USDC.e → PolyUSD
result = await trader.wrap_to_polyusd(100.0)

# Unwrap PolyUSD → USDC.e
result = await trader.unwrap_from_polyusd(50.0)

# Check balances
polyusd = await trader.get_polyusd_balance()
usdce = await trader.get_usdce_balance()
print(f"PolyUSD: {polyusd}, USDC.e: {usdce}")
See the V2 Migration Guide for details on the Polymarket V2 exchange upgrade.

Configuration

from polynode.trading import TraderConfig, SignatureType, ExchangeVersion

config = TraderConfig(
    polynode_key="pn_live_...",                          # for builder attribution
    db_path="./polynode-trading.db",                     # local SQLite storage
    cosigner_url="https://trade.polynode.dev",           # co-signer proxy
    fallback_direct=True,                                 # direct CLOB if co-signer down
    default_signature_type=SignatureType.POLY_GNOSIS_SAFE,  # Safe (2), Proxy (1), or EOA (0)
    rpc_url="https://polygon-bor-rpc.publicnode.com",    # for on-chain reads
    exchange_version=ExchangeVersion.V1,                  # default; set to V2 for the Polymarket V2 exchange
)

trader = PolyNodeTrader(config)

Signature Types

TypeValueDescription
SignatureType.EOA0Direct EOA signing (user pays gas for approvals)
SignatureType.POLY_PROXY1Legacy Polymarket proxy wallet
SignatureType.POLY_GNOSIS_SAFE2Gnosis Safe (default, gasless onboarding)

Privy Signer (Server-Side Wallets)

Use Privy-managed wallets for headless server-side trading. No private key needed — signing is done through Privy’s wallet API:
from polynode.trading import PolyNodeTrader, TraderConfig
from polynode.trading.privy import PrivySigner, PrivyConfig

signer = PrivySigner(
    PrivyConfig(
        app_id="your-privy-app-id",
        app_secret="your-privy-app-secret",
        authorization_key="wallet-auth:your-authorization-key",
    ),
    wallet_id="your-privy-wallet-id",
    wallet_address="0xYourWalletAddress",
)

trader = PolyNodeTrader(TraderConfig(polynode_key="pn_live_..."))
status = await trader.ensure_ready(signer)
result = await trader.order(OrderParams(token_id="...", side="BUY", price=0.50, size=100))
The Privy signer implements the same RouterSigner interface and works with all trading methods (ensure_ready, order, cancel_all, etc.). Gnosis Safe wallets (type 2) are fully gasless.

Cleanup

trader.close()  # closes SQLite, clears active signer

Configuration

pn = PolyNode(
    api_key="pn_live_...",                       # required
    base_url="https://api.polynode.dev",         # default
    ws_url="wss://ws.polynode.dev/ws",           # default
    ob_url="wss://ob.polynode.dev/ws",           # default
    rpc_url="https://rpc.polynode.dev",          # default
    timeout=10.0,                                 # seconds, default 10
)

Error Handling

from polynode import PolyNode, ApiError, WsError, PolyNodeError

try:
    pn.market("invalid-id")
except ApiError as e:
    print(e.status)   # 404
    print(e.message)  # "Market not found"
except PolyNodeError as e:
    print(e.message)  # base error class

Pydantic Models

All event types are Pydantic v2 models with full IDE support:
from polynode.types.events import (
    SettlementEvent,
    TradeEvent,
    StatusUpdateEvent,
    BlockEvent,
    PositionChangeEvent,
    DepositEvent,
    PositionSplitEvent,
    PositionMergeEvent,
    OracleEvent,
    PriceFeedEvent,
    PolyNodeEvent,          # discriminated union of all events
)

from polynode.types.orderbook import (
    OrderbookLevel,
    BookSnapshot,
    BookUpdate,
    PriceChange,
)

from polynode.types.rest import (
    StatusResponse,
    MarketsResponse,
    MarketSummary,
    CandlesResponse,
    SettlementsResponse,
    WalletResponse,
    OrderbookResponse,
    LeaderboardResponse,
    TrendingResponse,
    TraderProfile,
)
The PolyNodeEvent union uses Pydantic’s discriminated union on event_type:
from pydantic import TypeAdapter

adapter = TypeAdapter(PolyNodeEvent)
event = adapter.validate_python({"event_type": "settlement", ...})
# Returns a SettlementEvent instance

Source

PyPI