SURGE

为量化交易者打造的高速低延迟数据管道,微秒级消息分发,让你的策略快人一步。

High-speed, low-latency data pipeline built for quant traders. Microsecond message delivery to keep your strategies ahead.

<30μs
P50 延迟
P50 Latency
<100μs
P99 延迟
P99 Latency
100K+
消息/秒
msgs/sec
立即下载 Download Now 核心特性 Features

量化散户的痛点

Pain Points for Retail Quant Traders

在量化交易的世界里,散户面对的不只是策略问题,更是基础设施的困境。

In the world of quant trading, retail traders face not just strategy challenges, but infrastructure limitations.

延迟太高,信号失效

High Latency Kills Signals

用 Redis/Kafka/RabbitMQ 做行情分发,毫秒级延迟在高频场景下意味着信号到达时价格已经变了,策略失去意义。

Using Redis/Kafka/RabbitMQ for market data adds milliseconds of latency. In fast markets, your signal arrives after the price has already moved.

🚧

架构复杂,门槛太高

Complex Architecture, High Barrier

搭建一套低延迟的数据管道需要 ZeroMQ + Protobuf + 自定义序列化,散户没有团队支撑,光是基础设施就耗尽精力。

Building a low-latency pipeline requires ZeroMQ + Protobuf + custom serialization. Solo traders lack the team to build and maintain this infrastructure.

🔌

多语言互通困难

Cross-Language Pain

行情源用 C++ 抓取,策略用 Python 编写,执行引擎用 Rust——不同组件间的数据传输需要反复造轮子。

Market feed in C++, strategy in Python, execution in Rust — connecting components across languages means reinventing the wheel every time.

💥

慢消费者拖垮整个系统

Slow Consumers Crash Everything

一个处理慢的订阅者会导致消息积压、内存暴涨,最终拖慢甚至崩溃整个消息链路。

One slow subscriber causes message backlog and memory explosion, eventually dragging down or crashing the entire messaging pipeline.

🚫

行情断线无法恢复

No Recovery After Disconnection

网络抖动后重连,错过的行情数据无法补回,策略状态不一致导致错误交易。

After a network hiccup, missed market data is gone forever. Inconsistent strategy state leads to erroneous trades.

💰

商业方案价格昂贵

Enterprise Solutions Are Expensive

机构级的低延迟消息中间件(如 Solace、29West)年费数万美元起步,散户根本用不起。

Enterprise-grade low-latency messaging (Solace, 29West) starts at tens of thousands per year — completely out of reach for individual traders.

Surge 如何解决这些问题

How Surge Solves These Problems

一个二进制文件,零配置启动,开箱即用的微秒级数据管道。

One binary, zero config startup, microsecond data pipeline out of the box.

原生 TCP + 自定义协议

Raw TCP + Custom Protocol

绕过 HTTP/gRPC 的协议开销,5 字节帧头 + Protobuf 载荷,P50 延迟低于 30 微秒,比 Redis Pub/Sub 快 10 倍以上。

Bypass HTTP/gRPC overhead. 5-byte frame header + Protobuf payload, P50 under 30μs — over 10x faster than Redis Pub/Sub.

🐍

Python SDK 原生支持

Native Python SDK

asyncio 原生异步 SDK,5 行代码即可接入。策略逻辑、行情源、执行引擎全部可以用 Python 自定义实现。

Native asyncio SDK — connect in 5 lines of code. Strategy logic, market feed, and execution engine can all be customized in Python.

🛠

智能慢消费者保护

Smart Slow Consumer Protection

三种订阅模式:Normal(全量)、Conflate(合并只保留最新)、Sample(采样),慢消费者不会拖垮系统。

Three subscription modes: Normal (all messages), Conflate (keep latest only), Sample (periodic) — slow consumers never crash the system.

📷

快照 + 断线重连

Snapshot + Auto Reconnect

订阅时可请求最新快照,断线后自动重连并恢复订阅,不丢失任何状态。WAL 持久化支持历史回放。

Request the latest snapshot on subscribe. Auto-reconnect with subscription restore on disconnect. WAL persistence enables historical replay.

🌐

跨语言统一协议

Cross-Language Unified Protocol

Protobuf 序列化 + 标准 TCP 协议,Python/Rust/C++/Go/Java 任意语言都能接入同一个 Surge 集群。

Protobuf serialization + standard TCP protocol. Python, Rust, C++, Go, Java — any language connects to the same Surge cluster.

🎁

免费开源,单文件部署

Free, Open Source, Single Binary

一个可执行文件 + 一个配置文件,不依赖 JVM / Docker / 任何外部服务,笔记本上就能运行生产级性能。

One executable + one config file. No JVM, no Docker, no external services. Production-grade performance on your laptop.

核心特性

Core Features

通配符订阅

Wildcard Subscribe

md.binance.* 自动匹配所有币对

md.binance.* auto-matches all pairs

动态 Topic

Dynamic Topics

发布即创建,无需预定义

Publish creates topics on-the-fly

序列号追踪

Sequence Tracking

每条消息全局递增序列号

Globally incremented sequence per message

延迟测量

Latency Measurement

纳秒时间戳内置端到端延迟计算

Nanosecond timestamps with built-in E2E latency

TLS 加密

TLS Encryption

纯 Rust 实现,零 OpenSSL 依赖

Pure Rust TLS, zero OpenSSL dependency

Web 监控面板

Web Dashboard

实时监控消息流、延迟、Topic 状态

Real-time monitoring of messages, latency, topics

Prometheus 指标

Prometheus Metrics

原生指标导出,对接 Grafana

Native metrics export, Grafana-ready

Token 认证

Token Authentication

可选的连接级身份认证

Optional connection-level auth

Python SDK 参考文档

Python SDK Reference

pip install surge-python — 完整的异步 API,覆盖发布、订阅、认证、回调等全部功能。

pip install surge-python — Full async API covering publish, subscribe, auth, callbacks and more.

