Перейти к содержимому

Глава 2: Рыночные данные криптовалют: книги ордеров, свечи и ончейн-сигналы

Визуализация книги ордеров

Обзор

Рыночные данные — это жизненная сила каждой алгоритмической торговой системы. На криптовалютных рынках разнообразие и объём доступных данных значительно превышают то, с чем сталкиваются трейдеры на традиционных фондовых рынках. Одна биржа, такая как Bybit, генерирует миллионы обновлений книги ордеров, исполнений сделок и снимков ставки финансирования каждый день по сотням бессрочных фьючерсов и спотовых пар. В отличие от фондовых рынков, где провайдеры консолидированной ленты агрегируют данные с множества площадок, криптотрейдеры должны строить собственную инфраструктуру данных — подключаясь к WebSocket-потокам бирж, нормализуя разнородные форматы и сохраняя терабайты тиковой информации для бэктестирования и обучения моделей.

Анатомия рыночных данных криптовалют простирается далеко за пределы традиционных цен и объёмов. Данные глубины книги ордеров (Level 2) раскрывают ландшафт спроса и предложения на каждом ценовом уровне, позволяя стратегиям прогнозировать краткосрочные движения цен на основе дисбалансов потока ордеров. Ставки финансирования — уникальные для бессрочных фьючерсов — предоставляют датчик рыночных настроений и позиционирования с кредитным плечом в реальном времени. Ончейн-данные добавляют совершенно другое измерение: блокчейн-транзакции, взаимодействия со смарт-контрактами и сетевые метрики предлагают фундаментальные сигналы, не имеющие аналогов в традиционных финансах. Проблема не в поиске данных, а в построении конвейеров, способных эффективно принимать, очищать и хранить их в масштабе.

Эта глава предоставляет исчерпывающее руководство по построению конвейера рыночных данных криптовалют производственного уровня. Мы охватываем WebSocket и REST API Bybit для данных в реальном времени и исторических данных, демонстрируем, как строить OHLCV-свечи из сырых потоков сделок, и исследуем ончейн-метрики как дополнительный источник данных. Реализации охватывают Python для быстрого прототипирования и Rust для высокопроизводительных продакшн-систем, используя Apache Parquet для эффективного колоночного хранения. К концу этой главы у вас будет инфраструктура для сбора, хранения и обслуживания данных, питающих каждую стратегию в последующих главах.

Содержание

  1. Введение в рыночные данные криптовалют
  2. Математические основы: динамика книги ордеров
  3. Сравнение источников данных и форматов
  4. Торговые приложения рыночных данных
  5. Реализация на Python
  6. Реализация на Rust
  7. Практические примеры
  8. Фреймворк бэктестирования
  9. Оценка производительности
  10. Направления будущего развития

Раздел 1: Введение в рыночные данные криптовалют

Анатомия книги ордеров криптобиржи

Книга ордеров — это центральная структура данных любой биржи. Она поддерживает два отсортированных списка: биды (ордера на покупку, отсортированные по убыванию цены) и аски (ордера на продажу, отсортированные по возрастанию цены). Разница между лучшим аском и лучшим бидом — это спред бид-аск, прямая мера ликвидности рынка и транзакционных издержек.

На Bybit книга ордеров для бессрочных фьючерсов BTCUSDT обычно показывает 200+ ценовых уровней с каждой стороны, с обновлениями, поступающими через WebSocket со скоростью более 100 сообщений в секунду в волатильные периоды. Понимание динамики книги ордеров необходимо для алгоритмов исполнения и моделей прогнозирования на коротких горизонтах.

Ключевая терминология

  • Книга ордеров (Order Book): Запись в реальном времени всех активных лимитных ордеров на каждом ценовом уровне для торговой пары.
  • Спред бид-аск (Bid-Ask Spread): Разница между наименьшей ценой аска и наибольшей ценой бида; представляет стоимость немедленности.
  • Тиковые данные (Tick Data): Наиболее гранулярный уровень рыночных данных — каждое отдельное изменение сделки или книги ордеров с точностью до миллисекунды.
  • OHLCV: Бары Открытие-Максимум-Минимум-Закрытие-Объём, агрегированные по временному интервалу (1м, 5м, 1ч, 1д).
  • Данные Level I: Только лучшие цены и объёмы бида и аска (вершина книги).
  • Данные Level II: Полная глубина книги ордеров, показывающая все ценовые уровни и их агрегированные объёмы.
  • Цена маркировки (Mark Price): Справедливая цена, рассчитанная из множества источников, используемая Bybit для ликвидаций и расчётов финансирования во избежание манипуляций.
  • Индексная цена (Index Price): Составная цена, полученная из нескольких спотовых бирж, служащая эталоном для расчётов ставки финансирования.
  • Открытый интерес (Open Interest): Общее количество непогашенных деривативных контрактов.
  • Ставка финансирования (Funding Rate): Периодический платёж между держателями длинных и коротких позиций бессрочных фьючерсов.
  • Протокол FIX: Протокол обмена финансовой информацией, стандарт электронной торговли на традиционных рынках (менее распространён в крипте).

