项目

一般

简介

行为

Sprint 3 (M3a) 最终版完备实施设计方案 (V2.0 Baseline)

一、 架构与目录规范演进(核心重构决策)

基于企业级模块化内聚原则,重新划定 src/core/ 的架构语义边界:

  • src/core/ 的绝对定位跨步骤高度复用的纯算法/纯逻辑内核。拒绝包含任何“特定数据源 IO”、“业务组装状态机”或“特定字典绑定”。
  • src/ 根目录定位:各 Step 的主体业务模块实现
  • src/pipelines/ 定位:流程编排层,负责串联 src/ 下的业务模块,处理按年循环、内存打点、IO 落盘。

最终锁定的 S3 交付目录结构:

src/
├── pipeline.py                 # [新增] 全局唯一顶层调度入口
├── data_loader.py              # [保留] S2 业务实现,仅调整内部 MemoryTracker 引用
├── feature_engineering.py      # [新增] S3 38维特征业务实现主体
├── core/
│   ├── __init__.py             # [新增]
│   └── standardization.py      # [新增] 跨步骤复用的纯函数内核(标准化/共线性)
├── pipelines/
│   ├── __init__.py             # [新增]
│   ├── s2_data_infra.py        # [迁移自 scripts/] S2 编排层,main() 重构为 run()
│   └── s3_feature_pipeline.py  # [新增] S3 编排层(按年串行+落盘)
└── utils/
    ├── memory.py               # [剥离自 data_loader.py] 进程级内存打点器
    └── ...

二、 全局调度与路由机制设计

  1. 路由协议:完全基于环境变量,拒绝引入 CLI 框架。
    • 变量名:RUN_STEP (类型:str)
    • 取值范围:"2", "3", "all"
    • 默认行为:未配置或非预期值时,默认执行 "all"(按顺序执行 S2 → S3)。
  2. 入口契约 (src/pipeline.py)
    • 作为 Docker command 的唯一注入点。
    • 加载 Settings 单例。
    • 根据 RUN_STEP 调用 src.pipelines.s2_data_infra.run(settings)src.pipelines.s3_feature_pipeline.run(settings)
    • Fail-Fast:任意 run() 抛出异常,pipeline.py 捕获并记录 CRITICAL 日志后,执行 sys.exit(1) 终止进程,严禁跨步骤静默继续。

三、 S3 数据流转与内存防线设计(单循环内嵌式累积)

为彻底规避 OOM 并消除冗余 IO,S3 Pipeline 采用单循环内嵌式累积架构
循环体逻辑(针对每一个 year):

  1. 读取:DuckDB 仅执行 SELECT * FROM read_parquet('data/processed/net_value_{year}.parquet'),转换为 Arrow(≤300万行,预估 1-2 GB)。
  2. 计算:交由 Polars 执行 feature_engineering.calculate_raw_features(),生成 38 维宽表(预估 15-25 GB)。
  3. 落盘:通过 PyArrow 以 row_group_size=100000 覆盖写入 data/features/features_{year}.parquet
  4. 内嵌统计在内存释放前,对该单年宽表按 net_value_date 分组,调用 standardization.calculate_stats() 提取该年所有截面的统计量(极小体积,仅几十 KB)。
  5. 累加与释放:将统计量 pl.concat 到内存中的全局 all_stats_df 变量,然后立即 del 释放单年特征 DataFrame,触发 Python GC 与 Polars 底层 Arrow 释放。
    循环结束后逻辑:
  6. all_stats_df 增加 is_production_ready = false 列。
  7. 落盘为 data/models/standardization_params.parquet
  8. 产出 data/models/final_feature_list.jsondata/metadata/features_hash.json

四、 src/feature_engineering.py 内部架构

严格执行两阶段架构与段隔离:

阶段一:公共基础层 (跨类复用)

上下文强制设定为 .group_by(['fund_id', 'segment_id'])

  • 微观动量weekly_return
  • MA 矩阵rolling_mean(nav, [12, 26, 52])
  • Std 矩阵rolling_std(weekly_return, [12, 26, 52])
  • 极值矩阵rolling_max(nav, [26, 52])
  • 下行波动rolling_std(min(weekly_return, 0), [26, 52])
  • EWM 矩阵:显式计算 alpha = 2 / (span + 1) 传入 ewm_mean(alpha=alpha),计算 span=[26, 52]

阶段二:A-E 类特征派生

通过 with_columns 一次性追加,所有窗口期特征必须经过 with_validity_check 包装器:

  • A 类 (5维):复用 MA 与 EWM 矩阵。
  • B 类 (6维):复用 Std 与下行波动矩阵。
  • C 类 (11维):复用 Std/极值矩阵,OLS 求解封装 np.linalg.lstsq
  • D 类 (10维):复用 MA 矩阵计算 DCA 收益率,再对其做专属滚动。
  • E 类 (5维):复用 Std/极值矩阵,注入参数化 weekly_rf,Calmar 直接复用 C 类的 momentum_52wmax_drawdown_52w 直除。

