1、持久化(缓存)持久化操作包含:1存储策略,2内存LRU回收算法,3chickPoint,4Tachyon存储策略(存储级别)使用pe rsist(存储级别)对rdd进行存储设置。存储级别的设置,主要是为了解决统一rdd数据被多次访问,而多次重新计算所带来的问题。 为了避免多次计算同一个RDD,可以让Spark对数据进行持久化。当我们让Spark持久化存储一个RDD时,计算出R DD的节点会分别保存它们所求出的分区数据(保留中间的计算结果,以后不需要重新计算该过程)。如果一个有持久化数据的节点发生故障, Spark会在需要用到缓存的数据时重算丢失的数据分区。如果希望节点故障的情况不会拖累我们的执行速度,也可以把数据备份到多个节点上 (就是在存储级别后添加_2表示备份两份,如:MEMORY_ONLY_2)。Persist(MEMORY_ONLY)=cache( )存储级别级别使用的空间CPU时间备注MEMORY_ONLY高低MEMORY_ONLY_SER低高MEMORY_AND_DI SK高中等如果数据在内存中放不下,则溢写到磁盘上MEMORY_AND_DISK_SER低高如果数据在内存中放不下,则溢写到磁盘上。 在内存中存放序列化后的数据DISK_ONLY低高注:存储到磁盘中会消耗IO资源。序列化操作会再用CPU资源LRU回收算法(缓存淘汰 算法)LRU即页面置换算法原理LRU(Least?recently?used,最近最少使用)算法根据数据的历史访问记录来进行淘汰数 据(淘汰访问机率低的数据),其核心思想是“如果数据最近被访问过,那么将来被访问的几率也更高”。?实现最常见的实现是使用一个链表保存 缓存数据,详细算法实现如下:1.?新数据插入到链表头部;2.?每当缓存命中(即缓存数据被访问),则将数据移到链表头部;3.?当链表 满的时候,将链表尾部的数据丢弃。当存在热点数据时,LRU的效率很好,但偶发性的、周期性的批量操作会导致LRU命中率急剧下降,缓存污 染情况比较严重。命中时需要遍历链表,找到命中的数据块索引,然后需要将数据移到头部。LRU算法就是我们在jvm垃圾回收中说的大GC与 小GC的回收原则(解释了保留什么样的数据,移除什么样的数据)参考:http://flychao88.iteye.com/blog/ 1977653CheckPoint1checkpoint检查点的目的有两个:(1)数据的共享(多次调用)(2)rdd的数据恢复 对于数据共享,我们用存储策略,常见的是cache()、persist(),事实上他们也能够起到数据恢复的作用。但我们一般 用它们来做数据共享,而不是数据恢复。这是因为cache是存储在内存中,而内存是会进行自动清理的,当数据真的发生丢失的时候,是达不到 数据恢复效果的。而persist()虽然可以设置成存储导disk即磁盘中,但是若是磁盘坏了,数据也是无法恢复的(本质上存储级别都是 将数据存贮到本地上,即集群worker上,而没有借助外部的存储系统,如hdfs、tachoy)。此时我们利用checkpoint 进行检查点设置,将checkpoint存储位置设置为hdfs。由于hdfs是副本存放的,因此可以保证数据安全。当然checkpoi t的代价是很高的,在面对大量数据的时候,需要进行大量复制数据,消耗资源。因此我们不用chickpoint来进行数据共享,之前说过 RDD4十分宝贵,需要将其进行持久化,首先考虑的是persist然后考虑的才是chickpoint,同时又说宁愿重新计算出RDD4 也不使用chickpoint,因为chickpoint太耗时。checkpoint的工作原理可以对任意rdd进行checkpoin t,此时就会将RDD的这个状态标记为checkpointing,也就是将rdd中的数据存入到了hdfs上,当数据丢失时系统会自动根 据checkpoint的标记进行数据恢复。在进行checkpoint时最好先进行cache操作,这是因为checkpoint的操作 时需要一个单独的job来完成的,并且对于一个spark程序来说,只有遇见action算子才会进行真正的操作。而每次操作都会执行sp ark的程序入口。如果你不用chache将数据先缓存到内存中,那么当程序执行到action算子的时候,此时整个程序会从hdfs获取 一次数据,然后,还会在新产生一个job,去执行checkpoint,这个时候还会再一次从hdfs上读取数据;(这相当于重复计算了两 次RDD数据)若你chache了那么,checkpoint就从内存中读取数据,而不用重复计算。(为什么从内存读取数据就不用重新计算 了呢?我们之前一直认为将数据存储到内存中,的目的是为了下次计算快,其实这完全是错误的。我们将数据之久化到内存中,要知道这里的数据指 的是什么样的数据,这里的数据是一个结果数据,就是rdd在该阶段已经计算出的结果数据,自然我们从内存中调用这样的结果数据,就不用去计 算了,已经计算好了。这才是持久化的目的)4程序5结果:注意:checkpoint一般用于shuffle阶段的RDD的数据存储的恢复 。非shuffle阶段的rdd在数据恢复的时候,可以根据父RDD相应分区进行恢复,无需设置检查点。TachyonTachyon是s park生态系统中的分布式内存文件系统,Tachyon从spark单独分离出来,使得Spark更加专注于数据计算。分布式内存计算所 面临的问题以及问题的解决:1、多线程数据共享:问题:当两个Spark作业需要共享数据时,必须通过写磁盘操作。比如:作业1要先把生成 的数据写入HDFS,然后作业2再从HDFS把数据读出来。在此,磁盘的读写可能造成性能瓶颈。分析:首先spark的内存指的就是exe cutor,而且executor是jvm中的一个线程(对内存)。虽然同一个executor线程内部的所有task是可以进行数据共享 的,这个是通过chache来实现的。但是线程之间是不能进行数据共享的。要想进行数据共享,就需要通过借助一个外部存储系统来统一存储共 享数据,正如问题所描述的那样,但是hdfs是基于磁盘的,因此在读写数据的时候,会参数大量的IO消耗。造成数据处理困难。解决:通过额 外安装tachyon内存分布式系统,如下图所示。当两个Spark作业需要共享数据时,无需再通过写磁盘,而是借助Tachyon进行内 存读写,从而提高计算效率。(从内存读)垃圾回收数据丢失:问题:由于Spark会利用自身的JVM对数据进行缓存(executor) ,当Spark程序崩溃时,JVM进程退出,所缓存数据也随之丢失(exeutor数据丢失),因此在工作重启时又需要从HDFS把数据再 次读出。分析:由于executor就是所说的JvM中的堆内存,也就是说当spark关闭后,内存中的数据就会消失。解决:在使用Tac hyon对数据进行缓存后,即便在Spark程序崩溃JVM进程退出后,所缓存数据也不会丢失。这样,Spark工作重启时可以直接从Ta chyon内存读取数据了。(数据不丢失)内存不足导致大GC使得线程停止:问题:当两个Spark作业需操作相同的数据时,每个作业的J VM都需要缓存一份数据,不但造成资源浪费,也极易引发频繁的垃圾收集(存不下了),造成性能的降低。分析:现在我们把数据存到JVM里面 。我们要想这么样的一个事,如果我们的数据不断的存,不断的存,迟早就存不下。存不下来以后,会进行垃圾回收(GC)。GC有分为fu llGC(大回收),minorGC(小回收)。FullGC的时候,会让工作线程停止。这个时候只运行垃圾回收的线程!!!这 样从某种角度讲,会让我们的代码运行速度变慢。因此如果需要对Spark集群进行调优,那么可以考虑使用这个Tachyon。解决:当两个 Spark作业需要操作相同的数据时,它们可以直接从Tachyon获取,并不需要各自缓存一份数据,从而降低JVM内存压力,减少垃圾收 集发生的频率。Tachyon的架构包括:Master:Worker:Client:在使用Tachyon的时候,我们将Spar kWork节点放到TachyonWorker节点上,SparkWork通过TachyonClient层到TachyonWorke r中提取数据。RDD的常用算子性能distinct()操作的开销很大,因为它需要将所有数据通过网络进行混洗(shuffle),以确 保每个元素都只有一份。union(other):处理来自多个数据源的日志文件,不对数据进行去重。若想去重则可以在执行filter()intersection(other):它需要通过网络混洗数据来发现共有的元素(取交集)subtract(other):它需要通过网络混洗数据来发现共有的元素(取差集)cartesian(other)笛卡儿积在我们希望考虑所有可能的组合的相似度时比较有用比如计算各用户对各种产品的预期兴趣程度,求大规模RDD的笛卡儿积开销巨大 |
|