REST vs WebSocket для сбора данных

Существуют два основных метода сбора данных с криптобирж:

REST API: Модель запрос-ответ. Клиент отправляет HTTP-запросы и получает JSON-ответы. Лучше всего подходит для запросов исторических данных, управления аккаунтом и размещения ордеров. REST API Bybit поддерживает эндпоинты /v5/market/kline для исторических свечей и /v5/market/orderbook для снимков книги ордеров.

WebSocket API: Постоянное двунаправленное соединение. Сервер отправляет обновления клиенту в реальном времени. Необходим для потоков книги ордеров, потоков сделок и обновлений ставки финансирования. WebSocket-эндпоинт Bybit предоставляет топики orderbook.200.BTCUSDT для глубины книги ордеров и publicTrade.BTCUSDT для отдельных сделок.

Хранение данных: CSV, HDF5 и Parquet

  • CSV: Человекочитаемый, универсально совместимый, но медленный для чтения/записи и неэффективный по хранению для больших наборов данных.
  • HDF5: Иерархический формат данных, хорош для числовых массивов со сжатием. Поддерживает произвольный доступ, но может иметь проблемы с конкурентным доступом.
  • Parquet: Apache Parquet — колоночный формат хранения, оптимизированный для аналитических запросов. Предлагает отличные степени сжатия (10-20x по сравнению с CSV), быстрое чтение на уровне столбцов и нативную поддержку в pandas, polars и Apache Arrow. Это рекомендуемый формат для рыночных данных криптовалют.

Потоковая передача данных WebSocket

Раздел 2: Математические основы: динамика книги ордеров

Дисбаланс книги ордеров

Дисбаланс книги ордеров (OBI) количественно оценивает асимметрию между давлением покупки и продажи:

OBI = (V_bid - V_ask) / (V_bid + V_ask)

Где V_bid — общий объём на стороне бидов (через N уровней), а V_ask — общий объём на стороне асков. OBI варьируется от -1 (всё давление на продажу) до +1 (всё давление на покупку). Положительный OBI предсказывает краткосрочное восходящее движение цены.

Средневзвешенная по объёму цена (VWAP)

VWAP = Σ(P_i × V_i) / Σ(V_i)

Где P_i и V_i — цена и объём i-й сделки. VWAP служит как бенчмарком, так и целью исполнения для алгоритмических трейдеров.

Дисбаланс торгового потока

TFI = (V_buy - V_sell) / (V_buy + V_sell)

Где V_buy — объём сделок, инициированных покупателями, а V_sell — объём сделок, инициированных продавцами, классифицированных по алгоритму Ли-Реди (сравнение цены сделки с серединой текущего спреда бид-аск).

Расчёт ставки финансирования

На Bybit ставка финансирования рассчитывается как:

Funding Rate = Average Premium Index + clamp(Interest Rate - Average Premium Index, -0.05%, 0.05%)

Где индекс премии отражает базис между ценой бессрочного контракта и спотовой ценой:

Premium Index = [Max(0, Impact Bid Price - Index Price) - Max(0, Index Price - Impact Ask Price)] / Index Price

Построение OHLCV из сырых сделок

Для потока сделок {(t_i, p_i, v_i)} бары OHLCV для интервала [T_start, T_end):

Open = p_j где j = argmin(t_i) для t_i в [T_start, T_end)
High = max(p_i) для t_i в [T_start, T_end)
Low = min(p_i) для t_i в [T_start, T_end)
Close = p_k где k = argmax(t_i) для t_i в [T_start, T_end)
Volume = Σ(v_i) для t_i в [T_start, T_end)

Раздел 3: Сравнение источников данных и форматов

ХарактеристикаBybit REST APIBybit WebSocketОнчейн (блокчейн)yfinance
Тип данныхИсторические OHLCV, снимкиПотоки в реальном времениТранзакции, балансыИсторические OHLCV
Задержка50-200мс за запрос<10мс push1-15 минут (время блока)Н/Д (пакетный)
ГранулярностьСвечи от 1м до 1МУровень тикаУровень транзакции1м до 1д
Лимиты запросов120 зап./5с (GET)500 подписокЗависит от RPC~2000/час
ПокрытиеТолько пары BybitТолько пары BybitВся ончейн-активностьМульти-биржевые крипто + акции
СтоимостьБесплатноБесплатноПлата за ноду или провайдераБесплатно
Лучше всего дляДанные бэктестированияЖивая торговляФундаментальный анализКросс-активные исследования
Формат данныхJSONJSONРазличные (JSON-RPC, GraphQL)pandas DataFrame

Сравнение форматов хранения

ФорматСжатиеСкорость чтенияСкорость записиПоддержка схемыЛучший случай использования
CSVНет (1x)МедленнаяМедленнаяНетМалые наборы данных, отладка
JSONНет (1x)МедленнаяМедленнаяНетAPI-ответы, конфиги
HDF5Хорошее (5-10x)БыстраяБыстраяДаЧисловые массивы
ParquetОтличное (10-20x)Очень быстраяБыстраяДаПродакшн-хранение данных
TimescaleDBХорошее (5-8x)Быстрая (SQL)УмереннаяДаЗапросы временных рядов

Раздел 4: Торговые приложения рыночных данных

4.1 Стратегии на основе книги ордеров

Данные книги ордеров позволяют реализовать несколько категорий стратегий:

  • Микроструктурная альфа: Прогнозирование краткосрочного направления цены по дисбалансам потока ордеров
  • Маркет-мейкинг: Размещение лимитных ордеров с обеих сторон, извлечение прибыли из спреда при управлении инвентарным риском
  • Обнаружение айсбергов: Выявление крупных скрытых ордеров через распознавание паттернов в обновлениях книги ордеров
  • Обнаружение спуфинга: Выявление и обход ценовых уровней с вероятными фальшивыми ордерами

4.2 Технический анализ на основе OHLCV

Свечные данные поддерживают классические технические стратегии, адаптированные для крипто:

  • Следование за трендом: Пересечения скользящих средних на 4-часовых и дневных таймфреймах
  • Обнаружение пробоев: Пробои с подтверждением объёмом выше/ниже ключевых уровней
  • Распознавание паттернов: Свечные паттерны (поглощение, доджи, молот) с оценками надёжности, специфичными для крипто
  • Мультитаймфреймовый анализ: Комбинирование сигналов с баров 1м, 5м, 1ч и 4ч

4.3 Стратегии на ставках финансирования

Ставки финансирования предлагают уникальные торговые сигналы:

  • Арбитраж ставки финансирования: Дельта-нейтральная стратегия для захвата платежей финансирования
  • Экстремальная ставка как контрарный сигнал: Очень высокая положительная ставка часто предшествует коррекциям
  • Моментум ставки финансирования: Устойчивое направление ставки сигнализирует о продолжении тренда

4.4 Интеграция ончейн-данных

Блокчейн-данные предоставляют фундаментальные сигналы:

  • Притоки на биржи: Крупные переводы на биржи часто предшествуют давлению продаж
  • Активные адреса: Рост сетевой активности коррелирует с ростом цены
  • Хэшрейт: Активность майнинга как прокси для безопасности сети и настроений майнеров
  • Коэффициент NVT: Отношение стоимости сети к транзакциям — криптоаналог P/E

4.5 Синтез сигналов из разных источников

Наиболее качественные сигналы комбинируют множество источников данных:

  • Дисбаланс книги ордеров + направление ставки финансирования для тайминга входа
  • Движения «китов» в ончейне + притоки на биржи для определения размера позиции
  • Моментум OHLCV + изменения открытого интереса для подтверждения тренда

Архитектура пайплайна данных

Раздел 5: Реализация на Python

