学习笔记spark运行架构概述(疯狂Spark之Spark资源调度和任务调度)
资源调度源码分析
资源请求简单图
资源调度Master路径:
路径:spark-1.6.0/core/src/main/scala/org.apache.spark/deploy/Master/Master.scala
提交应用程序,submit的路径:
路径:spark-1.6.0/core/src/main/scala/org.apache.spark/ deploy/SparkSubmit.scala
总结:
- Executor在集群中分散启动,有利于task计算的数据本地化。
- 默认情况下(提交任务的时候没有设置--executor-cores选项),每一个Worker为当前的Application启动一个Executor,这个Executor会使用这个Worker的所有的cores和1G内存。
- 如果想在Worker上启动多个Executor,提交Application的时候要加--executor-cores这个选项。
- 默认情况下没有设置--total-executor-cores,一个Application会使用Spark集群中所有的cores。
结论演示
使用Spark-submit提交任务演示。也可以使用spark-shell
- 默认情况每个worker为当前的Application启动一个Executor,这个Executor使用集群中所有的cores和1G内存。
./spark-submit --master spark://node01:7077
--class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 10000
运行结果
2.在workr上启动多个Executor,设置--executor-cores参数指定每个executor使用的core数量。
./spark-submit --master spark://node01:7077 --executor-cores 1
--class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 10000
运行结果
3.内存不足的情况下启动core的情况。Spark启动是不仅看core配置参数,也要看配置的core的内存是否够用。
./spark-submit --master spark://node01:7077 --executor-cores 1 --executor-memory 3g
--class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 10000
4.--total-executor-cores集群中共使用多少cores
注意:一个进程不能让集群多个节点共同启动。
./spark-submit --master spark://node01:7077 --executor-cores 1 --executor-memory 2g
--total-executor-cores 3
--class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 10000
任务调度源码分析
Action算子开始分析
任务调度可以从一个Action类算子开始。因为Action类算子会触发一个job的执行。
划分stage,以taskSet形式提交任务
DAGScheduler 类中getMessingParentStages()方法是切割job划分stage。可以结合以下这张图来分析:
二次排序
在项目中添加一个SecondSort.txt文件
排序前文件中内容
编写代码
package com.gw.scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD.rddToOrderedRDDFunctions
object SparkSecondSort {
def main(args: Array[String]): Unit = {
val sconf=new SparkConf().setAppName("SecondSort").setMaster("local")
val sc=new SparkContext(sconf)
val lines=sc.textFile("secondSort.txt")
val pairs= lines.map { x=>(new SecondSortKey(x.split(" ")(0).toInt,x.split(" ")(1).toInt),x) }
val sortedPairs= pairs.sortByKey(false)
// val sortedPairs = pairs.sortBy(_._1, false)
sortedPairs.map(_._2).foreach {println }
sc.stop()
}
}
class SecondSortKey(val first:Int,val second:Int) extends Ordered[SecondSortKey] with Serializable {
def compare(that: SecondSortKey): Int = {
if(this.first-that.first==0)
this.second- that.second
else
this.first-that.first
}
}
运行效果
topN和分组取topN
topN
需求:获取成绩单中,成绩排在前五的学生信息
在项目中添加一个top.txt文件,在文件中以K,V(K表示成绩,V表示姓名)对的形式添加数据,如下图:
编写代码
package com.gw.scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object SparkTopN {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("TopN").setMaster("local")
val sc = new SparkContext(conf)
val lines=sc.textFile("top.txt")
val lineList=lines.map(x=>(x.split(",")(0),x))
val sortRdd = lineList.sortByKey(false)
val resultRDD = sortRdd.map(x=>x._2)
for(a <-resultRDD.take(5)){
println(a)
}
sc.stop()
}
}
运行结果
分组取topN
需求:给每个班级的学生成绩排序
在项目中添加一个文件scores.txt,在文件中编写K,V(K表示班级,V表示成绩)格式的数据,如下图
编写代码
package com.gw.scala
import org.apache.spark.{SparkContext, SparkConf}
object SparkGroupTopN {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("GroupTopN").setMaster("local")
val sc = new SparkContext(conf)
val lines=sc.textFile("scores.txt")
val lineList=lines.map(x=>(x.split("\t")(0),x.split("\t")(1))).groupByKey()
val topList=lineList.map(x=>{
var t = List[Int]()
for(a<-x._2){
t = t.::(a.toInt)
}
println(x._1)
t.sortBy { x => -x }.take(3)
})
topList.foreach { println }
}
}
运行结果
,
免责声明:本文仅代表文章作者的个人观点,与本站无关。其原创性、真实性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容文字的真实性、完整性和原创性本站不作任何保证或承诺,请读者仅作参考,并自行核实相关内容。文章投诉邮箱:anhduc.ph@yahoo.com