在 Apache Spark 中,DAG(有向无环图)并不是在 RDD 转换操作定义完成后就立即生成的,而是在触发 行动操作(Action) 时动态构建的。以下是详细解释:
1. DAG 生成的触发时机
-
转换操作(Transformations):
当用户定义map
、filter
、groupByKey
等转换操作时,Spark 仅记录 RDD 的血缘关系(Lineage),不会生成 DAG。# 示例:仅定义转换操作,此时未生成 DAG rdd = sc.parallelize([1, 2, 3, 4]) mapped = rdd.map(lambda x: x * 2) filtered = mapped.filter(lambda x: x > 4)
-
行动操作(Actions):
当调用collect()
、count()
、saveAsTextFile()
等行动操作时,Spark 才会根据 RDD 的血缘关系生成 DAG。# 触发行动操作时生成 DAG filtered.collect() # 此时开始构建 DAG 并执行任务
2. DAG 的生成过程
Spark 的 DAGScheduler
负责以下步骤:
- 解析 RDD 的血缘关系:
根据 RDD 的依赖链(父 RDD → 子 RDD)构建逻辑执行计划。 - 划分阶段(Stages):
- 窄依赖:合并为同一个阶段(Pipeline),无需 Shuffle。
- 宽依赖:作为阶段边界,触发 Shuffle 并划分新阶段。
- 生成 DAG:
将阶段按依赖关系连接为 DAG,提交给TaskScheduler
执行。
示例:
rdd = sc.parallelize([1, 2, 3, 4])
mapped = rdd.map(lambda x: x * 2) # 窄依赖
grouped = mapped.groupByKey() # 宽依赖(假设 mapped 是键值对)
filtered = grouped.filter(lambda x: x > 4) # 窄依赖
filtered.collect() # 触发 DAG 生成
- 阶段划分:
- Stage 0:
map
(窄依赖)。 - Stage 1:
groupByKey
(宽依赖)→filter
(窄依赖)。
- Stage 0:
- DAG 结构:Stage 0 → Stage 1。
3. 为什么是惰性生成?
- 优化机会:
延迟到行动操作时生成 DAG,允许 Spark 对多个转换操作进行优化(如合并连续的map
操作)。 - 资源节省:
避免在定义转换时提前分配资源,直到需要计算时才生成任务。
4. 可视化 DAG
通过 Spark UI 可以查看生成的 DAG:
- 访问
http://<driver-node>:4040
。 - 在 Jobs 标签页中,点击具体 Job 查看 DAG 可视化图。
- 每个阶段(Stage)用方框表示,箭头表示依赖关系。
总结
- DAG 的生成时机:在调用行动操作(如
collect()
)时动态构建,而非 RDD 转换定义时。 - 生成逻辑:基于 RDD 的血缘关系和宽窄依赖划分阶段,形成 DAG。
- 核心价值:通过延迟执行和动态优化,提升分布式计算的效率和资源利用率。