0%

Spark 架构进阶

Spark 架构进阶

一些重要的源码类
DAGScheduler.scala
EventLoop.scala

spark DAG引擎

spark 优势

  1. 减少磁盘IO
  2. 增加并行度
    mr:任务是进程级别
    spark:线程级别
  3. 避免重复计算
    中间结果进行持久化到内存或磁盘,mr 只能去写到磁盘
  4. 可选的shuffle和排序
    mr如果有reduce阶段,一定会按照key排序,但是业务其实不一定需要key有序
    spark提供多种可选的shuffle方案
  5. 灵活的内存管理策略
    堆内内存onheap 堆外内存offheap
    存储内存storagememory 执行内存executionmemory

架构:主从架构 master、worker

两种基本使用方式:
spark-shell 交互式编程
spark-submit 提交任务

spark application
driver 类似mrappmaster 驱动程序,管理application中task运行
executor 类似mapTask reduceTask 真正运行task
都是线程级别

spark-submit
–supervise # driver 有可能会宕机,通过这个参数可以自动恢复driver
注意:

  1. 不能使用 kill -9 方式来杀掉driver,因为杀了之后会自动再重启
  2. 需要通过 yarn -kill jobId 的方式来杀掉进程

spark 程序套路

  1. 获取编程入口
    环境对象,链接对象
  2. 通过编程入口获取数据抽象
    source对象
  3. 针对数据抽象对象进行各种计算
    action transformation
  4. 提交任务运行
    submit
  5. 输出结果
    sink
  6. 回收资源
    stop close

使用scala编写的时候:链式调用
sparkContext.textFile(path).flatmap(func)…
代码规范:可读性、可维护性、可扩展性
rdd1 = sparkContext.textFile(path)
rdd2 = rdd1.flatmap()

使用java编写的时候
sparkContext.textFile(path).flatmap(new FlatMapFunction(){

})

核心功能

  1. sparkContext
    编程入口
    DAG引擎
  • DAGScheduler 帮助我们把一个spark app构建成一个DAG有向无环图,同时也切分stage
  • TaskScheduler 接受一个stage转变成一个TaskSet去提交执行

通信组件 Backend

  • SchedulerBackend => driver
  • ExecutorBackend => executor

    app分driver和executor,就是两个backend通信

  1. 存储体系
    BlockManager
    spark app 在 hdfs 读取、写入数据,都是通过 BlockManager 来实现

  2. 计算引擎
    DAG

  3. 部署模式
    local
    standalone
    使用spark自带的资源管理系统:spark集群 master worker
    yarn
    driver+executor 不需要启动spark集群,只要有spark-submit客户端就行

应用模块

  1. sparkcore
  2. sparksql
  3. sparkstreaming
  4. spark graphx
  5. spark mllib
  6. sparkR
  7. pyspark
  8. structure streaming

基本架构

driver 和 executor 是进程,executor 会启动一个线程池,每个线程就是一个 Task
client 提交任务,master 根据 deploy-mode 是 client\cluster 来决定 driver 在哪里(前者在 client 端,后者会找一个结点去创建 driver);之后 master 向 yarn 的 resourcemanager 申请资源,executor 中的 task 会在 container 中执行,并保持和 driver 的通信
同一个 executor 里面的 task 是相同的;不同的 executor 里面的 task 可能不相同 => 有一部分 executor 执行 stage1, 有的执行 stage2 等等,执行同一个 stage 的 executor 工作方式是一样的,只是输入的数据不一样,所以里面的 task 是一样的,就是负责的功能是一样的

rdd
分布式集合(由分散在多个不同worker节点上的多个partition组成)

一个stage中有多少个task?由当前stage最后一个rdd的分区个数来决定

数据倾斜最根本的原因:shuffle不均匀。比如reducejoin换成mapjoin?

spark数据倾斜解决思路:代码 和 数据 两个方面

  1. 代码方面
    spark产生数据倾斜,只要去找会产生shuffle的算子即可,比如 distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition 等
  2. 数据方面
    有可能本身数据源里的数据就分布不均匀,那么可以通过采样的方式来看数据的分布情况
    离线处理:无放回采样 流式处理:鱼塘采样

reduceByKey/aggregateByKey 替代 groupByKey => 前两者在shuffle之前有预聚合
mapPartitions 替代 map => 前者是针对rdd的每个分区来做操作,后者是针对每一个元素,前者更高效
foreachPartitions 替代 foreach
filter 之后进行 coalesce 操作 => rdd的数据总量 = 分区个数 * 分区的平均数据量 => 在 filter 执行后rdd分区的平均数据量变少了,通过 coalesce 操作,将多个分区合并成一个分区,经过过滤数据量变化越大越需要这个操作
repartitionAndSortwithinParititon 替代 repartition 与 sort 类操作 => 原有分区不符合需求,先 repartition 然后每个分区 sort 排序;通过 repartitionAndSortwithinParititon 可以在分区的同时进行排序

MapReduce 的 shuffle:

  1. 如果有 reduce 阶段就需要进行 shuffle
  2. 如果有 shuffle,就一定会按照 key 排序
    原因
  3. 如果一个 reduceTask 执行计算的输入数据是无序的,则每个 reduceTask 进行分组聚合的时候,需要多次扫描输入文件
  4. reduceTask 期望输入数据是有序的。按照顺序扫描文件一次,就可以完成分组聚合。

    事实上业务并不一定需要排序,所以spark做了优化,有排序的shuffle也有不用排序的shuffle

SparkContext 初始化
重点关注对象

  1. TaskScheduler 实现类:TaskSchedulerImpl
  2. DAGScheduler
  3. SchedulerBackEnd
    两个成员变量
    • 负责和 worker 通信:DriverEndpoint
    • 负责和 master 通信:StandaloneAppClient => clientEndpoint