0002-数据基建 » 历史记录 » 版本 8
Huarui Lin, 2026-04-13 13:54
| 1 | 2 | Huarui Lin | ## 数据基建 |
|---|---|---|---|
| 2 | 8 | Huarui Lin | |
| 3 | 1 | Huarui Lin | | 周次 | Story ID | Story 描述 | 规约硬性映射 | 验收标准 | 工时 | |
| 4 | |:---:|:---|:---|:---|:---|:---:| |
||
| 5 | | **W2** | **S2-01** | **配置扩展与 DuckDB 粗筛降维**<br>在 `config.yaml` 新增数据源路径配置;实现 DuckDB 读取 CSV、显式列投影(丢弃 `id`+4列冗余)、周频聚合、白名单 JOIN,输出精简 Arrow。 | 规约 1.2 (周聚合/类型过滤);章程 R-5 (显式丢弃4列);FW-1/FW-2 | ① `config.yaml` 新增 `data.raw_paths` 节点,`data_loader.py` 从配置读取路径;<br>② DuckDB SQL **禁止** `SELECT *`,必须显式列出 `fund_id, net_value_date, cumulative_net_value`;<br>③ JOIN `fund_basic_info` 后,Arrow Schema 仅含上述 3 列 + `fund_type`(临时审计用,最终落盘前丢弃);<br>④ 单测:Mock 小 CSV 验证列裁剪正确。 | 2天 | |
||
| 6 | | **W2** | **S2-02** | **Polars 时序状态机(核心攻坚)**<br>接收全量 Arrow,实现 ffill、>4周缺失截断并打 `segment_id`、建仓期硬删、异常值处理。 | 规约 1.2 (缺失填充/截断/建仓期/异常值);CR-001 (segment_id);全量 Arrow 破局方案 | ① `segment_id` 截断处递增,同基金内从 0 开始;<br>② 建仓期硬删取 **`segment_id == 0`** 首条日期减 12 周(非全局首条);<br>③ `cumulative_net_value ≤ 0` 的行**被剔除**(不进入 Parquet);<br>④ 单周涨跌幅 >50% 的行 `cumulative_net_value` **置 NULL**(保留行,保留时间连续性);<br>⑤ **单测必覆盖**:跨年 ffill、刚好第 5 周缺失触发截断、单基金仅 10 周被建仓期完全剔除、异常值 NULL 化。 | 3天 | |
||
| 7 | | **W3** | **S2-03** | **存活期过滤与按年分区落盘**<br>统计各基金有效周数,剔除 <52 周,Polars 内按年切片写 Parquet,创建 DuckDB VIEW。 | 规约 1.2 (存活期过滤/清盘基金);规约 1.3 (存储方案/row_group_size);FW-4/FW-5 | ① 存活期计算 = **该基金所有段有效周数之和**,< 52 周整体剔除;<br>② `data/processed/` 下产出 `net_value_YYYY.parquet`(覆盖写入,不追加);<br>③ PyArrow `write_parquet()` 显式指定 `row_group_size=100000`;<br>④ `create_duckdb_view()` 函数拼接所有年份文件创建 `v_net_value_processed`;<br>⑤ 单测:Mock 含短生命周期基金的 DataFrame,验证其被完整剔除。 | 2天 | |
||
| 8 | | **W3** | **S2-04** | **数据血缘、审计日志与内存压测**<br>计算处理后 Hash,生成结构化审计 JSON,在 64GB 实体机执行全量压测并记录内存峰值。 | 章程 M2 Done 标准(更新版);规约 7.3 (数据血缘) | ① 产出 `data/metadata/processed_hash.json`(按年份文件逐个计算 SHA-256);<br>② 产出 `data/metadata/data_audit.json`,Schema 严格符合下方定义;<br>③ 压测日志包含 `peak_memory_gb`,证明 ≤ 40GB;<br>④ 若 `whitelist_retention_rate < 15%`,触发 R-6 预警流程,向 PM 汇报。 | 2天 | |
||
| 9 | | **W3** | **S2-05** | **Gitea `/docs` 数据契约文档与 Redmine 验收**<br>编写 `data_contract.md` 沉淀为技术真相源,在 Redmine 录入 M2 验收 CheckList。 | 架构基线 3 (双轨制);章程 M2 | ① `docs/data_contract.md` 内容包含完整 Parquet Schema 定义与 `data_audit.json` Schema 定义;<br>② Redmine M2 任务状态可更新为"待验收";<br>③ CI 软预警检查通过(`src/` 变更已同步 `docs/` 变更)。 | 1天 | |
||
| 10 | 8 | Huarui Lin | |
| 11 | 1 | Huarui Lin | --- |
| 12 | ## `data_audit.json` 标准 Schema 定义 |
||
| 13 | 此文件为 M2 验收的机器可解析审计凭证,S2-04 必须严格按此 Schema 输出: |
||
| 14 | ```json |
||
| 15 | { |
||
| 16 | "pipeline": { |
||
| 17 | "step": "step2_data_loading", |
||
| 18 | "run_time": "2026-04-11T10:00:00+08:00", |
||
| 19 | "python_version": "3.12.x" |
||
| 20 | }, |
||
| 21 | "input": { |
||
| 22 | "fund_net_info": { |
||
| 23 | "file_path": "data/raw/fund_net_info.csv", |
||
| 24 | "sha256": "a1b2c3d4...", |
||
| 25 | "total_rows": 23000000, |
||
| 26 | "date_range": ["2015-12-07", "2026-03-28"] |
||
| 27 | }, |
||
| 28 | "fund_basic_info": { |
||
| 29 | "file_path": "data/raw/fund_basic_info.csv", |
||
| 30 | "sha256": "e5f6g7h8...", |
||
| 31 | "total_records": 26000, |
||
| 32 | "columns_actually_read": ["fund_id", "fund_name", "fund_type", "create_date"], |
||
| 33 | "columns_discarded": ["insert_datetime", "update_datetime", "insert_operator", "update_operator"] |
||
| 34 | } |
||
| 35 | }, |
||
| 36 | "filtering": { |
||
| 37 | "whitelist_total_funds": 26000, |
||
| 38 | "whitelist_retained_funds": 8000, |
||
| 39 | "whitelist_filtered_funds": 18000, |
||
| 40 | "whitelist_retention_rate": 0.3077 |
||
| 41 | }, |
||
| 42 | "cleaning": { |
||
| 43 | "weekly_aggregation_dedup_rows": 500000, |
||
| 44 | "nav_le_zero_removed_rows": 1200, |
||
| 45 | "weekly_change_over_limit_nullified": 3500, |
||
| 46 | "building_period_dropped_rows": 96000, |
||
| 47 | "truncated_rows_by_gap": 42000, |
||
| 48 | "survival_filtered_funds": 1500, |
||
| 49 | "survival_filtered_rows": 78000 |
||
| 50 | }, |
||
| 51 | "output": { |
||
| 52 | "parquet_files": [ |
||
| 53 | {"filename": "net_value_2016.parquet", "sha256": "...", "rows": 450000}, |
||
| 54 | {"filename": "net_value_2017.parquet", "sha256": "...", "rows": 480000} |
||
| 55 | ], |
||
| 56 | "total_rows_after_cleaning": 8500000, |
||
| 57 | "total_funds_after_cleaning": 6500, |
||
| 58 | "date_range": ["2016-01-04", "2026-03-28"], |
||
| 59 | "schema": ["fund_id:String", "net_value_date:Date", "cumulative_net_value:Float64", "segment_id:UInt32"], |
||
| 60 | "peak_memory_gb": 35.2, |
||
| 61 | "duckdb_view_name": "v_net_value_processed" |
||
| 62 | } |
||
| 63 | } |
||
| 64 | ``` |
||
| 65 | > **PM 卡控说明**:上述数值为占位示例,研发必须使用 `settings.filter.fund_type_whitelist` 等配置参数驱动计算,**严禁硬编码**。`whitelist_retention_rate` 的分母必须是 `fund_basic_info` 的原始总行数。 |
||
| 66 | 8 | Huarui Lin | |
| 67 | 1 | Huarui Lin | --- |
| 68 | ## 可优化与可复用逻辑提示 |
||
| 69 | 8 | Huarui Lin | |
| 70 | 1 | Huarui Lin | ### 1. ffill + 截断的 Polars 向量化实现(复用级) |
| 71 | ```python |
||
| 72 | # 推荐实现思路(伪代码,非最终交付) |
||
| 73 | # 先按 fund_id 排序,再用 Polars 原生 forward fill |
||
| 74 | df = df.sort(["fund_id", "net_value_date"]) |
||
| 75 | df = df.with_columns( |
||
| 76 | pl.col("cumulative_net_value").forward_fill().over("fund_id") |
||
| 77 | ) |
||
| 78 | # 然后 Polars 原生检测连续 NULL(即原始缺失超过 max_gap_weeks 的位置) |
||
| 79 | # 利用 diff 计算相邻日期间隔,间隔 > (max_gap_weeks+1)*7 天即为截断点 |
||
| 80 | ``` |
||
| 81 | ### 2. DuckDB `memory_limit` 应从配置读取 |
||
| 82 | S2-01 中 DuckDB 连接应设置 `memory_limit=str(settings.duckdb_memory_limit)`,建议在 `config.yaml` 中新增: |
||
| 83 | ```yaml |
||
| 84 | duckdb: |
||
| 85 | memory_limit: "40GB" |
||
| 86 | ``` |
||
| 87 | 这样后续压测时可通过环境变量 `DUCKDB__MEMORY_LIMIT=20GB` 快速模拟低内存环境。 |
||
| 88 | ### 3. 审计日志的"率"统一精度 |
||
| 89 | 建议在 `src/utils/config.py` 中新增一个审计工具函数: |
||
| 90 | ```python |
||
| 91 | def calc_rate(numerator: int, denominator: int, decimals: int = 4) -> float: |
||
| 92 | """通用比率计算,分母为 0 时返回 0.0""" |
||
| 93 | ``` |
||
| 94 | 供 `data_loader.py` 调用,保证全项目比率计算逻辑一致。 |
||
| 95 | --- |
||
| 96 | ## 新增/修改文件清单 |
||
| 97 | | 操作 | 文件路径 | 简介与企业级约束 | |
||
| 98 | |:---:|:---|:---| |
||
| 99 | | **修改** | `config/config.yaml` | ① `data:` 节点下新增 `raw_paths` 字典(`fund_net_info` / `fund_basic_info` 路径);② 新增 `duckdb.memory_limit: "40GB"` | |
||
| 100 | | **修改** | `src/utils/config.py` | 对应新增 `data.raw_paths` 和 `duckdb.memory_limit` 的强类型属性 | |
||
| 101 | | **新增** | `src/data_loader.py` | **核心模块**。包含 5 个公开函数:<br>`extract_raw_to_arrow()` — DuckDB 粗筛<br>`process_timeseries()` — Polars 状态机(ffill/截断/segment_id/建仓期/异常值)<br>`filter_and_sink_parquet()` — 存活期过滤 + 按年写 Parquet<br>`create_duckdb_view()` — 创建 `v_net_value_processed`<br>`generate_audit_report()` — 产出 `data_audit.json`<br>**严禁魔法数字,全部从 settings 读取** | |
||
| 102 | | **新增** | `tests/test_data_loader.py` | 时序状态机专项单测。必覆盖:跨年 ffill、>4 周截断 + segment_id 递增、建仓期仅取 segment_0、异常值 NULL 化、短生命周期基金剔除 | |
||
| 103 | | **新增** | `docs/data_contract.md` | **技术真相源**。定义 `net_value_YYYY.parquet` 的严格 Schema(含 `segment_id` 语义)、`data_audit.json` 的 JSON Schema、`v_net_value_processed` VIEW 的列定义 | |
||
| 104 | | **修改** | `docs/architecture_baseline.md` | 同步更新:① FW-3 章节补充"Step 2 全量 Arrow 破局方案"的例外说明;② 新增 `segment_id` 在防火墙架构中的数据流位置 | |
||
| 105 | | **修改** | `.gitea/workflows/ci.yml` | 增加 `pytest tests/test_data_loader.py -v` 步骤,纳入 CI 强阻断 | |
||
| 106 | | **产出** | `data/metadata/data_audit.json` | 运行时产物,不由代码模板生成,由 `data_loader.py` 动态写入 | |
||
| 107 | | **产出** | `data/metadata/processed_hash.json` | 运行时产物,由 `src/utils/hash.py` 计算 | |
||
| 108 | | **产出** | `data/processed/net_value_YYYY.parquet` | 运行时产物,Step 3/4 的直接输入 | |
||
| 109 | 4 | Huarui Lin | |
| 110 | --- |
||
| 111 | ## 下级目录 |
||
| 112 | 3 | Huarui Lin | ### [[CR-001-M2里程碑_Parquet_Schema_变更]] |