Spark核心概念与懒惰计算[未修订完]

Spark核心概念与懒惰计算[未修订完]

1. Spark核心数据结构:RDD与共享变量

在深入探讨算子之前,我们必须首先理解Spark工作的基本单元:弹性分布式数据集(RDD)共享变量。它们是构建所有Spark应用的基础。

1.1 核心抽象:弹性分布式数据集(RDD)

RDD (Resilient Distributed Dataset) 是Spark最核心的抽象。可以将其理解为一个不可变的、可分区的、包含可并行计算元素的大型集合

想象一下,你有一本超大的书(你的海量数据),这本书太厚了,一个人根本读不完。于是你想了个办法:

  1. 分布式(Distributed)
    • 你把书撕成很多小册子(分片)
    • 分给一群朋友(集群中的计算机)每人读一部分
  2. 数据集(Dataset)
    • 这本”书”就是你的数据集合
    • 可以是数字、文字、用户信息等等
  3. 弹性(Resilient)
    • 突然有个朋友把咖啡洒在小册子上(机器故障)
    • 没关系!因为你记得这本书是怎么撕开的(血统)
    • 你可以重新复印那几页(重新计算)
    • 整个阅读工作不会因此停止

这个就是RDD的设计特点:

  • **弹性 (Resilient)**:RDD通过其“血缘关系(Lineage)”天生支持容错。如果某个分区的数据丢失,Spark可以根据血缘关系重新计算出该分区,而无需从头再来。
  • **分布式 (Distributed)**:RDD的数据被分成多个分区(Partition),存储在集群的不同节点上。这使得数据可以被并行处理。
  • **数据集 (Dataset)**:它是一个只读的数据集合,可以存储任何类型的Java或Python对象。
  • **不可变 (Immutable)*:一旦创建,RDD就不能被修改。对RDD的任何操作(转换)都会生成一个新的*RDD。这种设计简化了并发和容错。
  • **惰性计算 (Lazy Evaluation)**:在Spark中,对RDD的转换操作(如map、filter、join等)不会立即执行,而是记录下操作(形成血缘关系图)。只有当遇到一个行动操作(如count、collect、save等)时,才会触发实际的计算。

为什么RDD重要?

  • 不怕故障:机器坏了数据能恢复
  • 高效并行:任务可以分给很多机器同时做
  • 灵活处理:适合各种复杂的数据处理任务
  • 内存计算:数据可以放在内存中处理,比读硬盘快得多

创建RDD

在Spark中,创建RDD主要有两种方式:

  1. 并行化一个已有的集合:使用SparkContextparallelize方法,将Driver程序中的一个普通集合(如List)转换为一个分布式RDD。这主要用于学习和测试。

    1
    2
    List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
    JavaRDD<Integer> distData = sc.parallelize(data);
  2. 读取外部数据源:从HDFS、S3、本地文件系统等外部存储系统加载数据。这是生产环境中最常见的方式。

    1
    JavaRDD<String> distFile = sc.textFile("data.txt");

1.2 优化工具:共享变量

通常情况下,当我们在Driver端定义的函数(闭包)被发送到Executor上执行时,函数中引用的所有变量都会被复制一份,每个任务都拥有一份独立的副本。但有时,我们需要在所有任务间共享数据,或者将结果聚合回Driver端。为此,Spark提供了两种特殊的共享变量。

1.2.1 广播变量 (Broadcast Variables)

问题:当一个较大的只读变量(例如,一个查找表或配置对象)被多个任务使用时,如果直接在闭包中引用它,这个变量会被序列化并随每个任务一起发送到Executor,造成巨大的网络开销。

解决方案:使用广播变量。广播变量只会被发送到每个Executor一次,然后该Executor上的所有任务都可以共享这份数据。这极大地减少了网络传输和Driver的负载。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 有一个较大的只读查找表
Map<String, String> lookupTable = new HashMap<>();
lookupTable.put("A", "Apple");
lookupTable.put("B", "Ball");

// 将其广播出去
Broadcast<Map<String, String>> broadcastTable = sc.broadcast(lookupTable);

