据官方说法，reduceGroup 的性能要高于 reduce，下面是测试代码：
package com.taobao.test
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector
import scala.reflect.internal.util.Collections
/**
 * Rough timing comparison of `reduce` versus `reduceGroup` on a grouped Flink DataSet.
 *
 * Both variants sum the Int count per String key over the same three-element input
 * and print the result, followed by the elapsed time in milliseconds.
 *
 * NOTE(review): the measured span covers building the operator, submitting the job and
 * printing the result, and the first measurement additionally absorbs any one-time
 * start-up cost. With an input this small the numbers are only indicative and should
 * not be read as a reliable benchmark of the two operators.
 */
object reduceTask {

  /**
   * Evaluates `job` and prints how long it took.
   *
   * The interval is taken with `System.nanoTime()`, which is monotonic, rather than the
   * wall clock, so a system clock adjustment during the run cannot distort the result.
   *
   * @param label prefix identifying the measured variant in the printed line
   * @param job   the work to time; evaluated exactly once
   */
  private def timed(label: String)(job: => Unit): Unit = {
    val startNanos = System.nanoTime()
    job
    val elapsedMillis = (System.nanoTime() - startNanos) / 1000000L
    println(s"${label} 共花费::${elapsedMillis}")
  }

  /**
   * Builds the sample data set, then runs and times the `reduce` and `reduceGroup` variants.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val elements: DataSet[List[(String, Int)]] =
      env.fromElements(List(("scala", 2), ("java", 3), ("java", 1)))
    // Unpack the inner list so that every (word, count) tuple becomes its own record.
    val tupMap: DataSet[(String, Int)] = elements.flatMap(x => x)
    val groupMap: GroupedDataSet[(String, Int)] = tupMap.groupBy(x => x._1)

    // Variant 1: pairwise reduce within each key group.
    timed("reduce") {
      val summed = groupMap.reduce((x, y) => (x._1, x._2 + y._2))
      summed.print()
    }

    // Variant 2: whole-group reduce; the original author expects this one to be faster.
    timed("reduceGroup") {
      val summed = groupMap.reduceGroup {
        (in: Iterator[(String, Int)], out: Collector[(String, Int)]) =>
          out.collect(in.reduce((x, y) => (x._1, x._2 + y._2)))
      }
      summed.print()
    }
  }
}
pa