快速上手Quick Start
Publisher
Subscriber
SurgeClient
数据类型Data Types
进阶用法Advanced
# ─── 行情发布 (tick_publisher.py) ─── from surge import Publisher import json, asyncio async def main(): pub = Publisher("127.0.0.1", 9800) await pub.connect() while True: tick = {"price": 12.85, "vol": 34200, "ask": 12.86, "bid": 12.84} await pub.send("md.sse.600519", json.dumps(tick).encode()) await asyncio.sleep(0.01) asyncio.run(main()) # ─── 策略订阅 (my_strategy.py) ─── from surge import Subscriber import json, asyncio async def main(): sub = Subscriber("127.0.0.1", 9800) await sub.subscribe(["md.sse.*"], snapshot=True) async for msg in sub.messages(): tick = json.loads(msg.payload) if tick["price"] < tick["bid"] * 0.995: print(f"BUY signal: {msg.topic} @ {tick['price']}") asyncio.run(main())
──────────────────────────────────────────────────── class Publisher(host, port, token=None) ──────────────────────────────────────────────────── 发布专用客户端。只发不收,接口最简。 Publish-only client with the simplest interface. ──────────────────────────────────────────────────── # 参数 / Parameters: # host (str) 服务器地址 default "127.0.0.1" # port (int) 服务器端口 default 9800 # token (str|None) 认证令牌 default None # ── connect() ── # 连接服务器,如有 token 自动完成认证 pub = Publisher("127.0.0.1", 9800, token="surge_token_abc123") await pub.connect() # ── send(topic, payload, headers=None) ── # 发送单条消息到指定 topic # topic 不存在会自动创建 await pub.send( "md.sse.600519", # topic 名称 json.dumps({"price": 12.85}).encode(), # payload (bytes) headers={"source": "ctp"} # 可选 headers ) # ── send_batch(messages) ── # 批量发送,减少系统调用次数,提高吞吐 # messages: list of (topic, payload) tuples await pub.send_batch([ ("md.sse.600519", json.dumps({"price": 12.85}).encode()), ("md.sse.000001", json.dumps({"price": 10.22}).encode()), ("md.szse.300750", json.dumps({"price": 45.60}).encode()), ]) # ── disconnect() ── await pub.disconnect() # ── connected (property) ── if pub.connected: print("connected")
──────────────────────────────────────────────────── class Subscriber(host, port, token=None) ──────────────────────────────────────────────────── 订阅专用客户端。支持通配符、快照、回调。 Subscribe-only client with wildcards, snapshots, callbacks and async generator. ──────────────────────────────────────────────────── # ── subscribe(topics, snapshot=False) ── # topics: 支持精确匹配和通配符 # "md.sse.600519" 精确订阅贵州茅台 # "md.sse.*" 订阅全部沪市 (* 匹配单层级) # "md.**" 订阅全部行情 (** 匹配多层级) # snapshot: 订阅时获取最新一条快照 sub = Subscriber("127.0.0.1", 9800) await sub.connect() await sub.subscribe(["md.sse.*", "md.szse.*"], snapshot=True) # ── 方式一: async for 消费消息 ── # 推荐方式,简洁直观 async for msg in sub.messages(): print(f"{msg.topic} seq={msg.sequence} latency={msg.latency_us:.0f}us") data = json.loads(msg.payload) # ── 方式二: 回调函数 ── # 适合需要区分消息类型的场景 def on_tick(msg): print(f"TICK: {msg.topic} @ {json.loads(msg.payload)['price']}") def on_snap(snap): print(f"SNAPSHOT: {snap.topic} seq={snap.latest_sequence}") def on_disc(): print("disconnected from server") sub.on_message(on_tick) # 注册消息回调 sub.on_snapshot(on_snap) # 注册快照回调 sub.on_disconnect(on_disc) # 注册断连回调 # ── disconnect() ── await sub.disconnect()
──────────────────────────────────────────────────── class SurgeClient(host, port, token, auto_reconnect, reconnect_interval) ──────────────────────────────────────────────────── 全功能客户端,同时支持发布和订阅。 Full-featured client supporting both pub and sub. Publisher/Subscriber 内部都基于此类。 ──────────────────────────────────────────────────── from surge import SurgeClient client = SurgeClient( host="127.0.0.1", # 服务器地址 port=9800, # 服务器端口 token="surge_token_abc123", # 可选认证令牌 auto_reconnect=True, # 断线自动重连 (默认开启) reconnect_interval=1.0, # 重连间隔秒数 (默认 1.0) ) await client.connect() # ── publish(topic, payload, headers=None) ── await client.publish( "signal.buy", json.dumps({"symbol": "600519", "qty": 100}).encode(), headers={"strategy": "mean_reversion"} ) # ── subscribe(topics, snapshot=False) ── await client.subscribe(["md.sse.*"], snapshot=True) # ── unsubscribe(topics=None) ── # topics=None 取消全部订阅 await client.unsubscribe(["md.sse.*"]) await client.unsubscribe() # 取消全部 # ── messages() ── # 异步生成器,同时接收 SurgeMessage 和 SurgeSnapshot async for msg in client.messages(): if isinstance(msg, SurgeMessage): print(f"MSG: {msg.topic}") elif isinstance(msg, SurgeSnapshot): print(f"SNAP: {msg.topic}") # ── 回调注册 ── client.on_message(lambda m: print(m.topic)) client.on_snapshot(lambda s: print(s.topic)) client.on_disconnect(lambda: print("disconnected")) # ── disconnect() ── await client.disconnect()
──────────────────────────────────────────────────── class SurgeMessage ──────────────────────────────────────────────────── 订阅收到的实时消息对象。 Real-time message received from subscription. ──────────────────────────────────────────────────── msg.topic # str — Topic 名称, e.g. "md.sse.600519" msg.sequence # int — 全局递增序列号 msg.payload # bytes — 消息载荷 (你的业务数据) msg.timestamp # int — 发布时的纳秒时间戳 msg.headers # dict — 附加的 key-value 头信息 msg.latency_us # float — 端到端延迟 (微秒) # 使用示例 async for msg in sub.messages(): data = json.loads(msg.payload) print(f"[{msg.topic}] seq={msg.sequence} " f"latency={msg.latency_us:.0f}us price={data['price']}") ──────────────────────────────────────────────────── class SurgeSnapshot ──────────────────────────────────────────────────── 订阅时请求的快照消息。包含 topic 的最新状态。 Snapshot received when subscribing with snapshot=True. ──────────────────────────────────────────────────── snap.topic # str — Topic 名称 snap.latest_sequence # int — 最新序列号 snap.payload # bytes — 最新消息的载荷 # 使用示例: 订阅时先获取快照初始化状态 await sub.subscribe(["md.sse.600519"], snapshot=True) async for msg in sub.messages(): if isinstance(msg, SurgeSnapshot): state = json.loads(msg.payload) print(f"初始状态: {state}") else: print(f"实时更新: {json.loads(msg.payload)}")
──────────────────────────────────────────────────── 进阶用法 / Advanced Usage ──────────────────────────────────────────────────── # ── 1. Token 认证 ── # 服务端 surge.toml 配置: # [auth] # enabled = true # tokens = ["my_secret_token"] client = SurgeClient(token="my_secret_token") await client.connect() # connect() 内部自动完成认证握手 # ── 2. 自动重连 ── # 默认开启,断线后每 1 秒尝试重连 # 重连成功后自动恢复之前的全部订阅 client = SurgeClient( auto_reconnect=True, reconnect_interval=2.0, # 改为 2 秒 ) # 关闭自动重连 client = SurgeClient(auto_reconnect=False) # ── 3. 通配符订阅模式 ── await sub.subscribe(["md.sse.*"]) # 匹配 md.sse.600519, md.sse.000001 等 await sub.subscribe(["md.**"]) # 匹配 md.sse.600519, md.szse.300750 等 await sub.subscribe(["signal.*"]) # 匹配 signal.buy, signal.sell 等 # 新 topic 被发布时,通配符自动匹配新 topic # 无需重新订阅 # ── 4. 批量发布 (高吞吐场景) ── # 多条消息一次性发送,减少系统调用开销 symbols = ["600519", "000001", "300750", "601318"] batch = [ (f"md.sse.{sym}", json.dumps({"price": 10.0}).encode()) for sym in symbols ] await pub.send_batch(batch) # ── 5. 完整量化系统示例 ── # 行情源 → Surge → 策略引擎 → Surge → 执行引擎 # [行情源] 发布价格 await pub.send("md.sse.600519", tick_data) # [策略引擎] 订阅行情, 生成信号 async for msg in strategy_sub.messages(): signal = run_strategy(msg.payload) if signal: await signal_pub.send("signal.buy", signal) # [执行引擎] 订阅信号, 下单执行 async for msg in exec_sub.messages(): order = json.loads(msg.payload) execute_order(order)

下载 Surge Server

Download Surge Server

选择你的操作系统,下载后直接运行,无需安装任何依赖。

Choose your OS. Download, run — no dependencies required.

macOS

Apple Silicon & Intel

Apple Silicon & Intel

下载Download .tar surge-server-macos
🐧

Linux

x86_64 静态链接

x86_64 Static Linked

下载Download .bin surge-server-linux-x64
💻

Windows

x86_64

x86_64

下载Download .exe surge-server-windows-x64.exe

快速开始

Quick Start

# 1. Generate default config (optional, runs fine without it) $ ./surge-server --init # 2. Start the server $ ./surge-server # uses surge.toml if present, otherwise defaults # 3. Install Python SDK $ pip install surge-python # 4. Open dashboard $ open http://localhost:9803