JavaRDD<String> rdd = sc.parallelize(Arrays.asList("A", "B"));

// 在算子中通过.value()方法访问广播变量
rdd.map(key -> {
Map<String, String> localTable = broadcastTable.value();
return localTable.get(key);
}).collect();

1.2.2 累加器 (Accumulators)

问题:任务在Executor上执行时是相互隔离的,我们无法在算子内部安全地修改一个外部变量(例如,用一个计数器来统计符合某个条件的记录数)。

解决方案:使用累加器。累加器是一种只支持“累加”操作的变量,它可以在所有任务中被安全地并行更新,最终由Driver端统一读取结果。Spark原生支持数值型和集合类型的累加器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 创建一个长整型累加器,初始值为0
LongAccumulator counter = sc.sc().longAccumulator("MyCounter");

JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));

// 在算子中通过.add()方法累加
numbers.foreach(x -> {
if (x % 2 == 0) {
counter.add(1);
}
});

// 在Driver端通过.value()获取最终结果
System.out.println("偶数的数量是: " + counter.value()); // 输出: 2

理解了RDD、广播变量和累加器之后,我们就可以开始学习如何使用算子来操作这些数据结构了。

2. Spark算子的分类与特性

Spark算子是构建分布式数据处理应用的基础指令,理解其分类是掌握Spark编程模型的第一步。算子可以从两个核心维度进行分类:按功能划分按依赖关系划分。这两个维度共同决定了算子的行为、执行时机和性能特征。

2.1 按功能划分:转换(Transformation)与行动(Action)

这个维度决定了算子的执行时机,是理解Spark核心特性——懒惰计算——的关键。

转换(Transformation)

  • 核心思想:只定义计算逻辑,不立即执行。
  • 特点懒惰计算(Lazy Evaluation)。调用时,Spark并不会立即执行计算,而是将该操作记录下来,形成一个计算的有向无环图(DAG)的一部分。这就像是制定一份详细的作战计划,但并不开火。
  • 返回值:一个新的RDD,代表了应用该转换后的结果数据集。
  • 代表算子map, filter, flatMap, groupByKey, reduceByKey, join, repartition等。

行动(Action)

  • 核心思想:触发计算,获取结果。
  • 特点立即计算(Eager Evaluation)。调用时,会触发一个Spark作业(Job)的提交和执行。Spark会根据之前构建的DAG,将计算任务分发到集群执行,这是“开火”的信号。
  • 返回值:一个非RDD类型的值(如Int, List)或无返回值(例如,将结果写入外部存储)。
  • 代表算子count, collect, first, take, reduce, foreach, saveAsTextFile等。

2.2 按依赖关系划分:窄依赖(Narrow)与宽依赖(Wide)

这个维度决定了数据的物理流转方式,是理解Spark性能瓶颈——Shuffle——的关键。

窄依赖算子(Narrow Dependency):

  • 定义:子RDD的每个分区只依赖父RDD的一个或少数几个固定的分区。这意味着计算可以在单个节点上独立完成,无需等待其他节点的数据。
  • 特点:数据不需要跨节点传输(No Shuffle),计算可以在单个节点上以流水线(Pipeline)方式高效执行,性能极高。
  • 代表算子map, filter, flatMap, union等。

宽依赖算子(Wide Dependency):

  • 定义:子RDD的每个分区依赖父RDD的所有或多个分区。这意味着一个子分区的计算需要从父RDD的多个分区拉取数据。
  • 特点:需要进行Shuffle操作,数据必须在网络间进行大规模传输和重分区。Shuffle是Spark中最昂贵的操作之一,是性能优化的重点和难点。
  • 代表算子groupByKey, reduceByKey, join, distinct, repartition等。

3. 算子执行的内存与磁盘管理

在分布式计算中,内存与磁盘的管理是决定性能和稳定性的核心要素。Spark通过一个精巧的统一内存管理(Unified Memory Management)模型以及高效的溢写(Spill)机制,在执行效率、数据缓存和大规模数据处理能力之间取得了动态平衡。

3.1 统一内存管理模型

