用Scala编写:
import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}
object TransformationOperator {
/**
 * Demonstrates the `map` transformation.
 *
 * `map` visits every element of an RDD and produces exactly one output
 * element per input element. Here each name is prefixed with "hello" and the
 * resulting strings are printed.
 *
 * The SparkContext is stopped in a `finally` block: Spark permits only one
 * active context per JVM, so leaving it running would leak it and prevent any
 * later code from creating its own context.
 */
def Map: Unit = {
  val conf = new SparkConf().setAppName("map").setMaster("local")
  val sc = new SparkContext(conf)
  try {
    // Named `names` rather than `RDD` so the value does not shadow the RDD type name.
    val names: RDD[String] = sc.parallelize(List("张无忌", "赵敏", "周芷若"))
    val greetings: RDD[String] = names.map(name => s"hello$name")
    greetings.foreach(println(_))
  } finally {
    sc.stop() // release the context even when the job throws
  }
}
/**
* 结果:
* hello张无忌
* hello赵敏
* hello周芷若
*/
/**
 * Demonstrates the `flatMap` transformation.
 *
 * `flatMap` maps every element to a collection and flattens the results into
 * a single RDD (one level of nesting is removed). Here each line is split on
 * a space, every resulting name is prefixed with "hello", and the flattened
 * greetings are printed.
 *
 * The SparkContext is stopped in a `finally` block: Spark permits only one
 * active context per JVM, so leaving it running would leak it and prevent any
 * later code from creating its own context.
 */
def FlatMap: Unit = {
  val conf = new SparkConf().setAppName("flatMap").setMaster("local")
  val sc = new SparkContext(conf)
  try {
    val lines: RDD[String] = sc.parallelize(List("张无忌 赵敏", "宋青书 周芷若"))
    // Each line yields an Array of greetings; flatMap concatenates those arrays.
    val greetings: RDD[String] =
      lines.flatMap(line => line.split(" ").map(name => s"hello$name"))
    greetings.foreach(println(_))
  } finally {
    sc.stop() // release the context even when the job throws
  }
}
/**
* 结果:
* hello张无忌
* hello赵敏
* hello宋青书
* hello周芷若
*/
/**
 * Demonstrates the `filter` transformation.
 *
 * `filter` keeps only the elements for which the predicate returns true.
 * Here the even numbers of 1..8 are selected and printed.
 *
 * The SparkContext is stopped in a `finally` block: Spark permits only one
 * active context per JVM, so leaving it running would leak it and prevent any
 * later code from creating its own context.
 */
def Filter: Unit = {
  val conf = new SparkConf().setAppName("filter").setMaster("local")
  val sc = new SparkContext(conf)
  try {
    val numbers: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8))
    val evens: RDD[Int] = numbers.filter(_ % 2 == 0)
    evens.foreach(println(_))
  } finally {
    sc.stop() // release the context even when the job throws
  }
}
/**
* 结果:
* 2
* 4
* 6
* 8
*/
/**
 * Demonstrates the `groupByKey` transformation.
 *
 * `groupByKey` collects all values that share a key into one group; an
 * optional `numPartitions` argument controls the number of result partitions
 * (and therefore tasks). Here disciples are grouped by their sect and each
 * group is printed on one line as "menpai" + sect + " " + the concatenated
 * member names.
 *
 * The SparkContext is stopped in a `finally` block: Spark permits only one
 * active context per JVM, so leaving it running would leak it and prevent any
 * later code from creating its own context.
 */
def GroupBykey: Unit = {
  val conf = new SparkConf().setAppName("groupBykey").setMaster("local")
  val sc = new SparkContext(conf)
  try {
    val disciples: RDD[(String, String)] = sc.parallelize(List(
      ("峨眉", "周芷若"),
      ("武当", "宋青书"),
      ("峨眉", "灭绝师太"),
      ("武当", "张三丰")
    ))
    disciples.groupByKey().foreach { case (sect, members) =>
      // mkString without a separator yields the same text as printing each
      // member in turn, but emits the whole line with a single println.
      println(s"menpai${sect} ${members.mkString}")
    }
  } finally {
    sc.stop() // release the context even when the job throws
  }
}
/**
* 结果:
* menpai峨眉 周芷若灭绝师太
* menpai武当 宋青书张三丰
*/
/**
 * Demonstrates the `reduceByKey` transformation.
 *
 * `reduceByKey` merges all values that share a key with the supplied binary
 * function (sum, max, min, ...); the number of result partitions can
 * optionally be specified. Here the scores of each sect are summed and
 * printed as sect name immediately followed by the total.
 *
 * The SparkContext is stopped in a `finally` block: Spark permits only one
 * active context per JVM, so leaving it running would leak it and prevent any
 * later code from creating its own context.
 */
