行为
0002-数据基建 » 历史记录 » 修订 4
« 上一页 |
修订 4/10
(差异)
| 下一页 »
Huarui Lin, 2026-04-13 13:46
数据基建¶
| 周次 | Story ID | Story 描述 | 规约硬性映射 | 验收标准 | 工时 |
|---|---|---|---|---|---|
| W2 | S2-01 |
配置扩展与 DuckDB 粗筛降维 在 config.yaml 新增数据源路径配置;实现 DuckDB 读取 CSV、显式列投影(丢弃 id+4列冗余)、周频聚合、白名单 JOIN,输出精简 Arrow。 |
规约 1.2 (周聚合/类型过滤);章程 R-5 (显式丢弃4列);FW-1/FW-2 | ① config.yaml 新增 data.raw_paths 节点,data_loader.py 从配置读取路径;② DuckDB SQL 禁止 SELECT *,必须显式列出 fund_id, net_value_date, cumulative_net_value;③ JOIN fund_basic_info 后,Arrow Schema 仅含上述 3 列 + fund_type(临时审计用,最终落盘前丢弃);④ 单测:Mock 小 CSV 验证列裁剪正确。 |
2天 |
| W2 | S2-02 |
Polars 时序状态机(核心攻坚) 接收全量 Arrow,实现 ffill、>4周缺失截断并打 segment_id、建仓期硬删、异常值处理。 |
规约 1.2 (缺失填充/截断/建仓期/异常值);CR-001 (segment_id);全量 Arrow 破局方案 | ① segment_id 截断处递增,同基金内从 0 开始;② 建仓期硬删取 segment_id == 0 首条日期减 12 周(非全局首条);③ cumulative_net_value ≤ 0 的行被剔除(不进入 Parquet);④ 单周涨跌幅 >50% 的行 cumulative_net_value 置 NULL(保留行,保留时间连续性);⑤ 单测必覆盖:跨年 ffill、刚好第 5 周缺失触发截断、单基金仅 10 周被建仓期完全剔除、异常值 NULL 化。 |
3天 |
| W3 | S2-03 |
存活期过滤与按年分区落盘 统计各基金有效周数,剔除 <52 周,Polars 内按年切片写 Parquet,创建 DuckDB VIEW。 |
规约 1.2 (存活期过滤/清盘基金);规约 1.3 (存储方案/row_group_size);FW-4/FW-5 | ① 存活期计算 = 该基金所有段有效周数之和,< 52 周整体剔除; ② data/processed/ 下产出 net_value_YYYY.parquet(覆盖写入,不追加);③ PyArrow write_parquet() 显式指定 row_group_size=100000;④ create_duckdb_view() 函数拼接所有年份文件创建 v_net_value_processed;⑤ 单测:Mock 含短生命周期基金的 DataFrame,验证其被完整剔除。 |
2天 |
| W3 | S2-04 |
数据血缘、审计日志与内存压测 计算处理后 Hash,生成结构化审计 JSON,在 64GB 实体机执行全量压测并记录内存峰值。 |
章程 M2 Done 标准(更新版);规约 7.3 (数据血缘) | ① 产出 data/metadata/processed_hash.json(按年份文件逐个计算 SHA-256);② 产出 data/metadata/data_audit.json,Schema 严格符合下方定义;③ 压测日志包含 peak_memory_gb,证明 ≤ 40GB;④ 若 whitelist_retention_rate < 15%,触发 R-6 预警流程,向 PM 汇报。 |
2天 |
| W3 | S2-05 |
Gitea /docs 数据契约文档与 Redmine 验收编写 data_contract.md 沉淀为技术真相源,在 Redmine 录入 M2 验收 CheckList。 |
架构基线 3 (双轨制);章程 M2 | ① docs/data_contract.md 内容包含完整 Parquet Schema 定义与 data_audit.json Schema 定义;② Redmine M2 任务状态可更新为"待验收"; ③ CI 软预警检查通过( src/ 变更已同步 docs/ 变更)。 |
1天 |
data_audit.json 标准 Schema 定义
此文件为 M2 验收的机器可解析审计凭证,S2-04 必须严格按此 Schema 输出:
{
"pipeline": {
"step": "step2_data_loading",
"run_time": "2026-04-11T10:00:00+08:00",
"python_version": "3.12.x"
},
"input": {
"fund_net_info": {
"file_path": "data/raw/fund_net_info.csv",
"sha256": "a1b2c3d4...",
"total_rows": 23000000,
"date_range": ["2015-12-07", "2026-03-28"]
},
"fund_basic_info": {
"file_path": "data/raw/fund_basic_info.csv",
"sha256": "e5f6g7h8...",
"total_records": 26000,
"columns_actually_read": ["fund_id", "fund_name", "fund_type", "create_date"],
"columns_discarded": ["insert_datetime", "update_datetime", "insert_operator", "update_operator"]
}
},
"filtering": {
"whitelist_total_funds": 26000,
"whitelist_retained_funds": 8000,
"whitelist_filtered_funds": 18000,
"whitelist_retention_rate": 0.3077
},
"cleaning": {
"weekly_aggregation_dedup_rows": 500000,
"nav_le_zero_removed_rows": 1200,
"weekly_change_over_limit_nullified": 3500,
"building_period_dropped_rows": 96000,
"truncated_rows_by_gap": 42000,
"survival_filtered_funds": 1500,
"survival_filtered_rows": 78000
},
"output": {
"parquet_files": [
{"filename": "net_value_2016.parquet", "sha256": "...", "rows": 450000},
{"filename": "net_value_2017.parquet", "sha256": "...", "rows": 480000}
],
"total_rows_after_cleaning": 8500000,
"total_funds_after_cleaning": 6500,
"date_range": ["2016-01-04", "2026-03-28"],
"schema": ["fund_id:String", "net_value_date:Date", "cumulative_net_value:Float64", "segment_id:UInt32"],
"peak_memory_gb": 35.2,
"duckdb_view_name": "v_net_value_processed"
}
}
PM 卡控说明:上述数值为占位示例,研发必须使用
settings.filter.fund_type_whitelist等配置参数驱动计算,严禁硬编码。whitelist_retention_rate的分母必须是fund_basic_info的原始总行数。
可优化与可复用逻辑提示¶
1. ffill + 截断的 Polars 向量化实现(复用级)¶
# 推荐实现思路(伪代码,非最终交付)
# 先按 fund_id 排序,再用 Polars 原生 forward fill
df = df.sort(["fund_id", "net_value_date"])
df = df.with_columns(
pl.col("cumulative_net_value").forward_fill().over("fund_id")
)
# 然后 Polars 原生检测连续 NULL(即原始缺失超过 max_gap_weeks 的位置)
# 利用 diff 计算相邻日期间隔,间隔 > (max_gap_weeks+1)*7 天即为截断点
2. DuckDB memory_limit 应从配置读取¶
S2-01 中 DuckDB 连接应设置 memory_limit=str(settings.duckdb_memory_limit),建议在 config.yaml 中新增:
duckdb:
memory_limit: "40GB"
这样后续压测时可通过环境变量 DUCKDB__MEMORY_LIMIT=20GB 快速模拟低内存环境。
3. 审计日志的"率"统一精度¶
建议在 src/utils/config.py 中新增一个审计工具函数:
def calc_rate(numerator: int, denominator: int, decimals: int = 4) -> float:
"""通用比率计算,分母为 0 时返回 0.0"""
供 data_loader.py 调用,保证全项目比率计算逻辑一致。¶
新增/修改文件清单¶
| 操作 | 文件路径 | 简介与企业级约束 |
|---|---|---|
| 修改 | config/config.yaml |
① data: 节点下新增 raw_paths 字典(fund_net_info / fund_basic_info 路径);② 新增 duckdb.memory_limit: "40GB"
|
| 修改 | src/utils/config.py |
对应新增 data.raw_paths 和 duckdb.memory_limit 的强类型属性 |
| 新增 | src/data_loader.py |
核心模块。包含 5 个公开函数:extract_raw_to_arrow() — DuckDB 粗筛process_timeseries() — Polars 状态机(ffill/截断/segment_id/建仓期/异常值)filter_and_sink_parquet() — 存活期过滤 + 按年写 Parquetcreate_duckdb_view() — 创建 v_net_value_processedgenerate_audit_report() — 产出 data_audit.json严禁魔法数字,全部从 settings 读取 |
| 新增 | tests/test_data_loader.py |
时序状态机专项单测。必覆盖:跨年 ffill、>4 周截断 + segment_id 递增、建仓期仅取 segment_0、异常值 NULL 化、短生命周期基金剔除 |
| 新增 | docs/data_contract.md |
技术真相源。定义 net_value_YYYY.parquet 的严格 Schema(含 segment_id 语义)、data_audit.json 的 JSON Schema、v_net_value_processed VIEW 的列定义 |
| 修改 | docs/architecture_baseline.md |
同步更新:① FW-3 章节补充"Step 2 全量 Arrow 破局方案"的例外说明;② 新增 segment_id 在防火墙架构中的数据流位置 |
| 修改 | .gitea/workflows/ci.yml |
增加 pytest tests/test_data_loader.py -v 步骤,纳入 CI 强阻断 |
| 产出 | data/metadata/data_audit.json |
运行时产物,不由代码模板生成,由 data_loader.py 动态写入 |
| 产出 | data/metadata/processed_hash.json |
运行时产物,由 src/utils/hash.py 计算 |
| 产出 | data/processed/net_value_YYYY.parquet |
运行时产物,Step 3/4 的直接输入 |
下级目录¶
CR-001-M2里程碑_Parquet_Schema_变更¶
由 Huarui Lin 更新于 4 天 之前 · 4 修订