[踩坑] A 股开盘把 Python 搞挂了,怒切 Go 重写行情网关 (附 pprof 分析 + 源码)

查看 31|回复 3
作者:Howiee   
💥 事故现场
LZ 所在的量化小厂,早期基础设施全是 Python (Asyncio) 一把梭。 跑美股( US )的时候相安无事,毕竟 Tick 流是均匀的。 上周策略组说要加 A 股 (CN) 和 外汇 (FX) 做宏观对冲,我就按老套路接了数据源。
结果上线第一天 9:30 就炸了。 监控报警 CPU 100%,接着就是 TCP Recv-Q 堆积,最后直接断连。 策略端收到行情的时候,黄花菜都凉了(延迟 > 500ms )。
🔍 排查过程 (Post-Mortem)
被 Leader 骂完后,挂了 py-spy 看火焰图,发现两个大坑:
Snapshot 脉冲:A 股跟美股不一样,它是 3 秒一次的全市场快照。几千只股票的数据在同一毫秒涌进来,瞬间流量是平时的几十倍。
GIL + GC 混合双打:
json.loads 是 CPU 密集型,把 GIL 锁死了,网络线程根本抢不到 CPU 读数据。
短时间生成大量 dict 对象,触发 Python 频繁 GC ,Stop-the-world 。
🛠️ 架构重构 (Python -> Go)
为了保住饭碗,连夜决定把 Feed Handler 层剥离出来用 Go 重写。 目标很明确:扛住 A 股脉冲,把数据洗干净,再喂给 Python 策略。
架构逻辑:WebSocket (Unified API) -> Go Channel (Buffer) -> Worker Pool (Sonic Decode) -> Shm/ZMQ
为什么用 Go ?
Goroutine:几 KB 开销,随开随用。
Channel:天然的队列,做 Buffer 抗脉冲神器。
Sonic:字节开源的 JSON 库,带 SIMD 加速,比标准库快 2-3 倍(这个是关键)。
💻 Show me the code
为了解决 协议异构( A 股 CTP 、美股 FIX 、外汇 MT4 ),我接了个聚合源( TickDB ),把全市场数据洗成了统一的 JSON 。这样 Go 这边只用维护一个 Struct 。
以下是脱敏后的核心代码,复制可跑(需 go get 依赖)。
package main
import (
        "fmt"
        "log"
        "runtime"
        "time"
        "github.com/bytedance/sonic" // 字节的库,解析速度吊打 encoding/json
        "github.com/gorilla/websocket"
)
// 防爬虫/防风控,URL 拆一下
const (
        Host = "api.tickdb.ai"
        Path = "/v1/realtime"
        // Key 是薅的试用版,大家拿去压测没问题
        Key  = "?api_key=YOUR_V2EX_KEY"
)
// 内存对齐优化:把同类型字段放一起
type MarketTick struct {
        Cmd  string `json:"cmd"`
        Data struct {
                Symbol    string `json:"symbol"`
                LastPrice string `json:"last_price"` // 价格统一 string ,下游处理精度
                Volume    string `json:"volume_24h"`
                Timestamp int64  `json:"timestamp"`  // 8 byte
                Market    string `json:"market"`     // CN/US/HK/FX
        } `json:"data"`
}
func main() {
        // 1. 跑满多核,别浪费 AWS 的 CPU
        runtime.GOMAXPROCS(runtime.NumCPU())
        url := "wss://" + Host + Path + Key
        conn, _, err := websocket.DefaultDialer.Dial(url, nil)
        if err != nil {
                log.Fatal("Dial err:", err)
        }
        defer conn.Close()
        // 2. 订阅指令
        // 重点测试:A 股(脉冲) + 贵金属(高频) + 美股/港股
        subMsg := `{
                "cmd": "subscribe",
                "data": {
                        "channel": "ticker",
                        "symbols": [
                                "600519.SH", "000001.SZ",   // A 股:茅台、平安 (9:30 压力源)
                                "XAUUSD", "USDJPY",         // 外汇:黄金、日元 (高频源)
                                "NVDA.US", "AAPL.US",       // 美股:英伟达
                                "00700.HK", "09988.HK",     // 港股:腾讯
                                "BTCUSDT"                   // Crypto:拿来跑 7x24h 稳定性的
                        ]
                }
        }`
        if err := conn.WriteMessage(websocket.TextMessage, []byte(subMsg)); err != nil {
                log.Fatal("Sub err:", err)
        }
        fmt.Println(">>> Go Engine Started...")
        // 3. Ring Buffer
        // 关键点:8192 的缓冲,专门为了吃下 A 股的瞬间脉冲
        dataChan := make(chan []byte, 8192)
        // 4. Worker Pool
        // 经验值:CPU 核数 * 2
        workerNum := runtime.NumCPU() * 2
        for i := 0; i < workerNum; i++ {
                go worker(i, dataChan)
        }
        // 5. Producer Loop (IO Bound)
        // 只管读,读到就扔 Channel ,绝对不阻塞
        for {
                _, msg, err := conn.ReadMessage()
                if err != nil {
                        log.Println("Read err:", err)
                        break
                }
                dataChan <- msg
        }
}
// Consumer (CPU Bound)
func worker(id int, ch <-chan []byte) {
        var tick MarketTick
        for msg := range ch {
                // 用 Sonic 解析,性能起飞
                if err := sonic.Unmarshal(msg, &tick); err != nil {
                        continue
                }
                if tick.Cmd == "ticker" {
                        // 简单的监控:全链路延迟
                        latency := time.Now().UnixMilli() - tick.Data.Timestamp
                       
                        // 抽样打印
                        if id == 0 {
                                fmt.Printf("[%s] %-8s | Price: %s | Lat: %d ms\n",
                                        tick.Data.Market, tick.Data.Symbol, tick.Data.LastPrice, latency)
                        }
                }
        }
}
📊 Benchmark (实测数据)
环境:AWS c5.xlarge (4C 8G),订阅 500 个活跃 Symbol 。 复现了 9:30 A 股开盘 + 非农数据公布 的混合场景。
指标,Python (Asyncio),Go (Sonic + Channel),评价
P99 Latency,480ms+,< 4ms,简直是降维打击
Max Jitter,1.2s (GC Stop),15ms,终于不丢包了
CPU Usage,98% (单核打满),18% (多核均衡),机器都不怎么转
Mem,800MB,60MB,省下来的内存可以多跑个回测
📝 几点心得
术业有专攻:Python 做策略逻辑开发是无敌的,但这种 I/O + CPU 混合密集型的接入层,还是交给 Go/Rust 吧,别头铁。
别造轮子:之前想自己写 CTP 和 FIX 的解析器,写了一周只想跑路。后来切到 TickDB 这种 Unified API ,把脏活外包出去,瞬间清爽了。
Sonic 是神器:如果你的 Go 程序瓶颈在 JSON ,无脑换 bytedance/sonic ,立竿见影。
代码大家随便拿去改,希望能帮到同样被 Python 延迟折磨的兄弟。 (Key 是试用版的,别拿去跑大资金实盘哈,被限流了别找我)

Python, Go, A股

zoharSoul   
难点在于量化吧
这种优化的场景还是很简单的, 不管是 go 还是什么的都 ok
balckcloud37   
其实只是受不了 gc 的话,disable gc 再手动 gc 就好了
另外如果项目里没有 circular ref ,直接不 gc 也行
encro   
a 股 ctp 接口哪家好用,需要什么开通条件呢。
您需要登录后才可以回帖 登录 | 立即注册

返回顶部