Spark通过一个精巧的统一内存管理(Unified Memory Management)模型,在执行效率和数据缓存之间取得了动态平衡。

在Spark 1.6版本之前,执行内存和存储内存是静态划分的,利用率不高。而统一内存管理模型允许这两部分内存在运行时动态地相互借用,从而极大地提升了内存使用效率。其核心思想是:计算优先,在不影响计算的前提下,尽可能多地利用内存进行数据缓存。

内存区域划分:

Spark Executor的内存被划分为几个关键区域:

  1. **保留内存 (Reserved Memory)**:系统保留内存,固定为300MB,用于存储Spark内部对象和元数据,防止OOM。
  2. **用户内存 (User Memory)**:用户代码使用的内存区域,例如,在算子函数中创建的自定义对象、数据结构等。这部分内存不受Spark管理,如果使用不当,是OOM的主要来源之一。
  3. **Spark内存 (Spark Memory)**:Spark框架自身管理的内存,是优化的核心区域。它进一步动态地分为:
    • **执行内存 (Execution Memory)**:执行算子(如Shuffle、Join、Sort、Aggregate)时所需的内存。这部分内存用于存储中间数据,例如Shuffle时的缓冲区。它是保障计算任务顺利执行的关键。
    • **存储内存 (Storage Memory)**:用于缓存RDD、广播变量等数据的内存。通过将常用数据缓存在此,可以避免重复计算,提升性能。当执行内存不足时,Spark会强制驱逐(Evict)存储内存中缓存的数据块,为计算任务腾出空间。

🔥 核心机制:执行内存和存储内存共享 Unified Memory 区域(动态抢占):

  • 执行任务可抢占存储内存(若存储内存未用完)
  • 存储内存只能被动回收(LRU 策略),不能抢占执行内存

3.2 内存与磁盘的交互:溢写 (Spill) 与合并 (Merge)

统一内存管理模型解释了内存的内部划分与动态调整,但当执行内存本身也不足以容纳所有计算所需的数据时会发生什么?这时,Spark会启动溢写(Spill)机制,将部分数据临时写入磁盘,以释放内存供当前计算任务继续使用。

溢写(Spill)

  • 触发时机:在执行需要大量内存的算子时(如 reduceByKey, groupByKey, sortByKey, join),这些算子通常使用基于哈希的聚合器或外部排序器。当这些内存中的数据结构(例如,一个巨大的哈希表)的大小超过了可用的执行内存时,溢写就会被触发。
  • 过程:Spark将内存中的数据(例如哈希表的部分内容)进行排序(如果需要),然后序列化成字节流,写入本地磁盘上的一个临时文件。之后,清空内存中的这部分数据结构,以继续处理后续数据。一个任务可能会因为数据量巨大而产生多个溢写文件。

合并(Merge)

  • 触发时机:当一个任务处理完其所有的输入数据后,它可能已经在磁盘上留下了多个溢写文件,同时内存里可能还剩余一部分数据。
  • 过程:为了形成该任务的最终输出(例如,为Shuffle的下一阶段准备数据),Spark会启动一个合并流程。它使用归并排序的策略,同时从所有溢写文件和内存中读取数据,将它们合并成一个单一的、通常是排序好的输出文件。这个最终文件才是Shuffle阶段网络传输的源文件。

这个 内存-溢写-合并 的流程是Spark能够处理远超内存容量的大规模数据的关键。它以磁盘I/O的开销为代价,换取了计算的稳定性和对海量数据的处理能力。

4. 数据序列化与网络传输

在分布式系统中,数据需要在不同节点间通过网络传输,而网络传输的数据必须是二进制格式。序列化就是将内存中的Java对象(包含数据和结构)转换为二进制字节流的过程,而反序列化则是相反的过程。序列化是Spark中一个基础但极其重要的性能影响点,它的效率直接决定了网络传输和磁盘IO的开销。

Spark在多个场景下都会触发序列化操作:

序列化触发场景:

