Python SDK 参考文档
Python SDK Reference
pip install surge-python — 完整的异步
API,覆盖发布、订阅、认证、回调等全部功能。
pip install surge-python — Full async API covering
publish, subscribe, auth, callbacks and more.
快速上手Quick Start
Publisher
Subscriber
SurgeClient
数据类型Data Types
进阶用法Advanced
# ─── 行情发布 (tick_publisher.py) ───
from surge
import Publisher
import json, asyncio
async def
main(): pub = Publisher("127.0.0.1", 9800)
await pub.connect()
while True: tick = {"price": 12.85,
"vol": 34200,
"ask": 12.86,
"bid": 12.84}
await pub.send("md.sse.600519", json.dumps(tick).encode())
await asyncio.sleep(0.01) asyncio.run(main())
# ─── 策略订阅 (my_strategy.py) ───
from surge
import Subscriber
import json, asyncio
async def
main(): sub = Subscriber("127.0.0.1", 9800)
await sub.subscribe(["md.sse.*"], snapshot=True)
async for msg
in sub.messages(): tick =
json.loads(msg.payload) if tick["price"] < tick["bid"] *
0.995: print(f"BUY signal: {msg.topic} @ {tick['price']}") asyncio.run(main())
────────────────────────────────────────────────────
class Publisher(host, port, token=None)
────────────────────────────────────────────────────
发布专用客户端。只发不收,接口最简。
Publish-only client with the simplest interface.
────────────────────────────────────────────────────
# 参数 / Parameters:
# host (str) 服务器地址 default "127.0.0.1"
# port (int) 服务器端口 default 9800
# token (str|None) 认证令牌 default None
# ── connect() ──
# 连接服务器,如有 token 自动完成认证
pub = Publisher("127.0.0.1",
9800, token="surge_token_abc123") await pub.connect()
# ── send(topic, payload, headers=None) ──
# 发送单条消息到指定 topic
# topic 不存在会自动创建
await pub.send(
"md.sse.600519",
# topic 名称 json.dumps({"price": 12.85}).encode(),
# payload (bytes) headers={"source": "ctp"}
# 可选 headers
)
# ── send_batch(messages) ──
# 批量发送,减少系统调用次数,提高吞吐
# messages: list of (topic, payload) tuples
await pub.send_batch([ ("md.sse.600519", json.dumps({"price":
12.85}).encode()), ("md.sse.000001", json.dumps({"price":
10.22}).encode()), ("md.szse.300750", json.dumps({"price":
45.60}).encode()), ])
# ── disconnect() ──
await pub.disconnect()
# ── connected (property) ──
if pub.connected: print("connected")
────────────────────────────────────────────────────
class Subscriber(host, port, token=None)
────────────────────────────────────────────────────
订阅专用客户端。支持通配符、快照、回调。
Subscribe-only client with wildcards, snapshots,
callbacks and async generator.
────────────────────────────────────────────────────
# ── subscribe(topics, snapshot=False) ──
# topics: 支持精确匹配和通配符
# "md.sse.600519" 精确订阅贵州茅台
# "md.sse.*" 订阅全部沪市 (* 匹配单层级)
# "md.**" 订阅全部行情 (** 匹配多层级)
# snapshot: 订阅时获取最新一条快照
sub = Subscriber("127.0.0.1",
9800)
await sub.connect()
await sub.subscribe(["md.sse.*", "md.szse.*"], snapshot=True)
# ── 方式一: async for 消费消息 ──
# 推荐方式,简洁直观
async for msg
in sub.messages(): print(f"{msg.topic} seq={msg.sequence}
latency={msg.latency_us:.0f}us") data = json.loads(msg.payload)
# ── 方式二: 回调函数 ──
# 适合需要区分消息类型的场景
def
on_tick(msg): print(f"TICK: {msg.topic} @
{json.loads(msg.payload)['price']}")
def
on_snap(snap): print(f"SNAPSHOT: {snap.topic} seq={snap.latest_sequence}")
def on_disc():
print("disconnected from server")
sub.on_message(on_tick)
# 注册消息回调 sub.on_snapshot(on_snap)
# 注册快照回调
sub.on_disconnect(on_disc)
# 注册断连回调
# ── disconnect() ──
await sub.disconnect()
────────────────────────────────────────────────────
class SurgeClient(host, port, token,
auto_reconnect, reconnect_interval)
────────────────────────────────────────────────────
全功能客户端,同时支持发布和订阅。
Full-featured client supporting both pub and sub.
Publisher/Subscriber 内部都基于此类。
────────────────────────────────────────────────────
from surge
import SurgeClient client = SurgeClient(
host="127.0.0.1",
# 服务器地址 port=9800, # 服务器端口 token="surge_token_abc123", # 可选认证令牌 auto_reconnect=True,
# 断线自动重连 (默认开启)
reconnect_interval=1.0,
# 重连间隔秒数 (默认 1.0)
)
await client.connect()
# ── publish(topic, payload, headers=None) ──
await client.publish(
"signal.buy", json.dumps({"symbol": "600519",
"qty":
100}).encode(), headers={"strategy": "mean_reversion"} )
# ── subscribe(topics, snapshot=False) ──
await client.subscribe(["md.sse.*"], snapshot=True)
# ── unsubscribe(topics=None) ──
# topics=None 取消全部订阅
await client.unsubscribe(["md.sse.*"]) await client.unsubscribe()
# 取消全部
# ── messages() ──
# 异步生成器,同时接收 SurgeMessage 和 SurgeSnapshot
async for msg
in client.messages():
if isinstance(msg, SurgeMessage):
print(f"MSG: {msg.topic}")
elif isinstance(msg, SurgeSnapshot):
print(f"SNAP: {msg.topic}")
# ── 回调注册 ──
client.on_message(lambda m:
print(m.topic)) client.on_snapshot(lambda
s: print(s.topic)) client.on_disconnect(lambda: print("disconnected"))
# ── disconnect() ──
await client.disconnect()
────────────────────────────────────────────────────
class SurgeMessage
────────────────────────────────────────────────────
订阅收到的实时消息对象。
Real-time message received from subscription.
────────────────────────────────────────────────────
msg.topic
# str — Topic 名称, e.g. "md.sse.600519"
msg.sequence
# int — 全局递增序列号 msg.payload
# bytes — 消息载荷 (你的业务数据)
msg.timestamp
# int — 发布时的纳秒时间戳 msg.headers
# dict — 附加的 key-value 头信息
msg.latency_us
# float — 端到端延迟 (微秒)
# 使用示例
async for msg
in sub.messages(): data =
json.loads(msg.payload) print(f"[{msg.topic}] seq={msg.sequence} "
f"latency={msg.latency_us:.0f}us
price={data['price']}")
────────────────────────────────────────────────────
class SurgeSnapshot
────────────────────────────────────────────────────
订阅时请求的快照消息。包含 topic 的最新状态。
Snapshot received when subscribing with snapshot=True.
────────────────────────────────────────────────────
snap.topic
# str — Topic 名称 snap.latest_sequence
# int — 最新序列号 snap.payload
# bytes — 最新消息的载荷
# 使用示例: 订阅时先获取快照初始化状态
await sub.subscribe(["md.sse.600519"], snapshot=True)
async for msg
in sub.messages():
if isinstance(msg, SurgeSnapshot): state
= json.loads(msg.payload) print(f"初始状态: {state}") else: print(f"实时更新: {json.loads(msg.payload)}")
────────────────────────────────────────────────────
进阶用法 / Advanced Usage
────────────────────────────────────────────────────
# ── 1. Token 认证 ──
# 服务端 surge.toml 配置:
# [auth]
# enabled = true
# tokens = ["my_secret_token"]
client = SurgeClient(token="my_secret_token") await client.connect()
# connect() 内部自动完成认证握手
# ── 2. 自动重连 ──
# 默认开启,断线后每 1 秒尝试重连
# 重连成功后自动恢复之前的全部订阅
client = SurgeClient( auto_reconnect=True, reconnect_interval=2.0,
# 改为 2 秒
)
# 关闭自动重连
client = SurgeClient(auto_reconnect=False)
# ── 3. 通配符订阅模式 ──
await sub.subscribe(["md.sse.*"])
# 匹配 md.sse.600519, md.sse.000001 等
await sub.subscribe(["md.**"])
# 匹配 md.sse.600519, md.szse.300750 等
await sub.subscribe(["signal.*"]) # 匹配 signal.buy, signal.sell 等
# 新 topic 被发布时,通配符自动匹配新 topic
# 无需重新订阅
# ── 4. 批量发布 (高吞吐场景) ──
# 多条消息一次性发送,减少系统调用开销
symbols = ["600519",
"000001",
"300750",
"601318"] batch = [ (f"md.sse.{sym}", json.dumps({"price":
10.0}).encode())
for sym
in symbols ]
await pub.send_batch(batch)
# ── 5. 完整量化系统示例 ──
# 行情源 → Surge → 策略引擎 → Surge → 执行引擎
# [行情源] 发布价格
await pub.send("md.sse.600519", tick_data)
# [策略引擎] 订阅行情, 生成信号
async for msg
in strategy_sub.messages(): signal =
run_strategy(msg.payload) if signal:
await signal_pub.send("signal.buy", signal)
# [执行引擎] 订阅信号, 下单执行
async for msg
in exec_sub.messages(): order =
json.loads(msg.payload) execute_order(order)