Overview
Spark 1.6 introduced a new Datasets API, an extension of DataFrames that supports functional-style processing over a collection of typed objects. Let's take a look at some examples of how to use Datasets. First we'll read a JSON file and a text file into Datasets and apply functional transformations to parse the data. Then we'll run relational queries against a Dataset.
Creating a Dataset from a JSON file
Suppose you have JSON formatted data which you would like to read into a Dataset. Here is an example JSON file:
Contents of "students.json" --
{"name":"Alice", "dept":"Math"}
{"name":"Bob", "dept":"CS"}
{"name":"Carl", "dept":"Math"}
To create a Dataset from this JSON file:
// Define the Student row type.
> case class Student(name: String, dept: String)
// Read JSON objects into a Dataset[Student].
> val studentsFromJSON = sqlContext.read.json("students.json").as[Student]
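To confirm the JSON rows were parsed into Student objects, one quick check (a sketch; the ordering of the collected array may vary) is to bring the Dataset back to the driver:
// Collect the Dataset into an Array[Student].
> studentsFromJSON.collect()
(result) Array(Student(Alice,Math), Student(Bob,CS), Student(Carl,Math))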
Creating a Dataset from a text file
Suppose instead you have data in a text file, in tab-separated (.tsv) format:
Alice<tab>Math<tab>18
Bob<tab>CS<tab>19
Carl<tab>Math<tab>21
To create a Dataset from this text file:
// Read the lines of the file into a Dataset[String].
> val studentsFromText = sqlContext.read.text("students.tsv").as[String]
(result) studentsFromText: org.apache.spark.sql.Dataset[String] = [value: string]
// We want a Dataset of type "Student".
> case class Student(name: String, dept: String, age: Int)
// Functional programming to parse the lines into a Dataset[Student].
> val students = studentsFromText.
    map(line => {
      val cols = line.split("\t")    // parse each line into columns
      Student(cols(0), cols(1), cols(2).toInt)
    })
(result) students: org.apache.spark.sql.Dataset[Student] = [name: string, dept: string, age: int]
// Show the contents of the Dataset.
> students.show()
+-----+----+---+
| name|dept|age|
+-----+----+---+
|Alice|Math| 18|
|  Bob|  CS| 19|
| Carl|Math| 21|
+-----+----+---+
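The parsing above assumes every line is well formed. If the input might contain blank or malformed lines, one possible refinement (a sketch, splitting each line twice for simplicity) is to filter those lines out before building Student objects:
// Keep only lines that have exactly three tab-separated columns.
> val studentsSafe = studentsFromText.
    filter(_.split("\t").length == 3).
    map(line => {
      val cols = line.split("\t")
      Student(cols(0), cols(1), cols(2).toInt)
    })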
Relational queries
Datasets support relational queries, with operations such as select, filter, group by, count, avg, and join.
SELECT, FILTER
Get the names of students in the Math department.
// Select two columns and filter on one column.
// Each argument of "select" must be a "TypedColumn".
> students.select($"name".as[String], $"dept".as[String]).
    filter(_._2 == "Math").    // Filter on _2, the second selected column
    collect()
(result) Array((Alice,Math), (Carl,Math))
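Because the Dataset is strongly typed, the same query can also be expressed with the functional operators alone, working directly on Student objects (an equivalent formulation, not part of the original example):
// Filter on the typed Student object, then map to the name field.
> students.filter(_.dept == "Math").map(_.name).collect()
(result) Array(Alice, Carl)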
GROUP BY, COUNT
Count the number of students in each department.
// Group by department and count each group.
> students.groupBy(_.dept).count().collect()
(result) Array((CS,1), (Math,2))
GROUP BY, AVG
Average age in each department.
// Import the "avg" function.
> import org.apache.spark.sql.functions._
// Group and aggregate in each group.
> students.groupBy(_.dept).
    agg(avg($"age").as[Double]).
    collect()
(result) Array((CS,19.0), (Math,19.5))
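Since agg accepts several typed columns at once, the per-department count and average can also be computed in a single pass (a sketch combining the two previous queries; count here is the aggregate function from org.apache.spark.sql.functions, and group ordering may vary):
// Count and average each group in one aggregation.
> students.groupBy(_.dept).
    agg(count($"name").as[Long], avg($"age").as[Double]).
    collect()
(result) Array((CS,1,19.0), (Math,2,19.5))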
JOIN
Suppose we have a separate Dataset with department information. We would like to join the department information into our student Dataset.
First, create the department Dataset.
> case class Department(abbrevName: String, fullName: String)
// Initialize a Seq and convert to a Dataset.
> val depts = Seq(Department("CS", "Computer Science"), Department("Math", "Mathematics")).toDS()
> depts.show()
+----------+----------------+
|abbrevName|        fullName|
+----------+----------------+
|        CS|Computer Science|
|      Math|     Mathematics|
+----------+----------------+
Join the students Dataset with the departments Dataset.
// Join two datasets with "joinWith".
> val joined = students.joinWith(depts, $"dept" === $"abbrevName")
// Note that the original objects are nested into tuples under the _1 and _2 columns.
> joined.show()
+---------------+--------------------+
|             _1|                  _2|
+---------------+--------------------+
|[Alice,Math,18]|  [Math,Mathematics]|
|    [Bob,CS,19]|[CS,Computer Scie...|
| [Carl,Math,21]|  [Math,Mathematics]|
+---------------+--------------------+
Select two columns from the joined Dataset.
// Use "map" to select from the joined Dataset. // Notice that the original Dataset types are preserved.
> joined.map(s => (s._1.name, s._2.fullName)).show()
+-----+----------------+
|   _1|              _2|
+-----+----------------+
|Alice|     Mathematics|
|  Bob|Computer Science|
| Carl|     Mathematics|
+-----+----------------+
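The joined Dataset can itself be grouped and aggregated, for example to count students per full department name (a follow-on sketch reusing the groupBy and count operations shown earlier; group ordering may vary):
// Group the joined pairs by the department's full name.
> joined.groupBy(_._2.fullName).count().collect()
(result) Array((Computer Science,1), (Mathematics,2))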
EXPLAIN
"Explain" prints the query's physical plan for debugging.
// Explain how the join is computed.
// Note that a BroadcastJoin is planned.
> joined.explain()
== Physical Plan ==
Project [struct(name#168163,dept#168164,age#168165) AS _1#168203,struct(abbrevName#168200,fullName#168201) AS _2#168204]
+- BroadcastHashJoin [dept#168164], [abbrevName#168200], BuildRight
:- ConvertToUnsafe
: +- !MapPartitions <function1>, class[value[0]: string], class[name[0]: string, dept[0]: string, age[0]: int], [name#168163,dept#168164,age#168165]
: +- ConvertToSafe
: +- Scan TextRelation[value#168157] InputPaths: /students.tsv
+- ConvertToUnsafe
+- LocalTableScan [abbrevName#168200,fullName#168201], [[0,1800000002,2000000010,5343,72657475706d6f43,65636e6569635320],[0,1800000004,200000000b,6874614d,74616d656874614d,736369]]
References
Spark API docs: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset
Spark Programming Guide: http://spark.apache.org/docs/latest/sql-programming-guide.html
Introducing Spark Datasets: https://databricks.com/blog/2016/01/04/introducing-spark-datasets.html