史上最简单spark(老码农心得Spark2.0)
本文编辑整理自【微学堂】第二十六期活动实录。
嘉宾介绍:孟硕
擅长技术:基于Hadoop,spark分布式平台的数据分析,业务开发与模型搭建。参与公司电商行业、交通运输行业数据变现项目。现任职于云和恩墨教育产品部,负责大数据产品研发和交付。加入云和恩墨之前,曾在Oracle研究与开发(北京)中心供职5年,在Oracle解决方案中心担任 Oracle大数据一体机产品 Big Data Appliance(BDA)咨询顾问,期间接触大量一线大数据应用案例,同时取得国际商业大数据领导公司 Cloudera管理员、Spark开发认证,并经过选拔成为Cloudera ACTP认证讲师资质。在此之前曾任职于中科红旗(RedFlag-Linux)操作系统服务器组,负责操作系统安全、虚拟化产品自动化测试套件开发。
直播实录Spark 2.0 版本的技术路线分享:
大家好,今天我们讲的主要的主题就是这个spark,刚才发了一张图片,那一会儿呢我还会转发一次,你们大家这个太热情了刷屏已经把那张图片给淹没。希望大家尽量在讲的过程中有问题可以随时提问,为了大家能够更清楚地看到图片,就不要再刷屏了,谢谢大家的配合。
我发的第二张图片就是把那句话重点给你讲出来了,就是说我们把这个弹性分布式数据集,这种基于内存的分布式的架构实现出来一个产品,那么我们把这个产品的名字就叫spark。这就是我们这个论文第一段的重点。那spark和RDD的关系,就是一个是她背后的架构,那么实现出来一个产品就是spark,那这就是spark和RDD的之间的关系。
每一项新的技术产生都有它的一个背后的推动力。就像大数据基础设施Hadoop,它包含这么几个组件HDFS、MapReduce。它产生的背后的动力就是因为,第一家遇到大量数据存储和处理的公司就是google,那么他面临大量数据处理首先就是要存储它,那么HDFS,接下来就是要处理这么大量的数据,他发明了一个分布式计算框架,叫MapReduce。这是被业务推动而产生的两项技术。
那么spark它是基于什么样的业务需求,这么迫切才产生了这样一个比较神奇的组件。如果你了解大数据,但是不知道spark这个组件的话,那就感觉好像没有深入了解过大数据一样。那么接下来这张图就是讲一下整个spark这个组件被发明或者是被设计出来的一个原动力。
很抱歉开篇就抛出来大段大段的英文。这个主要就是为了让大家了解原汁原味的,各种技术的起源,而不是去读一些被人翻译过来的,不能完整表达出原文意思的那些文章。那我也推荐大家去读一些英文的文档因为大数据毕竟是一个更新迭代速度特别快的一个领域。那如果想掌握最新最一手的技术,还是绕不开要读一些这个英文的文档。这段英文来自于spark的这个最基础的一篇论文。大家可以看一看他没有代码,只是详细描述了他的产生和他的设计。最后还有一些评测,大家确实是值得一读。
我们可以看出整个spark的产生是直击之前的传统的基于分布式的计算框架MapReduce的一些缺点而设计的。那我们看看究竟是哪里的缺点,红色框出来的部分大家可以稍微的读一读。第一个就是数据重用在许多迭代的机器学习计算里边或者是图像计算里面,这里边他举出来了两个机器学习经典的例子一个是PageRank,还有K-means还有逻辑回归这些算法,那么另外一个呢就是要交互式的数据挖掘。那这些都是要求你的计算框架反应速度特别快,这恰恰就是MapReduce的弱项。
所以就有了刚才我发的第一张图,那现在我们说spark会取代Hadoop。其实本质的意义上来说spark和Hadoop并不是一个层级上的概念。我们知道Hadoop,如果狭义的理解,他是一个软件,这个软件里面包含三个组件。一个是解决大数据分布式存储的HDFS,另外一个就是分布式计算的MapReduce,还有一个资源调度器,是在Hadoop版本二里边从MapReduce分离出来的,专门用作资源管理的这么一个组件。那spark他取代的部分只是这个分布式计算,负责计算的这个组件就是MapReduce,而不是其他的HDFS或者是YARN。
这张图是经常拿来做对比的。也就是我们spark和Hadoop里边的MapReduce,这里边他写的还是很严谨。spark对比的不是整个Hadoop,而是Hadoop里边的计算框架MapReduce,他们俩执行的时间、执行效率这么一个对比。相同的任务spark要比MapReduce要快这么多。那接下来我们就可以来说一说为什么而会比MapReduce快这么多。
了解过MapReduce的同学可能知道MapReduce他也是来自于google的那篇论文MapReduce,一个基于分布式架构的计算控件。那他的名字的由来就是来自于函数式编程的两个函数,一个是Map,一个是Reduce。那他之所以执行这么慢就是因为他这种简单,虽然强大但是他的逻辑流程比较简单,它的过程只分一个Mapper和一个Reducer。那么程序员只能够定制化他的Mapper函数或者是Reducer函数。那他主要是慢在它的Mapper产生的中间结果要传到Reducer之前是要落在本地磁盘上的。
这张图就描述了MapReduce也就是我们之前传统的分布式计算框架,他的执行的一个流程。那我们看看他要处理一个文件也就是input file,他会根据文件格式的不同,比如说我们要处理一个文本文件,那他就是TEXT Format,也就是文本文件的格式。如果你这个文件格式在HDFS也就是分布式存储上分为三个块,那么他就会分别在三台不同的机器,前提是你这个三个块是分布在三台机上。那我们新型分布式计算架构是要让计算去寻找数据,所以他就会在这三台机上分别启动MapReduce的Mapper程序。那我们看看接下来他这几个流程。
首先是Record Reader,她会把读取你blog里面的内容之后交给Mapper函数,Mapper函数是我们用户自定义函数,你可以对数据做一个任何的处理,最后把数据交给Partitioner把数据分类。那么相同的数据交给Reduce去处理,这就是最后Reduce根据指令做一个汇总或者聚合的一些操作,最后把计算结果存储HDFS上或者是你指定的路径下。那整个她的这个过程只能跑一次,也就是一次Mapper,然后中间Shuffle and Sort,最后是Reduce,不能进行迭代,如果要进行迭代,必须依赖第三方的组件去操作。
所以总结起来说MapReduce中间结果罗盘,最后还要落盘。如果你要反复的迭代依赖于第三方的调度,那反反复复迭代次数越多他落盘的次数越多,那这样的话就造成它的速度严重的缓慢。我们说计算机如果按照资源来分,他有这么几个资源,一个是存储资源就是磁盘,计算资源就是CPU还有内存。这三块,磁盘、内存和CPU,说起来他们都是有存储的。那我们CPU里有cache,memory就是一个速度比较快的存储,那磁盘更不用说他就是永久保存数据的,如果把数量级提升到秒,那我们的CPU开始存储一个数据,如果用了一秒钟的话,那我们内存是一个什么样的数量级呢?
内存的数量级,存储数据的数量级,同样的内容如果CPU cache用了一秒钟,内存就会用十几秒的时间,那我们磁盘可能就需要一个月这样子长的一个数量级。所以说磁盘IO,在大数据或者是任何一个我们不管是传统的关系数据库的应用,都是一个尽量避免的操作,就是要避免磁盘IO。
所以说我们回过头再看spark产生的这个原动力这张图,我们就能更清晰地了解到,为什么这个spark会取代MapReduce。他确实指向的就是MapReduce一个痛点,那第一个就是数据重用。因为你需要把一部分数据集反复地对他进行计算,如果要MapReduce有什么他不会把它存在内存里而是磁盘上,反复的读取大量的磁盘IO就会造成他读写的缓慢。那这正是一些机器学习的算法经常用的一些迭代,让他迭代次数越多他可能得出来的值就越准。另外一个交互式呢。MapReduce整个的作业是一个面向批处理,而不是交互式。
所以这个spark她就是一个基于内存的弹性分布式数据集。而且它的计算的逻辑不是分为一个Mapper阶段一Reducer阶段,是能够根据你的数据经过不同的操作步骤,这样比MapReduce更灵活。
所以总结起来说,spark因为是基于内存的分布式计算架构,他要比MapReduce基于磁盘的分布式计算的架构要快很多。那我们刚才两个计算框架做对比的那张图,其实不是特别严谨。为什么呢,大家可能也想到了,因为第一次读取数据就是你在启动MapReduce或者spark程序的时候。在第一次去执行计算都是从HDFS或者是你的磁盘上去读取数据。那处理一次他的这个执行的速度可能还没有快那么多。那么spark他的一个强项,就是在迭代。因为他把数据放在内存里面,所以迭代次数越多,他就越快,因为每一轮他要比磁盘快很多
大家可以看这张图,这张图来自于我刚才提到的那篇论文。这个就做的比较严谨,它是利用了两个算法,一个是Logistic Regression还有一个是K-mean,就是逻辑回归和这个分类的算法。咱们先说横轴表示三个产品一个是Hadoop里边的MapReduce中间那个咱们就不要管了,已经被淘汰了你看那么慢,那肯定要被淘汰。最后一个就是我们的spark。那竖轴就是代表的执行时间。一个逻辑回归的任务,当然是相同的,灰色柱状图表示第一次迭代,那么黑色柱状图表示后续的迭代。
我们可以看看第一次迭代,相同的任务Hadoop用了80,spark用了46,并没有传说中的快一百倍。那相反后续迭代。他的spark优势才逐渐显现,那我们Hadoop是用了76。这个可以理解因为他每次都落盘,那他的这个速度的提升不会那么明显。之所以比他慢了一些可能是因为做了HDFS的cache。那我们看spark他的这个速度,他所用的时间只用了三秒。那这就是后续迭代比它快的地方。所以我们今后再说,如果要是spark比Hadoop的哪儿快一百倍,这样的说法是不严谨的。那最严谨的说法是首次迭代还是后续迭。
再有什么问题也可以随时提出来,然后我们可以一起讨论。
说完spark和传统的分布式计算框架MapReduce对比之后。那我们就来深入的理解一下spark。因为他的后台,就是产品背后的设计架构是基于RDD的,所以说我们就来先来了解一下RDD这个弹性分布式数据集。那我们首先说弹性,弹性他给的解释就是如果数据在内存中丢失,那么他能够被重建。这个是基于spark的一个原理,那他本质上还是从HDFS上去读取数据。那如果把数据缓存到内存中,这部分数据如果因为单点故障或者是机器的一些故障原因丢失了会怎么办?
他的做法和其他的基于内存的分布式计算框架。这里我要解释一点,就是基于内存的分布式计算这种产品也不是只有死spark一家,那他跟其他的分布式内存计算框架相比,这个spark他是一个粗粒度的,就是说他不会把你的数据缓存到内存里做一个多份拷贝保证它的安全性。他完全不在乎你这个数据丢失或者gvm kill掉怎么办,那为什么说到gvm,一会我们再讨论这个。他不管你这个数据,突然丢掉了怎么办,因为他会把他的计算的过程给保留下来。
什么意思呢?就是说他还是依靠,因为你的数据是从底层HDFS东西过来或者是其他分布式文件系统。那既然你原始的数据还在那么我只要记录出来我的整个计算的过程,或者计算的步骤,那我就能够了。大不了就把数据从磁盘上在录出来或者从库里再把它拽出来,重算一遍就好了。所以说这个RDD,他虽然是一个基于内存的一小块儿存储数据的一个数据结构,它本质上还是要从磁盘上去读取文件,通过一系列的转换过程,然后才到你丢失的那个点,他是通过这种方式去保障他的数据。
所以说我们把文件从磁盘放到内存里,这部分内存的这个结构,我们就叫他RDD。有同学可能会问说既然你在HDFS上,也就是说分布式文件系统上的这个文件都是在不同机器上存的,那你这些内存他彼此看不见,他都叫RDD吗?那我这可以详细的说一说,每一台机器上那小块内存,比如说咱们打个比方,你有一个文件是西游记,那前三十章假如说前三分之一的内容存在A节点。中间三十章存在B节点,后三分之一的部分,假如说是取到真经是放在C节点。
接下来你写了一个spark程序要处理这个西游记,你要统计一下这个吴承恩他最喜欢用哪些单词,那这种情况就需要把ABC三台节点的西游记上中下部分都给提出来。那每一个机器上分布的一块儿内容录到内存里边都叫RDD一个分区。ABC三个节点所有的分区加起来。就叫做一个RDD,那这就是一整个RDD。
张图所描述的是创建RDD的三种方式。第一个就是从文件或者是从一堆文件,第二个是从内存,第三个是从其他RDD,那从文件我们刚才已经举了一个例子了。那从内存呢,比如说你的数据已经被缓存到内存里,或者是从这个消息中间件,当然也是内存的数据传过来的,那或者是你做了一个转换,比如说你把西游记上中下录到内存里之后,你把它标点符号全都去掉了,那这就是一个转换。也就是说你转换过来的那个RDD就是从其他最原始的那个RDD创建过来的.
这就是我们在是启动spark里面创建一个RDD的过程。那我这里边呢,也正好有一台虚拟机,我可以给大家看一看启动spark他究竟是一个什么样子。然后呢我们先来对spark的版本做一个介绍。spark从最初的版本到现在最新是2.1的版本,那之前最常用的版本是2.6。那spark他其实从1点几之后到2点几这个版本他是有一个自己的发展的里程碑的。的每一步发展,也是有前期规划,比如说他从1.3版本之后就陆续的加入鲜新的功能来保障spark在整个分布式计算的领域一统江湖的地位。
这个就是我在虚拟机上刚刚敲了一个pyspark,也就是用python去运行spark的一个命令。我们当前刚装了一个最新版本的2.1.0。再解释一下pythonspark这个概念,这个到终端的启动命令是pyspark,他如果从代码上来说,按代码行数来说,他七十五行数的开发语言是Scala语言,那百分之十五是java语言,剩下百分之十是R语言和python语言。那就是说她支持你用四种开发语言,而且它还支持你用交互式的这种方式去开发程序。就跟Python是一样。那我现在进入的就是用python语言去交互式的了解spark。
举个例子,就是分别用python和Scala这两种终端,交互式的去进入跟spark这个组件去编写一些程序。那这个大家如果有兴趣的可以去找个spark然后只要解压出来就可以运行。前提是你安装了java的GDK,那之前我们在了解spark运行的时候说跟建伟GVM有关系的,因为这个Scala的开发语言的跟java很像。整个spark程序也是运行在java虚拟机之上的。那所以说我们这个SPARK和JAVA语言还是有渊源的,也是Scala语言和java也是有一定渊源。这就涉及到整个这个Scala语言的一些特性我们可以稍微再说一说。
接下来说函数式编程语言在spark的应用。那我们知道函数式编程语言其实发明了很早,早在上个世纪四几年五几年的时候就有了各种各样的函数式编程语言。那为什么在大数据时代才又从重新回到了咱们这个整个技术圈儿里最火热的领域里边,那就是因为函数式编程语言里面有很多比较好的特性,特别适用在分布式程序开发里面,比如说我们的函数式编程语言里边是没有变量的。那他怎么表示a=1,我要把a加上1变成a等于二呢,那就是你再新定义一个变量让a2等于a1,a加上一。
那我们程序需要记录的就是你的a2是通过a这个变量加上一得到。那我们这个a2和a他俩就有一定的血缘关系。那所谓的这种血缘关系就是继承关系。这在我们大数据里面或者是在数据这个领域里边就叫数据沿袭,这个概念如果你要是阅读大数据的一些文章也是经常能够看到。
python和Scala还有java在版本8之后都是支持这种函数式编程的写法以及这个所谓的匿名函数这种写法。因为在函数式编程语言里面函数是一等公民,大家可以回去了解一下这个函数式编程语言,匿名函数这些概念。这些概念都是对你在分布式计算框架上写程序都是很有帮助的。那就拿java来讲,我们传统的那些编程的方式我们管它叫指令式编程。那现在的这种函数式编程和指令式编程有什么差别呢?我们可以看下一张图。
这张图虽然讲的是这个spark和 MapReduce程序之间他们俩的简练程度,spark程序其实是一个用python的函数式编程的写法去写一个所谓词频统计的那个程序,最后把它存到我output这个文件夹里面。下边整段代码实现的是同样一个功能。那上面的完全采用是函数式编程的写法,大家看每一个函数后面括号里面第一个是lambda,是匿名函数的这么一个代名词。那这样写的其实还是给面子的,我们看每一个点儿,下边这些行有缩进然后还每个点儿,其实他是可以被写在一行。那下面用java传统的语言是这么多行。
今天因为时间关系,咱就先不继续深入的去讨论这些,大家可以先了解一下这些概念深入去学习。那也可以来恩墨学院和我们进行交流。而spark里面涉及的一些技术,大家可以回去再看一看,我先把这些名词给大家倒腾一遍。那后来有spark的RDD,是SPARK后台的一个数据结构,那有RDD之后他会做用一系列函数对这个RDD做一个转换。
后边有streaming,就是Dstream,就是他的一个在流逝计算里边他的一个分布式的内存的一个架构。那我接下来主要讲的是在1.6版本以后新加入的一个功能。那这个功能就是spark SQL。那么先来看一看这个spark SQL,他也一样是有论文去支撑的,这个是spark 最重要的一个转变。
Spark SQL和Spark GraphX还有SparkStreaming还有Spark MLLIB被他们并称为spark 的四个组件,spark 的四大金刚。那我要说的就是这个是spark SQL。他的出现的意义为什么这么重大呢?那我们可以先来了解一下我们比较熟的一个概念就是这个SQL。我们哪像还是我们那个思路,每一项新的技术推出来他都是有背后的源动力的。那我们看看这个spark SQL,它的推出的原动力是什么?
我还是要贴出来一段英文,这个就是spark SQL出现的产生的背景的这么一段话,也来自最上面我贴出来的那篇论文。那么来看一看这段话,就是说我们在传统的应用里边SQL的朋友圈儿是很大的。那基本上我们传统的用就是DD加上SQL,那中间再加一个中间件所谓的三层架构。那既然我们之前SQL他代表的是对结构化数据的一个处理,那传统关系型数据库是处理这种数据的最主要的一个应用。那他就在大数据时代就有两个缺点或者是两个不足的地方。
不足的地方在我们这里边分别写了,那么第一是用户想要执行一些ETL,他会从不同的数据源去来搜集一些半结构化或者非结构化说白了就是大量的日志。那需要我们用户自定义编码,说白了就是你要写程序。那第二用户想要去执行一些高级的分析,比如说机器学习或者图像处理。那这些都是传统结构化语言或者SQL或者是这个关系型数据库他的能量所不及的地方。就是传统的这个处理结果话数据,那它这也有短板,比如说处理这些日志他就显得力不从心,因为一些机器学习算法他也打不到。
那你有的这些不就是恰恰spark这个组件他所擅长的吗,因为我既有这个机器学习的库又能够让用户提供编码。那既然你spark有这么强大那你直接用spark不就好了吗?但是呢我们别忘了这个传统的关系型数据库对结构化处理数据处理的能力依然是让人很羡慕。那如果spark要是把这个能力吸收进来,那就是会一统数据的江湖。那我们整个数据从技术语言来说就是分为结构化数据和非结构化或半结构化数据。那从业务语言来说,结构化数据对应的就是交易数据,真金白银,这个非结构化数据对应的就是我们的这个行为数据,量大。
所以spark SQL这个组件就出现了。我们可以看看第一句话并不是很难的英文。就是bridges the gap 就是把这个两个模型的空缺,也是他们之间的有一道鸿沟给填不了,那是什么样的鸿沟呢?而在总结一下就是传统关系型数据库也就是SQL对节化数据的处理能力非常强大。但是呢他用不了机学习呀!或者是处理非结构化和半结构化数据这个功能。而且spark这个组件。他拥有强大的机器学习算法以及对非结构化或半结构化数据的处理功能。但是对结构化数据又无能为力。那spark SQL就是想要在spark 组件添加处理结构化数据的能力。
那这样spark 组件就如虎添翼。结构化数据能处理非结构化数据半结构化数据都能处理。这个我们整个大数据时代,数据分为结构化和非结构化也就是行为数据和交易手机我都能处理了。那这个是非常明显的就是要一统数据的江湖。
那我们来看看spark SQL里边,我们刚才讲spark 他本身背后的数据结构是叫RDD。那spark SQL他背后的这个数据结构就叫DataFrame。那我们知道SQL在关系型数据库里面都有一个SQL的这个语法的解析器也就SQL的执行引擎或者叫优化器。在优化器里面如果大家有这个关系型数据库DDA可能了解到有基于规则的RBO。那我们这个spark 里边他的这个SQL也是有一个这个优化计算引擎,那我们管它叫Catalyst。
那张图就是在spark 里边创建一个基于RDD的一个DataFrame的数据结构,其实他本质上还是继续RDD。只不过RDD就向一个空旷的屋子,你要找东西要把这个屋子翻遍才能找到。那我们的这个DataFrame相当于在你的屋子里面打上了货架。那你只要告诉他你是在第几个货架的第几个位置,那不就是二维表吗。那就是我们DataFrame就是在RDD基础上加入了列。实际上我们处理数据就像处理二维表一样。
这是我们整个spark处理的这么一个机制,我们首先看最底下。依然是spark以及他后台的数据结构。这个Resilient Distributed Datasets弹性分布式数据集。那我们用户可以通过这个编程语言的接口来直接处理,大家看看最右侧这个向下的箭头,可以去处理这个非结构化和半结构化数据。刚才讲一间屋子数据都堆到里边,你可以通过程序处理,那如果数据要是规则的,他就可以通过这个中间这个层spark SQL背后的数据结构DataFrame API。DataFrame API是接SQL的。那所以说她下面有Catalyst Optimizer,SQL优化器,上边JDBC、Console接这个SQL的一些语句的。
所以说我们这个整个spark的一个发展趋势,其中有一条路就是要重视结构化数据的处理。那其实我们在大数据数据清洗过程中,他的一个思路也就是把非结构化数据给结构化。那这是我们spark的一个技术的发展路线,那关于很多内容,而其实都是可以值得聊的,但是今天因为时间的关系咱们就先说到这儿。然后呢,有机会咱们再对此spark进行一个深入的交流。
今天的技术交流就先到这里。谢谢大家。
,免责声明:本文仅代表文章作者的个人观点,与本站无关。其原创性、真实性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容文字的真实性、完整性和原创性本站不作任何保证或承诺,请读者仅作参考,并自行核实相关内容。文章投诉邮箱:anhduc.ph@yahoo.com