五、 src/core/standardization.py 纯函数契约

  1. calculate_stats(df, feature_cols):输出严格 6 列基线(feature_name, mean, std, median, mad, valid_count)。MAD 输出原值。
  2. apply_standardization(df, stats_df):执行 Z-Score,使用 mad * 1.4826 修正后判定越界,执行 ±3.0 Clip。调用侧负责首日拦截。
  3. remove_collinear_features(df, labels, threshold):Spearman IC 降序排列,Pearson 相关系数矩阵剔除。

六、 S3 产出物物理契约

  • features_YYYY.parquet:宽表 Schema(fund_id:String, net_value_date:Date, segment_id:UInt32, 38个Float64特征列)。特征名严格遵循 _12w 范式。PyArrow 强制 row_group_size=100000
  • standardization_params.parquet:增加物理列 is_production_ready: Boolean,全表填入 false
  • final_feature_list.json:标准 JSON Array,包含 38 个精确匹配字典的字符串。

七、 TDD 测试防线矩阵 (tests/test_feature_engineering.py)

6 个独立测试类,Mock 数据统一为 2 基金 × 60 周:

  1. TestMeanReversionFeatures:5 维精确度(round(6))。
  2. TestVolatilityFeatures:6 维精确度,卡控下行波动仅含负收益。
  3. TestTrendMomentumFeatures:11 维精确度,含 OLS 零斜率边界断言
  4. TestDcaSpecificFeatures:10 维精确度,验证底层复用一致性。
  5. TestRiskAdjustedFeatures:5 维精确度,验证 Calmar 复用除法逻辑。
  6. TestStandardizationPureFunctions:6 列 Schema 断言、T-1 首日全 NULL 断言、MAD Clip 边界断言。

Sprint 3 (M3a) 功能实现拆分步骤 (TDD 严格执行)

Batch 0:结构性重构与基础设施(前置零回归)

目标:统一目录规范,剥离公共工具,重构 S2 入口。
验收:S2 原有单测 (tests/test_data_loader.py) 100% 全绿。

编号 动作 交付物
0-1 新增包初始化文件 src/core/__init__.py, src/pipelines/__init__.py
0-2 剥离并新建内存工具 src/utils/memory.py (从 data_loader.py 迁出 MemoryTracker)
0-3 修正 S2 源码内部引用 src/data_loader.py (修改 from src.utils... 引用路径)
0-4 迁移并重构 S2 编排层 src/pipelines/s2_data_infra.py (重构 main()run(settings))
0-5 同步修正测试引用 tests/test_data_loader.py (修正 import 路径)
0-6 清理历史遗留 删除 scripts/run_step2_pipeline.py

Batch 1:S3-01 A/B/C 类特征引擎(22 维)— TDD

目标:完成基础滚动、波动、趋势动量计算及 OLS 精度验证。
产出tests/test_feature_engineering.py (Part 1) + src/feature_engineering.py (Part 1)

  • Step 1.1:输出 TestMeanReversionFeatures, TestVolatilityFeatures, TestTrendMomentumFeatures 测试类代码。
  • Step 1.2:⏸ 等待您确认测试用例。
  • Step 1.3:输出 src/feature_engineering.py 实现代码(含 with_validity_check、公共基础层、A/B/C 类计算、OLS 封装)及工程自证。

Batch 2:S3-02 D/E 类特征引擎(16 维)— TDD

目标:完成定投特有与风险调整特征,验证跨域复用与参数化注入。
产出tests/test_feature_engineering.py (Part 2) + src/feature_engineering.py (Part 2)

  • Step 2.1:输出 TestDcaSpecificFeatures, TestRiskAdjustedFeatures 测试类代码。
  • Step 2.2:⏸ 等待您确认测试用例。
  • Step 2.3:输出 src/feature_engineering.py 补全代码(D/E 类计算、顶层 calculate_raw_features 编排函数)及工程自证。

Batch 3:S3-03 标准化与共线性纯函数 — TDD

目标:交付零副作用的矩阵代数内核,锁死 T-1 防泄露逻辑。
产出tests/test_feature_engineering.py (Part 3) + src/core/standardization.py

  • Step 3.1:输出 TestStandardizationPureFunctions 测试类代码。
  • Step 3.2:⏸ 等待您确认测试用例。
  • Step 3.3:输出 src/core/standardization.py 完整实现代码及工程自证。

Batch 4:S3-04 Pipeline 编排、落盘与预览执行

目标:串联计算与纯函数,实现单循环内嵌累积防 OOM,完成物理落盘。
产出src/pipeline.py, src/pipelines/s3_feature_pipeline.py, 4类数据产物

  • Step 4.1:输出 src/pipeline.py (全局入口与环境变量路由)。
  • Step 4.2:输出 src/pipelines/s3_feature_pipeline.py (按年串行、内嵌统计累积、PyArrow 强制参数落盘、Hash 计算与 JSON 产出)。

Batch 5:文档同步与 CI 集成

目标:技术真相源与代码闭环,CI 强阻断部署。

  • Step 5.1:输出 docs/baseline/data_contract.md 更新片段(宽表契约)。
  • Step 5.2:输出 docs/project-structure.md 更新片段。
  • Step 5.3:输出 .gitea/workflows/deployment.yaml 变更指令(新增 pytest S3 门禁)。

Huarui Lin 更新于 大约 3 小时 之前 · 1 修订