import numpy as np
import pandas as pd
import requests
import json
import asyncio
import websockets
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Tuple
from pathlib import Path
import pyarrow as pa
import pyarrow.parquet as pq
@dataclass
class OrderBookLevel:
"""Single price level in the order book."""
price: float
size: float
@dataclass
class OrderBook:
"""Full order book snapshot."""
symbol: str
timestamp: int
bids: List[OrderBookLevel] = field(default_factory=list)
asks: List[OrderBookLevel] = field(default_factory=list)
@property
def best_bid(self) -> float:
return self.bids[0].price if self.bids else 0.0
@property
def best_ask(self) -> float:
return self.asks[0].price if self.asks else 0.0
@property
def mid_price(self) -> float:
return (self.best_bid + self.best_ask) / 2
@property
def spread(self) -> float:
return self.best_ask - self.best_bid
@property
def spread_bps(self) -> float:
return (self.spread / self.mid_price) * 10000
def imbalance(self, levels: int = 5) -> float:
"""Order book imbalance across N levels."""
bid_vol = sum(b.size for b in self.bids[:levels])
ask_vol = sum(a.size for a in self.asks[:levels])
total = bid_vol + ask_vol
if total == 0:
return 0.0
return (bid_vol - ask_vol) / total
class OHLCVBuilder:
"""Build OHLCV candles from raw trade data."""
def __init__(self, interval_seconds: int = 60):
self.interval = interval_seconds
self.trades: List[Dict] = []
def add_trade(self, timestamp: int, price: float, volume: float, side: str):
self.trades.append({
"timestamp": timestamp,
"price": price,
"volume": volume,
"side": side,
})
def build(self) -> pd.DataFrame:
if not self.trades:
return pd.DataFrame()
df = pd.DataFrame(self.trades)
df["datetime"] = pd.to_datetime(df["timestamp"], unit="ms")
df = df.set_index("datetime")
ohlcv = df["price"].resample(f"{self.interval}s").ohlc()
ohlcv["volume"] = df["volume"].resample(f"{self.interval}s").sum()
ohlcv = ohlcv.dropna()
return ohlcv
class BybitMarketData:
"""Fetches market data from Bybit REST API."""
BASE_URL = "https://api.bybit.com"
def __init__(self):
self.session = requests.Session()
def get_orderbook(self, symbol: str = "BTCUSDT",
limit: int = 50) -> OrderBook:
"""Fetch order book snapshot."""
endpoint = f"{self.BASE_URL}/v5/market/orderbook"
params = {"category": "linear", "symbol": symbol, "limit": limit}
resp = self.session.get(endpoint, params=params).json()
result = resp["result"]
ob = OrderBook(symbol=symbol, timestamp=result["ts"])
ob.bids = [OrderBookLevel(float(p), float(s)) for p, s in result["b"]]
ob.asks = [OrderBookLevel(float(p), float(s)) for p, s in result["a"]]
return ob
def get_klines(self, symbol: str = "BTCUSDT",
interval: str = "60", limit: int = 200) -> pd.DataFrame:
"""Fetch OHLCV kline data."""
endpoint = f"{self.BASE_URL}/v5/market/kline"
params = {
"category": "linear", "symbol": symbol,
"interval": interval, "limit": limit,
}
resp = self.session.get(endpoint, params=params).json()
rows = resp["result"]["list"]
df = pd.DataFrame(rows, columns=[
"timestamp", "open", "high", "low", "close", "volume", "turnover"
])
for col in ["open", "high", "low", "close", "volume", "turnover"]:
df[col] = df[col].astype(float)
df["timestamp"] = pd.to_datetime(df["timestamp"].astype(int), unit="ms")
return df.sort_values("timestamp").reset_index(drop=True)
def get_trades(self, symbol: str = "BTCUSDT",
limit: int = 1000) -> pd.DataFrame:
"""Fetch recent public trades."""
endpoint = f"{self.BASE_URL}/v5/market/recent-trade"
params = {"category": "linear", "symbol": symbol, "limit": limit}
resp = self.session.get(endpoint, params=params).json()
trades = resp["result"]["list"]
df = pd.DataFrame(trades)
df["price"] = df["price"].astype(float)
df["size"] = df["size"].astype(float)
df["time"] = pd.to_datetime(df["time"].astype(int), unit="ms")
return df
def get_funding_history(self, symbol: str = "BTCUSDT",
limit: int = 200) -> pd.DataFrame:
"""Fetch historical funding rates."""
endpoint = f"{self.BASE_URL}/v5/market/funding/history"
params = {"category": "linear", "symbol": symbol, "limit": limit}
resp = self.session.get(endpoint, params=params).json()
df = pd.DataFrame(resp["result"]["list"])
df["fundingRate"] = df["fundingRate"].astype(float)
df["fundingRateTimestamp"] = pd.to_datetime(
df["fundingRateTimestamp"].astype(int), unit="ms"
)
return df.sort_values("fundingRateTimestamp").reset_index(drop=True)
def get_open_interest(self, symbol: str = "BTCUSDT",
interval: str = "1h",
limit: int = 200) -> pd.DataFrame:
"""Fetch historical open interest."""
endpoint = f"{self.BASE_URL}/v5/market/open-interest"
params = {
"category": "linear", "symbol": symbol,
"intervalTime": interval, "limit": limit,
}
resp = self.session.get(endpoint, params=params).json()
df = pd.DataFrame(resp["result"]["list"])
df["openInterest"] = df["openInterest"].astype(float)
df["timestamp"] = pd.to_datetime(df["timestamp"].astype(int), unit="ms")
return df.sort_values("timestamp").reset_index(drop=True)
class ParquetStorage:
"""Efficient storage using Apache Parquet."""
def __init__(self, base_dir: str = "./data"):
self.base_dir = Path(base_dir)
self.base_dir.mkdir(parents=True, exist_ok=True)
def save_ohlcv(self, df: pd.DataFrame, symbol: str, interval: str):
path = self.base_dir / f"{symbol}_{interval}_ohlcv.parquet"
table = pa.Table.from_pandas(df)
pq.write_table(table, path, compression="snappy")
def load_ohlcv(self, symbol: str, interval: str) -> pd.DataFrame:
path = self.base_dir / f"{symbol}_{interval}_ohlcv.parquet"
return pq.read_table(path).to_pandas()
def save_trades(self, df: pd.DataFrame, symbol: str, date: str):
path = self.base_dir / f"{symbol}_trades_{date}.parquet"
table = pa.Table.from_pandas(df)
pq.write_table(table, path, compression="zstd")
def save_orderbook_snapshot(self, ob: OrderBook):
data = {
"timestamp": [ob.timestamp],
"best_bid": [ob.best_bid],
"best_ask": [ob.best_ask],
"mid_price": [ob.mid_price],
"spread_bps": [ob.spread_bps],
"imbalance_5": [ob.imbalance(5)],
}
df = pd.DataFrame(data)
path = self.base_dir / f"{ob.symbol}_orderbook.parquet"
table = pa.Table.from_pandas(df)
if path.exists():
existing = pq.read_table(path)
table = pa.concat_tables([existing, table])
pq.write_table(table, path, compression="snappy")
# WebSocket streaming example
async def stream_orderbook(symbol: str = "BTCUSDT", duration_seconds: int = 60):
"""Stream order book updates via Bybit WebSocket."""
url = "wss://stream.bybit.com/v5/public/linear"
async with websockets.connect(url) as ws:
subscribe_msg = {
"op": "subscribe",
"args": [f"orderbook.50.{symbol}"]
}
await ws.send(json.dumps(subscribe_msg))
start_time = datetime.now()
updates = []
while (datetime.now() - start_time).seconds < duration_seconds:
msg = await ws.recv()
data = json.loads(msg)
if "data" in data:
updates.append(data)
return updates
if __name__ == "__main__":
client = BybitMarketData()
# Fetch and display order book
ob = client.get_orderbook("BTCUSDT", 25)
print(f"=== BTCUSDT Order Book ===")
print(f"Best Bid: {ob.best_bid}, Best Ask: {ob.best_ask}")
print(f"Spread: {ob.spread:.2f} ({ob.spread_bps:.1f} bps)")
print(f"Imbalance (5 levels): {ob.imbalance(5):.4f}")
# Fetch trades and build candles
trades = client.get_trades("BTCUSDT", 1000)
print(f"\nFetched {len(trades)} recent trades")
# Store to Parquet
storage = ParquetStorage("./data")
klines = client.get_klines("BTCUSDT", "60", 200)
storage.save_ohlcv(klines, "BTCUSDT", "1h")
print(f"Saved {len(klines)} candles to Parquet")

