Глава 97: PCMCI — Обнаружение причинных связей для трейдинга
Обзор
Алгоритм PCMCI (Peter and Clark Momentary Conditional Independence) представляет собой современный двухфазный метод обнаружения причинно-следственных связей в многомерных временных рядах. Разработанный Якобом Рунге (Jakob Runge) в 2019 году, PCMCI специально адаптирован для работы с автокоррелированными данными, что делает его исключительно подходящим для анализа финансовых рынков. В отличие от традиционных корреляционных подходов, которые лишь фиксируют статистические ассоциации, PCMCI выявляет направленные причинные связи между переменными, что позволяет строить более робастные торговые стратегии.
В финансовом контексте PCMCI позволяет:
- Построение причинных графов: Определение того, какие активы действительно влияют на другие, а не просто коррелируют
- Выявление lead-lag структур: Обнаружение опережающих и запаздывающих связей между инструментами
- Режимно-зависимый анализ: Как причинные связи изменяются в различных рыночных режимах
- Оптимизация портфеля: Использование причинной структуры для построения более устойчивых портфелей
- Генерация альфа-сигналов: Создание торговых сигналов на основе причинных, а не корреляционных зависимостей
Ключевое преимущество PCMCI перед классическими методами (Granger causality, VAR) заключается в контроле ложных открытий (false discovery rate) и корректной работе с высокоразмерными данными при наличии сильной автокорреляции — типичной характеристики финансовых временных рядов.
Содержание
- Введение в причинное обнаружение
- Обзор алгоритма PCMCI
- Математические основы
- Финансовые приложения
- Практические примеры на Python
- Реализация на Rust
- Реализация на Python
- Сравнение с другими методами
- Лучшие практики
- Ресурсы
Введение в причинное обнаружение
Почему корреляция не равна причинности
Одна из самых фундаментальных проблем количественного трейдинга — это смешение корреляции и причинности. Когда два актива движутся вместе, это может быть вызвано:
- Прямой причинной связью: Актив A действительно влияет на актив B (например, цена нефти влияет на акции нефтяных компаний)
- Общим конфаундером: Оба актива подвержены влиянию третьей переменной (например, макроэкономических показателей)
- Случайной корреляцией: Статистическая ассоциация без реального механизма
- Обратной причинностью: Кажущаяся связь идёт в противоположном направлении
Пример ложной корреляции в финансах:
Корреляция: BTC <---> ETH = 0.85
Но реальная причинная структура может быть:
Макро-риск аппетит / \ v v BTC ETH
Или: BTC ---> ETH (BTC причинно влияет на ETH)
Или: BTC <--- ETH (ETH влияет на BTC)
Корреляция не может различить эти сценарии!В трейдинге это различие критически важно: стратегия, основанная на ложной корреляции, разрушится при изменении скрытого конфаундера. Стратегия, основанная на истинной причинной связи, более устойчива к режимным сдвигам.
Проблемы классических методов
Причинность по Грейнджеру (Granger Causality)
Тест Грейнджера — самый популярный метод проверки причинности во временных рядах. Его идея: переменная X «причинно влияет по Грейнджеру» на Y, если прошлые значения X помогают предсказать Y сверх того, что дают прошлые значения Y.
Granger Causality Test: H0: X не Granger-причина Y H1: X является Granger-причиной Y
Модель: Y_t = a0 + a1*Y_{t-1} + ... + ap*Y_{t-p} + b1*X_{t-1} + ... + bp*X_{t-p} + e_t
F-тест: проверка b1 = b2 = ... = bp = 0Проблемы Granger causality:
- Попарный тест: Не учитывает третьи переменные (конфаундеры)
- Линейность: Предполагает линейную зависимость
- Чувствительность к лагу: Результат сильно зависит от выбора порядка лага p
- Проблема множественных сравнений: При тестировании многих пар без коррекции
- Автокорреляция: Может давать ложные результаты при сильной автокорреляции
Модели VAR (Vector Autoregression)
VAR(p) модель для N переменных: X_t = c + A1*X_{t-1} + A2*X_{t-2} + ... + Ap*X_{t-p} + e_t
где X_t — вектор N переменных Ai — матрицы коэффициентов NxN
Число параметров: N + p * N^2
Для N=50 активов, p=5 лагов: 50 + 5 * 2500 = 12550 параметров!Проблемы VAR:
- Проклятие размерности: Количество параметров растёт квадратично
- Переобучение: Особенно на коротких финансовых данных
- Нет причинной интерпретации: Коэффициенты VAR не имеют прямой причинной интерпретации
- Мультиколлинеарность: Коррелированные предикторы искажают оценки
Что такое PCMCI
PCMCI — это двухфазный алгоритм обнаружения причинных связей, разработанный специально для многомерных временных рядов. Название расшифровывается как:
- PC (Peter and Clark) — первая фаза, использующая модифицированный алгоритм PC для отбора потенциальных причинных предков
- MCI (Momentary Conditional Independence) — вторая фаза, тестирующая моментальную условную независимость
Высокоуровневая схема PCMCI:
Входные данные Фаза 1 (PC) Фаза 2 (MCI) +-----------+ +-------------+ +--------------+ | Многомер. | Итеративный | Отобранные | Тест | Причинные | | временной | --> удаление --> | родители | --> MCI --> | связи с | | ряд | условий | P(X_t^j) | | p-значениями | +-----------+ +-------------+ +--------------+
N переменных Уменьшает число Контролирует T наблюдений ложных кандидатов false positives tau_max лаговКлючевые преимущества PCMCI:
- Двухфазный подход: Сначала отсеивает слабые связи, затем точно тестирует оставшиеся
- Контроль конфаундеров: Условная независимость учитывает все другие переменные
- Масштабируемость: Работает с десятками и сотнями переменных
- Контроль ложных открытий: Встроенная коррекция множественных сравнений
- Гибкость тестов: Поддержка линейных (ParCorr) и нелинейных (CMI, GPDC) тестов
Исторический контекст и мотивация
Причинное обнаружение (causal discovery) имеет долгую историю в статистике и эпидемиологии. В хронологическом порядке:
Хронология развития причинного вывода:
1956 Reichenbach — Принцип общей причины | 1980 Granger — Granger Causality (Нобелевская премия 2003) | 1988 Pearl, Verma — Bayesian Networks | 1991 Spirtes, Glymour, Scheines — Алгоритм PC | 2000 Pearl — "Causality" (do-исчисление) | 2000 Schreiber — Transfer Entropy | 2012 Runge et al. — Причинный анализ климатических данных | 2019 Runge — PCMCI (Science Advances) <--- ключевая работа | 2020 Runge — PCMCI+ (одновременные связи) | 2020 Runge — LPCMCI (латентные конфаундеры) | 2023+ Применение в финансах и трейдингеРабота Рунге была мотивирована проблемами климатологии — выявление причинных связей между климатическими переменными (температура, давление, осадки и т.д.) в присутствии сильных автокорреляций и нелинейных зависимостей. Эти же проблемы свойственны финансовым данным, что делает PCMCI естественным инструментом для трейдинга.
Ключевая публикация: “Detecting and Quantifying Causal Associations in Large Nonlinear Time Series Datasets”, Jakob Runge, Science Advances, 2019.
Обзор алгоритма PCMCI
Фаза 1: Отбор условий (алгоритм PC)
Первая фаза PCMCI адаптирует классический алгоритм PC (named after its creators Peter Spirtes and Clark Glymour) для работы с временными рядами. Цель этой фазы — для каждой переменной X_t^j определить множество её потенциальных причинных предков (parents).
Алгоритм начинает с полного набора возможных предков и итеративно удаляет те, которые оказываются условно независимыми от целевой переменной при условии уже отобранных предков.
Алгоритм PC (Фаза 1) для переменной X_t^j:
Вход: Данные X, максимальный лаг tau_max, уровень значимости alpha_PC
Инициализация: P_0(X_t^j) = { X_{t-tau}^i : для всех i, tau = 1,...,tau_max } (все возможные предки)
Итерация p = 0, 1, 2, ...: Для каждого X_{t-tau}^i в P_p(X_t^j): Выбрать p условий S из P_p(X_t^j) \ {X_{t-tau}^i} с наибольшей зависимостью от X_t^j
Тест: X_t^j _||_ X_{t-tau}^i | S (условная независимость)
Если p-value > alpha_PC: Удалить X_{t-tau}^i из P_{p+1}(X_t^j)
Стоп: если p >= |P_p(X_t^j)| - 1 (все подмножества проверены)
Выход: P(X_t^j) — множество отобранных предковВизуальный пример работы фазы PC для трёх переменных (X, Y, Z) с максимальным лагом 2:
Начальное состояние (все возможные предки X_t):
X_{t-1} --?--> X_t X_{t-2} --?--> X_t Y_{t-1} --?--> X_t Y_{t-2} --?--> X_t Z_{t-1} --?--> X_t Z_{t-2} --?--> X_t
Итерация p=0 (безусловные тесты): Тест: X_t _||_ Z_{t-2}? p-value = 0.45 > 0.05 --> УДАЛИТЬ Тест: X_t _||_ Y_{t-2}? p-value = 0.32 > 0.05 --> УДАЛИТЬ (остальные значимы, оставить)
Итерация p=1 (условные тесты с 1 условием): Тест: X_t _||_ X_{t-2} | X_{t-1}? p-value = 0.08 > 0.05 --> УДАЛИТЬ (остальные значимы)
Результат P(X_t) = { X_{t-1}, Y_{t-1}, Z_{t-1} }Ключевое свойство фазы PC: она эффективно уменьшает число кандидатов без полного перебора всех подмножеств, что обеспечивает масштабируемость.
Фаза 2: Тест моментальной условной независимости (MCI)
После того как для каждой переменной определены потенциальные предки, вторая фаза выполняет тест моментальной условной независимости (Momentary Conditional Independence, MCI) для количественной оценки силы каждой связи.
MCI-тест для проверки связи X_{t-tau}^i -> X_t^j:
MCI-статистика:
X_t^j _||_ X_{t-tau}^i | P(X_t^j) \ {X_{t-tau}^i}, P(X_{t-tau}^i)
Условие включает: 1. P(X_t^j) \ {X_{t-tau}^i} — предки X_t^j (без тестируемого) 2. P(X_{t-tau}^i) — предки самого X_{t-tau}^i
Это "моментальная" условная независимость: - Контролирует автокорреляцию через P(X_{t-tau}^i) - Контролирует конфаундеры через P(X_t^j)Критическое отличие MCI от обычного условного теста:
Проблема автокорреляции:
Обычный тест: X_t^j _||_ X_{t-1}^i | P(X_t^j)
Проблема: Если X^i сильно автокоррелирован: X_{t-2}^i --> X_{t-1}^i --> X_t^i (автокорреляция) X_{t-2}^i --> X_t^j (истинная причина)
Тогда X_{t-1}^i будет ложно значимым, потому что коррелирован с X_{t-2}^i через автокорреляцию.
MCI-тест: X_t^j _||_ X_{t-1}^i | P(X_t^j), P(X_{t-1}^i)
Решение: Условливаясь на P(X_{t-1}^i), мы "убираем" автокорреляцию из X_{t-1}^i и тестируем только моментальную (одномоментную) связь.Это ключевая инновация PCMCI — именно дополнительное условие на предков источника (P(X_{t-tau}^i)) позволяет корректно работать с автокоррелированными финансовыми данными.
Двухфазная архитектура
Полная архитектура PCMCI объединяет обе фазы:
+============================================================+| АЛГОРИТМ PCMCI |+============================================================+| || ВХОД: Данные X[T x N], tau_max, alpha_PC, alpha_MCI || || +----- ФАЗА 1: PC-стабильный отбор предков -----+ || | | || | Для j = 1, ..., N: | || | P(X_t^j) = все возможные предки | || | Повторять: | || | Для каждого кандидата в P(X_t^j): | || | Тест условной независимости | || | Если незначим -> удалить | || | Увеличить порядок условия | || | Пока не проверены все подмножества | || | | || | Результат: P(X_t^j) для всех j | || +-------------------------------------------------+ || | || v || +----- ФАЗА 2: MCI-тестирование ------------------+ || | | || | Для каждой пары (i,j) и лага tau: | || | S = P(X_t^j) \ {X_{t-tau}^i} | || | UNION P(X_{t-tau}^i) | || | | || | MCI-val = I(X_t^j ; X_{t-tau}^i | S) | || | p-value = тест значимости | || | | || | Коррекция множественных сравнений | || | (Bonferroni или FDR) | || | | || | Результат: причинный граф G с p-значениями | || +---------------------------------------------------+ || || ВЫХОД: G — направленный граф причинных связей || с весами (сила связи) и p-значениями |+============================================================+Расширения: PCMCI+ и LPCMCI
PCMCI+ (Runge, 2020) расширяет стандартный PCMCI для обнаружения одновременных (contemporaneous) причинных связей:
Стандартный PCMCI: X_{t-tau}^i --> X_t^j (только лагированные, tau >= 1)
PCMCI+: X_{t-tau}^i --> X_t^j (лагированные, tau >= 1) X_t^i --- X_t^j (одновременные, tau = 0) X_t^i --> X_t^j (одновременные направленные)Это важно для финансовых данных на дневных частотах, где многие связи проявляются «одновременно» (в рамках одного торгового дня).
LPCMCI (Latent PCMCI) работает в условиях наличия ненаблюдаемых конфаундеров:
Стандартный PCMCI: предполагает каузальную достаточность (все конфаундеры наблюдаемы)
LPCMCI: допускает латентные (скрытые) конфаундеры
X_t^i --> X_t^j (прямая причинная связь) X_t^i <-> X_t^j (связь через латентный конфаундер) X_t^i o--> X_t^j (неопределённый тип связи)Для финансовых данных LPCMCI особенно актуален, так как мы никогда не наблюдаем все релевантные переменные (настроения участников, инсайдерская информация, геополитические факторы и т.д.).
Математические основы
Условная независимость
Центральное понятие в причинном обнаружении — условная независимость. Две переменные X и Y условно независимы при заданном множестве Z, если:
Определение условной независимости:
X _||_ Y | Z <=> P(X, Y | Z) = P(X | Z) * P(Y | Z)
Эквивалентно: X _||_ Y | Z <=> P(X | Y, Z) = P(X | Z)
То есть: знание Y не даёт дополнительной информации о X, если Z уже известно.В контексте PCMCI условная независимость означает, что связь между X_{t-tau}^i и X_t^j является опосредованной (через другие переменные), а не прямой.
Пример с финансовыми данными:
Пример: Причинная цепочка
Ставка ФРС (t-2) --> Курс доллара (t-1) --> Цена золота (t)
Безусловно: Ставка ФРС _||_ Цена золота? Нет! Они зависимы (корреляция существует).
Условно: Ставка ФРС _||_ Цена золота | Курс доллара? Да! При фиксированном курсе доллара, ставка ФРС не даёт дополнительной информации о цене золота.
Вывод: Ставка ФРС -> Доллар -> Золото (цепочка) Нет прямой связи: Ставка ФРС -/-> ЗолотоЧастная корреляция
Для линейных зависимостей условная независимость проверяется через частную корреляцию (partial correlation). Частная корреляция между X и Y при условии Z — это корреляция остатков после регрессии X и Y на Z.
Частная корреляция ParCorr(X, Y | Z):
Шаг 1: Регрессия X на Z: X = beta_XZ * Z + residual_X Шаг 2: Регрессия Y на Z: Y = beta_YZ * Z + residual_Y Шаг 3: ParCorr = Corr(residual_X, residual_Y)
Свойства: - ParCorr = 0 <=> X _||_ Y | Z (для гауссовских данных) - |ParCorr| in [0, 1] - Знак указывает направление связи
Тест значимости: H0: ParCorr = 0 (условная независимость) H1: ParCorr != 0
Статистика: t = ParCorr * sqrt((T - dim_Z - 2) / (1 - ParCorr^2)) Распределение: t(T - dim_Z - 2) (t-распределение Стьюдента)Пример вычисления:
Пример: ParCorr для трёх акций
Данные (доходности): AAPL = [0.01, -0.02, 0.03, 0.01, -0.01, 0.02, ...] MSFT = [0.02, -0.01, 0.02, 0.02, -0.02, 0.01, ...] SPY = [0.01, -0.01, 0.02, 0.01, -0.01, 0.01, ...]
Простая корреляция: Corr(AAPL, MSFT) = 0.75
Частная корреляция (контроль SPY): ParCorr(AAPL, MSFT | SPY) = 0.30
Интерпретация: Большая часть совместного движения AAPL и MSFT объясняется общим рыночным фактором (SPY). Прямая связь между AAPL и MSFT значительно слабее.Условная взаимная информация (CMI)
Для нелинейных зависимостей вместо частной корреляции используется условная взаимная информация (Conditional Mutual Information, CMI):
Условная взаимная информация:
I(X; Y | Z) = integral p(x,y,z) * log(p(x,y|z) / (p(x|z)*p(y|z))) dx dy dz
Свойства: - I(X; Y | Z) >= 0 - I(X; Y | Z) = 0 <=> X _||_ Y | Z - Обнаруживает нелинейные зависимости - Не требует предположений о распределении
Проблемы: - Оценка плотностей в высокоразмерных пространствах - Нужен большой объём данных - Вычислительная сложностьНа практике CMI оценивается с помощью k-nearest neighbor (kNN) оценки:
kNN-оценка CMI (Kraskov et al., 2004):
I_knn(X; Y | Z) = psi(k) - <psi(n_xz + 1) + psi(n_yz + 1) - psi(n_z + 1)>
где: psi() — функция дигамма k — число ближайших соседей n_xz, n_yz, n_z — число точек в гиперпрямоугольниках <.> — среднее по всем точкам данныхТестирование значимости
PCMCI поддерживает несколько методов тестирования значимости:
Методы тестирования в PCMCI:
1. ParCorr (Partial Correlation) - Для линейных зависимостей - Быстрый, аналитическое распределение - t-тест с (T - d - 2) степенями свободы - Рекомендуется для финансовых данных при T > 200
2. GPDC (Gaussian Process Distance Correlation) - Для нелинейных зависимостей - Основан на distance correlation с GP-регрессией - Пермутационный тест для p-значения - Хорошо для умеренной нелинейности
3. CMIknn (CMI kNN-estimator) - Для произвольных нелинейных зависимостей - Наиболее общий метод - Требует большого T (>500) - Вычислительно затратный
4. RobustParCorr - Устойчивая версия ParCorr - Использует ранговые преобразования - Для данных с тяжёлыми хвостами (финансовые доходности)Выбор метода зависит от характера данных:
Дерево выбора метода тестирования:
Данные линейны? | +-- Да --> Нормальное распределение остатков? | | | +-- Да --> ParCorr | | | +-- Нет --> RobustParCorr | +-- Нет --> Достаточно данных (T > 500)? | +-- Да --> CMIknn | +-- Нет --> GPDCКонтроль множественных сравнений
При тестировании множества связей одновременно необходима коррекция на множественные сравнения. PCMCI поддерживает:
Методы коррекции множественных сравнений:
1. Bonferroni: alpha_corrected = alpha / m где m — число тестов Консервативный, контролирует FWER
2. Benjamini-Hochberg (FDR): Упорядочить p-значения: p(1) <= p(2) <= ... <= p(m) Отвергнуть H0 для i: p(i) <= (i/m) * alpha Менее консервативный, контролирует FDR
Для финансовых данных: - Bonferroni: когда ложные сигналы очень дороги - FDR: когда хотим обнаружить больше связей (исследование)
Пример: N=10 активов, tau_max=5 лагов Число тестов: m = N * N * tau_max = 10 * 10 * 5 = 500
alpha = 0.05 Bonferroni: alpha_corrected = 0.05 / 500 = 0.0001 FDR: зависит от распределения p-значенийФинансовые приложения
Причинные сети между активами
PCMCI позволяет построить направленный граф причинных связей между финансовыми инструментами. Это принципиально отличается от корреляционной матрицы:
Корреляционная матрица (симметричная, ненаправленная):
BTC ETH SOL AVAX BNB BTC 1.00 0.85 0.78 0.72 0.68 ETH 0.85 1.00 0.82 0.76 0.71 SOL 0.78 0.82 1.00 0.80 0.65 AVAX 0.72 0.76 0.80 1.00 0.63 BNB 0.68 0.71 0.65 0.63 1.00
Причинный граф PCMCI (асимметричный, направленный):
BTC =====> ETH (lag=1, MCI=0.35, p<0.001) BTC =====> SOL (lag=1, MCI=0.28, p<0.001) ETH ----> SOL (lag=1, MCI=0.15, p=0.003) ETH ----> AVAX (lag=1, MCI=0.12, p=0.008) BTC ----> BNB (lag=2, MCI=0.10, p=0.015) SOL ----> AVAX (lag=1, MCI=0.09, p=0.022)
Интерпретация: - BTC — главный причинный драйвер (наибольшее число исходящих связей) - ETH — посредник между BTC и альткоинами - SOL и AVAX — в основном следуют за BTC и ETH - Прямая связь BTC->BNB слабее и с бОльшим лагомВизуализация причинного графа в виде ASCII:
Причинный граф криптовалютного рынка
lag=1, 0.35 BTC ==================> ETH | | | lag=1, 0.28 | lag=1, 0.15 | | v lag=1, 0.09 v SOL ................... AVAX | | lag=2, 0.10 v BNB
Толщина стрелки ~ сила MCI-связи ==== сильная связь (MCI > 0.25) ---- средняя связь (0.10 < MCI < 0.25) .... слабая связь (MCI < 0.10)Lead-lag структуры
Одно из важнейших приложений PCMCI в трейдинге — обнаружение lead-lag структур. Если актив A причинно предшествует активу B с определённым лагом, это создаёт торговую возможность:
Lead-lag обнаружение с PCMCI:
Результат PCMCI анализа (дневные данные):
S&P 500 (t-1) --> DAX (t) lag=1 day, MCI=0.22 DAX (t-1) --> Nikkei 225 (t) lag=1 day, MCI=0.18 VIX (t-1) --> S&P 500 (t) lag=1 day, MCI=-0.31 US 10Y (t-2) --> Gold (t) lag=2 days, MCI=-0.15 DXY (t-1) --> EM equities (t) lag=1 day, MCI=-0.20
Торговая логика:
Если VIX значительно вырос сегодня: --> Ожидать снижение S&P 500 завтра (lag=1, MCI=-0.31) --> Ожидать снижение DAX послезавтра (через S&P 500)
Если US 10Y выросла сегодня: --> Ожидать снижение Gold через 2 дня (lag=2, MCI=-0.15)Важно отметить, что lead-lag структуры могут быть нестационарными:
Динамика lead-lag во времени:
Период BTC -> ETH lag Сила MCI 2020 Q1 1 час 0.42 2020 Q2 2 часа 0.35 2020 Q3 30 мин 0.48 2021 Q1 (бычий) 15 мин 0.55 2022 Q1 (медвеж) 4 часа 0.25 2023 Q1 1 час 0.38
Наблюдение: - В бычьих режимах лаг короче (быстрая передача) - В медвежьих режимах лаг длиннее - Сила связи варьируется с волатильностьюРежимно-зависимая причинность
Причинная структура рынка не постоянна — она существенно изменяется в зависимости от рыночного режима. PCMCI можно применять к различным подпериодам для выявления режимно-зависимых паттернов:
Причинная структура в разных рыночных режимах:
БЫЧИЙ РЕЖИМ (низкая волатильность): +-----------------------------------------------+ | Много слабых причинных связей | | Секторная ротация видна: | | Tech -> Financials -> Energy | | Международные связи слабые | | Доминирует momentum | +-----------------------------------------------+
МЕДВЕЖИЙ РЕЖИМ / КРИЗИС (высокая волатильность): +-----------------------------------------------+ | Мало, но очень сильных связей | | Всё коррелировано = "correlation breakdown" | | Один доминирующий фактор (risk-on/risk-off) | | Международная передача шоков очень быстрая | | VIX -> всё | +-----------------------------------------------+
ПЕРЕХОДНЫЙ РЕЖИМ: +-----------------------------------------------+ | Причинная структура перестраивается | | Появляются новые связи, старые исчезают | | Высокая неопределённость в графе | | Возможности для арбитража максимальны | +-----------------------------------------------+Стратегия на основе режимно-зависимой причинности:
Алгоритм адаптивной причинной стратегии:
1. Определить текущий рыночный режим (HMM, VIX-based и т.д.)
2. Для каждого режима r: Запустить PCMCI на данных из режима r Получить причинный граф G_r
3. В реальном времени: a. Определить текущий режим r* b. Использовать граф G_{r*} для генерации сигналов c. Вес сигнала ~ сила MCI-связи в текущем режиме
4. При смене режима: a. Переключиться на соответствующий граф b. Уменьшить размер позиций (неопределённость при переходе) c. Постепенно наращивать позиции по мере подтверждения режимаПричинное построение портфеля
PCMCI открывает новый подход к построению портфеля — на основе причинной структуры вместо (или в дополнение к) корреляционной:
Традиционный подход Марковица: min w^T * Sigma * w (минимизация дисперсии) s.t. w^T * mu >= target_return w^T * 1 = 1
Проблема: Sigma — корреляционная матрица Не отличает прямые связи от косвенных Нестабильна, меняется в кризис
Причинный подход: 1. Построить причинный граф G с помощью PCMCI
2. Определить причинную матрицу C: C_ij = MCI(X^i -> X^j) — сила причинного влияния i на j
3. Оптимизировать портфель: - Минимизировать причинную концентрацию: min sum_i (sum_j C_ij * w_j)^2 (не давать одному активу слишком влиять на портфель)
- Или: использовать C вместо/вместе с Sigma min w^T * (lambda*Sigma + (1-lambda)*C_sym) * w где C_sym = (C + C^T) / 2
4. Причинная диверсификация: - Выбирать активы из разных "причинных кластеров" - Избегать активов, причинно управляемых одним источникомПример причинной диверсификации:
Причинный граф показывает:
BTC --> ETH, SOL, AVAX (BTC управляет крипто-кластером) S&P500 --> Tech, Finance (рынок управляет секторами) Oil --> Energy, Transport (нефть управляет энергетическим)
Плохая диверсификация (одна причинная группа): Портфель: [ETH, SOL, AVAX] -> Все управляются BTC, причинная диверсификация = 0
Хорошая причинная диверсификация: Портфель: [BTC, S&P500, Gold, Oil] -> Разные причинные источники -> Причинная диверсификация высока
Лучшая стратегия: Портфель: [BTC, S&P500, Gold, Oil] + Веса обратно пропорциональны причинному влиянию: Актив с большим числом исходящих связей -> меньший вес (так как он "заражает" другие при шоках)Практические примеры на Python
Пример 01: Подготовка данных
В этом примере мы загружаем и подготавливаем данные фондового рынка и криптовалютного рынка (Bybit) для последующего причинного анализа.
"""Пример 01: Подготовка данных для PCMCI причинного анализаВключает данные фондового рынка (Yahoo Finance) и криптовалюты (Bybit)"""
import numpy as npimport pandas as pdfrom datetime import datetime, timedeltaimport warningswarnings.filterwarnings('ignore')
# ============================================================# Часть 1: Загрузка данных фондового рынка# ============================================================
def load_stock_data( tickers: list[str], start_date: str = "2020-01-01", end_date: str = "2024-01-01",) -> pd.DataFrame: """ Загрузка данных фондового рынка через yfinance.
Parameters ---------- tickers : list[str] Список тикеров (например, ['AAPL', 'MSFT', 'GOOGL']) start_date : str Начальная дата в формате 'YYYY-MM-DD' end_date : str Конечная дата
Returns ------- pd.DataFrame DataFrame с колонками для каждого тикера (цены закрытия) """ import yfinance as yf
data = yf.download(tickers, start=start_date, end=end_date) prices = data['Close'] if isinstance(data.columns, pd.MultiIndex) else data[['Close']]
if isinstance(prices.columns, pd.MultiIndex): prices.columns = prices.columns.get_level_values(0)
prices = prices.dropna()
print(f"Загружено {len(prices)} дневных наблюдений") print(f"Период: {prices.index[0].date()} -- {prices.index[-1].date()}") print(f"Тикеры: {list(prices.columns)}")
return prices
def load_bybit_data( symbols: list[str], interval: str = "1d", limit: int = 1000,) -> pd.DataFrame: """ Загрузка данных криптовалют с Bybit через REST API.
Parameters ---------- symbols : list[str] Список символов (например, ['BTCUSDT', 'ETHUSDT']) interval : str Интервал свечей ('1', '5', '15', '60', '240', '1d', '1w') limit : int Количество свечей (max 1000)
Returns ------- pd.DataFrame DataFrame с ценами закрытия для каждого символа """ import requests
all_data = {} base_url = "https://api.bybit.com/v5/market/kline"
for symbol in symbols: params = { "category": "spot", "symbol": symbol, "interval": "D" if interval == "1d" else interval, "limit": limit, }
try: response = requests.get(base_url, params=params, timeout=10) data = response.json()
if data["retCode"] == 0: records = data["result"]["list"] df = pd.DataFrame(records, columns=[ "timestamp", "open", "high", "low", "close", "volume", "turnover" ]) df["timestamp"] = pd.to_datetime(df["timestamp"].astype(int), unit="ms") df = df.set_index("timestamp") df["close"] = df["close"].astype(float) all_data[symbol.replace("USDT", "")] = df["close"]
print(f" {symbol}: {len(df)} свечей загружено") else: print(f" {symbol}: ошибка API -- {data['retMsg']}")
except Exception as e: print(f" {symbol}: ошибка загрузки -- {e}")
prices = pd.DataFrame(all_data).sort_index().dropna() print(f"\nИтого: {len(prices)} совмещённых наблюдений")
return prices
# ============================================================# Часть 2: Предобработка данных для PCMCI# ============================================================
def compute_returns( prices: pd.DataFrame, method: str = "log",) -> pd.DataFrame: """ Вычисление доходностей из ценовых данных.
Parameters ---------- prices : pd.DataFrame Цены закрытия method : str 'log' для логарифмических, 'simple' для простых доходностей
Returns ------- pd.DataFrame Доходности """ if method == "log": returns = np.log(prices / prices.shift(1)) elif method == "simple": returns = prices.pct_change() else: raise ValueError(f"Неизвестный метод: {method}")
returns = returns.dropna()
print(f"\nСтатистика доходностей ({method}):") print(returns.describe().round(4))
return returns
def prepare_tigramite_data( returns: pd.DataFrame, standardize: bool = True,) -> tuple: """ Подготовка данных в формате tigramite.
Parameters ---------- returns : pd.DataFrame Доходности (T x N) standardize : bool Стандартизировать ли данные
Returns ------- tuple (dataframe, var_names) для tigramite """ from tigramite import data_processing as pp
values = returns.values.copy() var_names = list(returns.columns)
if standardize: means = values.mean(axis=0) stds = values.std(axis=0) values = (values - means) / stds print("Данные стандартизированы (mean=0, std=1)")
# Проверка на пропущенные значения n_missing = np.isnan(values).sum() if n_missing > 0: print(f"ПРЕДУПРЕЖДЕНИЕ: {n_missing} пропущенных значений!")
dataframe = pp.DataFrame( data=values, var_names=var_names, )
print(f"\nДанные подготовлены для tigramite:") print(f" Переменные (N): {len(var_names)}") print(f" Наблюдения (T): {values.shape[0]}") print(f" Имена: {var_names}")
return dataframe, var_names
def check_stationarity( returns: pd.DataFrame, significance: float = 0.05,) -> pd.DataFrame: """ Проверка стационарности рядов (ADF-тест).
Parameters ---------- returns : pd.DataFrame Доходности significance : float Уровень значимости
Returns ------- pd.DataFrame Результаты ADF-теста """ from statsmodels.tsa.stattools import adfuller
results = [] for col in returns.columns: adf_stat, p_value, used_lag, nobs, critical_values, icbest = adfuller( returns[col].dropna() ) results.append({ "variable": col, "adf_statistic": round(adf_stat, 4), "p_value": round(p_value, 6), "used_lag": used_lag, "stationary": p_value < significance, })
df_results = pd.DataFrame(results)
print("\nТест Дики-Фуллера (ADF) на стационарность:") print(df_results.to_string(index=False))
non_stationary = df_results[~df_results["stationary"]] if len(non_stationary) > 0: print(f"\nПРЕДУПРЕЖДЕНИЕ: {len(non_stationary)} рядов нестационарны!") print("Рекомендуется использовать доходности вместо цен.") else: print("\nВсе ряды стационарны -- можно применять PCMCI.")
return df_results
# ============================================================# Часть 3: Основной скрипт# ============================================================
if __name__ == "__main__": # --- Загрузка данных фондового рынка --- print("=" * 60) print("ЗАГРУЗКА ДАННЫХ ФОНДОВОГО РЫНКА") print("=" * 60)
stock_tickers = ["SPY", "QQQ", "TLT", "GLD", "USO"] stock_prices = load_stock_data(stock_tickers) stock_returns = compute_returns(stock_prices, method="log")
# Проверка стационарности check_stationarity(stock_returns)
# Подготовка для tigramite stock_df, stock_names = prepare_tigramite_data(stock_returns)
# --- Загрузка данных криптовалют (Bybit) --- print("\n" + "=" * 60) print("ЗАГРУЗКА ДАННЫХ КРИПТОВАЛЮТ (BYBIT)") print("=" * 60)
crypto_symbols = ["BTCUSDT", "ETHUSDT", "SOLUSDT", "AVAXUSDT", "BNBUSDT"] crypto_prices = load_bybit_data(crypto_symbols, interval="1d", limit=500) crypto_returns = compute_returns(crypto_prices, method="log")
check_stationarity(crypto_returns) crypto_df, crypto_names = prepare_tigramite_data(crypto_returns)
# --- Комбинированный датасет --- print("\n" + "=" * 60) print("КОМБИНИРОВАННЫЙ ДАТАСЕТ") print("=" * 60)
# Объединяем по датам (inner join) combined = pd.concat( [stock_returns, crypto_returns], axis=1, join="inner" ) combined = combined.dropna()
print(f"Комбинированный набор: {combined.shape}") combined_df, combined_names = prepare_tigramite_data(combined)
print("\nДанные готовы для причинного анализа PCMCI!")Объяснение ключевых шагов:
Подготовка данных для PCMCI включает несколько критически важных этапов:
-
Стационарность: PCMCI требует стационарных временных рядов. Цены активов нестационарны (содержат тренд), поэтому мы используем логарифмические доходности.
-
Стандартизация: Приведение всех рядов к единому масштабу (среднее = 0, стандартное отклонение = 1) важно для корректной работы тестов условной независимости.
-
Формат tigramite: Библиотека tigramite ожидает данные в формате numpy-массива (T x N) и список имён переменных.
-
Пропущенные значения: PCMCI не работает с пропусками, поэтому необходимо либо удалить, либо интерполировать пропущенные значения.
Пример 02: Построение причинных графов
"""Пример 02: Построение причинных графов с помощью PCMCIИспользует библиотеку tigramite для обнаружения причинных связей"""
import numpy as npimport pandas as pdfrom tigramite import data_processing as ppfrom tigramite.pcmci import PCMCIfrom tigramite.independence_tests.parcorr import ParCorrfrom tigramite.independence_tests.robust_parcorr import RobustParCorrfrom tigramite.independence_tests.cmiknn import CMIknnfrom tigramite import plotting as tpimport matplotlibmatplotlib.use('Agg')import matplotlib.pyplot as plt
# ============================================================# Часть 1: Базовый PCMCI анализ# ============================================================
def run_pcmci_analysis( dataframe: pp.DataFrame, var_names: list[str], tau_max: int = 5, alpha_pc: float = 0.05, alpha_mci: float = 0.05, cond_ind_test: str = "parcorr",) -> dict: """ Запуск PCMCI анализа.
Parameters ---------- dataframe : pp.DataFrame Данные в формате tigramite var_names : list[str] Имена переменных tau_max : int Максимальный лаг alpha_pc : float Уровень значимости для фазы PC alpha_mci : float Уровень значимости для фазы MCI cond_ind_test : str Метод тестирования: 'parcorr', 'robust_parcorr', 'cmiknn'
Returns ------- dict Результаты PCMCI анализа """ # Выбор теста условной независимости if cond_ind_test == "parcorr": ci_test = ParCorr(significance="analytic") elif cond_ind_test == "robust_parcorr": ci_test = RobustParCorr(significance="analytic") elif cond_ind_test == "cmiknn": ci_test = CMIknn( knn=0.1, # доля ближайших соседей significance="shuffle_test", sig_samples=200, ) else: raise ValueError(f"Неизвестный тест: {cond_ind_test}")
print(f"Тест условной независимости: {cond_ind_test}") print(f"Максимальный лаг: {tau_max}") print(f"alpha_PC: {alpha_pc}, alpha_MCI: {alpha_mci}")
# Создание экземпляра PCMCI pcmci = PCMCI( dataframe=dataframe, cond_ind_test=ci_test, verbosity=1, )
# Запуск PCMCI results = pcmci.run_pcmci( tau_max=tau_max, pc_alpha=alpha_pc, alpha_level=alpha_mci, )
# Извлечение результатов val_matrix = results["val_matrix"] # матрица MCI-значений p_matrix = results["p_matrix"] # матрица p-значений graph = results["graph"] # граф причинных связей conf_matrix = results.get("conf_matrix") # доверительные интервалы
# Подсчёт значимых связей significant_links = (p_matrix < alpha_mci).sum() total_links = p_matrix.size
print(f"\nРезультаты PCMCI:") print(f" Значимые связи: {significant_links} из {total_links}") print(f" Доля значимых: {significant_links/total_links:.2%}")
return { "pcmci": pcmci, "results": results, "val_matrix": val_matrix, "p_matrix": p_matrix, "graph": graph, "var_names": var_names, "tau_max": tau_max, }
# ============================================================# Часть 2: Визуализация причинного графа# ============================================================
def plot_causal_graph( pcmci_results: dict, output_file: str = "causal_graph.png",) -> None: """ Визуализация причинного графа.
Parameters ---------- pcmci_results : dict Результаты run_pcmci_analysis output_file : str Имя файла для сохранения """ pcmci = pcmci_results["pcmci"] results = pcmci_results["results"] var_names = pcmci_results["var_names"]
# Граф процесса (process graph) tp.plot_graph( val_matrix=results["val_matrix"], graph=results["graph"], var_names=var_names, link_colorbar_label="MCI strength", node_colorbar_label="Auto-MCI", figsize=(12, 8), save_name=output_file, ) print(f"Причинный граф сохранён: {output_file}")
def plot_time_series_graph( pcmci_results: dict, output_file: str = "time_series_graph.png",) -> None: """ Визуализация причинного графа как развёрнутого во времени.
Parameters ---------- pcmci_results : dict Результаты output_file : str Имя файла """ pcmci = pcmci_results["pcmci"] results = pcmci_results["results"] var_names = pcmci_results["var_names"] tau_max = pcmci_results["tau_max"]
tp.plot_time_series_graph( val_matrix=results["val_matrix"], graph=results["graph"], var_names=var_names, link_colorbar_label="MCI strength", figsize=(16, 6), save_name=output_file, ) print(f"Граф временных рядов сохранён: {output_file}")
# ============================================================# Часть 3: Извлечение причинных связей# ============================================================
def extract_causal_links( pcmci_results: dict, alpha: float = 0.05,) -> pd.DataFrame: """ Извлечение списка значимых причинных связей.
Parameters ---------- pcmci_results : dict Результаты PCMCI alpha : float Уровень значимости
Returns ------- pd.DataFrame Таблица причинных связей """ val_matrix = pcmci_results["val_matrix"] p_matrix = pcmci_results["p_matrix"] var_names = pcmci_results["var_names"]
N = len(var_names) tau_max = val_matrix.shape[2] - 1
links = [] for j in range(N): for i in range(N): for tau in range(1, tau_max + 1): if p_matrix[i, j, tau] < alpha: links.append({ "source": var_names[i], "target": var_names[j], "lag": tau, "mci_value": round(val_matrix[i, j, tau], 4), "p_value": round(p_matrix[i, j, tau], 6), "abs_mci": round(abs(val_matrix[i, j, tau]), 4), })
df_links = pd.DataFrame(links)
if len(df_links) > 0: df_links = df_links.sort_values("abs_mci", ascending=False) df_links = df_links.reset_index(drop=True)
print(f"\nЗначимые причинные связи (alpha={alpha}):") print(f" Всего: {len(df_links)}")
if len(df_links) > 0: print("\nТоп-10 сильнейших связей:") print(df_links.head(10).to_string(index=False))
return df_links
# ============================================================# Часть 4: PCMCI+ для одновременных связей# ============================================================
def run_pcmciplus_analysis( dataframe: pp.DataFrame, var_names: list[str], tau_max: int = 5, alpha_pc: float = 0.01,) -> dict: """ Запуск PCMCI+ (с одновременными связями).
Parameters ---------- dataframe : pp.DataFrame Данные var_names : list[str] Имена переменных tau_max : int Максимальный лаг alpha_pc : float Уровень значимости
Returns ------- dict Результаты """ ci_test = ParCorr(significance="analytic")
pcmci = PCMCI( dataframe=dataframe, cond_ind_test=ci_test, verbosity=1, )
results = pcmci.run_pcmciplus( tau_max=tau_max, pc_alpha=alpha_pc, )
print("\nPCMCI+ результаты (с одновременными связями):")
# Одновременные связи (tau=0) p_contemp = results["p_matrix"][:, :, 0] val_contemp = results["val_matrix"][:, :, 0]
N = len(var_names) contemp_links = [] for i in range(N): for j in range(i + 1, N): if p_contemp[i, j] < 0.05: contemp_links.append({ "var1": var_names[i], "var2": var_names[j], "mci_value": round(val_contemp[i, j], 4), "p_value": round(p_contemp[i, j], 6), })
if contemp_links: print(f" Одновременные связи: {len(contemp_links)}") for link in contemp_links: print(f" {link['var1']} <-> {link['var2']}: " f"MCI={link['mci_value']}, p={link['p_value']}")
return { "pcmci": pcmci, "results": results, "val_matrix": results["val_matrix"], "p_matrix": results["p_matrix"], "graph": results["graph"], "var_names": var_names, "tau_max": tau_max, }
# ============================================================# Часть 5: Основной скрипт# ============================================================
if __name__ == "__main__": # Создание демонстрационных данных np.random.seed(42) T = 1000 N = 5
# Генерация синтетических данных с известной причинной структурой # X0 -> X1 (lag=1), X0 -> X2 (lag=2), X1 -> X3 (lag=1) data = np.zeros((T, N)) for t in range(2, T): data[t, 0] = 0.6 * data[t-1, 0] + np.random.normal(0, 0.5) data[t, 1] = 0.4 * data[t-1, 0] + 0.3 * data[t-1, 1] + np.random.normal(0, 0.5) data[t, 2] = 0.3 * data[t-2, 0] + 0.5 * data[t-1, 2] + np.random.normal(0, 0.5) data[t, 3] = 0.35 * data[t-1, 1] + 0.4 * data[t-1, 3] + np.random.normal(0, 0.5) data[t, 4] = 0.2 * data[t-1, 4] + np.random.normal(0, 0.5)
var_names = ["BTC", "ETH", "SOL", "AVAX", "BNB"] dataframe = pp.DataFrame(data=data, var_names=var_names)
# Запуск PCMCI print("=" * 60) print("PCMCI АНАЛИЗ (ДЕМО)") print("=" * 60)
results = run_pcmci_analysis( dataframe=dataframe, var_names=var_names, tau_max=5, alpha_pc=0.05, alpha_mci=0.05, cond_ind_test="parcorr", )
# Извлечение связей links = extract_causal_links(results, alpha=0.05)
# Визуализация plot_causal_graph(results, output_file="demo_causal_graph.png") plot_time_series_graph(results, output_file="demo_ts_graph.png")
# PCMCI+ print("\n" + "=" * 60) print("PCMCI+ АНАЛИЗ (С ОДНОВРЕМЕННЫМИ СВЯЗЯМИ)") print("=" * 60)
results_plus = run_pcmciplus_analysis( dataframe=dataframe, var_names=var_names, tau_max=5, alpha_pc=0.01, )
print("\nАнализ завершён!")Пример 03: Анализ причинных сетей
"""Пример 03: Анализ причинных сетей — метрики и интерпретацияВычисление сетевых характеристик причинного графа"""
import numpy as npimport pandas as pdfrom collections import defaultdict
# ============================================================# Часть 1: Сетевые метрики причинного графа# ============================================================
class CausalNetworkAnalyzer: """ Анализатор причинных сетей для финансовых данных.
Вычисляет сетевые характеристики причинного графа, выявляет ключевые узлы и структурные паттерны. """
def __init__( self, val_matrix: np.ndarray, p_matrix: np.ndarray, var_names: list[str], alpha: float = 0.05, ): """ Parameters ---------- val_matrix : np.ndarray Матрица MCI-значений (N x N x (tau_max+1)) p_matrix : np.ndarray Матрица p-значений var_names : list[str] Имена переменных alpha : float Уровень значимости """ self.val_matrix = val_matrix self.p_matrix = p_matrix self.var_names = var_names self.alpha = alpha self.N = len(var_names) self.tau_max = val_matrix.shape[2] - 1
# Построение списка связей self.links = self._extract_links()
def _extract_links(self) -> list[dict]: """Извлечение значимых связей.""" links = [] for i in range(self.N): for j in range(self.N): for tau in range(1, self.tau_max + 1): if self.p_matrix[i, j, tau] < self.alpha: links.append({ "source": i, "target": j, "source_name": self.var_names[i], "target_name": self.var_names[j], "lag": tau, "mci": self.val_matrix[i, j, tau], "p_value": self.p_matrix[i, j, tau], }) return links
def compute_degree_centrality(self) -> pd.DataFrame: """ Вычисление степени центральности каждого узла.
out_degree: число исходящих причинных связей (причины) in_degree: число входящих причинных связей (следствия) """ out_degree = defaultdict(int) in_degree = defaultdict(int) out_strength = defaultdict(float) in_strength = defaultdict(float)
for link in self.links: src = link["source_name"] tgt = link["target_name"] strength = abs(link["mci"])
out_degree[src] += 1 in_degree[tgt] += 1 out_strength[src] += strength in_strength[tgt] += strength
records = [] for name in self.var_names: records.append({ "variable": name, "out_degree": out_degree.get(name, 0), "in_degree": in_degree.get(name, 0), "total_degree": out_degree.get(name, 0) + in_degree.get(name, 0), "out_strength": round(out_strength.get(name, 0), 4), "in_strength": round(in_strength.get(name, 0), 4), "net_influence": round( out_strength.get(name, 0) - in_strength.get(name, 0), 4 ), })
df = pd.DataFrame(records).sort_values("net_influence", ascending=False) return df.reset_index(drop=True)
def identify_drivers_and_followers(self) -> dict: """ Классификация переменных на причинные драйверы и последователей.
Драйвер: высокий out_degree, низкий in_degree (причина) Последователь: низкий out_degree, высокий in_degree (следствие) Посредник: высокий и out, и in (передаёт влияние) Изолированный: низкий и out, и in """ centrality = self.compute_degree_centrality()
median_out = centrality["out_degree"].median() median_in = centrality["in_degree"].median()
roles = {} for _, row in centrality.iterrows(): name = row["variable"] high_out = row["out_degree"] > median_out high_in = row["in_degree"] > median_in
if high_out and not high_in: roles[name] = "driver" elif not high_out and high_in: roles[name] = "follower" elif high_out and high_in: roles[name] = "mediator" else: roles[name] = "isolated"
return roles
def compute_causal_lag_profile(self) -> pd.DataFrame: """ Профиль лагов для каждой пары переменных. Определяет оптимальный лаг причинной связи. """ records = [] for i in range(self.N): for j in range(self.N): if i == j: continue
# Найти лаг с минимальным p-значением best_tau = None best_p = 1.0 best_mci = 0.0
for tau in range(1, self.tau_max + 1): p = self.p_matrix[i, j, tau] if p < best_p: best_p = p best_tau = tau best_mci = self.val_matrix[i, j, tau]
if best_p < self.alpha: records.append({ "source": self.var_names[i], "target": self.var_names[j], "optimal_lag": best_tau, "mci_at_optimal": round(best_mci, 4), "p_value": round(best_p, 6), })
df = pd.DataFrame(records) if len(df) > 0: df = df.sort_values("p_value").reset_index(drop=True) return df
def compute_causal_clustering(self) -> dict: """ Кластеризация переменных на основе причинной структуры. Переменные в одном кластере тесно причинно связаны. """ # Построение матрицы причинных расстояний distance_matrix = np.ones((self.N, self.N))
for link in self.links: i = link["source"] j = link["target"] strength = abs(link["mci"]) distance_matrix[i, j] = min(distance_matrix[i, j], 1.0 - strength) distance_matrix[j, i] = min(distance_matrix[j, i], 1.0 - strength)
np.fill_diagonal(distance_matrix, 0)
# Простая иерархическая кластеризация clusters = {i: [self.var_names[i]] for i in range(self.N)} cluster_map = list(range(self.N))
threshold = 0.7 # расстояние для слияния
for i in range(self.N): for j in range(i + 1, self.N): if distance_matrix[i, j] < threshold: # Слить кластеры ci = cluster_map[i] cj = cluster_map[j] if ci != cj: clusters[ci].extend(clusters[cj]) for k in range(self.N): if cluster_map[k] == cj: cluster_map[k] = ci del clusters[cj]
# Формирование результата result = {} for idx, (cid, members) in enumerate(clusters.items()): result[f"cluster_{idx}"] = list(set(members))
return result
def print_summary(self) -> None: """Вывод полного отчёта о причинной сети.""" print("=" * 60) print("ОТЧЁТ О ПРИЧИННОЙ СЕТИ") print("=" * 60)
# Общая статистика print(f"\nОбщая статистика:") print(f" Число переменных: {self.N}") print(f" Число значимых связей: {len(self.links)}") print(f" Максимальный лаг: {self.tau_max}") print(f" Уровень значимости: {self.alpha}")
# Центральность print(f"\nСтепень центральности:") centrality = self.compute_degree_centrality() print(centrality.to_string(index=False))
# Роли print(f"\nРоли переменных:") roles = self.identify_drivers_and_followers() for name, role in sorted(roles.items()): role_ru = { "driver": "ДРАЙВЕР", "follower": "ПОСЛЕДОВАТЕЛЬ", "mediator": "ПОСРЕДНИК", "isolated": "ИЗОЛИРОВАННЫЙ", }[role] print(f" {name}: {role_ru}")
# Лаг-профиль print(f"\nОптимальные лаги причинных связей:") lag_profile = self.compute_causal_lag_profile() if len(lag_profile) > 0: print(lag_profile.to_string(index=False)) else: print(" Нет значимых связей")
# Кластеры print(f"\nПричинные кластеры:") clusters = self.compute_causal_clustering() for name, members in clusters.items(): print(f" {name}: {members}")
# ============================================================# Часть 2: Анализ стабильности причинного графа# ============================================================
def bootstrap_pcmci_stability( data: np.ndarray, var_names: list[str], n_bootstrap: int = 100, tau_max: int = 5, alpha: float = 0.05, sample_fraction: float = 0.8,) -> pd.DataFrame: """ Оценка стабильности причинных связей через бутстрэп.
Запускаем PCMCI на подвыборках данных и считаем, как часто каждая связь обнаруживается.
Parameters ---------- data : np.ndarray Данные (T x N) var_names : list[str] Имена переменных n_bootstrap : int Число бутстрэп-повторений tau_max : int Максимальный лаг alpha : float Уровень значимости sample_fraction : float Доля данных для подвыборки
Returns ------- pd.DataFrame Частота обнаружения каждой связи """ from tigramite import data_processing as pp from tigramite.pcmci import PCMCI from tigramite.independence_tests.parcorr import ParCorr
T, N = data.shape sample_size = int(T * sample_fraction)
# Счётчик обнаружений detection_count = np.zeros((N, N, tau_max + 1)) mci_sum = np.zeros((N, N, tau_max + 1))
for b in range(n_bootstrap): if (b + 1) % 20 == 0: print(f" Бутстрэп итерация {b+1}/{n_bootstrap}")
# Случайная подвыборка (блочный бутстрэп для временных рядов) start_idx = np.random.randint(0, T - sample_size) sample = data[start_idx:start_idx + sample_size]
df = pp.DataFrame(data=sample, var_names=var_names) ci_test = ParCorr(significance="analytic") pcmci = PCMCI(dataframe=df, cond_ind_test=ci_test, verbosity=0)
results = pcmci.run_pcmci( tau_max=tau_max, pc_alpha=alpha, alpha_level=alpha, )
significant = results["p_matrix"] < alpha detection_count += significant.astype(int) mci_sum += np.where(significant, results["val_matrix"], 0)
# Частота обнаружения frequency = detection_count / n_bootstrap avg_mci = np.where(detection_count > 0, mci_sum / detection_count, 0)
# Формирование таблицы records = [] for i in range(N): for j in range(N): for tau in range(1, tau_max + 1): if frequency[i, j, tau] > 0.1: # хотя бы 10% обнаружений records.append({ "source": var_names[i], "target": var_names[j], "lag": tau, "frequency": round(frequency[i, j, tau], 2), "avg_mci": round(avg_mci[i, j, tau], 4), "stable": frequency[i, j, tau] > 0.5, })
df_stability = pd.DataFrame(records) if len(df_stability) > 0: df_stability = df_stability.sort_values("frequency", ascending=False) df_stability = df_stability.reset_index(drop=True)
print(f"\nРезультаты бутстрэп-стабильности ({n_bootstrap} итераций):") print(f" Стабильные связи (>50%): {df_stability['stable'].sum()}") print(f" Нестабильные связи: {(~df_stability['stable']).sum()}")
return df_stability
# ============================================================# Часть 3: Основной скрипт# ============================================================
if __name__ == "__main__": # Демонстрация на синтетических данных np.random.seed(42) T = 1000 N = 5
# Известная причинная структура data = np.zeros((T, N)) for t in range(2, T): data[t, 0] = 0.6 * data[t-1, 0] + np.random.normal(0, 0.5) data[t, 1] = 0.4 * data[t-1, 0] + 0.3 * data[t-1, 1] + np.random.normal(0, 0.5) data[t, 2] = 0.3 * data[t-2, 0] + 0.5 * data[t-1, 2] + np.random.normal(0, 0.5) data[t, 3] = 0.35 * data[t-1, 1] + 0.4 * data[t-1, 3] + np.random.normal(0, 0.5) data[t, 4] = 0.2 * data[t-1, 4] + np.random.normal(0, 0.5)
var_names = ["BTC", "ETH", "SOL", "AVAX", "BNB"]
# Создание синтетических результатов для демонстрации from tigramite import data_processing as pp from tigramite.pcmci import PCMCI from tigramite.independence_tests.parcorr import ParCorr
dataframe = pp.DataFrame(data=data, var_names=var_names) ci_test = ParCorr(significance="analytic") pcmci = PCMCI(dataframe=dataframe, cond_ind_test=ci_test, verbosity=0) results = pcmci.run_pcmci(tau_max=5, pc_alpha=0.05, alpha_level=0.05)
# Анализ сети analyzer = CausalNetworkAnalyzer( val_matrix=results["val_matrix"], p_matrix=results["p_matrix"], var_names=var_names, alpha=0.05, ) analyzer.print_summary()
# Бутстрэп-стабильность print("\n" + "=" * 60) print("БУТСТРЭП-АНАЛИЗ СТАБИЛЬНОСТИ") print("=" * 60)
stability = bootstrap_pcmci_stability( data=data, var_names=var_names, n_bootstrap=50, tau_max=5, alpha=0.05, )
if len(stability) > 0: print("\nТаблица стабильности:") print(stability.to_string(index=False))
print("\nАнализ причинной сети завершён!")Ключевые концепции анализа причинной сети:
Метрики причинного графа:
1. Out-degree (исходящая степень) Сколько других переменных причинно зависят от данной. Высокий out-degree = ДРАЙВЕР рынка. Пример: BTC имеет высокий out-degree на крипторынке.
2. In-degree (входящая степень) Сколько других переменных причинно влияют на данную. Высокий in-degree = ПОСЛЕДОВАТЕЛЬ. Пример: Альткоины обычно имеют высокий in-degree.
3. Net influence (чистое влияние) = out_strength - in_strength Положительный = чистый причинный источник Отрицательный = чистый причинный приёмник
4. Бутстрэп-стабильность Как часто связь обнаруживается на подвыборках. > 80% = очень стабильная связь (можно торговать) 50-80% = умеренно стабильная < 50% = нестабильная (осторожно!)Пример 04: Торговая стратегия на основе причинности
"""Пример 04: Торговая стратегия на основе причинных связей PCMCIГенерация сигналов и управление позициями"""
import numpy as npimport pandas as pdfrom dataclasses import dataclass, fieldfrom typing import Optional
# ============================================================# Часть 1: Определение структур данных# ============================================================
@dataclassclass CausalLink: """Причинная связь между двумя переменными.""" source: str target: str lag: int mci_value: float p_value: float stability: float = 1.0 # бутстрэп-стабильность
@dataclassclass TradeSignal: """Торговый сигнал на основе причинной связи.""" timestamp: pd.Timestamp asset: str direction: int # +1 long, -1 short strength: float # сила сигнала [0, 1] source_asset: str # какой актив сгенерировал сигнал lag: int # с каким лагом reason: str # описание причины
@dataclassclass Position: """Текущая позиция.""" asset: str direction: int size: float entry_price: float entry_time: pd.Timestamp stop_loss: float take_profit: float signal: TradeSignal
# ============================================================# Часть 2: Генератор сигналов на основе PCMCI# ============================================================
class CausalSignalGenerator: """ Генератор торговых сигналов на основе причинных связей.
Использует обнаруженные PCMCI связи для прогнозирования движений целевых активов на основе движений причинных драйверов. """
def __init__( self, causal_links: list[CausalLink], min_mci: float = 0.10, min_stability: float = 0.5, lookback_window: int = 20, z_score_threshold: float = 1.5, ): """ Parameters ---------- causal_links : list[CausalLink] Список обнаруженных причинных связей min_mci : float Минимальная сила MCI для использования связи min_stability : float Минимальная бутстрэп-стабильность lookback_window : int Окно для вычисления z-score z_score_threshold : float Порог z-score для генерации сигнала """ self.min_mci = min_mci self.min_stability = min_stability self.lookback_window = lookback_window self.z_score_threshold = z_score_threshold
# Фильтрация надёжных связей self.active_links = [ link for link in causal_links if abs(link.mci_value) >= min_mci and link.stability >= min_stability ]
print(f"Активные причинные связи: {len(self.active_links)} " f"из {len(causal_links)}")
# Группировка связей по целевому активу self.links_by_target = {} for link in self.active_links: if link.target not in self.links_by_target: self.links_by_target[link.target] = [] self.links_by_target[link.target].append(link)
def compute_z_score( self, returns: pd.Series, current_idx: int, ) -> float: """Вычисление z-score текущей доходности.""" window = returns.iloc[ max(0, current_idx - self.lookback_window):current_idx ] if len(window) < 5: return 0.0
mean = window.mean() std = window.std() if std == 0: return 0.0
return (returns.iloc[current_idx] - mean) / std
def generate_signals( self, returns: pd.DataFrame, current_idx: int, ) -> list[TradeSignal]: """ Генерация сигналов для текущего момента.
Логика: если причинный драйвер показал сильное движение (z-score превышает порог), ожидаем реакцию целевого актива через соответствующий лаг.
Parameters ---------- returns : pd.DataFrame Доходности всех активов current_idx : int Текущий индекс (строка в returns)
Returns ------- list[TradeSignal] Список сигналов """ signals = [] timestamp = returns.index[current_idx]
for target, links in self.links_by_target.items(): # Агрегируем сигналы от всех причинных драйверов combined_signal = 0.0 signal_count = 0 reasons = []
for link in links: # Проверяем движение драйвера link.lag периодов назад driver_idx = current_idx - link.lag if driver_idx < self.lookback_window: continue
z_score = self.compute_z_score( returns[link.source], driver_idx )
if abs(z_score) > self.z_score_threshold: # Знак MCI определяет направление: # MCI > 0 и z > 0 => target пойдёт вверх # MCI > 0 и z < 0 => target пойдёт вниз # MCI < 0 и z > 0 => target пойдёт вниз signal_direction = np.sign(z_score * link.mci_value) signal_strength = ( abs(link.mci_value) * link.stability * min(abs(z_score) / self.z_score_threshold, 3.0) / 3.0 )
combined_signal += signal_direction * signal_strength signal_count += 1 reasons.append( f"{link.source}(lag={link.lag}, z={z_score:.2f}, " f"MCI={link.mci_value:.3f})" )
# Генерация сигнала если есть достаточно подтверждений if signal_count > 0: avg_signal = combined_signal / signal_count
if abs(avg_signal) > 0.1: # минимальный порог signals.append(TradeSignal( timestamp=timestamp, asset=target, direction=int(np.sign(avg_signal)), strength=min(abs(avg_signal), 1.0), source_asset=", ".join( [l.source for l in links[:3]] ), lag=links[0].lag, reason="; ".join(reasons), ))
return signals
# ============================================================# Часть 3: Управление позициями# ============================================================
class CausalPositionManager: """ Менеджер позиций для причинной стратегии.
Управляет размером позиций, стоп-лоссами и тейк-профитами на основе силы причинных сигналов. """
def __init__( self, initial_capital: float = 100000.0, max_position_pct: float = 0.10, # максимум 10% на позицию max_total_exposure: float = 0.50, # максимум 50% экспозиция default_stop_loss_pct: float = 0.02, # 2% стоп-лосс default_take_profit_pct: float = 0.04, # 4% тейк-профит ): self.initial_capital = initial_capital self.capital = initial_capital self.max_position_pct = max_position_pct self.max_total_exposure = max_total_exposure self.default_stop_loss_pct = default_stop_loss_pct self.default_take_profit_pct = default_take_profit_pct
self.positions: dict[str, Position] = {} self.closed_trades: list[dict] = [] self.equity_curve: list[float] = [initial_capital]
def compute_position_size( self, signal: TradeSignal, current_price: float, ) -> float: """ Размер позиции пропорционален силе причинного сигнала.
size = capital * max_position_pct * signal_strength """ # Текущая экспозиция current_exposure = sum( abs(pos.size * pos.entry_price) for pos in self.positions.values() ) / self.capital
remaining_capacity = max( 0, self.max_total_exposure - current_exposure )
# Размер позиции position_value = ( self.capital * self.max_position_pct * signal.strength ) position_value = min( position_value, self.capital * remaining_capacity )
if position_value < 100: # минимальный размер return 0.0
return position_value / current_price
def open_position( self, signal: TradeSignal, current_price: float, ) -> Optional[Position]: """Открытие позиции на основе сигнала.""" # Проверка: нет ли уже позиции по этому активу if signal.asset in self.positions: return None
size = self.compute_position_size(signal, current_price) if size == 0: return None
# Адаптивные стоп-лосс и тейк-профит # Сильнее сигнал -> шире стоп (больше уверенности) sl_multiplier = 1.0 + signal.strength * 0.5 tp_multiplier = 1.0 + signal.strength * 1.0
if signal.direction == 1: # long stop_loss = current_price * ( 1 - self.default_stop_loss_pct * sl_multiplier ) take_profit = current_price * ( 1 + self.default_take_profit_pct * tp_multiplier ) else: # short stop_loss = current_price * ( 1 + self.default_stop_loss_pct * sl_multiplier ) take_profit = current_price * ( 1 - self.default_take_profit_pct * tp_multiplier )
position = Position( asset=signal.asset, direction=signal.direction, size=size, entry_price=current_price, entry_time=signal.timestamp, stop_loss=stop_loss, take_profit=take_profit, signal=signal, )
self.positions[signal.asset] = position return position
def check_exits( self, current_prices: dict[str, float], current_time: pd.Timestamp, ) -> list[dict]: """Проверка условий выхода для всех позиций.""" exits = []
for asset, pos in list(self.positions.items()): if asset not in current_prices: continue
price = current_prices[asset]
# Проверка стоп-лосса if pos.direction == 1: # long if price <= pos.stop_loss: exits.append(self._close_position( asset, price, current_time, "stop_loss" )) elif price >= pos.take_profit: exits.append(self._close_position( asset, price, current_time, "take_profit" )) else: # short if price >= pos.stop_loss: exits.append(self._close_position( asset, price, current_time, "stop_loss" )) elif price <= pos.take_profit: exits.append(self._close_position( asset, price, current_time, "take_profit" ))
return exits
def _close_position( self, asset: str, exit_price: float, exit_time: pd.Timestamp, exit_reason: str, ) -> dict: """Закрытие позиции.""" pos = self.positions.pop(asset)
pnl = pos.direction * (exit_price - pos.entry_price) * pos.size pnl_pct = pos.direction * (exit_price / pos.entry_price - 1)
self.capital += pnl
trade = { "asset": asset, "direction": "LONG" if pos.direction == 1 else "SHORT", "entry_time": pos.entry_time, "exit_time": exit_time, "entry_price": pos.entry_price, "exit_price": exit_price, "size": pos.size, "pnl": round(pnl, 2), "pnl_pct": round(pnl_pct * 100, 2), "exit_reason": exit_reason, "signal_strength": pos.signal.strength, "causal_source": pos.signal.source_asset, }
self.closed_trades.append(trade) return trade
def update_equity( self, current_prices: dict[str, float], ) -> float: """Обновление кривой эквити.""" unrealized_pnl = sum( pos.direction * (current_prices.get(pos.asset, pos.entry_price) - pos.entry_price) * pos.size for pos in self.positions.values() ) equity = self.capital + unrealized_pnl self.equity_curve.append(equity) return equity
# ============================================================# Часть 4: Полная торговая стратегия# ============================================================
class PCMCICausalStrategy: """ Полная торговая стратегия на основе причинного анализа PCMCI.
Алгоритм: 1. Обучение: Запуск PCMCI на исторических данных 2. Фильтрация: Отбор стабильных причинных связей 3. Сигналы: Мониторинг причинных драйверов 4. Исполнение: Открытие/закрытие позиций 5. Адаптация: Переобучение PCMCI на скользящем окне """
def __init__( self, training_window: int = 252, # 1 год дневных данных refit_frequency: int = 63, # переобучение каждый квартал tau_max: int = 5, alpha_pc: float = 0.05, alpha_mci: float = 0.05, min_mci: float = 0.10, z_score_threshold: float = 1.5, initial_capital: float = 100000.0, ): self.training_window = training_window self.refit_frequency = refit_frequency self.tau_max = tau_max self.alpha_pc = alpha_pc self.alpha_mci = alpha_mci self.min_mci = min_mci self.z_score_threshold = z_score_threshold self.initial_capital = initial_capital
self.signal_generator = None self.position_manager = None self.last_refit = 0
def fit_causal_model( self, returns: pd.DataFrame, ) -> list[CausalLink]: """Обучение PCMCI модели на данных.""" from tigramite import data_processing as pp from tigramite.pcmci import PCMCI from tigramite.independence_tests.parcorr import ParCorr
values = returns.values var_names = list(returns.columns)
dataframe = pp.DataFrame(data=values, var_names=var_names) ci_test = ParCorr(significance="analytic") pcmci = PCMCI(dataframe=dataframe, cond_ind_test=ci_test, verbosity=0)
results = pcmci.run_pcmci( tau_max=self.tau_max, pc_alpha=self.alpha_pc, alpha_level=self.alpha_mci, )
# Извлечение связей links = [] N = len(var_names) for i in range(N): for j in range(N): for tau in range(1, self.tau_max + 1): if results["p_matrix"][i, j, tau] < self.alpha_mci: links.append(CausalLink( source=var_names[i], target=var_names[j], lag=tau, mci_value=results["val_matrix"][i, j, tau], p_value=results["p_matrix"][i, j, tau], ))
return links
def run( self, prices: pd.DataFrame, ) -> dict: """ Запуск стратегии на исторических данных.
Parameters ---------- prices : pd.DataFrame Цены закрытия (T x N)
Returns ------- dict Результаты бэктеста """ returns = np.log(prices / prices.shift(1)).dropna() var_names = list(returns.columns)
self.position_manager = CausalPositionManager( initial_capital=self.initial_capital, )
print(f"Запуск причинной стратегии PCMCI") print(f" Период: {returns.index[0]} -- {returns.index[-1]}") print(f" Активы: {var_names}") print(f" Обучающее окно: {self.training_window} дней")
all_signals = []
for idx in range(self.training_window, len(returns)): current_time = returns.index[idx]
# Переобучение PCMCI если нужно if idx - self.last_refit >= self.refit_frequency or self.signal_generator is None: train_data = returns.iloc[idx - self.training_window:idx]
links = self.fit_causal_model(train_data)
self.signal_generator = CausalSignalGenerator( causal_links=links, min_mci=self.min_mci, z_score_threshold=self.z_score_threshold, )
self.last_refit = idx
if idx % 100 == 0: print(f" [{current_time.date()}] " f"Переобучение: {len(links)} связей, " f"{len(self.signal_generator.active_links)} активных")
# Текущие цены current_prices = { col: prices[col].iloc[idx] for col in prices.columns }
# Проверка выходов self.position_manager.check_exits(current_prices, current_time)
# Генерация сигналов signals = self.signal_generator.generate_signals(returns, idx)
# Открытие позиций for signal in signals: if signal.asset in current_prices: self.position_manager.open_position( signal, current_prices[signal.asset] ) all_signals.append(signal)
# Обновление эквити self.position_manager.update_equity(current_prices)
# Закрытие всех открытых позиций final_prices = { col: prices[col].iloc[-1] for col in prices.columns } for asset in list(self.position_manager.positions.keys()): self.position_manager._close_position( asset, final_prices[asset], returns.index[-1], "end_of_backtest" )
return { "equity_curve": self.position_manager.equity_curve, "trades": self.position_manager.closed_trades, "signals": all_signals, "final_capital": self.position_manager.capital, }
# ============================================================# Часть 5: Основной скрипт# ============================================================
if __name__ == "__main__": # Генерация демонстрационных данных np.random.seed(42) T = 1000
dates = pd.bdate_range(start="2020-01-01", periods=T)
# Генерация цен с причинной структурой returns = np.zeros((T, 5)) for t in range(2, T): returns[t, 0] = 0.0005 + 0.05 * returns[t-1, 0] + np.random.normal(0, 0.02) returns[t, 1] = 0.0003 + 0.3 * returns[t-1, 0] + 0.05 * returns[t-1, 1] + np.random.normal(0, 0.025) returns[t, 2] = 0.0002 + 0.2 * returns[t-2, 0] + 0.04 * returns[t-1, 2] + np.random.normal(0, 0.03) returns[t, 3] = 0.0001 + 0.25 * returns[t-1, 1] + 0.03 * returns[t-1, 3] + np.random.normal(0, 0.025) returns[t, 4] = 0.0002 + 0.02 * returns[t-1, 4] + np.random.normal(0, 0.02)
prices_data = 100 * np.exp(np.cumsum(returns, axis=0))
var_names = ["BTC", "ETH", "SOL", "AVAX", "BNB"] prices = pd.DataFrame(prices_data, index=dates, columns=var_names)
# Запуск стратегии strategy = PCMCICausalStrategy( training_window=252, refit_frequency=63, tau_max=5, min_mci=0.08, z_score_threshold=1.5, initial_capital=100000.0, )
results = strategy.run(prices)
# Отчёт print("\n" + "=" * 60) print("РЕЗУЛЬТАТЫ ПРИЧИННОЙ СТРАТЕГИИ PCMCI") print("=" * 60)
trades_df = pd.DataFrame(results["trades"]) if len(trades_df) > 0: print(f"\nСделки: {len(trades_df)}") print(f"Прибыльные: {(trades_df['pnl'] > 0).sum()}") print(f"Убыточные: {(trades_df['pnl'] < 0).sum()}") print(f"Win rate: {(trades_df['pnl'] > 0).mean():.1%}") print(f"Средний PnL: {trades_df['pnl'].mean():.2f}") print(f"Общий PnL: {trades_df['pnl'].sum():.2f}") print(f"Начальный капитал: {strategy.initial_capital:,.0f}") print(f"Конечный капитал: {results['final_capital']:,.0f}") print(f"Доходность: {(results['final_capital']/strategy.initial_capital - 1)*100:.1f}%")
print("\nСтратегия завершена!")Пример 05: Бэктестинг причинной стратегии
"""Пример 05: Бэктестинг и оценка причинной стратегии PCMCIВключает метрики производительности и сравнение с бенчмарками"""
import numpy as npimport pandas as pdfrom dataclasses import dataclassimport matplotlibmatplotlib.use('Agg')import matplotlib.pyplot as plt
# ============================================================# Часть 1: Метрики производительности# ============================================================
@dataclassclass PerformanceMetrics: """Метрики производительности торговой стратегии.""" total_return: float annualized_return: float annualized_volatility: float sharpe_ratio: float sortino_ratio: float max_drawdown: float max_drawdown_duration: int # в днях calmar_ratio: float win_rate: float profit_factor: float avg_trade_pnl: float avg_win: float avg_loss: float total_trades: int avg_holding_period: float # в днях
def compute_performance_metrics( equity_curve: list[float], trades: list[dict], risk_free_rate: float = 0.04, # 4% годовых trading_days: int = 252,) -> PerformanceMetrics: """ Вычисление полного набора метрик производительности.
Parameters ---------- equity_curve : list[float] Кривая эквити trades : list[dict] Список сделок risk_free_rate : float Безрисковая ставка (годовая) trading_days : int Число торговых дней в году
Returns ------- PerformanceMetrics Метрики """ equity = np.array(equity_curve) returns = np.diff(equity) / equity[:-1]
# Базовые метрики total_return = (equity[-1] / equity[0]) - 1 n_years = len(returns) / trading_days annualized_return = (1 + total_return) ** (1 / max(n_years, 0.01)) - 1 annualized_vol = returns.std() * np.sqrt(trading_days)
# Sharpe Ratio daily_rf = (1 + risk_free_rate) ** (1/trading_days) - 1 excess_returns = returns - daily_rf sharpe = ( excess_returns.mean() / excess_returns.std() * np.sqrt(trading_days) if excess_returns.std() > 0 else 0 )
# Sortino Ratio (только отрицательная волатильность) downside_returns = returns[returns < 0] downside_vol = ( downside_returns.std() * np.sqrt(trading_days) if len(downside_returns) > 0 else 0.001 ) sortino = (annualized_return - risk_free_rate) / downside_vol
# Maximum Drawdown running_max = np.maximum.accumulate(equity) drawdowns = (equity - running_max) / running_max max_dd = drawdowns.min()
# Длительность максимальной просадки dd_duration = 0 max_dd_duration = 0 for dd in drawdowns: if dd < 0: dd_duration += 1 max_dd_duration = max(max_dd_duration, dd_duration) else: dd_duration = 0
# Calmar Ratio calmar = annualized_return / abs(max_dd) if max_dd != 0 else 0
# Торговые метрики trades_df = pd.DataFrame(trades) if trades else pd.DataFrame()
if len(trades_df) > 0 and "pnl" in trades_df.columns: wins = trades_df[trades_df["pnl"] > 0] losses = trades_df[trades_df["pnl"] <= 0]
win_rate = len(wins) / len(trades_df) if len(trades_df) > 0 else 0 avg_trade = trades_df["pnl"].mean() avg_win = wins["pnl"].mean() if len(wins) > 0 else 0 avg_loss = losses["pnl"].mean() if len(losses) > 0 else 0
gross_profit = wins["pnl"].sum() if len(wins) > 0 else 0 gross_loss = abs(losses["pnl"].sum()) if len(losses) > 0 else 0.001 profit_factor = gross_profit / gross_loss
# Средний период удержания if "entry_time" in trades_df.columns and "exit_time" in trades_df.columns: trades_df["holding_days"] = ( pd.to_datetime(trades_df["exit_time"]) - pd.to_datetime(trades_df["entry_time"]) ).dt.days avg_holding = trades_df["holding_days"].mean() else: avg_holding = 0 else: win_rate = 0 avg_trade = 0 avg_win = 0 avg_loss = 0 profit_factor = 0 avg_holding = 0
return PerformanceMetrics( total_return=round(total_return * 100, 2), annualized_return=round(annualized_return * 100, 2), annualized_volatility=round(annualized_vol * 100, 2), sharpe_ratio=round(sharpe, 2), sortino_ratio=round(sortino, 2), max_drawdown=round(max_dd * 100, 2), max_drawdown_duration=max_dd_duration, calmar_ratio=round(calmar, 2), win_rate=round(win_rate * 100, 1), profit_factor=round(profit_factor, 2), avg_trade_pnl=round(avg_trade, 2), avg_win=round(avg_win, 2), avg_loss=round(avg_loss, 2), total_trades=len(trades_df), avg_holding_period=round(avg_holding, 1), )
def print_performance_report( metrics: PerformanceMetrics, strategy_name: str = "PCMCI Causal Strategy",) -> None: """Вывод отчёта о производительности.""" print(f"\n{'=' * 60}") print(f"ОТЧЁТ О ПРОИЗВОДИТЕЛЬНОСТИ: {strategy_name}") print(f"{'=' * 60}")
print(f"\n--- Метрики доходности ---") print(f" Общая доходность: {metrics.total_return:>8.2f}%") print(f" Годовая доходность: {metrics.annualized_return:>8.2f}%") print(f" Годовая волатильность: {metrics.annualized_volatility:>8.2f}%")
print(f"\n--- Метрики риска ---") print(f" Sharpe Ratio: {metrics.sharpe_ratio:>8.2f}") print(f" Sortino Ratio: {metrics.sortino_ratio:>8.2f}") print(f" Calmar Ratio: {metrics.calmar_ratio:>8.2f}") print(f" Максимальная просадка: {metrics.max_drawdown:>8.2f}%") print(f" Длит. макс. просадки: {metrics.max_drawdown_duration:>5d} дней")
print(f"\n--- Торговые метрики ---") print(f" Всего сделок: {metrics.total_trades:>8d}") print(f" Win Rate: {metrics.win_rate:>8.1f}%") print(f" Profit Factor: {metrics.profit_factor:>8.2f}") print(f" Средний PnL: {metrics.avg_trade_pnl:>8.2f}") print(f" Средний выигрыш: {metrics.avg_win:>8.2f}") print(f" Средний проигрыш: {metrics.avg_loss:>8.2f}") print(f" Ср. период удержания: {metrics.avg_holding_period:>8.1f} дн.")
# ============================================================# Часть 2: Сравнение с бенчмарками# ============================================================
def compare_with_benchmarks( strategy_equity: list[float], strategy_trades: list[dict], prices: pd.DataFrame, benchmark_col: str = "BTC",) -> pd.DataFrame: """ Сравнение причинной стратегии с бенчмарками.
Бенчмарки: 1. Buy-and-Hold (основной актив) 2. Equal-Weight Buy-and-Hold 3. Momentum стратегия
Parameters ---------- strategy_equity : list[float] Кривая эквити стратегии strategy_trades : list[dict] Сделки стратегии prices : pd.DataFrame Цены активов benchmark_col : str Актив для buy-and-hold бенчмарка
Returns ------- pd.DataFrame Сравнительная таблица """ initial_capital = strategy_equity[0] T = len(prices)
# Бенчмарк 1: Buy-and-Hold одного актива bh_returns = prices[benchmark_col] / prices[benchmark_col].iloc[0] bh_equity = (initial_capital * bh_returns).tolist()
# Бенчмарк 2: Equal-Weight Portfolio ew_returns = prices.div(prices.iloc[0]) ew_portfolio = ew_returns.mean(axis=1) ew_equity = (initial_capital * ew_portfolio).tolist()
# Бенчмарк 3: Простая Momentum стратегия momentum_window = 20 momentum_equity = [initial_capital] capital = initial_capital
log_returns = np.log(prices / prices.shift(1)).dropna()
for idx in range(momentum_window, len(log_returns)): # Momentum: покупаем актив с лучшей доходностью за окно past_returns = log_returns.iloc[idx-momentum_window:idx].sum() best_asset = past_returns.idxmax()
daily_return = log_returns[best_asset].iloc[idx] capital *= np.exp(daily_return * 0.5) # 50% экспозиция momentum_equity.append(capital)
# Вычисление метрик для каждой стратегии results = []
strategies = { "PCMCI Causal": (strategy_equity, strategy_trades), f"Buy&Hold {benchmark_col}": (bh_equity, []), "Equal-Weight": (ew_equity, []), "Momentum": (momentum_equity, []), }
for name, (equity, trades) in strategies.items(): if len(equity) > 1: metrics = compute_performance_metrics(equity, trades) results.append({ "Strategy": name, "Return%": metrics.total_return, "Ann.Return%": metrics.annualized_return, "Volatility%": metrics.annualized_volatility, "Sharpe": metrics.sharpe_ratio, "Sortino": metrics.sortino_ratio, "MaxDD%": metrics.max_drawdown, "Calmar": metrics.calmar_ratio, })
df = pd.DataFrame(results)
print("\n" + "=" * 80) print("СРАВНЕНИЕ С БЕНЧМАРКАМИ") print("=" * 80) print(df.to_string(index=False))
return df
# ============================================================# Часть 3: Анализ причинных сигналов# ============================================================
def analyze_signal_quality( trades: list[dict],) -> None: """ Анализ качества причинных сигналов. Оценивает, насколько сила причинного сигнала предсказывает успех сделки. """ if not trades: print("Нет сделок для анализа.") return
df = pd.DataFrame(trades)
if "signal_strength" not in df.columns: print("Нет данных о силе сигнала.") return
print("\n" + "=" * 60) print("АНАЛИЗ КАЧЕСТВА ПРИЧИННЫХ СИГНАЛОВ") print("=" * 60)
# Разбивка по силе сигнала df["strength_bucket"] = pd.cut( df["signal_strength"], bins=[0, 0.25, 0.50, 0.75, 1.0], labels=["Слабый(0-25%)", "Средний(25-50%)", "Сильный(50-75%)", "Очень сильный(75-100%)"], )
if "strength_bucket" in df.columns: bucket_stats = df.groupby("strength_bucket", observed=True).agg({ "pnl": ["count", "mean", "sum"], "pnl_pct": "mean", }).round(2)
print("\nПроизводительность по силе сигнала:") print(bucket_stats)
# Разбивка по причинному источнику if "causal_source" in df.columns: source_stats = df.groupby("causal_source").agg({ "pnl": ["count", "mean", "sum"], }).round(2)
print("\nПроизводительность по причинному источнику:") print(source_stats)
# Разбивка по направлению if "direction" in df.columns: dir_stats = df.groupby("direction").agg({ "pnl": ["count", "mean", "sum"], }).round(2)
print("\nПроизводительность по направлению:") print(dir_stats)
# Разбивка по причине выхода if "exit_reason" in df.columns: exit_stats = df.groupby("exit_reason").agg({ "pnl": ["count", "mean", "sum"], }).round(2)
print("\nПроизводительность по причине выхода:") print(exit_stats)
def plot_backtest_results( equity_curve: list[float], trades: list[dict], output_file: str = "backtest_results.png",) -> None: """ Визуализация результатов бэктеста. """ fig, axes = plt.subplots(3, 1, figsize=(14, 10), gridspec_kw={'height_ratios': [3, 1, 1]})
# 1. Кривая эквити ax1 = axes[0] ax1.plot(equity_curve, label="PCMCI Strategy", linewidth=1.5) ax1.axhline(y=equity_curve[0], color='gray', linestyle='--', alpha=0.5) ax1.set_title("Кривая эквити причинной стратегии PCMCI") ax1.set_ylabel("Капитал") ax1.legend() ax1.grid(True, alpha=0.3)
# 2. Просадки ax2 = axes[1] equity = np.array(equity_curve) running_max = np.maximum.accumulate(equity) drawdowns = (equity - running_max) / running_max * 100 ax2.fill_between(range(len(drawdowns)), drawdowns, 0, color='red', alpha=0.3) ax2.set_title("Просадка (%)") ax2.set_ylabel("Просадка %") ax2.grid(True, alpha=0.3)
# 3. PnL по сделкам ax3 = axes[2] if trades: pnls = [t.get("pnl", 0) for t in trades] colors = ['green' if p > 0 else 'red' for p in pnls] ax3.bar(range(len(pnls)), pnls, color=colors, alpha=0.7) ax3.axhline(y=0, color='black', linewidth=0.5) ax3.set_title("PnL по сделкам") ax3.set_ylabel("PnL") ax3.set_xlabel("Номер сделки") ax3.grid(True, alpha=0.3)
plt.tight_layout() plt.savefig(output_file, dpi=150, bbox_inches='tight') plt.close() print(f"\nГрафик сохранён: {output_file}")
# ============================================================# Часть 4: Основной скрипт бэктеста# ============================================================
if __name__ == "__main__": # Импорт стратегии из примера 04 # (в реальном проекте используется импорт модуля)
np.random.seed(42) T = 1000 dates = pd.bdate_range(start="2020-01-01", periods=T)
# Генерация цен returns_data = np.zeros((T, 5)) for t in range(2, T): returns_data[t, 0] = 0.0005 + 0.05 * returns_data[t-1, 0] + np.random.normal(0, 0.02) returns_data[t, 1] = 0.0003 + 0.3 * returns_data[t-1, 0] + 0.05 * returns_data[t-1, 1] + np.random.normal(0, 0.025) returns_data[t, 2] = 0.0002 + 0.2 * returns_data[t-2, 0] + 0.04 * returns_data[t-1, 2] + np.random.normal(0, 0.03) returns_data[t, 3] = 0.0001 + 0.25 * returns_data[t-1, 1] + 0.03 * returns_data[t-1, 3] + np.random.normal(0, 0.025) returns_data[t, 4] = 0.0002 + 0.02 * returns_data[t-1, 4] + np.random.normal(0, 0.02)
prices_data = 100 * np.exp(np.cumsum(returns_data, axis=0)) var_names = ["BTC", "ETH", "SOL", "AVAX", "BNB"] prices = pd.DataFrame(prices_data, index=dates, columns=var_names)
# Создаём демо-результаты стратегии # (в реальном проекте используется PCMCICausalStrategy.run()) initial_capital = 100000.0 equity = [initial_capital] capital = initial_capital
log_returns = np.log(prices / prices.shift(1)).dropna()
for idx in range(1, len(log_returns)): daily_ret = log_returns.iloc[idx].mean() * 0.3 # 30% экспозиция capital *= np.exp(daily_ret) equity.append(capital)
demo_trades = [ {"pnl": 150, "pnl_pct": 1.5, "direction": "LONG", "exit_reason": "take_profit", "signal_strength": 0.8, "causal_source": "BTC", "asset": "ETH", "entry_time": "2020-06-01", "exit_time": "2020-06-10"}, {"pnl": -80, "pnl_pct": -0.8, "direction": "LONG", "exit_reason": "stop_loss", "signal_strength": 0.3, "causal_source": "BTC", "asset": "SOL", "entry_time": "2020-07-01", "exit_time": "2020-07-05"}, {"pnl": 200, "pnl_pct": 2.0, "direction": "SHORT", "exit_reason": "take_profit", "signal_strength": 0.9, "causal_source": "ETH", "asset": "AVAX", "entry_time": "2020-08-01", "exit_time": "2020-08-15"}, ]
# Метрики производительности metrics = compute_performance_metrics(equity, demo_trades) print_performance_report(metrics)
# Сравнение с бенчмарками compare_with_benchmarks(equity, demo_trades, prices, "BTC")
# Анализ сигналов analyze_signal_quality(demo_trades)
# Визуализация plot_backtest_results(equity, demo_trades)
print("\nБэктестинг завершён!")Реализация на Rust
Структура проекта
Реализация PCMCI на Rust обеспечивает высокую производительность для работы с большими объёмами данных. Структура проекта:
97_pcmci_causal_discovery/ rust_pcmci/ Cargo.toml src/ lib.rs # Основной модуль data.rs # Структуры данных partial_corr.rs # Частная корреляция pc_algorithm.rs # Фаза 1: PC-алгоритм mci_test.rs # Фаза 2: MCI-тестирование pcmci.rs # Основной алгоритм PCMCI network.rs # Анализ причинных сетей strategy.rs # Торговая стратегия main.rs # Точка входаОсновные модули
// ============================================================// ============================================================// [package]// name = "pcmci-trading"// version = "0.1.0"// edition = "2021"//// [dependencies]// ndarray = { version = "0.15", features = ["blas"] }// ndarray-linalg = { version = "0.16", features = ["openblas-system"] }// ndarray-stats = "0.5"// statrs = "0.16"// serde = { version = "1.0", features = ["derive"] }// serde_json = "1.0"// rayon = "1.8"// anyhow = "1.0"
// ============================================================// src/data.rs — Структуры данных// ============================================================
use ndarray::{Array1, Array2, Array3};use serde::{Deserialize, Serialize};
/// Результат теста условной независимости#[derive(Debug, Clone, Serialize)]pub struct CITestResult { pub test_statistic: f64, pub p_value: f64, pub significant: bool,}
/// Причинная связь#[derive(Debug, Clone, Serialize)]pub struct CausalLink { pub source: usize, pub target: usize, pub lag: usize, pub mci_value: f64, pub p_value: f64, pub source_name: String, pub target_name: String,}
/// Результаты PCMCI#[derive(Debug, Clone)]pub struct PCMCIResults { pub val_matrix: Array3<f64>, // N x N x (tau_max+1) pub p_matrix: Array3<f64>, // N x N x (tau_max+1) pub parents: Vec<Vec<(usize, usize)>>, // для каждой переменной: [(var, lag)] pub significant_links: Vec<CausalLink>,}
/// Набор данных временных рядов#[derive(Debug, Clone)]pub struct TimeSeriesData { pub data: Array2<f64>, // T x N pub var_names: Vec<String>, pub n_obs: usize, // T pub n_vars: usize, // N}
impl TimeSeriesData { pub fn new(data: Array2<f64>, var_names: Vec<String>) -> Self { let (n_obs, n_vars) = data.dim(); assert_eq!(n_vars, var_names.len(), "Number of variable names must match data columns"); Self { data, var_names, n_obs, n_vars } }
/// Стандартизация данных (mean=0, std=1) pub fn standardize(&mut self) { for j in 0..self.n_vars { let col = self.data.column(j); let mean = col.mean().unwrap_or(0.0); let std = col.std(1.0); if std > 1e-10 { for i in 0..self.n_obs { self.data[[i, j]] = (self.data[[i, j]] - mean) / std; } } } }}
// ============================================================// src/partial_corr.rs — Частная корреляция// ============================================================
use ndarray::{Array1, Array2, s};use statrs::distribution::{StudentsT, ContinuousCDF};
/// Вычисление частной корреляции через OLS-регрессиюpub fn partial_correlation( x: &Array1<f64>, y: &Array1<f64>, z: &Array2<f64>, // условие (T x d)) -> CITestResult { let n = x.len() as f64; let d = z.ncols() as f64;
if z.ncols() == 0 { // Без условия — простая корреляция let corr = pearson_correlation(x, y); let t_stat = corr * ((n - 2.0) / (1.0 - corr * corr)).sqrt(); let df = n - 2.0; let p_value = compute_p_value_t(t_stat, df);
return CITestResult { test_statistic: corr, p_value, significant: false, // устанавливается позже }; }
// OLS-регрессия: X на Z и Y на Z let residual_x = ols_residuals(x, z); let residual_y = ols_residuals(y, z);
// Корреляция остатков = частная корреляция let parcorr = pearson_correlation(&residual_x, &residual_y);
// t-статистика let df = n - d - 2.0; if df <= 0.0 { return CITestResult { test_statistic: parcorr, p_value: 1.0, significant: false, }; }
let t_stat = parcorr * (df / (1.0 - parcorr * parcorr)).sqrt(); let p_value = compute_p_value_t(t_stat, df);
CITestResult { test_statistic: parcorr, p_value, significant: false, }}
/// Корреляция Пирсонаfn pearson_correlation(x: &Array1<f64>, y: &Array1<f64>) -> f64 { let n = x.len() as f64; let mean_x = x.mean().unwrap_or(0.0); let mean_y = y.mean().unwrap_or(0.0);
let mut cov = 0.0; let mut var_x = 0.0; let mut var_y = 0.0;
for i in 0..x.len() { let dx = x[i] - mean_x; let dy = y[i] - mean_y; cov += dx * dy; var_x += dx * dx; var_y += dy * dy; }
let denom = (var_x * var_y).sqrt(); if denom < 1e-15 { return 0.0; }
cov / denom}
/// OLS-остаткиfn ols_residuals(y: &Array1<f64>, x: &Array2<f64>) -> Array1<f64> { // y = X * beta + residual // beta = (X^T X)^{-1} X^T y let xt = x.t(); let xtx = xt.dot(x); let xty = xt.dot(y);
// Решение системы (простой метод для небольших размерностей) let beta = solve_linear_system(&xtx, &xty); let predicted = x.dot(&beta);
y - &predicted}
/// Решение линейной системы Ax = b (LU-разложение)fn solve_linear_system(a: &Array2<f64>, b: &Array1<f64>) -> Array1<f64> { let n = a.nrows(); let mut aug = Array2::zeros((n, n + 1)); aug.slice_mut(s![.., ..n]).assign(a); aug.column_mut(n).assign(b);
// Прямой ход (Гаусс) for k in 0..n { let pivot = aug[[k, k]]; if pivot.abs() < 1e-15 { continue; } for i in (k + 1)..n { let factor = aug[[i, k]] / pivot; for j in k..(n + 1) { aug[[i, j]] -= factor * aug[[k, j]]; } } }
// Обратная подстановка let mut x = Array1::zeros(n); for k in (0..n).rev() { let mut sum = aug[[k, n]]; for j in (k + 1)..n { sum -= aug[[k, j]] * x[j]; } if aug[[k, k]].abs() > 1e-15 { x[k] = sum / aug[[k, k]]; } }
x}
/// Вычисление p-значения из t-статистикиfn compute_p_value_t(t_stat: f64, df: f64) -> f64 { if df <= 0.0 { return 1.0; } let dist = StudentsT::new(0.0, 1.0, df).unwrap(); 2.0 * (1.0 - dist.cdf(t_stat.abs()))}
// ============================================================// src/pcmci.rs — Основной алгоритм PCMCI// ============================================================
use crate::data::*;use crate::partial_corr::partial_correlation;use ndarray::{Array1, Array2, Array3, s};use rayon::prelude::*;
pub struct PCMCI { data: TimeSeriesData, tau_max: usize, alpha_pc: f64, alpha_mci: f64,}
impl PCMCI { pub fn new( data: TimeSeriesData, tau_max: usize, alpha_pc: f64, alpha_mci: f64, ) -> Self { Self { data, tau_max, alpha_pc, alpha_mci } }
/// Запуск полного алгоритма PCMCI pub fn run(&self) -> PCMCIResults { println!("=== PCMCI запущен ==="); println!(" Переменные: {}", self.data.n_vars); println!(" Наблюдения: {}", self.data.n_obs); println!(" tau_max: {}", self.tau_max);
// Фаза 1: PC-отбор предков let parents = self.phase_pc(); println!(" Фаза 1 (PC) завершена");
// Фаза 2: MCI-тестирование let (val_matrix, p_matrix) = self.phase_mci(&parents); println!(" Фаза 2 (MCI) завершена");
// Извлечение значимых связей let significant_links = self.extract_significant_links( &val_matrix, &p_matrix ); println!(" Значимых связей: {}", significant_links.len());
PCMCIResults { val_matrix, p_matrix, parents, significant_links, } }
/// Фаза 1: PC-алгоритм для отбора предков fn phase_pc(&self) -> Vec<Vec<(usize, usize)>> { let n = self.data.n_vars; let mut parents: Vec<Vec<(usize, usize)>> = Vec::new();
for j in 0..n { // Начальное множество: все (i, tau) для tau=1..tau_max let mut candidates: Vec<(usize, usize)> = Vec::new(); for i in 0..n { for tau in 1..=self.tau_max { candidates.push((i, tau)); } }
let mut p_order = 0;
loop { if p_order >= candidates.len() { break; }
let mut to_remove = Vec::new();
for (idx, &(i, tau)) in candidates.iter().enumerate() { // Выбор p условий с наибольшей зависимостью let conditions = self.select_conditions( j, i, tau, &candidates, p_order );
// Тест условной независимости let result = self.conditional_independence_test( j, i, tau, &conditions );
if result.p_value > self.alpha_pc { to_remove.push(idx); } }
// Удаление незначимых (в обратном порядке) for &idx in to_remove.iter().rev() { candidates.remove(idx); }
p_order += 1; }
parents.push(candidates); }
parents }
/// Выбор условий для теста fn select_conditions( &self, target: usize, source: usize, source_lag: usize, candidates: &[(usize, usize)], p_order: usize, ) -> Vec<(usize, usize)> { // Выбираем p_order условий (без тестируемого) let mut conditions: Vec<(usize, usize)> = candidates .iter() .filter(|&&(i, tau)| !(i == source && tau == source_lag)) .copied() .collect();
conditions.truncate(p_order); conditions }
/// Тест условной независимости fn conditional_independence_test( &self, target: usize, source: usize, lag: usize, conditions: &[(usize, usize)], ) -> CITestResult { let t_start = self.tau_max; let t_end = self.data.n_obs; let n_samples = t_end - t_start;
// Целевая переменная: X_t^target let y: Array1<f64> = (t_start..t_end) .map(|t| self.data.data[[t, target]]) .collect();
// Источник: X_{t-lag}^source let x: Array1<f64> = (t_start..t_end) .map(|t| self.data.data[[t - lag, source]]) .collect();
// Условие: матрица (n_samples x n_conditions) let n_cond = conditions.len(); let mut z = Array2::zeros((n_samples, n_cond)); for (c_idx, &(c_var, c_lag)) in conditions.iter().enumerate() { for (s_idx, t) in (t_start..t_end).enumerate() { if t >= c_lag { z[[s_idx, c_idx]] = self.data.data[[t - c_lag, c_var]]; } } }
partial_correlation(&x, &y, &z) }
/// Фаза 2: MCI-тестирование fn phase_mci( &self, parents: &[Vec<(usize, usize)>], ) -> (Array3<f64>, Array3<f64>) { let n = self.data.n_vars; let mut val_matrix = Array3::zeros((n, n, self.tau_max + 1)); let mut p_matrix = Array3::ones((n, n, self.tau_max + 1));
for j in 0..n { for i in 0..n { for tau in 1..=self.tau_max { // MCI-условие: P(X_t^j) \ {X_{t-tau}^i} UNION P(X_{t-tau}^i) let mut mci_conditions: Vec<(usize, usize)> = Vec::new();
// Предки целевой (без тестируемого) for &(pi, ptau) in &parents[j] { if !(pi == i && ptau == tau) { mci_conditions.push((pi, ptau)); } }
// Предки источника (со сдвигом) for &(pi, ptau) in &parents[i] { let shifted_lag = tau + ptau; if shifted_lag <= self.tau_max { mci_conditions.push((pi, shifted_lag)); } }
// Удаление дубликатов mci_conditions.sort(); mci_conditions.dedup();
let result = self.conditional_independence_test( j, i, tau, &mci_conditions );
val_matrix[[i, j, tau]] = result.test_statistic; p_matrix[[i, j, tau]] = result.p_value; } } }
(val_matrix, p_matrix) }
/// Извлечение значимых связей fn extract_significant_links( &self, val_matrix: &Array3<f64>, p_matrix: &Array3<f64>, ) -> Vec<CausalLink> { let n = self.data.n_vars; let mut links = Vec::new();
for i in 0..n { for j in 0..n { for tau in 1..=self.tau_max { if p_matrix[[i, j, tau]] < self.alpha_mci { links.push(CausalLink { source: i, target: j, lag: tau, mci_value: val_matrix[[i, j, tau]], p_value: p_matrix[[i, j, tau]], source_name: self.data.var_names[i].clone(), target_name: self.data.var_names[j].clone(), }); } } } }
// Сортировка по абсолютному MCI links.sort_by(|a, b| { b.mci_value.abs() .partial_cmp(&a.mci_value.abs()) .unwrap_or(std::cmp::Ordering::Equal) });
links }}
// ============================================================// src/strategy.rs — Торговая стратегия на Rust// ============================================================
use crate::data::CausalLink;
/// Торговый сигнал#[derive(Debug, Clone)]pub struct TradeSignal { pub timestamp: usize, pub asset: String, pub direction: i32, // +1 long, -1 short pub strength: f64, pub source: String,}
/// Генератор сигналов на основе причинных связейpub struct CausalSignalGenerator { links: Vec<CausalLink>, min_mci: f64, z_threshold: f64, lookback: usize,}
impl CausalSignalGenerator { pub fn new( links: Vec<CausalLink>, min_mci: f64, z_threshold: f64, lookback: usize, ) -> Self { let filtered: Vec<CausalLink> = links .into_iter() .filter(|l| l.mci_value.abs() >= min_mci) .collect();
println!("Активных связей: {}", filtered.len());
Self { links: filtered, min_mci, z_threshold, lookback, } }
/// Генерация сигналов для текущего момента pub fn generate_signals( &self, returns: &ndarray::Array2<f64>, var_names: &[String], current_idx: usize, ) -> Vec<TradeSignal> { let mut signals = Vec::new();
for link in &self.links { let driver_idx = if current_idx >= link.lag { current_idx - link.lag } else { continue; };
if driver_idx < self.lookback { continue; }
// z-score драйвера let window: Vec<f64> = (driver_idx - self.lookback..driver_idx) .map(|t| returns[[t, link.source]]) .collect();
let mean: f64 = window.iter().sum::<f64>() / window.len() as f64; let std: f64 = (window.iter() .map(|&x| (x - mean).powi(2)) .sum::<f64>() / window.len() as f64) .sqrt();
if std < 1e-10 { continue; }
let z_score = (returns[[driver_idx, link.source]] - mean) / std;
if z_score.abs() > self.z_threshold { let direction = (z_score * link.mci_value).signum() as i32; let strength = (link.mci_value.abs() * (z_score.abs() / self.z_threshold).min(3.0) / 3.0) .min(1.0);
signals.push(TradeSignal { timestamp: current_idx, asset: link.target_name.clone(), direction, strength, source: link.source_name.clone(), }); } }
signals }}
// ============================================================// src/main.rs — Точка входа// ============================================================
// mod data;// mod partial_corr;// mod pcmci;// mod network;// mod strategy;
fn main() { println!("PCMCI Causal Discovery for Trading (Rust)"); println!("==========================================");
// Генерация синтетических данных let t = 1000usize; let n = 5usize; let mut data = ndarray::Array2::zeros((t, n));
// Seed для воспроизводимости let mut rng_state: u64 = 42; let next_normal = |state: &mut u64| -> f64 { // Простой генератор (Box-Muller) *state = state.wrapping_mul(6364136223846793005) .wrapping_add(1442695040888963407); let u1 = (*state as f64) / (u64::MAX as f64); *state = state.wrapping_mul(6364136223846793005) .wrapping_add(1442695040888963407); let u2 = (*state as f64) / (u64::MAX as f64); (-2.0 * u1.max(1e-15).ln()).sqrt() * (2.0 * std::f64::consts::PI * u2).cos() };
for t_idx in 2..t { data[[t_idx, 0]] = 0.6 * data[[t_idx-1, 0]] + next_normal(&mut rng_state) * 0.5; data[[t_idx, 1]] = 0.4 * data[[t_idx-1, 0]] + 0.3 * data[[t_idx-1, 1]] + next_normal(&mut rng_state) * 0.5; data[[t_idx, 2]] = 0.3 * data[[t_idx-2, 0]] + 0.5 * data[[t_idx-1, 2]] + next_normal(&mut rng_state) * 0.5; data[[t_idx, 3]] = 0.35 * data[[t_idx-1, 1]] + 0.4 * data[[t_idx-1, 3]] + next_normal(&mut rng_state) * 0.5; data[[t_idx, 4]] = 0.2 * data[[t_idx-1, 4]] + next_normal(&mut rng_state) * 0.5; }
let var_names = vec![ "BTC".to_string(), "ETH".to_string(), "SOL".to_string(), "AVAX".to_string(), "BNB".to_string(), ];
let ts_data = TimeSeriesData::new(data, var_names);
// Запуск PCMCI let pcmci = PCMCI::new(ts_data, 5, 0.05, 0.05); let results = pcmci.run();
// Вывод результатов println!("\nЗначимые причинные связи:"); for link in &results.significant_links { println!(" {} -> {} (lag={}, MCI={:.4}, p={:.6})", link.source_name, link.target_name, link.lag, link.mci_value, link.p_value); }}Объяснение реализации на Rust:
Реализация на Rust обеспечивает несколько преимуществ:
- Производительность: Вычисление частных корреляций и решение линейных систем работает без накладных расходов интерпретатора
- Параллелизм: Библиотека
rayonпозволяет параллелизовать тесты условной независимости - Безопасность типов: Ошибки в размерностях матриц обнаруживаются на этапе компиляции
- Потребление памяти: Более предсказуемое и контролируемое по сравнению с Python
Реализация на Python
Модульная архитектура
"""Модульная реализация PCMCI для финансового анализа.Файл: pcmci_trading.py"""
import numpy as npimport pandas as pdfrom dataclasses import dataclass, fieldfrom typing import Optionalfrom enum import Enum
class CITestType(Enum): """Типы тестов условной независимости.""" PARCORR = "parcorr" ROBUST_PARCORR = "robust_parcorr" CMIKNN = "cmiknn" GPDC = "gpdc"
@dataclassclass PCMCIConfig: """Конфигурация PCMCI анализа.""" tau_max: int = 5 alpha_pc: float = 0.05 alpha_mci: float = 0.05 ci_test: CITestType = CITestType.PARCORR n_bootstrap: int = 0 fdr_correction: bool = True verbosity: int = 1
@dataclassclass StrategyConfig: """Конфигурация торговой стратегии.""" training_window: int = 252 refit_frequency: int = 63 min_mci: float = 0.10 min_stability: float = 0.5 z_score_threshold: float = 1.5 max_position_pct: float = 0.10 max_total_exposure: float = 0.50 stop_loss_pct: float = 0.02 take_profit_pct: float = 0.04 initial_capital: float = 100000.0
class PCMCITradingPipeline: """ Полный пайплайн причинного анализа для трейдинга.
Объединяет: 1. Загрузку и подготовку данных 2. PCMCI причинный анализ 3. Построение и анализ причинных сетей 4. Генерацию торговых сигналов 5. Бэктестинг """
def __init__( self, pcmci_config: PCMCIConfig = PCMCIConfig(), strategy_config: StrategyConfig = StrategyConfig(), ): self.pcmci_config = pcmci_config self.strategy_config = strategy_config self.causal_graph = None self.signal_generator = None
def load_data( self, source: str = "yahoo", tickers: Optional[list[str]] = None, **kwargs, ) -> pd.DataFrame: """Загрузка данных из указанного источника.""" if source == "yahoo": import yfinance as yf data = yf.download(tickers, **kwargs) return data["Close"] elif source == "bybit": return self._load_bybit(tickers, **kwargs) elif source == "csv": return pd.read_csv(kwargs["filepath"], index_col=0, parse_dates=True) else: raise ValueError(f"Неизвестный источник: {source}")
def _load_bybit(self, symbols, **kwargs): """Загрузка данных с Bybit.""" import requests all_data = {} for symbol in symbols: params = { "category": "spot", "symbol": symbol, "interval": kwargs.get("interval", "D"), "limit": kwargs.get("limit", 1000), } resp = requests.get( "https://api.bybit.com/v5/market/kline", params=params, timeout=10 ) data = resp.json() if data["retCode"] == 0: records = data["result"]["list"] df = pd.DataFrame(records) df.columns = ["ts", "o", "h", "l", "c", "v", "turnover"] df["ts"] = pd.to_datetime(df["ts"].astype(int), unit="ms") df = df.set_index("ts").sort_index() all_data[symbol.replace("USDT", "")] = df["c"].astype(float)
return pd.DataFrame(all_data).dropna()
def preprocess( self, prices: pd.DataFrame, return_type: str = "log", ) -> pd.DataFrame: """Предобработка: вычисление доходностей, стандартизация.""" if return_type == "log": returns = np.log(prices / prices.shift(1)).dropna() else: returns = prices.pct_change().dropna()
# Стандартизация returns = (returns - returns.mean()) / returns.std()
return returns
def fit_causal_model( self, returns: pd.DataFrame, ) -> dict: """Обучение PCMCI модели.""" from tigramite import data_processing as pp from tigramite.pcmci import PCMCI from tigramite.independence_tests.parcorr import ParCorr from tigramite.independence_tests.robust_parcorr import RobustParCorr
values = returns.values var_names = list(returns.columns)
dataframe = pp.DataFrame(data=values, var_names=var_names)
# Выбор теста if self.pcmci_config.ci_test == CITestType.PARCORR: ci_test = ParCorr(significance="analytic") elif self.pcmci_config.ci_test == CITestType.ROBUST_PARCORR: ci_test = RobustParCorr(significance="analytic") else: ci_test = ParCorr(significance="analytic")
pcmci = PCMCI( dataframe=dataframe, cond_ind_test=ci_test, verbosity=self.pcmci_config.verbosity, )
results = pcmci.run_pcmci( tau_max=self.pcmci_config.tau_max, pc_alpha=self.pcmci_config.alpha_pc, alpha_level=self.pcmci_config.alpha_mci, )
self.causal_graph = { "pcmci": pcmci, "results": results, "var_names": var_names, }
return results
def run_backtest( self, prices: pd.DataFrame, ) -> dict: """Полный бэктест стратегии.""" # Здесь вызывается PCMCICausalStrategy из примера 04 # (см. пример 04 для полной реализации)
returns = self.preprocess(prices) results = self.fit_causal_model(returns)
return { "causal_results": results, "var_names": list(prices.columns), }
# ============================================================# Удобные функции-обёртки# ============================================================
def quick_pcmci_analysis( data: pd.DataFrame, tau_max: int = 5, alpha: float = 0.05,) -> pd.DataFrame: """ Быстрый запуск PCMCI анализа.
Parameters ---------- data : pd.DataFrame Данные (доходности, стационарные) tau_max : int Максимальный лаг alpha : float Уровень значимости
Returns ------- pd.DataFrame Таблица причинных связей """ from tigramite import data_processing as pp from tigramite.pcmci import PCMCI from tigramite.independence_tests.parcorr import ParCorr
values = data.values var_names = list(data.columns)
dataframe = pp.DataFrame(data=values, var_names=var_names) ci_test = ParCorr(significance="analytic") pcmci = PCMCI(dataframe=dataframe, cond_ind_test=ci_test, verbosity=0)
results = pcmci.run_pcmci( tau_max=tau_max, pc_alpha=alpha, alpha_level=alpha, )
# Извлечение связей N = len(var_names) links = [] for i in range(N): for j in range(N): for tau in range(1, tau_max + 1): if results["p_matrix"][i, j, tau] < alpha: links.append({ "source": var_names[i], "target": var_names[j], "lag": tau, "mci": round(results["val_matrix"][i, j, tau], 4), "p_value": round(results["p_matrix"][i, j, tau], 6), })
return pd.DataFrame(links).sort_values("p_value")Основные классы
Модульная архитектура позволяет гибко комбинировать компоненты:
Архитектура модулей:
PCMCITradingPipeline | +-- DataLoader (загрузка: Yahoo, Bybit, CSV) | +-- Preprocessor (доходности, стандартизация, ADF-тест) | +-- CausalAnalyzer (PCMCI, PCMCI+, LPCMCI) | | | +-- ParCorr / RobustParCorr / CMIknn | +-- NetworkAnalyzer (центральность, кластеры, стабильность) | +-- SignalGenerator (z-score, причинные сигналы) | +-- PositionManager (размер, стопы, тейки) | +-- Backtester (метрики, сравнение, отчёты)Сравнение с другими методами
Ниже представлена сравнительная таблица PCMCI с другими популярными методами причинного анализа временных рядов:
+=====================+==========+==========+===========+============+| Характеристика | PCMCI | Granger | VAR | Transfer || | | Causality| | Entropy |+=====================+==========+==========+===========+============+| Тип связей | Прямые | Попарные | Линейные | Попарные || | причинн. | | многомерн.| нелинейные |+---------------------+----------+----------+-----------+------------+| Контроль | Да | Нет* | Частично | Нет* || конфаундеров | (полный) | | (VAR) | |+---------------------+----------+----------+-----------+------------+| Нелинейность | Да | Нет | Нет | Да || | (CMIknn) | | | |+---------------------+----------+----------+-----------+------------+| Масштабируемость | Высокая | Средняя | Низкая | Средняя || (N переменных) | (>100) | (<50) | (<20) | (<50) |+---------------------+----------+----------+-----------+------------+| Контроль FDR | Да | Нет** | Нет | Нет** |+---------------------+----------+----------+-----------+------------+| Одновременные | Да | Нет | Нет | Нет || связи | (PCMCI+) | | | |+---------------------+----------+----------+-----------+------------+| Латентные | Да | Нет | Нет | Нет || конфаундеры | (LPCMCI) | | | |+---------------------+----------+----------+-----------+------------+| Направленность | Да | Да | Частично | Да |+---------------------+----------+----------+-----------+------------+| Автокорреляция | Корректн.| Проблемы | Включена | Проблемы || | через MCI| | в модель | |+---------------------+----------+----------+-----------+------------+| Вычислительная | O(N^2*p) | O(N^2) | O(N^2*p) | O(N^2*T) || сложность | | | | |+---------------------+----------+----------+-----------+------------+| Мин. T для | 100-200 | 50-100 | 100-500 | 500-1000 || надёжных результатов| | | | |+---------------------+----------+----------+-----------+------------+| Интерпретируемость | Высокая | Высокая | Средняя | Средняя || (граф) | (DAG) | (пары) | (матрица) | (пары) |+---------------------+----------+----------+-----------+------------+| Реализация |tigramite | statsmod.| statsmod. | pyinform, || (Python) | | | | IDTxl |+---------------------+----------+----------+-----------+------------+
* Можно расширить условным вариантом, но это не стандартная реализация** Можно добавить коррекцию вручную (Bonferroni, FDR)Детальное сравнение
PCMCI vs Granger Causality:
Пример: 3 переменные, X->Y->Z
Granger Causality (попарные тесты): X -> Y? Да (p < 0.01) -- ВЕРНО X -> Z? Да (p < 0.05) -- ЛОЖНОЕ (опосредованное через Y) Y -> Z? Да (p < 0.01) -- ВЕРНО
PCMCI: X -> Y (lag=1)? Да, MCI=0.35 -- ВЕРНО X -> Z (lag=2)? Нет, p=0.42 -- ВЕРНО (нет прямой связи) Y -> Z (lag=1)? Да, MCI=0.30 -- ВЕРНО
PCMCI корректно различает прямые и опосредованные связи!PCMCI vs Transfer Entropy:
Преимущества Transfer Entropy: + Обнаруживает нелинейные зависимости + Теоретически обоснован (теория информации)
Недостатки Transfer Entropy vs PCMCI: - Попарный тест (не контролирует конфаундеры) - Требует много данных для оценки энтропии - Нет контроля FDR - Чувствителен к параметрам (число бинов, kNN)
PCMCI с CMIknn тестом: + Нелинейные зависимости (как Transfer Entropy) + Контроль конфаундеров (через условную независимость) + Контроль FDR + Лучшая масштабируемостьКогда использовать какой метод?
Дерево выбора метода:
Число переменных N? | +-- N <= 5: | Нужна нелинейность? | +-- Нет --> Granger Causality (просто и быстро) | +-- Да --> Transfer Entropy | +-- 5 < N <= 20: | Достаточно данных (T > 500)? | +-- Нет --> PCMCI (ParCorr) | +-- Да --> PCMCI (CMIknn) или VAR + Granger | +-- N > 20: | --> PCMCI (единственный масштабируемый вариант) | ParCorr для быстрого анализа | RobustParCorr для тяжёлых хвостов | Есть ненаблюдаемые конфаундеры? +-- Да --> LPCMCI +-- Нет --> PCMCI или PCMCI+Лучшие практики
1. Подготовка данных
Чеклист подготовки данных для PCMCI:
[x] Стационарность - Использовать доходности (log-returns), не цены - Проверить ADF-тестом - Если нестационарно -> дифференцировать
[x] Пропущенные значения - PCMCI не работает с NaN - Удалить или интерполировать - Предпочтительно: forward-fill для финансовых данных
[x] Выбросы - Winsorize на уровне 3-5 sigma - Или использовать RobustParCorr
[x] Стандартизация - z-score (mean=0, std=1) - Или ранговое преобразование для RobustParCorr
[x] Достаточный объём - Минимум T > 5 * N * tau_max для ParCorr - Минимум T > 20 * N * tau_max для CMIknn
[x] Временное выравнивание - Все ряды должны быть на одной частоте - Учёт часовых поясов для международных данных2. Выбор параметров
Рекомендации по параметрам PCMCI:
tau_max (максимальный лаг): +-------------------+-------------------+ | Данные | Рекомендация | +-------------------+-------------------+ | Тиковые | tau_max = 10-50 | | Минутные | tau_max = 5-20 | | Часовые | tau_max = 5-24 | | Дневные | tau_max = 1-10 | | Недельные | tau_max = 1-4 | +-------------------+-------------------+
alpha_PC (уровень для фазы PC): - Стандарт: 0.05 - Консервативный: 0.01 (меньше ложных связей) - Либеральный: 0.10 (больше кандидатов, менее эффективно) - Рекомендация для финансов: 0.05
alpha_MCI (уровень для фазы MCI): - Стандарт: 0.05 - С FDR-коррекцией: 0.05 (автоматически корректируется) - Без коррекции: 0.01 (для контроля ложных открытий)
Тест условной независимости: - Финансовые доходности -> RobustParCorr - Дневные данные, T > 500 -> ParCorr (быстрее) - Нелинейные зависимости -> CMIknn (T > 1000)3. Интерпретация результатов
Как интерпретировать результаты PCMCI:
1. MCI-значение: |MCI| > 0.3 --> Сильная причинная связь 0.15 < |MCI| < 0.3 --> Умеренная связь 0.05 < |MCI| < 0.15 --> Слабая связь |MCI| < 0.05 --> Пренебрежимо малая
2. Направление: MCI > 0: положительная причинная связь (рост X_{t-tau} причинно ведёт к росту Y_t) MCI < 0: отрицательная причинная связь (рост X_{t-tau} причинно ведёт к падению Y_t)
3. Лаг: tau = 1: немедленная передача (следующий период) tau > 1: задержанная передача Множественные лаги: сложная динамика передачи
4. Автопричинность (i == j): MCI(X_{t-1} -> X_t) > 0: положительная автокорреляция Сильная автопричинность: ряд имеет память
5. Стабильность (бутстрэп): > 80%: торгуемая связь 50-80%: осторожно, может быть нестационарной < 50%: не рекомендуется для торговли4. Управление рисками
Риск-менеджмент в причинной стратегии:
1. Диверсификация сигналов: - Не полагаться на одну причинную связь - Агрегировать сигналы от нескольких драйверов - Взвешивать по стабильности и силе MCI
2. Адаптивное переобучение: - Причинная структура меняется со временем - Переобучать каждые 1-3 месяца - Использовать скользящее окно (rolling window)
3. Защита от переобучения: - Бутстрэп-валидация стабильности связей - Out-of-sample тестирование - Не использовать связи с p > 0.01
4. Размер позиций: - Пропорционально силе сигнала и стабильности - Уменьшать при смене рыночного режима - Максимум 10% капитала на одну позицию
5. Мониторинг: - Отслеживать изменения причинного графа - Если ключевая связь исчезла — закрыть позиции - Сравнивать текущий граф с ожидаемым5. Распространённые ошибки
Ошибки при использовании PCMCI в трейдинге:
ОШИБКА 1: Применение к ценам вместо доходностей Ценовые ряды нестационарны -> ложные причинные связи РЕШЕНИЕ: Всегда работать с логарифмическими доходностями
ОШИБКА 2: Слишком большой tau_max tau_max=20 для дневных данных -> множество ложных связей РЕШЕНИЕ: tau_max <= 5 для дневных, <= 10 для часовых
ОШИБКА 3: Слишком мало данных T=100, N=20, tau_max=5 -> переобучение Минимум: T > 5 * N * tau_max РЕШЕНИЕ: Уменьшить N или tau_max, или собрать больше данных
ОШИБКА 4: Игнорирование нестационарности причинных связей Причинные связи меняются в разных режимах РЕШЕНИЕ: Режимно-зависимый анализ + периодическое переобучение
ОШИБКА 5: Торговля на нестабильных связях Связь обнаружена на одной выборке, но нестабильна РЕШЕНИЕ: Бутстрэп-валидация, торговать только стабильные (>80%)
ОШИБКА 6: Смешение статистической и экономической значимости p < 0.001, но MCI = 0.05 -> статистически значимо, но экономически незначимо (слишком слабая связь) РЕШЕНИЕ: Фильтровать по |MCI| > 0.10 для торговлиРесурсы
Основные статьи
-
Runge, J. (2019). “Detecting and Quantifying Causal Associations in Large Nonlinear Time Series Datasets.” Science Advances, 5(11), eaau4996.
- Ключевая статья, вводящая алгоритм PCMCI
- DOI: 10.1126/sciadv.aau4996
- Описывает двухфазный подход и MCI-тест
-
Runge, J. (2020). “Discovering Contemporaneous and Lagged Causal Relations in Autocorrelated Nonlinear Time Series Datasets.” Proceedings of the 36th Conference on Uncertainty in Artificial Intelligence (UAI).
- Расширение PCMCI+ для одновременных связей
- Ориентация рёбер через правила Марковской эквивалентности
-
Runge, J., Nowack, P., Kretschmer, M., Flaxman, S., & Sejdinovic, D. (2019). “Detecting and Quantifying Causal Associations in Large Nonlinear Time Series Datasets.” Science Advances.
- Полное математическое обоснование
- Сравнение с Granger causality и Transfer Entropy
-
Gerhardus, A. & Runge, J. (2020). “High-recall Causal Discovery for Autocorrelated Time Series with Latent Confounders.” NeurIPS 2020.
- LPCMCI — работа с латентными конфаундерами
- Критично для финансовых данных, где не все факторы наблюдаемы
Библиотеки и инструменты
-
Tigramite — Python-библиотека для причинного анализа временных рядов
- GitHub: https://github.com/jakobrunge/tigramite
- Документация: https://jakobrunge.github.io/tigramite/
- Поддержка PCMCI, PCMCI+, LPCMCI
- Визуализация причинных графов
-
CausalNex — библиотека для причинного вывода (QuantumBlack / McKinsey)
- GitHub: https://github.com/quantumblacklabs/causalnex
- Bayesian network structure learning
-
DoWhy — Microsoft Research библиотека для причинного вывода
- GitHub: https://github.com/py-why/dowhy
- Фокус на interventional queries
Книги
-
Pearl, J. (2009). “Causality: Models, Reasoning, and Inference.” Cambridge University Press.
- Фундаментальная работа по причинному выводу
- do-исчисление, SCM (Structural Causal Models)
-
Peters, J., Janzing, D., & Scholkopf, B. (2017). “Elements of Causal Inference: Foundations and Learning Algorithms.” MIT Press.
- Современное введение в причинное обнаружение
- Алгоритмы PC, FCI и их расширения
-
Spirtes, P., Glymour, C., & Scheines, R. (2000). “Causation, Prediction, and Search.” MIT Press.
- Оригинальная работа по алгоритму PC
- Фундамент для PCMCI
Финансовые приложения
-
Kleinberg, S. (2013). “Causality, Probability, and Time.” Cambridge University Press.
- Причинность во временных рядах
- Применения к биомедицинским и финансовым данным
-
Shojaie, A. & Fox, E.B. (2022). “Granger Causality: A Review and Recent Advances.” Annual Review of Statistics and Its Application.
- Обзор методов причинности для временных рядов
- Сравнение с современными подходами
Онлайн-ресурсы
-
Tigramite Tutorials — официальные jupyter-ноутбуки с примерами
- https://github.com/jakobrunge/tigramite/tree/master/tutorials
- Пошаговые примеры PCMCI, PCMCI+, LPCMCI
-
Causal Discovery Toolbox (CDT) — библиотека алгоритмов причинного обнаружения
- GitHub: https://github.com/FenTechSolutions/CausalDiscoveryToolbox
- Сравнение различных алгоритмов
Данные
-
Bybit API — данные криптовалютного рынка
- API Documentation: https://bybit-exchange.github.io/docs/v5/intro
- REST API для исторических данных (klines)
- WebSocket для потоковых данных
-
Yahoo Finance (yfinance) — данные фондового рынка
- GitHub: https://github.com/ranaroussi/yfinance
- Бесплатный доступ к историческим данным акций, ETF, индексов