【Spark 2.0系列】:Spark Session API和Dataset API

Dataset:Spark新的抽象层

Spark最原始的抽象基础是RDD(分布式弹性数据集),但是从Spark 2.0 开始,Dataset将成为Spark新的抽象层。所有的Spark开发者将使用Dataset API和Dataframe(Dataset子集)API编写代码,同时RDD API也还是可以用的,不过已降为low-level的API。

Dataframe API 在Spark 1.3时被引入,Dataset是Dataframe的超集。Dataset API和Dataframe API的使用带给Spark更好的性能和灵活性。Spark Streaming也将使用Dataset代替RDD。

Spark Session:Spark 2.0入口

在Spark早期版本,spark context是Spark的入口,RDD API通过context API创建。相应地,streamingStreamingContext创建;SQLsqlContext创建;hiveHiveContext创建。而到了Spark 2.0,DataSet和Dataframe API由Spark Session创建。

SparkSession包括SQLContext,HiveContext和StreamingContext的功能。Spark session实际起计算的还是spark context。

下面直接看代码吧。

创建SparkSession

使用工厂模式创建SparkSession。下面是创建SparkSession的代码:

1
2
3
4
val sparkSession = SparkSession.builder.
master("local")
.appName("spark session example")
.getOrCreate()

上面的代码类似于创建SparkContextSQLContext。如果你需要创建hive context,你可以使用下面的代码创建SparkSession,并支持Hive。

1
2
3
4
5
val sparkSession = SparkSession.builder.
master("local")
.appName("spark session example")
.enableHiveSupport()
.getOrCreate()

enableHiveSupport开启Hive支持后就可以像HiveContext一样使用。

创建Spark Session后,可以来读取数据了。

使用Spark Session读取数据

使用Spark Session读取CSV数据:

1
2
val df = sparkSession.read.option("header","true").
csv("src/main/resources/sales.csv")

上面的代码与SQLContext类似,你可以复用原有SQLContext的代码。

WordCount

下面来个完整的WordCount例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#create SparkSession
val sparkSession = SparkSession.builder.
master("local")
.appName("example")
.getOrCreate()
#read data and convert to Dataset
import sparkSession.implicits._
val data = sparkSession.read.text("src/main/resources/data.txt").as[String]
#split and group by word
val words = data.flatMap(value => value.split("\\s+"))
val groupedWords = words.groupByKey(_.toLowerCase)
#count
val counts = groupedWords.count()
#print results
counts.show()

代码较简单,就不做解释了。

话外音

SQLContext和HiveContext在Spark 2.0中会继续使用,因为Spark是向后兼容的。但很明显,Spark官方文档建议以后使用SparkSession作为入口。


侠天,专注于大数据、机器学习和数学相关的内容,并有个人公众号:bigdata_ny分享相关技术文章。

若发现以上文章有任何不妥,请联系我。

image