Перейти к содержимому

Глава 97: PCMCI — Обнаружение причинных связей для трейдинга

Обзор

Алгоритм PCMCI (Peter and Clark Momentary Conditional Independence) представляет собой современный двухфазный метод обнаружения причинно-следственных связей в многомерных временных рядах. Разработанный Якобом Рунге (Jakob Runge) в 2019 году, PCMCI специально адаптирован для работы с автокоррелированными данными, что делает его исключительно подходящим для анализа финансовых рынков. В отличие от традиционных корреляционных подходов, которые лишь фиксируют статистические ассоциации, PCMCI выявляет направленные причинные связи между переменными, что позволяет строить более робастные торговые стратегии.

В финансовом контексте PCMCI позволяет:

  • Построение причинных графов: Определение того, какие активы действительно влияют на другие, а не просто коррелируют
  • Выявление lead-lag структур: Обнаружение опережающих и запаздывающих связей между инструментами
  • Режимно-зависимый анализ: Как причинные связи изменяются в различных рыночных режимах
  • Оптимизация портфеля: Использование причинной структуры для построения более устойчивых портфелей
  • Генерация альфа-сигналов: Создание торговых сигналов на основе причинных, а не корреляционных зависимостей

Ключевое преимущество PCMCI перед классическими методами (Granger causality, VAR) заключается в контроле ложных открытий (false discovery rate) и корректной работе с высокоразмерными данными при наличии сильной автокорреляции — типичной характеристики финансовых временных рядов.

Содержание

  1. Введение в причинное обнаружение
  2. Обзор алгоритма PCMCI
  3. Математические основы
  4. Финансовые приложения
  5. Практические примеры на Python
  6. Реализация на Rust
  7. Реализация на Python
  8. Сравнение с другими методами
  9. Лучшие практики
  10. Ресурсы

Введение в причинное обнаружение

Почему корреляция не равна причинности

Одна из самых фундаментальных проблем количественного трейдинга — это смешение корреляции и причинности. Когда два актива движутся вместе, это может быть вызвано:

  1. Прямой причинной связью: Актив A действительно влияет на актив B (например, цена нефти влияет на акции нефтяных компаний)
  2. Общим конфаундером: Оба актива подвержены влиянию третьей переменной (например, макроэкономических показателей)
  3. Случайной корреляцией: Статистическая ассоциация без реального механизма
  4. Обратной причинностью: Кажущаяся связь идёт в противоположном направлении
Пример ложной корреляции в финансах:
Корреляция: BTC <---> ETH = 0.85
Но реальная причинная структура может быть:
Макро-риск аппетит
/ \
v v
BTC ETH
Или:
BTC ---> ETH (BTC причинно влияет на ETH)
Или:
BTC <--- ETH (ETH влияет на BTC)
Корреляция не может различить эти сценарии!

В трейдинге это различие критически важно: стратегия, основанная на ложной корреляции, разрушится при изменении скрытого конфаундера. Стратегия, основанная на истинной причинной связи, более устойчива к режимным сдвигам.

Проблемы классических методов

Причинность по Грейнджеру (Granger Causality)

Тест Грейнджера — самый популярный метод проверки причинности во временных рядах. Его идея: переменная X «причинно влияет по Грейнджеру» на Y, если прошлые значения X помогают предсказать Y сверх того, что дают прошлые значения Y.

Granger Causality Test:
H0: X не Granger-причина Y
H1: X является Granger-причиной Y
Модель: Y_t = a0 + a1*Y_{t-1} + ... + ap*Y_{t-p}
+ b1*X_{t-1} + ... + bp*X_{t-p} + e_t
F-тест: проверка b1 = b2 = ... = bp = 0

Проблемы Granger causality:

  • Попарный тест: Не учитывает третьи переменные (конфаундеры)
  • Линейность: Предполагает линейную зависимость
  • Чувствительность к лагу: Результат сильно зависит от выбора порядка лага p
  • Проблема множественных сравнений: При тестировании многих пар без коррекции
  • Автокорреляция: Может давать ложные результаты при сильной автокорреляции

Модели VAR (Vector Autoregression)

VAR(p) модель для N переменных:
X_t = c + A1*X_{t-1} + A2*X_{t-2} + ... + Ap*X_{t-p} + e_t
где X_t — вектор N переменных
Ai — матрицы коэффициентов NxN
Число параметров: N + p * N^2
Для N=50 активов, p=5 лагов: 50 + 5 * 2500 = 12550 параметров!

Проблемы VAR:

  • Проклятие размерности: Количество параметров растёт квадратично
  • Переобучение: Особенно на коротких финансовых данных
  • Нет причинной интерпретации: Коэффициенты VAR не имеют прямой причинной интерпретации
  • Мультиколлинеарность: Коррелированные предикторы искажают оценки

Что такое PCMCI

PCMCI — это двухфазный алгоритм обнаружения причинных связей, разработанный специально для многомерных временных рядов. Название расшифровывается как:

  • PC (Peter and Clark) — первая фаза, использующая модифицированный алгоритм PC для отбора потенциальных причинных предков
  • MCI (Momentary Conditional Independence) — вторая фаза, тестирующая моментальную условную независимость
Высокоуровневая схема PCMCI:
Входные данные Фаза 1 (PC) Фаза 2 (MCI)
+-----------+ +-------------+ +--------------+
| Многомер. | Итеративный | Отобранные | Тест | Причинные |
| временной | --> удаление --> | родители | --> MCI --> | связи с |
| ряд | условий | P(X_t^j) | | p-значениями |
+-----------+ +-------------+ +--------------+
N переменных Уменьшает число Контролирует
T наблюдений ложных кандидатов false positives
tau_max лагов

Ключевые преимущества PCMCI:

  1. Двухфазный подход: Сначала отсеивает слабые связи, затем точно тестирует оставшиеся
  2. Контроль конфаундеров: Условная независимость учитывает все другие переменные
  3. Масштабируемость: Работает с десятками и сотнями переменных
  4. Контроль ложных открытий: Встроенная коррекция множественных сравнений
  5. Гибкость тестов: Поддержка линейных (ParCorr) и нелинейных (CMI, GPDC) тестов

Исторический контекст и мотивация

Причинное обнаружение (causal discovery) имеет долгую историю в статистике и эпидемиологии. В хронологическом порядке:

Хронология развития причинного вывода:
1956 Reichenbach — Принцип общей причины
|
1980 Granger — Granger Causality (Нобелевская премия 2003)
|
1988 Pearl, Verma — Bayesian Networks
|
1991 Spirtes, Glymour, Scheines — Алгоритм PC
|
2000 Pearl — "Causality" (do-исчисление)
|
2000 Schreiber — Transfer Entropy
|
2012 Runge et al. — Причинный анализ климатических данных
|
2019 Runge — PCMCI (Science Advances) <--- ключевая работа
|
2020 Runge — PCMCI+ (одновременные связи)
|
2020 Runge — LPCMCI (латентные конфаундеры)
|
2023+ Применение в финансах и трейдинге

Работа Рунге была мотивирована проблемами климатологии — выявление причинных связей между климатическими переменными (температура, давление, осадки и т.д.) в присутствии сильных автокорреляций и нелинейных зависимостей. Эти же проблемы свойственны финансовым данным, что делает PCMCI естественным инструментом для трейдинга.

Ключевая публикация: “Detecting and Quantifying Causal Associations in Large Nonlinear Time Series Datasets”, Jakob Runge, Science Advances, 2019.


Обзор алгоритма PCMCI

Фаза 1: Отбор условий (алгоритм PC)

Первая фаза PCMCI адаптирует классический алгоритм PC (named after its creators Peter Spirtes and Clark Glymour) для работы с временными рядами. Цель этой фазы — для каждой переменной X_t^j определить множество её потенциальных причинных предков (parents).

Алгоритм начинает с полного набора возможных предков и итеративно удаляет те, которые оказываются условно независимыми от целевой переменной при условии уже отобранных предков.

Алгоритм PC (Фаза 1) для переменной X_t^j:
Вход: Данные X, максимальный лаг tau_max, уровень значимости alpha_PC
Инициализация:
P_0(X_t^j) = { X_{t-tau}^i : для всех i, tau = 1,...,tau_max }
(все возможные предки)
Итерация p = 0, 1, 2, ...:
Для каждого X_{t-tau}^i в P_p(X_t^j):
Выбрать p условий S из P_p(X_t^j) \ {X_{t-tau}^i}
с наибольшей зависимостью от X_t^j
Тест: X_t^j _||_ X_{t-tau}^i | S (условная независимость)
Если p-value > alpha_PC:
Удалить X_{t-tau}^i из P_{p+1}(X_t^j)
Стоп: если p >= |P_p(X_t^j)| - 1 (все подмножества проверены)
Выход: P(X_t^j) — множество отобранных предков

Визуальный пример работы фазы PC для трёх переменных (X, Y, Z) с максимальным лагом 2:

Начальное состояние (все возможные предки X_t):
X_{t-1} --?--> X_t
X_{t-2} --?--> X_t
Y_{t-1} --?--> X_t
Y_{t-2} --?--> X_t
Z_{t-1} --?--> X_t
Z_{t-2} --?--> X_t
Итерация p=0 (безусловные тесты):
Тест: X_t _||_ Z_{t-2}? p-value = 0.45 > 0.05 --> УДАЛИТЬ
Тест: X_t _||_ Y_{t-2}? p-value = 0.32 > 0.05 --> УДАЛИТЬ
(остальные значимы, оставить)
Итерация p=1 (условные тесты с 1 условием):
Тест: X_t _||_ X_{t-2} | X_{t-1}? p-value = 0.08 > 0.05 --> УДАЛИТЬ
(остальные значимы)
Результат P(X_t) = { X_{t-1}, Y_{t-1}, Z_{t-1} }

Ключевое свойство фазы PC: она эффективно уменьшает число кандидатов без полного перебора всех подмножеств, что обеспечивает масштабируемость.

Фаза 2: Тест моментальной условной независимости (MCI)

После того как для каждой переменной определены потенциальные предки, вторая фаза выполняет тест моментальной условной независимости (Momentary Conditional Independence, MCI) для количественной оценки силы каждой связи.

MCI-тест для проверки связи X_{t-tau}^i -> X_t^j:

MCI-статистика:
X_t^j _||_ X_{t-tau}^i | P(X_t^j) \ {X_{t-tau}^i}, P(X_{t-tau}^i)
Условие включает:
1. P(X_t^j) \ {X_{t-tau}^i} — предки X_t^j (без тестируемого)
2. P(X_{t-tau}^i) — предки самого X_{t-tau}^i
Это "моментальная" условная независимость:
- Контролирует автокорреляцию через P(X_{t-tau}^i)
- Контролирует конфаундеры через P(X_t^j)

Критическое отличие MCI от обычного условного теста:

Проблема автокорреляции:
Обычный тест: X_t^j _||_ X_{t-1}^i | P(X_t^j)
Проблема: Если X^i сильно автокоррелирован:
X_{t-2}^i --> X_{t-1}^i --> X_t^i (автокорреляция)
X_{t-2}^i --> X_t^j (истинная причина)
Тогда X_{t-1}^i будет ложно значимым, потому что
коррелирован с X_{t-2}^i через автокорреляцию.
MCI-тест: X_t^j _||_ X_{t-1}^i | P(X_t^j), P(X_{t-1}^i)
Решение: Условливаясь на P(X_{t-1}^i), мы "убираем"
автокорреляцию из X_{t-1}^i и тестируем только
моментальную (одномоментную) связь.

Это ключевая инновация PCMCI — именно дополнительное условие на предков источника (P(X_{t-tau}^i)) позволяет корректно работать с автокоррелированными финансовыми данными.

Двухфазная архитектура

Полная архитектура PCMCI объединяет обе фазы:

+============================================================+
| АЛГОРИТМ PCMCI |
+============================================================+
| |
| ВХОД: Данные X[T x N], tau_max, alpha_PC, alpha_MCI |
| |
| +----- ФАЗА 1: PC-стабильный отбор предков -----+ |
| | | |
| | Для j = 1, ..., N: | |
| | P(X_t^j) = все возможные предки | |
| | Повторять: | |
| | Для каждого кандидата в P(X_t^j): | |
| | Тест условной независимости | |
| | Если незначим -> удалить | |
| | Увеличить порядок условия | |
| | Пока не проверены все подмножества | |
| | | |
| | Результат: P(X_t^j) для всех j | |
| +-------------------------------------------------+ |
| | |
| v |
| +----- ФАЗА 2: MCI-тестирование ------------------+ |
| | | |
| | Для каждой пары (i,j) и лага tau: | |
| | S = P(X_t^j) \ {X_{t-tau}^i} | |
| | UNION P(X_{t-tau}^i) | |
| | | |
| | MCI-val = I(X_t^j ; X_{t-tau}^i | S) | |
| | p-value = тест значимости | |
| | | |
| | Коррекция множественных сравнений | |
| | (Bonferroni или FDR) | |
| | | |
| | Результат: причинный граф G с p-значениями | |
| +---------------------------------------------------+ |
| |
| ВЫХОД: G — направленный граф причинных связей |
| с весами (сила связи) и p-значениями |
+============================================================+

Расширения: PCMCI+ и LPCMCI

PCMCI+ (Runge, 2020) расширяет стандартный PCMCI для обнаружения одновременных (contemporaneous) причинных связей:

Стандартный PCMCI:
X_{t-tau}^i --> X_t^j (только лагированные, tau >= 1)
PCMCI+:
X_{t-tau}^i --> X_t^j (лагированные, tau >= 1)
X_t^i --- X_t^j (одновременные, tau = 0)
X_t^i --> X_t^j (одновременные направленные)

Это важно для финансовых данных на дневных частотах, где многие связи проявляются «одновременно» (в рамках одного торгового дня).

LPCMCI (Latent PCMCI) работает в условиях наличия ненаблюдаемых конфаундеров:

Стандартный PCMCI: предполагает каузальную достаточность
(все конфаундеры наблюдаемы)
LPCMCI: допускает латентные (скрытые) конфаундеры
X_t^i --> X_t^j (прямая причинная связь)
X_t^i <-> X_t^j (связь через латентный конфаундер)
X_t^i o--> X_t^j (неопределённый тип связи)

Для финансовых данных LPCMCI особенно актуален, так как мы никогда не наблюдаем все релевантные переменные (настроения участников, инсайдерская информация, геополитические факторы и т.д.).


Математические основы

Условная независимость

Центральное понятие в причинном обнаружении — условная независимость. Две переменные X и Y условно независимы при заданном множестве Z, если:

Определение условной независимости:
X _||_ Y | Z <=> P(X, Y | Z) = P(X | Z) * P(Y | Z)
Эквивалентно:
X _||_ Y | Z <=> P(X | Y, Z) = P(X | Z)
То есть: знание Y не даёт дополнительной информации о X,
если Z уже известно.

В контексте PCMCI условная независимость означает, что связь между X_{t-tau}^i и X_t^j является опосредованной (через другие переменные), а не прямой.

Пример с финансовыми данными:

Пример: Причинная цепочка
Ставка ФРС (t-2) --> Курс доллара (t-1) --> Цена золота (t)
Безусловно:
Ставка ФРС _||_ Цена золота?
Нет! Они зависимы (корреляция существует).
Условно:
Ставка ФРС _||_ Цена золота | Курс доллара?
Да! При фиксированном курсе доллара, ставка ФРС
не даёт дополнительной информации о цене золота.
Вывод: Ставка ФРС -> Доллар -> Золото (цепочка)
Нет прямой связи: Ставка ФРС -/-> Золото

Частная корреляция

Для линейных зависимостей условная независимость проверяется через частную корреляцию (partial correlation). Частная корреляция между X и Y при условии Z — это корреляция остатков после регрессии X и Y на Z.

Частная корреляция ParCorr(X, Y | Z):
Шаг 1: Регрессия X на Z: X = beta_XZ * Z + residual_X
Шаг 2: Регрессия Y на Z: Y = beta_YZ * Z + residual_Y
Шаг 3: ParCorr = Corr(residual_X, residual_Y)
Свойства:
- ParCorr = 0 <=> X _||_ Y | Z (для гауссовских данных)
- |ParCorr| in [0, 1]
- Знак указывает направление связи
Тест значимости:
H0: ParCorr = 0 (условная независимость)
H1: ParCorr != 0
Статистика: t = ParCorr * sqrt((T - dim_Z - 2) / (1 - ParCorr^2))
Распределение: t(T - dim_Z - 2) (t-распределение Стьюдента)

