Thursday, March 3, 2016

Spark 1.6 Datasets API: Example Usage


Spark 1.6 introduced a new Datasets API. It is an extension of DataFrames that supports functional processing on a collection of typed objects. Let's look at some examples of how to use it. First we'll read a JSON file and a text file into Datasets and apply functional transformations to parse the data. Then we'll run relational queries against a Dataset.

Creating a Dataset from a JSON file

Suppose you have JSON formatted data which you would like to read into a Dataset. Here is an example JSON file:
Contents of "students.json" --
{"name":"Alice", "dept":"Math"}
{"name":"Bob", "dept":"CS"}
{"name":"Carl", "dept":"Math"}
To create a Dataset from this JSON file:
// Define the Student row type.
> case class Student(name: String, dept: String)
// Read JSON objects into a Dataset[Student].
> val studentsFromJSON = sqlContext.read.json("students.json").as[Student]
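
The snippets in this post assume a shell or notebook session where sqlContext already exists. Outside the shell, the same read could be wrapped roughly as follows. This is a minimal sketch for Spark 1.6: the object name, the app name and the "local[*]" master are placeholder assumptions, not part of the original example.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Dataset, SQLContext}

// Define the row type at the top level (outside any method) so that
// Spark can derive an encoder for it.
case class Student(name: String, dept: String)

object StudentsFromJson {  // hypothetical object name
  def main(args: Array[String]): Unit = {
    // App name and master are placeholder settings for a local run.
    val conf = new SparkConf().setAppName("datasets-example").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Brings the $"col" syntax, toDS(), and the implicit encoders into scope.
    import sqlContext.implicits._

    // Read JSON objects into a Dataset[Student].
    val studentsFromJSON: Dataset[Student] =
      sqlContext.read.json("students.json").as[Student]

    studentsFromJSON.show()
    sc.stop()
  }
}
```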

Creating a Dataset from a Text file

Suppose instead you have data in a text file, in tab-separated (.tsv) format:
Contents of "students.tsv" --
Alice	Math	18
Bob	CS	19
Carl	Math	21
To create a Dataset from this text file:
// Read the lines of the file into a Dataset[String].
> val studentsFromText = sqlContext.read.text("students.tsv").as[String]
(result) studentsFromText: org.apache.spark.sql.Dataset[String] = [value: string]

// We want a Dataset of type "Student".
> case class Student(name: String, dept: String, age: Int)

// Functional programming to parse the lines into a Dataset[Student].
> val students = studentsFromText.
    map(line => {
      val cols = line.split("\t") // parse each line
      Student(cols(0), cols(1), cols(2).toInt)
    })
(result) students: org.apache.spark.sql.Dataset[Student] = [name: string, dept: string, age: int]

// Show the contents of the Dataset.
> students.show()
+-----+----+---+
| name|dept|age|
+-----+----+---+
|Alice|Math| 18|
|  Bob|  CS| 19|
| Carl|Math| 21|
+-----+----+---+
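
The parsing step above throws if a line has too few columns or a non-numeric age. A more defensive parser might look like the sketch below. It is plain Scala with no Spark dependency, and the helper name parseStudent is hypothetical, not something from the Spark API.

```scala
import scala.util.Try

case class Student(name: String, dept: String, age: Int)

// Parse one tab-separated line. Returns None for malformed input
// (wrong number of columns or a non-numeric age) instead of throwing.
def parseStudent(line: String): Option[Student] =
  line.split("\t", -1) match {
    case Array(name, dept, age) =>
      Try(age.trim.toInt).toOption.map(a => Student(name, dept, a))
    case _ => None
  }
```

With this in place, something like studentsFromText.flatMap(line => parseStudent(line).toList) should drop the bad lines rather than fail the whole job, at the cost of silently skipping them.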

Relational queries

Datasets support relational queries, with operations such as: select, filter, group by, count, avg, join.


Get the names of students in the Math department.
// Select two columns and filter on one column.
// Each argument of "select" must be a "TypedColumn".
> students.select($"name".as[String], $"dept".as[String]).
    filter(_._2 == "Math").  // Filter on _2, the second selected column
    collect()
(result) Array((Alice,Math), (Carl,Math))
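
If only the names are needed, the same query can also be written with the functional operators, which avoids the TypedColumn casts. This is a small sketch that assumes a Spark 1.6 shell session where sqlContext is available. The sample rows are rebuilt inline only so the snippet stands on its own.

```scala
// Assumes a Spark 1.6 shell session, where sqlContext already exists.
import sqlContext.implicits._

case class Student(name: String, dept: String, age: Int)

val students = Seq(
  Student("Alice", "Math", 18),
  Student("Bob", "CS", 19),
  Student("Carl", "Math", 21)).toDS()

// Filter on a field of the typed object, then keep only the name.
val mathNames = students.filter(_.dept == "Math").map(_.name).collect()
```

Here mathNames is an Array[String] holding the names of the Math students.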


Count the number of students in each department.
// Group by department and count each group.
> students.groupBy(_.dept).count().collect()
(result) Array((CS,1), (Math,2))


Average age in each department.
// Import the "avg" function.
> import org.apache.spark.sql.functions._

// Group and aggregate in each group.
> students.groupBy(_.dept).
    agg(avg($"age").as[Double]).
    collect()
(result) Array((CS,19.0), (Math,19.5))
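
For comparison, the same aggregation can be done by dropping down to a DataFrame, where columns are referenced by name and no casts are needed. This is a sketch under the same assumption of a Spark 1.6 shell session. The result is an array of untyped Row objects rather than tuples.

```scala
// Assumes a Spark 1.6 shell session, where sqlContext already exists.
import org.apache.spark.sql.functions.avg
import sqlContext.implicits._

case class Student(name: String, dept: String, age: Int)

val students = Seq(
  Student("Alice", "Math", 18),
  Student("Bob", "CS", 19),
  Student("Carl", "Math", 21)).toDS()

// Convert to a DataFrame, group by the "dept" column, and average "age".
val avgAgeByDept = students.toDF().groupBy("dept").agg(avg("age")).collect()
```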


Suppose we have a separate table with department information. We would like to join it with our student table.

First, create the department Dataset.
// The Department type.
> case class Department(abbrevName: String, fullName: String)

// Initialize a Seq and convert to a Dataset.
> val depts = Seq(Department("CS", "Computer Science"), Department("Math", "Mathematics")).toDS()

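// Show the contents of the Dataset.
> depts.show()
+----------+----------------+
|abbrevName|        fullName|
+----------+----------------+
|        CS|Computer Science|
|      Math|     Mathematics|
+----------+----------------+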

Join the students Dataset with the departments Dataset.
// Join two datasets with "joinWith".
> val joined = students.joinWith(depts, $"dept" === $"abbrevName")

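// Show the contents of the joined Dataset.
// Note that the original objects are nested into tuples under the _1 and _2 columns.
> joined.show()
+---------------+--------------------+
|             _1|                  _2|
+---------------+--------------------+
|[Alice,Math,18]|  [Math,Mathematics]|
|    [Bob,CS,19]|[CS,Computer Scie...|
| [Carl,Math,21]|  [Math,Mathematics]|
+---------------+--------------------+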

Select two columns from the joined Dataset.
// Use "map" to select from the joined Dataset.
// Notice that the original Dataset types are preserved.
> joined.map(s => (s._1.name, s._2.fullName)).show()
+-----+----------------+
|   _1|              _2|
+-----+----------------+
|Alice|     Mathematics|
|  Bob|Computer Science|
| Carl|     Mathematics|
+-----+----------------+
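
The tuple columns _1 and _2 are not very descriptive. One option is to map into a small case class so the result has named fields. This is a sketch that assumes a Spark 1.6 shell session. The StudentDept type is a hypothetical name introduced here for illustration.

```scala
// Assumes a Spark 1.6 shell session, where sqlContext already exists.
import sqlContext.implicits._

case class Student(name: String, dept: String, age: Int)
case class Department(abbrevName: String, fullName: String)
// Hypothetical result type with named fields.
case class StudentDept(name: String, deptName: String)

val students = Seq(
  Student("Alice", "Math", 18),
  Student("Bob", "CS", 19),
  Student("Carl", "Math", 21)).toDS()
val depts = Seq(
  Department("CS", "Computer Science"),
  Department("Math", "Mathematics")).toDS()

val joined = students.joinWith(depts, $"dept" === $"abbrevName")

// Map each (Student, Department) pair into the named result type.
val named = joined.map(pair => StudentDept(pair._1.name, pair._2.fullName))
```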


"Explain" prints the query's physical plan for debugging.

// Explain how the join is computed.
// Note that a BroadcastHashJoin is planned.
> joined.explain()

== Physical Plan ==
Project [struct(name#168163,dept#168164,age#168165) AS _1#168203,struct(abbrevName#168200,fullName#168201) AS _2#168204]
+- BroadcastHashJoin [dept#168164], [abbrevName#168200], BuildRight
   :- ConvertToUnsafe
   :  +- !MapPartitions <function1>, class[value[0]: string], class[name[0]: string, dept[0]: string, age[0]: int], [name#168163,dept#168164,age#168165]
   :     +- ConvertToSafe
   :        +- Scan TextRelation[value#168157] InputPaths: /students.tsv
   +- ConvertToUnsafe
      +- LocalTableScan [abbrevName#168200,fullName#168201], [[0,1800000002,2000000010,5343,72657475706d6f43,65636e6569635320],[0,1800000004,200000000b,6874614d,74616d656874614d,736369]]


  1. I am well *underwhelmed* by the datasets api. A lot of manual casting. I prefer to work with sqlcontext.sql("throw any sql you want in here")

    1. Ha ha, that casting *is* pretty cumbersome. It seems the functional transformations, like map(), are very convenient (no casting necessary), while you almost want to switch back to DataFrame for the SQL methods, such as select().

  2. How do you cast a dataset for the purposes of renaming its columns; could you provide an example?

    1. I guess you could do a "map()" on the DS.
      case class T1(colA:Int, colB:Int)
      val ds : Dataset[T1] = ...

      Now, you want to rename its columns to "C" and "D":
      case class T2(colC:Int, colD:Int)
      val ds2 : Dataset[T2] = ds.map(e => T2(e.colA, e.colB))

  3. Ok, so how to do a simple cross join??