项目

一般

简介

0002-数据基建 » 历史记录 » 版本 3

Huarui Lin, 2026-04-12 21:38

1 2 Huarui Lin
2
## 数据基建
3 1 Huarui Lin
| 周次 | Story ID | Story 描述 | 规约硬性映射 | 验收标准 | 工时 |
4
|:---:|:---|:---|:---|:---|:---:|
5
| **W2** | **S2-01** | **配置扩展与 DuckDB 粗筛降维**<br>在 `config.yaml` 新增数据源路径配置;实现 DuckDB 读取 CSV、显式列投影(丢弃 `id`+4列冗余)、周频聚合、白名单 JOIN,输出精简 Arrow。 | 规约 1.2 (周聚合/类型过滤);章程 R-5 (显式丢弃4列);FW-1/FW-2 | ① `config.yaml` 新增 `data.raw_paths` 节点,`data_loader.py` 从配置读取路径;<br>② DuckDB SQL **禁止** `SELECT *`,必须显式列出 `fund_id, net_value_date, cumulative_net_value`;<br>③ JOIN `fund_basic_info` 后,Arrow Schema 仅含上述 3 列 + `fund_type`(临时审计用,最终落盘前丢弃);<br>④ 单测:Mock 小 CSV 验证列裁剪正确。 | 2天 |
6
| **W2** | **S2-02** | **Polars 时序状态机(核心攻坚)**<br>接收全量 Arrow,实现 ffill、>4周缺失截断并打 `segment_id`、建仓期硬删、异常值处理。 | 规约 1.2 (缺失填充/截断/建仓期/异常值);CR-001 (segment_id);全量 Arrow 破局方案 | ① `segment_id` 截断处递增,同基金内从 0 开始;<br>② 建仓期硬删取 **`segment_id == 0`** 首条日期减 12 周(非全局首条);<br>③ `cumulative_net_value ≤ 0` 的行**被剔除**(不进入 Parquet);<br>④ 单周涨跌幅 >50% 的行 `cumulative_net_value` **置 NULL**(保留行,保留时间连续性);<br>⑤ **单测必覆盖**:跨年 ffill、刚好第 5 周缺失触发截断、单基金仅 10 周被建仓期完全剔除、异常值 NULL 化。 | 3天 |
7
| **W3** | **S2-03** | **存活期过滤与按年分区落盘**<br>统计各基金有效周数,剔除 <52 周,Polars 内按年切片写 Parquet,创建 DuckDB VIEW。 | 规约 1.2 (存活期过滤/清盘基金);规约 1.3 (存储方案/row_group_size);FW-4/FW-5 | ① 存活期计算 = **该基金所有段有效周数之和**,< 52 周整体剔除;<br>② `data/processed/` 下产出 `net_value_YYYY.parquet`(覆盖写入,不追加);<br>③ PyArrow `write_parquet()` 显式指定 `row_group_size=100000`;<br>④ `create_duckdb_view()` 函数拼接所有年份文件创建 `v_net_value_processed`;<br>⑤ 单测:Mock 含短生命周期基金的 DataFrame,验证其被完整剔除。 | 2天 |
8
| **W3** | **S2-04** | **数据血缘、审计日志与内存压测**<br>计算处理后 Hash,生成结构化审计 JSON,在 64GB 实体机执行全量压测并记录内存峰值。 | 章程 M2 Done 标准(更新版);规约 7.3 (数据血缘) | ① 产出 `data/metadata/processed_hash.json`(按年份文件逐个计算 SHA-256);<br>② 产出 `data/metadata/data_audit.json`,Schema 严格符合下方定义;<br>③ 压测日志包含 `peak_memory_gb`,证明 ≤ 40GB;<br>④ 若 `whitelist_retention_rate < 15%`,触发 R-6 预警流程,向 PM 汇报。 | 2天 |
9
| **W3** | **S2-05** | **Gitea `/docs` 数据契约文档与 Redmine 验收**<br>编写 `data_contract.md` 沉淀为技术真相源,在 Redmine 录入 M2 验收 CheckList。 | 架构基线 3 (双轨制);章程 M2 | ① `docs/data_contract.md` 内容包含完整 Parquet Schema 定义与 `data_audit.json` Schema 定义;<br>② Redmine M2 任务状态可更新为"待验收";<br>③ CI 软预警检查通过(`src/` 变更已同步 `docs/` 变更)。 | 1天 |
10
---
11
## `data_audit.json` 标准 Schema 定义
12
此文件为 M2 验收的机器可解析审计凭证,S2-04 必须严格按此 Schema 输出:
13
```json
14
{
15
  "pipeline": {
16
    "step": "step2_data_loading",
17
    "run_time": "2026-04-11T10:00:00+08:00",
18
    "python_version": "3.12.x"
19
  },
20
  "input": {
21
    "fund_net_info": {
22
      "file_path": "data/raw/fund_net_info.csv",
23
      "sha256": "a1b2c3d4...",
24
      "total_rows": 23000000,
25
      "date_range": ["2015-12-07", "2026-03-28"]
26
    },
27
    "fund_basic_info": {
28
      "file_path": "data/raw/fund_basic_info.csv",
29
      "sha256": "e5f6g7h8...",
30
      "total_records": 26000,
31
      "columns_actually_read": ["fund_id", "fund_name", "fund_type", "create_date"],
32
      "columns_discarded": ["insert_datetime", "update_datetime", "insert_operator", "update_operator"]
33
    }
34
  },
35
  "filtering": {
36
    "whitelist_total_funds": 26000,
37
    "whitelist_retained_funds": 8000,
38
    "whitelist_filtered_funds": 18000,
39
    "whitelist_retention_rate": 0.3077
40
  },
41
  "cleaning": {
42
    "weekly_aggregation_dedup_rows": 500000,
43
    "nav_le_zero_removed_rows": 1200,
44
    "weekly_change_over_limit_nullified": 3500,
45
    "building_period_dropped_rows": 96000,
46
    "truncated_rows_by_gap": 42000,
47
    "survival_filtered_funds": 1500,
48
    "survival_filtered_rows": 78000
49
  },
50
  "output": {
51
    "parquet_files": [
52
      {"filename": "net_value_2016.parquet", "sha256": "...", "rows": 450000},
53
      {"filename": "net_value_2017.parquet", "sha256": "...", "rows": 480000}
54
    ],
55
    "total_rows_after_cleaning": 8500000,
56
    "total_funds_after_cleaning": 6500,
57
    "date_range": ["2016-01-04", "2026-03-28"],
58
    "schema": ["fund_id:String", "net_value_date:Date", "cumulative_net_value:Float64", "segment_id:UInt32"],
59
    "peak_memory_gb": 35.2,
60
    "duckdb_view_name": "v_net_value_processed"
61
  }
62
}
63
```
64
> **PM 卡控说明**:上述数值为占位示例,研发必须使用 `settings.filter.fund_type_whitelist` 等配置参数驱动计算,**严禁硬编码**。`whitelist_retention_rate` 的分母必须是 `fund_basic_info` 的原始总行数。
65
---
66
## 可优化与可复用逻辑提示
67
### 1. ffill + 截断的 Polars 向量化实现(复用级)
68
```python
69
# 推荐实现思路(伪代码,非最终交付)
70
# 先按 fund_id 排序,再用 Polars 原生 forward fill
71
df = df.sort(["fund_id", "net_value_date"])
72
df = df.with_columns(
73
    pl.col("cumulative_net_value").forward_fill().over("fund_id")
74
)
75
# 然后 Polars 原生检测连续 NULL(即原始缺失超过 max_gap_weeks 的位置)
76
# 利用 diff 计算相邻日期间隔,间隔 > (max_gap_weeks+1)*7 天即为截断点
77
```
78
### 2. DuckDB `memory_limit` 应从配置读取
79
S2-01 中 DuckDB 连接应设置 `memory_limit=str(settings.duckdb_memory_limit)`,建议在 `config.yaml` 中新增:
80
```yaml
81
duckdb:
82
  memory_limit: "40GB"
83
```
84
这样后续压测时可通过环境变量 `DUCKDB__MEMORY_LIMIT=20GB` 快速模拟低内存环境。
85
### 3. 审计日志的"率"统一精度
86
建议在 `src/utils/config.py` 中新增一个审计工具函数:
87
```python
88
def calc_rate(numerator: int, denominator: int, decimals: int = 4) -> float:
89
    """通用比率计算,分母为 0 时返回 0.0"""
90
```
91
供 `data_loader.py` 调用,保证全项目比率计算逻辑一致。
92
---
93
## 新增/修改文件清单
94
| 操作 | 文件路径 | 简介与企业级约束 |
95
|:---:|:---|:---|
96
| **修改** | `config/config.yaml` | ① `data:` 节点下新增 `raw_paths` 字典(`fund_net_info` / `fund_basic_info` 路径);② 新增 `duckdb.memory_limit: "40GB"` |
97
| **修改** | `src/utils/config.py` | 对应新增 `data.raw_paths` 和 `duckdb.memory_limit` 的强类型属性 |
98
| **新增** | `src/data_loader.py` | **核心模块**。包含 5 个公开函数:<br>`extract_raw_to_arrow()` — DuckDB 粗筛<br>`process_timeseries()` — Polars 状态机(ffill/截断/segment_id/建仓期/异常值)<br>`filter_and_sink_parquet()` — 存活期过滤 + 按年写 Parquet<br>`create_duckdb_view()` — 创建 `v_net_value_processed`<br>`generate_audit_report()` — 产出 `data_audit.json`<br>**严禁魔法数字,全部从 settings 读取** |
99
| **新增** | `tests/test_data_loader.py` | 时序状态机专项单测。必覆盖:跨年 ffill、>4 周截断 + segment_id 递增、建仓期仅取 segment_0、异常值 NULL 化、短生命周期基金剔除 |
100
| **新增** | `docs/data_contract.md` | **技术真相源**。定义 `net_value_YYYY.parquet` 的严格 Schema(含 `segment_id` 语义)、`data_audit.json` 的 JSON Schema、`v_net_value_processed` VIEW 的列定义 |
101
| **修改** | `docs/architecture_baseline.md` | 同步更新:① FW-3 章节补充"Step 2 全量 Arrow 破局方案"的例外说明;② 新增 `segment_id` 在防火墙架构中的数据流位置 |
102
| **修改** | `.gitea/workflows/ci.yml` | 增加 `pytest tests/test_data_loader.py -v` 步骤,纳入 CI 强阻断 |
103
| **产出** | `data/metadata/data_audit.json` | 运行时产物,不由代码模板生成,由 `data_loader.py` 动态写入 |
104
| **产出** | `data/metadata/processed_hash.json` | 运行时产物,由 `src/utils/hash.py` 计算 |
105
| **产出** | `data/processed/net_value_YYYY.parquet` | 运行时产物,Step 3/4 的直接输入 |
106 3 Huarui Lin
107
---
108
## 子目录
109
[[CR-001-M2里程碑_Parquet_Schema_变更]] 
110
111
---