项目

一般

简介

行为

0002-数据基建 » 历史记录 » 修订 7

« 上一页 | 修订 7/10 (差异) | 下一页 »
Huarui Lin, 2026-04-13 13:48


数据基建

周次 Story ID Story 描述 规约硬性映射 验收标准 工时
W2 S2-01 配置扩展与 DuckDB 粗筛降维
config.yaml 新增数据源路径配置;实现 DuckDB 读取 CSV、显式列投影(丢弃 id+4列冗余)、周频聚合、白名单 JOIN,输出精简 Arrow。
规约 1.2 (周聚合/类型过滤);章程 R-5 (显式丢弃4列);FW-1/FW-2 config.yaml 新增 data.raw_paths 节点,data_loader.py 从配置读取路径;
② DuckDB SQL 禁止 SELECT *,必须显式列出 fund_id, net_value_date, cumulative_net_value
③ JOIN fund_basic_info 后,Arrow Schema 仅含上述 3 列 + fund_type(临时审计用,最终落盘前丢弃);
④ 单测:Mock 小 CSV 验证列裁剪正确。
2天
W2 S2-02 Polars 时序状态机(核心攻坚)
接收全量 Arrow,实现 ffill、>4周缺失截断并打 segment_id、建仓期硬删、异常值处理。
规约 1.2 (缺失填充/截断/建仓期/异常值);CR-001 (segment_id);全量 Arrow 破局方案 segment_id 截断处递增,同基金内从 0 开始;
② 建仓期硬删取 segment_id == 0 首条日期减 12 周(非全局首条);
cumulative_net_value ≤ 0 的行被剔除(不进入 Parquet);
④ 单周涨跌幅 >50% 的行 cumulative_net_value 置 NULL(保留行,保留时间连续性);
单测必覆盖:跨年 ffill、刚好第 5 周缺失触发截断、单基金仅 10 周被建仓期完全剔除、异常值 NULL 化。
3天
W3 S2-03 存活期过滤与按年分区落盘
统计各基金有效周数,剔除 <52 周,Polars 内按年切片写 Parquet,创建 DuckDB VIEW。
规约 1.2 (存活期过滤/清盘基金);规约 1.3 (存储方案/row_group_size);FW-4/FW-5 ① 存活期计算 = 该基金所有段有效周数之和,< 52 周整体剔除;
data/processed/ 下产出 net_value_YYYY.parquet(覆盖写入,不追加);
③ PyArrow write_parquet() 显式指定 row_group_size=100000
create_duckdb_view() 函数拼接所有年份文件创建 v_net_value_processed
⑤ 单测:Mock 含短生命周期基金的 DataFrame,验证其被完整剔除。
2天
W3 S2-04 数据血缘、审计日志与内存压测
计算处理后 Hash,生成结构化审计 JSON,在 64GB 实体机执行全量压测并记录内存峰值。
章程 M2 Done 标准(更新版);规约 7.3 (数据血缘) ① 产出 data/metadata/processed_hash.json(按年份文件逐个计算 SHA-256);
② 产出 data/metadata/data_audit.json,Schema 严格符合下方定义;
③ 压测日志包含 peak_memory_gb,证明 ≤ 40GB;
④ 若 whitelist_retention_rate < 15%,触发 R-6 预警流程,向 PM 汇报。
2天
W3 S2-05 Gitea /docs 数据契约文档与 Redmine 验收
编写 data_contract.md 沉淀为技术真相源,在 Redmine 录入 M2 验收 CheckList。
架构基线 3 (双轨制);章程 M2 docs/data_contract.md 内容包含完整 Parquet Schema 定义与 data_audit.json Schema 定义;
② Redmine M2 任务状态可更新为"待验收";
③ CI 软预警检查通过(src/ 变更已同步 docs/ 变更)。
1天

data_audit.json 标准 Schema 定义

此文件为 M2 验收的机器可解析审计凭证,S2-04 必须严格按此 Schema 输出:

{
  "pipeline": {
    "step": "step2_data_loading",
    "run_time": "2026-04-11T10:00:00+08:00",
    "python_version": "3.12.x"
  },
  "input": {
    "fund_net_info": {
      "file_path": "data/raw/fund_net_info.csv",
      "sha256": "a1b2c3d4...",
      "total_rows": 23000000,
      "date_range": ["2015-12-07", "2026-03-28"]
    },
    "fund_basic_info": {
      "file_path": "data/raw/fund_basic_info.csv",
      "sha256": "e5f6g7h8...",
      "total_records": 26000,
      "columns_actually_read": ["fund_id", "fund_name", "fund_type", "create_date"],
      "columns_discarded": ["insert_datetime", "update_datetime", "insert_operator", "update_operator"]
    }
  },
  "filtering": {
    "whitelist_total_funds": 26000,
    "whitelist_retained_funds": 8000,
    "whitelist_filtered_funds": 18000,
    "whitelist_retention_rate": 0.3077
  },
  "cleaning": {
    "weekly_aggregation_dedup_rows": 500000,
    "nav_le_zero_removed_rows": 1200,
    "weekly_change_over_limit_nullified": 3500,
    "building_period_dropped_rows": 96000,
    "truncated_rows_by_gap": 42000,
    "survival_filtered_funds": 1500,
    "survival_filtered_rows": 78000
  },
  "output": {
    "parquet_files": [
      {"filename": "net_value_2016.parquet", "sha256": "...", "rows": 450000},
      {"filename": "net_value_2017.parquet", "sha256": "...", "rows": 480000}
    ],
    "total_rows_after_cleaning": 8500000,
    "total_funds_after_cleaning": 6500,
    "date_range": ["2016-01-04", "2026-03-28"],
    "schema": ["fund_id:String", "net_value_date:Date", "cumulative_net_value:Float64", "segment_id:UInt32"],
    "peak_memory_gb": 35.2,
    "duckdb_view_name": "v_net_value_processed"
  }
}

PM 卡控说明:上述数值为占位示例,研发必须使用 settings.filter.fund_type_whitelist 等配置参数驱动计算,严禁硬编码whitelist_retention_rate 的分母必须是 fund_basic_info 的原始总行数。


可优化与可复用逻辑提示

1. ffill + 截断的 Polars 向量化实现(复用级)

# 推荐实现思路(伪代码,非最终交付)
# 先按 fund_id 排序,再用 Polars 原生 forward fill
df = df.sort(["fund_id", "net_value_date"])
df = df.with_columns(
    pl.col("cumulative_net_value").forward_fill().over("fund_id")
)
# 然后 Polars 原生检测连续 NULL(即原始缺失超过 max_gap_weeks 的位置)
# 利用 diff 计算相邻日期间隔,间隔 > (max_gap_weeks+1)*7 天即为截断点

2. DuckDB memory_limit 应从配置读取

S2-01 中 DuckDB 连接应设置 memory_limit=str(settings.duckdb_memory_limit),建议在 config.yaml 中新增:

duckdb:
  memory_limit: "40GB"

这样后续压测时可通过环境变量 DUCKDB__MEMORY_LIMIT=20GB 快速模拟低内存环境。

3. 审计日志的"率"统一精度

建议在 src/utils/config.py 中新增一个审计工具函数:

def calc_rate(numerator: int, denominator: int, decimals: int = 4) -> float:
    """通用比率计算,分母为 0 时返回 0.0"""

data_loader.py 调用,保证全项目比率计算逻辑一致。

新增/修改文件清单

操作 文件路径 简介与企业级约束
修改 config/config.yaml data: 节点下新增 raw_paths 字典(fund_net_info / fund_basic_info 路径);② 新增 duckdb.memory_limit: "40GB"
修改 src/utils/config.py 对应新增 data.raw_pathsduckdb.memory_limit 的强类型属性
新增 src/data_loader.py 核心模块。包含 5 个公开函数:
extract_raw_to_arrow() — DuckDB 粗筛
process_timeseries() — Polars 状态机(ffill/截断/segment_id/建仓期/异常值)
filter_and_sink_parquet() — 存活期过滤 + 按年写 Parquet
create_duckdb_view() — 创建 v_net_value_processed
generate_audit_report() — 产出 data_audit.json
严禁魔法数字,全部从 settings 读取
新增 tests/test_data_loader.py 时序状态机专项单测。必覆盖:跨年 ffill、>4 周截断 + segment_id 递增、建仓期仅取 segment_0、异常值 NULL 化、短生命周期基金剔除
新增 docs/data_contract.md 技术真相源。定义 net_value_YYYY.parquet 的严格 Schema(含 segment_id 语义)、data_audit.json 的 JSON Schema、v_net_value_processed VIEW 的列定义
修改 docs/architecture_baseline.md 同步更新:① FW-3 章节补充"Step 2 全量 Arrow 破局方案"的例外说明;② 新增 segment_id 在防火墙架构中的数据流位置
修改 .gitea/workflows/ci.yml 增加 pytest tests/test_data_loader.py -v 步骤,纳入 CI 强阻断
产出 data/metadata/data_audit.json 运行时产物,不由代码模板生成,由 data_loader.py 动态写入
产出 data/metadata/processed_hash.json 运行时产物,由 src/utils/hash.py 计算
产出 data/processed/net_value_YYYY.parquet 运行时产物,Step 3/4 的直接输入

下级目录

CR-001-M2里程碑_Parquet_Schema_变更


Huarui Lin 更新于 4 天 之前 · 7 修订