Раздел 6: Реализация на Rust

use reqwest::Client;
use serde::{Deserialize, Serialize};
use tokio;
use tokio_tungstenite::{connect_async, tungstenite::Message};
use futures_util::{StreamExt, SinkExt};
use std::collections::BTreeMap;
#[derive(Debug, Deserialize)]
struct BybitResponse<T> {
#[serde(rename = "retCode")]
ret_code: i32,
result: T,
}
#[derive(Debug, Deserialize)]
struct KlineResult {
list: Vec<Vec<String>>,
}
#[derive(Debug, Deserialize)]
struct OrderBookResult {
s: String,
b: Vec<Vec<String>>,
a: Vec<Vec<String>>,
ts: u64,
}
#[derive(Debug, Deserialize)]
struct TradeResult {
list: Vec<TradeEntry>,
}
#[derive(Debug, Deserialize)]
struct TradeEntry {
#[serde(rename = "execId")]
exec_id: String,
symbol: String,
price: String,
size: String,
side: String,
time: String,
}
#[derive(Debug, Clone)]
struct OhlcvBar {
timestamp: i64,
open: f64,
high: f64,
low: f64,
close: f64,
volume: f64,
}
#[derive(Debug, Clone)]
struct OrderBookLevel {
price: f64,
size: f64,
}
#[derive(Debug)]
struct OrderBook {
symbol: String,
timestamp: u64,
bids: Vec<OrderBookLevel>,
asks: Vec<OrderBookLevel>,
}
impl OrderBook {
fn mid_price(&self) -> f64 {
if self.bids.is_empty() || self.asks.is_empty() {
return 0.0;
}
(self.bids[0].price + self.asks[0].price) / 2.0
}
fn spread(&self) -> f64 {
if self.bids.is_empty() || self.asks.is_empty() {
return 0.0;
}
self.asks[0].price - self.bids[0].price
}
fn spread_bps(&self) -> f64 {
let mid = self.mid_price();
if mid == 0.0 { return 0.0; }
(self.spread() / mid) * 10000.0
}
fn imbalance(&self, levels: usize) -> f64 {
let bid_vol: f64 = self.bids.iter().take(levels).map(|l| l.size).sum();
let ask_vol: f64 = self.asks.iter().take(levels).map(|l| l.size).sum();
let total = bid_vol + ask_vol;
if total == 0.0 { return 0.0; }
(bid_vol - ask_vol) / total
}
}
struct OhlcvBuilder {
interval_ms: i64,
trades: Vec<(i64, f64, f64)>,
}
impl OhlcvBuilder {
fn new(interval_seconds: i64) -> Self {
Self {
interval_ms: interval_seconds * 1000,
trades: Vec::new(),
}
}
fn add_trade(&mut self, timestamp: i64, price: f64, volume: f64) {
self.trades.push((timestamp, price, volume));
}
fn build(&self) -> Vec<OhlcvBar> {
if self.trades.is_empty() {
return Vec::new();
}
let mut sorted = self.trades.clone();
sorted.sort_by_key(|t| t.0);
let mut bars: BTreeMap<i64, OhlcvBar> = BTreeMap::new();
for (ts, price, volume) in &sorted {
let bar_ts = (*ts / self.interval_ms) * self.interval_ms;
let bar = bars.entry(bar_ts).or_insert(OhlcvBar {
timestamp: bar_ts,
open: *price,
high: *price,
low: *price,
close: *price,
volume: 0.0,
});
bar.high = bar.high.max(*price);
bar.low = bar.low.min(*price);
bar.close = *price;
bar.volume += volume;
}
bars.into_values().collect()
}
}
struct BybitClient {
client: Client,
base_url: String,
}
impl BybitClient {
fn new() -> Self {
Self {
client: Client::new(),
base_url: "https://api.bybit.com".to_string(),
}
}
async fn get_orderbook(&self, symbol: &str, limit: u32)
-> Result<OrderBook, Box<dyn std::error::Error>>
{
let url = format!("{}/v5/market/orderbook", self.base_url);
let resp = self.client.get(&url)
.query(&[
("category", "linear"),
("symbol", symbol),
("limit", &limit.to_string()),
])
.send().await?;
let body: BybitResponse<OrderBookResult> = resp.json().await?;
let bids = body.result.b.iter().map(|l| OrderBookLevel {
price: l[0].parse().unwrap_or(0.0),
size: l[1].parse().unwrap_or(0.0),
}).collect();
let asks = body.result.a.iter().map(|l| OrderBookLevel {
price: l[0].parse().unwrap_or(0.0),
size: l[1].parse().unwrap_or(0.0),
}).collect();
Ok(OrderBook {
symbol: symbol.to_string(),
timestamp: body.result.ts,
bids,
asks,
})
}
async fn get_klines(&self, symbol: &str, interval: &str, limit: u32)
-> Result<Vec<OhlcvBar>, Box<dyn std::error::Error>>
{
let url = format!("{}/v5/market/kline", self.base_url);
let resp = self.client.get(&url)
.query(&[
("category", "linear"),
("symbol", symbol),
("interval", interval),
("limit", &limit.to_string()),
])
.send().await?;
let body: BybitResponse<KlineResult> = resp.json().await?;
let bars = body.result.list.iter().map(|row| OhlcvBar {
timestamp: row[0].parse().unwrap_or(0),
open: row[1].parse().unwrap_or(0.0),
high: row[2].parse().unwrap_or(0.0),
low: row[3].parse().unwrap_or(0.0),
close: row[4].parse().unwrap_or(0.0),
volume: row[5].parse().unwrap_or(0.0),
}).collect();
Ok(bars)
}
async fn get_recent_trades(&self, symbol: &str, limit: u32)
-> Result<Vec<TradeEntry>, Box<dyn std::error::Error>>
{
let url = format!("{}/v5/market/recent-trade", self.base_url);
let resp = self.client.get(&url)
.query(&[
("category", "linear"),
("symbol", symbol),
("limit", &limit.to_string()),
])
.send().await?;
let body: BybitResponse<TradeResult> = resp.json().await?;
Ok(body.result.list)
}
}
async fn stream_orderbook_ws(symbol: &str, duration_secs: u64)
-> Result<Vec<String>, Box<dyn std::error::Error>>
{
let url = "wss://stream.bybit.com/v5/public/linear";
let (mut ws_stream, _) = connect_async(url).await?;
let subscribe = serde_json::json!({
"op": "subscribe",
"args": [format!("orderbook.50.{}", symbol)]
});
ws_stream.send(Message::Text(subscribe.to_string())).await?;
let mut messages = Vec::new();
let start = std::time::Instant::now();
while start.elapsed().as_secs() < duration_secs {
if let Some(Ok(msg)) = ws_stream.next().await {
if let Message::Text(text) = msg {
messages.push(text);
}
}
}
Ok(messages)
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = BybitClient::new();
// Fetch order book
let ob = client.get_orderbook("BTCUSDT", 25).await?;
println!("=== BTCUSDT Order Book ===");
println!("Mid Price: {:.2}", ob.mid_price());
println!("Spread: {:.2} ({:.1} bps)", ob.spread(), ob.spread_bps());
println!("Imbalance (5 levels): {:.4}", ob.imbalance(5));
// Fetch klines
let bars = client.get_klines("BTCUSDT", "60", 200).await?;
println!("\nFetched {} hourly bars", bars.len());
// Fetch recent trades and build OHLCV
let trades = client.get_recent_trades("BTCUSDT", 1000).await?;
let mut builder = OhlcvBuilder::new(60);
for trade in &trades {
let ts: i64 = trade.time.parse().unwrap_or(0);
let price: f64 = trade.price.parse().unwrap_or(0.0);
let size: f64 = trade.size.parse().unwrap_or(0.0);
builder.add_trade(ts, price, size);
}
let custom_bars = builder.build();
println!("Built {} custom 1-minute bars from {} trades",
custom_bars.len(), trades.len());
Ok(())
}

