马表的英文译语怎么说-石莲花
2023年4月7日发(作者:学生账号登录平台登录入口)
ApacheSparkRDD论⽂(中⽂翻译)
为了能解决程序员能在⼤规模的集群中以⼀种容错的⽅式进⾏计算这个问题,我们提出了RDDs的概念。当前的很多框架对迭代式算法场
景与交互性数据挖掘场景的处理性能⾮常差,这个是RDDs的提出的动机。如果能将数据保存在内存中,将会使的上⾯两种场景的性能提
⾼⼀个数量级。为了能达到⾼效的容错,RDDs提供了⼀种受限制的共享内存的⽅式,这种⽅式是基于粗粒度的转换共享状态⽽⾮细粒度的
更新共享状态。然⽽,我们分析表明RDDs可以表达出很多种类的计算,包括⽬前专门从事迭代任务的编程计算模型,⽐如Pregel,当然
也可以表达出⽬前模型表达不出的计算。我们通过Spark系统来实现了RDDs,并且通过各种各样的⽤户应⽤和测试来评估了这个系统。
像MapReduce和Dryad等分布式计算框架已经⼴泛应⽤于⼤数据集的分析。这些系统可以让⽤户不⽤担⼼分布式⼯作以及容错,⽽是使
⽤⼀系列的⾼层次的操作api来达到并⾏计算的⽬的。
虽然当前的框架提供了⼤量的对访问利⽤计算资源的抽象,但是它们缺少了对利⽤分布式内存的抽象。样使的它们在处理需要在多个计算之
间复⽤中间结果的应⽤的时候会⾮常的不⾼效。数据的复⽤在迭代机器学习和图计算领域(⽐如PageRank,K-means以及线性回归等算
法)是很常见的。在交互式中,⼀个⽤户夏日绝句的意思译文 会经常对⼀个相同的数据⼦集进⾏多次不同的特定查询,所以数据复⽤在交互式数据挖掘也是很常
见的。然⽽,⽬前的⼤部分的框架对计算之间的数据复⽤的处理⽅式就是将中间数据写到⼀个靠稳定的系统中(⽐如分布式⽂件系统),这
样会由于数据的复制备份,磁盘的I/O以及数据的序列化⽽致应⽤任务执⾏很费时间。
认识到这个问题后,研究者们已经为⼀些需要中间数据复⽤的应⽤开发出了⼀些特殊的框架。⽐如Pregel在做迭代式图计算的时候会将中
间结果放在内存中。HaLoop也提供了迭代式MapReduce接⼝。然⽽,这些框架仅仅⽀持⼀些特殊的计算模式(⽐如循环⼀系列的
MapReduce步骤),并且它们是隐式的为些计算模式提供数据共享。它们没有提供更加普遍数据复⽤的抽象,⽐如可以让⽤户加载⼏个数
据集到存中然后对这些内存中的数据集进⾏专门的查询。
在这篇论⽂中,我们提出了⼀个全新的抽象,叫做RDDs,它可以⾼效的处理⼴泛的应⽤中涉及到的数据⽤的场景。RDDs是⼀个可以容错
且并⾏的数据结构,它可以让⽤户显式的将中间结果数据集保存在内中、控制数据集的分区来达到数据存放处理最优以及可以使⽤丰富的操
作api来操作数据集。
在设计RDDs的时候,最⼤的挑战是定义⼀个可以⾼效容错的编程接⼝。已经存在的分布式内存抽象系统⽐如distributedshared
memory、key-valuestores、databases以及Poccolo,都是提供了基于粒度的更新可变状态(⽐如table中的cells)的接⼝,基于这
种接⼝下,保证容错的⽅式⽆⾮是将数据复备份到多台机器或者在多台机器上记录更新的⽇志,这两种⽅式在数据密集性的⼯作任务中都是
⾮常的耗时的,因为需要通过⽹络传输在机器节点间复制⼤量的数据,宽带传输数据的速度远远⽐RAM内存慢,⽽这两种⽅式会占⽤⼤量
的存储空间。
与这些系统相反,RDDs提供了基于粗粒度转换(⽐如map,filter以及join)的接⼝,这些接⼝可以对多的数据条⽬应⽤相同的操作。这
样就可以通过记录来⽣成某个数据集的⼀系列转换(就是这个数据集lineage)⽽不是记录真实的数据来达到提供⾼效的容错机制。这个
RDD就有⾜够的信息知道它是从哪RDDs转换计算来的,如果⼀个RDD的分区数据丢失掉了,那么重新计算这个RDD所依赖的那个
RDD对应的区就⾏了。因此可以很快且不⽤通过复制备份⽅式来恢复丢失的数据。
虽然基于粗粒度的转换⼀开始看起来受限制,但是RDDs⾮常适合很多并⾏计算的应⽤,因为这些应⽤基都是在⼤量的数据元素上应⽤相
同的操作⽅法。事实上,我们分析表明RDDs不仅可以⾼效的表达出⽬前括MapReduce,DryadLINQ,SQL,Pregel以及HaLoop等
系统提出的分布式编程模型,⽽且还能表达它们表达不了的新的应⽤的计算模型,⽐如交互型数据挖掘。我们相信,RDDs解决那些新的框
架提出来计算需求的能⼒将会成为是RDD抽象强⼤的最有⼒证据。
我们在系统中实现了RDDs,这个系统已经在UCBerkeley以及好些个公司中应⽤于研究和⽣产应中。Spark和DryadLINQ类似使⽤
scala语⾔提供了很⽅便语⾔集成编程接⼝。另外,Spark可以利⽤scala的解释器来对⼤数据集进⾏交互式的查询。我们相信spark是⾸
个允许使⽤多种编程语⾔来进⾏分布式内存中交互式数据挖掘的系统。
我们通过为基准测试以及⽤户应⽤的测试两个⽅⾯来评估了RDDs和spark。我们分析显⽰,Spark在迭代应⽤中可以⽐hadoop快上
20倍以上、使的现实中的数据分析报表的速度提升了40倍以及使的交互式的扫1TB数据集的延迟在5-7秒。更重要的是,为了彰显
RDDs的普遍性,我们基于spark⽤相对较⼩的程序(每个包只有200⾏代码)实现了Pregel和HaLoop的编程模型,包括它们使⽤的
数据分布优化。本篇论⽂以RDDs(第⼆节)和Spark(第三节)的概述开始。然后在第四节中讨论了RDDs内部的表达、在第节中讨论
了我们的实现以及在第六节中讨论了实验结果。最后,我们讨论了RDDs是怎么样来表达现在已存在的⼏个系统的编程模型(第七节)、
调查相关⼯作(第⼋节)以及总结。
这节主要讲述RDDs的概要,⾸先定义RDDs(2.1)以及介绍RDDs在spark中的编程接⼝(2.2),然后对RDDs和细粒度共享内存
抽象进⾏的对⽐(2.3)。最后我们讨论了RDD模型的限制性。
⼀个RDD是⼀个只读,被分区的数据集。我们可以通过两种对稳定的存储系统和其他的RDDs进⾏操作⽽创建⼀个新的RDDs。为了区
别开RDDs的其他操作,我们称这些操作为transformations,⽐如map,filter以及join等都是transformations操作。
RDDs并不要始终被具体化,⼀个RDD有⾜够的信息知道⾃⼰是从哪个数据集计算⽽来的(就是所谓的依赖⾎统),这是⼀个⾮常强⼤的
属性:其实,⼀个程序你能引⽤⼀个不能从失败中重新构建的RDD。
最后,⽤户可以控制RDDs的两个⽅⾯:数据存储和分区。对于需要复⽤的RDD,⽤户可以明确的选择⼀个数据存储策略(⽐如内存缓
存)。他们也可以基于⼀个元素的key来为RDD所有的元素在机器节点间进⾏数据分区,这样⾮常利于数据分布优化,⽐如给两个数据集
进⾏相同的hash分区,然后进⾏join,可以提⾼join的性能。
Spark和DryadLINQ和FlumeJava⼀样通过集成编程语⾔api来暴露RDDs,这样的话,每⼀个数据集就代表⼀个对象,我们可以调⽤
这个对象中的⽅法来操作这个对象。
编程者可以通过对稳定存储的数据进⾏转换操作(即transformations,⽐如map和filter等)来得到⼀个或者多个RDDs。然后可以对
这些RDDs进⾏actions操作,这些操作可以是得到应⽤的结果值,也可以是将结果数据写⼊到存储系统中,actions包括:count(表
⽰返回这个数据集的元素的个数)、collect(表⽰返回数据集的所有元素)以及save(表⽰将输出结果写⼊到存储系统中)。和
DryadLINQ⼀样,spark在定义RDDs的时候并不会真正的计算,⽽是要等到对这个RDDs触发了actions操作才会真正的触发计算,
这个称之为RDDs的lazy特性,所以我们可以先对transformations进⾏组装⼀系列的pipelines,然后再计算。
另外,编程者可以通过调⽤RDDs的persist⽅法来缓存后续需要复⽤的RDDs。Spark默认是将缓存数据放在内存中,但是如果内存不
⾜的话则会写⼊到磁盘中。⽤户可以通过persist的参数来调整缓存策略,⽐如只将数据存储在磁盘中或者复制备份数据到多台机器。最
后,⽤户可以为每⼀个RDDs的缓存设置优先级,以达到哪个在内存中的RDDs应该⾸先写道磁盘中
假设⼀个web服务正发⽣了⼤量的错误,然后运维⼈员想从存储在hdfs中的⼏TB的⽇志中找出错误的原因。运维⼈员可以通过spark
将⽇志中的错误信息加载到分布式的内存中,然后对这些内存中的数据进⾏查询。她⾸先需要写下⾯的scala代码:
第⼀⾏表⽰从⼀个HDFS⽂件(许多⾏的⽂件数据集)上定义了⼀个RDD,第⼆⾏表⽰基于前⾯定义的RDD进⾏过滤数据。第三⾏将过
滤后的RDD结果存储在内存中,以达到多个对这个共享RDD的查询。需要注意的事,filter的参数是scala语法中的闭包。
到⽬前为⽌,集群上还没有真正的触发计算。然⽽,⽤户可以对RDD进⾏action操作,⽐如对错误信息的计数:
⽤户也可以继续对RDD进⾏transformations操作,然后计算其结果,⽐如:
在对errors第⼀次做action操作的后,spark会将errors的所有分区的数据存储在内存中,这样后⾯对errors的计算速度会有很⼤的
提升。需要注意的是,像lines这种基础数据的RDD是不会存储在内存中的。因为包含错误信息的数据可能只是整个⽇志数据的⼀⼩部
分,所以将包含错误数据的⽇志放在内存中是⽐较合理的。
最后,为了说明我们的模型是如何达到容错的,我们在图⼀种展⽰了第三个查询的⾎缘关系图(lineagegraph)。在这个查询种,我们以
对lines进⾏过滤后的errors开始,然后在对errors进⾏了filter和map操作,最后做了action操作即collect描写冬天的古诗小学 。Spark会最后两个
transformations组成⼀个pipeline,然后将这个pipeline分解成⼀系列的task,最后将这些task调度到含有errors缓存数据的机器
上进⾏执⾏。此外,如果errors的⼀个分区的数据丢失了,spark会对lines的相对应的分区应⽤filter函数来重新创建errors这个分区
的数据
图⼀:我们例⼦中第三个查询的⾎缘关系图,其中⽅框表⽰RDDs,箭头表⽰转换
1line=le(\"hdfs://..\")
2errors=(_.startsWith(\"ERROR\"))
t()
()
1//对错误中含有”MySQL”单词的数据进⾏计数
s(_.contains(\"MySQL\")).count()
3
4//返回错误信息中含有\"HDFS\"字样的信息中的时间字段的值(假设每⾏数据的字段是以tab来切分的,时间字段是第3个字段)
(_.contains(\"HDFS\"))
(_.split(\"t\")(3))
t()
为了理解作为分布式内存抽象的RDDs的好处,我们在表⼀种⽤RDDs和分布式共享内存系统(Distributedsharedmemory即
DSM)进⾏了对⽐。在所有的DSM系统中,应⽤从⼀个全局的地址空间中的任意位置中读写数据。需要注意的是,依据这个定义,我们
所说的DSM系统不仅包含了传统的共享内存系统,还包含了对共享状态的细粒度写操作的其他系统(⽐如Piccolo),以及分布式数据
库。DSM是⼀个很普遍的抽象,但是这个普遍性使得它在商⽤集群中实现⾼效且容错的系统⽐较困难。
Aspect(概念)RDDsDistributesharedmemory(分布式共享内存)
Reads粗粒度或者细粒度细粒度
Writes粗粒度细粒度
数据⼀致性不重要的(因为RDD是不可变的)取决于app或者runtime
容错利⽤lineage达到细粒度且低延迟的容错需要应⽤checkpoints(就是需要写磁盘)
并且需要程序回滚
计算慢的任务可以利⽤备份的任务来解决很难做到
计算数据的位置⾃动的机遇数据本地性取决于app(runtime是以透明为⽬标的)
内存不⾜时的⾏为和已经存在的数据流处理系统⼀样,写磁盘⾮常糟糕的性能(需要内存的交换?)
表⼀:RDDs和Distributedsharedmemory对⽐
RDDs只能通过粗粒度的转换被创建(或者被写),然⽽DSM允许对每⼀个内存位置进⾏读写,这个是RDDs和DSM最主要的区别。
这样使都RDDs在应⽤中⼤量写数据受到了限制,但是可以使的容错变的更加⾼效。特别是,RDDs不需要发⽣⾮常耗时的checkpoint
操作,因为它可以根据lineage进⾏恢复数据。⽽且,只有丢掉了数据的分区才会需要重新计算,并不需要回滚整个程序,并且这些重新计
算的任务是在多台机器上并⾏运算的。
RDDs的第⼆个好处是:它不变的特性使的它可以和MapReduce⼀样来运⾏执⾏很慢任务的备份任务来达到缓解计算很慢的节点的问
题。在DSM中,备份任务是很难实现的,因为原始任务和备份任务或同时更新访问同⼀个内存地址和接⼝。
最后,RDDs⽐DSM多提供了两个好处。第⼀,在对RDDs进⾏⼤量写操作的过程中,我们可以根据数据的本地性来调度task以提⾼性
能。第⼆,如果在scan-base的操作中,且这个时候内存不⾜以存储这个RDDs,那么RDDs可以慢慢的从内存中清理掉。在内存中存储
不下的分区数据会被写到磁盘中,且提供了和现有并⾏数据处理系统相同的性能保证。
经过上⾯的讨论介绍,我们知道RDDs⾮常适合将相同操作应⽤在整个数据集的所有的元素上的批处理应⽤。在这些场景下,RDDs可以
利⽤⾎缘关系图来⾼效的记住每⼀个transformations的步骤,并且不需要记录⼤量的数据就可以恢复丢失的分区数据。RDDs不太适合
⽤于需要异步且细粒度的更新共享状态的应⽤,⽐如⼀个web应⽤或者数据递增的web爬⾍应⽤的存储系统。对于这些应⽤,使⽤传统的
纪录更新⽇志以及对数据进⾏checkpoint会更加⾼效。⽐如使⽤数据库、RAMCloud、Percolator以及Piccolo。我们的⽬标是给批量
分析提供⼀个⾼效的编程模型,对于这些异步的应⽤需要其他的特殊系统来实现。
Spark使⽤scala语⾔实现了抽象的RDD,scala是建⽴在javaVM上的静态类型函数式编程语⾔。我们选择scala是因为它结合了简
洁(很⽅便进⾏交互式使⽤)与⾼效(由于它的静态类型)。然⽽,并不是说RDD的抽象需要函数式语⾔来实现。
开发员需要写连接集群中的workers的driver程序来使⽤spark,就⽐如图2展⽰的。Driver端程序定义了⼀系列的RDDs并且调⽤了
RDD的action操作。Driver的程序同时也会跟踪RDDs之间的的⾎缘关系。workers是可以将RDD分区数据存储在内存中的长期存活
的进程。
图⼆:这个是Spark运⾏时的图,⽤户写的driver端程序启动多个workers,这些workers可以从分布书的存储系统中读取数据块并且
可以将计算出来的RDD分区数据存放在内存中。
在2.2.1⼩节中的⽇志挖掘例⼦中,我们提到,⽤户提供给RDD操作⽐如map以参数作为这个操作的闭包(说⽩了就是函数)。Scala
将这些函数看作⼀个java对象,这些对象是可以序列化的,并且可以通过⽹络传输传输到其他的机器节点上的。Scala将函数中的变量看
作⼀个对象中的变量。⽐如,我们可以写⼀段这样的代码:varx=5;(_+5)来达到给这个RDD每⼀个元素加上5的⽬的。
RDDs是被⼀元素类型参数化的静态类型对象,⽐如,RDD[Int]表⽰⼀个类型为整数的RDD。然⽽,我们很多例⼦中的RDD都会省去这
个类型,这个是因为scala⽀持类型推断。
虽然我们⽤scala实现RDD的⽅法很简单,但是我们需要处理⽤反射实现的闭包对象相关的⼯作,我们还需要做很多的⼯作使的spark
可以⽤scala的解释器,这个我们在5.2⼩节中会讨论到。尽管如此,我们是不需要修改scala的编译器的。
表2中列举了Spark中RDD常⽤的transformations和actions操作,且描述了每⼀个⽅法的签名以及类型。我们需要记住
transformations是⽤来定义⼀个新的RDD的lazy操作,⽽actions是真正触发⼀个能返回结果或者将结果写到⽂件系统中的计算。
表⼆:Spark中RDD常⽤的transformations和actions操作。Seq[T]表⽰元素类型为T的⼀个列表。
需要注意的是,⼀些操作⽐如join只适合⽤于key-value类型的RDDs。我们取的函数的名称和scala或者其他函数式编程语⾔的函数名
是⼀致的。⽐如,map是⼀个one-to-one的映射操作,⽽flatMap的每⼀个输⼊值会对应⼀个或者更多的输出值(有点像MapReduce
中的map)
除了这些操作,⽤户可以通过persist操作来请求缓存RDD。另外,⽤户可以拿到被Partitioner分区后的分区数以及根据Partitioner对
另⼀个dataset进⾏分区。像groupByKey、reduceByKey以及sort等操作都是经过了hash或者rang分区后的RDD。
我们⽤两个迭代式的应⽤:线性回归和PageRank来补充2.2.1提到的数据挖掘的例⼦。稍后也会展⽰下如何控制RDD的分区以达到提
升性能的⽬的。
很多的机器学习算法⼀般都是迭代式的计算,因为它们需要跑迭代的优化程序(⽐如梯度下降)来达到最⼤化功能。他们将数据存放在内存
中以达到很快的速度。
作为⼀个例⼦,下⾯的程序实现了线性回归,⼀个能找到最佳区分两种点集(垃圾邮件以及⾮垃圾邮件)的超平⾯w的常⽤的分类算法。
这个算法⽤了梯度下降的⽅法:⼀个随机的值作为w的初始值,每次迭代都会将含有w的⽅法应⽤到每⼀个数据点然后累加得到梯度值,
然后将w往改善结果的⽅向移动。
⼀开始我们定义⼀个叫points的RDD,这个RDD从⼀个⽂本⽂件中经过map将每⼀⾏转换为Point对象得到。然后我们重复对
points进⾏map和reduce操作计算出每⼀步的梯度值。在迭代之间我们将points存放在内存中可以使的性能提⾼20倍,我们将会在
6.1节中讨论。
在PageRank中数据共享更加复杂。如果⼀个⽂档引⽤另⼀个⽂档,那被引⽤的⽂档的排名值(rank)需要加上引⽤的⽂档发送过来的贡
献值,当然这个过程是个迭代的过程。在每⼀次迭代中,每⼀个⽂档都会发送r/n的贡献值给它的邻居,其中r表⽰这个⽂档的排名值,n
表⽰这个⽂档的邻居数量。然后更新⽂档的排名值为,这个表达式值表⽰这个⽂档收到的贡献值,N表⽰所有的⽂档的数量,我们可以⽤如
下的spark代码来表达PageRank:
其中links表⽰(URL,outlinks)键值对。这个程序的RDD的⾎缘关系图如图三。在每⼀次迭代中我们都是根据上⼀次迭代的contribs
和ranks以及原始不变的links数据集来创建⼀个新的ranks数据集。随着迭代次数的变多这张图会变的越长,这个是这个图⽐较有意思
的特点。如果这个job的迭代次数很多的话,那么备份⼀些版本的ranks来达到减少从错误中恢复出来的时间是很有必要的,⽤户可以调
⽤标记为RELIABLE的persist函数来达到这个⽬的。需要注意的是,links是不需要备份的,因为它的分区数据可以快速的从重新计算输
⼊⽂件中对应的数据块⽽得到,这个中国古代十大名曲 数据集⼀般会⽐ranks数据集⼤上很多倍,因为每⼀个⽂档会有很多的连接但只会有⼀个排名值,所
以利⽤RDD的⾎缘关系来恢复数据肯定⽐checkpoint内存中的数据快很多(因为数据量太⼤)。
最后,我们可以控制RDDs的分区⽅式来优化PageRank中的节点通讯。如果我们事先为links指定⼀个分区⽅式(⽐如,根据link的
url来hash分区,就是将相同的url发送到同⼀个节点中),然后我们对ranks进⾏相同的分区⽅式,这样就可以保证links和ranks之
间的join不需要机器节点之间的通讯(因为相同的url都在同⼀个机器节点了,那么相对应的rank和link肯定也是在同⼀个机器节点
了)。我们也可以⾃定义分区器来实现将⼀组页⾯url放到⼀起(⽐如按照url的domain进⾏分区)。以上两种优化⽅式都可以通过在定
义links的时候调⽤partitionBy来实现:
在调⽤了partitionBy后,links和ranks之间的join操作会⾃动的在link所在的机器进⾏每⼀个URL的贡献值的聚合计算,然后在相
同的机器计算新的排名值,然后计算出来的新的ranks在相同的机器和links进⾏join。这种在迭代之间进⾏数据⼀致分区是像Pregel这
种框架中的主要的优化计算⽅式。RDDs使的⽤户可以直接⾃⼰来实现这种优化机制。
在抽象RDDs的过程中,怎么表达出RDDs能跟踪很多的transformations操作之间⾎缘关系是⼀个⽐较⼤的挑战。理想的情况下,⼀
个实现RDDs系统应该是尽可能多的提供transformations操作(⽐如表⼆中的操作),并且可以让⽤户以任意的⽅式来组合这些
transformations操作。我们提出了基于图的RDDs展现⽅式来达到以上的⽬的。我们在spark中利⽤这种展现⽅式达到了在不需要给调
度系统为每⼀个transformation操作增加任何的特殊逻辑就可以⽀持⼤量的transformations操作,这样极⼤的简化了我们的系统设
计。
概括的说,以下五个信息可以表达RDDs:⼀个分区列表,每⼀个分区就是数据集的原⼦块。⼀个⽗亲RDDs的依赖列表。⼀个计算⽗亲
的数据集的函数。分区模式的元数据信息以及数据存储信息。⽐如,基于⼀个HDFS⽂件创建出来的的RDD中⽂件的每⼀个数据块就是
⼀个分区,并且这个RDD知道每⼀个数据块存储在哪些机器上,同时,在这个RDD上进⾏map操作后的结果有相同的分区数,当计算
元素的时候,将map函数应⽤到⽗亲RDD数据中的。我们在表三总结了这些接⼝:
操作接⼝含义
partitions()返回⼀个分区对象的列表
preferredLocations(p)分区p数据存储在哪些机器节点中
dependencies()返回⼀个依赖列表
iterator(p,parentIters)根据⽗亲分区的数据输⼊计算分区p的所有数据
partitioner()返回这个RDD是hash还是range分区的元数据信息
表三:Spark中表达RDDs的接⼝
在设计如何表达RDDs之间依赖的接⼝是⼀个⾮常有意思的问题。我们发现将依赖定义成两种类型就⾜够了:窄依赖,表⽰⽗亲RDDs的
⼀个分区最多被⼦RDDs⼀个分区所依赖。宽依赖,表⽰⽗亲RDDs的⼀个分区可以被⼦RDDs的多个⼦分区所依赖。⽐如,map操作
是⼀个窄依赖,join操作是⼀个宽依赖操作(除⾮⽗亲RDDs已经被hash分区过),图四显⽰了其他的例⼦:
图四:窄依赖和宽依赖的例⼦。每⼀个⽅框表⽰⼀个RDD,带有颜⾊的矩形表⽰分区
以下两个原因使的这种区别很有⽤,第⼀,窄依赖可以使得在集群中⼀个机器节点的执⾏流计算所有⽗亲的分区数据,⽐如,我们可以将每
⼀个元素应⽤了map操作后紧接着应⽤filter操作,与此相反,宽依赖需要⽗亲RDDs的所有分区数据准备好并且利⽤类似于
MapReduce的操作将数据在不同的节点之间进⾏重新洗牌和⽹络传输。第⼆借月思乡的诗句20首 ,窄依赖从⼀个失败节点中恢复是⾮常⾼效的,因为只需要重
新计算相对应的⽗亲的分区数据就可以,⽽且这个重新计算是在不同的节点进⾏并⾏重计算的,与此相反,在⼀个含有宽依赖的⾎缘关系
RDDs图中,⼀个节点的失败可能导致⼀些分区数据的丢失,但是我们需要重新计算⽗RDD的所有分区的数据。
Spark中的这些RDDs的通⽤接⼝使的实现很多transformations操作的时候只花了少于20⾏的代码。实际上,新的spark⽤户可以
在不了解调度系统的细节之上来实现新的transformations操作(⽐如,采样和各种join操作)。下⾯腻组词 简要的概括了⼀些RDD的实现:
HDFSfiles:抽样的输⼊RDDs是HDFS中的⽂件。对于这些RDDs,partitions返回⽂件中每⼀个数据块对应的⼀个分区信息
(数据块的位置信息存储在Partition对象中),preferredLocations返回每⼀个数据块所在的机器节点信息,最后iterator负责数
据块的读取操作。
map:对任意的RDDs调⽤map操作将会返回⼀个MappedRDD对象。这个对象含有和其⽗亲RDDs相同的分区信息和数据存储
节点信息,但是在iterator中对⽗亲的所有输出数据记录应⽤传给map的函数。
union:对两个RDDs调⽤union操作将会返回⼀个新的RDD,这个RDD的分区数是他所有⽗亲RDDs的所有分区数的总数。每
⼀个⼦分区通过相对应的窄依赖的⽗亲分区计算得到。
sample:sampling和mapping类似,除了sampleRDD中为每⼀个分区存储了⼀个随机数,作为从⽗亲分区数据中抽样的种⼦。
join:对两个RDDs进⾏join操作,可能导致两个窄依赖(如果两个RDDs都是事先经过相同的hash/range分区器进⾏分区),
或者导致两个宽依赖,或者⼀个窄依赖⼀个宽依赖(⼀个⽗亲RDD经过分区⽽另⼀个没有分区)。在上⾯所有的恶场景中,join之后
的输出RDD会有⼀个partitioner(从⽗亲RDD中继承过来的或者是⼀个默认的hashpartitioner)。
我们⽤了14000⾏scala代码实现了spark。Spark系统跑在集群管理者mesos上,这样可以使的它和其他的应⽤⽐如hadoop、
MPI等共享资源,每⼀个spark程序都是由它的driver和workers组成,这些driver和workers都是以⼀个mesos应⽤运⾏在
mesos上的,mesos可以管理这些应⽤之间的资源共享问题。
Spark可以利⽤已经存在的hadoop的api组件读取任何的hadoop的输⼊数据源(⽐如:HDFS和Hbase等),这个程序api是运⾏
在没有更改的scala版本上。
我们会简要的概括下⼏个⽐较有意思的技术点:我们的job调度器(5.1节),可以⽤于交互的spark解释器(5.2节),内存管理(5.3
节)以及对checkpointing的⽀持(5.4节)。
spark的调度器依赖我们在第4章中讨论的RDDs的表达。
从总体上看,我们的调度系统有点和Dryad相似,但是它还考虑了被存储的RDDs的哪些分区还在内存中。当⼀个⽤户对某个RDD调⽤
了action操作(⽐如count或者save)的时候调度器会检查这个RDD的⾎缘关系图,然后根据这个⾎缘关系图构建⼀个含有stages
的有向⽆环图(DAG),最后按照步骤执⾏这个DAG中的stages,如图5的说明。每⼀个stage包含了尽可能多的带有窄依赖的
transformations操作。这个stage的划分是根据需要shuffle操作的宽依赖或者任何可以切断对⽗亲RDD计算的某个操作(因为这些
⽗亲RDD的分区已经计算过了)。然后调度器可以调度启动tasks来执⾏没有⽗亲stage的stage(或者⽗亲stage已经计算好了的
stage),⼀直到计算完我们的最后的⽬标RDD。
图五:怎么计算sparkjobstage的例⼦。实现的⽅框表⽰RDDs,带有颜⾊的⽅形表⽰分区,⿊⾊的是表⽰这个分区的数据存储在内存
中,对RDDG调⽤action操作,我们根据宽依赖⽣成很多stages,且将窄依赖的transformations操作放在stage中。在这个场景
中,stage1的输出结果已经在内存中,所以我们开始运⾏stage2,然后是stage3。
我们调度器在分配tasks的时候是采⽤延迟调度来达到数据本地性的⽬的(说⽩了,就是数据在哪⾥,计算就在哪⾥)。如果某个分区的数
据在某个节点上的内存中,那么将这个分区的计算发送到这个机器节点中。如果某个RDD为它的某个分区提供了这个数据存储的位置节
点,则将这个分区的计算发送到这个节点上。
对于宽依赖(⽐如shuffle依赖),我们将中间数据写⼊到节点的磁盘中以利于从错误中恢复,这个和MapReduce将map后的结果写
⼊到磁盘中是很相似的。
只要⼀个任务所在的stage的⽗亲stage还是有效的话,那么当这个task失败的时候,我们就可以在其他的机器节点中重新跑这个任
务。如果⼀些stages变的⽆效的话(⽐如因为⼀个shuffle过程中map端的⼀个输出结果丢失了),我们需要重新并⾏提交没有⽗亲
stage的stage(或者⽗亲stage已经计算好了的stage)的计算任务。虽然备份RDD的⾎缘关系图⽰⽐较容易的,但是我们还不能容忍
调度器调度失败的场景。
虽然⽬前spark中所有的计算都是响应driver程序中调⽤的action操作,但是我们也是需要尝试在集群中调⽤lookup操作,这种操作
是根据key来随机访问已经hash分区过的RDD所有元素以获取相应的value。在这种场景中,如果⼀个分区没有计算的话,那么task
需要将这个信息告诉调度器。
scala和Ruby以及Python⼀样包含了⼀个交互型的shell脚本⼯具。考虑到利⽤内存数据可以获得低延迟的特性,我们想让⽤户通过解
释器来交互性的运⾏spark,从⽽达到查询⼤数据集的⽬的。
Scala解释器通常是将⽤户输⼊的每⼀⾏代码编译成⼀个类,然后将这个类加载到JVM中,然后调⽤这个类的⽅法。这个类中包含了⼀个
单例对象,这个单例对象包含了⽤户输⼊⼀⾏代码中的变量或者函数,还包含了⼀个运⾏⽤户输⼊那⾏代码的初始化⽅法。⽐如,⽤户输⼊
varx=5,然后再输⼊println(x),scala解释器定义个包含了x的叫做Line1的类,然后将第⼆⾏代码编译成println(Line
tance().x)。
我们对spark中的解释器做了如下两个改变:
hipping:为了让worker节点能拿到⽤户输⼊的每⼀⾏代码编译成的class的⼆进制代码,我们使的解释器为这些classes
的⼆进制代码提供HTTP服务。
2.修改了代码⽣成:正常情况下,我们通过访问对应的类的静态⽅法来达到访问将⽤户输⼊每⼀⾏代码编译成的单例对象。这个以为
着,当我们将⼀个含有在前⾯⾏中定义的变量(⽐如上⾯例⼦中的Line1.x)的闭包序列化发送到worker节点的时候,java是不会
通过对象图来跟踪含有x的实⼒Line1的,这样的话worker节点将收不到变量x。我们修改了代码⽣成逻辑来达到能直接引⽤每⼀
⾏代码⽣成的实例。
图六显⽰了经过我们的改变后,解释器是如何将⽤户输⼊的⼀系列的代码转换成java对象。
图六:显⽰spark解释器是如何将⽤户输⼊的代码转换成java对象的例⼦
我们发现spark解释器在处理我们研究中的⼤量已经获取到的痕迹数据以及探索存储在hdfs中的数据集时是⾮常有⽤的。我们正在打算⽤
这个来实现更⾼层⾯的交互查询语⾔,⽐如SQL。
Spark在持久化RD相思情诗大全 Ds的时候提供了3种存储选:存在内存中的⾮序列化的java对象、存在内存中的序列化的数据以及存储在磁盘中。
第⼀种选择的性能是最好的,因为javaVM可以很快的访问RDD的每⼀个元素。第⼆种选择是在内存有限的情况下,使的⽤户可以以很
低的性能代价⽽选择的⽐java对象图更加⾼效的内存存储的⽅式。如果内存完全不够存储的下很⼤的RDDs,⽽且计算这个RDD⼜很费
时的,那么选择第三种⽅式。
为了管理有限的内存资源,我们在RDDs的层⾯上采⽤LRU(最近最少使⽤)回收策略。当⼀个新的RDD分区被计算但是没有⾜够的内
存空间来存储这个分区的数据的时候,我们回收掉最近很少使⽤的RDD的分区数据的占⽤内存,如果这个RDD和这个新的计算分区的
RDD时同⼀个RDD的时候,我们则不对这个分区数据占⽤的内存做回收。在这种情况下,我们将相同的RDD的⽼分区的数据保存在内
存中是为了不让⽼是重新计算这些分区的数据,这点事⾮常重要的,因为很多操作都是对整个RDD的所有的tasks进⾏计算的,所以⾮常
有必要将后续要⽤到的数据保存在内存中。到⽬前为⽌,我们发现这种默认的机制在所有的应⽤中⼯作的很好,但是我们还是将持久每⼀个
RDD数据的策略的控制权交给⽤户。
最后,在⼀个集群中的每⼀个spark实例的内存空间都是分开的,我们以后打算通过统⼀内存管理达到在spark实例之间共享RDDs。
虽然我们总是可以使⽤RDDs的⾎缘关系来恢复失败的RDDs的计算,但是如果这个⾎缘关系链很长的话,则恢复是需要耗费不少时间
的。因此,将⼀些RDDs的数据持久化到稳定存储系统中是有必要的
⼀般来说,checkpointing对具有很长的⾎缘关系链且包含了宽依赖的RDDs是⾮常有⽤的,⽐如我们在3.2.2⼩节中提到的PageRank
的例⼦。在这些场景下,集群中的某个节点的失败会导致每⼀个⽗亲RDD的金陵十二钗香烟薄荷 ⼀些数据的丢失,进⽽需要重新所有的计算。与此相反的,对
于存储在稳定存储系统中且是窄依赖的RDDs(⽐如3.2.1⼩节中线性回归例⼦中的points和PageRank中的link列表数
据),checkpointing可能⼀点⽤都没有。如果⼀个节点失败了,我们可以在其他的节点中并⾏的重新计算出丢失了数据的分区,这个成本
只是备份整个RDD的成本的⼀点点⽽已。
spark⽬前提供了⼀个checkpointing的api(persist中的标识为REPLICATE,还有checkpoint()),但是需要将哪些数据需要
checkpointing的决定权留给了⽤户。然⽽,我们也在调查怎么样⾃动的checkpoing,因为我们的调度系统知道数据集的⼤⼩以及第⼀次
计算这个数据集花的时间,所以有必要选择⼀些最佳的RDDs来进⾏checkpointing,来达到最⼩化恢复时间
最后,需要知道的事RDDs天⽣的只读的特性使的他们⽐⼀般的共享内存系统做checkpointing更简单了。因为不⽤考虑数据的⼀致性,
我们可以不终⽌程序或者take快照,然后在后台将RDDs的数据写⼊到存储系统中。
我们通过在亚马逊EC2伤进⾏⼀系列的实验以及⽤⽤户的应⽤做基准测试来评估spark,总的来说,下⾯是我们的结论:
在迭代式机器学习和图计算中,spark以20倍的速度超过了hadoop。提速的点主要是在避免了I/O操作以及将数据以java对象
的形式存在内存中从⽽降低了反序列化的成本。
⽤户写的应⽤程序运⾏平稳以及很好扩展。特别的,我们利⽤spark为⼀个分析报表提速了40倍,相对于hadoop来说。
当节点失败的时候,spark可以通过重新计算失去的rdd分区数据达到快速的恢复。
spark在查询1TB的数据的时候的延迟可以控制在5到7秒。
我们通过和hadoop对⽐,展⽰迭代式机器学习(6.1节)和PageRank(6.2节)的基准测试。然后我们评估了spark的错误恢复机制
(6.3节)以及当内存不⾜以存储⼀个数据集的⾏为(6.4节),最后我们讨论了⽤户应⽤(6.5节)和交互式数据挖掘(6.6节)的结果
除⾮另外声明,我们都是⽤类型为的EC2节点,4核以及15GB内存。我们是有数据块⼤⼩为256M的HDFS存储系
统。在每⼀次测试之前,我们都会清理OS的缓存,以达到准确的测量IO成本的⽬的
我们实现了两种迭代式机器学习应⽤,线性回归核K-means,来和下⾯的系统进⾏性能的对⽐:
Hadoop:版本号为0.20.0的稳定版。
HadoopBinMem:这个系统在迭代的⼀开始会将输⼊数据转换成底开销的⼆进制形式,这样可以为接下来的迭代消除解析⽂本数据的
开销,并且将数据存储在hdfs实例的内存中。
Spark:我们的RDDs的实现。
我们在25-100台机器上存储100G数据,两种算法都是对这100G数据跑10次迭代。两个应⽤之间的关键不同点是他们对相同数据
的计算量不⼀样。K-means的迭代时间都是花在计算上,然⽽线性回归是⼀个计算量不⼤,时间都是花在反序列化和I/O上。由于典型的
机器学习算法都是需要上千次的迭代来达到收敛,所以我们将第⼀次迭代花的时间和接下来的迭代时间分开显⽰。我们发现通过RDDs的
共享数据极⼤的提⾼了后续的迭代速度
图七:在100台机器的集群上分别⽤hadoop、hadoopBinMem以及spark对100GB的数据进⾏,线性回归和k-means的⾸次
迭代和随后迭代花的时间
⾸次迭代:三个系统在⾸次迭代中都是读取HDFS中的数据,从图七的条形图中我们可以看出,在实验中,spark稳定的⽐hadoop要
快。这个是由于hadoop主从节点之间的⼼跳信息的信号开销导致的。HadoopBinMen是最慢的,这个是因为它启动了⼀个额外的
MapReduce任务来将数据转换为⼆进制,它还需要通过⽹络传输数据以达到备份内存中的数据的⽬的。随后的迭代:图七也显⽰了随后迭
代花的平均时间。图⼋则是显⽰了集群⼤⼩不断扩展时候的花的时间。对于线性回归,在100台机器上,spark分别⽐hadoop和
hadoopBinMem快上25.3倍和20.7倍。对于计算型的k-means应⽤,spark仍然分别提⾼了1.9倍和3.2倍。
图⼋:hadoop、hadoopBinMem以及spark在随后的迭代花的时间,都是处理100G的数据
理解为什么提速了:我们惊奇的发现spark甚⾄⽐基于内存存储⼆进制数据的hadoopBinMem还要快20倍。在hadoopBinMem中,
我们使⽤的是hadoop标准的⼆进制⽂件格式(sequenceFile)和256m这么⼤的数据块⼤⼩,以及我们强制将hadoop的数据⽬录放
在⼀个内存的⽂件系统中。然⽽,Hadoop仍然因为下⾯⼏点⽽⽐spark慢:
软件栈的最低开销。
提供数据服务的开销。
3.将⼆进制数据转换成有效的内存中的java对象的反序列化的成本开销。
我们依次来调查上⾯的每⼀个因素。为了测量第⼀个因素,我们跑了⼀些空的hadoop任务,我们发现单单完成job的设置、任务的启动
以及任务的清理等⼯作就花掉了⾄少25秒钟。对于第⼆个元素,我们发现HDFS需要执⾏多份内存数据的拷贝以及为每⼀个数据块做
checksum计算。
最后,为了测试第3个因素,我们在单机上做了⼀个微型的基准测试,就是针对不同⽂件类型的256M数据来跑线性回归计算。我们特别
的对⽐了分别从HDFS⽂件(HDFS技术栈的耗时将会很明显)和本地内存⽂件(内核可以很⾼效的将数据传输给应⽤程序)中处理⽂本
和⼆进制类型数据所话的时间、
图九中是我们我们测试结果的展⽰。从In-memoryHDFS(数据是在本地机器中的内存中)中读数据⽐从本地内存⽂件中读数据要多花
费2秒中。解析⽂本⽂件要⽐解析⼆进制⽂件多花费7秒钟。最后,即使从本地内存⽂件中读数据,但是将预先解析了⼆进制数据转换成
java对象也需要3秒钟,这个对于线性回归来说也是⼀个⾮常耗时的操作。Spark将RDDs所有元素以java对象的形式存储在内存中,
进⽽避免了上述说的所有的耗时
我们分别⽤spark和hadoop对54GB的维基百科的转储数据进⾏了PageRank机器学习,并⽐对了它们的性能。我们⽤PageRank
的算法处理了⼤约4百万相互连接的⽂章,并进⾏了10次迭代。图⼗展⽰了在30个节点上,只⽤内存存储使的spark拥有了⽐
hadoop2.4倍的性能提升。另外,就和3.2.2⼩节讨论的,如果控制RDD的分区使的迭代之间数据的平衡更可以使的性能速度提升到
7.2倍。将节点数量扩展到60个,spark的性能速度提升也是上⾯的结果
图⼗:分别基于Hadoop和spark的PageRank的性能对⽐
我们也评估了在第7.1节中提到的⽤我们基于spark⽽实现的Pregel重写的PageRank。迭代次数和图⼗是⼀样的,但是慢了4秒钟,
这个是因为每⼀次迭代Pregel都要跑额外的操作来让顶点进⾏投票决定是否需要结束任务。
我们评估了当在k-means应⽤中⼀个节点失败了⽽利⽤RDD的⾎缘关系链来重建RDD的分区需要的成本。图⼗⼀对⽐的是在75个节
点中运⾏10次迭代的k-means正常情况和在第6次迭代⼀个节点失败的情况。如果没有任何失败的话,每⼀次迭代都是在100GB的
数据上跑400个tasks。
图⼗⼀:出现了失败的k-means每次迭代时间。在第6次迭代中⼀台机器被杀掉了,导致需要利⽤⾎缘关系链重建RDD的部分分区
第五次迭代的时间是58秒。在第6次迭代,⼀台机器被杀死了,导致丢失了运⾏在这台机器上的tasks以及存储在这台机器上的RDD
分区数据。Spark在其他机器节点上重新读取相应的输⼊数据以及通过⾎缘关系来重建RDD,然后并⾏的重跑丢失的tasks,使的这次迭
代的时间增加到80s。⼀旦丢失的RDD分区数据重建好了,迭代的时间⼜回到了58s。
需要注意的是,如果是基于checkpoint的容错机制的话,那么需要通过重跑好⼏个迭代才能恢复,需要重跑⼏个取决于checkpoints的
频率。此外,系统需要通过⽹络传输来备份应⽤需要的100GB数据(被转化为⼆进制的⽂本数据),以及为了达到在内存中备份数据⽽消
耗掉2倍的内存,或者等待将100GB数据写⼊到磁盘中。与此相反的是,在我们的例⼦中每⼀个RDDs的⾎缘关系图的⼤⼩都是⼩于
10KB的。
在⽬前为⽌,我们都是假设集群中的每⼀台机器都是有⾜够的内存来存储迭代之间的RDDs的数据的。当没有⾜够的内存来存储任务的数
据的时候spark是怎么运⾏的呢?在这个实验中,我们给spark每⼀个节点配置很少的内存,这些内存不⾜以存储的下RDDs。我们在图
⼗⼆中,我们展⽰了不同存储空间下的运⾏线性回归应⽤需要的时间。可以看出,随着空间的减少,性能速度慢慢的下降:
图⼗⼆:每次都是使⽤不同的内存,然后在25台机器中对100GB的数据运⾏线性回归的性能对⽐图
内存中分析:ConvivaInc是⼀个视频提供商,他们⽤spark来加速之前在hadoop上运⾏的⼏个数据报表分析。⽐如,其中⼀个报表是
运⾏⼀系列的Hive查询来计算⼀个⽤户的各种统计信息。这些查询都是基于相同的数据⼦集(基于⾃定义的过滤器过滤出来的数据)但是
需要很多MapReduce任务来为分组字段进⾏聚合运算(平均值、百分位数值以及countdistinct)。将这些数据⼦集创建成⼀个可以共
享的spark中的RDD来实现这些查询使的这个报表的速度提升了40倍。对200GB已经压缩的数据在hadoop集群上跑这个报表花了
20个⼩时,但是利⽤2台机器的spark只⽤了30分钟⽽已。此外,spark程序只花了96G的内存,因为只需要将报表关⼼的列数据
存储在内存中进⾏共享就⾏,⽽不是所有的解压后的数据。
交通模型:伯克利分校的MobileMillennium项⽬组的研究员在收集到的零星的汽车的GPS信息上并⾏运⾏⼀个机器学习算法试图推断出
道路交通是否拥挤。在都市区域道路⽹络中的10000条道路和600000个装有GPS设备的汽车点对点的旅⾏时间(每⼀条路线的旅⾏
时间可能包含了多条道路)样本是数据源。利⽤交通模型可以估算出通过每⼀条独⽴的道路需要多长时间。研究⼈员利⽤EM算法来训练模
型,这个算法在迭代的过程中重复执⾏map和reduceByKey步骤。这个应⽤近似线性的将机器规模从20台扩展到80台,每台机器4
个cores,如图13(a)所⽰。
图⼗三:两个⽤spark实现的⽤户应⽤的每次迭代的时间,误差线表⽰标准误差
推特垃圾邮件分类:伯克利分校的Monarch项⽬利⽤spark来标记推特消息中的垃圾链接。它们实现的线性回归分类器和第6.1节中很
相似,但是他们利⽤了分布式的reduceByKey来并⾏的累加梯度向量值。图13(b)中显⽰了对50GB的数据⼦集进⾏分类训练需要的时
间(随着机器扩展),这个数据⼦集中包含了25000URLs以及每⼀个url对应的页⾯的⽹络和内容属性相关的10000000个特征/纬
度。图13(b)中的时间不是线性的下降是因为每⼀次迭代花费了很⾼的且固定的通讯成本。
为了演⽰spark在交互查询⼤数据集的能⼒,我们来分析1TB的维基页⾯访问⽇志数据(2年的数据)。在这个实验中,我们使⽤100
个m2.4xlargeEC2实例,每⼀个实例8个cores以及68G内存。我们查询出(1)所有页⾯的浏览量,(2)页⾯标题等于某个单
词的页⾯的浏览量以及(3)页⾯标题部分的等于某个单词的页⾯的浏览量。每⼀个查询都是扫描整个输⼊数据集。
图⼗四展⽰的分别是查询整个数据集、⼀半数据集⼀集⼗分之⼀的数据集的响应时间。即使是1TB的数据,⽤spark来查询仅仅花了5-
7秒⽽已。这个⽐查询磁盘数据的速度快了⼀个数量级,⽐如,查询磁盘⽂件中的1TB数据需要170秒。这个可以说明RDDs使的
spark是⼀个⾮常强⼤的交互型数据挖掘的⼯具。
虽然由于RDDs的天然不可变性以及粗粒度的转换导致它们似乎提供了有限制的编程接⼝,但是我们发现它们适合很多类型的应⽤。特别
的,RDDs可以表达出现在各种各样的框架提出的编程模型,⽽且还可以将这些模型组合在同⼀个程序中(⽐如跑⼀个MapReduce任务
来创建⼀个图,然后基于这个图来运⾏Pregel)以及可以在这些模型中共享数据。在这⼀章中,我们在第7.1节中讨论RDDs可以表达哪
些模型以及为什么适合表达这些编程模型。另外,我们在第7.2节中讨论我们推崇的RDD的⾎缘信息的好处,利⽤这些信息可以帮助我们
debug模型。
对于到⽬前为⽌很多独⽴提出的编程模型,RDDs都可以⾼效的表达出来。这⾥所说的“⾼效”,不仅仅是指使⽤RDDs的输出结果和独
⽴提出的编程模型狂简的输出结果是⼀致的,⽽且RDDs在优化性能⽅⾯⽐这些框架还要强⼤,⽐如将特定的数据保存在内存中、对数据
分区以减少⽹络传输以及⾼效的从错误中恢复。可以⽤RDDs表达的模型如下:
MapReduce:可以利⽤spark中的flatMap和groupByKey操作来表达这个模型,或者如果需要聚合的话可以使⽤
reduceByKey。
DryadLINQ:DryadLINQ系统⽐MapReduce更多的操作,但是这些操作都是直接和RDD的转换操作(map,groupByKey,join
等)对应的批量操作。
SQL:和DryadLINQ⼀样,SQL查询都是对⼀个数据集进⾏并⾏的操作计算。
Pregel:Google的Pregel是⼀个专门解决迭代图计算应⽤的模型,它⼀开始看起来和⾯向数据集的编程模型的其他系统完全不同。
在Pregel中,⼀个程序运⾏⼀些列的相互协调的“supersteps”。在每⼀个superstep上,对图上的每⼀个顶点运⾏⽤户⾃定义
的函数来更新这个顶点的相关的状态、改变图的拓扑结构以及向其他顶点发送下⼀个superstep需要的消息。这种模型可以表达⾮常
多的图计算算法,包括最短路径、⼆部图匹配以及PageRank。
Pregel在每⼀次迭代中都是对所有顶点应⽤相同的⽤户定义的函数,这个是使的我们⽤RDDs来实现这个模型的关键点。因此,每次迭代
后,我们都可以将顶点的状态保存在RDD中,然后执⾏⼀个批量转换操作(apply)来应⽤这个函数以及⽣成⼀个消息的RDD。然后我们
可以⽤这个RDD通顶点的状态进⾏join来完成消息的交换。和Pregel⼀样,RDDs允许将点的状态保存在内存中、控制它们的分区以减
少⽹络通讯以及指出从失败中恢复。我们在spark上⽤了200⾏代码的包实现了Pregel,读者可以参考第33个⽂献来了解更多的细节
迭代MapReduce胶柱鼓瑟 :最近提出的⼏个系统,包括HaLoop和Twister,它们提供了可以让⽤户循环跑⼀系列的MapReduce任务的迭
代式MapReduce模型。这些系统在迭代之间保持数据分区⼀致,Twister也可以将数据保存在内存中。RDDs可以很简单的表达以
上两个优化,⽽且我们基于spark花了200⾏代码实现了HaLoop。
批量流处理:研究⼈员最近提出了⼀些增量处理系统,这些系统是为定期接受新数据然后根据数据更新结果的应⽤服务的。⽐如,⼀
个应⽤需要实时接收新数据,然后每15分钟就将接收到的数据和前⾯15分钟的时间窗⼝的数据进⾏join聚合,将聚合的结果更新
到统计数据中。这些系统执⾏和Dryad类似的批处理,但是它们将应⽤的状态数据存储在分布式系统中。将中间结果放在RDDs中可
以提⾼处理速度。
阐释RDDs的表达⼒为什么这么丰富:为什么RDDs可以表达多种多样编程模型?原因就是RDDs的限制性对很多并⾏计算的应⽤
的影响是很⼩的。特别指出的是,虽然RDDs只能通过批量转换⽽得到,但是很多的并⾏计算的程序都是将相同的操作应⽤到⼤量的
数据条⽬中,这样使的RDDs的表达⼒变的丰富。类似的,RDDs的不变性并不是障碍,因为我们可以创建多个RDDs来表达不同版
本的相同数据集。事实上,现在很多的MapReduce的应⽤都是运⾏在不能对⽂件修改数据的⽂件系统中,⽐如HDFS。
最后⼀个问题是为什么之前的框架没有提供这中通⽤型的表达能⼒呢?我们相信这个是因为这些框架解决的是MapReduce和Dryad不
能解决的特殊性的问题,⽐如迭代,它们没有洞察到这些问题的共同原因是因为缺少了数据共享的抽象。
当我们⼀开始设计RDDs通过重新计算来达到容错的时候,这种特性同时也促使了debugging的产⽣。特别的,在⼀个任务中通过记录
RDDs的创建的⾎缘,我们可以:
1.后⾯可以重新构建这些RDDs以及可以让⽤户交互性的查询它们。
2.通过重新计算其依赖的RDD分区来达到在⼀个进程debugger中重跑任何的任务。
和传统的通⽤分布式系统的重跑debugger不⼀样,传统的需要捕获和引⽤多个节点之间的事件发⽣的顺序,RDDs这种debugger⽅式
不需要依赖任何的数据,⽽只是需要记录RDD的⾎缘关系图。我们⽬前正在基于这些想法来开发⼀个sparkdebbger。
集群编程模型:集群编程模型的相关⼯作分为以下⼏类:
第⼀,像MapReduce,Dryad以及Ciel⼀样⽀持⼀系列处理数据的操作,并且需要通过稳定的存储系统来共享数据,RDDs表达了
⼀种⽐稳定存储系统更⾼效的数据共享抽象,因为它避免了数据备份、I/O以及序列化的成本。
第⼆,⼏个数据流系统的⾼层⾯上的编程接⼝,包括DryadLINQ和FlumeJava,它们提供了语⾔集成api,使的⽤户可以通过像
map和join等操作来操作并⾏的集合。然⽽,在这些系统中,并⾏的集合是指在磁盘上的⽂件或者⼀个查询计划表达的临时数据集。
虽然这些系统在相同的查询中的操作之间组装数据的pipeline(⽐如,⼀个map操作后跟另外⼀个map),但是它们不能在查询之间
进⾏⾼效的数据共享。我们在并⾏集合模式上建⽴sparkapi,是由于它的便利性以及在集成语⾔接⼝上不要求新颖性,但是我们基于
在这些接⼝背后以RDDs作为存储抽象,就可以使的spark⽀持⼤量类型的应⽤了。
第三种系统为许多专门的需要数据共享的应⽤提供了⾼层的接⼝。⽐如,pregel⽀持迭代式的图计算应⽤、Twister和HaLoop⽀持
迭代式的MapReduce。然⽽,这些框架只是为他们⽀持的计算模型隐式的共享数据,并没有提供可以让⽤户根据⾃⼰的需求显式的共
享数据的通⽤抽象。⽐如,⼀个⽤户不能⽤Pregel或者Twister将数据加载到内存中然后决定在数据集上⾯跑什么样的查询。RDDs
提供了⼀个显式的分布式存储抽象,因此可以⽀持那些特殊系统不能⽀持的应⽤,⽐如交互式数据挖掘。
最后,⼀些系统暴露共享可变状态以使的⽤户可以执⾏内存计算。⽐如,Piccolo使的⽤户通过分布式的函数来读取和更新分布式hash表
中的单元格数据。DSM和像RAMCloud这样的key-value存储系统提供了类似的模型。RDDs和这些系统有两个⽅⾯的不同,第
⼀,RDDs基于像map,sot以及join等操作提供了⾼层的编程接⼝,然⽽,在Piccolo和DSM中的接⼝只是读取和更新表的单元格数
据。第⼆,Piccolo和DSM通过checkpoint和回滚机制实现容错,在许多应⽤中这种机制⽐机遇⾎缘机制的RDDs的容错的成本更
⼤。最后,如2.3⼩节讨论的,相对于DSM,RDDs还提供了其他的优势功能,⽐如执⾏慢的task的处理机制
缓存系统:Nectar可以通过标识通⽤的程序分析的⼦表达式来达到在DryadLINQ任务之间对中间数据结果的复⽤。这种能⼒肯定会加⼊
到基于RDD的系统中。然⽽,Nectar即没有提供基于内存的缓存(他是将数据放到分布式⽂件系统中)也不能让⽤户可以显式的对数据
集进⾏缓存控制和分区控制。Ciel和F做头 lumeJava同样可以缓存任务结果,但是也不提供基于内存的缓存或者显式的控制哪些数据可以缓存
Ananthanarayananetal已经提出在分布式⽂件系统上加⼀层基于内存的缓存来利⽤数据访问的暂时性和本地性。这种解决⽅案却是加快
了已经存在于⽂件系统中的数据访问速度,但是它在共享同⼀个应⽤中的中间结果⽅⾯并没有RDD⾼效,因为它在每⼀个stage之间仍然
需要将数据写⼊到⽂件系统中
⾎缘:在科学计算以及数据库中,捕获数据的⾎缘或者来源信息是⼀个很长时间被研究的话题了,对于像解释结果的应⽤,需要让他们可以
从其他的应⽤重新产⽣,且当在⼯作流中存在⼀个bug或者数据丢失的时候可以重新对数据进⾏计算。对于这边⾯的容错的⼯作,我们推
荐读者看第[5]和[9]的资料。RDDs提供了⼀种并⾏编程模型,记录跟踪细粒度的⾎缘的成本很低,我们可以根据⾎缘来达到容错的⽬
的。
我们基于⾎缘的容错机制和MapReduce以及Dryad⼀个任务中的容错机制是类似的,在DAG中跟踪任务的依赖。然⽽,在这些系统
中,⼀个任务结束后⾎缘信息也就丢失了,⽤户需要依靠数据备份式的存储系统来共享任务之间的数据。与此相反,RDDs可以在任务之间
通过将数据存储在内存中达到⾼效的数据共享,并不需要数据的备份和磁盘的I/O关系型数据库:RDDs在概念上和数据库中的视图⽐较
类似,存储的RDDs则像具体的视图。然⽽,像DSM系统,数据库⼀般允许细粒度的对所有的数据进⾏读写访问,这种⽅式需要对操作
和数据进⾏记录⽇志,⽤于容错,以及需要额外的开销来保持数据的⼀致性,对于粗粒度转换模型的RDDs来说,这些额外的开销式不需
要的。
我们已经展⽰了在集群应⽤中⼀个⾼效的,通⽤的以及容错的对共享数据的抽象的RDDs。RDDs可以表达⼤量的并⾏应⽤,包括特殊的编
程模型提出的迭代式计算以及这些模型表达不了的新的应⽤。和已经存在的对集群存储抽象不同的是,RDDs提供了基于粗粒度转换的
api,可以使的⽤户通过⾎缘达到⾼效的容错。我们在spark系统中实现了RDDs,在迭代式的应⽤中,性能是hadoop的20倍,并且可
以⽤于交互式的查询数百GB的数据。
我们已经在中开源了Spark,作为⼀个稳定的数据分析和系统研究的⼿段。
WethankthefirstSparkusers,includingTimHunter,LesterMackey,DilipJoseph,andJibinZhan,fortryingoutoursystem
intheirrealapplications,providingmanygoodsuggestions,
alsothankourshepherd,EdNightingale,searchwassupportedinpartby
BerkeleyAMPLabsponsorsGoogle,SAP,AmazonWebServices,Cloudera,Huawei,IBM,Intel,Microsoft,NEC,NetAppand
VMWare,byDARPA(contract#FA8650-11-C-7136),byaGooglePhDFellowship,andbytheNaturalSciencesand
EngineeringResearchCouncilofCanada.
(⽼汤)
(那伊抹微笑)
更多推荐
piccolo是什么意思colo在线翻译读音例句
发布评论