Sprint 3 (M3a) 最终版完备实施设计方案 (V2.0 Baseline)¶
一、 架构与目录规范演进(核心重构决策)¶
基于企业级模块化内聚原则,重新划定 src/core/ 的架构语义边界:
-
src/core/的绝对定位:跨步骤高度复用的纯算法/纯逻辑内核。拒绝包含任何“特定数据源 IO”、“业务组装状态机”或“特定字典绑定”。 -
src/根目录定位:各 Step 的主体业务模块实现。 -
src/pipelines/定位:流程编排层,负责串联src/下的业务模块,处理按年循环、内存打点、IO 落盘。
最终锁定的 S3 交付目录结构:
src/
├── pipeline.py # [新增] 全局唯一顶层调度入口
├── data_loader.py # [保留] S2 业务实现,仅调整内部 MemoryTracker 引用
├── feature_engineering.py # [新增] S3 38维特征业务实现主体
├── core/
│ ├── __init__.py # [新增]
│ └── standardization.py # [新增] 跨步骤复用的纯函数内核(标准化/共线性)
├── pipelines/
│ ├── __init__.py # [新增]
│ ├── s2_data_infra.py # [迁移自 scripts/] S2 编排层,main() 重构为 run()
│ └── s3_feature_pipeline.py # [新增] S3 编排层(按年串行+落盘)
└── utils/
├── memory.py # [剥离自 data_loader.py] 进程级内存打点器
└── ...
二、 全局调度与路由机制设计¶
-
路由协议:完全基于环境变量,拒绝引入 CLI 框架。
- 变量名:
RUN_STEP(类型:str) - 取值范围:
"2","3","all" - 默认行为:未配置或非预期值时,默认执行
"all"(按顺序执行 S2 → S3)。
- 变量名:
-
入口契约 (
src/pipeline.py):- 作为 Docker
command的唯一注入点。 - 加载
Settings单例。 - 根据
RUN_STEP调用src.pipelines.s2_data_infra.run(settings)或src.pipelines.s3_feature_pipeline.run(settings)。 -
Fail-Fast:任意
run()抛出异常,pipeline.py捕获并记录 CRITICAL 日志后,执行sys.exit(1)终止进程,严禁跨步骤静默继续。
- 作为 Docker
三、 S3 数据流转与内存防线设计(单循环内嵌式累积)¶
为彻底规避 OOM 并消除冗余 IO,S3 Pipeline 采用单循环内嵌式累积架构:
循环体逻辑(针对每一个 year):
-
读取:DuckDB 仅执行
SELECT * FROM read_parquet('data/processed/net_value_{year}.parquet'),转换为 Arrow(≤300万行,预估 1-2 GB)。 -
计算:交由 Polars 执行
feature_engineering.calculate_raw_features(),生成 38 维宽表(预估 15-25 GB)。 -
落盘:通过 PyArrow 以
row_group_size=100000覆盖写入data/features/features_{year}.parquet。 -
内嵌统计:在内存释放前,对该单年宽表按
net_value_date分组,调用standardization.calculate_stats()提取该年所有截面的统计量(极小体积,仅几十 KB)。 -
累加与释放:将统计量
pl.concat到内存中的全局all_stats_df变量,然后立即del释放单年特征 DataFrame,触发 Python GC 与 Polars 底层 Arrow 释放。
循环结束后逻辑: - 为
all_stats_df增加is_production_ready = false列。 - 落盘为
data/models/standardization_params.parquet。 - 产出
data/models/final_feature_list.json与data/metadata/features_hash.json。
四、 src/feature_engineering.py 内部架构¶
严格执行两阶段架构与段隔离:
阶段一:公共基础层 (跨类复用)¶
上下文强制设定为 .group_by(['fund_id', 'segment_id'])。
-
微观动量:
weekly_return -
MA 矩阵:
rolling_mean(nav, [12, 26, 52]) -
Std 矩阵:
rolling_std(weekly_return, [12, 26, 52]) -
极值矩阵:
rolling_max(nav, [26, 52]) -
下行波动:
rolling_std(min(weekly_return, 0), [26, 52]) -
EWM 矩阵:显式计算
alpha = 2 / (span + 1)传入ewm_mean(alpha=alpha),计算span=[26, 52]。
阶段二:A-E 类特征派生¶
通过 with_columns 一次性追加,所有窗口期特征必须经过 with_validity_check 包装器:
- A 类 (5维):复用 MA 与 EWM 矩阵。
- B 类 (6维):复用 Std 与下行波动矩阵。
-
C 类 (11维):复用 Std/极值矩阵,OLS 求解封装
np.linalg.lstsq。 - D 类 (10维):复用 MA 矩阵计算 DCA 收益率,再对其做专属滚动。
-
E 类 (5维):复用 Std/极值矩阵,注入参数化
weekly_rf,Calmar 直接复用 C 类的momentum_52w与max_drawdown_52w直除。
五、 src/core/standardization.py 纯函数契约¶
-
calculate_stats(df, feature_cols):输出严格 6 列基线(feature_name,mean,std,median,mad,valid_count)。MAD 输出原值。 -
apply_standardization(df, stats_df):执行 Z-Score,使用mad * 1.4826修正后判定越界,执行 ±3.0 Clip。调用侧负责首日拦截。 -
remove_collinear_features(df, labels, threshold):Spearman IC 降序排列,Pearson 相关系数矩阵剔除。
六、 S3 产出物物理契约¶
-
features_YYYY.parquet:宽表 Schema(fund_id:String,net_value_date:Date,segment_id:UInt32, 38个Float64特征列)。特征名严格遵循_12w范式。PyArrow 强制row_group_size=100000。 -
standardization_params.parquet:增加物理列is_production_ready: Boolean,全表填入false。 -
final_feature_list.json:标准 JSON Array,包含 38 个精确匹配字典的字符串。
七、 TDD 测试防线矩阵 (tests/test_feature_engineering.py)¶
6 个独立测试类,Mock 数据统一为 2 基金 × 60 周:
-
TestMeanReversionFeatures:5 维精确度(round(6))。 -
TestVolatilityFeatures:6 维精确度,卡控下行波动仅含负收益。 -
TestTrendMomentumFeatures:11 维精确度,含 OLS 零斜率边界断言。 -
TestDcaSpecificFeatures:10 维精确度,验证底层复用一致性。 -
TestRiskAdjustedFeatures:5 维精确度,验证 Calmar 复用除法逻辑。 -
TestStandardizationPureFunctions:6 列 Schema 断言、T-1 首日全 NULL 断言、MAD Clip 边界断言。
Sprint 3 (M3a) 功能实现拆分步骤 (TDD 严格执行)¶
Batch 0:结构性重构与基础设施(前置零回归)¶
目标:统一目录规范,剥离公共工具,重构 S2 入口。
验收:S2 原有单测 (tests/test_data_loader.py) 100% 全绿。
| 编号 | 动作 | 交付物 |
|---|---|---|
| 0-1 | 新增包初始化文件 |
src/core/__init__.py, src/pipelines/__init__.py
|
| 0-2 | 剥离并新建内存工具 |
src/utils/memory.py (从 data_loader.py 迁出 MemoryTracker) |
| 0-3 | 修正 S2 源码内部引用 |
src/data_loader.py (修改 from src.utils... 引用路径) |
| 0-4 | 迁移并重构 S2 编排层 |
src/pipelines/s2_data_infra.py (重构 main() 为 run(settings)) |
| 0-5 | 同步修正测试引用 |
tests/test_data_loader.py (修正 import 路径) |
| 0-6 | 清理历史遗留 | 删除 scripts/run_step2_pipeline.py
|
Batch 1:S3-01 A/B/C 类特征引擎(22 维)— TDD¶
目标:完成基础滚动、波动、趋势动量计算及 OLS 精度验证。
产出:tests/test_feature_engineering.py(Part 1) +src/feature_engineering.py(Part 1)
-
Step 1.1:输出
TestMeanReversionFeatures,TestVolatilityFeatures,TestTrendMomentumFeatures测试类代码。 - Step 1.2:⏸ 等待您确认测试用例。
-
Step 1.3:输出
src/feature_engineering.py实现代码(含with_validity_check、公共基础层、A/B/C 类计算、OLS 封装)及工程自证。
Batch 2:S3-02 D/E 类特征引擎(16 维)— TDD¶
目标:完成定投特有与风险调整特征,验证跨域复用与参数化注入。
产出:tests/test_feature_engineering.py(Part 2) +src/feature_engineering.py(Part 2)
-
Step 2.1:输出
TestDcaSpecificFeatures,TestRiskAdjustedFeatures测试类代码。 - Step 2.2:⏸ 等待您确认测试用例。
-
Step 2.3:输出
src/feature_engineering.py补全代码(D/E 类计算、顶层calculate_raw_features编排函数)及工程自证。
Batch 3:S3-03 标准化与共线性纯函数 — TDD¶
目标:交付零副作用的矩阵代数内核,锁死 T-1 防泄露逻辑。
产出:tests/test_feature_engineering.py(Part 3) +src/core/standardization.py
-
Step 3.1:输出
TestStandardizationPureFunctions测试类代码。 - Step 3.2:⏸ 等待您确认测试用例。
-
Step 3.3:输出
src/core/standardization.py完整实现代码及工程自证。
Batch 4:S3-04 Pipeline 编排、落盘与预览执行¶
目标:串联计算与纯函数,实现单循环内嵌累积防 OOM,完成物理落盘。
产出:src/pipeline.py,src/pipelines/s3_feature_pipeline.py, 4类数据产物
-
Step 4.1:输出
src/pipeline.py(全局入口与环境变量路由)。 -
Step 4.2:输出
src/pipelines/s3_feature_pipeline.py(按年串行、内嵌统计累积、PyArrow 强制参数落盘、Hash 计算与 JSON 产出)。
Batch 5:文档同步与 CI 集成¶
目标:技术真相源与代码闭环,CI 强阻断部署。
-
Step 5.1:输出
docs/baseline/data_contract.md更新片段(宽表契约)。 -
Step 5.2:输出
docs/project-structure.md更新片段。 -
Step 5.3:输出
.gitea/workflows/deployment.yaml变更指令(新增 pytest S3 门禁)。
由 Huarui Lin 更新于 大约 3 小时 之前 · 1 修订