Структура проекта

ch02_crypto_market_data_pipeline/
├── Cargo.toml
├── src/
│ ├── lib.rs
│ ├── feeds/
│ │ ├── mod.rs
│ │ ├── orderbook.rs
│ │ └── trades.rs
│ ├── storage/
│ │ ├── mod.rs
│ │ └── parquet.rs
│ └── onchain/
│ ├── mod.rs
│ └── metrics.rs
└── examples/
├── orderbook_stream.rs
├── ohlcv_builder.rs
└── onchain_pipeline.rs

Модуль feeds/orderbook.rs управляет WebSocket-соединениями для данных глубины книги ордеров в реальном времени, поддерживая локальную книгу ордеров с инкрементальными обновлениями. Модуль feeds/trades.rs обрабатывает отдельные события сделок и передаёт их в построитель OHLCV. Модуль storage/parquet.rs обрабатывает сериализацию в Apache Parquet с использованием крейтов arrow и parquet для Rust. Модуль onchain/metrics.rs получает ончейн-данные из публичных блокчейн API. Каждый пример демонстрирует самостоятельный конвейер, который можно запустить командой cargo run --example <name>.


Построение OHLCV свечей

Раздел 7: Практические примеры

Пример 1: Анализ книги ордеров в реальном времени

client = BybitMarketData()
symbols = ["BTCUSDT", "ETHUSDT", "SOLUSDT"]
print("=== Order Book Analysis ===")
print(f"{'Symbol':<12} {'Spread (bps)':<14} {'Imbalance':<12} {'Mid Price':<12}")
print("-" * 50)
for symbol in symbols:
ob = client.get_orderbook(symbol, 25)
print(f"{symbol:<12} {ob.spread_bps:<14.2f} {ob.imbalance(5):<12.4f} {ob.mid_price:<12.2f}")

