Spark 架构进阶
一些重要的源码类
DAGScheduler.scala
EventLoop.scala
spark DAG引擎
spark 优势
- 减少磁盘IO
- 增加并行度
mr:任务是进程级别
spark:线程级别 - 避免重复计算
中间结果进行持久化到内存或磁盘,mr 只能去写到磁盘 - 可选的shuffle和排序
mr如果有reduce阶段,一定会按照key排序,但是业务其实不一定需要key有序
spark提供多种可选的shuffle方案 - 灵活的内存管理策略
堆内内存onheap 堆外内存offheap
存储内存storagememory 执行内存executionmemory
架构:主从架构 master、worker
两种基本使用方式:
spark-shell 交互式编程
spark-submit 提交任务
spark application
driver 类似mrappmaster 驱动程序,管理application中task运行
executor 类似mapTask reduceTask 真正运行task
都是线程级别
spark-submit
–supervise # driver 有可能会宕机,通过这个参数可以自动恢复driver
注意:
- 不能使用 kill -9 方式来杀掉driver,因为杀了之后会自动再重启
- 需要通过 yarn -kill jobId 的方式来杀掉进程
spark 程序套路
- 获取编程入口
环境对象,链接对象 - 通过编程入口获取数据抽象
source对象 - 针对数据抽象对象进行各种计算
action transformation - 提交任务运行
submit - 输出结果
sink - 回收资源
stop close
使用scala编写的时候:链式调用
sparkContext.textFile(path).flatmap(func)…
代码规范:可读性、可维护性、可扩展性
rdd1 = sparkContext.textFile(path)
rdd2 = rdd1.flatmap()
…
使用java编写的时候
sparkContext.textFile(path).flatmap(new FlatMapFunction(){
…
})
核心功能
- sparkContext
编程入口
DAG引擎
- DAGScheduler 帮助我们把一个spark app构建成一个DAG有向无环图,同时也切分stage
- TaskScheduler 接受一个stage转变成一个TaskSet去提交执行
通信组件 Backend
- SchedulerBackend => driver
- ExecutorBackend => executor
app分driver和executor,就是两个backend通信
存储体系
BlockManager
spark app 在 hdfs 读取、写入数据,都是通过 BlockManager 来实现计算引擎
DAG部署模式
local
standalone
使用spark自带的资源管理系统:spark集群 master worker
yarn
driver+executor 不需要启动spark集群,只要有spark-submit客户端就行
应用模块
- sparkcore
- sparksql
- sparkstreaming
- spark graphx
- spark mllib
- sparkR
- pyspark
- structure streaming
基本架构
driver 和 executor 是进程,executor 会启动一个线程池,每个线程就是一个 Task
client 提交任务,master 根据 deploy-mode 是 client\cluster 来决定 driver 在哪里(前者在 client 端,后者会找一个结点去创建 driver);之后 master 向 yarn 的 resourcemanager 申请资源,executor 中的 task 会在 container 中执行,并保持和 driver 的通信
同一个 executor 里面的 task 是相同的;不同的 executor 里面的 task 可能不相同 => 有一部分 executor 执行 stage1, 有的执行 stage2 等等,执行同一个 stage 的 executor 工作方式是一样的,只是输入的数据不一样,所以里面的 task 是一样的,就是负责的功能是一样的
rdd
分布式集合(由分散在多个不同worker节点上的多个partition组成)
一个stage中有多少个task?由当前stage最后一个rdd的分区个数来决定
数据倾斜最根本的原因:shuffle不均匀。比如reducejoin换成mapjoin?
spark数据倾斜解决思路:代码 和 数据 两个方面
- 代码方面
spark产生数据倾斜,只要去找会产生shuffle的算子即可,比如 distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition 等 - 数据方面
有可能本身数据源里的数据就分布不均匀,那么可以通过采样的方式来看数据的分布情况
离线处理:无放回采样 流式处理:鱼塘采样
reduceByKey/aggregateByKey 替代 groupByKey => 前两者在shuffle之前有预聚合
mapPartitions 替代 map => 前者是针对rdd的每个分区来做操作,后者是针对每一个元素,前者更高效
foreachPartitions 替代 foreach
filter 之后进行 coalesce 操作 => rdd的数据总量 = 分区个数 * 分区的平均数据量 => 在 filter 执行后rdd分区的平均数据量变少了,通过 coalesce 操作,将多个分区合并成一个分区,经过过滤数据量变化越大越需要这个操作
repartitionAndSortwithinParititon 替代 repartition 与 sort 类操作 => 原有分区不符合需求,先 repartition 然后每个分区 sort 排序;通过 repartitionAndSortwithinParititon 可以在分区的同时进行排序
MapReduce 的 shuffle:
- 如果有 reduce 阶段就需要进行 shuffle
- 如果有 shuffle,就一定会按照 key 排序
原因 - 如果一个 reduceTask 执行计算的输入数据是无序的,则每个 reduceTask 进行分组聚合的时候,需要多次扫描输入文件
- reduceTask 期望输入数据是有序的。按照顺序扫描文件一次,就可以完成分组聚合。
事实上业务并不一定需要排序,所以spark做了优化,有排序的shuffle也有不用排序的shuffle
SparkContext 初始化
重点关注对象
- TaskScheduler 实现类:TaskSchedulerImpl
- DAGScheduler
- SchedulerBackEnd
两个成员变量- 负责和 worker 通信:DriverEndpoint
- 负责和 master 通信:StandaloneAppClient => clientEndpoint