阅读背景:

Spark系列——从零学习SparkSQL编程(下)

来源:互联网 

5. 导入Java依附要应用SparkSQL的API,首先要导入Scala,Spark,SparkSQL的依附:<properties><scala.version>2.11.8</scala.version><hadoop.version>2.7.4</hadoop.version><spark.version>2.0.2</spark.version></properties><dependencies><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.32</version></dependency><!-- spark sql 依附--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>2.0.2</version></dependency></dependencies>6. Java代码操作DataFrame1.DataFrame作为SparkSQL的核心API,它是通过SparkContext来获得,代码以下:  //1.创立spark session,并指定appName,须要将义务提交到哪里。val spark = newSparkSession.Builder().appName("CaseClassSchema").master("local[2]").getOrCreate()//2.获得SparkContext,后面所有的SparkSQL操作都须要该高低文。val sc: SparkContext = spark.sparkContext上文中,master指定的是sparkSQL的履行环境,可以是集群也能够是本地,这里的local[2]指定的是本地单机运行模式,应用2条线程来履行义务,注意这里local必需是小写的。SparkSession是SparkContext的升级版,他支撑HiveContext和SparkContext。2.通过SparkContext后我们可以获得到数据对应的DataFrame,代码以下://3.获得每行内容的RDD,并通过schema将RDD转化成DFval lineRdd: RDD[Array[String]] =sc.textFile("hdfs://node01:8020/spark_res/people.txt").map(_.split(", "))val peopleRdd: RDD[People] = lineRdd.map(x => People(x(0), x(1).toInt))import spark.implicits._val peopleDF: DataFrame = peopleRdd.toDF//4.对DF进行操作peopleDF.printSchema()peopleDF.show()println(peopleDF.head())println(peopleDF.count())peopleDF.columns.foreach(println)应用DataFrame之前,必需导包,否则没法应用  toDF 办法。3.DataFrame操作SQL有两种方法,DSL和SQL,代码以下://DSLpeopleDF.select("name","age").show()peopleDF.filter($"age">20).groupBy("name").count().show//SQLpeopleDF.createOrReplaceTempView("t_people")spark.sql("select * from t_people order by age desc").show4.SQL操作终了后必需关闭sparkContext和SparkSession,代码分离是 sc.stop() 和  spark.stop()除读取普通文件,还可以读取mysql orcale数据,代码以下:val properties = new Properties()properties.setProperty("user","root")properties.setProperty("password","123456")//重点代码,衔接JDBCval ipLocationDF: DataFrame =spark.read.jdbc("jdbc:mysql://localhost:3306/iplocation","iplocation",properties)ipLocationDF.printSchemaipLocationDF.show7. 保留DataFrame的成果除读取数据,DataFrame还供给了一整套保留处置数据成果的机制,代码以下:object SparkSqlToMysql {def main(args: Array[String]): Unit = {val sc: SparkContext = ...//1.通过spark session读取json数据,并返回DataFrameval peopleDF: DataFrame = spark.read.json(args(0))//2.将DataFrame注册为t_people表,并对该表进行SQL语句操作peopleDF.createOrReplaceTempView("t_people")val resultDF = spark.sql("select * from t_people")//3.对上个SQL语句的操作成果进行保留val properties = new Properties()properties.setProperty("user","root")properties.setProperty("password","123456")resultDF.write.jdbc("jdbc:mysql://192.168.52.105:3306/iplocation","spark_save_result",properties)//close sparkcontext sparksession..}}须要注意的是resultDF.write,其返回DataFrameWriter。1. 该类可以保留任何SQL的成果,并且由于API的方便性,可以保留成多种格局,如 text ,json ,orc,csv,jdbc等。2. 对保留的数据,体系供给了几种保留模式,可以通过mode(String)来指定:overwrite : 重写文件内部数据append : 将新增内容添加到文件末尾ignore : 如果文件已存在 则疏忽操作error : default option, 如果文件存在,则抛出异常8. 总结1. 在SparkSQL系列中,我们首先介绍了SparkSQL的核心API DataFrame,DataFrame内部份为RDD基本散布式数据集和Schema元信息。DataFrame的SQL代码在履行之前会经过Catalyst优化,变成高效的处置代码。接着我们介绍了通过 spark-shell 和 java api 两种客户端窗口操作DataFrame。2. 创立DataFrame有两种方法:1. 通过 rdd.toDF 直接将rdd转换成DataFrame。2. 通过  spark.read 直接读取各种格局的数据。3. 查看DataFrame的内容有两种:1. 通过  df.printSchema 查看数据构造。2. 通过  df.show 查看数据内容。4. df供给了DSL和SQL两种作风的来操作数据。对DSL作风,常见的办法有  select() filter() 等。5. 本篇文章在后半部份重要介绍SparkSQL如何与mysql进行交互,除此以外,还支撑Parquet,ORC,JSON,Hive,JDBC , avro协定文件等交互。5. 导入Java依附要应用SparkSQL的API,首先要导入Scala,Spark,Sp




你的当前访问异常,请进行认证后继续阅读剩余内容。

分享到: