从 CSV文件的加载、分区和处理 来理解 RDD

从 CSV文件的加载、分区和处理 来理解 RDD

之前看了不少关于Spark RDD的介绍,其实看都可以看得懂,但是还是会有不少疑问。比如

  • RDD 是抽象的,那么RDD中的谈到的分区列表、依赖关系、计算函数又是存储在哪的?
  • 执行器是怎么就拿到了自己的 分区数据的?会不会多拿到其他分区的数据
  • RDD分区列表中如果不存储真实数据,那么这些数据又是怎么分配到执行器的?

这里我拆解一个 csv文件在 HDFS上 如何被 Spark RDD 加载、分区和存储的,就应该很方便理解 RDD了。


1. HDFS 存储层面:物理分块

  • 假设: 比如 CSV 文件 data.csv 大小为 256MB,HDFS 默认块大小(block size)为 128MB
  • HDFS 行为:
    • HDFS 会自动将文件切割成 2 个物理块:
      • Block 0: 0 - 128MB (存储在节点 Node1Node2 上,副本)
      • Block 1: 128MB - 256MB (存储在节点 Node3Node4 上,副本)
    • 关键点: 这是 物理存储级别 的分块,由 HDFS 控制,目的是容错和分布式存储。

2. Spark 读取:创建初始 RDD (textFile)

当我在 Spark 中执行:

1
JavaRDD<String> rdd = sc.textFile("hdfs://data/users.csv");  

Spark 会执行以下操作:

a) 确定 RDD 的分区数

  • 默认规则: 每个 HDFS Block 对应 1个 RDD 分区

    • 本例中:文件被切分成 2 个 HDFS Block → RDD 会有 2 个分区 (Partition 0, Partition 1)。
  • 手动指定分区数:

    1
    val rdd = sc.textFile("hdfs://path/to/data.csv", 4) // 强制分成4个分区
    • 如果文件不可切分(如 GZIP 压缩文件),分区数 = 文件数。
    • 如果文件可切分(如 CSV),Spark 会尝试按字节划分成 4 份(可能不等分)。

b) 分区与 HDFS Block 的映射

RDD 分区 负责的 HDFS Block 范围 数据位置偏好 (Preferred Locations)
分区 0 0 - 128MB (Block 0) [Node1, Node2] (Block 0 的存储节点)
分区 1 128MB - 256MB (Block 1) [Node3, Node4] (Block 1 的存储节点)

极其重要: 每个 RDD 分区知道它应该读取哪个 HDFS Block,并且知道该 Block 存储在哪些节点上。


3. 数据存储位置:运行时过程

执行阶段:

  1. Driver 分配任务:
    • 根据 RDD 分区的位置偏好,Driver 将任务分配给离数据最近的 Executor。
    • 例如:
      • Task 0 (处理分区0) → 优先调度到 Node1Node2 上的 Executor。
      • Task 1 (处理分区1) → 优先调度到 Node3Node4 上的 Executor。
  2. Executor 读取数据:
    • 每个 Task 启动后,通过 HDFS Client 读取对应的 Block。
    • 本地性优化:
      • 如果 Task 在 Node1 执行,它直接从本机的 HDFS DataNode 读取 Block 0 (本地读取,速度最快)。
      • 如果 Node1 繁忙,Task 可能调度到 Node2 → 从同一机架内的 Node1 读取 (机架本地性,速度中等)。
      • 最差情况:跨机架读取(如调度到 Node5 读取 Block 0)。
  3. 数据在 Executor 中的存储:
    • 读取的数据以 行(String) 的形式加载到内存(每行是 CSV 中的一条记录)。
    • 存储位置:Executor 的 JVM 堆内存 中(除非指定了持久化级别)。
    • 存储形式:按分区存储,每个 Task 处理的分区数据独立存在于执行它的 Executor 内存中。

4. 分区数据内容

假设 CSV 内容如下:

1
2
3
4
1,Alice,Beijing
2,Bob,Shanghai
1001,David,Shanghai
1002,Eve,Shenzhen
  • 分区 0 可能包含:

    1
    ["1,Alice,Beijing", "2,Bob,Shanghai", "3,Charlie,Beijing", ...]  # 前128MB
  • 分区 1 可能包含:

    1
    ["1001,David,Shanghai", "1002,Eve,Shenzhen", ...]  # 后128MB

⚠️ 注意: 分区边界是按字节切割的,可能截断某一行(最后一行不完整)。Spark 会确保:

  • 分区0读取到第一个完整行 → 最后一个完整行
  • 分区1从上个分区未读完的位置开始 → 直到文件结束

5.详细执行过程

我们所编写的spark代码整体如下,刚刚说的是 加载csv部分,下面说的就是 数据清洗与聚合了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
JavaSparkContext sc = new JavaSparkContext();

// 1. 加载 CSV(假设文件 256MB,2个HDFS Block)
JavaRDD<String> rdd = sc.textFile("hdfs://data/users.csv"); // 2个分区

// 2. 数据清洗(过滤 + 提取字段)
JavaPairRDD<String, Integer> cleanedRdd = rdd.filter(new Function<String, Boolean>() {
@Override
public Boolean call(String line) throws Exception {
return !line.contains("id"); // 过滤标题行
}
}).mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String line) throws Exception {
String[] arr = line.split(",");
return new Tuple2<>(arr[2], 1); // (城市, 1)
}
});

// 3. Shuffle操作(按城市聚合)
JavaPairRDD<String, Integer> cityCountRdd = cleanedRdd.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer a, Integer b) throws Exception {
return a + b;
}
});

// 4. 触发计算(行动操作)
List<Tuple2<String, Integer>> results = cityCountRdd.collect();

// 输出结果
for (Tuple2<String, Integer> result : results) {
System.out.println(result._1() + ": " + result._2());
}

阶段1:加载与初始分区(窄依赖)

阶段2:转换操作(窄依赖 - 流水线执行)

  • Executor1 处理 Partition 0

    1
    2
    3
    4
    5
    # 输入
    ["1,Alice,Beijing", "2,Bob,Shanghai", ...]

    # 转换后
    [("Beijing",1), ("Shanghai",1), ("Beijing",1), ...]
  • Executor2 处理 Partition 1

    1
    2
    3
    4
    5
    # 输入
    ["1001,David,Shanghai", "1002,Eve,Shenzhen", ...]

    # 转换后
    [("Shanghai",1), ("Shenzhen",1), ...]

这里我们其实可以发现一个特点,就是每个 Executor 都整好处理的是 自己机器上的 Partition数据。

关键特点:无需跨节点通信,各分区独立处理

阶段3:Shuffle操作(宽依赖 - 数据重组)

步骤 1: 划分 Stage(Driver决策)

  • 识别 reduceByKey 是宽依赖 → 创建新 Stage
  • Stage 0: 所有窄依赖操作(textFile → filter → map)
  • Stage 1: reduceByKey

步骤 2:Stage 0 执行(Shuffle Write)

  • Shuffle 写操作

    • 每个 Task 将自己的数据 按 Key(城市)分组

    • 根据默认分区器(HashPartitioner)计算目标分区

      1
      2
      Partition 0: Beijing, Shenzhen → Hash值%2=0
      Partition 1: Shanghai → Hash值%2=1
    • 将数据写入 本地磁盘的 Shuffle 临时文件,并生成索引文件

  • Executor1 写入

    1
    2
    3
    4
    5
    # 文件:shuffle_0_temp_0.data (Partition 0)
    ("Beijing",1), ("Beijing",1)

    # 文件:shuffle_0_temp_1.data (Partition 1)
    ("Shanghai",1)
  • Executor2 写入

    1
    2
    3
    4
    5
    # 文件:shuffle_1_temp_0.data (Partition 0)
    ("Shenzhen",1)

    # 文件:shuffle_1_temp_1.data (Partition 1)
    ("Shanghai",1)