1
2
3
4
5
6
7
// 1. Task序列化:将Task从Driver发送到Executor
public class TaskSerialization {
public byte[] serializeTask(Task<?> task) {
// 任务包含:代码、依赖、分区信息
return serializer.serialize(task);
}
}
  • 场景解读:当你在Driver端编写的算子函数(闭包)需要被发送到Executor上执行时,整个任务(包括代码和其引用的外部变量)都会被序列化。如果闭包引用了庞大且不可序列化的对象,会导致任务提交失败或性能低下。
1
2
3
4
5
6
7
8
9
10
11
// 2. Shuffle序列化:数据在节点间传输
public class ShuffleDataSerialization {
public void writeShuffleData(Iterator<Product2<K, V>> records) {
SerializationStream stream = serializer.serializeStream(outputStream);
while (records.hasNext()) {
Product2<K, V> record = records.next();
stream.writeKey(record._1); // 序列化key
stream.writeValue(record._2); // 序列化value
}
}
}
  • 场景解读:这是序列化最影响性能的环节。在Shuffle过程中,大量数据需要在节点间流动,高效的序列化格式(如Kryo)可以显著减少网络传输的数据量和CPU消耗。
1
2
3
4
5
6
7
8
9
10
// 3. 缓存序列化:RDD持久化到内存/磁盘
public class CacheSerialization {
public void cacheRDD(RDD<?> rdd, StorageLevel level) {
if (level.useSerialization()) {
// 将RDD数据序列化后存储
byte[] serializedData = serializer.serialize(rdd.collect());
blockManager.putBytes(blockId, serializedData, level);
}
}
}
  • 场景解读:当你调用rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)等包含_SER的缓存级别时,数据会以序列化的形式存储。这样做的好处是节省内存空间,但代价是每次访问缓存数据时都需要进行反序列化,增加了CPU开销。

5. 为什么理解算子原理如此重要?

在Spark开发中,实现同一个业务需求往往有多种算子组合。然而,不同的实现方式可能导致百倍甚至千倍的性能差异。这种差异的根源,就在于每个算子背后的数据处理和流转机制完全不同。

性能差异的根本原因:

不同算子的性能差异主要源于它们在以下几个方面的不同选择:

  1. 依赖关系:是需要Shuffle的宽依赖,还是无需Shuffle的窄依赖?这是最核心的区别。
  2. 数据局部性:计算是在数据所在的节点本地执行,还是必须通过网络拉取远程数据?
  3. 内存使用模式:算子是一次性将整个分区加载到内存,还是以流式(Streaming)方式逐条处理?这决定了内存消耗的峰值。
  4. CPU利用率:算子的计算逻辑是否复杂,是否能被Spark的优化器(如Tungsten)进行优化?

下面的例子直观地展示了groupByKeyreduceByKey的巨大性能差异,尽管它们都能实现分组聚合的功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 场景:处理1GB数据,统计每个用户的订单数量

// 方案1:使用groupByKey - 性能差
JavaPairRDD<String, Integer> result1 = orders
.mapToPair(order -> new Tuple2<>(order.getUserId(), 1))
.groupByKey() // 宽依赖,Shuffle所有数据
.mapValues(values -> {
int count = 0;
for (Integer v : values) count += v;
return count;
});
// 执行时间:约45秒,Shuffle数据量:1GB

// 方案2:使用reduceByKey - 性能好
JavaPairRDD<String, Integer> result2 = orders
.mapToPair(order -> new Tuple2<>(order.getUserId(), 1))
.reduceByKey((a, b) -> a + b); // 本地预聚合,减少Shuffle数据
// 执行时间:约15秒,Shuffle数据量:约100MB(假设有10万用户)

优化思路的本质:
因此,理解算子原理并非炫技,而是进行性能优化的基石。它能让你在开发时就具备“性能思维”,从而能够:

  • 选择合适的算子:主动避免不必要的Shuffle,例如用reduceByKey替代groupByKey
  • 设计合理的数据流:通过broadcast等技巧,将Shuffle密集型的join操作优化为本地计算。
  • 利用数据局部性:合理设计分区策略,让计算尽可能在数据所在的节点发生。
  • 合理配置资源:预估算子的内存消耗,为作业分配合理的内存和CPU资源,避免OOM和性能瓶颈。

