Thursday, March 3, 2016

Spark 1.6 Datasets API: Example Usage


Spark 1.6 introduced a new Datasets API, an extension of DataFrames that supports functional processing on a collection of typed objects. Let's take a look at some examples of how to use it. First we'll read a JSON file and a text file into Datasets, applying functional transformations to parse the data. Then we'll run relational queries against a Dataset.
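The examples below assume a Spark 1.6 shell session, where `sc` and `sqlContext` already exist. Outside of spark-shell, the setup might look like this sketch (the application name and master URL are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// spark-shell creates "sc" and "sqlContext" automatically; a standalone
// program must create them by hand.
val conf = new SparkConf().setAppName("datasets-example").setMaster("local[*]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// Brings in .toDS(), .as[T], and the $"colName" column syntax used below.
import sqlContext.implicits._
```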

Creating a Dataset from a JSON file

Suppose you have JSON formatted data which you would like to read into a Dataset. Here is an example JSON file:
Contents of "students.json" --
{"name":"Alice", "dept":"Math"}
{"name":"Bob", "dept":"CS"}
{"name":"Carl", "dept":"Math"}
To create a Dataset from this JSON file:
// Define the Student row type.
> case class Student(name: String, dept: String)
// Read JSON objects into a Dataset[Student].
> val studentsFromJSON ="students.json").as[Student]

Creating a Dataset from a text file

Suppose instead you have data in a text file, in tab-separated (.tsv) format:
Contents of "students.tsv" --
Alice	Math	18
Bob	CS	19
Carl	Math	21
To create a Dataset from this text file:
// Read the lines of the file into a Dataset[String].
> val studentsFromText ="students.tsv").as[String]
(result) studentsFromText: org.apache.spark.sql.Dataset[String] = [value: string]

// We want a Dataset of type "Student".
> case class Student(name: String, dept: String, age: Int)

// Functional programming to parse the lines into a Dataset[Student].

> val students = studentsFromText.
    map(line => {
      val cols = line.split("\t") // parse each line
      Student(cols(0), cols(1), cols(2).toInt)
    })
(result) students: org.apache.spark.sql.Dataset[Student] = [name: string, dept: string, age: int]

// Show the contents of the Dataset.
+-----+----+---+
| name|dept|age|
+-----+----+---+
|Alice|Math| 18|
|  Bob|  CS| 19|
| Carl|Math| 21|
+-----+----+---+
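Because the result is a Dataset[Student] rather than a generic DataFrame of rows, the functional operators stay typed: filter and map receive Student objects directly, with compile-time checking of field names. As a sketch (the value names are illustrative):

```scala
// Filter on a field of the Student case class.
val mathStudents = students.filter(_.dept == "Math")   // Dataset[Student]

// Map each Student to one of its fields.
val names =                 // Dataset[String]
```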

Relational queries

Datasets support relational queries, with operations such as: select, filter, group by, count, avg, join.


Get the names of students in the Math department.
// Select two columns and filter on one column.
// Each argument of "select" must be a "TypedColumn".
>$"name".as[String], $"dept".as[String]).
    filter(_._2 == "Math").  // Filter on _2, the second selected column.
    collect()
(result) Array((Alice,Math), (Carl,Math))


Count the number of students in each department.
// Group by department and count each group.
> students.groupBy(_.dept).count().collect()
(result) Array((CS,1), (Math,2))
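Besides count, a grouped Dataset in 1.6 also supports mapGroups, which hands each key and an iterator over its values to an arbitrary function. A sketch (the output shape is illustrative, not taken from a run):

```scala
// List the student names in each department.
students.groupBy(_.dept).
  mapGroups { (dept, studentsInDept) =>
    (dept, => ", "))
  }.
  collect()
```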


Average age in each department.
// Import the "avg" function.
> import org.apache.spark.sql.functions._

// Group and aggregate in each group.
> students.groupBy(_.dept).
    agg(avg($"age").as[Double]).
    collect()
(result) Array((CS,19.0), (Math,19.5))


Suppose we have a separate table with department information. We would like to join the department information into our student table.

First, create the department Dataset.
// The Department type.
> case class Department(abbrevName: String, fullName: String)

// Initialize a Seq and convert to a Dataset.
> val depts = Seq(Department("CS", "Computer Science"), Department("Math", "Mathematics")).toDS()

// Show the contents of the Dataset.
+----------+----------------+
|abbrevName|        fullName|
+----------+----------------+
|        CS|Computer Science|
|      Math|     Mathematics|
+----------+----------------+

Join the students Dataset with the departments Dataset.
// Join two datasets with "joinWith".
> val joined = students.joinWith(depts, $"dept" === $"abbrevName")

// Show the contents of the joined Dataset.
// Note that the original objects are nested into tuples under the _1 and _2 columns.
+---------------+--------------------+
|             _1|                  _2|
+---------------+--------------------+
|[Alice,Math,18]|  [Math,Mathematics]|
|    [Bob,CS,19]|[CS,Computer Scie...|
| [Carl,Math,21]|  [Math,Mathematics]|
+---------------+--------------------+

Select two columns from the joined Dataset.
// Use "map" to select from the joined Dataset. 
// Notice that the original Dataset types are preserved.
> => (, s._2.fullName)).show()

+-----+----------------+
|   _1|              _2|
+-----+----------------+
|Alice|     Mathematics|
|  Bob|Computer Science|
| Carl|     Mathematics|
+-----+----------------+


"Explain" prints the query's physical plan for debugging.

// Explain how the join is computed.
// Note that a BroadcastJoin is planned.
> joined.explain()

== Physical Plan ==
Project [struct(name#168163,dept#168164,age#168165) AS _1#168203,struct(abbrevName#168200,fullName#168201) AS _2#168204]
+- BroadcastHashJoin [dept#168164], [abbrevName#168200], BuildRight
   :- ConvertToUnsafe
   :  +- !MapPartitions <function1>, class[value[0]: string], class[name[0]: string, dept[0]: string, age[0]: int], [name#168163,dept#168164,age#168165]
   :     +- ConvertToSafe
   :        +- Scan TextRelation[value#168157] InputPaths: /students.tsv
   +- ConvertToUnsafe
      +- LocalTableScan [abbrevName#168200,fullName#168201], [[0,1800000002,2000000010,5343,72657475706d6f43,65636e6569635320],[0,1800000004,200000000b,6874614d,74616d656874614d,736369]]