项目

一般

简介

0002-dev-low-level-design » 历史记录 » 版本 1

Huarui Lin, 2026-04-17 19:36

1 1 Huarui Lin
# Sprint 3 (M3a) 最终版完备实施设计方案 (V2.0 Baseline)
2
3
## 一、 架构与目录规范演进(核心重构决策)
4
5
基于企业级模块化内聚原则,重新划定 `src/core/` 的架构语义边界:
6
*   **`src/core/` 的绝对定位**:**跨步骤高度复用的纯算法/纯逻辑内核**。拒绝包含任何“特定数据源 IO”、“业务组装状态机”或“特定字典绑定”。
7
*   **`src/` 根目录定位**:各 Step 的**主体业务模块实现**。
8
*   **`src/pipelines/` 定位**:流程编排层,负责串联 `src/` 下的业务模块,处理按年循环、内存打点、IO 落盘。
9
10
**最终锁定的 S3 交付目录结构:**
11
12
```text
13
src/
14
├── pipeline.py                 # [新增] 全局唯一顶层调度入口
15
├── data_loader.py              # [保留] S2 业务实现,仅调整内部 MemoryTracker 引用
16
├── feature_engineering.py      # [新增] S3 38维特征业务实现主体
17
├── core/
18
│   ├── __init__.py             # [新增]
19
│   └── standardization.py      # [新增] 跨步骤复用的纯函数内核(标准化/共线性)
20
├── pipelines/
21
│   ├── __init__.py             # [新增]
22
│   ├── s2_data_infra.py        # [迁移自 scripts/] S2 编排层,main() 重构为 run()
23
│   └── s3_feature_pipeline.py  # [新增] S3 编排层(按年串行+落盘)
24
└── utils/
25
    ├── memory.py               # [剥离自 data_loader.py] 进程级内存打点器
26
    └── ...
27
```
28
29
---
30
## 二、 全局调度与路由机制设计
31
32
1.  **路由协议**:完全基于环境变量,拒绝引入 CLI 框架。
33
    *   变量名:`RUN_STEP` (类型:`str`)
34
    *   取值范围:`"2"`, `"3"`, `"all"`
35
    *   默认行为:未配置或非预期值时,默认执行 `"all"`(按顺序执行 S2 → S3)。
36
2.  **入口契约 (`src/pipeline.py`)**:
37
    *   作为 Docker `command` 的唯一注入点。
38
    *   加载 `Settings` 单例。
39
    *   根据 `RUN_STEP` 调用 `src.pipelines.s2_data_infra.run(settings)` 或 `src.pipelines.s3_feature_pipeline.run(settings)`。
40
    *   **Fail-Fast**:任意 `run()` 抛出异常,`pipeline.py` 捕获并记录 CRITICAL 日志后,执行 `sys.exit(1)` 终止进程,严禁跨步骤静默继续。
41
42
---
43
## 三、 S3 数据流转与内存防线设计(单循环内嵌式累积)
44
45
为彻底规避 OOM 并消除冗余 IO,S3 Pipeline 采用**单循环内嵌式累积架构**:
46
**循环体逻辑(针对每一个 `year`):**
47
1.  **读取**:DuckDB 仅执行 `SELECT * FROM read_parquet('data/processed/net_value_{year}.parquet')`,转换为 Arrow(≤300万行,预估 1-2 GB)。
48
2.  **计算**:交由 Polars 执行 `feature_engineering.calculate_raw_features()`,生成 38 维宽表(预估 15-25 GB)。
49
3.  **落盘**:通过 PyArrow 以 `row_group_size=100000` 覆盖写入 `data/features/features_{year}.parquet`。
50
4.  **内嵌统计**:**在内存释放前**,对该单年宽表按 `net_value_date` 分组,调用 `standardization.calculate_stats()` 提取该年所有截面的统计量(极小体积,仅几十 KB)。
51
5.  **累加与释放**:将统计量 `pl.concat` 到内存中的全局 `all_stats_df` 变量,然后立即 `del` 释放单年特征 DataFrame,触发 Python GC 与 Polars 底层 Arrow 释放。
52
**循环结束后逻辑:**
53
1. 为 `all_stats_df` 增加 `is_production_ready = false` 列。
54
2. 落盘为 `data/models/standardization_params.parquet`。
55
3. 产出 `data/models/final_feature_list.json` 与 `data/metadata/features_hash.json`。
56
57
---
58
## 四、 `src/feature_engineering.py` 内部架构
59
60
严格执行两阶段架构与段隔离:
61
62
### 阶段一:公共基础层 (跨类复用)
63
64
上下文强制设定为 `.group_by(['fund_id', 'segment_id'])`。
65
*   **微观动量**:`weekly_return`
66
*   **MA 矩阵**:`rolling_mean(nav, [12, 26, 52])`
67
*   **Std 矩阵**:`rolling_std(weekly_return, [12, 26, 52])`
68
*   **极值矩阵**:`rolling_max(nav, [26, 52])`
69
*   **下行波动**:`rolling_std(min(weekly_return, 0), [26, 52])`
70
*   **EWM 矩阵**:显式计算 `alpha = 2 / (span + 1)` 传入 `ewm_mean(alpha=alpha)`,计算 `span=[26, 52]`。
71
72
### 阶段二:A-E 类特征派生
73
74
通过 `with_columns` 一次性追加,所有窗口期特征必须经过 `with_validity_check` 包装器:
75
*   **A 类 (5维)**:复用 MA 与 EWM 矩阵。
76
*   **B 类 (6维)**:复用 Std 与下行波动矩阵。
77
*   **C 类 (11维)**:复用 Std/极值矩阵,OLS 求解封装 `np.linalg.lstsq`。
78
*   **D 类 (10维)**:复用 MA 矩阵计算 DCA 收益率,再对其做专属滚动。
79
*   **E 类 (5维)**:复用 Std/极值矩阵,注入参数化 `weekly_rf`,Calmar 直接复用 C 类的 `momentum_52w` 与 `max_drawdown_52w` 直除。
80
81
---
82
## 五、 `src/core/standardization.py` 纯函数契约
83
84
1.  **`calculate_stats(df, feature_cols)`**:输出严格 6 列基线(`feature_name`, `mean`, `std`, `median`, `mad`, `valid_count`)。MAD 输出原值。
85
2.  **`apply_standardization(df, stats_df)`**:执行 Z-Score,使用 `mad * 1.4826` 修正后判定越界,执行 ±3.0 Clip。调用侧负责首日拦截。
86
3.  **`remove_collinear_features(df, labels, threshold)`**:Spearman IC 降序排列,Pearson 相关系数矩阵剔除。
87
88
---
89
## 六、 S3 产出物物理契约
90
91
*   **`features_YYYY.parquet`**:宽表 Schema(`fund_id:String`, `net_value_date:Date`, `segment_id:UInt32`, 38个`Float64`特征列)。特征名严格遵循 `_12w` 范式。PyArrow 强制 `row_group_size=100000`。
92
*   **`standardization_params.parquet`**:增加物理列 `is_production_ready: Boolean`,全表填入 `false`。
93
*   **`final_feature_list.json`**:标准 JSON Array,包含 38 个精确匹配字典的字符串。
94
95
---
96
## 七、 TDD 测试防线矩阵 (`tests/test_feature_engineering.py`)
97
98
6 个独立测试类,Mock 数据统一为 2 基金 × 60 周:
99
1.  **`TestMeanReversionFeatures`**:5 维精确度(`round(6)`)。
100
2.  **`TestVolatilityFeatures`**:6 维精确度,卡控下行波动仅含负收益。
101
3.  **`TestTrendMomentumFeatures`**:11 维精确度,**含 OLS 零斜率边界断言**。
102
4.  **`TestDcaSpecificFeatures`**:10 维精确度,验证底层复用一致性。
103
5.  **`TestRiskAdjustedFeatures`**:5 维精确度,验证 Calmar 复用除法逻辑。
104
6.  **`TestStandardizationPureFunctions`**:6 列 Schema 断言、T-1 首日全 NULL 断言、MAD Clip 边界断言。
105
106
---
107
# Sprint 3 (M3a) 功能实现拆分步骤 (TDD 严格执行)
108
109
### Batch 0:结构性重构与基础设施(前置零回归)
110
111
> **目标**:统一目录规范,剥离公共工具,重构 S2 入口。
112
> **验收**:S2 原有单测 (`tests/test_data_loader.py`) 100% 全绿。
113
114
| 编号 | 动作 | 交付物 |
115
|:---:|------|--------|
116
| 0-1 | 新增包初始化文件 | `src/core/__init__.py`, `src/pipelines/__init__.py` |
117
| 0-2 | 剥离并新建内存工具 | `src/utils/memory.py` (从 `data_loader.py` 迁出 `MemoryTracker`) |
118
| 0-3 | 修正 S2 源码内部引用 | `src/data_loader.py` (修改 `from src.utils...` 引用路径) |
119
| 0-4 | 迁移并重构 S2 编排层 | `src/pipelines/s2_data_infra.py` (重构 `main()` 为 `run(settings)`) |
120
| 0-5 | 同步修正测试引用 | `tests/test_data_loader.py` (修正 import 路径) |
121
| 0-6 | 清理历史遗留 | 删除 `scripts/run_step2_pipeline.py` |
122
123
### Batch 1:S3-01 A/B/C 类特征引擎(22 维)— TDD
124
125
> **目标**:完成基础滚动、波动、趋势动量计算及 OLS 精度验证。
126
> **产出**:`tests/test_feature_engineering.py` (Part 1) + `src/feature_engineering.py` (Part 1)
127
*   **Step 1.1**:输出 `TestMeanReversionFeatures`, `TestVolatilityFeatures`, `TestTrendMomentumFeatures` 测试类代码。
128
*   **Step 1.2**:⏸ 等待您确认测试用例。
129
*   **Step 1.3**:输出 `src/feature_engineering.py` 实现代码(含 `with_validity_check`、公共基础层、A/B/C 类计算、OLS 封装)及工程自证。
130
131
### Batch 2:S3-02 D/E 类特征引擎(16 维)— TDD
132
133
> **目标**:完成定投特有与风险调整特征,验证跨域复用与参数化注入。
134
> **产出**:`tests/test_feature_engineering.py` (Part 2) + `src/feature_engineering.py` (Part 2)
135
*   **Step 2.1**:输出 `TestDcaSpecificFeatures`, `TestRiskAdjustedFeatures` 测试类代码。
136
*   **Step 2.2**:⏸ 等待您确认测试用例。
137
*   **Step 2.3**:输出 `src/feature_engineering.py` 补全代码(D/E 类计算、顶层 `calculate_raw_features` 编排函数)及工程自证。
138
139
### Batch 3:S3-03 标准化与共线性纯函数 — TDD
140
141
> **目标**:交付零副作用的矩阵代数内核,锁死 T-1 防泄露逻辑。
142
> **产出**:`tests/test_feature_engineering.py` (Part 3) + `src/core/standardization.py`
143
*   **Step 3.1**:输出 `TestStandardizationPureFunctions` 测试类代码。
144
*   **Step 3.2**:⏸ 等待您确认测试用例。
145
*   **Step 3.3**:输出 `src/core/standardization.py` 完整实现代码及工程自证。
146
147
### Batch 4:S3-04 Pipeline 编排、落盘与预览执行
148
149
> **目标**:串联计算与纯函数,实现单循环内嵌累积防 OOM,完成物理落盘。
150
> **产出**:`src/pipeline.py`, `src/pipelines/s3_feature_pipeline.py`, 4类数据产物
151
*   **Step 4.1**:输出 `src/pipeline.py` (全局入口与环境变量路由)。
152
*   **Step 4.2**:输出 `src/pipelines/s3_feature_pipeline.py` (按年串行、内嵌统计累积、PyArrow 强制参数落盘、Hash 计算与 JSON 产出)。
153
154
### Batch 5:文档同步与 CI 集成
155
156
> **目标**:技术真相源与代码闭环,CI 强阻断部署。
157
*   **Step 5.1**:输出 `docs/baseline/data_contract.md` 更新片段(宽表契约)。
158
*   **Step 5.2**:输出 `docs/project-structure.md` 更新片段。
159
*   **Step 5.3**:输出 `.gitea/workflows/deployment.yaml` 变更指令(新增 pytest S3 门禁)。