为什么有些算子执行很快,有些却很慢?答案就藏在算子的实现原理和数据流转机制中。只有深入理解这些,才能真正驾驭Spark。

6. RDD懒惰计算机制深度剖析

懒惰计算(Lazy Evaluation)是Spark最核心、最巧妙的设计之一,是其实现高效、容错的分布式计算的基石。简单来说,懒惰计算就是“非到万不得已,绝不执行计算”

6.1 核心概念:懒惰计算 vs 急切计算

懒惰计算(Lazy Evaluation)

  • 指的是Spark在遇到转换操作(Transformations)时,并不会立即执行计算并生成新的RDD
  • 它只是记录下这个操作以及它依赖的父RDD(即:构建了一个逻辑执行计划或称为Lineage)
  • 真正的计算(数据读取、转换处理)会被推迟到遇到行动操作(Actions)时才触发执行

急切计算(Eager Evaluation)

  • 传统编程或某些数据处理框架(如Scala集合的某些操作)是急切计算的
  • 当你调用一个函数,它会立即执行并返回结果
  • 例如,在Scala中List(1,2,3).map(_ * 2)会立即计算并返回List(2,4,6)

对比示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 急切计算 - 传统Java集合
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
List<Integer> doubled = numbers.stream()
.map(x -> x * 2) // 立即执行
.filter(x -> x > 5) // 立即执行
.collect(Collectors.toList()); // 立即返回结果

// 懒惰计算 - Spark RDD
JavaRDD<Integer> numbersRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
JavaRDD<Integer> transformedRDD = numbersRDD
.map(x -> x * 2) // 仅记录操作,不执行
.filter(x -> x > 5); // 仅记录操作,不执行
// 此时还没有任何实际计算发生!

List<Integer> result = transformedRDD.collect(); // 这里才开始真正计算

6.2 为什么Spark要采用懒惰计算?

这种“谋定而后动”的设计哲学,为Spark带来了几个在分布式环境下至关重要的优势:

1. 优化执行计划(Optimization)

这是懒惰计算最大的优势。因为所有转换操作都只是记录在DAG中,直到行动操作被调用前,Spark都拥有了计算的全景图。这使得Spark的优化器(如DAGScheduler和Catalyst)可以从全局视角对整个计算流程进行优化。

  • 流水线化(Pipelining):在急切计算中,rdd.map(...).filter(...)会执行两次全量数据扫描。但在懒惰计算中,Spark会将mapfilter这两个操作合并(fuse)成一个任务。数据在分区内以流式的方式被处理,一条数据处理完map后立刻进行filter,无需将中间结果写入内存或磁盘,极大地提升了效率。
  • 谓词下推(Predicate Pushdown):如果数据源(如Parquet、ORC)支持,Spark会将filter操作下推到数据读取层。这样,在数据加载到内存之前,就能过滤掉大量无关数据,从源头上减少了IO和内存的消耗。
  • 减少Shuffle:优化器可以分析整个DAG,识别出可以避免或优化的Shuffle操作,例如在多个join操作中选择最优的执行顺序。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 懒惰计算的优化示例
public class LazyEvaluationOptimizer {

public void demonstrateOptimizations() {
JavaRDD<String> textRDD = sc.textFile("large_file.txt");

// 定义一系列转换操作(全部是懒惰的)
JavaRDD<String> result = textRDD
.filter(line -> line.contains("ERROR")) // 过滤操作
.map(line -> line.toUpperCase()) // 转换操作
.filter(line -> line.length() > 50) // 再次过滤
.map(line -> line.substring(0, 100)); // 截取操作

// 只有在调用Action时,Spark才开始优化和执行
long count = result.count();

// Spark的优化策略:
// 1. 流水线化:将所有map和filter操作合并为单个Task执行
// 2. 谓词下推:如果数据源支持,将filter下推到读取层
// 3. 减少中间结果:不需要物化每个中间RDD
}
}

2. 减少不必要的计算(Reduced Computation)

