Spark Core基础知识
1、RDD
Resilient Distributed Dataset (RDD),弹性分布式数据集
弹性是指什么?
1、内存的弹性:内存与磁盘的自动切换
2、容错的弹性:数据丢失可以自动恢复
3、计算的弹性:计算出错重试机制
4、分片的弹性:根据需要重新分片
分布式
就是RDD中的计算逻辑根据分区划分Task发送Executor(不同节点)执行
数据集
RDD中是划分了分区了,有数据的引用,不是数据的存储
主要特性
- A list of partitions 分区
- A function for computing each split 每个切片有一个计算
- A list of dependencies on other RDDs RDD之间是相互依赖的
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) key/value形式的有分区器
- Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file) 有优选的位置去计算分片
2、RDD依赖
血缘
package com.journey.core.wc;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
public class WordCount {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("WordCount")
.setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);
// 如果是集群上运行,直接就是hdfs路径了
JavaRDD<String> lineRDD = sc.textFile("datas/wc", 4);
System.out.println(lineRDD.toDebugString());
System.out.println("*************************************");
JavaRDD<String> wordsRDD = lineRDD.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String line) throws Exception {
return Arrays.asList(line.split(" ")).iterator();
}
});
System.out.println(wordsRDD.toDebugString());
System.out.println("*************************************");
JavaPairRDD<String, Integer> wordToPairRDD = wordsRDD.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String word) throws Exception {
return Tuple2.apply(word, 1);
}
});
System.out.println(wordToPairRDD.toDebugString());
System.out.println("*************************************");
JavaPairRDD<String, Integer> word2CountRDD = wordToPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
System.out.println(word2CountRDD.toDebugString());
System.out.println("*************************************");
List<Tuple2<String, Integer>> result = word2CountRDD.collect();
System.out.println(result);
sc.stop();
}
}
输出结果 :
(4) datas/wc MapPartitionsRDD[1] at textFile at WordCount.java:25 []
| datas/wc HadoopRDD[0] at textFile at WordCount.java:25 []
*************************************
(4) MapPartitionsRDD[2] at flatMap at WordCount.java:29 []
| datas/wc MapPartitionsRDD[1] at textFile at WordCount.java:25 []
| datas/wc HadoopRDD[0] at textFile at WordCount.java:25 []
*************************************
(4) MapPartitionsRDD[3] at mapToPair at WordCount.java:38 []
| MapPartitionsRDD[2] at flatMap at WordCount.java:29 []
| datas/wc MapPartitionsRDD[1] at textFile at WordCount.java:25 []
| datas/wc HadoopRDD[0] at textFile at WordCount.java:25 []
*************************************
(4) ShuffledRDD[4] at reduceByKey at WordCount.java:47 []
+-(4) MapPartitionsRDD[3] at mapToPair at WordCount.java:38 []
| MapPartitionsRDD[2] at flatMap at WordCount.java:29 []
| datas/wc MapPartitionsRDD[1] at textFile at WordCount.java:25 []
| datas/wc HadoopRDD[0] at textFile at WordCount.java:25 []
*************************************
[(Spark,2), (Hello,4), (World,1), (Mayun,1)]
注意 : 前面4是指分区,有缩进是说明划分了Stage了,不在同一个Stage,其实也就是有了shuffle的操作了
RDD的依赖关系
窄依赖
表示上游的Partition最多被下游RDD的一个Partition使用class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) { override def getParents(partitionId: Int): List[Int] = List(partitionId) }
宽依赖
表示上游的Partition可以多个下游RDD的Partition依赖,会产生shuffleclass ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( @transient private val _rdd: RDD[_ <: Product2[K, V]], val partitioner: Partitioner, val serializer: Serializer = SparkEnv.get.serializer, val keyOrdering: Option[Ordering[K]] = None, val aggregator: Option[Aggregator[K, V, C]] = None, val mapSideCombine: Boolean = false, val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor) extends Dependency[Product2[K, V]] with Logging {
- RDD阶段划分
RDD会组装成一个DAG,DAG是根据宽依赖进行的阶段的划分,从最后一个RDD开始进行向前找,找到有宽依赖的切分Stage,产生一个新的阶段。这个主要是由DAGScheduler来完成 RDD任务划分
RDD任务切分中的术语 : Application、Job、Stage和Task- Application : 初始化一个SparkContext即生成一个Application
- Job : 一个Action算子就会生成一个Job
- Stage : 遇到宽依赖就会产生一个新的Stage,但是默认是有最后一个Stage的,也就是ResultStage,这样Stage的个数就是 宽依赖个数 + 1,主要是DAGScheduler来完成
- Task : 一个Stage阶段中,最后一个RDD的分区个数就是Task的个数,任务的调度主要是TaskScheduler来完成
3、RDD持久化
RDD Cache缓存
先看一个示例 :
package com.journey.core.rdd.cache;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import java.util.ArrayList;
import java.util.List;
public class CacheRDD {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf()
.setAppName("CacheRDD")
.setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
List<Integer> nums = new ArrayList<>();
nums.add(1);
nums.add(2);
nums.add(3);
JavaRDD<Integer> numsRDD = sc.parallelize(nums, 2);
// 每个元素 * 10
JavaRDD<Integer> numsRDDTransform = numsRDD.map(new Function<Integer, Integer>() {
@Override
public Integer call(Integer v1) throws Exception {
System.out.println("****************************");
return v1 * 10;
}
});
numsRDDTransform.collect().forEach(System.out::println);
numsRDDTransform.collect().forEach(System.out::println);
sc.stop();
}
}
打印输出 :
****************************
****************************
****************************
10
20
30
****************************
****************************
****************************
10
20
30
案例想说明什么呢?想法很简单,就是想重复利用一下numsRDDTransform这个算子,然后下面分两个Job来搞事情,但是惊奇的发现,咦?好像就是简单的变量复用了而已,发现计算还是从头开始计算,这里面可以看出两个点:1、RDD默认是不缓存数据的,只是数据搬运工(通过计算逻辑) 2、同时验证RDD血缘依赖,都是从头开始进行执行的
那怎么办呢?我不想让它重复的计算,比如说某些情况下,对重要的前面的逻辑计算,说白了,就是计算出结果很难,前面的结果对后面很重要,而且可能对下面要多出Job使用,或者怕之后计算有什么问题,可以从上面缓存中拿出结果容错执行,所以,来吧,缓存一下吧