Пример вычисления:

Пример: ParCorr для трёх акций
Данные (доходности):
AAPL = [0.01, -0.02, 0.03, 0.01, -0.01, 0.02, ...]
MSFT = [0.02, -0.01, 0.02, 0.02, -0.02, 0.01, ...]
SPY = [0.01, -0.01, 0.02, 0.01, -0.01, 0.01, ...]
Простая корреляция:
Corr(AAPL, MSFT) = 0.75
Частная корреляция (контроль SPY):
ParCorr(AAPL, MSFT | SPY) = 0.30
Интерпретация:
Большая часть совместного движения AAPL и MSFT
объясняется общим рыночным фактором (SPY).
Прямая связь между AAPL и MSFT значительно слабее.

Условная взаимная информация (CMI)

Для нелинейных зависимостей вместо частной корреляции используется условная взаимная информация (Conditional Mutual Information, CMI):

Условная взаимная информация:
I(X; Y | Z) = integral p(x,y,z) * log(p(x,y|z) / (p(x|z)*p(y|z))) dx dy dz
Свойства:
- I(X; Y | Z) >= 0
- I(X; Y | Z) = 0 <=> X _||_ Y | Z
- Обнаруживает нелинейные зависимости
- Не требует предположений о распределении
Проблемы:
- Оценка плотностей в высокоразмерных пространствах
- Нужен большой объём данных
- Вычислительная сложность

На практике CMI оценивается с помощью k-nearest neighbor (kNN) оценки:

kNN-оценка CMI (Kraskov et al., 2004):
I_knn(X; Y | Z) = psi(k) - <psi(n_xz + 1) + psi(n_yz + 1) - psi(n_z + 1)>
где:
psi() — функция дигамма
k — число ближайших соседей
n_xz, n_yz, n_z — число точек в гиперпрямоугольниках
<.> — среднее по всем точкам данных

Тестирование значимости

PCMCI поддерживает несколько методов тестирования значимости:

Методы тестирования в PCMCI:
1. ParCorr (Partial Correlation)
- Для линейных зависимостей
- Быстрый, аналитическое распределение
- t-тест с (T - d - 2) степенями свободы
- Рекомендуется для финансовых данных при T > 200
2. GPDC (Gaussian Process Distance Correlation)
- Для нелинейных зависимостей
- Основан на distance correlation с GP-регрессией
- Пермутационный тест для p-значения
- Хорошо для умеренной нелинейности
3. CMIknn (CMI kNN-estimator)
- Для произвольных нелинейных зависимостей
- Наиболее общий метод
- Требует большого T (>500)
- Вычислительно затратный
4. RobustParCorr
- Устойчивая версия ParCorr
- Использует ранговые преобразования
- Для данных с тяжёлыми хвостами (финансовые доходности)

Выбор метода зависит от характера данных:

Дерево выбора метода тестирования:
Данные линейны?
|
+-- Да --> Нормальное распределение остатков?
| |
| +-- Да --> ParCorr
| |
| +-- Нет --> RobustParCorr
|
+-- Нет --> Достаточно данных (T > 500)?
|
+-- Да --> CMIknn
|
+-- Нет --> GPDC

Контроль множественных сравнений

При тестировании множества связей одновременно необходима коррекция на множественные сравнения. PCMCI поддерживает:

Методы коррекции множественных сравнений:
1. Bonferroni:
alpha_corrected = alpha / m
где m — число тестов
Консервативный, контролирует FWER
2. Benjamini-Hochberg (FDR):
Упорядочить p-значения: p(1) <= p(2) <= ... <= p(m)
Отвергнуть H0 для i: p(i) <= (i/m) * alpha
Менее консервативный, контролирует FDR
Для финансовых данных:
- Bonferroni: когда ложные сигналы очень дороги
- FDR: когда хотим обнаружить больше связей (исследование)
Пример:
N=10 активов, tau_max=5 лагов
Число тестов: m = N * N * tau_max = 10 * 10 * 5 = 500
alpha = 0.05
Bonferroni: alpha_corrected = 0.05 / 500 = 0.0001
FDR: зависит от распределения p-значений

Финансовые приложения

Причинные сети между активами

PCMCI позволяет построить направленный граф причинных связей между финансовыми инструментами. Это принципиально отличается от корреляционной матрицы:

Корреляционная матрица (симметричная, ненаправленная):
BTC ETH SOL AVAX BNB
BTC 1.00 0.85 0.78 0.72 0.68
ETH 0.85 1.00 0.82 0.76 0.71
SOL 0.78 0.82 1.00 0.80 0.65
AVAX 0.72 0.76 0.80 1.00 0.63
BNB 0.68 0.71 0.65 0.63 1.00
Причинный граф PCMCI (асимметричный, направленный):
BTC =====> ETH (lag=1, MCI=0.35, p<0.001)
BTC =====> SOL (lag=1, MCI=0.28, p<0.001)
ETH ----> SOL (lag=1, MCI=0.15, p=0.003)
ETH ----> AVAX (lag=1, MCI=0.12, p=0.008)
BTC ----> BNB (lag=2, MCI=0.10, p=0.015)
SOL ----> AVAX (lag=1, MCI=0.09, p=0.022)
Интерпретация:
- BTC — главный причинный драйвер (наибольшее число исходящих связей)
- ETH — посредник между BTC и альткоинами
- SOL и AVAX — в основном следуют за BTC и ETH
- Прямая связь BTC->BNB слабее и с бОльшим лагом

Визуализация причинного графа в виде ASCII:

Причинный граф криптовалютного рынка
lag=1, 0.35
BTC ==================> ETH
| |
| lag=1, 0.28 | lag=1, 0.15
| |
v lag=1, 0.09 v
SOL ................... AVAX
|
| lag=2, 0.10
v
BNB
Толщина стрелки ~ сила MCI-связи
==== сильная связь (MCI > 0.25)
---- средняя связь (0.10 < MCI < 0.25)
.... слабая связь (MCI < 0.10)

Lead-lag структуры

Одно из важнейших приложений PCMCI в трейдинге — обнаружение lead-lag структур. Если актив A причинно предшествует активу B с определённым лагом, это создаёт торговую возможность:

Lead-lag обнаружение с PCMCI:
Результат PCMCI анализа (дневные данные):
S&P 500 (t-1) --> DAX (t) lag=1 day, MCI=0.22
DAX (t-1) --> Nikkei 225 (t) lag=1 day, MCI=0.18
VIX (t-1) --> S&P 500 (t) lag=1 day, MCI=-0.31
US 10Y (t-2) --> Gold (t) lag=2 days, MCI=-0.15
DXY (t-1) --> EM equities (t) lag=1 day, MCI=-0.20
Торговая логика:
Если VIX значительно вырос сегодня:
--> Ожидать снижение S&P 500 завтра (lag=1, MCI=-0.31)
--> Ожидать снижение DAX послезавтра (через S&P 500)
Если US 10Y выросла сегодня:
--> Ожидать снижение Gold через 2 дня (lag=2, MCI=-0.15)

Важно отметить, что lead-lag структуры могут быть нестационарными:

Динамика lead-lag во времени:
Период BTC -> ETH lag Сила MCI
2020 Q1 1 час 0.42
2020 Q2 2 часа 0.35
2020 Q3 30 мин 0.48
2021 Q1 (бычий) 15 мин 0.55
2022 Q1 (медвеж) 4 часа 0.25
2023 Q1 1 час 0.38
Наблюдение:
- В бычьих режимах лаг короче (быстрая передача)
- В медвежьих режимах лаг длиннее
- Сила связи варьируется с волатильностью

Режимно-зависимая причинность

Причинная структура рынка не постоянна — она существенно изменяется в зависимости от рыночного режима. PCMCI можно применять к различным подпериодам для выявления режимно-зависимых паттернов:

Причинная структура в разных рыночных режимах:
БЫЧИЙ РЕЖИМ (низкая волатильность):
+-----------------------------------------------+
| Много слабых причинных связей |
| Секторная ротация видна: |
| Tech -> Financials -> Energy |
| Международные связи слабые |
| Доминирует momentum |
+-----------------------------------------------+
МЕДВЕЖИЙ РЕЖИМ / КРИЗИС (высокая волатильность):
+-----------------------------------------------+
| Мало, но очень сильных связей |
| Всё коррелировано = "correlation breakdown" |
| Один доминирующий фактор (risk-on/risk-off) |
| Международная передача шоков очень быстрая |
| VIX -> всё |
+-----------------------------------------------+
ПЕРЕХОДНЫЙ РЕЖИМ:
+-----------------------------------------------+
| Причинная структура перестраивается |
| Появляются новые связи, старые исчезают |
| Высокая неопределённость в графе |
| Возможности для арбитража максимальны |
+-----------------------------------------------+

Стратегия на основе режимно-зависимой причинности:

Алгоритм адаптивной причинной стратегии:
1. Определить текущий рыночный режим (HMM, VIX-based и т.д.)
2. Для каждого режима r:
Запустить PCMCI на данных из режима r
Получить причинный граф G_r
3. В реальном времени:
a. Определить текущий режим r*
b. Использовать граф G_{r*} для генерации сигналов
c. Вес сигнала ~ сила MCI-связи в текущем режиме
4. При смене режима:
a. Переключиться на соответствующий граф
b. Уменьшить размер позиций (неопределённость при переходе)
c. Постепенно наращивать позиции по мере подтверждения режима

Причинное построение портфеля

PCMCI открывает новый подход к построению портфеля — на основе причинной структуры вместо (или в дополнение к) корреляционной:

Традиционный подход Марковица:
min w^T * Sigma * w (минимизация дисперсии)
s.t. w^T * mu >= target_return
w^T * 1 = 1
Проблема: Sigma — корреляционная матрица
Не отличает прямые связи от косвенных
Нестабильна, меняется в кризис
Причинный подход:
1. Построить причинный граф G с помощью PCMCI
2. Определить причинную матрицу C:
C_ij = MCI(X^i -> X^j) — сила причинного влияния i на j
3. Оптимизировать портфель:
- Минимизировать причинную концентрацию:
min sum_i (sum_j C_ij * w_j)^2
(не давать одному активу слишком влиять на портфель)
- Или: использовать C вместо/вместе с Sigma
min w^T * (lambda*Sigma + (1-lambda)*C_sym) * w
где C_sym = (C + C^T) / 2
4. Причинная диверсификация:
- Выбирать активы из разных "причинных кластеров"
- Избегать активов, причинно управляемых одним источником

Пример причинной диверсификации:

Причинный граф показывает:
BTC --> ETH, SOL, AVAX (BTC управляет крипто-кластером)
S&P500 --> Tech, Finance (рынок управляет секторами)
Oil --> Energy, Transport (нефть управляет энергетическим)
Плохая диверсификация (одна причинная группа):
Портфель: [ETH, SOL, AVAX]
-> Все управляются BTC, причинная диверсификация = 0
Хорошая причинная диверсификация:
Портфель: [BTC, S&P500, Gold, Oil]
-> Разные причинные источники
-> Причинная диверсификация высока
Лучшая стратегия:
Портфель: [BTC, S&P500, Gold, Oil]
+ Веса обратно пропорциональны причинному влиянию:
Актив с большим числом исходящих связей -> меньший вес
(так как он "заражает" другие при шоках)

Практические примеры на Python

Пример 01: Подготовка данных

В этом примере мы загружаем и подготавливаем данные фондового рынка и криптовалютного рынка (Bybit) для последующего причинного анализа.

"""
Пример 01: Подготовка данных для PCMCI причинного анализа
Включает данные фондового рынка (Yahoo Finance) и криптовалюты (Bybit)
"""
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')
# ============================================================
# Часть 1: Загрузка данных фондового рынка
# ============================================================
def load_stock_data(
tickers: list[str],
start_date: str = "2020-01-01",
end_date: str = "2024-01-01",
) -> pd.DataFrame:
"""
Загрузка данных фондового рынка через yfinance.
Parameters
----------
tickers : list[str]
Список тикеров (например, ['AAPL', 'MSFT', 'GOOGL'])
start_date : str
Начальная дата в формате 'YYYY-MM-DD'
end_date : str
Конечная дата
Returns
-------
pd.DataFrame
DataFrame с колонками для каждого тикера (цены закрытия)
"""
import yfinance as yf
data = yf.download(tickers, start=start_date, end=end_date)
prices = data['Close'] if isinstance(data.columns, pd.MultiIndex) else data[['Close']]
if isinstance(prices.columns, pd.MultiIndex):
prices.columns = prices.columns.get_level_values(0)
prices = prices.dropna()
print(f"Загружено {len(prices)} дневных наблюдений")
print(f"Период: {prices.index[0].date()} -- {prices.index[-1].date()}")
print(f"Тикеры: {list(prices.columns)}")
return prices
def load_bybit_data(
symbols: list[str],
interval: str = "1d",
limit: int = 1000,
) -> pd.DataFrame:
"""
Загрузка данных криптовалют с Bybit через REST API.
Parameters
----------
symbols : list[str]
Список символов (например, ['BTCUSDT', 'ETHUSDT'])
interval : str
Интервал свечей ('1', '5', '15', '60', '240', '1d', '1w')
limit : int
Количество свечей (max 1000)
Returns
-------
pd.DataFrame
DataFrame с ценами закрытия для каждого символа
"""
import requests
all_data = {}
base_url = "https://api.bybit.com/v5/market/kline"
for symbol in symbols:
params = {
"category": "spot",
"symbol": symbol,
"interval": "D" if interval == "1d" else interval,
"limit": limit,
}
try:
response = requests.get(base_url, params=params, timeout=10)
data = response.json()
if data["retCode"] == 0:
records = data["result"]["list"]
df = pd.DataFrame(records, columns=[
"timestamp", "open", "high", "low", "close", "volume", "turnover"
])
df["timestamp"] = pd.to_datetime(df["timestamp"].astype(int), unit="ms")
df = df.set_index("timestamp")
df["close"] = df["close"].astype(float)
all_data[symbol.replace("USDT", "")] = df["close"]
print(f" {symbol}: {len(df)} свечей загружено")
else:
print(f" {symbol}: ошибка API -- {data['retMsg']}")
except Exception as e:
print(f" {symbol}: ошибка загрузки -- {e}")
prices = pd.DataFrame(all_data).sort_index().dropna()
print(f"\nИтого: {len(prices)} совмещённых наблюдений")
return prices
# ============================================================
# Часть 2: Предобработка данных для PCMCI
# ============================================================
def compute_returns(
prices: pd.DataFrame,
method: str = "log",
) -> pd.DataFrame:
"""
Вычисление доходностей из ценовых данных.
Parameters
----------
prices : pd.DataFrame
Цены закрытия
method : str
'log' для логарифмических, 'simple' для простых доходностей
Returns
-------
pd.DataFrame
Доходности
"""
if method == "log":
returns = np.log(prices / prices.shift(1))
elif method == "simple":
returns = prices.pct_change()
else:
raise ValueError(f"Неизвестный метод: {method}")
returns = returns.dropna()
print(f"\nСтатистика доходностей ({method}):")
print(returns.describe().round(4))
return returns
def prepare_tigramite_data(
returns: pd.DataFrame,
standardize: bool = True,
) -> tuple:
"""
Подготовка данных в формате tigramite.
Parameters
----------
returns : pd.DataFrame
Доходности (T x N)
standardize : bool
Стандартизировать ли данные
Returns
-------
tuple
(dataframe, var_names) для tigramite
"""
from tigramite import data_processing as pp
values = returns.values.copy()
var_names = list(returns.columns)
if standardize:
means = values.mean(axis=0)
stds = values.std(axis=0)
values = (values - means) / stds
print("Данные стандартизированы (mean=0, std=1)")
# Проверка на пропущенные значения
n_missing = np.isnan(values).sum()
if n_missing > 0:
print(f"ПРЕДУПРЕЖДЕНИЕ: {n_missing} пропущенных значений!")
dataframe = pp.DataFrame(
data=values,
var_names=var_names,
)
print(f"\nДанные подготовлены для tigramite:")
print(f" Переменные (N): {len(var_names)}")
print(f" Наблюдения (T): {values.shape[0]}")
print(f" Имена: {var_names}")
return dataframe, var_names
def check_stationarity(
returns: pd.DataFrame,
significance: float = 0.05,
) -> pd.DataFrame:
"""
Проверка стационарности рядов (ADF-тест).
Parameters
----------
returns : pd.DataFrame
Доходности
significance : float
Уровень значимости
Returns
-------
pd.DataFrame
Результаты ADF-теста
"""
from statsmodels.tsa.stattools import adfuller
results = []
for col in returns.columns:
adf_stat, p_value, used_lag, nobs, critical_values, icbest = adfuller(
returns[col].dropna()
)
results.append({
"variable": col,
"adf_statistic": round(adf_stat, 4),
"p_value": round(p_value, 6),
"used_lag": used_lag,
"stationary": p_value < significance,
})
df_results = pd.DataFrame(results)
print("\nТест Дики-Фуллера (ADF) на стационарность:")
print(df_results.to_string(index=False))
non_stationary = df_results[~df_results["stationary"]]
if len(non_stationary) > 0:
print(f"\nПРЕДУПРЕЖДЕНИЕ: {len(non_stationary)} рядов нестационарны!")
print("Рекомендуется использовать доходности вместо цен.")
else:
print("\nВсе ряды стационарны -- можно применять PCMCI.")
return df_results
# ============================================================
# Часть 3: Основной скрипт
# ============================================================
if __name__ == "__main__":
# --- Загрузка данных фондового рынка ---
print("=" * 60)
print("ЗАГРУЗКА ДАННЫХ ФОНДОВОГО РЫНКА")
print("=" * 60)
stock_tickers = ["SPY", "QQQ", "TLT", "GLD", "USO"]
stock_prices = load_stock_data(stock_tickers)
stock_returns = compute_returns(stock_prices, method="log")
# Проверка стационарности
check_stationarity(stock_returns)
# Подготовка для tigramite
stock_df, stock_names = prepare_tigramite_data(stock_returns)
# --- Загрузка данных криптовалют (Bybit) ---
print("\n" + "=" * 60)
print("ЗАГРУЗКА ДАННЫХ КРИПТОВАЛЮТ (BYBIT)")
print("=" * 60)
crypto_symbols = ["BTCUSDT", "ETHUSDT", "SOLUSDT", "AVAXUSDT", "BNBUSDT"]
crypto_prices = load_bybit_data(crypto_symbols, interval="1d", limit=500)
crypto_returns = compute_returns(crypto_prices, method="log")
check_stationarity(crypto_returns)
crypto_df, crypto_names = prepare_tigramite_data(crypto_returns)
# --- Комбинированный датасет ---
print("\n" + "=" * 60)
print("КОМБИНИРОВАННЫЙ ДАТАСЕТ")
print("=" * 60)
# Объединяем по датам (inner join)
combined = pd.concat(
[stock_returns, crypto_returns], axis=1, join="inner"
)
combined = combined.dropna()
print(f"Комбинированный набор: {combined.shape}")
combined_df, combined_names = prepare_tigramite_data(combined)
print("\nДанные готовы для причинного анализа PCMCI!")

