WWW.lllT.neT

一:SparkRDD持久化的有点

Spark最重要的一个功能,就是在不同操作间,持久化(或缓存)一个数据集在内存中。当你持久化一个RDD,每一个结点都将把它的计算分块结果保存在内存中,并在对此数据集(或者衍生出的数据集)进行的其它动作中重用。这将使得后续的动作(action)变得更加迅速(通常快10倍)。缓存是用Spark构建迭代算法的关键。RDD的缓存能够在第一次计算完成后,将计算结果保存到内存、本地文件系统或者Tachyon(分布式内存文件系统)中。通过缓存,Spark避免了RDD上的重复计算,能够极大地提升计算速度。

二:如何持久化

  Spark通过persist()或cache()方法可以标记一个要被持久化的RDD,一旦首次被触发,该RDD将会被保留在计算节点的内存中并重用。实际上cache()是使用persist()的快捷方法

首先,在action中计算得到rdd;然后,将其保存在每个节点的内存中。Spark的缓存是一个容错的技术,如果RDD的任何一个分区丢失,它可以通过原有的转换(transformations)操作自动的重复计算并且创建出这个分区。

此外,我们可以利用不同的存储级别存储每一个被持久化的RDD。例如,它允许我们持久化集合到磁盘上、将集合作为序列化的Java对象持久化到内存中、在节点间复制集合或者存储集合到Tachyon中。我们可以通过传递一个StorageLevel对象给persist()方法设置这些存储级别。cache()方法使用了默认的存储级别—StorageLevel.MEMORY_ONLY。完整的存储级别介绍如下图

StorageLevel 源码

valNONE=newStorageLevel(false,false,false,false) valDISK_ONLY=newStorageLevel(true,false,false,false) valDISK_ONLY_2=newStorageLevel(true,false,false,false,2) valMEMORY_ONLY=newStorageLevel(false,true,false,true) valMEMORY_ONLY_2=newStorageLevel(false,true,false,true,2) valMEMORY_ONLY_SER=newStorageLevel(false,true,false,false) valMEMORY_ONLY_SER_2=newStorageLevel(false,true,false,false,2) valMEMORY_AND_DISK=newStorageLevel(true,true,false,true) valMEMORY_AND_DISK_2=newStorageLevel(true,true,false,true,2) valMEMORY_AND_DISK_SER=newStorageLevel(true,true,false,false) valMEMORY_AND_DISK_SER_2=newStorageLevel(true,true,false,false,2) valOFF_HEAP=newStorageLevel(true,true,true,false,1)

说明:上面"_2"代表的是份数,就是把持久化的数据存为2份

StorageLevel有五个属性分别是

privatevar_useDisk:Boolean,//useDisk_是否使用磁盘 privatevar_useMemory:Boolean,//useMemory_是否使用内存 privatevar_useOffHeap:Boolean,//useOffHeap_是否使用堆外内存如:Tachyon, privatevar_deserialized:Boolean,//deserialized_是否进行反序列化 privatevar_replication:Int=1)//replication_备份数目。

三:存储级别的选择

Spark的多个存储级别意味着在内存利用率和cpu利用效率间的不同权衡。我们推荐通过下面的过程选择一个合适的存储级别:

1:如果你的RDD适合默认的存储级别(MEMORY_ONLY),就选择默认的存储级别。因为这是cpu利用率最高的选项,会使RDD上的操作尽可能的快。

2:如果不适合用默认的级别,选择MEMORY_ONLY_SER。选择一个更快的序列化库提高对象的空间使用率,但是仍能够相当快的访问。

3:除非函数计算RDD的花费较大或者它们需要过滤大量的数据,不要将RDD存储到磁盘上,否则,重复计算一个分区就会和重磁盘上读取数据一样慢。

4:如果你希望更快的错误恢复,可以利用重复存储级别。所有的存储级别都可以通过重复计算丢失的数据来支持完整的容错,但是重复的数据能够使你在RDD上继续运行任务,而不需要重复计算丢失的数据。

看下图

注意只能设置一种:不然会抛异常: Cannot change storage level of an RDD after it was already assigned a level

异常源码如下

