Spark Core基础知识

1、RDD

Resilient Distributed Dataset (RDD),弹性分布式数据集

弹性是指什么?

1、内存的弹性:内存与磁盘的自动切换
2、容错的弹性:数据丢失可以自动恢复
3、计算的弹性:计算出错重试机制
4、分片的弹性:根据需要重新分片

分布式

就是RDD中的计算逻辑根据分区划分Task发送Executor(不同节点)执行

数据集

RDD中是划分了分区了,有数据的引用,不是数据的存储

主要特性

  • A list of partitions 分区
  • A function for computing each split 每个切片有一个计算
  • A list of dependencies on other RDDs RDD之间是相互依赖的
  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) key/value形式的有分区器
  • Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file) 有优选的位置去计算分片

2、RDD依赖

血缘

package com.journey.core.wc;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
public class WordCount {
 public static void main(String[] args) {
 SparkConf conf = new SparkConf()
 .setAppName("WordCount")
 .setMaster("local[*]");
 JavaSparkContext sc = new JavaSparkContext(conf);
 // 如果是集群上运行,直接就是hdfs路径了
 JavaRDD<String> lineRDD = sc.textFile("datas/wc", 4);
 System.out.println(lineRDD.toDebugString());
 System.out.println("*************************************");
 JavaRDD<String> wordsRDD = lineRDD.flatMap(new FlatMapFunction<String, String>() {
 @Override
 public Iterator<String> call(String line) throws Exception {
 return Arrays.asList(line.split(" ")).iterator();
 }
 });
 System.out.println(wordsRDD.toDebugString());
 System.out.println("*************************************");
 JavaPairRDD<String, Integer> wordToPairRDD = wordsRDD.mapToPair(new PairFunction<String, String, Integer>() {
 @Override
 public Tuple2<String, Integer> call(String word) throws Exception {
 return Tuple2.apply(word, 1);
 }
 });
 System.out.println(wordToPairRDD.toDebugString());
 System.out.println("*************************************");
 JavaPairRDD<String, Integer> word2CountRDD = wordToPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
 @Override
 public Integer call(Integer v1, Integer v2) throws Exception {
 return v1 + v2;
 }
 });
 System.out.println(word2CountRDD.toDebugString());
 System.out.println("*************************************");
 List<Tuple2<String, Integer>> result = word2CountRDD.collect();
 System.out.println(result);
 sc.stop();
 }
}

输出结果 :

(4) datas/wc MapPartitionsRDD[1] at textFile at WordCount.java:25 []
 | datas/wc HadoopRDD[0] at textFile at WordCount.java:25 []
*************************************
(4) MapPartitionsRDD[2] at flatMap at WordCount.java:29 []
 | datas/wc MapPartitionsRDD[1] at textFile at WordCount.java:25 []
 | datas/wc HadoopRDD[0] at textFile at WordCount.java:25 []
*************************************
(4) MapPartitionsRDD[3] at mapToPair at WordCount.java:38 []
 | MapPartitionsRDD[2] at flatMap at WordCount.java:29 []
 | datas/wc MapPartitionsRDD[1] at textFile at WordCount.java:25 []
 | datas/wc HadoopRDD[0] at textFile at WordCount.java:25 []
*************************************
(4) ShuffledRDD[4] at reduceByKey at WordCount.java:47 []
 +-(4) MapPartitionsRDD[3] at mapToPair at WordCount.java:38 []
 | MapPartitionsRDD[2] at flatMap at WordCount.java:29 []
 | datas/wc MapPartitionsRDD[1] at textFile at WordCount.java:25 []
 | datas/wc HadoopRDD[0] at textFile at WordCount.java:25 []
*************************************
[(Spark,2), (Hello,4), (World,1), (Mayun,1)]

注意 : 前面4是指分区,有缩进是说明划分了Stage了,不在同一个Stage,其实也就是有了shuffle的操作了

RDD的依赖关系

  • 窄依赖
    表示上游的Partition最多被下游RDD的一个Partition使用

    class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
     override def getParents(partitionId: Int): List[Int] = List(partitionId)
    }
  • 宽依赖
    表示上游的Partition可以多个下游RDD的Partition依赖,会产生shuffle

    class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
     @transient private val _rdd: RDD[_ <: Product2[K, V]],
     val partitioner: Partitioner,
     val serializer: Serializer = SparkEnv.get.serializer,
     val keyOrdering: Option[Ordering[K]] = None,
     val aggregator: Option[Aggregator[K, V, C]] = None,
     val mapSideCombine: Boolean = false,
     val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor)
     extends Dependency[Product2[K, V]] with Logging {
  • RDD阶段划分
    RDD会组装成一个DAG,DAG是根据宽依赖进行的阶段的划分,从最后一个RDD开始进行向前找,找到有宽依赖的切分Stage,产生一个新的阶段。这个主要是由DAGScheduler来完成
  • RDD任务划分
    RDD任务切分中的术语 : Application、Job、Stage和Task

    • Application : 初始化一个SparkContext即生成一个Application
    • Job : 一个Action算子就会生成一个Job
    • Stage : 遇到宽依赖就会产生一个新的Stage,但是默认是有最后一个Stage的,也就是ResultStage,这样Stage的个数就是 宽依赖个数 + 1主要是DAGScheduler来完成
    • Task : 一个Stage阶段中,最后一个RDD的分区个数就是Task的个数,任务的调度主要是TaskScheduler来完成

3、RDD持久化

RDD Cache缓存

先看一个示例 :

package com.journey.core.rdd.cache;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import java.util.ArrayList;
import java.util.List;
public class CacheRDD {
 public static void main(String[] args) {
 SparkConf sparkConf = new SparkConf()
 .setAppName("CacheRDD")
 .setMaster("local[*]");
 JavaSparkContext sc = new JavaSparkContext(sparkConf);
 List<Integer> nums = new ArrayList<>();
 nums.add(1);
 nums.add(2);
 nums.add(3);
 JavaRDD<Integer> numsRDD = sc.parallelize(nums, 2);
 // 每个元素 * 10
 JavaRDD<Integer> numsRDDTransform = numsRDD.map(new Function<Integer, Integer>() {
 @Override
 public Integer call(Integer v1) throws Exception {
 System.out.println("****************************");
 return v1 * 10;
 }
 });
 numsRDDTransform.collect().forEach(System.out::println);
 numsRDDTransform.collect().forEach(System.out::println);
 sc.stop();
 }
}

打印输出 :

****************************
****************************
****************************
10
20
30
****************************
****************************
****************************
10
20
30

案例想说明什么呢?想法很简单,就是想重复利用一下numsRDDTransform这个算子,然后下面分两个Job来搞事情,但是惊奇的发现,咦?好像就是简单的变量复用了而已,发现计算还是从头开始计算,这里面可以看出两个点:1、RDD默认是不缓存数据的,只是数据搬运工(通过计算逻辑) 2、同时验证RDD血缘依赖,都是从头开始进行执行的

那怎么办呢?我不想让它重复的计算,比如说某些情况下,对重要的前面的逻辑计算,说白了,就是计算出结果很难,前面的结果对后面很重要,而且可能对下面要多出Job使用,或者怕之后计算有什么问题,可以从上面缓存中拿出结果容错执行,所以,来吧,缓存一下吧

作者:journey原文地址:https://segmentfault.com/a/1190000043770463

%s 个评论

要回复文章请先登录注册