步骤 3: Stage 1 执行(Shuffle Read)

  • Driver 创建新 Task

    • Task A:处理最终 RDD 的 Partition 0(Key: Beijing, Shenzhen)
    • Task B:处理最终 RDD 的 Partition 1(Key: Shanghai)
  • Shuffle 读操作

    • Task A 从 所有 Executor 拉取属于 Partition 0 的数据:

      1
      2
      # 从 Executor1 拉取:("Beijing",1), ("Beijing",1)
      # 从 Executor2 拉取:("Shenzhen",1)
    • Task B 拉取 Partition 1 的数据:

      1
      2
      # 从 Executor1 拉取:("Shanghai",1)
      # 从 Executor2 拉取:("Shanghai",1)
  • 聚合计算

    • Task A 执行 reduceByKey

      1
      2
      Beijing: 1 + 1 = 2
      Shenzhen: 1
    • Task B 执行:

      1
      Shanghai: 1 + 1 = 2

阶段 4: 结果收集

最终数据到达 Driver:

1
[("Beijing",2), ("Shenzhen",1), ("Shanghai",2)]

Shuffle 数据流全景

为什么需要 Shuffle?因为有些操作必须要将数据进行重新分区才好进行计算、统计。

操作类型 数据移动 网络开销 典型操作
窄依赖 数据不动 Task移动 map, filter
宽依赖 数据按Key重组 跨节点传输 groupByKey, reduceByKey

通过这个完整示例,就可以看到:

  1. 分区如何从物理存储映射到计算单元
  2. 窄依赖如何实现零数据传输的流水线
  3. 宽依赖如何通过Shuffle重组数据
  4. Stage划分如何驱动分布式计算

这正是 RDD 抽象的核心价值——通过清晰的阶段划分和依赖关系,在分布式环境中高效执行复杂计算。不需要拘束于通过概念理解 RDD。

5. 核心问题解答

Q1:分区数据存储在哪里?

  • 原始数据: 物理存储在 HDFS DataNode(如 Node1, Node2, Node3, Node4)。
  • RDD 处理时:
    • 计算中:数据加载到 Executor 的 JVM 堆内存
    • 持久化后:可缓存到 Executor 内存/磁盘(通过 rdd.persist())。

Q2:会多获取数据吗?

  • 绝对不会! 每个 Task 只读取自己分区映射的 HDFS Block 范围

  • 优化机制:

    机制 作用
    位置感知调度 Task 优先在存有数据的节点执行
    HDFS 块定位 Executor 直接读取本地或邻近节点的数据块
    精确的字节范围读取 每个 Task 只读取分配给它的连续字节区间

6. 总结:

关键原则:

  1. 移动计算,而非数据:将 Task 发送到数据所在的节点。简而言之就是让计算尽量贴近数据。我们可能习惯于将数据传输到计算端进行处理,比如查询mysql、es、mongo。但是对于大数据处理来说,计算逻辑好移动,而数据难移动。

  2. 分而治之:大文件被划分成小分区,并行处理。这其实就是类似于后端常用的多线程,大文件划区之后,并行处理,那么整体速度就得到极大提升。

  3. 精准映射:RDD 分区与 HDFS Block 一一对应,无重复读取。不光HDFS天然分块,其实很多分布式文件系统都是有分块机制的,比如S3。对于不天然支持分块的数据源,比如mysql 是可以人工分区的。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    // 并行读取 MySQL (按照分区键进行划分区,此处就是1-1000000是分区0,1000001-2000000是分区1 以此类推...)
    val jdbcDF = spark.read
    .format("jdbc")
    .option("url", "jdbc:mysql://host:3306/db")
    .option("dbtable", "orders")
    .option("partitionColumn", "order_id") // 分区键
    .option("lowerBound", 1) // 最小值
    .option("upperBound", 1000000) // 最大值
    .option("numPartitions", 10) // 分区数
    .load()

通过这种设计,Spark 能高效处理远大于内存的分布式文件,而开发者只需写几行代码。