定义
RDD全称叫做弹性分布式数据集(Resilient Distributed Datasets),它是一种分布式的内存抽象,表示一个只读的记录分区的集合,它只能通过其他RDD转换而创建,为此,RDD支持丰富的转换操作(如map, join, filter, groupBy等),通过这种转换操作,新的RDD则包含了如何从其他RDDs衍生所必需的信息,所以说RDDs之间是有依赖关系的。
基于RDDs之间的依赖,RDDs会形成一个有向无环图DAG,该DAG描述了整个流式计算的流程,实际执行的时候,RDD是通过血缘关系(Lineage)一气呵成的,即使出现数据分区丢失,也可以通过血缘关系重建分区。
总结起来,基于RDD的流式计算任务可描述为:从稳定的物理存储(如分布式文件系统)中加载记录,记录被传入由一组确定性操作构成的DAG,然后写回稳定存储。另外RDD还可以将数据集缓存到内存中,使得在多个操作之间可以重用数据集,基于这个特点可以很方便地构建迭代型应用(图计算、机器学习等)或者交互式数据分析应用。
可以说Spark最初也就是实现RDD的一个分布式系统,后面通过不断发展壮大成为现在较为完善的大数据生态系统,简单来讲,Spark-RDD的关系类似于Hadoop-MapReduce关系。
RDD特点
RDD表示只读的分区的数据集,对RDD进行改动,只能通过RDD的转换操作,由一个RDD得到一个新的RDD,新的RDD包含了从其他RDD衍生所必需的信息。RDDs之间存在依赖,RDD的执行是按照血缘关系延时计算的。如果血缘关系较长,可以通过持久化RDD来切断血缘关系。
分区
如下图所示,RDD是只读的,要想改变RDD中的数据,只能在现有的RDD基础上创建新的RDD。
由一个RDD转换到另一个RDD,可以通过丰富的操作算子实现,不再像MapReduce那样只能写map和reduce了,如下图所示。
RDD的操作算子包括两类,一类叫做transformations,它是用来将RDD进行转化,构建RDD的血缘关系;另一类叫做actions,它是用来触发RDD的计算,得到RDD的相关计算结果或者将RDD保存的文件系统中。下图是RDD所支持的操作算子列表。
依赖
RDDs通过操作算子进行转换,转换得到的新RDD包含了从其他RDDs衍生所必需的信息,RDDs之间维护着这种血缘关系,也称之为依赖。如下图所示,依赖包括两种,一种是窄依赖,RDDs之间分区是一一对应的,另一种是宽依赖,下游RDD的每个分区与上游RDD(也称之为父RDD)的每个分区都有关,是多对多的关系。
通过RDDs之间的这种依赖关系,一个任务流可以描述为DAG(有向无环图),如下图所示,在实际执行过程中宽依赖对应于Shuffle(图中的reduceByKey和join),窄依赖中的所有转换操作可以通过类似于管道的方式一气呵成执行(图中map和union可以一起执行)。
缓存
如果在应用程序中多次使用同一个RDD,可以将该RDD缓存起来,该RDD只有在第一次计算的时候会根据血缘关系得到分区的数据,在后续其他地方用到该RDD的时候,会直接从缓存处取而不用再根据血缘关系计算,这样就加速后期的重用。如下图所示,RDD-1经过一系列的转换后得到RDD-n并保存到hdfs,RDD-1在这一过程中会有个中间结果,如果将其缓存到内存,那么在随后的RDD-1转换到RDD-m这一过程中,就不会计算其之前的RDD-0了。
checkpoint
虽然RDD的血缘关系天然地可以实现容错,当RDD的某个分区数据失败或丢失,可以通过血缘关系重建。但是对于长时间迭代型应用来说,随着迭代的进行,RDDs之间的血缘关系会越来越长,一旦在后续迭代过程中出错,则需要通过非常长的血缘关系去重建,势必影响性能。为此,RDD支持checkpoint将数据保存到持久化的存储中,这样就可以切断之前的血缘关系,因为checkpoint后的RDD不需要知道它的父RDDs了,它可以从checkpoint处拿到数据。
小结
总结起来,给定一个RDD我们至少可以知道如下几点信息:1、分区数以及分区方式;2、由父RDDs衍生而来的相关依赖信息;3、计算每个分区的数据,计算步骤为:1)如果被缓存,则从缓存中取的分区的数据;2)如果被checkpoint,则从checkpoint处恢复数据;3)根据血缘关系计算分区的数据。
编程模型
在Spark中,RDD被表示为对象,通过对象上的方法调用来对RDD进行转换。经过一系列的transformations定义RDD之后,就可以调用actions触发RDD的计算,action可以是向应用程序返回结果(count, collect等),或者是向存储系统保存数据(saveAsTextFile等)。在Spark中,只有遇到action,才会执行RDD的计算(即延迟计算),这样在运行时可以通过管道的方式传输多个转换。
要使用Spark,开发者需要编写一个Driver程序,它被提交到集群以调度运行Worker,如下图所示。Driver中定义了一个或多个RDD,并调用RDD上的action,Worker则执行RDD分区计算任务。
应用举例
下面介绍一个简单的spark应用程序实例WordCount,统计一个数据集中每个单词出现的次数,首先将从hdfs中加载数据得到原始RDD-0,其中每条记录为数据中的一行句子,经过一个flatMap操作,将一行句子切分为多个独立的词,得到RDD-1,再通过map操作将每个词映射为key-value形式,其中key为词本身,value为初始计数值1,得到RDD-2,将RDD-2中的所有记录归并,统计每个词的计数,得到RDD-3,最后将其保存到hdfs。
import org.apache.spark._import SparkContext._object WordCount { def main(args: Array[String]) { if (args.length < 2) { System.err.println("Usage: WordCount"); System.exit(1); } val conf = new SparkConf().setAppName("WordCount") val sc = new SparkContext(conf) val result = sc.textFile(args(0)) .flatMap(line => line.split(" ")) .map(word => (word, 1)) .reduceByKey(_ + _) result.saveAsTextFile(args(1)) }}
小结
基于RDD实现的Spark相比于传统的Hadoop MapReduce有什么优势呢?总结起来应该至少有三点:1)RDD提供了丰富的操作算子,不再是只有map和reduce两个操作了,对于描述应用程序来说更加方便;2)通过RDDs之间的转换构建DAG,中间结果不用落地;3)RDD支持缓存,可以在内存中快速完成计算。
RDD-杂
RDD的创建方式
- 集合转换;
- 从文件系统 (本地文件、HDFS.、HBase)输入;
- 从父RDD转换(为什么需要父RDD呢?容错, 下面会提及);
RDD的计算类型
- Transformation:延迟执行,一个RDD通过该操作产生新的RDD时不会立即执行,只有等到Action操作才会真正执行。
- Action:提交Spark作业,当Action时,Transformation 类型的操作才会真正执行计算操作,然后产生最终结果输出。
- Hadoop提供处理的数据接口有Map和Reduce,而Spark提供的不仅仅有Map和Reduce,还有更多对数据处理的接口。
容错Lineage
每个RDD都会记录自己所依赖的父RDD, 一旦出现某个RDD的某些Partition丢失,可以通过并行计算迅速恢复,这就是容错。
RDD的依赖又分为Narrow Dependent(窄依赖)和Wide Dependent (宽依赖)- 窄依赖:每个Partition最多只能给一个RDD使用,由于没有多重依赖,所以在一个节点上可以一次性将Partition处理完,且一旦数据发生丢失或者损坏,可以迅速从上一个RDD恢复
- 宽依赖:每个Partition可以给多个RDD使用,由于多重依赖,只有等到所有到达节点的数据处理完毕才能进行下一步处理,
一且发生数据丢失或者损坏则完蛋了,所以在此发生之前,必须将上一次所有节点的数据进行物化(存储到磁盘上)处理,这样达到恢复。
宽、 窄依赖缓存策略
Spark通过useDisk、 useMemory、deserialized、 replication4个参数组成11种缓存策略。
- useDisk:使用磁盘缓存(boolean) .
- useMemory:使用内存缓存(boolean).
- deserialized:反序列化(序列化是为了网络将对象进行传输,boolean: true 反序列化false序列化) .
- replication:副本数量(int).
通过StorageLevel类的构造传参的方式进行控制,结构如下:
class StorageLevel private (useDisk : Boolean ,useMemory : Boolean , deserialized : Boolean ,replication: Ini)RDD运行流程
RDD在Spark中运行大概分为以下三步:
- 创建RDD对象;
- DAGScheduler模块介入运算,计算RDD之间的依赖关系,RDD之间的依赖关系就形成了DAG;
- 每一个Job被分为多个Stage。划分Stage的一个主要依据是当前计算因子的输入是否是确定的,如果是则将其分在同一个Stage,避免多个Stage之间的消息传递开销;
整体流程如下图:
以下面一个按 A-Z首字母分类,查找相同首字母下不同姓名总个数的例子来看一下 RDD 是如何运行起来的:
- 创建 RDD:上面的例子除去最后一个 collect 是个Action,不会创建 RDD 之外,前面四个转换都会创建出新的 RDD 。因此第一步就是创建好所有 RDD( 内部的五项信息 )?
- 创建执行计划:Spark 会尽可能地管道化,并基于是否要重新组织数据来划分阶段 (stage) ,例如本例中的 groupBy() 转换就会将整个执行计划划分成两阶段执行。最终会产生一个 DAG(directed acyclic graph ,有向无环图 ) 作为逻辑执行计划(logical plan)。
- 调度任务:将各阶段划分成不同的任务 (task) ,每个任务都是数据和计算的合体。在进行下一阶段前,当前阶段的所有任务都要执行完成。因为下一阶段的第一个转换一定是重新组织数据的,所以必须等当前阶段所有结果数据都计算出来了才能继续。
参考: