Optimization Scheme for Ranking Tasks with Discretized Labels
Custom Evaluation Function: Rank-Weighted NDCG
import numpy as np
import xgboost as xgb
from typing import Tuple
from sklearn.metrics import ndcg_score


def discrete_rank_weighted_ndcg(preds: np.ndarray, dtrain: xgb.DMatrix, k: int = 10) -> Tuple[str, float]:
    """
    Rank-weighted NDCG evaluation function for discretized labels.
    :param preds: model prediction scores
    :param dtrain: training data matrix
    :param k: compute NDCG over the top-k results
    :return: (metric name, metric value)
    """
    # Discretized labels (levels 1-20)
    discrete_labels = dtrain.get_label().astype(int)
    groups = dtrain.get_uint_info('group_ptr')

    # Validate the group information
    if len(groups) <= 1:
        return f'rank_ndcg@{k}', 0.0

    ndcg_values = []
    # Level-weight mapping (higher levels receive higher weights)
    rank_weights = {i: np.log(i + 1) for i in range(1, 21)}

    # Iterate over each trading day
    for i in range(len(groups) - 1):
        start = groups[i]
        end = groups[i + 1]
        group_preds = preds[start:end]
        group_labels = discrete_labels[start:end]

        # Skip trading days that are too small
        if len(group_labels) < 2:
            continue

        # Build relevance scores from the level weights
        relevance_scores = np.array([rank_weights.get(l, 1.0) for l in group_labels])

        # Compute NDCG@k, capping k at the group size
        try:
            current_k = min(k, len(group_labels))
            ndcg = ndcg_score([relevance_scores], [group_preds], k=current_k)
        except Exception:
            ndcg = 0.0
        ndcg_values.append(ndcg)

    # Average NDCG across all trading days
    avg_ndcg = np.mean(ndcg_values) if ndcg_values else 0.0
    return f'rank_ndcg@{k}', avg_ndcg
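A minimal sketch of how this metric can be exercised on its own, assuming a DMatrix whose per-day group sizes are set via set_group (which is what populates the 'group_ptr' field read above). The data, the group_sizes name, and all numbers below are illustrative assumptions, not part of the scheme itself.

import numpy as np
import xgboost as xgb

rng = np.random.default_rng(0)
X = rng.normal(size=(300, 8))                     # 300 stock-day rows, 8 features (toy data)
y = rng.integers(1, 21, size=300).astype(float)   # discretized labels, levels 1-20
group_sizes = [100, 100, 100]                     # three trading days of 100 stocks each (assumed)

dtrain = xgb.DMatrix(X, label=y)
dtrain.set_group(group_sizes)                     # required so get_uint_info('group_ptr') is populated

# Evaluate the metric directly against random prediction scores
preds = rng.normal(size=300)
name, value = discrete_rank_weighted_ndcg(preds, dtrain, k=10)
print(name, value)

When passed to xgb.train as a custom_metric (shown after the loss function below), only the (preds, dtrain) pair is supplied, so the default k=10 applies; a different k can be bound with functools.partial.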
Custom Loss Function: Level-Difference-Weighted Pairwise Loss
import numpy as np
import xgboost as xgb
from typing import Tuple


def discrete_rank_pairwise_loss(preds: np.ndarray, dtrain: xgb.DMatrix) -> Tuple[np.ndarray, np.ndarray]:
    """
    Level-difference-weighted pairwise loss for discretized labels.
    :param preds: model prediction scores
    :param dtrain: training data matrix
    :return: (gradient, hessian)
    """
    discrete_labels = dtrain.get_label().astype(int)
    groups = dtrain.get_uint_info('group_ptr')
    features = dtrain.get_data().toarray()

    # Initialize the gradient
    grad = np.zeros_like(preds)
    # Adaptive hessian (grown below from level differences)
    hess = np.ones_like(preds)

    # Level-difference weight function
    def rank_diff_weight(diff):
        """Non-linearly amplify the weight of large level differences."""
        return 1 + np.tanh(diff / 5.0)

    # Boundary boost factor (handles pairs near important level boundaries)
    def boundary_boost(label_j, label_k):
        """Strengthen level differences that cross important boundaries."""
        # Important boundaries: top 10%, top 20%, bottom 10%
        boundaries = [18, 16, 4]  # boundaries of the level partition
        for b in boundaries:
            if (label_j > b and label_k <= b) or (label_k > b and label_j <= b):
                return 1.5  # 50% extra weight
        return 1.0

    # Iterate over each trading day
    for i in range(len(groups) - 1):
        start = groups[i]
        end = groups[i + 1]
        group_size = end - start

        # Skip trading days that are too small
        if group_size < 2:
            continue

        group_preds = preds[start:end]
        group_labels = discrete_labels[start:end]
        group_features = features[start:end]

        # Iterate over all stock pairs
        for j in range(group_size):
            for k in range(j + 1, group_size):
                # Only process pairs whose levels differ
                if group_labels[j] != group_labels[k]:
                    # Determine which item has the higher level
                    if group_labels[j] > group_labels[k]:
                        high_idx, low_idx = j, k
                    else:
                        high_idx, low_idx = k, j

                    # Level difference
                    rank_diff = group_labels[high_idx] - group_labels[low_idx]
                    # Base weight
                    base_weight = rank_diff_weight(rank_diff)
                    # Boundary boost
                    boundary_factor = boundary_boost(group_labels[high_idx], group_labels[low_idx])

                    # Feature-consistency factor (reduce the penalty on pairs whose
                    # features are similar but whose levels differ widely)
                    feature_sim = np.corrcoef(
                        group_features[high_idx],
                        group_features[low_idx]
                    )[0, 1]
                    if np.isnan(feature_sim):
                        feature_sim = 0.0  # guard against zero-variance feature rows
                    feature_factor = 1.2 - 0.4 * feature_sim  # more similar features -> lower weight

                    # Prediction difference
                    pred_diff = group_preds[high_idx] - group_preds[low_idx]
                    # Gradient of the pairwise logistic loss w.r.t. the higher-level item
                    gradient_component = -1 / (1 + np.exp(pred_diff))

                    # Apply the composite weight
                    weighted_gradient = (
                        gradient_component *
                        base_weight *
                        boundary_factor *
                        feature_factor
                    )

                    # Update the gradients
                    grad[start + high_idx] += weighted_gradient
                    grad[start + low_idx] -= weighted_gradient
                    # Update the hessian (based on the level difference)
                    hess[start + high_idx] += 0.1 * rank_diff
                    hess[start + low_idx] += 0.1 * rank_diff

    # Smooth the hessian by clipping
    hess = np.clip(hess, 0.5, 5.0)
    return grad, hess
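A hedged sketch of wiring both pieces into xgb.train, with the loss passed as obj and the NDCG function as custom_metric. The toy data, group sizes, and the parameter values (eta, max_depth, disable_default_eval_metric, round counts) are illustrative assumptions, not prescribed by the scheme above.

import numpy as np
import xgboost as xgb

rng = np.random.default_rng(1)
X_train, y_train = rng.normal(size=(300, 8)), rng.integers(1, 21, 300).astype(float)
X_valid, y_valid = rng.normal(size=(200, 8)), rng.integers(1, 21, 200).astype(float)

dtrain = xgb.DMatrix(X_train, label=y_train)
dtrain.set_group([100, 100, 100])          # per-trading-day group sizes (assumed)
dvalid = xgb.DMatrix(X_valid, label=y_valid)
dvalid.set_group([100, 100])

params = {
    'eta': 0.05,
    'max_depth': 4,
    'disable_default_eval_metric': 1,      # rely solely on the custom metric
}

booster = xgb.train(
    params,
    dtrain,
    num_boost_round=100,
    obj=discrete_rank_pairwise_loss,       # level-difference-weighted pairwise loss
    custom_metric=discrete_rank_weighted_ndcg,
    evals=[(dvalid, 'valid')],
    maximize=True,                         # NDCG: higher is better
    early_stopping_rounds=20,
)

Note that the pairwise loop is O(n^2) per trading day and materializes the feature matrix with get_data().toarray(), so this sketch is only practical for moderate group sizes; vectorizing the pair enumeration per group would be the natural next step.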