Объяснение ключевых шагов:

Подготовка данных для PCMCI включает несколько критически важных этапов:

  1. Стационарность: PCMCI требует стационарных временных рядов. Цены активов нестационарны (содержат тренд), поэтому мы используем логарифмические доходности.

  2. Стандартизация: Приведение всех рядов к единому масштабу (среднее = 0, стандартное отклонение = 1) важно для корректной работы тестов условной независимости.

  3. Формат tigramite: Библиотека tigramite ожидает данные в формате numpy-массива (T x N) и список имён переменных.

  4. Пропущенные значения: PCMCI не работает с пропусками, поэтому необходимо либо удалить, либо интерполировать пропущенные значения.

Пример 02: Построение причинных графов

"""
Пример 02: Построение причинных графов с помощью PCMCI
Использует библиотеку tigramite для обнаружения причинных связей
"""
import numpy as np
import pandas as pd
from tigramite import data_processing as pp
from tigramite.pcmci import PCMCI
from tigramite.independence_tests.parcorr import ParCorr
from tigramite.independence_tests.robust_parcorr import RobustParCorr
from tigramite.independence_tests.cmiknn import CMIknn
from tigramite import plotting as tp
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
# ============================================================
# Часть 1: Базовый PCMCI анализ
# ============================================================
def run_pcmci_analysis(
dataframe: pp.DataFrame,
var_names: list[str],
tau_max: int = 5,
alpha_pc: float = 0.05,
alpha_mci: float = 0.05,
cond_ind_test: str = "parcorr",
) -> dict:
"""
Запуск PCMCI анализа.
Parameters
----------
dataframe : pp.DataFrame
Данные в формате tigramite
var_names : list[str]
Имена переменных
tau_max : int
Максимальный лаг
alpha_pc : float
Уровень значимости для фазы PC
alpha_mci : float
Уровень значимости для фазы MCI
cond_ind_test : str
Метод тестирования: 'parcorr', 'robust_parcorr', 'cmiknn'
Returns
-------
dict
Результаты PCMCI анализа
"""
# Выбор теста условной независимости
if cond_ind_test == "parcorr":
ci_test = ParCorr(significance="analytic")
elif cond_ind_test == "robust_parcorr":
ci_test = RobustParCorr(significance="analytic")
elif cond_ind_test == "cmiknn":
ci_test = CMIknn(
knn=0.1, # доля ближайших соседей
significance="shuffle_test",
sig_samples=200,
)
else:
raise ValueError(f"Неизвестный тест: {cond_ind_test}")
print(f"Тест условной независимости: {cond_ind_test}")
print(f"Максимальный лаг: {tau_max}")
print(f"alpha_PC: {alpha_pc}, alpha_MCI: {alpha_mci}")
# Создание экземпляра PCMCI
pcmci = PCMCI(
dataframe=dataframe,
cond_ind_test=ci_test,
verbosity=1,
)
# Запуск PCMCI
results = pcmci.run_pcmci(
tau_max=tau_max,
pc_alpha=alpha_pc,
alpha_level=alpha_mci,
)
# Извлечение результатов
val_matrix = results["val_matrix"] # матрица MCI-значений
p_matrix = results["p_matrix"] # матрица p-значений
graph = results["graph"] # граф причинных связей
conf_matrix = results.get("conf_matrix") # доверительные интервалы
# Подсчёт значимых связей
significant_links = (p_matrix < alpha_mci).sum()
total_links = p_matrix.size
print(f"\nРезультаты PCMCI:")
print(f" Значимые связи: {significant_links} из {total_links}")
print(f" Доля значимых: {significant_links/total_links:.2%}")
return {
"pcmci": pcmci,
"results": results,
"val_matrix": val_matrix,
"p_matrix": p_matrix,
"graph": graph,
"var_names": var_names,
"tau_max": tau_max,
}
# ============================================================
# Часть 2: Визуализация причинного графа
# ============================================================
def plot_causal_graph(
pcmci_results: dict,
output_file: str = "causal_graph.png",
) -> None:
"""
Визуализация причинного графа.
Parameters
----------
pcmci_results : dict
Результаты run_pcmci_analysis
output_file : str
Имя файла для сохранения
"""
pcmci = pcmci_results["pcmci"]
results = pcmci_results["results"]
var_names = pcmci_results["var_names"]
# Граф процесса (process graph)
tp.plot_graph(
val_matrix=results["val_matrix"],
graph=results["graph"],
var_names=var_names,
link_colorbar_label="MCI strength",
node_colorbar_label="Auto-MCI",
figsize=(12, 8),
save_name=output_file,
)
print(f"Причинный граф сохранён: {output_file}")
def plot_time_series_graph(
pcmci_results: dict,
output_file: str = "time_series_graph.png",
) -> None:
"""
Визуализация причинного графа как развёрнутого во времени.
Parameters
----------
pcmci_results : dict
Результаты
output_file : str
Имя файла
"""
pcmci = pcmci_results["pcmci"]
results = pcmci_results["results"]
var_names = pcmci_results["var_names"]
tau_max = pcmci_results["tau_max"]
tp.plot_time_series_graph(
val_matrix=results["val_matrix"],
graph=results["graph"],
var_names=var_names,
link_colorbar_label="MCI strength",
figsize=(16, 6),
save_name=output_file,
)
print(f"Граф временных рядов сохранён: {output_file}")
# ============================================================
# Часть 3: Извлечение причинных связей
# ============================================================
def extract_causal_links(
pcmci_results: dict,
alpha: float = 0.05,
) -> pd.DataFrame:
"""
Извлечение списка значимых причинных связей.
Parameters
----------
pcmci_results : dict
Результаты PCMCI
alpha : float
Уровень значимости
Returns
-------
pd.DataFrame
Таблица причинных связей
"""
val_matrix = pcmci_results["val_matrix"]
p_matrix = pcmci_results["p_matrix"]
var_names = pcmci_results["var_names"]
N = len(var_names)
tau_max = val_matrix.shape[2] - 1
links = []
for j in range(N):
for i in range(N):
for tau in range(1, tau_max + 1):
if p_matrix[i, j, tau] < alpha:
links.append({
"source": var_names[i],
"target": var_names[j],
"lag": tau,
"mci_value": round(val_matrix[i, j, tau], 4),
"p_value": round(p_matrix[i, j, tau], 6),
"abs_mci": round(abs(val_matrix[i, j, tau]), 4),
})
df_links = pd.DataFrame(links)
if len(df_links) > 0:
df_links = df_links.sort_values("abs_mci", ascending=False)
df_links = df_links.reset_index(drop=True)
print(f"\nЗначимые причинные связи (alpha={alpha}):")
print(f" Всего: {len(df_links)}")
if len(df_links) > 0:
print("\nТоп-10 сильнейших связей:")
print(df_links.head(10).to_string(index=False))
return df_links
# ============================================================
# Часть 4: PCMCI+ для одновременных связей
# ============================================================
def run_pcmciplus_analysis(
dataframe: pp.DataFrame,
var_names: list[str],
tau_max: int = 5,
alpha_pc: float = 0.01,
) -> dict:
"""
Запуск PCMCI+ (с одновременными связями).
Parameters
----------
dataframe : pp.DataFrame
Данные
var_names : list[str]
Имена переменных
tau_max : int
Максимальный лаг
alpha_pc : float
Уровень значимости
Returns
-------
dict
Результаты
"""
ci_test = ParCorr(significance="analytic")
pcmci = PCMCI(
dataframe=dataframe,
cond_ind_test=ci_test,
verbosity=1,
)
results = pcmci.run_pcmciplus(
tau_max=tau_max,
pc_alpha=alpha_pc,
)
print("\nPCMCI+ результаты (с одновременными связями):")
# Одновременные связи (tau=0)
p_contemp = results["p_matrix"][:, :, 0]
val_contemp = results["val_matrix"][:, :, 0]
N = len(var_names)
contemp_links = []
for i in range(N):
for j in range(i + 1, N):
if p_contemp[i, j] < 0.05:
contemp_links.append({
"var1": var_names[i],
"var2": var_names[j],
"mci_value": round(val_contemp[i, j], 4),
"p_value": round(p_contemp[i, j], 6),
})
if contemp_links:
print(f" Одновременные связи: {len(contemp_links)}")
for link in contemp_links:
print(f" {link['var1']} <-> {link['var2']}: "
f"MCI={link['mci_value']}, p={link['p_value']}")
return {
"pcmci": pcmci,
"results": results,
"val_matrix": results["val_matrix"],
"p_matrix": results["p_matrix"],
"graph": results["graph"],
"var_names": var_names,
"tau_max": tau_max,
}
# ============================================================
# Часть 5: Основной скрипт
# ============================================================
if __name__ == "__main__":
# Создание демонстрационных данных
np.random.seed(42)
T = 1000
N = 5
# Генерация синтетических данных с известной причинной структурой
# X0 -> X1 (lag=1), X0 -> X2 (lag=2), X1 -> X3 (lag=1)
data = np.zeros((T, N))
for t in range(2, T):
data[t, 0] = 0.6 * data[t-1, 0] + np.random.normal(0, 0.5)
data[t, 1] = 0.4 * data[t-1, 0] + 0.3 * data[t-1, 1] + np.random.normal(0, 0.5)
data[t, 2] = 0.3 * data[t-2, 0] + 0.5 * data[t-1, 2] + np.random.normal(0, 0.5)
data[t, 3] = 0.35 * data[t-1, 1] + 0.4 * data[t-1, 3] + np.random.normal(0, 0.5)
data[t, 4] = 0.2 * data[t-1, 4] + np.random.normal(0, 0.5)
var_names = ["BTC", "ETH", "SOL", "AVAX", "BNB"]
dataframe = pp.DataFrame(data=data, var_names=var_names)
# Запуск PCMCI
print("=" * 60)
print("PCMCI АНАЛИЗ (ДЕМО)")
print("=" * 60)
results = run_pcmci_analysis(
dataframe=dataframe,
var_names=var_names,
tau_max=5,
alpha_pc=0.05,
alpha_mci=0.05,
cond_ind_test="parcorr",
)
# Извлечение связей
links = extract_causal_links(results, alpha=0.05)
# Визуализация
plot_causal_graph(results, output_file="demo_causal_graph.png")
plot_time_series_graph(results, output_file="demo_ts_graph.png")
# PCMCI+
print("\n" + "=" * 60)
print("PCMCI+ АНАЛИЗ (С ОДНОВРЕМЕННЫМИ СВЯЗЯМИ)")
print("=" * 60)
results_plus = run_pcmciplus_analysis(
dataframe=dataframe,
var_names=var_names,
tau_max=5,
alpha_pc=0.01,
)
print("\nАнализ завершён!")

Пример 03: Анализ причинных сетей