懒惰计算使得Spark可以只计算任务真正需要的数据。对于一些只需要部分结果的行动操作,这个特性可以节省大量的计算资源。

  • 当你调用rdd.take(5)时,Spark知道只需要获取5条记录。它会启动任务,一旦某个分区计算得到了足够的5条记录,其他正在运行的或尚未开始的任务就可以被终止,避免了对整个数据集的无效扫描。
  • 类似地,rdd.first()只会计算第一个分区,直到找到第一条记录为止。

3. 节省内存和存储(Memory/Storage Efficiency)

由于转换操作不会立即物化(materialize)中间结果RDD,因此极大地节省了内存和磁盘空间。在一个长长的转换链中,数据以流的方式在算子间传递,处理完即被回收,内存中只需保留当前正在处理的数据即可。这与那些每一步都生成完整中间结果的系统形成了鲜明对比。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 内存效率示例
public class MemoryEfficiency {

public void demonstrateMemoryEfficiency() {
JavaRDD<String> data = sc.textFile("input.txt");

// 定义长转换链(全部懒惰)
JavaRDD<String> step1 = data.map(line -> processStep1(line));
JavaRDD<String> step2 = step1.filter(line -> line.length() > 10);
JavaRDD<String> step3 = step2.map(line -> processStep2(line));
JavaRDD<String> step4 = step3.filter(line -> line.contains("important"));
JavaRDD<String> finalResult = step4.map(line -> processStep3(line));

// 关键点:这些中间RDD(step1-step4)不会真的存储在内存中!
// 它们只是包含Lineage信息的对象,实际数据在Action时才计算

// 只有在Action触发时,数据才流式处理,无需存储中间结果
long count = finalResult.count();

// 对比:如果是急切计算,每个step都会产生完整的中间数据集
// 这会消耗5倍的内存!
}
}

4. 容错性(Fault Tolerance)的天然支持

懒惰计算与Spark的容错机制紧密相关。因为Spark记录了完整的RDD血缘关系(Lineage),即每个RDD是如何通过转换操作从其父RDD派生而来的。这个Lineage就像一份详细的“数据重建指南”。当集群中某个节点故障,导致其上的数据分区丢失时,Spark可以根据这份指南,精确地只重新计算丢失的那个分区,而无需重跑整个作业。懒惰计算使得记录这份“指南”成为其执行模型的自然组成部分。

6.3 懒惰计算工作原理:详细示例分析

让我们通过一个完整的例子来理解懒惰计算的工作流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class LazyEvaluationWorkflow {

public void completeExample() {
// 1. 定义RDD(惰性:只记录来源)
JavaRDD<String> textRDD = sc.textFile("hdfs://path/to/largefile.txt");
System.out.println("Step 1: 创建textRDD - 无计算发生,只记录数据源");

// 2. 转换操作(惰性:只记录转换逻辑)
JavaRDD<String> wordsRDD = textRDD.flatMap(line ->
Arrays.asList(line.split(" ")).iterator());
System.out.println("Step 2: 创建wordsRDD - 无计算发生,只记录flatMap操作");

JavaRDD<String> filteredRDD = wordsRDD.filter(word ->
word.startsWith("error"));
System.out.println("Step 3: 创建filteredRDD - 无计算发生,只记录filter操作");

JavaPairRDD<String, Integer> mappedRDD = filteredRDD.mapToPair(word ->
new Tuple2<>(word, 1));
System.out.println("Step 4: 创建mappedRDD - 无计算发生,只记录mapToPair操作");

JavaPairRDD<String, Integer> errorCountRDD = mappedRDD.reduceByKey((a, b) -> a + b);
System.out.println("Step 5: 创建errorCountRDD - 无计算发生,只记录reduceByKey操作");

// 此时的状态:
printRDDLineage(errorCountRDD);

// 3. 行动操作(触发计算!)
System.out.println("Step 6: 调用collect() - 开始真正的计算!");
List<Tuple2<String, Integer>> result = errorCountRDD.collect();

System.out.println("计算完成,结果: " + result);
}

private void printRDDLineage(JavaPairRDD<String, Integer> rdd) {
System.out.println("=== RDD Lineage 信息 ===");
System.out.println("errorCountRDD 依赖链:");
System.out.println(" textRDD (HadoopRDD) <- 数据源");
System.out.println(" -> wordsRDD (FlatMappedRDD) <- flatMap转换");
System.out.println(" -> filteredRDD (FilteredRDD) <- filter转换");
System.out.println(" -> mappedRDD (MapPartitionsRDD) <- mapToPair转换");
System.out.println(" -> errorCountRDD (ShuffledRDD) <- reduceByKey转换");
System.out.println("此时只有逻辑执行计划,没有实际数据!");
}
}

