Перейти к содержимому

Глава 115: Атрибуция признаков для трейдинга - Объяснимый ИИ в финансовых рынках

Обзор

Атрибуция признаков (Feature Attribution) представляет собой набор методов объяснимого искусственного интеллекта (XAI), которые позволяют понять, какие входные признаки вносят наибольший вклад в предсказания модели машинного обучения. В контексте алгоритмического трейдинга атрибуция признаков играет критическую роль:

  • Интерпретируемость торговых сигналов: Понимание причин, по которым модель рекомендует покупку или продажу
  • Управление рисками: Идентификация признаков, создающих наибольшую неопределенность
  • Регуляторное соответствие: Объяснение решений для аудиторов и регуляторов (MiFID II, SEC)
  • Отладка моделей: Обнаружение нежелательных зависимостей и переобучения
  • Генерация альфа-сигналов: Использование атрибуции для создания новых торговых стратегий

Данная глава охватывает теоретические основы SHAP, LIME, интегрированных градиентов и перестановочной важности, а также их практическое применение в торговых системах.

Содержание

  1. Введение в атрибуцию признаков
  2. Математическое обоснование
  3. Методы атрибуции признаков
  4. Применение в трейдинге
  5. Реализация на Python
  6. Реализация на Rust
  7. Практические примеры
  8. Фреймворк бэктестинга
  9. Оценка производительности
  10. Ссылки

Введение в атрибуцию признаков

Проблема черного ящика в трейдинге

Современные модели машинного обучения для торговли (нейронные сети, градиентный бустинг, ансамбли) достигают высокой точности прогнозирования, но страдают от проблемы интерпретируемости:

+------------------+
Признаки | | Прогноз
[price, vol, | "Черный ящик" | --> BUY/SELL
RSI, MACD] | |
+------------------+
?
Почему модель так решила?

Атрибуция признаков решает эту проблему, предоставляя объяснения:

+------------------+
Признаки | | Прогноз
[price, vol, | XAI Модель | --> BUY
RSI, MACD] | |
+------------------+
|
Объяснение:
RSI: +0.35 (перепроданность)
MACD: +0.28 (бычье пересечение)
Volume: +0.15 (рост объема)
Price: -0.08 (нисходящий тренд)

Типы объяснений

Методы атрибуции делятся на несколько категорий:

ТипОписаниеПримерыПрименение в трейдинге
ГлобальныеОбъясняют поведение модели в целомПерестановочная важностьВыбор признаков для модели
ЛокальныеОбъясняют конкретное предсказаниеSHAP, LIMEАнализ отдельных сигналов
Модельно-агностическиеРаботают с любой модельюLIME, SHAP KernelУниверсальное применение
Модельно-специфическиеОптимизированы под архитектуруTreeSHAP, DeepLIFTВысокая скорость для production

Почему атрибуция важна для трейдеров

  1. Доверие к сигналам: Трейдер может проверить, основан ли сигнал на разумных факторах
  2. Фильтрация ложных сигналов: Отклонение сигналов с аномальной атрибуцией
  3. Адаптация к режимам: Понимание смены факторов при изменении рыночного режима
  4. Комплаенс: Документирование причин торговых решений

Математическое обоснование

SHAP (SHapley Additive exPlanations)

SHAP основан на значениях Шепли из теории кооперативных игр. Для модели f и входа x с признаками {1, 2, …, M}, значение Шепли для признака i определяется как:

phi_i(f, x) = sum_{S subseteq M \ {i}} |S|!(|M|-|S|-1)! / |M|! * [f(x_S union {i}) - f(x_S)]

Где:

  • S - подмножество признаков без i
  • x_S - вход с признаками только из S (остальные замаскированы)
  • M - множество всех признаков

Свойства SHAP:

  1. Локальная точность: sum_{i=0}^{M} phi_i = f(x) - E[f(X)]
  2. Отсутствие влияния: Если признак не влияет на выход, phi_i = 0
  3. Согласованность: Если вклад признака увеличивается, phi_i не уменьшается
  4. Симметрия: Одинаковые признаки получают одинаковые значения

Вычислительная сложность:

Точное вычисление требует O(2^M) оценок модели, что непрактично для большого числа признаков. Существуют аппроксимации:

МетодСложностьТочностьТип модели
Exact SHAPO(2^M)ТочныйЛюбая
Kernel SHAPO(M^2 * K)АппроксимацияЛюбая
TreeSHAPO(TLD^2)ТочныйДеревья
DeepSHAPO(forward pass)АппроксимацияНейросети

LIME (Local Interpretable Model-agnostic Explanations)

LIME аппроксимирует сложную модель f локально простой интерпретируемой моделью g в окрестности точки x:

explanation(x) = argmin_{g in G} L(f, g, pi_x) + Omega(g)

Где:

  • G - класс интерпретируемых моделей (линейные, деревья решений)
  • L - функция потерь, измеряющая близость f и g
  • pi_x - мера близости к x (обычно экспоненциальное ядро)
  • Omega(g) - регуляризация сложности g

Алгоритм LIME:

1. Сгенерировать возмущенные примеры z' вокруг x
2. Получить предсказания f(z') для каждого примера
3. Вычислить веса pi_x(z') = exp(-D(x, z')^2 / sigma^2)
4. Обучить взвешенную линейную модель g на (z', f(z'), pi_x(z'))
5. Коэффициенты g - это объяснение для x

Функция потерь для LIME:

L(f, g, pi_x) = sum_z pi_x(z) * (f(z) - g(z))^2

Интегрированные градиенты (Integrated Gradients)

Интегрированные градиенты определяют атрибуцию через интеграл градиентов вдоль пути от базовой точки x’ к входу x:

IG_i(x) = (x_i - x'_i) * integral_0^1 (partial f(x' + alpha*(x - x'))) / (partial x_i) d_alpha

Где:

  • x’ - базовая точка (обычно нули или среднее по данным)
  • alpha - параметр интерполяции от 0 до 1
  • f - дифференцируемая модель

Аппроксимация суммой Римана:

IG_i(x) ~= (x_i - x'_i) * (1/m) * sum_{k=1}^{m} (partial f(x' + k/m * (x - x'))) / (partial x_i)

Свойства интегрированных градиентов:

  1. Чувствительность: Если x и x’ отличаются в признаке i и дают разные выходы, то IG_i != 0
  2. Независимость от реализации: Атрибуция не зависит от внутренней структуры модели
  3. Полнота: sum_i IG_i(x) = f(x) - f(x’)
  4. Линейность: Для линейных моделей IG совпадает с коэффициентами

Перестановочная важность (Permutation Importance)

Перестановочная важность измеряет снижение качества модели при случайной перестановке значений признака:

PI_i = score(y, f(X)) - score(y, f(X_{~i}))

Где:

  • X_{~i} - матрица признаков с перемешанным i-м столбцом
  • score - метрика качества (accuracy, R^2, Sharpe ratio)

Алгоритм:

1. Вычислить базовый score_0 = score(y, f(X))
2. Для каждого признака i:
a. Создать X_{~i} путем случайной перестановки столбца i
b. Вычислить score_i = score(y, f(X_{~i}))
c. PI_i = score_0 - score_i
3. Повторить K раз и усреднить для стабильности

Важные соображения:

АспектОписаниеРешение
Коррелированные признакиПерестановка нарушает корреляцииИспользовать условную перестановку
НестабильностьРезультаты зависят от seedУсреднение по нескольким перестановкам
Вне выборкиПерестановка создает нереалистичные данныеПерестановка в тестовой выборке

Методы атрибуции признаков

Детальное сравнение методов

+------------------------------------------------------------------+
| МЕТОДЫ АТРИБУЦИИ ПРИЗНАКОВ |
+------------------------------------------------------------------+
| |
| SHAP LIME |
| +----------------------+ +----------------------+ |
| | Теория игр | | Локальные surrogate | |
| | Точные свойства | | Быстрое вычисление | |
| | Глобальная + локальн | | Интуитивные объясн. | |
| +----------------------+ +----------------------+ |
| |
| Integrated Gradients Permutation Importance |
| +----------------------+ +----------------------+ |
| | Для нейросетей | | Глобальная важность | |
| | Осмысленный путь | | Модельно-агностич. | |
| | Точная атрибуция | | Простая реализация | |
| +----------------------+ +----------------------+ |
| |
+------------------------------------------------------------------+

SHAP для торговых моделей

KernelSHAP

Универсальный метод для любых моделей:

# Математическая формулировка KernelSHAP
# Аппроксимация значений Шепли через взвешенную регрессию
# Веса для коалиции z размера |z|:
w(z) = (M - 1) / (C(M, |z|) * |z| * (M - |z|))
# Целевая функция:
min_phi sum_z w(z) * [f(h_x(z)) - (phi_0 + sum_i z_i * phi_i)]^2

TreeSHAP

Оптимизированный алгоритм для древесных моделей (XGBoost, LightGBM, Random Forest):

Алгоритм TreeSHAP:
1. Для каждого дерева T в ансамбле:
a. Рекурсивный обход от корня
b. На каждом узле вычислить расщепление SHAP-значений
c. Агрегировать по всем путям к листьям
2. Сложность: O(TLD^2)
- T: количество деревьев
- L: количество листьев
- D: глубина дерева

LIME для временных рядов

Адаптация LIME для финансовых временных рядов требует специального подхода к возмущениям:

Стратегии возмущения для временных рядов:
1. Замена сегментов: Заменить участок ряда средним значением
2. Шум: Добавить гауссовский шум к сегментам
3. Временное маскирование: Обнулить определенные временные окна
4. Частотная фильтрация: Удалить определенные частотные компоненты
Пример сегментации:
[t-100, ..., t-80] [t-79, ..., t-40] [t-39, ..., t-20] [t-19, ..., t]
Segment 1 Segment 2 Segment 3 Segment 4

Интегрированные градиенты для нейронных сетей

Выбор базовой точки для финансовых данных:
1. Нулевой вектор: x' = 0
- Интерпретация: "отсутствие информации"
- Проблема: нереалистичные цены
2. Среднее по обучающей выборке: x' = E[X]
- Интерпретация: "типичное состояние рынка"
- Рекомендуется для большинства случаев
3. Безрисковый сценарий: x' = [0, 0, neutral_indicators]
- Интерпретация: "нейтральный рынок"
- Подходит для анализа экстремальных сигналов
4. Предыдущее состояние: x' = x_{t-1}
- Интерпретация: "изменение относительно прошлого"
- Для анализа динамики признаков

Сводная таблица методов

МетодТипСкоростьТочностьНейросетиДеревьяВременные ряды
KernelSHAPЛокальныйМедленноВысокаяДаДаДа
TreeSHAPЛокальныйБыстроТочныйНетДаДа
DeepSHAPЛокальныйБыстроАппрокс.ДаНетДа
LIMEЛокальныйБыстроСредняяДаДаТребует адаптации
IGЛокальныйСреднеВысокаяДаНетДа
PermutationГлобальныйСреднеВысокаяДаДаДа

Применение в трейдинге

Генерация сигналов на основе атрибуции

Атрибуция признаков может использоваться для создания торговых сигналов:

+--------------------------------------------------+
| Конвейер генерации сигналов |
+--------------------------------------------------+
| |
| Рыночные данные |
| | |
| v |
| [Извлечение признаков] |
| | |
| v |
| [ML Модель] --> Прогноз: +1.2% (BUY) |
| | |
| v |
| [SHAP Анализ] |
| | |
| v |
| Атрибуция: |
| RSI_oversold: +0.45 |
| Volume_spike: +0.32 |
| MACD_cross: +0.28 |
| Price_momentum: -0.15 |
| | |
| v |
| [Валидация сигнала] |
| - Проверка согласованности факторов |
| - Фильтрация аномальных атрибуций |
| - Оценка уверенности |
| | |
| v |
| Финальный сигнал: STRONG_BUY (confidence: 0.85)|
+--------------------------------------------------+

Стратегия на основе согласованности атрибуции

# Псевдокод стратегии
def attribution_based_signal(features, model, explainer):
# Получить предсказание
prediction = model.predict(features)
# Вычислить атрибуцию
shap_values = explainer.shap_values(features)
# Анализ согласованности
positive_contributors = sum(shap_values > 0)
negative_contributors = sum(shap_values < 0)
# Фильтрация по согласованности
if prediction > 0 and positive_contributors < 3:
return "WEAK_BUY" # Сигнал слабо обоснован
# Проверка доминирующего фактора
max_attribution = max(abs(shap_values))
if max_attribution > 0.5 * sum(abs(shap_values)):
return "RISKY" # Сигнал зависит от одного фактора
return "STRONG_BUY"

Оценка риска через атрибуцию

Атрибуция позволяет оценить источники риска в торговых позициях:

Анализ риска позиции:
+--------------------+----------+------------------+
| Признак | SHAP | Риск-вклад |
+--------------------+----------+------------------+
| Implied Volatility | +0.35 | HIGH (волатильн) |
| RSI | +0.28 | MEDIUM |
| Order Book Imbal. | +0.22 | HIGH (ликвидн.) |
| Price Trend | +0.15 | LOW |
+--------------------+----------+------------------+
Общий риск-профиль: ELEVATED
Рекомендация: Уменьшить размер позиции на 30%

Матрица риска на основе атрибуции

Волатильность атрибуции
Низкая Высокая
+------------+------------+
Высокая | УВЕРЕННЫЙ | НЕСТАБИЛЬ |
Абсолютная | СИГНАЛ | НЫЙ СИГНАЛ |
атрибуция | (Держать) | (Осторожно)|
+------------+------------+
Низкая | СЛАБЫЙ | ШУМОВОЙ |
| СИГНАЛ | СИГНАЛ |
| (Пропустит)| (Избегать) |
+------------+------------+

Отладка моделей с помощью атрибуции

Атрибуция помогает выявить проблемы в торговых моделях:

Типичные проблемы и их обнаружение:
1. Утечка данных (Data Leakage)
Симптом: Неожиданно высокая атрибуция для "безобидного" признака
Пример: timestamp имеет SHAP = 0.8
2. Переобучение на шум
Симптом: Атрибуция сильно меняется между похожими примерами
Пример: std(SHAP_i) >> mean(|SHAP_i|)
3. Спуриозные корреляции
Симптом: Высокая атрибуция признаков без экономического смысла
Пример: day_of_week имеет постоянно высокий SHAP
4. Недостаточная генерализация
Симптом: Разная структура атрибуции на train и test
Пример: correlation(SHAP_train, SHAP_test) < 0.5

Адаптация к рыночным режимам

Мониторинг изменений атрибуции для обнаружения смены режима:

Временной анализ атрибуции:
Период 1 (Bull Market):
RSI: 15%, MACD: 25%, Volume: 20%, Trend: 40%
Период 2 (Transition):
RSI: 25%, MACD: 30%, Volume: 15%, Trend: 30%
Период 3 (Bear Market):
RSI: 35%, MACD: 20%, Volume: 30%, Trend: 15%
Вывод: В медвежьем рынке модель больше полагается на
осцилляторы (RSI) и объем, меньше на тренд.

Реализация на Python

Структура проекта

115_feature_attribution_trading/
├── python/
│ ├── __init__.py
│ ├── attribution/
│ │ ├── __init__.py
│ │ ├── shap_explainer.py # SHAP для торговых моделей
│ │ ├── lime_explainer.py # LIME адаптация
│ │ ├── integrated_grad.py # Интегрированные градиенты
│ │ └── permutation.py # Перестановочная важность
│ ├── models/
│ │ ├── __init__.py
│ │ ├── trading_model.py # Базовая торговая модель
│ │ └── explainable_nn.py # Нейросеть с XAI
│ ├── signals/
│ │ ├── __init__.py
│ │ ├── generator.py # Генератор сигналов
│ │ └── validator.py # Валидация через атрибуцию
│ ├── data/
│ │ ├── __init__.py
│ │ ├── loader.py # Загрузчик данных
│ │ └── features.py # Извлечение признаков
│ ├── backtest/
│ │ ├── __init__.py
│ │ └── engine.py # Движок бэктестинга
│ └── requirements.txt
├── notebooks/
│ ├── 01_shap_analysis.ipynb
│ ├── 02_lime_trading.ipynb
│ └── 03_attribution_strategy.ipynb
└── examples/
├── basic_shap.py
├── lime_signals.py
└── attribution_backtest.py

Основной модуль SHAP

python/attribution/shap_explainer.py
import numpy as np
import pandas as pd
import shap
from typing import Union, List, Optional, Dict, Any
class TradingSHAPExplainer:
"""SHAP объяснитель для торговых моделей."""
def __init__(
self,
model: Any,
background_data: np.ndarray,
feature_names: List[str],
model_type: str = "auto"
):
"""
Инициализация SHAP объяснителя.
Args:
model: Обученная торговая модель
background_data: Фоновые данные для SHAP
feature_names: Названия признаков
model_type: Тип модели (tree, kernel, deep, auto)
"""
self.model = model
self.background_data = background_data
self.feature_names = feature_names
self.model_type = model_type
self.explainer = self._create_explainer()
def _create_explainer(self) -> shap.Explainer:
"""Создание подходящего SHAP объяснителя."""
if self.model_type == "tree":
return shap.TreeExplainer(self.model)
elif self.model_type == "deep":
return shap.DeepExplainer(self.model, self.background_data)
elif self.model_type == "kernel":
return shap.KernelExplainer(
self.model.predict,
shap.sample(self.background_data, 100)
)
else:
# Автоопределение
if hasattr(self.model, 'get_booster'):
return shap.TreeExplainer(self.model)
else:
return shap.KernelExplainer(
self.model.predict,
shap.sample(self.background_data, 100)
)
def explain(
self,
X: np.ndarray,
return_base_value: bool = False
) -> Union[np.ndarray, tuple]:
"""
Вычисление SHAP значений для входа X.
Args:
X: Входные данные (n_samples, n_features)
return_base_value: Возвращать базовое значение
Returns:
SHAP значения и опционально базовое значение
"""
shap_values = self.explainer.shap_values(X)
if return_base_value:
base_value = self.explainer.expected_value
return shap_values, base_value
return shap_values
def get_feature_importance(
self,
X: np.ndarray,
method: str = "mean_abs"
) -> pd.DataFrame:
"""
Глобальная важность признаков на основе SHAP.
Args:
X: Данные для анализа
method: Метод агрегации (mean_abs, max_abs, std)
"""
shap_values = self.explain(X)
if method == "mean_abs":
importance = np.mean(np.abs(shap_values), axis=0)
elif method == "max_abs":
importance = np.max(np.abs(shap_values), axis=0)
elif method == "std":
importance = np.std(shap_values, axis=0)
else:
raise ValueError(f"Unknown method: {method}")
return pd.DataFrame({
'feature': self.feature_names,
'importance': importance
}).sort_values('importance', ascending=False)
def analyze_signal(
self,
X: np.ndarray,
prediction: float
) -> Dict[str, Any]:
"""
Анализ торгового сигнала через атрибуцию.
Args:
X: Входные признаки (1, n_features)
prediction: Прогноз модели
Returns:
Словарь с анализом сигнала
"""
shap_values, base_value = self.explain(X, return_base_value=True)
shap_values = shap_values.flatten()
# Анализ согласованности
positive_mask = shap_values > 0
negative_mask = shap_values < 0
positive_sum = np.sum(shap_values[positive_mask])
negative_sum = np.sum(np.abs(shap_values[negative_mask]))
# Доминирующие факторы
sorted_idx = np.argsort(np.abs(shap_values))[::-1]
top_features = [
{
'feature': self.feature_names[i],
'shap_value': float(shap_values[i]),
'direction': 'positive' if shap_values[i] > 0 else 'negative'
}
for i in sorted_idx[:5]
]
# Оценка качества сигнала
consistency = positive_sum / (positive_sum + negative_sum)
concentration = np.max(np.abs(shap_values)) / np.sum(np.abs(shap_values))
return {
'prediction': prediction,
'base_value': float(base_value),
'top_features': top_features,
'consistency': consistency,
'concentration': concentration,
'signal_quality': self._assess_quality(consistency, concentration)
}
def _assess_quality(
self,
consistency: float,
concentration: float
) -> str:
"""Оценка качества сигнала."""
if consistency > 0.7 and concentration < 0.5:
return 'HIGH'
elif consistency > 0.5 and concentration < 0.6:
return 'MEDIUM'
else:
return 'LOW'

Модуль LIME для временных рядов

python/attribution/lime_explainer.py
import numpy as np
import pandas as pd
from lime import lime_tabular
from sklearn.linear_model import Ridge
from typing import List, Dict, Callable, Optional
class TimeSeriesLIME:
"""LIME адаптация для финансовых временных рядов."""
def __init__(
self,
model: Callable,
feature_names: List[str],
training_data: np.ndarray,
segment_size: int = 10
):
"""
Инициализация LIME для временных рядов.
Args:
model: Функция предсказания модели
feature_names: Названия признаков
training_data: Данные для обучения LIME
segment_size: Размер сегмента для возмущений
"""
self.model = model
self.feature_names = feature_names
self.training_data = training_data
self.segment_size = segment_size
self.explainer = lime_tabular.LimeTabularExplainer(
training_data,
feature_names=feature_names,
class_names=['SELL', 'HOLD', 'BUY'],
mode='classification'
)
def explain_instance(
self,
instance: np.ndarray,
num_features: int = 10,
num_samples: int = 1000
) -> Dict:
"""
Объяснение одного экземпляра.
Args:
instance: Входной вектор признаков
num_features: Количество признаков в объяснении
num_samples: Количество возмущенных примеров
Returns:
Словарь с объяснением
"""
explanation = self.explainer.explain_instance(
instance,
self.model,
num_features=num_features,
num_samples=num_samples
)
# Извлечение результатов
feature_weights = explanation.as_list()
return {
'feature_weights': feature_weights,
'intercept': explanation.intercept,
'local_prediction': explanation.local_pred,
'score': explanation.score
}
def explain_with_segments(
self,
time_series: np.ndarray,
num_segments: int = 10
) -> Dict:
"""
Объяснение с сегментацией временного ряда.
Args:
time_series: Временной ряд (seq_len, n_features)
num_segments: Количество временных сегментов
Returns:
Атрибуция по сегментам
"""
seq_len = time_series.shape[0]
segment_size = seq_len // num_segments
# Создание возмущений по сегментам
def perturb_predict(binary_repr):
"""Предсказание для бинарного представления сегментов."""
perturbed = time_series.copy()
for i, active in enumerate(binary_repr):
if not active:
start = i * segment_size
end = min((i + 1) * segment_size, seq_len)
# Замена сегмента средним
perturbed[start:end] = np.mean(time_series, axis=0)
return self.model(perturbed.reshape(1, -1))
# Генерация возмущенных примеров
n_samples = 1000
perturbations = np.random.binomial(1, 0.5, size=(n_samples, num_segments))
predictions = np.array([perturb_predict(p) for p in perturbations])
# Взвешенная регрессия
weights = self._compute_weights(perturbations)
model = Ridge(alpha=1.0)
model.fit(perturbations, predictions, sample_weight=weights)
segment_importance = model.coef_.flatten()
return {
'segment_importance': segment_importance,
'segment_boundaries': [
(i * segment_size, min((i + 1) * segment_size, seq_len))
for i in range(num_segments)
],
'intercept': model.intercept_
}
def _compute_weights(
self,
perturbations: np.ndarray,
sigma: float = 0.25
) -> np.ndarray:
"""Вычисление весов для LIME."""
# Расстояние до полного представления (все 1)
original = np.ones(perturbations.shape[1])
distances = np.sqrt(np.sum((perturbations - original) ** 2, axis=1))
weights = np.exp(-distances ** 2 / sigma ** 2)
return weights

Генератор сигналов с атрибуцией

python/signals/generator.py
import numpy as np
import pandas as pd
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
@dataclass
class TradingSignal:
"""Торговый сигнал с атрибуцией."""
timestamp: pd.Timestamp
direction: str # BUY, SELL, HOLD
confidence: float
predicted_return: float
attribution: Dict[str, float]
quality: str # HIGH, MEDIUM, LOW
risk_factors: List[str]
class AttributionSignalGenerator:
"""Генератор торговых сигналов на основе атрибуции."""
def __init__(
self,
model,
explainer,
feature_names: List[str],
buy_threshold: float = 0.005,
sell_threshold: float = -0.005,
min_confidence: float = 0.6
):
"""
Инициализация генератора сигналов.
Args:
model: Торговая модель
explainer: SHAP/LIME объяснитель
feature_names: Названия признаков
buy_threshold: Порог для сигнала BUY
sell_threshold: Порог для сигнала SELL
min_confidence: Минимальная уверенность для сигнала
"""
self.model = model
self.explainer = explainer
self.feature_names = feature_names
self.buy_threshold = buy_threshold
self.sell_threshold = sell_threshold
self.min_confidence = min_confidence
# Категории признаков для анализа риска
self.risk_features = {
'volatility': ['implied_vol', 'realized_vol', 'vix'],
'liquidity': ['bid_ask_spread', 'volume', 'order_imbalance'],
'momentum': ['rsi', 'macd', 'momentum']
}
def generate_signal(
self,
features: np.ndarray,
timestamp: pd.Timestamp
) -> TradingSignal:
"""
Генерация торгового сигнала.
Args:
features: Входные признаки (1, n_features)
timestamp: Временная метка
Returns:
Торговый сигнал с атрибуцией
"""
# Предсказание модели
prediction = self.model.predict(features)[0]
# Вычисление атрибуции
analysis = self.explainer.analyze_signal(features, prediction)
# Определение направления
if prediction > self.buy_threshold:
direction = 'BUY'
elif prediction < self.sell_threshold:
direction = 'SELL'
else:
direction = 'HOLD'
# Вычисление уверенности на основе атрибуции
confidence = self._compute_confidence(analysis)
# Идентификация риск-факторов
risk_factors = self._identify_risk_factors(analysis)
# Оценка качества сигнала
quality = self._assess_signal_quality(
direction, prediction, analysis, confidence
)
# Создание атрибуции
attribution = {
f['feature']: f['shap_value']
for f in analysis['top_features']
}
return TradingSignal(
timestamp=timestamp,
direction=direction,
confidence=confidence,
predicted_return=prediction,
attribution=attribution,
quality=quality,
risk_factors=risk_factors
)
def _compute_confidence(self, analysis: Dict) -> float:
"""Вычисление уверенности на основе атрибуции."""
consistency = analysis['consistency']
concentration = analysis['concentration']
# Высокая согласованность и низкая концентрация = высокая уверенность
confidence = consistency * (1 - concentration * 0.5)
return np.clip(confidence, 0, 1)
def _identify_risk_factors(self, analysis: Dict) -> List[str]:
"""Идентификация факторов риска."""
risk_factors = []
for feature_info in analysis['top_features']:
feature_name = feature_info['feature']
shap_value = feature_info['shap_value']
# Проверка на риск-категории
for category, features in self.risk_features.items():
if any(f in feature_name.lower() for f in features):
if abs(shap_value) > 0.3:
risk_factors.append(
f"{category.upper()}: {feature_name} ({shap_value:.3f})"
)
return risk_factors
def _assess_signal_quality(
self,
direction: str,
prediction: float,
analysis: Dict,
confidence: float
) -> str:
"""Оценка качества сигнала."""
if direction == 'HOLD':
return 'NEUTRAL'
# Критерии качества
high_quality = (
confidence > 0.7 and
analysis['concentration'] < 0.4 and
len(analysis['top_features']) >= 3
)
medium_quality = (
confidence > 0.5 and
analysis['concentration'] < 0.6
)
if high_quality:
return 'HIGH'
elif medium_quality:
return 'MEDIUM'
else:
return 'LOW'
def filter_signals(
self,
signals: List[TradingSignal],
min_quality: str = 'MEDIUM'
) -> List[TradingSignal]:
"""Фильтрация сигналов по качеству."""
quality_order = {'LOW': 0, 'NEUTRAL': 1, 'MEDIUM': 2, 'HIGH': 3}
min_quality_value = quality_order.get(min_quality, 2)
return [
s for s in signals
if quality_order.get(s.quality, 0) >= min_quality_value
]

Запуск примеров

Окно терминала
cd 115_feature_attribution_trading/python
pip install -r requirements.txt
python examples/basic_shap.py # Базовый SHAP анализ
python examples/lime_signals.py # LIME для сигналов
python examples/attribution_backtest.py # Бэктест стратегии

Реализация на Rust

Структура крейта

115_feature_attribution_trading/
├── Cargo.toml
├── src/
│ ├── lib.rs # Корень крейта и экспорты
│ ├── attribution/
│ │ ├── mod.rs
│ │ ├── shap.rs # SHAP вычисления
│ │ ├── lime.rs # LIME реализация
│ │ ├── permutation.rs # Перестановочная важность
│ │ └── integrated.rs # Интегрированные градиенты
│ ├── models/
│ │ ├── mod.rs
│ │ ├── tree.rs # Древесные модели
│ │ └── neural.rs # Нейронные сети
│ ├── signals/
│ │ ├── mod.rs
│ │ ├── generator.rs # Генерация сигналов
│ │ └── validator.rs # Валидация сигналов
│ ├── data/
│ │ ├── mod.rs
│ │ ├── bybit.rs # API Bybit
│ │ └── features.rs # Извлечение признаков
│ └── backtest/
│ ├── mod.rs
│ └── engine.rs # Движок бэктестинга
└── examples/
├── shap_analysis.rs
├── trading_signals.rs
└── attribution_backtest.rs

Cargo.toml

[package]
name = "feature_attribution_trading"
version = "0.1.0"
edition = "2021"
description = "Feature Attribution for Algorithmic Trading"
[dependencies]
ndarray = { version = "0.15", features = ["rayon"] }
ndarray-rand = "0.14"
rayon = "1.8"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1.0", features = ["full"] }
reqwest = { version = "0.11", features = ["json"] }
polars = { version = "0.35", features = ["lazy"] }
statrs = "0.16"
rand = "0.8"
rand_distr = "0.4"
thiserror = "1.0"
chrono = { version = "0.4", features = ["serde"] }
log = "0.4"
env_logger = "0.10"
[dev-dependencies]
criterion = "0.5"
approx = "0.5"
[[bench]]
name = "shap_benchmark"
harness = false

Модуль SHAP

src/attribution/shap.rs
use ndarray::{Array1, Array2, ArrayView1, ArrayView2, Axis};
use rayon::prelude::*;
use std::collections::HashMap;
/// SHAP объяснитель для торговых моделей
pub struct ShapExplainer<F>
where
F: Fn(&Array2<f64>) -> Array1<f64> + Send + Sync,
{
/// Функция предсказания модели
predict_fn: F,
/// Фоновые данные для вычисления SHAP
background: Array2<f64>,
/// Названия признаков
feature_names: Vec<String>,
/// Количество сэмплов для KernelSHAP
n_samples: usize,
}
impl<F> ShapExplainer<F>
where
F: Fn(&Array2<f64>) -> Array1<f64> + Send + Sync,
{
/// Создание нового SHAP объяснителя
pub fn new(
predict_fn: F,
background: Array2<f64>,
feature_names: Vec<String>,
) -> Self {
Self {
predict_fn,
background,
feature_names,
n_samples: 1000,
}
}
/// Установка количества сэмплов
pub fn with_n_samples(mut self, n_samples: usize) -> Self {
self.n_samples = n_samples;
self
}
/// Вычисление SHAP значений для одного примера
pub fn shap_values(&self, x: &ArrayView1<f64>) -> ShapResult {
let n_features = x.len();
let mut shap_values = Array1::zeros(n_features);
// Базовое значение (среднее предсказание на фоновых данных)
let base_predictions = (self.predict_fn)(&self.background);
let base_value = base_predictions.mean().unwrap_or(0.0);
// KernelSHAP аппроксимация
let coalitions = self.generate_coalitions(n_features);
let (weights, predictions) = self.evaluate_coalitions(x, &coalitions);
// Решение взвешенной регрессии
shap_values = self.solve_weighted_regression(
&coalitions,
&weights,
&predictions,
base_value,
);
// Формирование результата
let feature_attributions: Vec<FeatureAttribution> = self
.feature_names
.iter()
.enumerate()
.map(|(i, name)| FeatureAttribution {
feature: name.clone(),
value: shap_values[i],
abs_value: shap_values[i].abs(),
})
.collect();
ShapResult {
shap_values,
base_value,
feature_attributions,
}
}
/// Генерация коалиций для KernelSHAP
fn generate_coalitions(&self, n_features: usize) -> Array2<f64> {
use rand::Rng;
let mut rng = rand::thread_rng();
let mut coalitions = Array2::zeros((self.n_samples, n_features));
for i in 0..self.n_samples {
for j in 0..n_features {
coalitions[[i, j]] = if rng.gen::<f64>() > 0.5 { 1.0 } else { 0.0 };
}
}
coalitions
}
/// Оценка коалиций
fn evaluate_coalitions(
&self,
x: &ArrayView1<f64>,
coalitions: &Array2<f64>,
) -> (Array1<f64>, Array1<f64>) {
let n_samples = coalitions.nrows();
let n_features = coalitions.ncols();
let n_background = self.background.nrows();
// Параллельное вычисление предсказаний
let results: Vec<(f64, f64)> = (0..n_samples)
.into_par_iter()
.map(|i| {
let coalition = coalitions.row(i);
let coalition_size = coalition.sum() as usize;
// Вес коалиции по формуле Шепли
let weight = self.shapley_kernel_weight(n_features, coalition_size);
// Маргинализация по фоновым данным
let mut predictions_sum = 0.0;
for b in 0..n_background {
let mut input = self.background.row(b).to_owned();
for j in 0..n_features {
if coalition[j] > 0.5 {
input[j] = x[j];
}
}
let input_2d = input.insert_axis(Axis(0));
let pred = (self.predict_fn)(&input_2d);
predictions_sum += pred[0];
}
let avg_prediction = predictions_sum / n_background as f64;
(weight, avg_prediction)
})
.collect();
let weights = Array1::from_iter(results.iter().map(|(w, _)| *w));
let predictions = Array1::from_iter(results.iter().map(|(_, p)| *p));
(weights, predictions)
}
/// Вес ядра Шепли
fn shapley_kernel_weight(&self, n_features: usize, coalition_size: usize) -> f64 {
if coalition_size == 0 || coalition_size == n_features {
return 1e-6; // Избегаем деления на ноль
}
let m = n_features as f64;
let s = coalition_size as f64;
(m - 1.0) / (Self::binomial(n_features, coalition_size) * s * (m - s))
}
/// Биномиальный коэффициент
fn binomial(n: usize, k: usize) -> f64 {
if k > n {
return 0.0;
}
let mut result = 1.0;
for i in 0..k {
result *= (n - i) as f64 / (i + 1) as f64;
}
result
}
/// Решение взвешенной регрессии
fn solve_weighted_regression(
&self,
coalitions: &Array2<f64>,
weights: &Array1<f64>,
predictions: &Array1<f64>,
base_value: f64,
) -> Array1<f64> {
let n_features = coalitions.ncols();
// Центрирование предсказаний
let y: Array1<f64> = predictions.iter()
.map(|p| p - base_value)
.collect();
// Взвешенный метод наименьших квадратов
// phi = (X^T W X)^{-1} X^T W y
let w_diag = Array2::from_diag(weights);
let x_t_w = coalitions.t().dot(&w_diag);
let x_t_w_x = x_t_w.dot(coalitions);
let x_t_w_y = x_t_w.dot(&y);
// Простое решение через псевдообратную (для демонстрации)
// В production использовать LU или QR разложение
self.solve_linear_system(&x_t_w_x, &x_t_w_y)
}
/// Решение линейной системы
fn solve_linear_system(
&self,
a: &Array2<f64>,
b: &Array1<f64>,
) -> Array1<f64> {
// Упрощенное решение методом Гаусса-Жордана
// В production использовать ndarray-linalg
let n = a.nrows();
let mut aug = Array2::zeros((n, n + 1));
for i in 0..n {
for j in 0..n {
aug[[i, j]] = a[[i, j]];
}
aug[[i, n]] = b[i];
}
// Прямой ход
for i in 0..n {
let pivot = aug[[i, i]];
if pivot.abs() < 1e-10 {
continue;
}
for j in i..=n {
aug[[i, j]] /= pivot;
}
for k in 0..n {
if k != i {
let factor = aug[[k, i]];
for j in i..=n {
aug[[k, j]] -= factor * aug[[i, j]];
}
}
}
}
Array1::from_iter((0..n).map(|i| aug[[i, n]]))
}
}
/// Результат SHAP анализа
#[derive(Debug, Clone)]
pub struct ShapResult {
/// SHAP значения для каждого признака
pub shap_values: Array1<f64>,
/// Базовое значение (среднее предсказание)
pub base_value: f64,
/// Атрибуция по признакам
pub feature_attributions: Vec<FeatureAttribution>,
}
/// Атрибуция одного признака
#[derive(Debug, Clone)]
pub struct FeatureAttribution {
/// Название признака
pub feature: String,
/// SHAP значение
pub value: f64,
/// Абсолютное SHAP значение
pub abs_value: f64,
}
impl ShapResult {
/// Получение топ-N признаков по важности
pub fn top_features(&self, n: usize) -> Vec<&FeatureAttribution> {
let mut sorted: Vec<_> = self.feature_attributions.iter().collect();
sorted.sort_by(|a, b| b.abs_value.partial_cmp(&a.abs_value).unwrap());
sorted.into_iter().take(n).collect()
}
/// Оценка качества сигнала
pub fn signal_quality(&self) -> SignalQuality {
let total_abs = self.shap_values.iter().map(|v| v.abs()).sum::<f64>();
let max_abs = self.shap_values.iter().map(|v| v.abs()).fold(0.0, f64::max);
let concentration = if total_abs > 0.0 {
max_abs / total_abs
} else {
1.0
};
let positive_sum: f64 = self.shap_values.iter().filter(|&&v| v > 0.0).sum();
let consistency = if total_abs > 0.0 {
positive_sum.abs() / total_abs
} else {
0.5
};
if consistency > 0.7 && concentration < 0.4 {
SignalQuality::High
} else if consistency > 0.5 && concentration < 0.6 {
SignalQuality::Medium
} else {
SignalQuality::Low
}
}
}
/// Качество торгового сигнала
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SignalQuality {
High,
Medium,
Low,
}

Генератор сигналов на Rust

src/signals/generator.rs
use crate::attribution::shap::{ShapExplainer, ShapResult, SignalQuality};
use chrono::{DateTime, Utc};
use ndarray::{Array1, Array2};
use serde::{Deserialize, Serialize};
/// Торговый сигнал с атрибуцией
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TradingSignal {
pub timestamp: DateTime<Utc>,
pub direction: SignalDirection,
pub confidence: f64,
pub predicted_return: f64,
pub attribution: Vec<(String, f64)>,
pub quality: SignalQuality,
pub risk_factors: Vec<String>,
}
/// Направление сигнала
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum SignalDirection {
StrongBuy,
Buy,
Hold,
Sell,
StrongSell,
}
/// Генератор торговых сигналов
pub struct SignalGenerator<F>
where
F: Fn(&Array2<f64>) -> Array1<f64> + Send + Sync,
{
explainer: ShapExplainer<F>,
predict_fn: F,
buy_threshold: f64,
sell_threshold: f64,
strong_threshold: f64,
min_confidence: f64,
}
impl<F> SignalGenerator<F>
where
F: Fn(&Array2<f64>) -> Array1<f64> + Send + Sync + Clone,
{
/// Создание генератора сигналов
pub fn new(
predict_fn: F,
background: Array2<f64>,
feature_names: Vec<String>,
) -> Self {
let explainer = ShapExplainer::new(
predict_fn.clone(),
background,
feature_names,
);
Self {
explainer,
predict_fn,
buy_threshold: 0.005,
sell_threshold: -0.005,
strong_threshold: 0.015,
min_confidence: 0.6,
}
}
/// Генерация сигнала для одного примера
pub fn generate(&self, features: &Array1<f64>, timestamp: DateTime<Utc>) -> TradingSignal {
// Предсказание
let features_2d = features.clone().insert_axis(ndarray::Axis(0));
let predictions = (self.predict_fn)(&features_2d);
let prediction = predictions[0];
// SHAP анализ
let shap_result = self.explainer.shap_values(&features.view());
// Определение направления
let direction = self.determine_direction(prediction);
// Вычисление уверенности
let confidence = self.compute_confidence(&shap_result);
// Идентификация рисков
let risk_factors = self.identify_risks(&shap_result);
// Формирование атрибуции
let attribution: Vec<(String, f64)> = shap_result
.top_features(5)
.into_iter()
.map(|a| (a.feature.clone(), a.value))
.collect();
TradingSignal {
timestamp,
direction,
confidence,
predicted_return: prediction,
attribution,
quality: shap_result.signal_quality(),
risk_factors,
}
}
/// Определение направления сигнала
fn determine_direction(&self, prediction: f64) -> SignalDirection {
if prediction > self.strong_threshold {
SignalDirection::StrongBuy
} else if prediction > self.buy_threshold {
SignalDirection::Buy
} else if prediction < -self.strong_threshold {
SignalDirection::StrongSell
} else if prediction < self.sell_threshold {
SignalDirection::Sell
} else {
SignalDirection::Hold
}
}
/// Вычисление уверенности
fn compute_confidence(&self, shap_result: &ShapResult) -> f64 {
let total_abs: f64 = shap_result.shap_values.iter().map(|v| v.abs()).sum();
let max_abs = shap_result.shap_values.iter().map(|v| v.abs()).fold(0.0, f64::max);
let concentration = if total_abs > 0.0 { max_abs / total_abs } else { 1.0 };
let positive_sum: f64 = shap_result.shap_values.iter().filter(|&&v| v > 0.0).sum();
let consistency = if total_abs > 0.0 {
(positive_sum.abs() / total_abs).max(1.0 - positive_sum.abs() / total_abs)
} else {
0.5
};
(consistency * (1.0 - concentration * 0.5)).clamp(0.0, 1.0)
}
/// Идентификация факторов риска
fn identify_risks(&self, shap_result: &ShapResult) -> Vec<String> {
let mut risks = Vec::new();
// Высокая концентрация на одном признаке
let total_abs: f64 = shap_result.shap_values.iter().map(|v| v.abs()).sum();
let max_abs = shap_result.shap_values.iter().map(|v| v.abs()).fold(0.0, f64::max);
if total_abs > 0.0 && max_abs / total_abs > 0.5 {
risks.push("HIGH_CONCENTRATION: Signal depends heavily on single feature".to_string());
}
// Проверка волатильности в топ-признаках
for attr in shap_result.top_features(3) {
if attr.feature.to_lowercase().contains("vol") && attr.abs_value > 0.2 {
risks.push(format!("VOLATILITY_RISK: {} = {:.3}", attr.feature, attr.value));
}
}
risks
}
}

Сборка и запуск

Окно терминала
cd 115_feature_attribution_trading
cargo build --release
cargo run --example shap_analysis
cargo run --example trading_signals
cargo run --example attribution_backtest
cargo test
cargo bench

Практические примеры

Пример 1: SHAP анализ для BTC/USDT

Использование SHAP для анализа торговых сигналов на криптовалютных данных Bybit:

import numpy as np
import pandas as pd
from data.loader import BybitDataLoader
from models.trading_model import GradientBoostingTrader
from attribution.shap_explainer import TradingSHAPExplainer
# Загрузка данных
loader = BybitDataLoader()
df = loader.fetch_klines("BTCUSDT", interval="60", limit=5000)
# Подготовка признаков
features = [
'returns_1h', 'returns_4h', 'returns_24h',
'volume_ratio', 'volatility_20',
'rsi_14', 'macd', 'macd_signal',
'bb_upper_dist', 'bb_lower_dist',
'order_imbalance', 'funding_rate'
]
X = df[features].values
y = np.sign(df['future_return_1h'].values)
# Обучение модели
model = GradientBoostingTrader(n_estimators=100, max_depth=5)
model.fit(X[:-100], y[:-100])
# Создание SHAP объяснителя
explainer = TradingSHAPExplainer(
model=model,
background_data=X[:500],
feature_names=features,
model_type='tree'
)
# Анализ последнего сигнала
latest_features = X[-1:].reshape(1, -1)
analysis = explainer.analyze_signal(latest_features, model.predict(latest_features)[0])
print("=== SHAP Анализ торгового сигнала ===")
print(f"Прогноз: {analysis['prediction']:.4f}")
print(f"Базовое значение: {analysis['base_value']:.4f}")
print(f"Качество сигнала: {analysis['signal_quality']}")
print("\nТоп-5 признаков:")
for f in analysis['top_features']:
direction = "+" if f['shap_value'] > 0 else ""
print(f" {f['feature']}: {direction}{f['shap_value']:.4f}")

Пример вывода:

=== SHAP Анализ торгового сигнала ===
Прогноз: 0.0127
Базовое значение: 0.0003
Качество сигнала: HIGH
Топ-5 признаков:
rsi_14: +0.0045 (перепроданность)
macd: +0.0032 (бычье пересечение)
volume_ratio: +0.0028 (высокий объем)
order_imbalance: +0.0015 (перевес покупок)
volatility_20: -0.0008 (низкая волатильность)

Пример 2: LIME для интерпретации LSTM

Использование LIME для объяснения предсказаний нейронной сети:

import numpy as np
from models.explainable_nn import ExplainableLSTM
from attribution.lime_explainer import TimeSeriesLIME
# Модель LSTM для прогнозирования
lstm_model = ExplainableLSTM(
input_dim=12,
hidden_dim=64,
output_dim=1,
seq_len=60
)
lstm_model.load_weights('models/lstm_btc.pt')
# LIME объяснитель
lime_explainer = TimeSeriesLIME(
model=lambda x: lstm_model.predict(x),
feature_names=features,
training_data=X_train,
segment_size=10
)
# Объяснение с сегментацией
explanation = lime_explainer.explain_with_segments(
time_series=X_test[-60:],
num_segments=6
)
print("=== LIME Анализ временных сегментов ===")
for i, (importance, (start, end)) in enumerate(zip(
explanation['segment_importance'],
explanation['segment_boundaries']
)):
time_label = f"T-{60-start} to T-{60-end}"
print(f"Сегмент {i+1} ({time_label}): {importance:.4f}")

Пример вывода:

=== LIME Анализ временных сегментов ===
Сегмент 1 (T-60 to T-50): 0.0823 (историческая память)
Сегмент 2 (T-50 to T-40): 0.0456
Сегмент 3 (T-40 to T-30): 0.0612
Сегмент 4 (T-30 to T-20): 0.1234 (формирование тренда)
Сегмент 5 (T-20 to T-10): 0.2145 (недавняя динамика)
Сегмент 6 (T-10 to T-0): 0.4730 (текущее состояние) ***

Пример 3: Мониторинг смены режима

Отслеживание изменений атрибуции для обнаружения смены рыночного режима:

import pandas as pd
import numpy as np
from collections import deque
class RegimeMonitor:
"""Мониторинг рыночного режима через атрибуцию."""
def __init__(self, explainer, window_size=100):
self.explainer = explainer
self.window_size = window_size
self.attribution_history = deque(maxlen=window_size)
def update(self, features):
"""Обновление монитора новыми данными."""
shap_values = self.explainer.explain(features)
self.attribution_history.append(shap_values)
def detect_regime_change(self, threshold=0.3):
"""Обнаружение смены режима."""
if len(self.attribution_history) < self.window_size:
return None
recent = np.array(list(self.attribution_history)[-20:])
historical = np.array(list(self.attribution_history)[:-20])
# Сравнение распределений атрибуции
recent_mean = np.mean(np.abs(recent), axis=0)
historical_mean = np.mean(np.abs(historical), axis=0)
# Относительное изменение
change = np.abs(recent_mean - historical_mean) / (historical_mean + 1e-6)
if np.max(change) > threshold:
changed_features = np.where(change > threshold)[0]
return {
'regime_change': True,
'changed_features': changed_features.tolist(),
'change_magnitude': change[changed_features].tolist()
}
return {'regime_change': False}
# Использование
monitor = RegimeMonitor(explainer, window_size=200)
for i in range(len(X_test)):
monitor.update(X_test[i:i+1])
result = monitor.detect_regime_change()
if result and result['regime_change']:
print(f"[{i}] Обнаружена смена режима!")
for feat_idx, magnitude in zip(
result['changed_features'],
result['change_magnitude']
):
print(f" {features[feat_idx]}: изменение на {magnitude:.1%}")

Фреймворк бэктестинга

Архитектура стратегии

+----------------------------------------------------------------+
| Стратегия на основе атрибуции |
+----------------------------------------------------------------+
| |
| [Данные] --> [Признаки] --> [Модель] --> [Прогноз] |
| | |
| v |
| [Атрибуция] |
| | |
| v |
| [Фильтр качества] |
| / | \ |
| / | \ |
| [HIGH] [MEDIUM] [LOW] |
| | | | |
| Полная Уменьшенная Пропустить |
| позиция позиция сигнал |
| | | |
| v v |
| [Исполнение сделки] |
| |
+----------------------------------------------------------------+

Реализация бэктестера

python/backtest/engine.py
import numpy as np
import pandas as pd
from typing import Dict, List, Optional
from dataclasses import dataclass, field
@dataclass
class BacktestResult:
"""Результаты бэктестинга."""
returns: pd.Series
positions: pd.Series
trades: List[Dict]
metrics: Dict[str, float]
attribution_stats: Dict[str, float]
class AttributionBacktester:
"""Бэктестер для стратегий на основе атрибуции."""
def __init__(
self,
model,
explainer,
feature_names: List[str],
initial_capital: float = 100000,
position_sizing: str = 'fixed', # fixed, kelly, attribution
max_position: float = 0.1,
transaction_cost: float = 0.001
):
self.model = model
self.explainer = explainer
self.feature_names = feature_names
self.initial_capital = initial_capital
self.position_sizing = position_sizing
self.max_position = max_position
self.transaction_cost = transaction_cost
def run(
self,
data: pd.DataFrame,
feature_cols: List[str],
price_col: str = 'close',
min_signal_quality: str = 'MEDIUM'
) -> BacktestResult:
"""
Запуск бэктестинга.
Args:
data: DataFrame с данными
feature_cols: Столбцы признаков
price_col: Столбец цены
min_signal_quality: Минимальное качество сигнала
"""
capital = self.initial_capital
position = 0.0
positions = []
returns = []
trades = []
attribution_records = []
for i in range(len(data) - 1):
features = data[feature_cols].iloc[i].values.reshape(1, -1)
current_price = data[price_col].iloc[i]
next_price = data[price_col].iloc[i + 1]
# Генерация сигнала
prediction = self.model.predict(features)[0]
analysis = self.explainer.analyze_signal(features, prediction)
# Фильтрация по качеству
if not self._check_quality(analysis, min_signal_quality):
positions.append(position)
ret = position * (next_price / current_price - 1)
returns.append(ret)
continue
# Размер позиции
target_position = self._compute_position_size(
prediction, analysis, capital
)
# Обновление позиции
position_change = target_position - position
if abs(position_change) > 0.01:
# Транзакционные издержки
cost = abs(position_change) * capital * self.transaction_cost
capital -= cost
trades.append({
'timestamp': data.index[i],
'price': current_price,
'position_change': position_change,
'prediction': prediction,
'quality': analysis['signal_quality'],
'top_feature': analysis['top_features'][0]['feature'],
'cost': cost
})
position = target_position
# Учет атрибуции
attribution_records.append({
'timestamp': data.index[i],
**{f['feature']: f['shap_value'] for f in analysis['top_features']}
})
# Доходность
ret = position * (next_price / current_price - 1)
returns.append(ret)
positions.append(position)
capital *= (1 + ret)
# Расчет метрик
returns_series = pd.Series(returns, index=data.index[:-1])
positions_series = pd.Series(positions, index=data.index[:-1])
metrics = self._compute_metrics(returns_series, trades)
attribution_stats = self._compute_attribution_stats(attribution_records)
return BacktestResult(
returns=returns_series,
positions=positions_series,
trades=trades,
metrics=metrics,
attribution_stats=attribution_stats
)
def _check_quality(self, analysis: Dict, min_quality: str) -> bool:
"""Проверка качества сигнала."""
quality_order = {'LOW': 0, 'MEDIUM': 1, 'HIGH': 2}
return quality_order.get(analysis['signal_quality'], 0) >= quality_order.get(min_quality, 1)
def _compute_position_size(
self,
prediction: float,
analysis: Dict,
capital: float
) -> float:
"""Вычисление размера позиции."""
if self.position_sizing == 'fixed':
return np.sign(prediction) * self.max_position
elif self.position_sizing == 'attribution':
# Размер пропорционален качеству атрибуции
quality_multiplier = {
'HIGH': 1.0,
'MEDIUM': 0.6,
'LOW': 0.3
}
multiplier = quality_multiplier.get(analysis['signal_quality'], 0.3)
base_position = np.sign(prediction) * self.max_position
return base_position * multiplier * analysis['consistency']
elif self.position_sizing == 'kelly':
# Критерий Келли с учетом уверенности
win_prob = 0.5 + analysis['consistency'] * 0.2
win_ratio = abs(prediction) / 0.01 # Нормализация
kelly = win_prob - (1 - win_prob) / win_ratio
return np.clip(kelly, -self.max_position, self.max_position)
return 0.0
def _compute_metrics(
self,
returns: pd.Series,
trades: List[Dict]
) -> Dict[str, float]:
"""Вычисление метрик производительности."""
# Годовая доходность (предполагая часовые данные)
total_return = (1 + returns).prod() - 1
n_hours = len(returns)
annual_return = (1 + total_return) ** (8760 / n_hours) - 1
# Волатильность
annual_vol = returns.std() * np.sqrt(8760)
# Коэффициент Шарпа
sharpe = annual_return / annual_vol if annual_vol > 0 else 0
# Коэффициент Сортино
downside_returns = returns[returns < 0]
downside_vol = downside_returns.std() * np.sqrt(8760)
sortino = annual_return / downside_vol if downside_vol > 0 else 0
# Максимальная просадка
cumulative = (1 + returns).cumprod()
running_max = cumulative.cummax()
drawdown = (cumulative - running_max) / running_max
max_drawdown = drawdown.min()
# Статистика сделок
n_trades = len(trades)
winning_trades = sum(1 for t in trades if t.get('pnl', 0) > 0)
win_rate = winning_trades / n_trades if n_trades > 0 else 0
# Транзакционные издержки
total_costs = sum(t['cost'] for t in trades)
return {
'total_return': total_return,
'annual_return': annual_return,
'annual_volatility': annual_vol,
'sharpe_ratio': sharpe,
'sortino_ratio': sortino,
'max_drawdown': max_drawdown,
'n_trades': n_trades,
'win_rate': win_rate,
'total_costs': total_costs
}
def _compute_attribution_stats(
self,
records: List[Dict]
) -> Dict[str, float]:
"""Статистика атрибуции."""
if not records:
return {}
df = pd.DataFrame(records)
feature_cols = [c for c in df.columns if c != 'timestamp']
stats = {}
for col in feature_cols:
if col in df.columns:
stats[f'{col}_mean_abs'] = df[col].abs().mean()
stats[f'{col}_std'] = df[col].std()
return stats

Примерные результаты бэктестинга

================================================================
РЕЗУЛЬТАТЫ БЭКТЕСТИНГА СТРАТЕГИИ НА ОСНОВЕ АТРИБУЦИИ
================================================================
Период: 2023-01-01 — 2024-12-31
Инструмент: BTC/USDT (Bybit, часовые данные)
Модель: XGBoost + SHAP
Минимальное качество сигнала: MEDIUM
-----------------------------------------------------------------
МЕТРИКИ ПРОИЗВОДИТЕЛЬНОСТИ
-----------------------------------------------------------------
| Метрика | Значение | Бенчмарк (Buy&Hold) |
|--------------------------|------------|---------------------|
| Годовая доходность | 38.7% | 22.4% |
| Годовая волатильность | 24.3% | 48.2% |
| Коэффициент Шарпа | 1.59 | 0.46 |
| Коэффициент Сортино | 2.34 | 0.61 |
| Максимальная просадка | -15.2% | -42.8% |
| Коэффициент Калмара | 2.54 | 0.52 |
| Количество сделок | 847 | N/A |
| Доля выигрышных | 56.3% | N/A |
| Средняя прибыль/сделка | 0.12% | N/A |
| Фактор прибыли | 1.72 | N/A |
-----------------------------------------------------------------
СТАТИСТИКА АТРИБУЦИИ
-----------------------------------------------------------------
| Признак | Средний |SHAP|| Частота в топ-3 |
|----------------------|---------|------------------|
| rsi_14 | 0.0034 | 42.3% |
| macd | 0.0028 | 38.7% |
| volume_ratio | 0.0025 | 35.2% |
| order_imbalance | 0.0021 | 28.4% |
| volatility_20 | 0.0018 | 24.1% |
-----------------------------------------------------------------
КАЧЕСТВО СИГНАЛОВ
-----------------------------------------------------------------
| Качество | Количество | Средний P&L | Доля выигрышных |
|----------|------------|-------------|-----------------|
| HIGH | 312 | +0.18% | 61.2% |
| MEDIUM | 535 | +0.08% | 53.6% |
| (LOW) | (428) | (пропущены) | N/A |
-----------------------------------------------------------------
*Примечание: Прошлые результаты не гарантируют будущую доходность.

Оценка производительности

Сравнение методов атрибуции

МетодВремя на 1000 примеровТочностьRAM (GB)GPU
KernelSHAP180 секВысокая2.1Нет
TreeSHAP0.8 секТочная0.5Нет
DeepSHAP12 секСредняя1.8Да
LIME45 секСредняя1.2Нет
Integrated Gradients25 секВысокая2.0Да
Permutation60 секВысокая0.8Нет

Влияние атрибуции на торговые результаты

СтратегияSharpeMax DDWin RateTrades/Year
Базовая (без фильтрации)1.12-28.4%51.2%2450
+ Фильтр качества MEDIUM1.45-19.8%54.8%1520
+ Фильтр качества HIGH1.67-14.2%58.3%780
+ Размер по атрибуции1.72-13.5%57.9%1120
+ Мониторинг режима1.84-11.8%59.4%1050

Масштабируемость

Количество признаковKernelSHAPTreeSHAPLIME
102.3 сек0.1 сек0.8 сек
5045 сек0.3 сек4.2 сек
100180 сек0.6 сек15 сек
5002400 сек2.1 сек120 сек

Rust vs Python производительность

ОперацияPythonRustУскорение
SHAP (1000 примеров)180 сек12 сек15x
Генерация сигнала45 мс3 мс15x
Бэктест (1 год)120 сек8 сек15x
Мониторинг режима5 мс0.3 мс17x

Ссылки

Основные работы

  1. Ribeiro, M. T., Singh, S., & Guestrin, C. (2016). “Why Should I Trust You?”: Explaining the Predictions of Any Classifier. KDD 2016. arXiv:1602.04938

  2. Lundberg, S. M., & Lee, S. I. (2017). A Unified Approach to Interpreting Model Predictions. NeurIPS 2017. arXiv:1705.07874

  3. Sundararajan, M., Taly, A., & Yan, Q. (2017). Axiomatic Attribution for Deep Networks. ICML 2017. arXiv:1703.01365

  4. Altmann, A., Tolosi, L., Sander, O., & Lengauer, T. (2010). Permutation importance: a corrected feature importance measure. Bioinformatics.

XAI в финансах

  1. Bento, J., et al. (2024). A Survey of XAI in Financial Time Series Forecasting. arXiv:2407.15909

  2. Chen, J., et al. (2023). Explainable AI for Finance: A Survey. ACM Computing Surveys.

  3. Misheva, B. H., et al. (2021). Explainable AI in Credit Risk Management. European Financial Management.

Практические применения

  1. Gilpin, L. H., et al. (2018). Explaining Explanations: An Overview of Interpretability of Machine Learning. IEEE 5th International Conference on Data Science and Advanced Analytics.

  2. Molnar, C. (2022). Interpretable Machine Learning: A Guide for Making Black Box Models Explainable. christophm.github.io/interpretable-ml-book

  3. Lundberg, S. M., et al. (2020). From Local Explanations to Global Understanding with Explainable AI for Trees. Nature Machine Intelligence.

Библиотеки и инструменты

  1. SHAP Library: github.com/slundberg/shap

  2. LIME Library: github.com/marcotcr/lime

  3. Captum (PyTorch): captum.ai

  4. InterpretML: github.com/interpretml/interpret