"""
Пример 03: Анализ причинных сетей — метрики и интерпретация
Вычисление сетевых характеристик причинного графа
"""
import numpy as np
import pandas as pd
from collections import defaultdict
# ============================================================
# Часть 1: Сетевые метрики причинного графа
# ============================================================
class CausalNetworkAnalyzer:
"""
Анализатор причинных сетей для финансовых данных.
Вычисляет сетевые характеристики причинного графа,
выявляет ключевые узлы и структурные паттерны.
"""
def __init__(
self,
val_matrix: np.ndarray,
p_matrix: np.ndarray,
var_names: list[str],
alpha: float = 0.05,
):
"""
Parameters
----------
val_matrix : np.ndarray
Матрица MCI-значений (N x N x (tau_max+1))
p_matrix : np.ndarray
Матрица p-значений
var_names : list[str]
Имена переменных
alpha : float
Уровень значимости
"""
self.val_matrix = val_matrix
self.p_matrix = p_matrix
self.var_names = var_names
self.alpha = alpha
self.N = len(var_names)
self.tau_max = val_matrix.shape[2] - 1
# Построение списка связей
self.links = self._extract_links()
def _extract_links(self) -> list[dict]:
"""Извлечение значимых связей."""
links = []
for i in range(self.N):
for j in range(self.N):
for tau in range(1, self.tau_max + 1):
if self.p_matrix[i, j, tau] < self.alpha:
links.append({
"source": i,
"target": j,
"source_name": self.var_names[i],
"target_name": self.var_names[j],
"lag": tau,
"mci": self.val_matrix[i, j, tau],
"p_value": self.p_matrix[i, j, tau],
})
return links
def compute_degree_centrality(self) -> pd.DataFrame:
"""
Вычисление степени центральности каждого узла.
out_degree: число исходящих причинных связей (причины)
in_degree: число входящих причинных связей (следствия)
"""
out_degree = defaultdict(int)
in_degree = defaultdict(int)
out_strength = defaultdict(float)
in_strength = defaultdict(float)
for link in self.links:
src = link["source_name"]
tgt = link["target_name"]
strength = abs(link["mci"])
out_degree[src] += 1
in_degree[tgt] += 1
out_strength[src] += strength
in_strength[tgt] += strength
records = []
for name in self.var_names:
records.append({
"variable": name,
"out_degree": out_degree.get(name, 0),
"in_degree": in_degree.get(name, 0),
"total_degree": out_degree.get(name, 0) + in_degree.get(name, 0),
"out_strength": round(out_strength.get(name, 0), 4),
"in_strength": round(in_strength.get(name, 0), 4),
"net_influence": round(
out_strength.get(name, 0) - in_strength.get(name, 0), 4
),
})
df = pd.DataFrame(records).sort_values("net_influence", ascending=False)
return df.reset_index(drop=True)
def identify_drivers_and_followers(self) -> dict:
"""
Классификация переменных на причинные драйверы и последователей.
Драйвер: высокий out_degree, низкий in_degree (причина)
Последователь: низкий out_degree, высокий in_degree (следствие)
Посредник: высокий и out, и in (передаёт влияние)
Изолированный: низкий и out, и in
"""
centrality = self.compute_degree_centrality()
median_out = centrality["out_degree"].median()
median_in = centrality["in_degree"].median()
roles = {}
for _, row in centrality.iterrows():
name = row["variable"]
high_out = row["out_degree"] > median_out
high_in = row["in_degree"] > median_in
if high_out and not high_in:
roles[name] = "driver"
elif not high_out and high_in:
roles[name] = "follower"
elif high_out and high_in:
roles[name] = "mediator"
else:
roles[name] = "isolated"
return roles
def compute_causal_lag_profile(self) -> pd.DataFrame:
"""
Профиль лагов для каждой пары переменных.
Определяет оптимальный лаг причинной связи.
"""
records = []
for i in range(self.N):
for j in range(self.N):
if i == j:
continue
# Найти лаг с минимальным p-значением
best_tau = None
best_p = 1.0
best_mci = 0.0
for tau in range(1, self.tau_max + 1):
p = self.p_matrix[i, j, tau]
if p < best_p:
best_p = p
best_tau = tau
best_mci = self.val_matrix[i, j, tau]
if best_p < self.alpha:
records.append({
"source": self.var_names[i],
"target": self.var_names[j],
"optimal_lag": best_tau,
"mci_at_optimal": round(best_mci, 4),
"p_value": round(best_p, 6),
})
df = pd.DataFrame(records)
if len(df) > 0:
df = df.sort_values("p_value").reset_index(drop=True)
return df
def compute_causal_clustering(self) -> dict:
"""
Кластеризация переменных на основе причинной структуры.
Переменные в одном кластере тесно причинно связаны.
"""
# Построение матрицы причинных расстояний
distance_matrix = np.ones((self.N, self.N))
for link in self.links:
i = link["source"]
j = link["target"]
strength = abs(link["mci"])
distance_matrix[i, j] = min(distance_matrix[i, j], 1.0 - strength)
distance_matrix[j, i] = min(distance_matrix[j, i], 1.0 - strength)
np.fill_diagonal(distance_matrix, 0)
# Простая иерархическая кластеризация
clusters = {i: [self.var_names[i]] for i in range(self.N)}
cluster_map = list(range(self.N))
threshold = 0.7 # расстояние для слияния
for i in range(self.N):
for j in range(i + 1, self.N):
if distance_matrix[i, j] < threshold:
# Слить кластеры
ci = cluster_map[i]
cj = cluster_map[j]
if ci != cj:
clusters[ci].extend(clusters[cj])
for k in range(self.N):
if cluster_map[k] == cj:
cluster_map[k] = ci
del clusters[cj]
# Формирование результата
result = {}
for idx, (cid, members) in enumerate(clusters.items()):
result[f"cluster_{idx}"] = list(set(members))
return result
def print_summary(self) -> None:
"""Вывод полного отчёта о причинной сети."""
print("=" * 60)
print("ОТЧЁТ О ПРИЧИННОЙ СЕТИ")
print("=" * 60)
# Общая статистика
print(f"\nОбщая статистика:")
print(f" Число переменных: {self.N}")
print(f" Число значимых связей: {len(self.links)}")
print(f" Максимальный лаг: {self.tau_max}")
print(f" Уровень значимости: {self.alpha}")
# Центральность
print(f"\nСтепень центральности:")
centrality = self.compute_degree_centrality()
print(centrality.to_string(index=False))
# Роли
print(f"\nРоли переменных:")
roles = self.identify_drivers_and_followers()
for name, role in sorted(roles.items()):
role_ru = {
"driver": "ДРАЙВЕР",
"follower": "ПОСЛЕДОВАТЕЛЬ",
"mediator": "ПОСРЕДНИК",
"isolated": "ИЗОЛИРОВАННЫЙ",
}[role]
print(f" {name}: {role_ru}")
# Лаг-профиль
print(f"\nОптимальные лаги причинных связей:")
lag_profile = self.compute_causal_lag_profile()
if len(lag_profile) > 0:
print(lag_profile.to_string(index=False))
else:
print(" Нет значимых связей")
# Кластеры
print(f"\nПричинные кластеры:")
clusters = self.compute_causal_clustering()
for name, members in clusters.items():
print(f" {name}: {members}")
# ============================================================
# Часть 2: Анализ стабильности причинного графа
# ============================================================
def bootstrap_pcmci_stability(
data: np.ndarray,
var_names: list[str],
n_bootstrap: int = 100,
tau_max: int = 5,
alpha: float = 0.05,
sample_fraction: float = 0.8,
) -> pd.DataFrame:
"""
Оценка стабильности причинных связей через бутстрэп.
Запускаем PCMCI на подвыборках данных и считаем,
как часто каждая связь обнаруживается.
Parameters
----------
data : np.ndarray
Данные (T x N)
var_names : list[str]
Имена переменных
n_bootstrap : int
Число бутстрэп-повторений
tau_max : int
Максимальный лаг
alpha : float
Уровень значимости
sample_fraction : float
Доля данных для подвыборки
Returns
-------
pd.DataFrame
Частота обнаружения каждой связи
"""
from tigramite import data_processing as pp
from tigramite.pcmci import PCMCI
from tigramite.independence_tests.parcorr import ParCorr
T, N = data.shape
sample_size = int(T * sample_fraction)
# Счётчик обнаружений
detection_count = np.zeros((N, N, tau_max + 1))
mci_sum = np.zeros((N, N, tau_max + 1))
for b in range(n_bootstrap):
if (b + 1) % 20 == 0:
print(f" Бутстрэп итерация {b+1}/{n_bootstrap}")
# Случайная подвыборка (блочный бутстрэп для временных рядов)
start_idx = np.random.randint(0, T - sample_size)
sample = data[start_idx:start_idx + sample_size]
df = pp.DataFrame(data=sample, var_names=var_names)
ci_test = ParCorr(significance="analytic")
pcmci = PCMCI(dataframe=df, cond_ind_test=ci_test, verbosity=0)
results = pcmci.run_pcmci(
tau_max=tau_max,
pc_alpha=alpha,
alpha_level=alpha,
)
significant = results["p_matrix"] < alpha
detection_count += significant.astype(int)
mci_sum += np.where(significant, results["val_matrix"], 0)
# Частота обнаружения
frequency = detection_count / n_bootstrap
avg_mci = np.where(detection_count > 0, mci_sum / detection_count, 0)
# Формирование таблицы
records = []
for i in range(N):
for j in range(N):
for tau in range(1, tau_max + 1):
if frequency[i, j, tau] > 0.1: # хотя бы 10% обнаружений
records.append({
"source": var_names[i],
"target": var_names[j],
"lag": tau,
"frequency": round(frequency[i, j, tau], 2),
"avg_mci": round(avg_mci[i, j, tau], 4),
"stable": frequency[i, j, tau] > 0.5,
})
df_stability = pd.DataFrame(records)
if len(df_stability) > 0:
df_stability = df_stability.sort_values("frequency", ascending=False)
df_stability = df_stability.reset_index(drop=True)
print(f"\nРезультаты бутстрэп-стабильности ({n_bootstrap} итераций):")
print(f" Стабильные связи (>50%): {df_stability['stable'].sum()}")
print(f" Нестабильные связи: {(~df_stability['stable']).sum()}")
return df_stability
# ============================================================
# Часть 3: Основной скрипт
# ============================================================
if __name__ == "__main__":
# Демонстрация на синтетических данных
np.random.seed(42)
T = 1000
N = 5
# Известная причинная структура
data = np.zeros((T, N))
for t in range(2, T):
data[t, 0] = 0.6 * data[t-1, 0] + np.random.normal(0, 0.5)
data[t, 1] = 0.4 * data[t-1, 0] + 0.3 * data[t-1, 1] + np.random.normal(0, 0.5)
data[t, 2] = 0.3 * data[t-2, 0] + 0.5 * data[t-1, 2] + np.random.normal(0, 0.5)
data[t, 3] = 0.35 * data[t-1, 1] + 0.4 * data[t-1, 3] + np.random.normal(0, 0.5)
data[t, 4] = 0.2 * data[t-1, 4] + np.random.normal(0, 0.5)
var_names = ["BTC", "ETH", "SOL", "AVAX", "BNB"]
# Создание синтетических результатов для демонстрации
from tigramite import data_processing as pp
from tigramite.pcmci import PCMCI
from tigramite.independence_tests.parcorr import ParCorr
dataframe = pp.DataFrame(data=data, var_names=var_names)
ci_test = ParCorr(significance="analytic")
pcmci = PCMCI(dataframe=dataframe, cond_ind_test=ci_test, verbosity=0)
results = pcmci.run_pcmci(tau_max=5, pc_alpha=0.05, alpha_level=0.05)
# Анализ сети
analyzer = CausalNetworkAnalyzer(
val_matrix=results["val_matrix"],
p_matrix=results["p_matrix"],
var_names=var_names,
alpha=0.05,
)
analyzer.print_summary()
# Бутстрэп-стабильность
print("\n" + "=" * 60)
print("БУТСТРЭП-АНАЛИЗ СТАБИЛЬНОСТИ")
print("=" * 60)
stability = bootstrap_pcmci_stability(
data=data,
var_names=var_names,
n_bootstrap=50,
tau_max=5,
alpha=0.05,
)
if len(stability) > 0:
print("\nТаблица стабильности:")
print(stability.to_string(index=False))
print("\nАнализ причинной сети завершён!")

Ключевые концепции анализа причинной сети:

Метрики причинного графа:
1. Out-degree (исходящая степень)
Сколько других переменных причинно зависят от данной.
Высокий out-degree = ДРАЙВЕР рынка.
Пример: BTC имеет высокий out-degree на крипторынке.
2. In-degree (входящая степень)
Сколько других переменных причинно влияют на данную.
Высокий in-degree = ПОСЛЕДОВАТЕЛЬ.
Пример: Альткоины обычно имеют высокий in-degree.
3. Net influence (чистое влияние)
= out_strength - in_strength
Положительный = чистый причинный источник
Отрицательный = чистый причинный приёмник
4. Бутстрэп-стабильность
Как часто связь обнаруживается на подвыборках.
> 80% = очень стабильная связь (можно торговать)
50-80% = умеренно стабильная
< 50% = нестабильная (осторожно!)

Пример 04: Торговая стратегия на основе причинности

"""
Пример 04: Торговая стратегия на основе причинных связей PCMCI
Генерация сигналов и управление позициями
"""
import numpy as np
import pandas as pd
from dataclasses import dataclass, field
from typing import Optional
# ============================================================
# Часть 1: Определение структур данных
# ============================================================
@dataclass
class CausalLink:
"""Причинная связь между двумя переменными."""
source: str
target: str
lag: int
mci_value: float
p_value: float
stability: float = 1.0 # бутстрэп-стабильность
@dataclass
class TradeSignal:
"""Торговый сигнал на основе причинной связи."""
timestamp: pd.Timestamp
asset: str
direction: int # +1 long, -1 short
strength: float # сила сигнала [0, 1]
source_asset: str # какой актив сгенерировал сигнал
lag: int # с каким лагом
reason: str # описание причины
@dataclass
class Position:
"""Текущая позиция."""
asset: str
direction: int
size: float
entry_price: float
entry_time: pd.Timestamp
stop_loss: float
take_profit: float
signal: TradeSignal
# ============================================================
# Часть 2: Генератор сигналов на основе PCMCI
# ============================================================
class CausalSignalGenerator:
"""
Генератор торговых сигналов на основе причинных связей.
Использует обнаруженные PCMCI связи для прогнозирования
движений целевых активов на основе движений причинных
драйверов.
"""
def __init__(
self,
causal_links: list[CausalLink],
min_mci: float = 0.10,
min_stability: float = 0.5,
lookback_window: int = 20,
z_score_threshold: float = 1.5,
):
"""
Parameters
----------
causal_links : list[CausalLink]
Список обнаруженных причинных связей
min_mci : float
Минимальная сила MCI для использования связи
min_stability : float
Минимальная бутстрэп-стабильность
lookback_window : int
Окно для вычисления z-score
z_score_threshold : float
Порог z-score для генерации сигнала
"""
self.min_mci = min_mci
self.min_stability = min_stability
self.lookback_window = lookback_window
self.z_score_threshold = z_score_threshold
# Фильтрация надёжных связей
self.active_links = [
link for link in causal_links
if abs(link.mci_value) >= min_mci
and link.stability >= min_stability
]
print(f"Активные причинные связи: {len(self.active_links)} "
f"из {len(causal_links)}")
# Группировка связей по целевому активу
self.links_by_target = {}
for link in self.active_links:
if link.target not in self.links_by_target:
self.links_by_target[link.target] = []
self.links_by_target[link.target].append(link)
def compute_z_score(
self,
returns: pd.Series,
current_idx: int,
) -> float:
"""Вычисление z-score текущей доходности."""
window = returns.iloc[
max(0, current_idx - self.lookback_window):current_idx
]
if len(window) < 5:
return 0.0
mean = window.mean()
std = window.std()
if std == 0:
return 0.0
return (returns.iloc[current_idx] - mean) / std
def generate_signals(
self,
returns: pd.DataFrame,
current_idx: int,
) -> list[TradeSignal]:
"""
Генерация сигналов для текущего момента.
Логика: если причинный драйвер показал сильное движение
(z-score превышает порог), ожидаем реакцию целевого актива
через соответствующий лаг.
Parameters
----------
returns : pd.DataFrame
Доходности всех активов
current_idx : int
Текущий индекс (строка в returns)
Returns
-------
list[TradeSignal]
Список сигналов
"""
signals = []
timestamp = returns.index[current_idx]
for target, links in self.links_by_target.items():
# Агрегируем сигналы от всех причинных драйверов
combined_signal = 0.0
signal_count = 0
reasons = []
for link in links:
# Проверяем движение драйвера link.lag периодов назад
driver_idx = current_idx - link.lag
if driver_idx < self.lookback_window:
continue
z_score = self.compute_z_score(
returns[link.source], driver_idx
)
if abs(z_score) > self.z_score_threshold:
# Знак MCI определяет направление:
# MCI > 0 и z > 0 => target пойдёт вверх
# MCI > 0 и z < 0 => target пойдёт вниз
# MCI < 0 и z > 0 => target пойдёт вниз
signal_direction = np.sign(z_score * link.mci_value)
signal_strength = (
abs(link.mci_value)
* link.stability
* min(abs(z_score) / self.z_score_threshold, 3.0) / 3.0
)
combined_signal += signal_direction * signal_strength
signal_count += 1
reasons.append(
f"{link.source}(lag={link.lag}, z={z_score:.2f}, "
f"MCI={link.mci_value:.3f})"
)
# Генерация сигнала если есть достаточно подтверждений
if signal_count > 0:
avg_signal = combined_signal / signal_count
if abs(avg_signal) > 0.1: # минимальный порог
signals.append(TradeSignal(
timestamp=timestamp,
asset=target,
direction=int(np.sign(avg_signal)),
strength=min(abs(avg_signal), 1.0),
source_asset=", ".join(
[l.source for l in links[:3]]
),
lag=links[0].lag,
reason="; ".join(reasons),
))
return signals
# ============================================================
# Часть 3: Управление позициями
# ============================================================
class CausalPositionManager:
"""
Менеджер позиций для причинной стратегии.
Управляет размером позиций, стоп-лоссами и тейк-профитами
на основе силы причинных сигналов.
"""
def __init__(
self,
initial_capital: float = 100000.0,
max_position_pct: float = 0.10, # максимум 10% на позицию
max_total_exposure: float = 0.50, # максимум 50% экспозиция
default_stop_loss_pct: float = 0.02, # 2% стоп-лосс
default_take_profit_pct: float = 0.04, # 4% тейк-профит
):
self.initial_capital = initial_capital
self.capital = initial_capital
self.max_position_pct = max_position_pct
self.max_total_exposure = max_total_exposure
self.default_stop_loss_pct = default_stop_loss_pct
self.default_take_profit_pct = default_take_profit_pct
self.positions: dict[str, Position] = {}
self.closed_trades: list[dict] = []
self.equity_curve: list[float] = [initial_capital]
def compute_position_size(
self,
signal: TradeSignal,
current_price: float,
) -> float:
"""
Размер позиции пропорционален силе причинного сигнала.
size = capital * max_position_pct * signal_strength
"""
# Текущая экспозиция
current_exposure = sum(
abs(pos.size * pos.entry_price)
for pos in self.positions.values()
) / self.capital
remaining_capacity = max(
0, self.max_total_exposure - current_exposure
)
# Размер позиции
position_value = (
self.capital
* self.max_position_pct
* signal.strength
)
position_value = min(
position_value,
self.capital * remaining_capacity
)
if position_value < 100: # минимальный размер
return 0.0
return position_value / current_price
def open_position(
self,
signal: TradeSignal,
current_price: float,
) -> Optional[Position]:
"""Открытие позиции на основе сигнала."""
# Проверка: нет ли уже позиции по этому активу
if signal.asset in self.positions:
return None
size = self.compute_position_size(signal, current_price)
if size == 0:
return None
# Адаптивные стоп-лосс и тейк-профит
# Сильнее сигнал -> шире стоп (больше уверенности)
sl_multiplier = 1.0 + signal.strength * 0.5
tp_multiplier = 1.0 + signal.strength * 1.0
if signal.direction == 1: # long
stop_loss = current_price * (
1 - self.default_stop_loss_pct * sl_multiplier
)
take_profit = current_price * (
1 + self.default_take_profit_pct * tp_multiplier
)
else: # short
stop_loss = current_price * (
1 + self.default_stop_loss_pct * sl_multiplier
)
take_profit = current_price * (
1 - self.default_take_profit_pct * tp_multiplier
)
position = Position(
asset=signal.asset,
direction=signal.direction,
size=size,
entry_price=current_price,
entry_time=signal.timestamp,
stop_loss=stop_loss,
take_profit=take_profit,
signal=signal,
)
self.positions[signal.asset] = position
return position
def check_exits(
self,
current_prices: dict[str, float],
current_time: pd.Timestamp,
) -> list[dict]:
"""Проверка условий выхода для всех позиций."""
exits = []
for asset, pos in list(self.positions.items()):
if asset not in current_prices:
continue
price = current_prices[asset]
# Проверка стоп-лосса
if pos.direction == 1: # long
if price <= pos.stop_loss:
exits.append(self._close_position(
asset, price, current_time, "stop_loss"
))
elif price >= pos.take_profit:
exits.append(self._close_position(
asset, price, current_time, "take_profit"
))
else: # short
if price >= pos.stop_loss:
exits.append(self._close_position(
asset, price, current_time, "stop_loss"
))
elif price <= pos.take_profit:
exits.append(self._close_position(
asset, price, current_time, "take_profit"
))
return exits
def _close_position(
self,
asset: str,
exit_price: float,
exit_time: pd.Timestamp,
exit_reason: str,
) -> dict:
"""Закрытие позиции."""
pos = self.positions.pop(asset)
pnl = pos.direction * (exit_price - pos.entry_price) * pos.size
pnl_pct = pos.direction * (exit_price / pos.entry_price - 1)
self.capital += pnl
trade = {
"asset": asset,
"direction": "LONG" if pos.direction == 1 else "SHORT",
"entry_time": pos.entry_time,
"exit_time": exit_time,
"entry_price": pos.entry_price,
"exit_price": exit_price,
"size": pos.size,
"pnl": round(pnl, 2),
"pnl_pct": round(pnl_pct * 100, 2),
"exit_reason": exit_reason,
"signal_strength": pos.signal.strength,
"causal_source": pos.signal.source_asset,
}
self.closed_trades.append(trade)
return trade
def update_equity(
self,
current_prices: dict[str, float],
) -> float:
"""Обновление кривой эквити."""
unrealized_pnl = sum(
pos.direction * (current_prices.get(pos.asset, pos.entry_price)
- pos.entry_price) * pos.size
for pos in self.positions.values()
)
equity = self.capital + unrealized_pnl
self.equity_curve.append(equity)
return equity
# ============================================================
# Часть 4: Полная торговая стратегия
# ============================================================
class PCMCICausalStrategy:
"""
Полная торговая стратегия на основе причинного анализа PCMCI.
Алгоритм:
1. Обучение: Запуск PCMCI на исторических данных
2. Фильтрация: Отбор стабильных причинных связей
3. Сигналы: Мониторинг причинных драйверов
4. Исполнение: Открытие/закрытие позиций
5. Адаптация: Переобучение PCMCI на скользящем окне
"""
def __init__(
self,
training_window: int = 252, # 1 год дневных данных
refit_frequency: int = 63, # переобучение каждый квартал
tau_max: int = 5,
alpha_pc: float = 0.05,
alpha_mci: float = 0.05,
min_mci: float = 0.10,
z_score_threshold: float = 1.5,
initial_capital: float = 100000.0,
):
self.training_window = training_window
self.refit_frequency = refit_frequency
self.tau_max = tau_max
self.alpha_pc = alpha_pc
self.alpha_mci = alpha_mci
self.min_mci = min_mci
self.z_score_threshold = z_score_threshold
self.initial_capital = initial_capital
self.signal_generator = None
self.position_manager = None
self.last_refit = 0
def fit_causal_model(
self,
returns: pd.DataFrame,
) -> list[CausalLink]:
"""Обучение PCMCI модели на данных."""
from tigramite import data_processing as pp
from tigramite.pcmci import PCMCI
from tigramite.independence_tests.parcorr import ParCorr
values = returns.values
var_names = list(returns.columns)
dataframe = pp.DataFrame(data=values, var_names=var_names)
ci_test = ParCorr(significance="analytic")
pcmci = PCMCI(dataframe=dataframe, cond_ind_test=ci_test, verbosity=0)
results = pcmci.run_pcmci(
tau_max=self.tau_max,
pc_alpha=self.alpha_pc,
alpha_level=self.alpha_mci,
)
# Извлечение связей
links = []
N = len(var_names)
for i in range(N):
for j in range(N):
for tau in range(1, self.tau_max + 1):
if results["p_matrix"][i, j, tau] < self.alpha_mci:
links.append(CausalLink(
source=var_names[i],
target=var_names[j],
lag=tau,
mci_value=results["val_matrix"][i, j, tau],
p_value=results["p_matrix"][i, j, tau],
))
return links
def run(
self,
prices: pd.DataFrame,
) -> dict:
"""
Запуск стратегии на исторических данных.
Parameters
----------
prices : pd.DataFrame
Цены закрытия (T x N)
Returns
-------
dict
Результаты бэктеста
"""
returns = np.log(prices / prices.shift(1)).dropna()
var_names = list(returns.columns)
self.position_manager = CausalPositionManager(
initial_capital=self.initial_capital,
)
print(f"Запуск причинной стратегии PCMCI")
print(f" Период: {returns.index[0]} -- {returns.index[-1]}")
print(f" Активы: {var_names}")
print(f" Обучающее окно: {self.training_window} дней")
all_signals = []
for idx in range(self.training_window, len(returns)):
current_time = returns.index[idx]
# Переобучение PCMCI если нужно
if idx - self.last_refit >= self.refit_frequency or self.signal_generator is None:
train_data = returns.iloc[idx - self.training_window:idx]
links = self.fit_causal_model(train_data)
self.signal_generator = CausalSignalGenerator(
causal_links=links,
min_mci=self.min_mci,
z_score_threshold=self.z_score_threshold,
)
self.last_refit = idx
if idx % 100 == 0:
print(f" [{current_time.date()}] "
f"Переобучение: {len(links)} связей, "
f"{len(self.signal_generator.active_links)} активных")
# Текущие цены
current_prices = {
col: prices[col].iloc[idx]
for col in prices.columns
}
# Проверка выходов
self.position_manager.check_exits(current_prices, current_time)
# Генерация сигналов
signals = self.signal_generator.generate_signals(returns, idx)
# Открытие позиций
for signal in signals:
if signal.asset in current_prices:
self.position_manager.open_position(
signal, current_prices[signal.asset]
)
all_signals.append(signal)
# Обновление эквити
self.position_manager.update_equity(current_prices)
# Закрытие всех открытых позиций
final_prices = {
col: prices[col].iloc[-1] for col in prices.columns
}
for asset in list(self.position_manager.positions.keys()):
self.position_manager._close_position(
asset,
final_prices[asset],
returns.index[-1],
"end_of_backtest"
)
return {
"equity_curve": self.position_manager.equity_curve,
"trades": self.position_manager.closed_trades,
"signals": all_signals,
"final_capital": self.position_manager.capital,
}
# ============================================================
# Часть 5: Основной скрипт
# ============================================================
if __name__ == "__main__":
# Генерация демонстрационных данных
np.random.seed(42)
T = 1000
dates = pd.bdate_range(start="2020-01-01", periods=T)
# Генерация цен с причинной структурой
returns = np.zeros((T, 5))
for t in range(2, T):
returns[t, 0] = 0.0005 + 0.05 * returns[t-1, 0] + np.random.normal(0, 0.02)
returns[t, 1] = 0.0003 + 0.3 * returns[t-1, 0] + 0.05 * returns[t-1, 1] + np.random.normal(0, 0.025)
returns[t, 2] = 0.0002 + 0.2 * returns[t-2, 0] + 0.04 * returns[t-1, 2] + np.random.normal(0, 0.03)
returns[t, 3] = 0.0001 + 0.25 * returns[t-1, 1] + 0.03 * returns[t-1, 3] + np.random.normal(0, 0.025)
returns[t, 4] = 0.0002 + 0.02 * returns[t-1, 4] + np.random.normal(0, 0.02)
prices_data = 100 * np.exp(np.cumsum(returns, axis=0))
var_names = ["BTC", "ETH", "SOL", "AVAX", "BNB"]
prices = pd.DataFrame(prices_data, index=dates, columns=var_names)
# Запуск стратегии
strategy = PCMCICausalStrategy(
training_window=252,
refit_frequency=63,
tau_max=5,
min_mci=0.08,
z_score_threshold=1.5,
initial_capital=100000.0,
)
results = strategy.run(prices)
# Отчёт
print("\n" + "=" * 60)
print("РЕЗУЛЬТАТЫ ПРИЧИННОЙ СТРАТЕГИИ PCMCI")
print("=" * 60)
trades_df = pd.DataFrame(results["trades"])
if len(trades_df) > 0:
print(f"\nСделки: {len(trades_df)}")
print(f"Прибыльные: {(trades_df['pnl'] > 0).sum()}")
print(f"Убыточные: {(trades_df['pnl'] < 0).sum()}")
print(f"Win rate: {(trades_df['pnl'] > 0).mean():.1%}")
print(f"Средний PnL: {trades_df['pnl'].mean():.2f}")
print(f"Общий PnL: {trades_df['pnl'].sum():.2f}")
print(f"Начальный капитал: {strategy.initial_capital:,.0f}")
print(f"Конечный капитал: {results['final_capital']:,.0f}")
print(f"Доходность: {(results['final_capital']/strategy.initial_capital - 1)*100:.1f}%")
print("\nСтратегия завершена!")

Пример 05: Бэктестинг причинной стратегии

"""
Пример 05: Бэктестинг и оценка причинной стратегии PCMCI
Включает метрики производительности и сравнение с бенчмарками
"""
import numpy as np
import pandas as pd
from dataclasses import dataclass
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
# ============================================================
# Часть 1: Метрики производительности
# ============================================================
@dataclass
class PerformanceMetrics:
"""Метрики производительности торговой стратегии."""
total_return: float
annualized_return: float
annualized_volatility: float
sharpe_ratio: float
sortino_ratio: float
max_drawdown: float
max_drawdown_duration: int # в днях
calmar_ratio: float
win_rate: float
profit_factor: float
avg_trade_pnl: float
avg_win: float
avg_loss: float
total_trades: int
avg_holding_period: float # в днях
def compute_performance_metrics(
equity_curve: list[float],
trades: list[dict],
risk_free_rate: float = 0.04, # 4% годовых
trading_days: int = 252,
) -> PerformanceMetrics:
"""
Вычисление полного набора метрик производительности.
Parameters
----------
equity_curve : list[float]
Кривая эквити
trades : list[dict]
Список сделок
risk_free_rate : float
Безрисковая ставка (годовая)
trading_days : int
Число торговых дней в году
Returns
-------
PerformanceMetrics
Метрики
"""
equity = np.array(equity_curve)
returns = np.diff(equity) / equity[:-1]
# Базовые метрики
total_return = (equity[-1] / equity[0]) - 1
n_years = len(returns) / trading_days
annualized_return = (1 + total_return) ** (1 / max(n_years, 0.01)) - 1
annualized_vol = returns.std() * np.sqrt(trading_days)
# Sharpe Ratio
daily_rf = (1 + risk_free_rate) ** (1/trading_days) - 1
excess_returns = returns - daily_rf
sharpe = (
excess_returns.mean() / excess_returns.std() * np.sqrt(trading_days)
if excess_returns.std() > 0 else 0
)
# Sortino Ratio (только отрицательная волатильность)
downside_returns = returns[returns < 0]
downside_vol = (
downside_returns.std() * np.sqrt(trading_days)
if len(downside_returns) > 0 else 0.001
)
sortino = (annualized_return - risk_free_rate) / downside_vol
# Maximum Drawdown
running_max = np.maximum.accumulate(equity)
drawdowns = (equity - running_max) / running_max
max_dd = drawdowns.min()
# Длительность максимальной просадки
dd_duration = 0
max_dd_duration = 0
for dd in drawdowns:
if dd < 0:
dd_duration += 1
max_dd_duration = max(max_dd_duration, dd_duration)
else:
dd_duration = 0
# Calmar Ratio
calmar = annualized_return / abs(max_dd) if max_dd != 0 else 0
# Торговые метрики
trades_df = pd.DataFrame(trades) if trades else pd.DataFrame()
if len(trades_df) > 0 and "pnl" in trades_df.columns:
wins = trades_df[trades_df["pnl"] > 0]
losses = trades_df[trades_df["pnl"] <= 0]
win_rate = len(wins) / len(trades_df) if len(trades_df) > 0 else 0
avg_trade = trades_df["pnl"].mean()
avg_win = wins["pnl"].mean() if len(wins) > 0 else 0
avg_loss = losses["pnl"].mean() if len(losses) > 0 else 0
gross_profit = wins["pnl"].sum() if len(wins) > 0 else 0
gross_loss = abs(losses["pnl"].sum()) if len(losses) > 0 else 0.001
profit_factor = gross_profit / gross_loss
# Средний период удержания
if "entry_time" in trades_df.columns and "exit_time" in trades_df.columns:
trades_df["holding_days"] = (
pd.to_datetime(trades_df["exit_time"])
- pd.to_datetime(trades_df["entry_time"])
).dt.days
avg_holding = trades_df["holding_days"].mean()
else:
avg_holding = 0
else:
win_rate = 0
avg_trade = 0
avg_win = 0
avg_loss = 0
profit_factor = 0
avg_holding = 0
return PerformanceMetrics(
total_return=round(total_return * 100, 2),
annualized_return=round(annualized_return * 100, 2),
annualized_volatility=round(annualized_vol * 100, 2),
sharpe_ratio=round(sharpe, 2),
sortino_ratio=round(sortino, 2),
max_drawdown=round(max_dd * 100, 2),
max_drawdown_duration=max_dd_duration,
calmar_ratio=round(calmar, 2),
win_rate=round(win_rate * 100, 1),
profit_factor=round(profit_factor, 2),
avg_trade_pnl=round(avg_trade, 2),
avg_win=round(avg_win, 2),
avg_loss=round(avg_loss, 2),
total_trades=len(trades_df),
avg_holding_period=round(avg_holding, 1),
)
def print_performance_report(
metrics: PerformanceMetrics,
strategy_name: str = "PCMCI Causal Strategy",
) -> None:
"""Вывод отчёта о производительности."""
print(f"\n{'=' * 60}")
print(f"ОТЧЁТ О ПРОИЗВОДИТЕЛЬНОСТИ: {strategy_name}")
print(f"{'=' * 60}")
print(f"\n--- Метрики доходности ---")
print(f" Общая доходность: {metrics.total_return:>8.2f}%")
print(f" Годовая доходность: {metrics.annualized_return:>8.2f}%")
print(f" Годовая волатильность: {metrics.annualized_volatility:>8.2f}%")
print(f"\n--- Метрики риска ---")
print(f" Sharpe Ratio: {metrics.sharpe_ratio:>8.2f}")
print(f" Sortino Ratio: {metrics.sortino_ratio:>8.2f}")
print(f" Calmar Ratio: {metrics.calmar_ratio:>8.2f}")
print(f" Максимальная просадка: {metrics.max_drawdown:>8.2f}%")
print(f" Длит. макс. просадки: {metrics.max_drawdown_duration:>5d} дней")
print(f"\n--- Торговые метрики ---")
print(f" Всего сделок: {metrics.total_trades:>8d}")
print(f" Win Rate: {metrics.win_rate:>8.1f}%")
print(f" Profit Factor: {metrics.profit_factor:>8.2f}")
print(f" Средний PnL: {metrics.avg_trade_pnl:>8.2f}")
print(f" Средний выигрыш: {metrics.avg_win:>8.2f}")
print(f" Средний проигрыш: {metrics.avg_loss:>8.2f}")
print(f" Ср. период удержания: {metrics.avg_holding_period:>8.1f} дн.")
# ============================================================
# Часть 2: Сравнение с бенчмарками
# ============================================================
def compare_with_benchmarks(
strategy_equity: list[float],
strategy_trades: list[dict],
prices: pd.DataFrame,
benchmark_col: str = "BTC",
) -> pd.DataFrame:
"""
Сравнение причинной стратегии с бенчмарками.
Бенчмарки:
1. Buy-and-Hold (основной актив)
2. Equal-Weight Buy-and-Hold
3. Momentum стратегия
Parameters
----------
strategy_equity : list[float]
Кривая эквити стратегии
strategy_trades : list[dict]
Сделки стратегии
prices : pd.DataFrame
Цены активов
benchmark_col : str
Актив для buy-and-hold бенчмарка
Returns
-------
pd.DataFrame
Сравнительная таблица
"""
initial_capital = strategy_equity[0]
T = len(prices)
# Бенчмарк 1: Buy-and-Hold одного актива
bh_returns = prices[benchmark_col] / prices[benchmark_col].iloc[0]
bh_equity = (initial_capital * bh_returns).tolist()
# Бенчмарк 2: Equal-Weight Portfolio
ew_returns = prices.div(prices.iloc[0])
ew_portfolio = ew_returns.mean(axis=1)
ew_equity = (initial_capital * ew_portfolio).tolist()
# Бенчмарк 3: Простая Momentum стратегия
momentum_window = 20
momentum_equity = [initial_capital]
capital = initial_capital
log_returns = np.log(prices / prices.shift(1)).dropna()
for idx in range(momentum_window, len(log_returns)):
# Momentum: покупаем актив с лучшей доходностью за окно
past_returns = log_returns.iloc[idx-momentum_window:idx].sum()
best_asset = past_returns.idxmax()
daily_return = log_returns[best_asset].iloc[idx]
capital *= np.exp(daily_return * 0.5) # 50% экспозиция
momentum_equity.append(capital)
# Вычисление метрик для каждой стратегии
results = []
strategies = {
"PCMCI Causal": (strategy_equity, strategy_trades),
f"Buy&Hold {benchmark_col}": (bh_equity, []),
"Equal-Weight": (ew_equity, []),
"Momentum": (momentum_equity, []),
}
for name, (equity, trades) in strategies.items():
if len(equity) > 1:
metrics = compute_performance_metrics(equity, trades)
results.append({
"Strategy": name,
"Return%": metrics.total_return,
"Ann.Return%": metrics.annualized_return,
"Volatility%": metrics.annualized_volatility,
"Sharpe": metrics.sharpe_ratio,
"Sortino": metrics.sortino_ratio,
"MaxDD%": metrics.max_drawdown,
"Calmar": metrics.calmar_ratio,
})
df = pd.DataFrame(results)
print("\n" + "=" * 80)
print("СРАВНЕНИЕ С БЕНЧМАРКАМИ")
print("=" * 80)
print(df.to_string(index=False))
return df
# ============================================================
# Часть 3: Анализ причинных сигналов
# ============================================================
def analyze_signal_quality(
trades: list[dict],
) -> None:
"""
Анализ качества причинных сигналов.
Оценивает, насколько сила причинного сигнала предсказывает успех сделки.
"""
if not trades:
print("Нет сделок для анализа.")
return
df = pd.DataFrame(trades)
if "signal_strength" not in df.columns:
print("Нет данных о силе сигнала.")
return
print("\n" + "=" * 60)
print("АНАЛИЗ КАЧЕСТВА ПРИЧИННЫХ СИГНАЛОВ")
print("=" * 60)
# Разбивка по силе сигнала
df["strength_bucket"] = pd.cut(
df["signal_strength"],
bins=[0, 0.25, 0.50, 0.75, 1.0],
labels=["Слабый(0-25%)", "Средний(25-50%)",
"Сильный(50-75%)", "Очень сильный(75-100%)"],
)
if "strength_bucket" in df.columns:
bucket_stats = df.groupby("strength_bucket", observed=True).agg({
"pnl": ["count", "mean", "sum"],
"pnl_pct": "mean",
}).round(2)
print("\nПроизводительность по силе сигнала:")
print(bucket_stats)
# Разбивка по причинному источнику
if "causal_source" in df.columns:
source_stats = df.groupby("causal_source").agg({
"pnl": ["count", "mean", "sum"],
}).round(2)
print("\nПроизводительность по причинному источнику:")
print(source_stats)
# Разбивка по направлению
if "direction" in df.columns:
dir_stats = df.groupby("direction").agg({
"pnl": ["count", "mean", "sum"],
}).round(2)
print("\nПроизводительность по направлению:")
print(dir_stats)
# Разбивка по причине выхода
if "exit_reason" in df.columns:
exit_stats = df.groupby("exit_reason").agg({
"pnl": ["count", "mean", "sum"],
}).round(2)
print("\nПроизводительность по причине выхода:")
print(exit_stats)
def plot_backtest_results(
equity_curve: list[float],
trades: list[dict],
output_file: str = "backtest_results.png",
) -> None:
"""
Визуализация результатов бэктеста.
"""
fig, axes = plt.subplots(3, 1, figsize=(14, 10),
gridspec_kw={'height_ratios': [3, 1, 1]})
# 1. Кривая эквити
ax1 = axes[0]
ax1.plot(equity_curve, label="PCMCI Strategy", linewidth=1.5)
ax1.axhline(y=equity_curve[0], color='gray', linestyle='--', alpha=0.5)
ax1.set_title("Кривая эквити причинной стратегии PCMCI")
ax1.set_ylabel("Капитал")
ax1.legend()
ax1.grid(True, alpha=0.3)
# 2. Просадки
ax2 = axes[1]
equity = np.array(equity_curve)
running_max = np.maximum.accumulate(equity)
drawdowns = (equity - running_max) / running_max * 100
ax2.fill_between(range(len(drawdowns)), drawdowns, 0,
color='red', alpha=0.3)
ax2.set_title("Просадка (%)")
ax2.set_ylabel("Просадка %")
ax2.grid(True, alpha=0.3)
# 3. PnL по сделкам
ax3 = axes[2]
if trades:
pnls = [t.get("pnl", 0) for t in trades]
colors = ['green' if p > 0 else 'red' for p in pnls]
ax3.bar(range(len(pnls)), pnls, color=colors, alpha=0.7)
ax3.axhline(y=0, color='black', linewidth=0.5)
ax3.set_title("PnL по сделкам")
ax3.set_ylabel("PnL")
ax3.set_xlabel("Номер сделки")
ax3.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig(output_file, dpi=150, bbox_inches='tight')
plt.close()
print(f"\nГрафик сохранён: {output_file}")
# ============================================================
# Часть 4: Основной скрипт бэктеста
# ============================================================
if __name__ == "__main__":
# Импорт стратегии из примера 04
# (в реальном проекте используется импорт модуля)
np.random.seed(42)
T = 1000
dates = pd.bdate_range(start="2020-01-01", periods=T)
# Генерация цен
returns_data = np.zeros((T, 5))
for t in range(2, T):
returns_data[t, 0] = 0.0005 + 0.05 * returns_data[t-1, 0] + np.random.normal(0, 0.02)
returns_data[t, 1] = 0.0003 + 0.3 * returns_data[t-1, 0] + 0.05 * returns_data[t-1, 1] + np.random.normal(0, 0.025)
returns_data[t, 2] = 0.0002 + 0.2 * returns_data[t-2, 0] + 0.04 * returns_data[t-1, 2] + np.random.normal(0, 0.03)
returns_data[t, 3] = 0.0001 + 0.25 * returns_data[t-1, 1] + 0.03 * returns_data[t-1, 3] + np.random.normal(0, 0.025)
returns_data[t, 4] = 0.0002 + 0.02 * returns_data[t-1, 4] + np.random.normal(0, 0.02)
prices_data = 100 * np.exp(np.cumsum(returns_data, axis=0))
var_names = ["BTC", "ETH", "SOL", "AVAX", "BNB"]
prices = pd.DataFrame(prices_data, index=dates, columns=var_names)
# Создаём демо-результаты стратегии
# (в реальном проекте используется PCMCICausalStrategy.run())
initial_capital = 100000.0
equity = [initial_capital]
capital = initial_capital
log_returns = np.log(prices / prices.shift(1)).dropna()
for idx in range(1, len(log_returns)):
daily_ret = log_returns.iloc[idx].mean() * 0.3 # 30% экспозиция
capital *= np.exp(daily_ret)
equity.append(capital)
demo_trades = [
{"pnl": 150, "pnl_pct": 1.5, "direction": "LONG",
"exit_reason": "take_profit", "signal_strength": 0.8,
"causal_source": "BTC", "asset": "ETH",
"entry_time": "2020-06-01", "exit_time": "2020-06-10"},
{"pnl": -80, "pnl_pct": -0.8, "direction": "LONG",
"exit_reason": "stop_loss", "signal_strength": 0.3,
"causal_source": "BTC", "asset": "SOL",
"entry_time": "2020-07-01", "exit_time": "2020-07-05"},
{"pnl": 200, "pnl_pct": 2.0, "direction": "SHORT",
"exit_reason": "take_profit", "signal_strength": 0.9,
"causal_source": "ETH", "asset": "AVAX",
"entry_time": "2020-08-01", "exit_time": "2020-08-15"},
]
# Метрики производительности
metrics = compute_performance_metrics(equity, demo_trades)
print_performance_report(metrics)
# Сравнение с бенчмарками
compare_with_benchmarks(equity, demo_trades, prices, "BTC")
# Анализ сигналов
analyze_signal_quality(demo_trades)
# Визуализация
plot_backtest_results(equity, demo_trades)
print("\nБэктестинг завершён!")

Реализация на Rust

Структура проекта

Реализация PCMCI на Rust обеспечивает высокую производительность для работы с большими объёмами данных. Структура проекта:

97_pcmci_causal_discovery/
rust_pcmci/
Cargo.toml
src/
lib.rs # Основной модуль
data.rs # Структуры данных
partial_corr.rs # Частная корреляция
pc_algorithm.rs # Фаза 1: PC-алгоритм
mci_test.rs # Фаза 2: MCI-тестирование
pcmci.rs # Основной алгоритм PCMCI
network.rs # Анализ причинных сетей
strategy.rs # Торговая стратегия
main.rs # Точка входа

Основные модули

Cargo.toml
// ============================================================
// ============================================================
// [package]
// name = "pcmci-trading"
// version = "0.1.0"
// edition = "2021"
//
// [dependencies]
// ndarray = { version = "0.15", features = ["blas"] }
// ndarray-linalg = { version = "0.16", features = ["openblas-system"] }
// ndarray-stats = "0.5"
// statrs = "0.16"
// serde = { version = "1.0", features = ["derive"] }
// serde_json = "1.0"
// rayon = "1.8"
// anyhow = "1.0"
// ============================================================
// src/data.rs — Структуры данных
// ============================================================
use ndarray::{Array1, Array2, Array3};
use serde::{Deserialize, Serialize};
/// Результат теста условной независимости
#[derive(Debug, Clone, Serialize)]
pub struct CITestResult {
pub test_statistic: f64,
pub p_value: f64,
pub significant: bool,
}
/// Причинная связь
#[derive(Debug, Clone, Serialize)]
pub struct CausalLink {
pub source: usize,
pub target: usize,
pub lag: usize,
pub mci_value: f64,
pub p_value: f64,
pub source_name: String,
pub target_name: String,
}
/// Результаты PCMCI
#[derive(Debug, Clone)]
pub struct PCMCIResults {
pub val_matrix: Array3<f64>, // N x N x (tau_max+1)
pub p_matrix: Array3<f64>, // N x N x (tau_max+1)
pub parents: Vec<Vec<(usize, usize)>>, // для каждой переменной: [(var, lag)]
pub significant_links: Vec<CausalLink>,
}
/// Набор данных временных рядов
#[derive(Debug, Clone)]
pub struct TimeSeriesData {
pub data: Array2<f64>, // T x N
pub var_names: Vec<String>,
pub n_obs: usize, // T
pub n_vars: usize, // N
}
impl TimeSeriesData {
pub fn new(data: Array2<f64>, var_names: Vec<String>) -> Self {
let (n_obs, n_vars) = data.dim();
assert_eq!(n_vars, var_names.len(),
"Number of variable names must match data columns");
Self { data, var_names, n_obs, n_vars }
}
/// Стандартизация данных (mean=0, std=1)
pub fn standardize(&mut self) {
for j in 0..self.n_vars {
let col = self.data.column(j);
let mean = col.mean().unwrap_or(0.0);
let std = col.std(1.0);
if std > 1e-10 {
for i in 0..self.n_obs {
self.data[[i, j]] = (self.data[[i, j]] - mean) / std;
}
}
}
}
}
// ============================================================
// src/partial_corr.rs — Частная корреляция
// ============================================================
use ndarray::{Array1, Array2, s};
use statrs::distribution::{StudentsT, ContinuousCDF};
/// Вычисление частной корреляции через OLS-регрессию
pub fn partial_correlation(
x: &Array1<f64>,
y: &Array1<f64>,
z: &Array2<f64>, // условие (T x d)
) -> CITestResult {
let n = x.len() as f64;
let d = z.ncols() as f64;
if z.ncols() == 0 {
// Без условия — простая корреляция
let corr = pearson_correlation(x, y);
let t_stat = corr * ((n - 2.0) / (1.0 - corr * corr)).sqrt();
let df = n - 2.0;
let p_value = compute_p_value_t(t_stat, df);
return CITestResult {
test_statistic: corr,
p_value,
significant: false, // устанавливается позже
};
}
// OLS-регрессия: X на Z и Y на Z
let residual_x = ols_residuals(x, z);
let residual_y = ols_residuals(y, z);
// Корреляция остатков = частная корреляция
let parcorr = pearson_correlation(&residual_x, &residual_y);
// t-статистика
let df = n - d - 2.0;
if df <= 0.0 {
return CITestResult {
test_statistic: parcorr,
p_value: 1.0,
significant: false,
};
}
let t_stat = parcorr * (df / (1.0 - parcorr * parcorr)).sqrt();
let p_value = compute_p_value_t(t_stat, df);
CITestResult {
test_statistic: parcorr,
p_value,
significant: false,
}
}
/// Корреляция Пирсона
fn pearson_correlation(x: &Array1<f64>, y: &Array1<f64>) -> f64 {
let n = x.len() as f64;
let mean_x = x.mean().unwrap_or(0.0);
let mean_y = y.mean().unwrap_or(0.0);
let mut cov = 0.0;
let mut var_x = 0.0;
let mut var_y = 0.0;
for i in 0..x.len() {
let dx = x[i] - mean_x;
let dy = y[i] - mean_y;
cov += dx * dy;
var_x += dx * dx;
var_y += dy * dy;
}
let denom = (var_x * var_y).sqrt();
if denom < 1e-15 {
return 0.0;
}
cov / denom
}
/// OLS-остатки
fn ols_residuals(y: &Array1<f64>, x: &Array2<f64>) -> Array1<f64> {
// y = X * beta + residual
// beta = (X^T X)^{-1} X^T y
let xt = x.t();
let xtx = xt.dot(x);
let xty = xt.dot(y);
// Решение системы (простой метод для небольших размерностей)
let beta = solve_linear_system(&xtx, &xty);
let predicted = x.dot(&beta);
y - &predicted
}
/// Решение линейной системы Ax = b (LU-разложение)
fn solve_linear_system(a: &Array2<f64>, b: &Array1<f64>) -> Array1<f64> {
let n = a.nrows();
let mut aug = Array2::zeros((n, n + 1));
aug.slice_mut(s![.., ..n]).assign(a);
aug.column_mut(n).assign(b);
// Прямой ход (Гаусс)
for k in 0..n {
let pivot = aug[[k, k]];
if pivot.abs() < 1e-15 {
continue;
}
for i in (k + 1)..n {
let factor = aug[[i, k]] / pivot;
for j in k..(n + 1) {
aug[[i, j]] -= factor * aug[[k, j]];
}
}
}
// Обратная подстановка
let mut x = Array1::zeros(n);
for k in (0..n).rev() {
let mut sum = aug[[k, n]];
for j in (k + 1)..n {
sum -= aug[[k, j]] * x[j];
}
if aug[[k, k]].abs() > 1e-15 {
x[k] = sum / aug[[k, k]];
}
}
x
}
/// Вычисление p-значения из t-статистики
fn compute_p_value_t(t_stat: f64, df: f64) -> f64 {
if df <= 0.0 {
return 1.0;
}
let dist = StudentsT::new(0.0, 1.0, df).unwrap();
2.0 * (1.0 - dist.cdf(t_stat.abs()))
}
// ============================================================
// src/pcmci.rs — Основной алгоритм PCMCI
// ============================================================
use crate::data::*;
use crate::partial_corr::partial_correlation;
use ndarray::{Array1, Array2, Array3, s};
use rayon::prelude::*;
pub struct PCMCI {
data: TimeSeriesData,
tau_max: usize,
alpha_pc: f64,
alpha_mci: f64,
}
impl PCMCI {
pub fn new(
data: TimeSeriesData,
tau_max: usize,
alpha_pc: f64,
alpha_mci: f64,
) -> Self {
Self { data, tau_max, alpha_pc, alpha_mci }
}
/// Запуск полного алгоритма PCMCI
pub fn run(&self) -> PCMCIResults {
println!("=== PCMCI запущен ===");
println!(" Переменные: {}", self.data.n_vars);
println!(" Наблюдения: {}", self.data.n_obs);
println!(" tau_max: {}", self.tau_max);
// Фаза 1: PC-отбор предков
let parents = self.phase_pc();
println!(" Фаза 1 (PC) завершена");
// Фаза 2: MCI-тестирование
let (val_matrix, p_matrix) = self.phase_mci(&parents);
println!(" Фаза 2 (MCI) завершена");
// Извлечение значимых связей
let significant_links = self.extract_significant_links(
&val_matrix, &p_matrix
);
println!(" Значимых связей: {}", significant_links.len());
PCMCIResults {
val_matrix,
p_matrix,
parents,
significant_links,
}
}
/// Фаза 1: PC-алгоритм для отбора предков
fn phase_pc(&self) -> Vec<Vec<(usize, usize)>> {
let n = self.data.n_vars;
let mut parents: Vec<Vec<(usize, usize)>> = Vec::new();
for j in 0..n {
// Начальное множество: все (i, tau) для tau=1..tau_max
let mut candidates: Vec<(usize, usize)> = Vec::new();
for i in 0..n {
for tau in 1..=self.tau_max {
candidates.push((i, tau));
}
}
let mut p_order = 0;
loop {
if p_order >= candidates.len() {
break;
}
let mut to_remove = Vec::new();
for (idx, &(i, tau)) in candidates.iter().enumerate() {
// Выбор p условий с наибольшей зависимостью
let conditions = self.select_conditions(
j, i, tau, &candidates, p_order
);
// Тест условной независимости
let result = self.conditional_independence_test(
j, i, tau, &conditions
);
if result.p_value > self.alpha_pc {
to_remove.push(idx);
}
}
// Удаление незначимых (в обратном порядке)
for &idx in to_remove.iter().rev() {
candidates.remove(idx);
}
p_order += 1;
}
parents.push(candidates);
}
parents
}
/// Выбор условий для теста
fn select_conditions(
&self,
target: usize,
source: usize,
source_lag: usize,
candidates: &[(usize, usize)],
p_order: usize,
) -> Vec<(usize, usize)> {
// Выбираем p_order условий (без тестируемого)
let mut conditions: Vec<(usize, usize)> = candidates
.iter()
.filter(|&&(i, tau)| !(i == source && tau == source_lag))
.copied()
.collect();
conditions.truncate(p_order);
conditions
}
/// Тест условной независимости
fn conditional_independence_test(
&self,
target: usize,
source: usize,
lag: usize,
conditions: &[(usize, usize)],
) -> CITestResult {
let t_start = self.tau_max;
let t_end = self.data.n_obs;
let n_samples = t_end - t_start;
// Целевая переменная: X_t^target
let y: Array1<f64> = (t_start..t_end)
.map(|t| self.data.data[[t, target]])
.collect();
// Источник: X_{t-lag}^source
let x: Array1<f64> = (t_start..t_end)
.map(|t| self.data.data[[t - lag, source]])
.collect();
// Условие: матрица (n_samples x n_conditions)
let n_cond = conditions.len();
let mut z = Array2::zeros((n_samples, n_cond));
for (c_idx, &(c_var, c_lag)) in conditions.iter().enumerate() {
for (s_idx, t) in (t_start..t_end).enumerate() {
if t >= c_lag {
z[[s_idx, c_idx]] = self.data.data[[t - c_lag, c_var]];
}
}
}
partial_correlation(&x, &y, &z)
}
/// Фаза 2: MCI-тестирование
fn phase_mci(
&self,
parents: &[Vec<(usize, usize)>],
) -> (Array3<f64>, Array3<f64>) {
let n = self.data.n_vars;
let mut val_matrix = Array3::zeros((n, n, self.tau_max + 1));
let mut p_matrix = Array3::ones((n, n, self.tau_max + 1));
for j in 0..n {
for i in 0..n {
for tau in 1..=self.tau_max {
// MCI-условие: P(X_t^j) \ {X_{t-tau}^i} UNION P(X_{t-tau}^i)
let mut mci_conditions: Vec<(usize, usize)> = Vec::new();
// Предки целевой (без тестируемого)
for &(pi, ptau) in &parents[j] {
if !(pi == i && ptau == tau) {
mci_conditions.push((pi, ptau));
}
}
// Предки источника (со сдвигом)
for &(pi, ptau) in &parents[i] {
let shifted_lag = tau + ptau;
if shifted_lag <= self.tau_max {
mci_conditions.push((pi, shifted_lag));
}
}
// Удаление дубликатов
mci_conditions.sort();
mci_conditions.dedup();
let result = self.conditional_independence_test(
j, i, tau, &mci_conditions
);
val_matrix[[i, j, tau]] = result.test_statistic;
p_matrix[[i, j, tau]] = result.p_value;
}
}
}
(val_matrix, p_matrix)
}
/// Извлечение значимых связей
fn extract_significant_links(
&self,
val_matrix: &Array3<f64>,
p_matrix: &Array3<f64>,
) -> Vec<CausalLink> {
let n = self.data.n_vars;
let mut links = Vec::new();
for i in 0..n {
for j in 0..n {
for tau in 1..=self.tau_max {
if p_matrix[[i, j, tau]] < self.alpha_mci {
links.push(CausalLink {
source: i,
target: j,
lag: tau,
mci_value: val_matrix[[i, j, tau]],
p_value: p_matrix[[i, j, tau]],
source_name: self.data.var_names[i].clone(),
target_name: self.data.var_names[j].clone(),
});
}
}
}
}
// Сортировка по абсолютному MCI
links.sort_by(|a, b| {
b.mci_value.abs()
.partial_cmp(&a.mci_value.abs())
.unwrap_or(std::cmp::Ordering::Equal)
});
links
}
}
// ============================================================
// src/strategy.rs — Торговая стратегия на Rust
// ============================================================
use crate::data::CausalLink;
/// Торговый сигнал
#[derive(Debug, Clone)]
pub struct TradeSignal {
pub timestamp: usize,
pub asset: String,
pub direction: i32, // +1 long, -1 short
pub strength: f64,
pub source: String,
}
/// Генератор сигналов на основе причинных связей
pub struct CausalSignalGenerator {
links: Vec<CausalLink>,
min_mci: f64,
z_threshold: f64,
lookback: usize,
}
impl CausalSignalGenerator {
pub fn new(
links: Vec<CausalLink>,
min_mci: f64,
z_threshold: f64,
lookback: usize,
) -> Self {
let filtered: Vec<CausalLink> = links
.into_iter()
.filter(|l| l.mci_value.abs() >= min_mci)
.collect();
println!("Активных связей: {}", filtered.len());
Self {
links: filtered,
min_mci,
z_threshold,
lookback,
}
}
/// Генерация сигналов для текущего момента
pub fn generate_signals(
&self,
returns: &ndarray::Array2<f64>,
var_names: &[String],
current_idx: usize,
) -> Vec<TradeSignal> {
let mut signals = Vec::new();
for link in &self.links {
let driver_idx = if current_idx >= link.lag {
current_idx - link.lag
} else {
continue;
};
if driver_idx < self.lookback {
continue;
}
// z-score драйвера
let window: Vec<f64> = (driver_idx - self.lookback..driver_idx)
.map(|t| returns[[t, link.source]])
.collect();
let mean: f64 = window.iter().sum::<f64>() / window.len() as f64;
let std: f64 = (window.iter()
.map(|&x| (x - mean).powi(2))
.sum::<f64>() / window.len() as f64)
.sqrt();
if std < 1e-10 {
continue;
}
let z_score = (returns[[driver_idx, link.source]] - mean) / std;
if z_score.abs() > self.z_threshold {
let direction = (z_score * link.mci_value).signum() as i32;
let strength = (link.mci_value.abs()
* (z_score.abs() / self.z_threshold).min(3.0) / 3.0)
.min(1.0);
signals.push(TradeSignal {
timestamp: current_idx,
asset: link.target_name.clone(),
direction,
strength,
source: link.source_name.clone(),
});
}
}
signals
}
}
// ============================================================
// src/main.rs — Точка входа
// ============================================================
// mod data;
// mod partial_corr;
// mod pcmci;
// mod network;
// mod strategy;
fn main() {
println!("PCMCI Causal Discovery for Trading (Rust)");
println!("==========================================");
// Генерация синтетических данных
let t = 1000usize;
let n = 5usize;
let mut data = ndarray::Array2::zeros((t, n));
// Seed для воспроизводимости
let mut rng_state: u64 = 42;
let next_normal = |state: &mut u64| -> f64 {
// Простой генератор (Box-Muller)
*state = state.wrapping_mul(6364136223846793005)
.wrapping_add(1442695040888963407);
let u1 = (*state as f64) / (u64::MAX as f64);
*state = state.wrapping_mul(6364136223846793005)
.wrapping_add(1442695040888963407);
let u2 = (*state as f64) / (u64::MAX as f64);
(-2.0 * u1.max(1e-15).ln()).sqrt()
* (2.0 * std::f64::consts::PI * u2).cos()
};
for t_idx in 2..t {
data[[t_idx, 0]] = 0.6 * data[[t_idx-1, 0]]
+ next_normal(&mut rng_state) * 0.5;
data[[t_idx, 1]] = 0.4 * data[[t_idx-1, 0]]
+ 0.3 * data[[t_idx-1, 1]]
+ next_normal(&mut rng_state) * 0.5;
data[[t_idx, 2]] = 0.3 * data[[t_idx-2, 0]]
+ 0.5 * data[[t_idx-1, 2]]
+ next_normal(&mut rng_state) * 0.5;
data[[t_idx, 3]] = 0.35 * data[[t_idx-1, 1]]
+ 0.4 * data[[t_idx-1, 3]]
+ next_normal(&mut rng_state) * 0.5;
data[[t_idx, 4]] = 0.2 * data[[t_idx-1, 4]]
+ next_normal(&mut rng_state) * 0.5;
}
let var_names = vec![
"BTC".to_string(),
"ETH".to_string(),
"SOL".to_string(),
"AVAX".to_string(),
"BNB".to_string(),
];
let ts_data = TimeSeriesData::new(data, var_names);
// Запуск PCMCI
let pcmci = PCMCI::new(ts_data, 5, 0.05, 0.05);
let results = pcmci.run();
// Вывод результатов
println!("\nЗначимые причинные связи:");
for link in &results.significant_links {
println!(" {} -> {} (lag={}, MCI={:.4}, p={:.6})",
link.source_name, link.target_name,
link.lag, link.mci_value, link.p_value);
}
}

Объяснение реализации на Rust:

Реализация на Rust обеспечивает несколько преимуществ:

  • Производительность: Вычисление частных корреляций и решение линейных систем работает без накладных расходов интерпретатора
  • Параллелизм: Библиотека rayon позволяет параллелизовать тесты условной независимости
  • Безопасность типов: Ошибки в размерностях матриц обнаруживаются на этапе компиляции
  • Потребление памяти: Более предсказуемое и контролируемое по сравнению с Python

Реализация на Python

Модульная архитектура

"""
Модульная реализация PCMCI для финансового анализа.
Файл: pcmci_trading.py
"""
import numpy as np
import pandas as pd
from dataclasses import dataclass, field
from typing import Optional
from enum import Enum
class CITestType(Enum):
"""Типы тестов условной независимости."""
PARCORR = "parcorr"
ROBUST_PARCORR = "robust_parcorr"
CMIKNN = "cmiknn"
GPDC = "gpdc"
@dataclass
class PCMCIConfig:
"""Конфигурация PCMCI анализа."""
tau_max: int = 5
alpha_pc: float = 0.05
alpha_mci: float = 0.05
ci_test: CITestType = CITestType.PARCORR
n_bootstrap: int = 0
fdr_correction: bool = True
verbosity: int = 1
@dataclass
class StrategyConfig:
"""Конфигурация торговой стратегии."""
training_window: int = 252
refit_frequency: int = 63
min_mci: float = 0.10
min_stability: float = 0.5
z_score_threshold: float = 1.5
max_position_pct: float = 0.10
max_total_exposure: float = 0.50
stop_loss_pct: float = 0.02
take_profit_pct: float = 0.04
initial_capital: float = 100000.0
class PCMCITradingPipeline:
"""
Полный пайплайн причинного анализа для трейдинга.
Объединяет:
1. Загрузку и подготовку данных
2. PCMCI причинный анализ
3. Построение и анализ причинных сетей
4. Генерацию торговых сигналов
5. Бэктестинг
"""
def __init__(
self,
pcmci_config: PCMCIConfig = PCMCIConfig(),
strategy_config: StrategyConfig = StrategyConfig(),
):
self.pcmci_config = pcmci_config
self.strategy_config = strategy_config
self.causal_graph = None
self.signal_generator = None
def load_data(
self,
source: str = "yahoo",
tickers: Optional[list[str]] = None,
**kwargs,
) -> pd.DataFrame:
"""Загрузка данных из указанного источника."""
if source == "yahoo":
import yfinance as yf
data = yf.download(tickers, **kwargs)
return data["Close"]
elif source == "bybit":
return self._load_bybit(tickers, **kwargs)
elif source == "csv":
return pd.read_csv(kwargs["filepath"], index_col=0, parse_dates=True)
else:
raise ValueError(f"Неизвестный источник: {source}")
def _load_bybit(self, symbols, **kwargs):
"""Загрузка данных с Bybit."""
import requests
all_data = {}
for symbol in symbols:
params = {
"category": "spot",
"symbol": symbol,
"interval": kwargs.get("interval", "D"),
"limit": kwargs.get("limit", 1000),
}
resp = requests.get(
"https://api.bybit.com/v5/market/kline",
params=params, timeout=10
)
data = resp.json()
if data["retCode"] == 0:
records = data["result"]["list"]
df = pd.DataFrame(records)
df.columns = ["ts", "o", "h", "l", "c", "v", "turnover"]
df["ts"] = pd.to_datetime(df["ts"].astype(int), unit="ms")
df = df.set_index("ts").sort_index()
all_data[symbol.replace("USDT", "")] = df["c"].astype(float)
return pd.DataFrame(all_data).dropna()
def preprocess(
self,
prices: pd.DataFrame,
return_type: str = "log",
) -> pd.DataFrame:
"""Предобработка: вычисление доходностей, стандартизация."""
if return_type == "log":
returns = np.log(prices / prices.shift(1)).dropna()
else:
returns = prices.pct_change().dropna()
# Стандартизация
returns = (returns - returns.mean()) / returns.std()
return returns
def fit_causal_model(
self,
returns: pd.DataFrame,
) -> dict:
"""Обучение PCMCI модели."""
from tigramite import data_processing as pp
from tigramite.pcmci import PCMCI
from tigramite.independence_tests.parcorr import ParCorr
from tigramite.independence_tests.robust_parcorr import RobustParCorr
values = returns.values
var_names = list(returns.columns)
dataframe = pp.DataFrame(data=values, var_names=var_names)
# Выбор теста
if self.pcmci_config.ci_test == CITestType.PARCORR:
ci_test = ParCorr(significance="analytic")
elif self.pcmci_config.ci_test == CITestType.ROBUST_PARCORR:
ci_test = RobustParCorr(significance="analytic")
else:
ci_test = ParCorr(significance="analytic")
pcmci = PCMCI(
dataframe=dataframe,
cond_ind_test=ci_test,
verbosity=self.pcmci_config.verbosity,
)
results = pcmci.run_pcmci(
tau_max=self.pcmci_config.tau_max,
pc_alpha=self.pcmci_config.alpha_pc,
alpha_level=self.pcmci_config.alpha_mci,
)
self.causal_graph = {
"pcmci": pcmci,
"results": results,
"var_names": var_names,
}
return results
def run_backtest(
self,
prices: pd.DataFrame,
) -> dict:
"""Полный бэктест стратегии."""
# Здесь вызывается PCMCICausalStrategy из примера 04
# (см. пример 04 для полной реализации)
returns = self.preprocess(prices)
results = self.fit_causal_model(returns)
return {
"causal_results": results,
"var_names": list(prices.columns),
}
# ============================================================
# Удобные функции-обёртки
# ============================================================
def quick_pcmci_analysis(
data: pd.DataFrame,
tau_max: int = 5,
alpha: float = 0.05,
) -> pd.DataFrame:
"""
Быстрый запуск PCMCI анализа.
Parameters
----------
data : pd.DataFrame
Данные (доходности, стационарные)
tau_max : int
Максимальный лаг
alpha : float
Уровень значимости
Returns
-------
pd.DataFrame
Таблица причинных связей
"""
from tigramite import data_processing as pp
from tigramite.pcmci import PCMCI
from tigramite.independence_tests.parcorr import ParCorr
values = data.values
var_names = list(data.columns)
dataframe = pp.DataFrame(data=values, var_names=var_names)
ci_test = ParCorr(significance="analytic")
pcmci = PCMCI(dataframe=dataframe, cond_ind_test=ci_test, verbosity=0)
results = pcmci.run_pcmci(
tau_max=tau_max,
pc_alpha=alpha,
alpha_level=alpha,
)
# Извлечение связей
N = len(var_names)
links = []
for i in range(N):
for j in range(N):
for tau in range(1, tau_max + 1):
if results["p_matrix"][i, j, tau] < alpha:
links.append({
"source": var_names[i],
"target": var_names[j],
"lag": tau,
"mci": round(results["val_matrix"][i, j, tau], 4),
"p_value": round(results["p_matrix"][i, j, tau], 6),
})
return pd.DataFrame(links).sort_values("p_value")

