// 第一种 (Version 1):
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import scala.reflect.io.Path
/**
 * Demonstrates two ways of joining DataFrames in Spark SQL:
 *   1. registering temporary views and joining with a SQL statement;
 *   2. using the DataFrame `join` API (inner and left joins).
 *
 * The input data is built in memory from literal strings, and every intermediate and
 * joined result is printed with `show()`.
 */
object joinTest {

  /**
   * Entry point. Builds a people DataFrame (`id`, `name`, `country`) and a nation-name
   * DataFrame (`ename`, `cname`), joins them on `country = ename`, and prints the results.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("joinTest")
      .master("local[*]")
      .getOrCreate()

    // Always release the SparkContext, even if one of the jobs below fails.
    try {
      import spark.implicits._

      // Raw "id,name,country" records.
      val lines: Dataset[String] =
        spark.createDataset(List("1,jige,china", "2,AJ,Japan", "3,杨玉环,Tang"))

      // Parse each record into a typed (id, name, country) tuple.
      val persons: Dataset[(Int, String, String)] = lines.map { line =>
        val fields: Array[String] = line.split(",")
        (fields(0).toInt, fields(1), fields(2))
      }
      val df1: DataFrame = persons.toDF("id", "name", "country")
      println("----df1----")
      df1.show()

      // Raw "englishName,chineseName" records.
      val lines2: Dataset[String] = spark.createDataset(Array("china,中国", "Japan,日本"))
      val nations: Dataset[(String, String)] = lines2.map { line =>
        val fields: Array[String] = line.split(",")
        (fields(0), fields(1))
      }
      val df2: DataFrame = nations.toDF("ename", "cname")
      println("----df2----")
      df2.show()

      /*
       * Approach 1: register temporary views and join with SQL.
       * createOrReplaceTempView (unlike createTempView) does not throw when a view with the
       * same name is already registered, e.g. when getOrCreate() returned a reused session.
       */
      df1.createOrReplaceTempView("t1")
      df2.createOrReplaceTempView("t2")
      // Shape of the query: selected columns + first table JOIN second table + ON condition.
      val r: DataFrame = spark.sql("SELECT id,name,cname,ename FROM t1 JOIN t2 ON country = ename")
      println("----关联结果----")
      r.show()

      /*
       * Approach 2: the DataFrame join API.
       * Supported join types include: inner, cross, outer, full, full_outer, left, left_outer,
       * right, right_outer, left_semi, left_anti.
       * Omitting the join type defaults to "inner", so this is equivalent to
       * df1.join(df2, $"country" === $"ename").
       */
      val rj: DataFrame = df1.join(df2, $"country" === $"ename", "inner")
      println("----innerJoin----")
      rj.show()

      // "left" and "left_outer" are aliases and produce the same result.
      val left: DataFrame = df1.join(df2, $"country" === $"ename", "left")
      println("----leftJoin----")
      left.show()
    } finally {
      spark.stop()
    }
  }
}
import org.apache.spark.s