privatedefpersist(newLevel:StorageLevel,allowOverride:Boolean):this.type={ //TODO:HandlechangesofStorageLevel if(storageLevel!=StorageLevel.NONE&&newLevel!=storageLevel&&!allowOverride){ thrownewUnsupportedOperationException( "CannotchangestoragelevelofanRDDafteritwasalreadyassignedalevel") } //IfthisisthefirsttimethisRDDismarkedforpersisting,registerit //withtheSparkContextforcleanupsandaccounting.Dothisonlyonce. if(storageLevel==StorageLevel.NONE){ sc.cleaner.foreach(_.registerRDDForCleanup(this)) sc.persistRDD(this) } storageLevel=newLevel this }

四:如何使用缓存

1:调用rdd.persist();变量可以这样设置 如:rdd.persist(StorageLevel.MEMORY_ONLY); 这里使用了MEMORY_ONLY级别存储。当然也可以选择其他的如: rdd.persist(StorageLevel.DISK_ONLY());

2:调用rdd.cache()方法,cache()是rdd.persist(StorageLevel.MEMORY_ONLY)的简写,效果和他一模一样的。

3: 调用rdd.unpersist()清除缓存

我通过一个demo看下

publicclassSparkCacheDemo{ privatestaticJavaSparkContextsc; publicstaticvoidmain(String[]args){ Listlist=Arrays.asList(5,4,3,2,1,6,9); SparkConfconf=newSparkConf().setMaster("local[2]").setAppName("SparkCacheDemo"); sc=newJavaSparkContext(conf); JavaRDDrdd=sc.parallelize(list); //rdd.persist(StorageLevel.DISK_ONLY());//磁盘存储 rdd.persist(StorageLevel.MEMORY_ONLY());//内存 //rdd.persist(StorageLevel.MEMORY_ONLY_2());//内存存储两份 rdd.collect(); rdd.collect();//这里可以设置debug断点便于查看 rdd.unpersist();//清楚缓存 rdd.collect();//这里也可以设置debug断点便于查看 } }

启动后设置上面连个debug点 然后查看页面 http://127.0.0.1:4040/storage/ 可以看到相关信息 如下图

磁盘

内存

五:缓存性能测试

我们知道StorageLevel.MEMORY_ONLY级别和不用缓存的级别相差10倍,我们一起来验证下,看代码

packagecom.demo.spark.cache; importorg.apache.spark.SparkConf; importorg.apache.spark.api.java.JavaRDD; importorg.apache.spark.api.java.JavaSparkContext; importorg.apache.spark.storage.StorageLevel; publicclassSparkCacheTest{ privatestaticJavaSparkContextsc; publicstaticvoidmain(String[]args){ SparkConfconf=newSparkConf().setMaster("local[2]").setAppName("SparkCacheTest"); sc=newJavaSparkContext(conf); sc.setLogLevel("error"); noCache(); cache(); System.out.println(""); } /** *不用缓存 * *@Title:noCache *@authorzhuhuipei *@date2017年6月2日下午4:22:01 */ publicstaticvoidnoCache(){ JavaRDDrdd=sc.textFile("./test.txt"); rdd.count(); Longt1=System.currentTimeMillis(); System.out.println("noCache()=rdd.count()="+rdd.count()); Longt2=System.currentTimeMillis(); Longt2_t1=t2-t1; System.out.println("nocache()="+t2_t1); } /** *用缓存 *@Title:cache *@authorzhuhuipei *@date2017年6月2日下午5:03:51 */ publicstaticvoidcache(){ JavaRDDrdd=sc.textFile("./test.txt").persist(StorageLevel.MEMORY_ONLY()); rdd.count(); Longt1=System.currentTimeMillis(); System.out.println("cache()=rdd.count()="+rdd.count()); Longt2=System.currentTimeMillis(); Longt2_t1=t2-t1; System.out.println("cache()="+t2_t1); } }

本人本地直接运行两者相差不大差不多在(3~5倍左右),并没有达到10倍性能,可能和数据量和运行环境有关系。

源码地址:

https://github.com/zhp8341/sparkdemo/blob/master/src/main/java/com/demo/spark/cache/SparkCacheTest.java

https://github.com/zhp8341/sparkdemo/blob/master/src/main/java/com/demo/spark/cache/SparkCacheDemo.java

测试文件:https://raw.githubusercontent.com/zhp8341/sparkdemo/master/test.txt 文件有点大16M

WWW.lllT.neT

声明:有的资源来自网络转载,版权归原作者所有,如有侵犯到您的权益请联系邮箱:our333@126.com我们将配合处理!

原文地址:Spark学习笔记之RDD持久化(四)发布于2021-11-26 09:15:01