从 CSV文件的加载、分区和处理 来理解 RDD

从 CSV文件的加载、分区和处理 来理解 RDD
XR从 CSV文件的加载、分区和处理 来理解 RDD
之前看了不少关于Spark RDD的介绍,其实看都可以看得懂,但是还是会有不少疑问。比如
- RDD 是抽象的,那么RDD中的谈到的分区列表、依赖关系、计算函数又是存储在哪的?
- 执行器是怎么就拿到了自己的 分区数据的?会不会多拿到其他分区的数据
- RDD分区列表中如果不存储真实数据,那么这些数据又是怎么分配到执行器的?
这里我拆解一个 csv文件在 HDFS上 如何被 Spark RDD 加载、分区和存储的,就应该很方便理解 RDD了。
1. HDFS 存储层面:物理分块
- 假设: 比如 CSV 文件
data.csv大小为 256MB,HDFS 默认块大小(block size)为 128MB。 - HDFS 行为:
- HDFS 会自动将文件切割成 2 个物理块:
Block 0: 0 - 128MB (存储在节点Node1和Node2上,副本)Block 1: 128MB - 256MB (存储在节点Node3和Node4上,副本)
- 关键点: 这是 物理存储级别 的分块,由 HDFS 控制,目的是容错和分布式存储。
- HDFS 会自动将文件切割成 2 个物理块:
2. Spark 读取:创建初始 RDD (textFile)
当我在 Spark 中执行:
1 | JavaRDD<String> rdd = sc.textFile("hdfs://data/users.csv"); |
Spark 会执行以下操作:
a) 确定 RDD 的分区数
默认规则: 每个 HDFS Block 对应 1个 RDD 分区。
- 本例中:文件被切分成 2 个 HDFS Block → RDD 会有 2 个分区 (
Partition 0,Partition 1)。
- 本例中:文件被切分成 2 个 HDFS Block → RDD 会有 2 个分区 (
手动指定分区数:
1
val rdd = sc.textFile("hdfs://path/to/data.csv", 4) // 强制分成4个分区
- 如果文件不可切分(如 GZIP 压缩文件),分区数 = 文件数。
- 如果文件可切分(如 CSV),Spark 会尝试按字节划分成 4 份(可能不等分)。
b) 分区与 HDFS Block 的映射
| RDD 分区 | 负责的 HDFS Block 范围 | 数据位置偏好 (Preferred Locations) |
|---|---|---|
| 分区 0 | 0 - 128MB (Block 0) | [Node1, Node2] (Block 0 的存储节点) |
| 分区 1 | 128MB - 256MB (Block 1) | [Node3, Node4] (Block 1 的存储节点) |
✅ 极其重要: 每个 RDD 分区知道它应该读取哪个 HDFS Block,并且知道该 Block 存储在哪些节点上。
3. 数据存储位置:运行时过程
执行阶段:
- Driver 分配任务:
- 根据 RDD 分区的位置偏好,Driver 将任务分配给离数据最近的 Executor。
- 例如:
Task 0(处理分区0) → 优先调度到Node1或Node2上的 Executor。Task 1(处理分区1) → 优先调度到Node3或Node4上的 Executor。
- Executor 读取数据:
- 每个 Task 启动后,通过 HDFS Client 读取对应的 Block。
- 本地性优化:
- 如果 Task 在
Node1执行,它直接从本机的 HDFS DataNode 读取Block 0(本地读取,速度最快)。 - 如果
Node1繁忙,Task 可能调度到Node2→ 从同一机架内的Node1读取 (机架本地性,速度中等)。 - 最差情况:跨机架读取(如调度到
Node5读取Block 0)。
- 如果 Task 在
- 数据在 Executor 中的存储:
- 读取的数据以 行(String) 的形式加载到内存(每行是 CSV 中的一条记录)。
- 存储位置:Executor 的 JVM 堆内存 中(除非指定了持久化级别)。
- 存储形式:按分区存储,每个 Task 处理的分区数据独立存在于执行它的 Executor 内存中。
4. 分区数据内容
假设 CSV 内容如下:
1 | 1,Alice,Beijing |
分区 0 可能包含:
1
["1,Alice,Beijing", "2,Bob,Shanghai", "3,Charlie,Beijing", ...] # 前128MB
分区 1 可能包含:
1
["1001,David,Shanghai", "1002,Eve,Shenzhen", ...] # 后128MB
⚠️ 注意: 分区边界是按字节切割的,可能截断某一行(最后一行不完整)。Spark 会确保:
- 分区0读取到第一个完整行 → 最后一个完整行
- 分区1从上个分区未读完的位置开始 → 直到文件结束
5.详细执行过程
我们所编写的spark代码整体如下,刚刚说的是 加载csv部分,下面说的就是 数据清洗与聚合了。
1 | JavaSparkContext sc = new JavaSparkContext(); |
阶段1:加载与初始分区(窄依赖)
graph TB
A[HDFS Block 0] -->|读取| B[Executor1-Part0]
C[HDFS Block 1] -->|读取| D[Executor2-Part1]
阶段2:转换操作(窄依赖 - 流水线执行)
graph LR
B[Part0原始数据] --> E[filter+map操作] --> F[Part0清洗后]
D[Part1原始数据] --> G[filter+map操作] --> H[Part1清洗后]
Executor1 处理 Partition 0:
1
2
3
4
5# 输入
["1,Alice,Beijing", "2,Bob,Shanghai", ...]
# 转换后
[("Beijing",1), ("Shanghai",1), ("Beijing",1), ...]Executor2 处理 Partition 1:
1
2
3
4
5# 输入
["1001,David,Shanghai", "1002,Eve,Shenzhen", ...]
# 转换后
[("Shanghai",1), ("Shenzhen",1), ...]
这里我们其实可以发现一个特点,就是每个 Executor 都整好处理的是 自己机器上的 Partition数据。
✅ 关键特点:无需跨节点通信,各分区独立处理
阶段3:Shuffle操作(宽依赖 - 数据重组)
步骤 1: 划分 Stage(Driver决策)
- 识别
reduceByKey是宽依赖 → 创建新 Stage - Stage 0: 所有窄依赖操作(textFile → filter → map)
- Stage 1: reduceByKey
步骤 2:Stage 0 执行(Shuffle Write)
graph TB
F[Part0清洗后] --> I[按城市分区]
H[Part1清洗后] --> J[按城市分区]
I --> K[Executor1本地磁盘]
J --> L[Executor2本地磁盘]
Shuffle 写操作:
每个 Task 将自己的数据 按 Key(城市)分组
根据默认分区器(
HashPartitioner)计算目标分区1
2Partition 0: Beijing, Shenzhen → Hash值%2=0
Partition 1: Shanghai → Hash值%2=1将数据写入 本地磁盘的 Shuffle 临时文件,并生成索引文件
Executor1 写入:
1
2
3
4
5# 文件:shuffle_0_temp_0.data (Partition 0)
("Beijing",1), ("Beijing",1)
# 文件:shuffle_0_temp_1.data (Partition 1)
("Shanghai",1)Executor2 写入:
1
2
3
4
5# 文件:shuffle_1_temp_0.data (Partition 0)
("Shenzhen",1)
# 文件:shuffle_1_temp_1.data (Partition 1)
("Shanghai",1)
步骤 3: Stage 1 执行(Shuffle Read)
graph LR
K[Shuffle文件] --> M[TaskA]
L[Shuffle文件] --> M
K --> N[TaskB]
L --> N
Driver 创建新 Task:
- Task A:处理最终 RDD 的 Partition 0(Key: Beijing, Shenzhen)
- Task B:处理最终 RDD 的 Partition 1(Key: Shanghai)
Shuffle 读操作:
Task A 从 所有 Executor 拉取属于 Partition 0 的数据:
1
2# 从 Executor1 拉取:("Beijing",1), ("Beijing",1)
# 从 Executor2 拉取:("Shenzhen",1)Task B 拉取 Partition 1 的数据:
1
2# 从 Executor1 拉取:("Shanghai",1)
# 从 Executor2 拉取:("Shanghai",1)
聚合计算:
Task A 执行
reduceByKey:1
2Beijing: 1 + 1 = 2
Shenzhen: 1Task B 执行:
1
Shanghai: 1 + 1 = 2
阶段 4: 结果收集
graph LR
M[TaskA结果] --> O[Driver]
N[TaskB结果] --> O
最终数据到达 Driver:
1 | [("Beijing",2), ("Shenzhen",1), ("Shanghai",2)] |
Shuffle 数据流全景
flowchart LR
subgraph Stage 0
A[Executor1] -->|Shuffle Write| D[磁盘文件]
B[Executor2] -->|Shuffle Write| E[磁盘文件]
end
subgraph Stage 1
D -->|Part0数据| F[TaskA]
E -->|Part0数据| F
D -->|Part1数据| G[TaskB]
E -->|Part1数据| G
end
F --> H[最终结果]
G --> H
为什么需要 Shuffle?因为有些操作必须要将数据进行重新分区才好进行计算、统计。
| 操作类型 | 数据移动 | 网络开销 | 典型操作 |
|---|---|---|---|
| 窄依赖 | 数据不动 Task移动 | 低 | map, filter |
| 宽依赖 | 数据按Key重组 跨节点传输 | 高 | groupByKey, reduceByKey |
通过这个完整示例,就可以看到:
- 分区如何从物理存储映射到计算单元
- 窄依赖如何实现零数据传输的流水线
- 宽依赖如何通过Shuffle重组数据
- Stage划分如何驱动分布式计算
这正是 RDD 抽象的核心价值——通过清晰的阶段划分和依赖关系,在分布式环境中高效执行复杂计算。不需要拘束于通过概念理解 RDD。
5. 核心问题解答
Q1:分区数据存储在哪里?
- 原始数据: 物理存储在 HDFS DataNode(如
Node1,Node2,Node3,Node4)。 - RDD 处理时:
- 计算中:数据加载到 Executor 的 JVM 堆内存。
- 持久化后:可缓存到 Executor 内存/磁盘(通过
rdd.persist())。
Q2:会多获取数据吗?
绝对不会! 每个 Task 只读取自己分区映射的 HDFS Block 范围。
优化机制:
机制 作用 位置感知调度 Task 优先在存有数据的节点执行 HDFS 块定位 Executor 直接读取本地或邻近节点的数据块 精确的字节范围读取 每个 Task 只读取分配给它的连续字节区间
6. 总结:
sequenceDiagram
participant Driver
participant Executor
participant HDFS
participant LocalDisk
participant OtherExecutor
Note over Driver: 初始化阶段
Driver->>Driver: 1. 创建SparkContext
Driver->>Driver: 2. 构建RDD依赖图 (DAG)
Driver->>Driver: 3. 划分Stage (根据Shuffle边界)
Driver->>Driver: 4. 计算分区/任务列表(构建DAG)
Note over Driver,Executor: Stage 0 执行 (ShuffleMapStage)
Driver->>Executor: 5. 发送Stage0任务(含分区计算逻辑)
Executor->>HDFS: 6. 读取指定数据块
Executor->>Executor: 7. 执行流水线操作:<br/>- Map<br/>- Filter<br/>- Partitioning
Executor->>Executor: 8. 将结果写入内存缓冲区
Executor->>LocalDisk: 9. Shuffle Write:<br/>- 排序<br/>- 按分区写入本地磁盘文件
Executor->>Driver: 10. 汇报状态和Shuffle元数据
Note over Driver,Executor: Stage 1 执行 (ResultStage)
Driver->>Executor: 11. 发送Stage1任务(含聚合逻辑)
Executor->>OtherExecutor: 12. Shuffle Read:<br/>- 根据元数据拉取对应分区的数据
Executor->>Executor: 13. 合并数据(可能溢写到磁盘)
Executor->>Executor: 14. 执行聚合操作:<br/>- reduceByKey<br/>- combine
Executor->>Driver: 15. 返回最终结果分区数据
Driver->>Driver: 16. 汇总所有分区的结果
关键原则:
移动计算,而非数据:将 Task 发送到数据所在的节点。简而言之就是让计算尽量贴近数据。我们可能习惯于将数据传输到计算端进行处理,比如查询mysql、es、mongo。但是对于大数据处理来说,计算逻辑好移动,而数据难移动。
分而治之:大文件被划分成小分区,并行处理。这其实就是类似于后端常用的多线程,大文件划区之后,并行处理,那么整体速度就得到极大提升。
精准映射:RDD 分区与 HDFS Block 一一对应,无重复读取。不光HDFS天然分块,其实很多分布式文件系统都是有分块机制的,比如S3。对于不天然支持分块的数据源,比如mysql 是可以人工分区的。
1
2
3
4
5
6
7
8
9
10// 并行读取 MySQL (按照分区键进行划分区,此处就是1-1000000是分区0,1000001-2000000是分区1 以此类推...)
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://host:3306/db")
.option("dbtable", "orders")
.option("partitionColumn", "order_id") // 分区键
.option("lowerBound", 1) // 最小值
.option("upperBound", 1000000) // 最大值
.option("numPartitions", 10) // 分区数
.load()
通过这种设计,Spark 能高效处理远大于内存的分布式文件,而开发者只需写几行代码。