Основные классы

Модульная архитектура позволяет гибко комбинировать компоненты:

Архитектура модулей:
PCMCITradingPipeline
|
+-- DataLoader (загрузка: Yahoo, Bybit, CSV)
|
+-- Preprocessor (доходности, стандартизация, ADF-тест)
|
+-- CausalAnalyzer (PCMCI, PCMCI+, LPCMCI)
| |
| +-- ParCorr / RobustParCorr / CMIknn
|
+-- NetworkAnalyzer (центральность, кластеры, стабильность)
|
+-- SignalGenerator (z-score, причинные сигналы)
|
+-- PositionManager (размер, стопы, тейки)
|
+-- Backtester (метрики, сравнение, отчёты)

Сравнение с другими методами

Ниже представлена сравнительная таблица PCMCI с другими популярными методами причинного анализа временных рядов:

+=====================+==========+==========+===========+============+
| Характеристика | PCMCI | Granger | VAR | Transfer |
| | | Causality| | Entropy |
+=====================+==========+==========+===========+============+
| Тип связей | Прямые | Попарные | Линейные | Попарные |
| | причинн. | | многомерн.| нелинейные |
+---------------------+----------+----------+-----------+------------+
| Контроль | Да | Нет* | Частично | Нет* |
| конфаундеров | (полный) | | (VAR) | |
+---------------------+----------+----------+-----------+------------+
| Нелинейность | Да | Нет | Нет | Да |
| | (CMIknn) | | | |
+---------------------+----------+----------+-----------+------------+
| Масштабируемость | Высокая | Средняя | Низкая | Средняя |
| (N переменных) | (>100) | (<50) | (<20) | (<50) |
+---------------------+----------+----------+-----------+------------+
| Контроль FDR | Да | Нет** | Нет | Нет** |
+---------------------+----------+----------+-----------+------------+
| Одновременные | Да | Нет | Нет | Нет |
| связи | (PCMCI+) | | | |
+---------------------+----------+----------+-----------+------------+
| Латентные | Да | Нет | Нет | Нет |
| конфаундеры | (LPCMCI) | | | |
+---------------------+----------+----------+-----------+------------+
| Направленность | Да | Да | Частично | Да |
+---------------------+----------+----------+-----------+------------+
| Автокорреляция | Корректн.| Проблемы | Включена | Проблемы |
| | через MCI| | в модель | |
+---------------------+----------+----------+-----------+------------+
| Вычислительная | O(N^2*p) | O(N^2) | O(N^2*p) | O(N^2*T) |
| сложность | | | | |
+---------------------+----------+----------+-----------+------------+
| Мин. T для | 100-200 | 50-100 | 100-500 | 500-1000 |
| надёжных результатов| | | | |
+---------------------+----------+----------+-----------+------------+
| Интерпретируемость | Высокая | Высокая | Средняя | Средняя |
| (граф) | (DAG) | (пары) | (матрица) | (пары) |
+---------------------+----------+----------+-----------+------------+
| Реализация |tigramite | statsmod.| statsmod. | pyinform, |
| (Python) | | | | IDTxl |
+---------------------+----------+----------+-----------+------------+
* Можно расширить условным вариантом, но это не стандартная реализация
** Можно добавить коррекцию вручную (Bonferroni, FDR)

Детальное сравнение

PCMCI vs Granger Causality:

Пример: 3 переменные, X->Y->Z
Granger Causality (попарные тесты):
X -> Y? Да (p < 0.01) -- ВЕРНО
X -> Z? Да (p < 0.05) -- ЛОЖНОЕ (опосредованное через Y)
Y -> Z? Да (p < 0.01) -- ВЕРНО
PCMCI:
X -> Y (lag=1)? Да, MCI=0.35 -- ВЕРНО
X -> Z (lag=2)? Нет, p=0.42 -- ВЕРНО (нет прямой связи)
Y -> Z (lag=1)? Да, MCI=0.30 -- ВЕРНО
PCMCI корректно различает прямые и опосредованные связи!

PCMCI vs Transfer Entropy:

Преимущества Transfer Entropy:
+ Обнаруживает нелинейные зависимости
+ Теоретически обоснован (теория информации)
Недостатки Transfer Entropy vs PCMCI:
- Попарный тест (не контролирует конфаундеры)
- Требует много данных для оценки энтропии
- Нет контроля FDR
- Чувствителен к параметрам (число бинов, kNN)
PCMCI с CMIknn тестом:
+ Нелинейные зависимости (как Transfer Entropy)
+ Контроль конфаундеров (через условную независимость)
+ Контроль FDR
+ Лучшая масштабируемость

Когда использовать какой метод?

Дерево выбора метода:
Число переменных N?
|
+-- N <= 5:
| Нужна нелинейность?
| +-- Нет --> Granger Causality (просто и быстро)
| +-- Да --> Transfer Entropy
|
+-- 5 < N <= 20:
| Достаточно данных (T > 500)?
| +-- Нет --> PCMCI (ParCorr)
| +-- Да --> PCMCI (CMIknn) или VAR + Granger
|
+-- N > 20:
| --> PCMCI (единственный масштабируемый вариант)
| ParCorr для быстрого анализа
| RobustParCorr для тяжёлых хвостов
|
Есть ненаблюдаемые конфаундеры?
+-- Да --> LPCMCI
+-- Нет --> PCMCI или PCMCI+

Лучшие практики

1. Подготовка данных

Чеклист подготовки данных для PCMCI:
[x] Стационарность
- Использовать доходности (log-returns), не цены
- Проверить ADF-тестом
- Если нестационарно -> дифференцировать
[x] Пропущенные значения
- PCMCI не работает с NaN
- Удалить или интерполировать
- Предпочтительно: forward-fill для финансовых данных
[x] Выбросы
- Winsorize на уровне 3-5 sigma
- Или использовать RobustParCorr
[x] Стандартизация
- z-score (mean=0, std=1)
- Или ранговое преобразование для RobustParCorr
[x] Достаточный объём
- Минимум T > 5 * N * tau_max для ParCorr
- Минимум T > 20 * N * tau_max для CMIknn
[x] Временное выравнивание
- Все ряды должны быть на одной частоте
- Учёт часовых поясов для международных данных

2. Выбор параметров

Рекомендации по параметрам PCMCI:
tau_max (максимальный лаг):
+-------------------+-------------------+
| Данные | Рекомендация |
+-------------------+-------------------+
| Тиковые | tau_max = 10-50 |
| Минутные | tau_max = 5-20 |
| Часовые | tau_max = 5-24 |
| Дневные | tau_max = 1-10 |
| Недельные | tau_max = 1-4 |
+-------------------+-------------------+
alpha_PC (уровень для фазы PC):
- Стандарт: 0.05
- Консервативный: 0.01 (меньше ложных связей)
- Либеральный: 0.10 (больше кандидатов, менее эффективно)
- Рекомендация для финансов: 0.05
alpha_MCI (уровень для фазы MCI):
- Стандарт: 0.05
- С FDR-коррекцией: 0.05 (автоматически корректируется)
- Без коррекции: 0.01 (для контроля ложных открытий)
Тест условной независимости:
- Финансовые доходности -> RobustParCorr
- Дневные данные, T > 500 -> ParCorr (быстрее)
- Нелинейные зависимости -> CMIknn (T > 1000)

3. Интерпретация результатов

Как интерпретировать результаты PCMCI:
1. MCI-значение:
|MCI| > 0.3 --> Сильная причинная связь
0.15 < |MCI| < 0.3 --> Умеренная связь
0.05 < |MCI| < 0.15 --> Слабая связь
|MCI| < 0.05 --> Пренебрежимо малая
2. Направление:
MCI > 0: положительная причинная связь
(рост X_{t-tau} причинно ведёт к росту Y_t)
MCI < 0: отрицательная причинная связь
(рост X_{t-tau} причинно ведёт к падению Y_t)
3. Лаг:
tau = 1: немедленная передача (следующий период)
tau > 1: задержанная передача
Множественные лаги: сложная динамика передачи
4. Автопричинность (i == j):
MCI(X_{t-1} -> X_t) > 0: положительная автокорреляция
Сильная автопричинность: ряд имеет память
5. Стабильность (бутстрэп):
> 80%: торгуемая связь
50-80%: осторожно, может быть нестационарной
< 50%: не рекомендуется для торговли

4. Управление рисками

Риск-менеджмент в причинной стратегии:
1. Диверсификация сигналов:
- Не полагаться на одну причинную связь
- Агрегировать сигналы от нескольких драйверов
- Взвешивать по стабильности и силе MCI
2. Адаптивное переобучение:
- Причинная структура меняется со временем
- Переобучать каждые 1-3 месяца
- Использовать скользящее окно (rolling window)
3. Защита от переобучения:
- Бутстрэп-валидация стабильности связей
- Out-of-sample тестирование
- Не использовать связи с p > 0.01
4. Размер позиций:
- Пропорционально силе сигнала и стабильности
- Уменьшать при смене рыночного режима
- Максимум 10% капитала на одну позицию
5. Мониторинг:
- Отслеживать изменения причинного графа
- Если ключевая связь исчезла — закрыть позиции
- Сравнивать текущий граф с ожидаемым

5. Распространённые ошибки

Ошибки при использовании PCMCI в трейдинге:
ОШИБКА 1: Применение к ценам вместо доходностей
Ценовые ряды нестационарны -> ложные причинные связи
РЕШЕНИЕ: Всегда работать с логарифмическими доходностями
ОШИБКА 2: Слишком большой tau_max
tau_max=20 для дневных данных -> множество ложных связей
РЕШЕНИЕ: tau_max <= 5 для дневных, <= 10 для часовых
ОШИБКА 3: Слишком мало данных
T=100, N=20, tau_max=5 -> переобучение
Минимум: T > 5 * N * tau_max
РЕШЕНИЕ: Уменьшить N или tau_max, или собрать больше данных
ОШИБКА 4: Игнорирование нестационарности причинных связей
Причинные связи меняются в разных режимах
РЕШЕНИЕ: Режимно-зависимый анализ + периодическое переобучение
ОШИБКА 5: Торговля на нестабильных связях
Связь обнаружена на одной выборке, но нестабильна
РЕШЕНИЕ: Бутстрэп-валидация, торговать только стабильные (>80%)
ОШИБКА 6: Смешение статистической и экономической значимости
p < 0.001, но MCI = 0.05 -> статистически значимо,
но экономически незначимо (слишком слабая связь)
РЕШЕНИЕ: Фильтровать по |MCI| > 0.10 для торговли

Ресурсы

Основные статьи

  1. Runge, J. (2019). “Detecting and Quantifying Causal Associations in Large Nonlinear Time Series Datasets.” Science Advances, 5(11), eaau4996.

    • Ключевая статья, вводящая алгоритм PCMCI
    • DOI: 10.1126/sciadv.aau4996
    • Описывает двухфазный подход и MCI-тест
  2. Runge, J. (2020). “Discovering Contemporaneous and Lagged Causal Relations in Autocorrelated Nonlinear Time Series Datasets.” Proceedings of the 36th Conference on Uncertainty in Artificial Intelligence (UAI).

    • Расширение PCMCI+ для одновременных связей
    • Ориентация рёбер через правила Марковской эквивалентности
  3. Runge, J., Nowack, P., Kretschmer, M., Flaxman, S., & Sejdinovic, D. (2019). “Detecting and Quantifying Causal Associations in Large Nonlinear Time Series Datasets.” Science Advances.

    • Полное математическое обоснование
    • Сравнение с Granger causality и Transfer Entropy
  4. Gerhardus, A. & Runge, J. (2020). “High-recall Causal Discovery for Autocorrelated Time Series with Latent Confounders.” NeurIPS 2020.

    • LPCMCI — работа с латентными конфаундерами
    • Критично для финансовых данных, где не все факторы наблюдаемы

Библиотеки и инструменты

  1. Tigramite — Python-библиотека для причинного анализа временных рядов

  2. CausalNex — библиотека для причинного вывода (QuantumBlack / McKinsey)

  3. DoWhy — Microsoft Research библиотека для причинного вывода

Книги

  1. Pearl, J. (2009). “Causality: Models, Reasoning, and Inference.” Cambridge University Press.

    • Фундаментальная работа по причинному выводу
    • do-исчисление, SCM (Structural Causal Models)
  2. Peters, J., Janzing, D., & Scholkopf, B. (2017). “Elements of Causal Inference: Foundations and Learning Algorithms.” MIT Press.

    • Современное введение в причинное обнаружение
    • Алгоритмы PC, FCI и их расширения
  3. Spirtes, P., Glymour, C., & Scheines, R. (2000). “Causation, Prediction, and Search.” MIT Press.

    • Оригинальная работа по алгоритму PC
    • Фундамент для PCMCI

Финансовые приложения

  1. Kleinberg, S. (2013). “Causality, Probability, and Time.” Cambridge University Press.

    • Причинность во временных рядах
    • Применения к биомедицинским и финансовым данным
  2. Shojaie, A. & Fox, E.B. (2022). “Granger Causality: A Review and Recent Advances.” Annual Review of Statistics and Its Application.

    • Обзор методов причинности для временных рядов
    • Сравнение с современными подходами

Онлайн-ресурсы

  1. Tigramite Tutorials — официальные jupyter-ноутбуки с примерами

  2. Causal Discovery Toolbox (CDT) — библиотека алгоритмов причинного обнаружения

Данные

  1. Bybit API — данные криптовалютного рынка

  2. Yahoo Finance (yfinance) — данные фондового рынка