def ReduceBykey: Unit = {
  val conf = new SparkConf().setAppName("reduceBykey").setMaster("local")
  val sc: SparkContext = new SparkContext(conf)
  try {
    val scores: RDD[(String, Int)] = sc.parallelize(List(
      ("峨眉", 40),
      ("武当", 30),
      ("峨眉", 60),
      ("武当", 99)
    ))
    scores
      .reduceByKey(_ + _)
      .foreach { case (sect, total) => println(s"${sect}${total}") }
  } finally {
    sc.stop() // release the context even when the job throws
  }
}
/**
* 结果:
* 峨眉100
* 武当129
*/
/**
 * Demonstrates the `sortByKey` transformation.
 *
 * `sortByKey` orders a pair RDD by key. Its first parameter selects the
 * direction (`true` = ascending, `false` = descending); the optional second
 * parameter sets the number of result partitions. Here (score, name) pairs
 * are sorted by score in descending order and printed as "score->name".
 *
 * The SparkContext is stopped in a `finally` block: Spark permits only one
 * active context per JVM, so leaving it running would leak it and prevent any
 * later code from creating its own context.
 */
def SortBykey: Unit = {
  val conf = new SparkConf().setAppName("sortBykey").setMaster("local")
  val sc = new SparkContext(conf)
  try {
    val ranking: RDD[(Int, String)] = sc.parallelize(
      List((98, "东方不败"), (80, "岳不群"), (85, "令狐冲"), (83, "任我行"))
    )
    ranking
      .sortByKey(ascending = false)
      .foreach { case (score, name) => println(s"${score}->${name}") }
  } finally {
    sc.stop() // release the context even when the job throws
  }
}
/**
* 结果:
* 98->东方不败
* 85->令狐冲
* 83->任我行
* 80->岳不群
*/
/**
 * Demonstrates the `join` transformation.
 *
 * `join` combines two pair RDDs on equal keys; a partition count may
 * optionally be supplied. Every result element has the shape
 * (key, (leftValue, rightValue)). Here student names and student scores are
 * joined on the student id and one line is printed per student.
 *
 * The SparkContext is stopped in a `finally` block: Spark permits only one
 * active context per JVM, so leaving it running would leak it and prevent any
 * later code from creating its own context.
 */
def Join: Unit = {
  val conf = new SparkConf().setAppName("join").setMaster("local")
  val sc = new SparkContext(conf)
  try {
    val namesById: RDD[(Int, String)] =
      sc.parallelize(List((1, "东方不败"), (2, "令狐冲"), (3, "林平之")))
    val scoresById: RDD[(Int, Int)] =
      sc.parallelize(List((1, 99), (2, 98), (3, 97)))
    namesById.join(scoresById).foreach { case (id, (name, score)) =>
      println(s"学号:${id} 名字:${name} 分数:${score}")
    }
  } finally {
    sc.stop() // release the context even when the job throws
  }
}
/**
* 结果:
* 学号:1 名字:东方不败 分数:99
* 学号:3 名字:林平之 分数:97
* 学号:2 名字:令狐冲 分数:98
*/
/**
 * Demonstrates the `union` transformation.
 *
 * `union` concatenates two RDDs of the same element type; elements that
 * occur in both inputs are kept (no de-duplication). Here List(1, 2, 3, 4)
 * and List(3, 4, 5, 6) are concatenated and every element is printed.
 *
 * The SparkContext is stopped in a `finally` block: Spark permits only one
 * active context per JVM, so leaving it running would leak it and prevent any
 * later code from creating its own context.
 */
