rdf知识图谱大全(RDD弹性特性)

rdd作为弹性分布式数据集,弹性具体体现在,我来为大家科普一下关于rdf知识图谱大全?以下内容希望对你有帮助!

rdf知识图谱大全(RDD弹性特性)

rdf知识图谱大全

rdd作为弹性分布式数据集,弹性具体体现在

自动进行内存和磁盘数据存储的切换

spark会优先把数据放到内存中,如果内存放不下,会放到磁盘里面。当应用程序内存不足时,Spark应用程序将数据自动从内存存储切换到磁盘存储,以保证其高效运行。

基于Lineage(血统)的高效容错机制

Lineage是基于Spark RDD的依赖关系来完成的,每个操作只关联其父操作,各个分片的数据之间互不影响,出现错误只需要恢复单个Split的特定部分即可。

常规容错有两种方式:

1、数据检查点

2、记录数据的更新操作

Spark的RDD通过记录数据更新的方式进行容错,主要原因有:RDD是不可变的且Lazy;RDD的写操作是粗粒度的。但是RDD的读既可以是粗粒度的,也可以是细粒度的。

Task如果失败,会自动进行特定次数的重试

默认重试次数为4次。TaskSchedulerImpl是底层任务调度接口TaskScheduler的实现,这些Schedulers从每个Stage中的DAGSchedler中获取TaskSet,运行它们,尝试是否有故障。DAGschedler是高层调度,计算每个Job的Stage的DAG,然后提交Stage,用TaskSets的形式启动底层TaskScheduler调度在集群中运行。

Task默认重试次数,位于org.apache.spark.internal.config#MAX_TASK_FAILURES

private[spark] val MAX_TASK_FAILURES = ConfigBuilder("spark.task.maxFailures") .intConf .createWithDefault(4)

TaskSchedulerImpl源码,位于org.apache.spark.scheduler.TaskSchedulerImpl

private[spark] class TaskSchedulerImpl private[scheduler]( val sc: SparkContext, val maxTaskFailures: Int, private[scheduler] val blacklistTrackerOpt: Option[BlacklistTracker], isLocal: Boolean = false) extends TaskScheduler with Logging { ​ import TaskSchedulerImpl._ ​ def this(sc: SparkContext) = { this( sc, sc.conf.get(config.MAX_TASK_FAILURES), TaskSchedulerImpl.maybeCreateBlacklistTracker(sc)) }

Stage如果失败,会自动进行特定次数的重试

Stage对象可以跟踪多个StageInfo。默认重试次数为4次,且可以直接运行计算失败的阶段,只计算失败的数据分片。

Stage是Spark Job运行时具有相同逻辑功能和并行计算任务的一个基本单元。Stage中所有的任务都依赖同样的Shuffle,每个DAG任务通过DAGScheduler在Stage的边界处发生Shuffle形成Stage,然后DAGScheduler运行这些阶段的拓扑排序。

每个Stage都可能是ShuffleMapStage,如果是ShuffleMapStage,则跟踪每个输出节点上输出文件分区,任务结果会输入其他的Stage,或者输入一个ResultStage;如果是ResultStage,这个Stage的任务直接在这个RDD上运行计算这个Spark Action函数。

每个Stage会有firstJobId,度额定第一个提交Stage的Job,使用FIFO调度实,会使其前面的Job先计算或快速恢复。

ShuffleMapStage是DAG产生数据进行Shuffle的中间阶段,发生在每次Shuffle操作之前,可能包含多个Pipelined操作;ResultStage阶段捕获函数在RDD分区上运行Action算子计算结果。

Stage源码,位于org.apache.spark.scheduler.Stage

private[scheduler] abstract class Stage( val id: Int, val rdd: RDD[_], val numTasks: Int, val parents: List[Stage], val firstJobId: Int, val callSite: CallSite) extends Logging { ​ // partition的个数 val numPartitions = rdd.partitions.length ​ /** Set of jobs that this stage belongs to. */ /** 属于这个工作集的Stage */ val jobIds = new HashSet[Int] ​ /** The ID to use for the next new attempt for this stage. */ /** 用于此Stage的下一个新attempt的标识ID */ private var nextAttemptId: Int = 0 ​ val name: String = callSite.shortForm val details: String = callSite.longForm ​ /** * Pointer to the [[StageInfo]] object for the most recent attempt. This needs to be initialized * here, before any attempts have actually been created, because the DAGScheduler uses this * StageInfo to tell SparkListeners when a job starts (which happens before any stage attempts * have been created). */ /** * 最新的[StageInfo] object指针,需要被初始化 * 任何attempts都是被创造出来的,因为DAGScheduler使用StageInfo * 告诉SparkListeners工作何时开始 */ private var _latestInfo: StageInfo = StageInfo.fromStage(this, nextAttemptId) ​ /** * Set of stage attempt IDs that have failed with a FetchFailure. We keep track of these * failures in order to avoid endless retries if a stage keeps failing with a FetchFailure. * We keep track of each attempt ID that has failed to avoid recording duplicate failures if * multiple tasks from the same stage attempt fail (SPARK-5945). */ /** * 设置stage attempy IDs 当失败是可以读取失败信息 * 跟踪这些失败,为了避免无休止地重复失败 * 跟踪每一次attempt,以便避免记录重复故障 * 如果从同一stage窗体间多任务失败 */ val fetchFailedAttemptIds = new HashSet[Int] ​ private[scheduler] def clearFailures() : Unit = { fetchFailedAttemptIds.clear() } ​ /** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */ /** 在stage中创建一个新的attempt */ def makeNewStageAttempt( numPartitionsToCompute: Int, taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = { val metrics = new TaskMetrics metrics.register(rdd.sparkContext) _latestInfo = StageInfo.fromStage( this, nextAttemptId, Some(numPartitionsToCompute), metrics, taskLocalityPreferences) nextAttemptId = 1 } ​ /** Returns the StageInfo for the most recent attempt for this stage. */ /** 放回当前stage中最新的StageInfo */ def latestInfo: StageInfo = _latestInfo ​ override final def hashCode(): Int = id ​ override final def equals(other: Any): Boolean = other match { case stage: Stage => stage != null && stage.id == id case _ => false } ​ /** Returns the sequence of partition ids that are missing (i.e. needs to be computed). */ /** 返回需要重新计算的分区标识的序列 */ def findMissingPartitions(): Seq[Int] }

在Stage终止前允许Stage连续尝试4次,位于org.apache.spark.scheduler.DAGScheduler#maxConsecutiveStageAttempts

/** 在终止之前允许的连续尝试次数 */ private[scheduler] val maxConsecutiveStageAttempts = sc.getConf.getInt("spark.stage.maxConsecutiveAttempts", DAGScheduler.DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS) private[spark] object DAGScheduler { // The time, in millis, to wait for fetch failure events to stop coming in after one is detected; // this is a simplistic way to avoid resubmitting tasks in the non-fetchable map stage one by one // as more failure events come in /** * 在毫秒级别,等待读取失败事件后就停止;这是一个避免重新提交任务的简单方法,非读取任务的map中更多失败事件的到来 */ val RESUBMIT_TIMEOUT = 200 // Number of consecutive stage attempts allowed before a stage is aborted /** 终止之前允许连续尝试的次数 */ val DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS = 4 }

checkpoint和persist(检查点和持久化),可主动或被动触发

checkpoint是对RDD进行标记,会产生一些列的文件,且所有所有父依赖都会被删除,是整个依赖的重点。checkpoint是Lazy级别的。persist后RDD工作室每个工作节点都会把计算的分片结果保存在内存或者磁盘上,下一次对相同的RDD进行其他Action计算。就可以重用。

当RDD.iterator()被调用的时候,也就是计算该RDD中某个Partition的时候,会先去cacheManager获取一个blockId,然后去BlockManager里匹配Partition是否被checkpoint了。如果是,就不用计算该Partition,直接产品能够checkPoint中读取该Partition的所有records放入ArrayBuffer里面。如果没有被checkPoint过,将Partition计算出来,然后将其所有records放入到cache中。

总体来说,当RDD会被重复使用时,RDD需要cache。Spark自动监控每个节点缓存的使用情况,利用最近最少使用原则删除老旧的数据,如果想手动删除RDD,可以使用RDD.unpersist()方法。

RDD.iterator源码,位于org.apache.spark.rdd.RDD#iterator

final def iterator(split: Partition, context: TaskContext): Iterator[T] = { // 判断此RDD的持久户登记是为为NONE,不进行持久化 if (storageLevel != StorageLevel.NONE) { getOrCompute(split, context) } else { computeOrReadCheckpoint(split, context) } }

可以用不同的存储级别存储每一个被持久化的RDD。StorageLevel是控制存储RDD的标志,Spark的多个存储级别意味着在内存利用率和CPU利用率间的不同平衡。推荐通过下面的过程选择一个合适的存储级别

  • 如果RDD适合默认的存储级别(MEMORY_ONLY),就选择默认的存储级别。因为这是CPU利用率最高的选项,会使RDD上的操作尽可能地块。
  • 如果不适合用默认级别,就选择MEMOEY_ONLY_SER。选择一个更快的序列化库提高对象的空间使用率,任然能够快速地访问。
  • 除非算子计算RDD话费较大或者需要过滤大量的数据,不要将RDD存储在磁盘上,否则重复计算一个分区,就会和从磁盘上读取数据一样慢。
  • 如果希望更快地恢复错误,可以利用replicated存储机制,所有的存储级别都可以通过replicated计算丢失的数据来支持完整的容错。另外,replicated的数据能在RDD上继续运行任务,而不需要重复计算丢失的数据。
  • 在拥有大量内存的环境中或者多应用程序的环境中,Off_Heap具有如下优势:Off_Heap运行多个执行者共享的Alluxio中的相同的内存池,限制减少GC,如果当个Executor崩溃,缓存的数据也不会丢失。

StorageLevel源码,位于org.apache.spark.storage.StorageLevel

val NONE = new StorageLevel(false, false, false, false) val DISK_ONLY = new StorageLevel(true, false, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2) val MEMORY_ONLY = new StorageLevel(false, true, false, true) val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2) val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false) val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2) val MEMORY_AND_DISK = new StorageLevel(true, true, false, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

数据弹性调度,DAGScheduler、TaskScheduler和资源管理无关

Spark将执行模型丑行为通过的有向无环图(DAG),可以将多个Stage的任务串联或并行执行,从而不需要将Stage中间结果输出到HDFS上,当发生节点运行故障时,可有其他可用节点代替该故障节点运行。

数据分片的高度弹性。

Spark 进行数据分片时,默认将数据放在内存汇总,如果内存放不下,一部分会放在磁盘上保存。

RDD的coalesce算子源码,位于org.apache.spark.rdd.RDD#coalesce

def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty) (implicit ord: Ordering[T] = null) : RDD[T] = withScope { require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.") if (shuffle) { /** Distributes elements evenly across output partitions, starting from a random partition. */ /** 从随机分区开始,将元素均匀分布在输出分区上 */ val distributePartition = (index: Int, items: Iterator[T]) => { var position = (new Random(index)).nextInt(numPartitions) items.map { t => // Note that the hash code of the key will just be the key itself. The HashPartitioner // will mod it with the number of total partitions. position = position 1 (position, t) } } : Iterator[(Int, T)] // include a shuffle step so that our upstream tasks are still distributed // 包括一个Shuffle步骤,使上游任务仍然是分布式的 new CoalescedRDD( new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition), new HashPartitioner(numPartitions)), numPartitions, partitionCoalescer).values } else { new CoalescedRDD(this, numPartitions, partitionCoalescer) } }

如果在计算过程中,产生很多的数据碎片,这是产生的Partition可能会非常小,如果一个Partition非常小,每次都会消耗一个线程取处理,这是可能降低它的处理效率。可以考虑把许多个小的Partition合并成一个较大的Partition处理,会提高效率。

,

免责声明:本文仅代表文章作者的个人观点,与本站无关。其原创性、真实性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容文字的真实性、完整性和原创性本站不作任何保证或承诺,请读者仅作参考,并自行核实相关内容。文章投诉邮箱:anhduc.ph@yahoo.com

    分享
    投诉
    首页