Типичный результат:

=== Order Book Analysis ===
Symbol Spread (bps) Imbalance Mid Price
--------------------------------------------------
BTCUSDT 0.51 0.1234 67432.50
ETHUSDT 0.83 -0.0567 3521.25
SOLUSDT 1.24 0.2891 142.35

Пример 2: Построение пользовательских OHLCV из сделок

client = BybitMarketData()
trades = client.get_trades("BTCUSDT", 1000)
builder = OHLCVBuilder(interval_seconds=60)
for _, trade in trades.iterrows():
builder.add_trade(
timestamp=int(trade["time"].timestamp() * 1000),
price=trade["price"],
volume=trade["size"],
side=trade["side"],
)
candles = builder.build()
print(f"Built {len(candles)} 1-minute candles from {len(trades)} trades")
print(candles.tail())

Типичный результат:

Built 17 1-minute candles from 1000 trades
open high low close volume
datetime
2024-12-15... 67430.0 67445.5 67425.0 67440.0 12.345
2024-12-15... 67440.0 67460.0 67435.0 67455.5 18.721
2024-12-15... 67455.5 67458.0 67420.0 67428.0 24.103

Пример 3: Конвейер ставки финансирования и открытого интереса

client = BybitMarketData()
funding = client.get_funding_history("BTCUSDT", 200)
oi = client.get_open_interest("BTCUSDT", "1h", 200)
# Merge and analyze
print(f"Funding rate observations: {len(funding)}")
print(f"Open interest observations: {len(oi)}")
print(f"\nFunding rate stats:")
print(f" Mean: {funding['fundingRate'].mean():.6f}")
print(f" Std: {funding['fundingRate'].std():.6f}")
print(f"\nOpen interest stats:")
print(f" Current: {oi['openInterest'].iloc[-1]:,.0f}")
print(f" 24h change: {((oi['openInterest'].iloc[-1] / oi['openInterest'].iloc[-24]) - 1) * 100:.2f}%")
# Save to parquet
storage = ParquetStorage("./data")
storage.save_ohlcv(funding, "BTCUSDT", "funding")
print("\nData saved to Parquet format")

Типичный результат:

Funding rate observations: 200
Open interest observations: 200
Funding rate stats:
Mean: 0.000082
Std: 0.000295
Open interest stats:
Current: 523,450,000
24h change: 3.42%
Data saved to Parquet format

Аналитика ончейн-данных

Раздел 8: Фреймворк бэктестирования

Компоненты фреймворка

Дата-ориентированный фреймворк бэктестирования уделяет особое внимание качеству данных:

  1. Валидатор данных: Проверяет пробелы, выбросы и несоответствия в исторических данных
  2. Нормализатор данных: Выравнивает временные метки, обрабатывает пропущенные бары, корректирует на техобслуживание биржи
  3. Движок воспроизведения: Симулирует доставку данных в реальном времени из сохранённых исторических данных
  4. Модель проскальзывания: Оценивает стоимость исполнения на основе глубины книги ордеров в момент сделки
  5. Симулятор финансирования: Применяет исторические ставки финансирования к открытым позициям
  6. Модель задержки: Симулирует сетевые и вычислительные задержки для реалистичного тайминга исполнения

Метрики качества данных

МетрикаПорогОписание
Частота пробелов< 0.1%Процент пропущенных временных периодов в данных OHLCV
Частота выбросов< 0.05%Процент цен, отклоняющихся >10 std от скользящего среднего
Точность меток времени< 100мсМаксимальное расхождение часов между источниками данных
Полнота> 99.9%Процент ожидаемых точек данных, фактически полученных
Свежесть< 1 секундаМаксимальный возраст последней точки данных
Кросс-источниковая согласованность< 0.01%Максимальное расхождение цен между типами API

Примерные результаты конвейера данных

=== Data Pipeline Health Report ===
Date: 2024-12-15
Symbol: BTCUSDT
Data Collection:
WebSocket Uptime: 99.97%
Messages Received: 2,847,293
Avg Latency: 3.2ms
Max Latency: 127ms
Storage:
Daily Parquet Size: 142 MB (trades)
Daily Parquet Size: 0.8 MB (1m OHLCV)
Compression Ratio: 18.4x vs CSV
Write Throughput: 45,000 rows/sec
Data Quality:
OHLCV Gaps: 0 (0.00%)
Price Outliers: 2 (0.01%)
Timestamp Issues: 0
Completeness: 100.00%

