Overview
Spark 1.6 introduced a new Dataset API. It is an extension of the DataFrame API that adds support for functional transformations on a collection of typed objects. Let's take a look at some examples of how to use it. First we'll read a JSON file and a text file into Datasets. We will apply functional transformations to parse the data. Then we will run relational queries against a Dataset.
Creating a Dataset from a JSON file
Suppose you have JSON formatted data which you would like to read into a Dataset. Here is an example JSON file:
Contents of "students.json" --
{"name":"Alice", "dept":"Math"}
{"name":"Bob", "dept":"CS"}
{"name":"Carl", "dept":"Math"}
To create a Dataset from this JSON file:
// Define the Student row type.
> case class Student(name: String, dept: String)
// Read JSON objects into a Dataset[Student].
> val studentsFromJSON = sqlContext.read.json("students.json").as[Student]
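Note: these examples assume a Spark 1.6 spark-shell, where sc, sqlContext and the implicit encoders are already in scope. In a standalone application the setup would look roughly like the sketch below (the app name and master URL are placeholders, not from the original post).
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Minimal setup sketch for a standalone Spark 1.6 application.
val conf = new SparkConf().setAppName("dataset-examples").setMaster("local[*]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// Brings in the encoders needed by .as[Student] and, later, .toDS().
import sqlContext.implicits._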
Creating a Dataset from a Text file
Suppose instead you have data in a text file, in tab-separated (.tsv) format:
Alice<tab>Math<tab>18
Bob<tab>CS<tab>19
Carl<tab>Math<tab>21
To create a Dataset from this text file:
// Read the lines of the file into a Dataset[String].
> val studentsFromText = sqlContext.read.text("students.tsv").as[String]
(result) studentsFromText: org.apache.spark.sql.Dataset[String] = [value: string]
// We want a Dataset of type "Student".
> case class Student(name: String, dept: String, age: Int)
// Functional programming to parse the lines into a Dataset[Student].
> val students = studentsFromText.
    map(line => {
      val cols = line.split("\t") // parse each line
      Student(cols(0), cols(1), cols(2).toInt)
    })
(result) students: org.apache.spark.sql.Dataset[Student] = [name: string, dept: string, age: int]
// Show the contents of the Dataset.
> students.show()
+-----+----+---+
| name|dept|age|
+-----+----+---+
|Alice|Math| 18|
|  Bob|  CS| 19|
| Carl|Math| 21|
+-----+----+---+
Relational queries
Datasets support relational queries, with operations such as select, filter, groupBy, count, avg, and join.
SELECT, FILTER
Get the names of students in the Math department.
// Select two columns and filter on one column.
// Each argument of "select" must be a "TypedColumn".
> students.select($"name".as[String], $"dept".as[String]).
    filter(_._2 == "Math"). // Filter on _2, the second selected column
    collect()
(result) Array((Alice,Math), (Carl,Math))
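If the TypedColumn casting feels verbose, the same query can also be expressed purely with the functional operators, which work directly on the Student objects (an alternative sketch, not part of the original example):
// Equivalent query with no column casting: filter on the typed objects, then map to the name.
> students.filter(_.dept == "Math").map(_.name).collect()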
GROUP BY, COUNT
Count the number of students in each department.
// Group by department and count each group.
> students.groupBy(_.dept).count().collect()
(result) Array((CS,1), (Math,2))
GROUP BY, AVG
Average age in each department.
// Import the "avg" function.
> import org.apache.spark.sql.functions._
// Group and aggregate in each group.
> students.groupBy(_.dept).
    agg(avg($"age").as[Double]).
    collect()
(result) Array((CS,19.0), (Math,19.5))
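For aggregations that the built-in functions do not cover, the grouped Dataset also offers mapGroups, which hands you each key together with an iterator over that group's objects. A sketch (not from the original post) that finds the oldest student per department:
// Custom per-group aggregation: maximum age in each department.
> students.groupBy(_.dept).
    mapGroups { (dept, studentsInDept) =>
      (dept, studentsInDept.map(_.age).max)
    }.
    collect()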
JOIN
Suppose we have a separate table with department information. We would like to join the department information into our student table.
First, create the department Dataset.
> case class Department(abbrevName: String, fullName: String)
// Initialize a Seq and convert to a Dataset.
> val depts = Seq(Department("CS", "Computer Science"), Department("Math", "Mathematics")).toDS()
> depts.show()
+----------+----------------+
|abbrevName|        fullName|
+----------+----------------+
|        CS|Computer Science|
|      Math|     Mathematics|
+----------+----------------+
Join the students Dataset with the departments Dataset.
// Join two datasets with "joinWith".
> val joined = students.joinWith(depts, $"dept" === $"abbrevName")
// Note that the original objects are nested into tuples under the _1 and _2 columns.
> joined.show()
+---------------+--------------------+
|             _1|                  _2|
+---------------+--------------------+
|[Alice,Math,18]|  [Math,Mathematics]|
|    [Bob,CS,19]|[CS,Computer Scie...|
| [Carl,Math,21]|  [Math,Mathematics]|
+---------------+--------------------+
Select two columns from the joined Dataset.
// Use "map" to select from the joined Dataset. // Notice that the original Dataset types are preserved.
> joined.map(s => (s._1.name, s._2.fullName)).show()
+-----+----------------+
|   _1|              _2|
+-----+----------------+
|Alice|     Mathematics|
|  Bob|Computer Science|
| Carl|     Mathematics|
+-----+----------------+
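Instead of an anonymous tuple, the joined rows can also be mapped into a regular case class. The StudentWithDept class below is a hypothetical example, not part of the original post:
// Hypothetical flattened row type for the join result.
> case class StudentWithDept(name: String, age: Int, deptName: String)
// Pattern-match on the (Student, Department) tuples produced by joinWith.
> val flattened = joined.map { case (s, d) => StudentWithDept(s.name, s.age, d.fullName) }
> flattened.show()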
EXPLAIN
"Explain" prints the query's physical plan for debugging.
// Explain how the join is computed.
// Note that a BroadcastJoin is planned.
> joined.explain()
== Physical Plan ==
Project [struct(name#168163,dept#168164,age#168165) AS _1#168203,struct(abbrevName#168200,fullName#168201) AS _2#168204]
+- BroadcastHashJoin [dept#168164], [abbrevName#168200], BuildRight
:- ConvertToUnsafe
: +- !MapPartitions <function1>, class[value[0]: string], class[name[0]: string, dept[0]: string, age[0]: int], [name#168163,dept#168164,age#168165]
: +- ConvertToSafe
: +- Scan TextRelation[value#168157] InputPaths: /students.tsv
+- ConvertToUnsafe
+- LocalTableScan [abbrevName#168200,fullName#168201], [[0,1800000002,2000000010,5343,72657475706d6f43,65636e6569635320],[0,1800000004,200000000b,6874614d,74616d656874614d,736369]]
References
Spark API docs: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset
Spark Programming Guide: http://spark.apache.org/docs/latest/sql-programming-guide.html
Introducing Spark Datasets: https://databricks.com/blog/2016/01/04/introducing-spark-datasets.html
Comments
I am well *underwhelmed* by the Datasets API. A lot of manual casting. I prefer to work with sqlContext.sql("throw any sql you want in here").
Ha ha, that casting *is* pretty cumbersome. It seems the functional transformations, like map(), are very convenient (no casting necessary), while you almost want to switch back to DataFrame for the SQL methods, such as select().
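For what it's worth, a Dataset can be converted back to a DataFrame with toDF(), so one rough way to mix the two styles (just a sketch, assuming the students Dataset from the post) is:
// Convert the typed Dataset back to a DataFrame for untyped / SQL-style queries.
val studentsDF = students.toDF()
studentsDF.registerTempTable("students")
sqlContext.sql("SELECT name FROM students WHERE dept = 'Math'").show()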
How do you cast a Dataset in order to rename its columns? Could you provide an example?
I guess you could do a "map()" on the DS.
Deletecase class T1(colA:Int, colB:Int)
val ds : Dataset[T1] = ...
Now, you want to rename its columns to "C" and "D":
case class T2(colC:Int, colD:Int)
val ds2 : Dataset[T2] = ds.map(e => T2(e.colA, e.colB))
OK, so how do you do a simple cross join?