def Union: Unit = {
  val conf = new SparkConf().setAppName("union").setMaster("local")
  val sc = new SparkContext(conf)
  try {
    val left: RDD[Int] = sc.parallelize(List(1, 2, 3, 4))
    val right: RDD[Int] = sc.parallelize(List(3, 4, 5, 6))
    left.union(right).foreach(println(_))
  } finally {
    sc.stop() // release the context even when the job throws
  }
}
/**
* 结果:
* 1
* 2
* 3
* 4
* 18/04/23 20:54:32 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 751 bytes result sent to driver
* 18/04/23 20:54:32 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, PROCESS_LOCAL, 7905 bytes)
* 18/04/23 20:54:32 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
* 3
* 4
* 5
* 6
*
*/
/**
 * Demonstrates the `intersection` transformation.
 *
 * `A intersection B` yields the elements that are present in both A and B.
 * Here the common elements of List(1, 2, 3, 4) and List(3, 4, 5, 6) are
 * printed.
 *
 * The SparkContext is stopped in a `finally` block: Spark permits only one
 * active context per JVM, so leaving it running would leak it and prevent any
 * later code from creating its own context.
 */
def Intersection: Unit = {
  val conf: SparkConf = new SparkConf().setAppName("intersection").setMaster("local")
  val sc: SparkContext = new SparkContext(conf)
  try {
    val left: RDD[Int] = sc.parallelize(List(1, 2, 3, 4))
    val right: RDD[Int] = sc.parallelize(List(3, 4, 5, 6))
    left.intersection(right).foreach(println(_))
  } finally {
    sc.stop() // release the context even when the job throws
  }
}
/**
* 结果:
* 4
* 3
*/
/**
 * Demonstrates the `distinct` transformation.
 *
 * `distinct` returns a new RDD containing each distinct element of the
 * source once. Two elements count as duplicates only when they are equal as
 * a whole. Here the duplicates in List(1, 2, 3, 3, 4, 4) are removed and the
 * remaining elements are printed.
 *
 * The SparkContext is stopped in a `finally` block: Spark permits only one
 * active context per JVM, so leaving it running would leak it and prevent any
 * later code from creating its own context.
 */
def Distinct: Unit = {
  val conf = new SparkConf().setAppName("distinct").setMaster("local")
  val sc = new SparkContext(conf)
  try {
    val numbers = sc.parallelize(List(1, 2, 3, 3, 4, 4))
    numbers.distinct().foreach(println(_))
  } finally {
    sc.stop() // release the context even when the job throws
  }
}
/**
* 结果:
* 4
* 1
* 3
* 2
*/
/**
 * Demonstrates the `cartesian` transformation.
 *
 * `cartesian` builds the Cartesian product of two RDDs: every element of the
 * first RDD is paired with every element of the second. Here the letters
 * "a", "b" are paired with the numbers 0, 1, 2 and each pair is printed as
 * "letter->number".
 *
 * The SparkContext is stopped in a `finally` block: Spark permits only one
 * active context per JVM, so leaving it running would leak it and prevent any
 * later code from creating its own context.
 */
def Cartesian: Unit = {
  val conf = new SparkConf().setAppName("cartesian").setMaster("local")
  val sc = new SparkContext(conf)
  try {
    val letters = sc.parallelize(List("a", "b"))
    val digits = sc.parallelize(List(0, 1, 2))
    letters
      .cartesian(digits)
      .foreach { case (letter, digit) => println(s"${letter}->${digit}") }
  } finally {
    sc.stop() // release the context even when the job throws
  }
}
/**
* 结果:
* a->0
* a->1
* a->2
* b->0
* b->1
* b->2
*/
/**
 * Demonstrates the `mapPartitions` transformation.
 *
 * `mapPartitions` is similar to `map`, but the supplied function is invoked
 * once per partition and receives an iterator over all of that partition's
 * elements, instead of being invoked once per element. Here six numbers are
 * split into two partitions, each partition iterator is passed through
 * unchanged, and every element is printed.
 *
 * The SparkContext is stopped in a `finally` block: Spark permits only one
 * active context per JVM, so leaving it running would leak it and prevent any
 * later code from creating its own context.
 */
def MapPartitions: Unit = {
  val conf = new SparkConf().setAppName("mapPartitions").setMaster("local")
  val sc = new SparkContext(conf)
  try {
    val numbers = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
    numbers
      .mapPartitions(partition => partition, preservesPartitioning = false)
      .foreach(println(_))
  } finally {
    sc.stop() // release the context even when the job throws
  }
}
/**
* 结果:
* 1
* 2
* 3
* 18/04/23 21:17:49 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, PROCESS_LOCAL, 7792 bytes)
* 18/04/23 21:17:49 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
* 18/04/23 21:17:49 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 622 bytes result sent to driver
* 4
* 5
* 6
*/
/**
 * Demonstrates the `repartition` transformation.
 *
 * `repartition(n)` redistributes the data into `n` partitions. It always
 * performs a shuffle, i.e. it behaves like `coalesce(n, shuffle = true)`,
 * which is why it can increase the partition count. Here an RDD created with
 * 2 partitions is repartitioned into 3 and every element is printed.
 *
 * The SparkContext is stopped in a `finally` block: Spark permits only one
 * active context per JVM, so leaving it running would leak it and prevent any
 * later code from creating its own context.
 */
def Repartition: Unit = {
  val conf = new SparkConf().setAppName("repartition").setMaster("local")
  val sc = new SparkContext(conf)
  try {
    val numbers = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
    numbers.repartition(3).foreach(println(_))
  } finally {
    sc.stop() // release the context even when the job throws
  }
}
/**
* 结果:
* 3
* 5
* 18/04/24 18:43:16 INFO Executor: Finished task 0.0 in stage 1.0 (TID 2). 1052 bytes result sent to driver
* 18/04/24 18:43:16 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, localhost, executor driver, partition 1, ANY, 7925 bytes)
* 18/04/24 18:43:16 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 431 ms on localhost (executor driver) (1/3)
* 18/04/24 18:43:16 INFO Executor: Running task 1.0 in stage 1.0 (TID 3)
* 18/04/24 18:43:16 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
* 18/04/24 18:43:16 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 9 ms
* 18/04/24 18:43:16 INFO Executor: Finished task 1.0 in stage 1.0 (TID 3). 966 bytes result sent to driver
* 1
* 6
* 18/04/24 18:43:16 INFO TaskSetManager: Starting task 2.0 in stage 1.0 (TID 4, localhost, executor driver, partition 2, ANY, 7925 bytes)
* 18/04/24 18:43:16 INFO Executor: Running task 2.0 in stage 1.0 (TID 4)
* 18/04/24 18:43:16 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
* 18/04/24 18:43:16 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
* 18/04/24 18:43:16 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 332 ms on localhost (executor driver) (2/3)
* 2
* 4
*/
/**
 * Demonstrates the `aggregateByKey` transformation with a word count.
 *
 * `aggregateByKey(zeroValue)(seqOp, combOp)` aggregates the values of each
 * key:
 *  - `zeroValue: U` is the initial accumulator value,
 *  - `seqOp: (U, V) => U` folds a value into the accumulator within a partition,
 *  - `combOp: (U, U) => U` merges the accumulators of different partitions.
 *
 * Here the comma-separated words are each paired with 1, the ones are summed
 * per word, and every result is printed as "word -> count".
 *
 * The SparkContext is stopped in a `finally` block: Spark permits only one
 * active context per JVM, so leaving it running would leak it and prevent any
 * later code from creating its own context.
 */
