Airflow量化入门系列:第三章 A 股市场数据处理与存储
本教程系统性地讲解了 Apache Airflow 在 A 股量化交易中的应用,覆盖从基础安装到高级功能的完整知识体系。通过六章内容,读者将掌握 Airflow 的核心概念、任务调度、数据处理、技术指标计算、策略回测及工作流监控等关键技能。教程整合 Tushare 数据源、TA-Lib 技术指标库和 VectorBT 策略回测工具,提供丰富实战案例,帮助构建高效、可靠的工作流,助力量化交易实践。
文中内容仅限技术学习与代码实践参考,市场存在不确定性,技术分析需谨慎验证,不构成任何投资建议。适合量化新手建立系统认知,为策略开发打下基础。
学习对象
- 中高级水平的开发者
- 具备 Python 编程基础,熟悉基本的 ETL 流程和数据分析工具
- 希望掌握 Airflow 在量化交易场景中的应用
教程目标
- 系统掌握 Apache Airflow 的核心功能与高级特性
- 深入理解 Airflow 在 A 股量化交易工作流中的应用
- 能够独立设计、部署和维护复杂的量化交易工作流
教程目录
第一章:Apache Airflow 基础
1.1 Airflow 简介与安装
1.2 Airflow 核心概念
1.3 Airflow Web UI 使用与管理
1.4 Airflow 配置与环境搭建
第二章:Airflow 任务调度与依赖管理
2.1 DAG 定义与任务依赖关系
2.2 Task 类型
2.3 任务调度策略与优先级
2.4 动态任务生成与条件分支
第三章:A 股市场数据处理与存储
3.1 使用 Tushare 获取 A 股数据
3.2 数据清洗与预处理
3.3 Parquet 文件存储与读取
3.4 数据存储优化与性能提升
第四章:技术指标计算与策略实现
4.1 TA-Lib 基础与常用技术指标
4.2 技术指标计算流程设计
4.3 策略回测与评估(使用 Vector BT)
4.4 策略优化与参数调整
第五章:Airflow 高级功能与最佳实践
5.1 Airflow 与 Docker 集成
5.2 Airflow 任务监控与日志分析
5.3 Airflow 任务容错与重试机制
5.4 Airflow 任务性能优化
第六章:完整案例:A 股量化交易工作流
6.1 数据获取与存储工作流
6.2 技术指标计算工作流
6.3 策略回测与结果存储工作流
6.4 工作流监控与维护
第三章 A 股市场数据处理与存储
3.1 使用 Tushare 获取 A 股数据
理论详解
推荐阅读🚀:
- Pandas+PyArrow:股票数据存储 Parquet 入门指引 🔥
- A股数据存储实战:Parquet技术深度解析
Parquet 是一种高效的列式存储格式,具有以下优点:
- 高效存储:列式存储减少存储空间。
- 快速查询:支持按列查询,提高查询效率。
- 压缩支持:支持多种压缩算法。
Parquet 文件的使用:
- 存储:使用
pandas.to_parquet()
方法。 - 读取:使用
pandas.read_parquet()
方法。
Tushare 是一个开源的金融数据接口,提供了丰富的 A 股市场数据。常见的数据接口包括:
- daily:获取日线行情数据。
- weekly:获取周线行情数据。
- monthly:获取月线行情数据。
- trade_cal:获取交易日历。
Tushare API 使用方法:
- 安装 Tushare:使用
pip install tushare
安装。 - 配置 API Token:通过
ts.set_token("your_token")
配置。 - 获取数据:使用
pro.daily()
等接口获取数据。
实战示例
获取特定股票的历史数据并存储为 Parquet 文件:
import os
from datetime import datetimeimport tushare as tsfrom airflow import DAG
from airflow.configuration import conf
from airflow.operators.python import PythonOperator# 配置 Tushare API
tushare_token = conf.get("tushare", "api_token")
ts.set_token(tushare_token)
pro = ts.pro_api()# 数据存储目录
data_dir = conf.get("tushare", "data_folder")
if not os.path.exists(data_dir):os.makedirs(data_dir)def fetch_stock_data(**kwargs):"""获取特定股票的历史数据并存储为 Parquet 文件。:param kwargs: Airflow 任务参数:return: None"""stock_code = kwargs["dag_run"].conf.get("stock_code", "000001.SZ")start_date = kwargs["dag_run"].conf.get("start_date", "20230101")end_date = kwargs["dag_run"].conf.get("end_date", "20241231")print(f"Fetch data for {stock_code} from {start_date} to {end_date}")# 获取股票数据df = pro.daily(ts_code=stock_code, start_date=start_date, end_date=end_date)# 存储为 Parquet 文件data_path = os.path.join(data_dir, f"stock_data_{stock_code}.parquet")df.to_parquet(data_path, index=False)print(f"Data saved to {data_path}")return data_path# 定义 DAG
with DAG("fetch_stock_data",description="Fetch stock data from Tushare and save as Parquet",schedule_interval=None,start_date=datetime(2023, 1, 1),tags=["quant", "stock"],
) as dag:# 定义任务fetch_task = PythonOperator(task_id="fetch_stock_data",python_callable=fetch_stock_data,provide_context=True,)# 设置任务依赖关系
fetch_task
运行 DAG:
- 将 DAG 文件保存到 Airflow 的
dags
目录。 - 在 Airflow Web UI 中触发 DAG,查看任务状态和日志。
3.2 数据清洗与预处理
理论详解
数据清洗是数据处理的重要步骤,常见的清洗方法包括:
- 处理缺失值:删除或填充缺失值。
- 处理异常值:识别并处理异常值。
- 数据标准化:将数据转换为统一的尺度。
实战示例
清洗股票数据,计算股票下一日收益率,处理缺失值:
import os
from datetime import datetimeimport pandas as pdfrom airflow import DAG
from airflow.configuration import conf
from airflow.operators.python import PythonOperator# 数据存储目录
data_dir = conf.get("tushare", "data_folder")def calculate_clean_stock_data(**kwargs):"""清洗股票数据,计算股票下一日收益率,处理缺失值。:param kwargs: Airflow 任务参数:return: 处理后的数据文件路径"""stock_code = kwargs["dag_run"].conf.get("stock_code", "000001.SZ")data_path = os.path.join(data_dir, f"stock_data_{stock_code}.parquet")# 读取 Parquet 文件df = pd.read_parquet(data_path)# 计算下一日收益率df["returns"] = df["close"].pct_change().shift(-1)# 处理缺失值df.dropna(inplace=True) # 删除缺失值# 分离路径和文件名dir_path, file_name = os.path.split(data_path)# 存储清洗后的数据cleaned_data_path = os.path.join(dir_path, f"calculate_cleaned_{file_name}")df.to_parquet(cleaned_data_path, index=False)print(f"Data saved to {cleaned_data_path}")return cleaned_data_path# 定义 DAG
with DAG("calculate_clean_stock_data",description="Clean stock data, calculate the stock's return on the next day, and handle missing values.",schedule_interval=None,start_date=datetime(2023, 1, 1),tags=["quant", "stock"],
) as dag:# 定义任务calculate_clean_task = PythonOperator(task_id="calculate_clean_stock_data",python_callable=calculate_clean_stock_data,provide_context=True,)# 设置任务依赖关系
calculate_clean_task
运行 DAG:
- 将 DAG 文件保存到 Airflow 的
dags
目录。 - 在 Airflow Web UI 中触发 DAG,查看任务状态和日志。
3.3 数据存储优化与性能提升
理论详解
数据存储优化的常见策略包括:
- 分区存储:按时间或类别分区,提高查询效率。
- 索引优化:使用索引加速查询。
- 压缩算法:选择合适的压缩算法减少存储空间。
实战示例
使用分区存储与索引优化:
import os
from datetime import datetimeimport pandas as pdfrom airflow import DAG
from airflow.configuration import conf
from airflow.operators.python import PythonOperator# 数据存储目录
data_dir = conf.get("tushare", "data_folder")def optimize_storage(**kwargs):"""使用分区存储与索引优化数据存储。:param kwargs: Airflow 任务参数:return: 优化后的数据文件路径"""stock_code = kwargs["dag_run"].conf.get("stock_code", "000001.SZ")data_path = os.path.join(data_dir, f"calculate_cleaned_stock_data_{stock_code}.parquet")# 读取 Parquet 文件df = pd.read_parquet(data_path)#optimized_data_path = os.path.join(data_dir, "optimized_data")if not os.path.exists(optimized_data_path):os.makedirs(optimized_data_path)df.to_parquet(optimized_data_path,index=False,compression="snappy",partition_cols=["trade_date"],)print(f"Data saved to {optimized_data_path}")return optimized_data_path# 定义 DAG
with DAG("optimize_storage",description="Optimize data storage using partitioning and indexing",schedule_interval=None,start_date=datetime(2023, 1, 1),tags=["quant", "stock"],
) as dag:# 定义任务optimize_storage = PythonOperator(task_id="optimize_storage",python_callable=optimize_storage,provide_context=True,)# 设置任务依赖关系
optimize_storage
增加文件描述符限制
具体步骤如下:
在 Linux 上
-
临时增加限制(仅对当前会话有效):
ulimit -n 65535 # 将文件描述符限制设置为 65535
-
永久增加限制(需要重启系统或重新登录生效):
-
编辑
/etc/security/limits.conf
文件,添加或修改以下行:* soft nofile 65535 * hard nofile 65535
-
编辑
/etc/sysctl.conf
文件,添加或修改以下行:fs.file-max = 65535
-
应用更改:
sudo sysctl -p
-
在 macOS 上
-
临时增加限制(仅对当前会话有效):
ulimit -n 65535 # 将文件描述符限制设置为 65535
-
永久增加限制(需要重启系统或重新登录生效):
-
编辑
/etc/launchd.conf
文件,添加或修改以下行:limit maxfiles 65535 65535
-
重启系统以应用更改。
-
运行 DAG:
- 将 DAG 文件保存到 Airflow 的
dags
目录。 - 在 Airflow Web UI 中触发 DAG,查看任务状态和日志。
总结
通过本章的学习,您已经掌握了如何使用 Tushare 获取 A 股数据,以及如何进行数据清洗、存储和优化。在接下来的章节中,我们将深入探讨技术指标计算与策略实现。
风险提示与免责声明
本文内容基于公开信息研究整理,不构成任何形式的投资建议。历史表现不应作为未来收益保证,市场存在不可预见的波动风险。投资者需结合自身财务状况及风险承受能力独立决策,并自行承担交易结果。作者及发布方不对任何依据本文操作导致的损失承担法律责任。市场有风险,投资须谨慎。