阅读背景:

Spark日志数据统计、TopN

来源:互联网 

Spark日志数据统计、TopN PageView UserView TopN

// PageView
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/** Page views (PV): prints the total number of lines in the log file. */
object Pv_count {

  /**
   * Counts every line of the input file and prints the total.
   *
   * @param args optional; when present, `args(0)` is used as the input location
   *             instead of the default Windows path
   */
  def main(args: Array[String]): Unit = {
    // Forward slashes on purpose: inside an ordinary string literal a backslash
    // starts an escape sequence, so "C:\Users\..." does not compile.
    val inputPath: String = args.headOption.getOrElse(
      "file:///C:/Users/Administrator/Documents/tt/hello/helloworld2.txt")

    val sparkContext: SparkContext =
      new SparkContext(new SparkConf().setMaster("local[2]").setAppName("Pv_count"))

    try {
      val lines: RDD[String] = sparkContext.textFile(inputPath)
      // Total number of records (one record per line).
      val pv: Long = lines.count()
      println(pv)
    } finally {
      // Stop the context even when the job fails.
      sparkContext.stop()
    }
  }
}

// UserView

/** Unique visitors (UV): prints the number of distinct values in the first field of each line. */
object Uv_count {

  /**
   * Extracts the first space-separated field of every line, de-duplicates it and
   * prints how many distinct values there are.
   *
   * @param args optional; when present, `args(0)` is used as the input location
   *             instead of the default Windows path
   */
  def main(args: Array[String]): Unit = {
    // Forward slashes on purpose: backslashes would be parsed as escape sequences.
    val inputPath: String = args.headOption.getOrElse(
      "file:///C:/Users/Administrator/Documents/tt/hello/helloworld2.txt")

    val sparkContext: SparkContext =
      new SparkContext(new SparkConf().setMaster("local[2]").setAppName("Uv_count"))

    try {
      val lines: RDD[String] = sparkContext.textFile(inputPath)
      // The field at index 0 is treated as the client IP (per the original notes).
      val allIps: RDD[String] = lines.map(_.split(" ")(0))
      // De-duplicate into a single partition to keep resource usage low.
      val distinctIps: RDD[String] = allIps.distinct(1)
      val uv: Long = distinctIps.count()
      println(uv)
    } finally {
      // Stop the context even when the job fails.
      sparkContext.stop()
    }
  }
}

// TopN

/** TopN: prints the two most frequent first-field values together with their counts. */
object top {

  /**
   * Splits each line on spaces, drops lines with 6 or fewer fields, counts the
   * occurrences of the first field and prints the two highest counts.
   *
   * @param args optional; when present, `args(0)` is used as the input location
   *             instead of the default Windows path
   */
  def main(args: Array[String]): Unit = {
    // Forward slashes on purpose: backslashes would be parsed as escape sequences.
    val inputPath: String = args.headOption.getOrElse(
      "file:///C:/Users/Administrator/Documents/tt/hello/helloworld2.txt")

    val sparkContext: SparkContext =
      new SparkContext(new SparkConf().setMaster("local[2]").setAppName("topN"))
    sparkContext.setLogLevel("WARN")

    try {
      val lines: RDD[String] = sparkContext.textFile(inputPath)
      // Split every line into its space-separated fields.
      val fields: RDD[Array[String]] = lines.map(_.split(" "))
      // Discard records that do not have more than 6 fields.
      val wellFormed: RDD[Array[String]] = fields.filter(_.length > 6)
      // Pair the field at index 0 with an initial count of 1.
      val keyedOnes: RDD[(String, Int)] = wellFormed.map(parts => (parts(0), 1))
      // Sum the counts per key.
      val countsByKey: RDD[(String, Int)] = keyedOnes.reduceByKey(_ + _)
      // Order by count, largest first.
      val sorted: RDD[(String, Int)] = countsByKey.sortBy(_._2, ascending = false)
      // Keep the top two entries.
      val result: Array[(String, Int)] = sorted.take(2)
      // An Array prints as an object reference; toBuffer gives a readable rendering.
      println(result.toBuffer)
    } finally {
      // Stop the context even when the job fails.
      sparkContext.stop()
    }
  }
}

// Tag: PageView



你的当前访问异常,请进行认证后继续阅读剩余内容。

分享到: