阅读背景:

Flink 的 reduce 和 reduceGroup 性能测试（功夫老五的博客）

来源:互联网 

官方说 reduceGroup 的性能要高于 reduce，下面是测试代码。（注：Flink 官方 DataSet 文档指出，分组后的 reduce 默认可使用 combiner，而 reduceGroup 默认不可 combine，因此这一说法值得商榷。）

package com.taobao.test

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

import scala.reflect.internal.util.Collections
object reduceTask {

  /**
   * Runs `job` and returns how long it took, in milliseconds.
   *
   * Uses `System.nanoTime()`, which (unlike `System.currentTimeMillis()`) is a
   * monotonic clock meant for measuring elapsed time.
   */
  private def elapsedMillis(job: => Unit): Long = {
    val start = System.nanoTime()
    job
    (System.nanoTime() - start) / 1000000L
  }

  /**
   * Compares the elapsed time of a grouped `reduce` with an equivalent `reduceGroup`.
   * Both sum the Int values per String key of a tiny in-memory data set.
   *
   * Caveats:
   *  - each `print()` triggers a separate Flink job, so a timing includes job
   *    execution and result printing, not only the reduce logic;
   *  - with three input records the numbers are dominated by fixed overhead and
   *    say little about throughput on real data.
   */
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val elements: DataSet[List[(String, Int)]] =
      env.fromElements(List(("scala", 2), ("java", 3), ("java", 1)))

    // Unpack the single List element into individual (String, Int) tuples.
    val tuples: DataSet[(String, Int)] = elements.flatMap(x => x)
    val grouped: GroupedDataSet[(String, Int)] = tuples.groupBy(x => x._1)

    val reduced: DataSet[(String, Int)] = grouped.reduce((x, y) => (x._1, x._2 + y._2))

    // Each group handed to the group function comes from existing records, so the
    // Iterator-level reduce below does not see an empty iterator.
    val groupReduced: DataSet[(String, Int)] = grouped.reduceGroup {
      (in: Iterator[(String, Int)], out: Collector[(String, Int)]) =>
        out.collect(in.reduce((x, y) => (x._1, x._2 + y._2)))
    }

    // Warm-up: execute both jobs once, untimed. Otherwise whichever job is measured
    // first absorbs one-off costs (environment startup, class loading, JIT).
    reduced.collect()
    groupReduced.collect()

    val reduceMillis = elapsedMillis(reduced.print())
    println(s"reduce 共花费::$reduceMillis")

    // Note: per the Flink DataSet docs, a grouped Reduce is implicitly combinable,
    // whereas a GroupReduce is not combinable unless a combine function is supplied.
    // So reduceGroup is not expected to be faster than reduce in general.
    val reduceGroupMillis = elapsedMillis(groupReduced.print())
    println(s"reduceGroup 共花费::$reduceGroupMillis")
  }
}

pa



你的当前访问异常,请进行认证后继续阅读剩余内容。

分享到: