Creating a SparkSession object
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().getOrCreate()
If you launch spark-shell, the spark object is created automatically (along with sc).
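Outside the shell (e.g. in a standalone program) the session is usually built explicitly. A minimal sketch; the appName and master values here are placeholders, not part of the original notes:
// hypothetical standalone setup; "SparkSQL demo" and "local[*]" are example values
val spark = SparkSession.builder()
  .appName("SparkSQL demo")
  .master("local[*]")
  .getOrCreate()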
Importing the implicit conversions
import spark.implicits._
The implicits are needed to build DataFrames (from RDDs, Hive tables, and other sources).
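For example, with the implicits in scope a local Seq can be converted to a DataFrame directly. A small sketch; the data and column names are made up:
// toDF on a local collection only works because spark.implicits._ is imported
val demoDF = Seq(("Alice", 25), ("Bob", 30)).toDF("name", "age")
demoDF.show()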
Reading data
spark.read.format("json").load("file:///xxxx/xxxx.json")       // read JSON
spark.read.format("parquet").load("file:///xxx/xxx.parquet")   // read Parquet
spark.read.format("csv").load("file:///xxx/xxx.csv")           // read CSV
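There are also shorthand reader methods, and options (such as a CSV header) can be set before loading. A sketch; the paths are placeholders:
spark.read.json("file:///xxx/xxx.json")                         // shorthand for format("json").load(...)
spark.read.option("header", "true").csv("file:///xxx/xxx.csv")  // treat the first CSV line as column names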
Writing data
df.write.format("json").save("people.json")        // write JSON
df.write.format("parquet").save("people.parquet")  // write Parquet
df.write.format("csv").save("people.csv")          // write CSV
Common operations
df.printSchema()
df.select(df("name"), df("age") + 1).show()
df.filter(df("age") > 20).show()
df.groupBy("age").count().show()
df.sort(df("age").desc).show()
df.sort(df("age).desc, df("name").asc).show()
df.select(df("name").as("username"), df("age")).show()
From RDD to DataFrame
Reflection-based approach
case class Person(name:String, age:Int)
val personDF = spark.sparkContext.textFile("file:///usr/local/spark/..../people.txt").
  map(_.split(",")).map(attributes => Person(attributes(0), attributes(1).trim.toInt)).toDF()
// register a temporary view
personDF.createOrReplaceTempView("person")
spark.sql("select * from person").show
Programmatic approach
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
// build the schema (the "table header")
val fields = Array(StructField("name", StringType, true),
  StructField("age", IntegerType, true))
val schema = StructType(fields)
// build the rows
val peopleRDD = spark.sparkContext.
  textFile("file:///usr/local/spark/.../people.txt")
val rowRDD = peopleRDD.map(_.split(",")).
  map(attributes => Row(attributes(0), attributes(1).trim.toInt))
// combine the rows with the schema
val peopleDF = spark.createDataFrame(rowRDD, schema)
peopleDF.createOrReplaceTempView("people")
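A query against the programmatically built view then works the same way as in the reflection example. A short sketch:
// query the "people" view registered above
val results = spark.sql("select name, age from people")
results.show()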
Connecting to an external database
Remember to copy the MySQL JDBC driver jar into ./spark/jars first, then start the shell:
spark-shell --jars /usr/local/.../mysql-connector-java-5.1.40-bin.jar \
  --driver-class-path /usr/local/mysql-connector-java/mysql-connector-java-5.1.40-bin.jar
Then, in the interactive shell:
val jdbcDF = spark.read.format("jdbc").
  option("url", "jdbc:mysql://localhost:3306/spark").
  option("driver", "com.mysql.jdbc.Driver").
  option("dbtable", "student").
  option("user", "root").
  option("password", "hadoop").
  load()   // load() builds the DataFrame from the JDBC source
jdbcDF.show()
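The JDBC-backed DataFrame can be queried like any other, for example through a temporary view. A sketch, assuming the student table has name and age columns as in the insert example below:
// hypothetical query over the JDBC-backed DataFrame
jdbcDF.createOrReplaceTempView("student")
spark.sql("select name, age from student where age > 20").show()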
Inserting records
import java.util.Properties
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row // Row wraps each record
val studentRDD = spark.sparkContext.parallelize(
  Array("3 Rongcheng M 26", "4 Guanhua M 27")).map(_.split(" "))
// build the schema (an Array works here as well as a List)
val schema = StructType(List(StructField("id", IntegerType, true),
  StructField("name", StringType, true),
  StructField("gender", StringType, true),
  StructField("age", IntegerType, true)))
val rowRDD = studentRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt))
val studentDF = spark.createDataFrame(rowRDD, schema)
// write the data
val prop = new Properties()
prop.put("user", "root")
prop.put("password","hadoop")
prop.put("driver","com.mysql.jdbc.Driver")
studentDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/spark",
  "spark.student", prop)