6.4 执行流程详解

阶段1:定义和转换阶段(textFile 到 reduceByKey)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// DAG构建过程的内部机制
public class DAGBuildingProcess {

public void demonstrateDAGBuilding() {
// 当执行每个转换操作时,Spark内部的工作:

// 1. textFile操作
JavaRDD<String> textRDD = sc.textFile("input.txt");
// 内部:创建HadoopRDD对象,记录:
// - 数据源路径
// - 分区策略(基于HDFS块)
// - 依赖关系:无(叶子节点)

// 2. flatMap操作
JavaRDD<String> wordsRDD = textRDD.flatMap(line ->
Arrays.asList(line.split(" ")).iterator());
// 内部:创建FlatMappedRDD对象,记录:
// - 父RDD:textRDD
// - 转换函数:split和iterator
// - 依赖类型:窄依赖(OneToOneDependency)

// 3. filter操作
JavaRDD<String> filteredRDD = wordsRDD.filter(word -> word.startsWith("error"));
// 内部:创建FilteredRDD对象,记录:
// - 父RDD:wordsRDD
// - 过滤函数:startsWith判断
// - 依赖类型:窄依赖

// 4. reduceByKey操作
JavaPairRDD<String, Integer> countRDD =
filteredRDD.mapToPair(w -> new Tuple2<>(w, 1)).reduceByKey((a, b) -> a + b);
// 内部:创建ShuffledRDD对象,记录:
// - 父RDD:mappedRDD
// - 聚合函数:addition
// - 依赖类型:宽依赖(ShuffleDependency)
// - 分区器:HashPartitioner

// 构建的DAG图:
System.out.println("DAG结构:");
System.out.println("HadoopRDD -> FlatMappedRDD -> FilteredRDD -> MappedRDD -> ShuffledRDD");
System.out.println(" | | | | |");
System.out.println(" textFile flatMap filter mapToPair reduceByKey");
System.out.println(" |");
System.out.println(" Stage分界点(Shuffle)");
}
}

阶段2:Action触发执行

1. DAGScheduler分析

  • 从collect()的目标RDD(ShuffledRDD)开始回溯Lineage链
  • 识别依赖关系:发现flatMap+filter+mapToPair可流水线执行
  • 确定reduceByKey需要单独Stage(宽依赖边界)

2. Stage划分

  • Stage 0:textFile → flatMap → filter → mapToPair
    • 输出:按key分区的(word, 1)对
  • Stage 1:reduceByKey
    • 输入:Stage 0的Shuffle输出
    • 输出:(word, count)结果

3. 任务生成

  • Stage 0:根据输入分区数(4个HDFS块)生成4个ShuffleMapTask
  • Stage 1:根据Shuffle分区数(默认200)生成200个ResultTask

4. 任务调度

  • TaskScheduler将任务分发到Executor
  • 优先考虑数据本地性(数据所在节点)
  • 严格执行Stage顺序:Stage 0完成才能开始Stage 1

5. 任务执行

  • Stage 0任务:
    1. 读取HDFS文件块
    2. 流水线执行:split → filter → mapToPair
    3. 按key哈希分区,执行Shuffle Write
  • Stage 1任务:
    1. 从多个节点拉取数据(Shuffle Read)
    2. 按key聚合执行reduceByKey
    3. 生成最终(word, count)结果

6. 结果收集

  • 所有Stage 1的ResultTask将结果发送回Driver
  • Driver汇总所有结果返回给用户