def AggregateByKey: Unit = {
  val conf = new SparkConf().setAppName("aggregateByKey").setMaster("local")
  val sc = new SparkContext(conf)
  try {
    val lines = sc.parallelize(List("you,jump", "i,jump"))
    lines
      .flatMap(_.split(","))
      .map(word => (word, 1))
      .aggregateByKey(0)(
        (count, one) => count + one,    // seqOp: add within a partition
        (left, right) => left + right   // combOp: add across partitions
      )
      .foreach { case (word, count) => println(s"${word} -> ${count}") }
  } finally {
    sc.stop() // release the context even when the job throws
  }
}
/**
* 结果:
* you -> 1
* jump -> 2
* i -> 1
*/
/**
 * Demonstrates the `coalesce` transformation.
 *
 * `coalesce` merges partitions so that an RDD ends up with fewer of them; it
 * is typically used to cut down an excessive number of tasks. With P the
 * requested partition count and S the current partition count:
 *
 *  1. P > S:
 *     - `shuffle = false`: the call has no effect, the RDD keeps S partitions;
 *     - `shuffle = true`: a shuffle is performed and the RDD gets P
 *       partitions, which is exactly what `repartition` does.
 *  2. P < S:
 *     - when P and S are of similar size, keep `shuffle = false`; existing
 *       partitions are merged proportionally without a shuffle;
 *     - when P is much smaller than S, set `shuffle = true` to preserve the
 *       parallelism of the upstream computation.
 *
 * Here the numbers 1..6 are coalesced into a single partition and printed.
 * NOTE(review): with master "local" the source RDD presumably already has a
 * single partition, in which case `coalesce(1)` changes nothing - confirm.
 *
 * The SparkContext is stopped in a `finally` block: Spark permits only one
 * active context per JVM, so leaving it running would leak it and prevent any
 * later code from creating its own context.
 */
def Coalesce: Unit = {
  val conf = new SparkConf().setAppName("coalesce").setMaster("local")
  val sc = new SparkContext(conf)
  try {
    val numbers = sc.makeRDD(List(1, 2, 3, 4, 5, 6))
    numbers.coalesce(1).foreach(println(_))
  } finally {
    sc.stop() // release the context even when the job throws
  }
}
/** 结果:
* 1
* 2
* 3
* 4
* 5
* 6
*/
/**
 * Demonstrates the `mapPartitionsWithIndex` transformation.
 *
 * It works like `mapPartitions`, except that the function additionally
 * receives the index of the partition being processed. Here eight numbers
 * are split into two partitions, every element is tagged with its partition
 * index, and the resulting (partitionIndex, element) pairs are printed.
 *
 * The SparkContext is stopped in a `finally` block: Spark permits only one
 * active context per JVM, so leaving it running would leak it and prevent any
 * later code from creating its own context.
 */
def MapPartitionsWithIndex: Unit = {
  val conf = new SparkConf().setAppName("mapPartitionsWithIndex").setMaster("local")
  val sc = new SparkContext(conf)
  try {
    val numbers = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8), 2)
    numbers
      .mapPartitionsWithIndex((index, elements) => elements.map(element => (index, element)))
      .foreach(println(_))
  } finally {
    sc.stop() // release the context even when the job throws
  }
}
/**
* 结果:
* (0,1)
* (0,2)
* (0,3)
* (0,4)
* 18/04/23 22:41:29 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 751 bytes result sent to driver
* (1,5)
* (1,6)
* (1,7)
* (1,8)
*/
/**
 * Demonstrates the `cogroup` transformation.
 *
 * `cogroup` groups several pair RDDs by key at once: for every key it yields
 * one collection of values per input RDD. The number of result partitions
 * can optionally be specified. Here names and scores are cogrouped by id and
 * one line is printed per id, showing the collection of names and the
 * collection of scores for that id.
 *
 * The SparkContext is stopped in a `finally` block: Spark permits only one
 * active context per JVM, so leaving it running would leak it and prevent any
 * later code from creating its own context.
 */
def Cogroup: Unit = {
  val conf = new SparkConf().setAppName("cogroup").setMaster("local")
  val sc = new SparkContext(conf)
  try {
    val namesById = sc.makeRDD(List(
      (1, "东方不败"), (2, "林平之"), (3, "岳不群"),
      (1, "东方不败"), (2, "林平之"), (3, "岳不群")
    ))
    val scoresById = sc.makeRDD(List(
      (1, 90), (2, 91), (3, 89),
      (1, 98), (2, 78), (3, 67)
    ))
    namesById.cogroup(scoresById).foreach { case (id, (names, scores)) =>
      println(s"ID:${id} Name: ${names} Scores: ${scores}")
    }
  } finally {
    sc.stop() // release the context even when the job throws
  }
}
/**
* 结果:
* ID:1 Name: CompactBuffer(东方不败, 东方不败) Scores: CompactBuffer(90, 98)
* ID:3 Name: CompactBuffer(岳不群, 岳不群) Scores: CompactBuffer(89, 67)
* ID:2 Name: CompactBuffer(林平之, 林平之) Scores: CompactBuffer(91, 78)
*/
/**
 * Demonstrates `repartitionAndSortWithinPartitions` (a tuning-oriented
 * operator).
 *
 * The operator repartitions a pair RDD with the given Partitioner and sorts
 * the records by key inside each resulting partition. A Partitioner must be
 * supplied: either a custom one implementing `numPartitions` and
 * `getPartition` (as done here), or a built-in one such as
 * `new HashPartitioner(2)` or a `RangePartitioner` constructed from a
 * partition count and the RDD whose key distribution defines the ranges.
 *
 * Here the numbers 1..8 are keyed by themselves, even keys are sent to
 * partition 0 and odd keys to partition 1, and every key is printed together
 * with the index of the partition it ended up in.
 *
 * The SparkContext is stopped in a `finally` block: Spark permits only one
 * active context per JVM, so leaving it running would leak it and prevent any
 * later code from creating its own context.
 */
