spark将数据写入text文件(Spark技巧之读取输入数据的文件名和目录)

在平时使用spark开发中有时我们会有这样一个需求,在Spark读取数据时我们还需要获取到数据的文件名和父目录,比如目录中含有某个关键字数据使用\t分割,否则使用逗号分割等,今天小编就来聊一聊关于spark将数据写入text文件?接下来我们就一起去研究一下吧!

spark将数据写入text文件(Spark技巧之读取输入数据的文件名和目录)

spark将数据写入text文件

在平时使用spark开发中有时我们会有这样一个需求,在Spark读取数据时我们还需要获取到数据的文件名和父目录,比如目录中含有某个关键字数据使用\t分割,否则使用逗号分割等。

下面就介绍一下实现方式:

object Test { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setMaster("local") .setAppName(this.getClass().getSimpleName().filter(!_.equals('$'))) .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .registerKryoClasses(Array( classOf[Array[String]], classOf[util.LinkedHashMap[String, Integer]], classOf[DoubleWritable], classOf[LongWritable], classOf[IntWritable], classOf[mutable.HashMap[Long, Set[(Int, Int, Long, String)]]] )) val sc = new SparkContext(conf) val input = "D:\\test\\dirs\\*.gz"//args(0) val fileRDD = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat](input) val hadooprdd = fileRDD.asInstanceOf[NewHadoopRDD[LongWritable, Text]] val rdd = hadoopRDD.mapPartitionsWithInputSplit((inputSplit, iter) =>{ val fileSplit = inputSplit.asInstanceOf[FileSplit] iter.map(x =>{ fileSplit.getPath.toString "|" fileSplit.getPath.getName "|" x._2.toString }) }) rdd.foreach(println) } }

我们使用SparkContext的newAPIHadoopFile方法,并制定key和value的类型,并将返回的rdd类型强制为NewHadoopRDD,接着调用mapPartitionsWithInputSplit方法,该方法有两个参数,第一个参数inputSplit就是分片信息,iter是迭代器,代表了数据。

输出:

file:/D:/test/dirs/1.txt.gz|1.txt.gz|1 file:/D:/test/dirs/1.txt.gz|1.txt.gz|11 file:/D:/test/dirs/1.txt.gz|1.txt.gz|111 file:/D:/test/dirs/2.txt.gz|2.txt.gz|2 file:/D:/test/dirs/2.txt.gz|2.txt.gz|22

感兴趣的朋友可以点个赞,加个关注,共同学习进步!

,

免责声明:本文仅代表文章作者的个人观点,与本站无关。其原创性、真实性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容文字的真实性、完整性和原创性本站不作任何保证或承诺,请读者仅作参考,并自行核实相关内容。文章投诉邮箱:anhduc.ph@yahoo.com

    分享
    投诉
    首页