spark含有的数据分析算法(九Spark之图解aggregateByKey算子)

aggregateByKey(initValue)(seqQp, combineOp [,numberTask])

aggregateByKey(initValue)(seqQp, combineOp [,numberTask]): 是一个shuffle类的算子,与reduceByKey类似,只是它在每个分区seqOp后要应用一次初始值。

代码示例:

import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object AggregateByKeyOp { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName(AggregateByKeyOp.getClass().getSimpleName) val sc = new SparkContext(conf) sc.setLogLevel("ERROR") val rdd1: RDD[(String, Int)] = sc.parallelize(Array( ("刘备", 1), ("曹操", 2), ("刘备", 3), ("曹操", 4), ("刘备", 100), ("曹操", 200), ("刘备", 300), ("曹操", 400) ), 2) val rdd2: RDD[String] = rdd1.mapPartitionsWithIndex((index, iter) => { iter.toList.map(x => "【分区号为:" index ", 值为:" x "】").iterator }) rdd2.foreach(println) // 【分区号为:0, 值为:(刘备,1)】 // 【分区号为:0, 值为:(曹操,2)】 // 【分区号为:0, 值为:(刘备,3)】 // 【分区号为:0, 值为:(曹操,4)】 // 【分区号为:1, 值为:(刘备,100)】 // 【分区号为:1, 值为:(曹操,200)】 // 【分区号为:1, 值为:(刘备,300)】 // 【分区号为:1, 值为:(曹操,400)】 val rdd3 = rdd1.aggregateByKey(1000)(_ _, _ _) rdd3.mapPartitionsWithIndex((index, iter) => { iter.toList.map(x => "【分区号为:" index ", 值为:" x "】").iterator }).foreach(println) // 【分区号为:0, 值为:(曹操,2606)】 // 【分区号为:1, 值为:(刘备,2404)】 rdd3.foreach(println) // (曹操,2606) // (刘备,2404) sc.stop() } }

图解aggregateByKey

spark含有的数据分析算法(九Spark之图解aggregateByKey算子)(1)

,

免责声明:本文仅代表文章作者的个人观点,与本站无关。其原创性、真实性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容文字的真实性、完整性和原创性本站不作任何保证或承诺,请读者仅作参考,并自行核实相关内容。文章投诉邮箱:anhduc.ph@yahoo.com

    分享
    投诉
    首页