def RepartitionAndSortWithinPartitions: Unit = {
  val conf = new SparkConf().setAppName("repartitionAndSortWithinPartitions").setMaster("local")
  val sc = new SparkContext(conf)
  try {
    // Custom partitioner: partition 0 holds even keys, partition 1 odd keys.
    val parityPartitioner: Partitioner = new Partitioner {
      override def numPartitions: Int = 2

      // The key arrives as Any, so it is parsed from its string form;
      // the if/else expression is the result (no `return` needed).
      override def getPartition(key: Any): Int =
        if (key.toString.toInt % 2 == 0) 0 else 1
    }

    sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8))
      .map(number => (number, number))
      .repartitionAndSortWithinPartitions(parityPartitioner)
      .mapPartitionsWithIndex((index, pairs) => pairs.map { case (key, _) => (index, key) })
      .foreach(println(_))
  } finally {
    sc.stop() // release the context even when the job throws
  }
}
/**
* 结果:
* (0,2)
* (0,4)
* (0,6)
* (0,8)
* 18/04/24 08:33:00 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1138 bytes result sent to driver
* 18/04/24 08:33:00 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 2, localhost, executor driver, partition 1, ANY, 7649 bytes)
* 18/04/24 08:33:00 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 107 ms on localhost (executor driver) (1/2)
* 18/04/24 08:33:00 INFO Executor: Running task 1.0 in stage 1.0 (TID 2)
* 18/04/24 08:33:00 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
* 18/04/24 08:33:00 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
* (1,1)
* (1,3)
* (1,5)
* (1,7)
*/
/**
 * Demonstrates the `sample` transformation.
 *
 * `sample` draws a random subset of an RDD and takes three parameters:
 *  - `withReplacement: Boolean` - whether an element may be drawn more than once,
 *  - `fraction: Double` - the expected share of the data to draw,
 *  - `seed: Long` - seed of the random generator; may be omitted.
 *
 * Here roughly half of the elements are drawn without replacement and
 * printed. No seed is passed, so the output differs from run to run.
 *
 * The SparkContext is stopped in a `finally` block: Spark permits only one
 * active context per JVM, so leaving it running would leak it and prevent any
 * later code from creating its own context.
 */
def Sample: Unit = {
  val conf = new SparkConf().setAppName("sample").setMaster("local")
  val sc = new SparkContext(conf)
  try {
    val numbers = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 9, 10))
    numbers
      .sample(withReplacement = false, fraction = 0.5)
      .foreach(println(_))
  } finally {
    sc.stop() // release the context even when the job throws
  }
}
/**
* 结果(随机):
* 1
* 2
* 3
* 4
* 6
* 7
*/
/**
 * Program entry point: runs a single transformation demo.
 *
 * To try a different operator, replace the call in the body with one of the
 * other demos of this object:
 * Map, FlatMap, Filter, GroupBykey, ReduceBykey, SortBykey, Join, Union,
 * Intersection, Distinct, Cartesian, MapPartitions, Repartition,
 * AggregateByKey, Coalesce, MapPartitionsWithIndex, Cogroup,
 * RepartitionAndSortWithinPartitions, Sample.
 *
 * @param args command-line arguments; not used
 */
def main(args: Array[String]): Unit = {
  // Currently selected demo.
  Repartition
}
}import org.apache.spark.rdd.RDD
imp