RDD的懒惰计算机制是Spark实现高效、容错的大规模分布式数据处理的核心智慧。它将昂贵的计算推迟到最后,并利用这段时间窗口进行全局优化,极大地提升了处理能力和资源利用率。理解这一机制对于编写高效的Spark应用程序至关重要。

7. 数据本地性原理深度解析

“移动计算,而非移动数据”是大数据处理的基本原则。数据本地性(Data Locality)正是这一原则在Spark中的具体体现。由于网络传输的开销远大于内存读取,Spark的调度器会尽可能地将计算任务分配到存储着其所需数据的节点上执行,以最大限度地减少网络IO,提升性能。

7.1 数据本地性的层次结构

Spark定义了从优到劣的多个本地性级别,任务调度器会按照这个顺序,在一定的时间等待阈值内,为任务寻找最“近”的可用资源。

本地性级别的详细定义:

  • PROCESS_LOCAL: 任务和数据在同一个Executor的JVM进程中。这是最理想的级别,数据无需任何网络传输,可以直接在内存中访问。
  • NODE_LOCAL: 任务和数据在同一个物理节点上,但可能在不同的Executor进程中。数据需要通过节点内部的进程间通信或共享内存来传输。
  • RACK_LOCAL: 任务和数据在同一个机架(Rack)的不同节点上。数据需要通过机架内的交换机进行网络传输。
  • ANY: 任务和数据在集群的任何地方,通常意味着需要跨机架进行网络传输,这是开销最大的情况。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 数据本地性级别枚举
public enum TaskLocality {
PROCESS_LOCAL("PROCESS_LOCAL", 0) {
@Override
public boolean isAllowed(TaskSetManager taskSet, long currentTime) {
// 进程本地:数据在同一JVM进程中
return taskSet.getAllowedLocalityLevel(currentTime).ordinal() >= ordinal();
}
},

NODE_LOCAL("NODE_LOCAL", 1) {
@Override
public boolean isAllowed(TaskSetManager taskSet, long currentTime) {
// 节点本地:数据在同一物理节点上
return taskSet.getAllowedLocalityLevel(currentTime).ordinal() >= ordinal();
}
},

RACK_LOCAL("RACK_LOCAL", 2) {
@Override
public boolean isAllowed(TaskSetManager taskSet, long currentTime) {
// 机架本地:数据在同一机架内
return taskSet.getAllowedLocalityLevel(currentTime).ordinal() >= ordinal();
}
},

ANY("ANY", 3) {
@Override
public boolean isAllowed(TaskSetManager taskSet, long currentTime) {
// 任意位置:可以在任何地方执行
return true;
}
};

public final String toString;
public final int id;

TaskLocality(String toString, int id) {
this.toString = toString;
this.id = id;
}

public abstract boolean isAllowed(TaskSetManager taskSet, long currentTime);
}

本地性感知的任务调度算法:
Spark的任务调度并非“死等”最佳的本地性级别。它采用了一种延迟调度(Delay Scheduling)策略,在效率和时间之间进行权衡:

  1. 调度器首先尝试以PROCESS_LOCAL级别在数据所在的Executor上启动任务。
  2. 如果在设定的等待时间(默认为3秒,由spark.locality.wait配置)内,该Executor没有空闲资源,调度器会将本地性级别降级到NODE_LOCAL,尝试在该节点的其他Executor上启动任务。
  3. 如果又等待了一个周期后仍然没有资源,级别会继续降级到RACK_LOCAL,最后到ANY
    这种策略既追求了最佳的数据本地性,又避免了因等待某个繁忙节点而导致整个作业被阻塞。
1
2
3
4
5
6
7
8
9
// 本地性感知调度器
public class LocalityAwareTaskScheduler {
// 本地性等待时间配置
private final Map<TaskLocality, Long> localityWaitMap = Map.of(
TaskLocality.PROCESS_LOCAL, 3000L, // 3秒
TaskLocality.NODE_LOCAL, 3000L, // 3秒
TaskLocality.RACK_LOCAL, 3000L, // 3秒
TaskLocality.ANY, 0L // 立即执行
);