金融行情 WebSocket 長連線:動態訂閱與心跳保活實戰分享

kalos
·
·
IPFS
·
在量化策略開發、實盤交易與歷史數據回測的場景之中,基於 WebSocket 的即時 Tick 數據串,是整套交易體系的核心數據基礎。但實務開發裡,連線假活、重連風暴、訂閱狀態不一致、髒數據滲入等屢見不鮮。本文結合實務專案的落地經驗,分享一套「單一長連線動態訂閱 + 獨立執行緒心跳偵測」的完整解決方案,附上可直接執行的 Python 程式碼,適用於量化數據介接、自動化交易工具、行情擷取服務等場景。

前言

在量化策略開發、實盤交易與歷史數據回測的場景之中,基於 WebSocket 的即時 Tick 數據串,是整套交易體系的核心數據基礎。長連線憑藉低延遲的特性,成為金融行情推送的主流選擇,但實務開發裡,連線假活、重連風暴、訂閱狀態不一致、髒數據滲入等問題屢見不鮮。

這些異常不僅會造成行情中斷,更會導致策略運算異常、回測結果失真。本文結合實務專案的落地經驗,分享一套「單一長連線動態訂閱 + 獨立執行緒心跳偵測」的完整解決方案,附上可直接執行的 Python 程式碼、場景配置規範與故障排除方法,適用於量化數據介接、自動化交易工具、行情擷取服務等開發場景。

一、場景說明與既有問題

進行量化研究時,我們往往需要同時接入股票、外匯、加密資產等多類行情標的,並且會依據回測標的池、實盤投資組合的調整,頻繁執行訂閱、取消訂閱等操作。

專案初期,我們採用「單一標的對應一條 WebSocket 連線」的傳統寫法,上線後陸續暴露出諸多穩定性隱憂:

  1. 批次調整標的時,連線反覆建立與銷毀,引發重連風暴,佔用大量網路與伺服器資源,數據串也隨之劇烈震盪;

  2. 出現幽靈訂閱:執行取消訂閱指令後,仍持續收到多餘的行情資料;

  3. 網路抖動、介面流量限制時,會產生連線假活狀態:程式判定連線正常,但行情推送已經停止,策略持續使用無效數據進行運算;

  4. 這類隱性故障難以透過一般日誌定位,大幅增加除錯與維運的難度。

為解決上述問題,我們重新架構整體邏輯,改以單一長連線統一管理所有標的,搭配獨立執行緒監控心跳狀態。改寫完成後,系統可穩定承接高頻 Tick 數據擷取、多標的並行回測與自動化策略執行。

二、長連線常見痛點整理

結合金融量化系統的開發與維運經驗,歸納出 WebSocket 長連線在行情場景下的四大典型問題,也是開發過程中必須優先處理的技術難點:

  1. 連線泛濫與重連風暴

    每增刪一筆標的就重新建立連線,批次操作時連線不斷握手、斷線,除了耗費系統資源,也會破壞行情數據的連續性,對於高頻策略的影響尤為明顯。

  2. 連線假活、隱性斷線難以察覺

    網路波動、伺服器限流時,WebSocket 不會自動觸發關閉回呼。程式層面顯示連線正常,實際數據傳輸已經中斷,這也是造成回測偏差、實盤訊號失效的主要原因之一。

  3. 訂閱狀態不同步

    短時間內連續發送訂閱、取消訂閱指令,容易產生請求競爭。本機記錄的標的清單,與伺服器真實的訂閱狀態出現落差,進而發生漏訂閱、重複接收數據等狀況。

  4. 邊界場景缺少驗證機制

    重複訂閱、空標的清單、非法代碼等無效請求,若未提前攔截,除了加重介面解析負擔,也會將髒數據導入量化模型,干擾正常運算邏輯。

三、整體技術方案

3.1 核心概念說明

動態增減訂閱:在已建立的單一 WebSocket 長連線內,透過標準介面指令更新標的代碼清單,完成訂閱新增與移除。此模式無須銷毀、重建連線,和輪詢式的 REST 介面有明顯區別,也是高頻即時行情接入的主流架構,能兼顧低延遲與數據連續性。

3.2 整體架構設計

整套方案拆分為兩個互相解耦的模組,分別負責訂閱管理與連線健康偵測,同時兼備彈性與穩定性:

  • 動態訂閱模組:透過標準指令在單一長連線內管理所有標的,從根源杜絕重連風暴,符合量化場景中標的池頻繁調整的需求;

  • 心跳偵測模組:啟用獨立守護執行緒專職監控心跳封包,與行情數據處理邏輯隔離。即便數據回呼發生阻塞,依舊可以精準判斷連線狀態。

3.3 典型場景與配置規範

彙整開發過程中最常使用的場景、介面設定與驗證標準,方便開發者進行介面對接、聯調與故障排查。

  1. 初始批次訂閱

    適用場景:一次性接入多組回測、實盤標的。

    潛在問題:逐筆建立連線,造成資源浪費。

    介面設定:cmd_id=22004、action=subscribe、code=[NASDAQ:AAPL,BTCUSDT]

    驗證規範:僅建立一條 WebSocket 連線,本機標的清單與伺服器訂閱內容完全同步。

  2. 增量新增訂閱

    適用場景:在既有標的池內,新增研究或交易標的。

    潛在問題:重新建立連線,造成數據串不穩定。

    介面設定:cmd_id=22004、action=subscribe、code=[EURUSD]

    驗證規範:原有連線持續執行,用戶端僅接收新增標的的行情資料。

  3. 移除指定訂閱

    適用場景:剔除無效回測標的、縮減實盤投資組合。

    潛在問題:取消訂閱後,仍持續接收多餘資料。

    介面設定:cmd_id=22004、action=unsubscribe、code=[NASDAQ:AAPL]

    驗證規範:本機移除對應標的代碼,伺服器停止推送該筆標的的數據。

  4. 重複發送訂閱請求

    適用場景:程式重試、邏輯冗餘導致重複呼叫。

    潛在問題:無效請求堆積,提高介面負載。

    介面設定:cmd_id=22004、action=subscribe、code=[BTCUSDT]

    驗證規範:本機執行去重機制,不向外發送重複指令。

  5. 空清單訂閱

    適用場景:程式異常,傳入空的標的集合。

    潛在問題:空指令引發用戶端、伺服器執行錯誤。

    介面設定:cmd_id=22004、action=subscribe、code=[]

    驗證規範:本機提前攔截,不發送網路請求。

3.4 Python 完整實作程式碼

程式相容一般 Python 開發環境,整合連線回呼、狀態管理、心跳監控、動態訂閱與數據驗證功能,可直接嵌入行情擷取工具、量化前置模組使用。

import websocket
import json
import time
import threading

# 本機訂閱集合,用於標的管理、去重與狀態同步
subscriptions = set()
# 記錄最後一次心跳接收時間戳,做為逾時判斷依據
last_heartbeat_time = time.time()

# 獨立執行緒執行心跳監控,與行情處理邏輯解耦
def heartbeat_monitor():
    global last_heartbeat_time
    while True:
        current_ts = time.time()
        time_gap = current_ts - last_heartbeat_time
        # 心跳逾時門檻:20秒,輪詢偵測間隔:5秒
        if time_gap > 20:
            print("告警:心跳逾時,行情連線發生異常")
            # 可擴充邏輯:主動斷線、觸發重連、暫停策略運算
            break
        time.sleep(5)

# 連線建立完成回呼,執行初始訂閱
def on_open(ws):
    global subscriptions
    init_codes = ["NASDAQ:AAPL", "BTCUSDT"]
    subscriptions.update(init_codes)
    # 組合標準訂閱指令
    sub_req = {
        "cmd_id": 22004,
        "action": "subscribe",
        "code": init_codes
    }
    ws.send(json.dumps(sub_req))
    print("初始行情標的訂閱完成")

# 訊息接收回呼,區分心跳封包與Tick數據,並進行數據過濾
def on_message(ws, message):
    global last_heartbeat_time
    if not message:
        return
    try:
        data = json.loads(message)
        # 更新心跳時間戳
        if data.get("type") == "heartbeat":
            last_heartbeat_time = time.time()
            return
        # 過濾空值與異常行情,確保量化數據品質
        if data.get("type") == "tick":
            code = data.get("code", "")
            price = data.get("price", 0)
            open_24h = data.get("open_24h", 0)
            if not code or price <= 0 or open_24h <= 0:
                return
            print(f"標的{code} 最新價格:{price}")
    except Exception as e:
        print(f"訊息解析異常:{str(e)}")

# 連線異常回呼
def on_error(ws, error):
    print(f"連線發生異常:{error}")

# 連線關閉回呼,清空本機訂閱狀態
def on_close(ws, close_status_code, close_msg):
    global subscriptions
    subscriptions.clear()
    print(f"連線已關閉,狀態碼:{close_status_code}")

# 動態新增訂閱標的
def add_subscribe(ws, code_list):
    global subscriptions
    # 過濾空值與重複標的
    new_codes = [code for code in code_list if code not in subscriptions and code]
    if not new_codes:
        return
    subscriptions.update(new_codes)
    req = {
        "cmd_id": 22004,
        "action": "subscribe",
        "code": new_codes
    }
    ws.send(json.dumps(req))

# 動態取消訂閱標的
def cancel_subscribe(ws, code_list):
    global subscriptions
    remove_codes = [code for code in subscriptions if code in code_list]
    if not remove_codes:
        return
    for code in remove_codes:
        subscriptions.discard(code)
    req = {
        "cmd_id": 22004,
        "action": "unsubscribe",
        "code": remove_codes
    }
    ws.send(json.dumps(req))

if __name__ == "__main__":
    # 股票行情 WebSocket 位址
    stock_wss_url = "wss://quote.alltick.co/quote-stock-b-ws-api?token=YOUR_TOKEN"
    # 外匯、加密資產行情 WebSocket 位址
    common_wss_url = "wss://quote.alltick.co/quote-b-ws-api?token=YOUR_TOKEN"

    # 初始化 WebSocket 連線實例
    ws_app = websocket.WebSocketApp(
        common_wss_url,
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close
    )
    # 啟動心跳監控執行緒
    threading.Thread(target=heartbeat_monitor, daemon=True).start()
    # 底層Ping輔助偵測,間隔10秒
    ws_app.run_forever(ping_interval=10)

3.5 開發維運常見問題與對策

依據長期上線運行的經驗,整理四類高頻故障,搭配現象描述、偵測方式與解決方案,降低回測與實盤的故障機率。

  1. 現象:高頻 Tick 數據大量湧入,訊息回呼壅塞,連帶造成心跳偵測延遲

    偵測方式:統計單筆數據的處理耗時,觀察心跳接收間隔是否超出正常範圍。

    解決對策:將心跳邏輯獨立執行緒運作,與行情處理完全分離;針對數據進行優先級分類,優先解析核心策略對應的標的資料。

  2. 現象:網路短暫抖動,連線假活,未觸發關閉回呼,但數據停止推送

    偵測方式:透過前後心跳封包的時間差,判斷是否觸發逾時門檻。

    解決對策:逾時後主動關閉失效連線,循序執行重連作業;重連期間暫停策略運算,避免使用無效數據。

  3. 現象:短時間內頻繁增刪標的,指令互相競爭,本機與伺服器訂閱狀態不一致

    偵測方式:發送指令前,比對本機標的集合與待執行清單。

    解決對策:使用集合結構管理訂閱狀態,增刪標的時同步更新;限制指令發送頻率,避免並行衝突。

  4. 現象:標的代碼輸入錯誤、傳入空字串,導致靜默訂閱失敗,無法收到行情

    偵測方式:依照介面規範驗證代碼格式,長時間未收到數據則判定為異常訂閱。

    解決對策:本機提前過濾空值與非法代碼;針對異常標的自動重新訂閱,維持標的池數據完整性。

3.6 功能邊界說明

本方案支援:在單一 WebSocket 長連線內,動態新增、移除標的代碼,彈性調整訂閱組合,滿足多標的量化組合的運用需求。

本方案不支援:跨多條連線同步訂閱狀態、回溯查詢歷史 Tick 數據,以及使用 cmd_id=22004 以外的私有指令執行訂閱操作。

四、方案落地效益

  1. 降低資源消耗,強化數據連續性

    以單一長連線取代多條連線架構,大幅減少 TCP 握手、連線銷毀帶來的網路額外負荷,徹底解決數據串震盪問題,為高頻策略、多標的組合回測提供穩定的原始數據。

  2. 提升數據品質,減少模型誤差

    透過前置驗證機制,攔截重複請求、空值、非法內容,從來源隔離髒數據,降低異常資料對量化模型的干擾,提高回測結果的可信度。

  3. 簡化維運作業,加快故障排查

    心跳逾時、訂閱異常等事件都會留下完整日誌,過去難以定位的隱性斷線問題,現在可以快速溯源,減少量化系統的維運成本。

  4. 契合量化迭代節奏

    動態訂閱機制可隨時調整標的池,符合研究者反覆測試策略、優化投資組合的工作模式,提升整體研發效率。

交流討論

在金融 WebSocket 長連線的開發與運維過程中,心跳門檻調校、重連邏輯設計、多連線集中監控等議題,都會直接影響系統穩定性。歡迎各位開發者、量化研究者分享實務經驗與優化思路。


CC BY-NC-ND 4.0 授权

喜欢我的作品吗?别忘了给予支持与赞赏,让我知道在创作的路上有你陪伴,一起延续这份热忱!