Spark SQL Notes

Creating a SparkSession object

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().getOrCreate()

If you start spark-shell, the spark object is created automatically (along with sc, the SparkContext).
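In a standalone application (outside spark-shell) the session must be built explicitly. A minimal sketch, where the app name "SparkSQLDemo" and the local master are placeholder choices:

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().
    appName("SparkSQLDemo"). // hypothetical application name
    master("local[*]").      // run locally on all cores; usually set by spark-submit on a cluster
    getOrCreate()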

Implicit conversions

import spark.implicits._

This import brings in the implicit conversions used to build DataFrames (from RDDs, Hive tables, and other sources), e.g. the toDF() method.
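For example, once the implicits are in scope, a local Seq of tuples can be converted directly (a small sketch with made-up data):

val df = Seq(("Alice", 25), ("Bob", 30)).toDF("name", "age") // toDF comes from spark.implicits._
df.show()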

Reading data

spark.read.format("json").load("file:///xxxx/xxxx.json")     // read JSON
spark.read.format("parquet").load("file:///xxx/xxx.parquet") // read Parquet
spark.read.format("csv").load("file:///xxx/xxx.csv")         // read CSV
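The reader also accepts per-format options; for CSV files a header row and schema inference are commonly enabled. A sketch (the path is a placeholder):

spark.read.format("csv").
    option("header", "true").      // treat the first line as column names
    option("inferSchema", "true"). // infer column types instead of reading everything as strings
    load("file:///xxx/xxx.csv")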

Writing data

df.write.format("json").save("people.json")       // write JSON
df.write.format("parquet").save("people.parquet") // write Parquet
df.write.format("csv").save("people.csv")         // write CSV

Common operations

df.printSchema()
df.select(df("name"), df("age") + 1).show()
df.filter(df("age") > 20).show()
df.groupBy("age").count().show()
df.sort(df("age").desc).show()
df.sort(df("age).desc, df("name").asc).show()
df.select(df("name").as("username"), df("age")).show()

From RDD to DataFrame

Using reflection

case class Person(name: String, age: Int)
val personDF = spark.sparkContext.
    textFile("file:///usr/local/spark/..../people.txt").
    map(_.split(",")).
    map(attributes => Person(attributes(0), attributes(1).trim.toInt)).
    toDF()
// Register a temporary view
personDF.createOrReplaceTempView("person")
spark.sql("select * from person").show()

Programmatic approach

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
// Build the schema (the table "header")
val fields = Array(StructField("name", StringType, true),
                   StructField("age", IntegerType, true))
val schema = StructType(fields)
// Build the rows
val peopleRDD = spark.sparkContext.
    textFile("file:///usr/local/spark/.../people.txt")
val rowRDD = peopleRDD.map(_.split(",")).
    map(attributes => Row(attributes(0), attributes(1).trim.toInt))
// Combine rows and schema into a DataFrame
val peopleDF = spark.createDataFrame(rowRDD, schema)
peopleDF.createOrReplaceTempView("people")
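With the view registered, SQL works exactly as in the reflection example (the age filter is just for illustration):

spark.sql("select name, age from people where age > 20").show()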

Connecting to an external database

First copy the MySQL JDBC driver JAR into ./spark/jars, then launch the shell with it on the classpath:

spark-shell --jars /usr/local/.../mysql-connector-java-5.1.40-bin.jar \
--driver-class-path /usr/local/mysql-connector-java/mysql-connector-java-5.1.40-bin.jar

Then, in the interactive shell:

val jdbcDF = spark.read.format("jdbc").
    option("url", "jdbc:mysql://localhost:3306/spark").
    option("driver", "com.mysql.jdbc.Driver").
    option("dbtable", "student").
    option("user", "root").
    option("password", "hadoop").
    load() // load() returns the DataFrame
jdbcDF.show()
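Ordinary DataFrame operations now work against the table; simple filters like the one below are typically pushed down to MySQL as a WHERE clause (the name and age columns match the student table used in the next section):

jdbcDF.filter(jdbcDF("age") > 20).select("name", "age").show()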

Inserting records

import java.util.Properties
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row // each record is wrapped in a Row

val studentRDD = spark.sparkContext.parallelize(
    Array("3 Rongcheng M 26", "4 Guanhua M 27")).map(_.split(" "))
// Build the schema
val schema = StructType(List(StructField("id", IntegerType, true), // an Array works here too
                             StructField("name", StringType, true),
                             StructField("gender", StringType, true),
                             StructField("age", IntegerType, true)))
val rowRDD = studentRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt))
val studentDF = spark.createDataFrame(rowRDD, schema)

// Write the data out via JDBC
val prop = new Properties()
prop.put("user", "root")
prop.put("password", "hadoop")
prop.put("driver", "com.mysql.jdbc.Driver")

studentDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/spark",
                                    "spark.student", prop)