Раздел 9: Оценка производительности

Сравнение конвейеров данных

Компонент конвейераPython (pandas)Python (polars)RustУлучшение (Rust vs pandas)
Построение OHLCV (1М сделок)4.2с0.8с0.12с35x
Запись Parquet (1М строк)1.8с0.6с0.15с12x
Чтение Parquet (1М строк)0.9с0.3с0.08с11x
Обработка обновления книги ордеров0.15мс0.04мс0.005мс30x
Парсинг WebSocket-сообщения0.08мс0.02мс0.003мс27x

Ключевые выводы

  1. Rust обеспечивает 10-35x улучшение производительности по сравнению с pandas для операций конвейера данных, что делает его необходимым для высокочастотной обработки данных.
  2. Parquet — оптимальный формат хранения для рыночных данных криптовалют, предлагая 15-20x сжатие по сравнению с CSV с более быстрой скоростью чтения.
  3. WebSocket необходим для живой торговли — REST-опрос вносит неприемлемую задержку (50-200мс vs <10мс) для стратегий, работающих на субминутных таймфреймах.
  4. Данные книги ордеров наиболее ёмкие по пропускной способности — генерируют 50-100 МБ/час для потока полной глубины одного символа.

Ограничения

  • Лимиты запросов API биржи ограничивают скорость обратного заполнения исторических данных; Bybit допускает 120 GET-запросов за 5 секунд.
  • Реконструкция книги ордеров из инкрементальных обновлений требует тщательного отслеживания порядковых номеров для избежания десинхронизации.
  • Задержка ончейн-данных (1-15 минут в зависимости от времени блока) ограничивает их полезность для субчасовых стратегий.
  • Выравнивание данных с разных бирж затруднено из-за расхождения часов и различных алгоритмов сопоставления сделок.
  • Стоимость поставщиков данных для всестороннего ончейн-анализа может превышать $1000/месяц для институциональных потоков.

Раздел 10: Направления будущего развития

  1. Конвейеры данных с нулевым копированием на Apache Arrow: Построение сквозных конвейеров данных с использованием колоночного формата Arrow в памяти, обеспечивающего обмен данными без копирования между сбором данных на Rust, обучением ML на Python и визуализацией на дашбордах без накладных расходов на сериализацию.

  2. Аппаратно-ускоренная обработка книги ордеров: Использование FPGA или GPU-ускорения для реконструкции книги ордеров и извлечения признаков, обеспечивающее субмикросекундную обработку данных Level 2 для стратегий со сверхнизкой задержкой.

  3. Децентрализованные оракулы данных: Использование блокчейн-сетей оракулов (Chainlink, Pyth) для верифицированных, защищённых от подделки потоков рыночных данных, снижающих риск единой точки отказа от централизованных API бирж.

  4. Адаптивная выборка для оптимизации хранения: Реализация построения баров на основе событий (бары по объёму, долларовые бары, бары дисбаланса тиков), адаптирующих гранулярность к рыночной активности, сокращая требования к хранению на 60-80% в спокойные периоды без потери информации в волатильные эпизоды.

  5. Фьюжн данных с нескольких бирж: Построение конвейеров фьюжна данных в реальном времени, агрегирующих книги ордеров и потоки сделок с нескольких бирж в консолидированный вид, позволяя обнаруживать кросс-биржевой арбитраж и более точно оценивать справедливую стоимость.

  6. Потоковое машинное обучение на рыночных данных: Развёртывание моделей онлайн-обучения, обновляющихся инкрементально по мере поступления новых данных, устраняя цикл пакетного переобучения и позволяя стратегиям непрерывно адаптироваться к изменяющейся рыночной микроструктуре.


Ссылки

  1. Cont, R., Stoikov, S., & Talreja, R. (2010). “A Stochastic Model for Order Book Dynamics.” Operations Research, 58(3), 549-563.

  2. Gould, M. D., Porter, M. A., Williams, S., McDonald, M., Fenn, D. J., & Howison, S. D. (2013). “Limit Order Books.” Quantitative Finance, 13(11), 1709-1748.

  3. de Prado, M. L. (2018). Advances in Financial Machine Learning. Wiley. Chapters 2-3 (Data Structures).

  4. Cao, C., Chen, Y., Liang, B., & Lo, A. W. (2013). “Can Hedge Funds Time Market Liquidity?” Journal of Financial Economics, 109(2), 493-516.

  5. Cartea, A., Jaimungal, S., & Ricci, J. (2014). “Buy Low, Sell High: A High Frequency Trading Perspective.” SIAM Journal on Financial Mathematics, 5(1), 415-444.

  6. Hasbrouck, J. (2007). Empirical Market Microstructure. Oxford University Press.

  7. Easley, D., Lopez de Prado, M., & O’Hara, M. (2011). “The Microstructure of the Flash Crash.” Journal of Portfolio Management, 37(2), 118-128.