Friday, July 29, 2016

Overview of Spark 2.0 Dataset / DataFrame API, Part 2


In Part 1 of this series, we examined type-safe operations with Datasets. In Part 2, we will cover untyped operations with DataFrames. Being untyped, DataFrames are well-suited for data exploration, ad-hoc queries, and data munging.


DataFrames are still available in Spark 2.0, and remain mostly unchanged. The biggest change is that they have been merged with the new Dataset API. The DataFrame class no longer exists on its own; instead, it is defined as a specific type of Dataset: type DataFrame = Dataset[Row]. However, all of the functionality from 1.6 is still there.
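As a rough illustration of the alias, the sketch below assigns a DataFrame to a value of type Dataset[Row] without any conversion. The session settings, the app name, and the two-row sample data are assumptions made for this example; in spark-shell a "spark" session already exists and getOrCreate simply returns it.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

// Assumption: a local session for experimentation.
val spark = SparkSession.builder().master("local[*]").appName("alias-sketch").getOrCreate()
import spark.implicits._

// A small DataFrame built from a local Seq of tuples (illustrative data).
val df: DataFrame = Seq(("Alice", 21L), ("Bob", 23L)).toDF("name", "age")

// Because DataFrame is an alias for Dataset[Row], no conversion is needed here.
val rows: Dataset[Row] = df

// Fields of a Row are looked up by name at runtime, not checked by the compiler.
val firstName: String = rows.head().getAs[String]("name")
```

Any method that accepts a Dataset[Row] therefore also accepts a DataFrame, and vice versa.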


  • Example Data
  • DataFrames: Untyped Language-Integrated SQL Queries
  • DataFrames: SQL Queries in SQL
  • DataFrames: Adding Columns, Data Munging

Example Data

We will continue with the example data from Part 1. We have defined a "Student" class as:

case class Student(name: String, dept: String, age: Long)

The example data has been read into a Dataset[Student]:
ds: org.apache.spark.sql.Dataset[Student]
+---+----+-----+
|age|dept| name|
+---+----+-----+
| 21|Math|Alice|
| 23|  CS|  Bob|
| 25|Math| Carl|
+---+----+-----+

DataFrames: Untyped Language-Integrated SQL Queries

DataFrames support language-integrated SQL queries, such as "select", "where", and "group by".

Convert to DataFrame

To convert a Dataset into a DataFrame:

val df = ds.toDF()
df: org.apache.spark.sql.DataFrame

Select, Where

To select columns and specify a "where" clause:

> val selected = df.select("name", "age")
                   .where($"age" === 21)
selected: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
+-----+---+
| name|age|
+-----+---+
|Alice| 21|
+-----+---+


To count the number of rows:

> df.count()
res1: Long = 3

GroupBy, Aggregate

To perform a "group by" and aggregate within each group, use "groupBy" and "agg". A number of aggregation functions, such as "avg", are available in the "functions" object. To group by a column and compute the average in each group:

import org.apache.spark.sql.functions._
> val avgAge2 = df.groupBy("dept")
                  .agg(avg($"age"))
+----+--------+
|dept|avg(age)|
+----+--------+
|Math|    23.0|
|  CS|    23.0|
+----+--------+
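Several aggregates can be computed in one "agg" call. The sketch below is one way to do that, assuming a local session and the same three students rebuilt inline; the output column names "avgAge", "maxAge", and "numStudents" are made up for this example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, count, max}

// Assumption: a local session; in spark-shell, "spark" already exists.
val spark = SparkSession.builder().master("local[*]").appName("agg-sketch").getOrCreate()
import spark.implicits._

// The example students, rebuilt inline so the snippet stands alone.
val df = Seq(("Alice", "Math", 21L), ("Bob", "CS", 23L), ("Carl", "Math", 25L))
  .toDF("name", "dept", "age")

// One pass over each group computes all three aggregates.
val stats = df.groupBy("dept")
  .agg(
    avg($"age").as("avgAge"),
    max($"age").as("maxAge"),
    count($"name").as("numStudents"))
  .orderBy("dept")

stats.show()
```

Naming each aggregate with "as" avoids generated column names such as "avg(age)" in later steps.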


You can join two DataFrames with "join". To create a second DataFrame with department info:

> case class Department(deptName: String, building: Int)
> val depts = Seq(Department("Math", 125), Department("CS", 110)).toDF()
+--------+--------+
|deptName|building|
+--------+--------+
|    Math|     125|
|      CS|     110|
+--------+--------+

Then, to join the students DataFrame with the new department DataFrame:

> val joined2 = df.join(depts, df("dept") === depts("deptName"))
+---+----+-----+--------+--------+
|age|dept| name|deptName|building|
+---+----+-----+--------+--------+
| 21|Math|Alice|    Math|     125|
| 23|  CS|  Bob|      CS|     110|
| 25|Math| Carl|    Math|     125|
+---+----+-----+--------+--------+


To examine the query plan used to compute a DataFrame:

> joined2.explain()
== Physical Plan ==
*BroadcastHashJoin [dept#134], [deptName#384], Inner, BuildRight
:- *Filter isnotnull(dept#134)
:  +- Scan ExistingRDD[age#133L,dept#134,name#135]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
   +- *Filter isnotnull(deptName#384)
      +- LocalTableScan [deptName#384, building#385]

DataFrames: SQL Queries in SQL

You can also query DataFrames with SQL. First create a temp view and then specify SQL queries against that view:

> df.createTempView("StudentTable")

> val sqlResults = spark.sql("SELECT name, dept FROM StudentTable") // "spark" is a SparkSession object
+-----+----+
| name|dept|
+-----+----+
|Alice|Math|
|  Bob|  CS|
| Carl|Math|
+-----+----+
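A temp view accepts ordinary SQL clauses as well. The sketch below, under the assumption of a local session and inline sample data, registers the view with "createOrReplaceTempView" (which does not fail if the name is already registered, unlike "createTempView") and runs a filtered, grouped query. The alias "numStudents" is made up for this example.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session; in spark-shell, "spark" already exists.
val spark = SparkSession.builder().master("local[*]").appName("sql-sketch").getOrCreate()
import spark.implicits._

val students = Seq(("Alice", "Math", 21L), ("Bob", "CS", 23L), ("Carl", "Math", 25L))
  .toDF("name", "dept", "age")

// Register (or replace) the view that the SQL text refers to.
students.createOrReplaceTempView("StudentTable")

// Count students older than 20 in each department.
val deptCounts = spark.sql(
  """SELECT dept, COUNT(*) AS numStudents
    |FROM StudentTable
    |WHERE age > 20
    |GROUP BY dept
    |ORDER BY dept""".stripMargin)

deptCounts.show()
```

The result of "spark.sql" is itself a DataFrame, so it can be combined with the language-integrated operations shown earlier.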

DataFrames: Adding Columns, Data Munging

DataFrames support creating new columns and data munging. To add a column, use "withColumn" to specify a new column name and an expression for column values. The Column class defines column operations, such as the minus operator shown below. The "functions" object also contains convenient functions for working with columns, such as math, string, and date / time functions.

In this example, the "lit" function, defined in "functions", returns a Column populated with a literal value. The Column class minus operator performs subtraction. The "$" method returns the Column associated with the given column name:

import org.apache.spark.sql.functions._
val withCol = df.withColumn("birthYear", lit(2016) - $"age")
+---+----+-----+---------+
|age|dept| name|birthYear|
+---+----+-----+---------+
| 21|Math|Alice|     1995|
| 23|  CS|  Bob|     1993|
| 25|Math| Carl|     1991|
+---+----+-----+---------+

In the next example, the "round" function, defined in "functions", rounds values to the nearest ten (a scale of -1 rounds to one place left of the decimal point):

val rounded = df.withColumn("roundedAge", round($"age", -1))
+---+----+-----+----------+
|age|dept| name|roundedAge|
+---+----+-----+----------+
| 21|Math|Alice|        20|
| 23|  CS|  Bob|        20|
| 25|Math| Carl|        30|
+---+----+-----+----------+
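The string functions in "functions" follow the same pattern. The sketch below is one possible use, assuming a local session and inline sample data; the new column names "deptUpper" and "label" are made up for this example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{concat, lit, upper}

// Assumption: a local session; in spark-shell, "spark" already exists.
val spark = SparkSession.builder().master("local[*]").appName("munging-sketch").getOrCreate()
import spark.implicits._

val students = Seq(("Alice", "Math", 21L), ("Bob", "CS", 23L), ("Carl", "Math", 25L))
  .toDF("name", "dept", "age")

// withColumn calls can be chained; each one returns a new DataFrame.
val munged = students
  .withColumn("deptUpper", upper($"dept"))                             // e.g. "Math" becomes "MATH"
  .withColumn("label", concat($"name", lit(" ("), $"dept", lit(")")))  // e.g. "Alice (Math)"

munged.show()
```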


In Spark 2.0, DataFrames have been merged into the Dataset API. DataFrame is a special type of Dataset that has untyped operations. DataFrames support convenient ways to query data, either through language-integrated queries or SQL. DataFrames are also useful in creating new columns and data munging.

Overview of Spark 2.0 Dataset / DataFrame API, Part 1


Spark 2.0 features a new Dataset API. Now that Datasets support a full range of operations, you can avoid working with low-level RDDs in most cases. In 2.0, DataFrames no longer exist as a separate class; instead, DataFrame is defined as a special case of Dataset. Here is some example code to get you started with Spark 2.0 Datasets / DataFrames. Part 1 focuses on type-safe operations with Datasets, which provide compile time type safety. Part 2 focuses on DataFrames, which have untyped operations.

Part 1: Datasets: Type-safe operations. (This blog post)
Part 2: DataFrame: Untyped operations. (Next blog post)

Dataset vs. DataFrame

A Dataset[T] is a parameterized type, where the type T is specified by the user and is associated with each element of the Dataset. A DataFrame, on the other hand, has no explicit type associated with it at compile time, from the user's point of view. Internally, a DataFrame is defined as a Dataset[Row], where Row is a generic row type defined by Spark SQL.
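The practical difference shows up when a field name is misspelled. In the sketch below (local session and inline data are assumptions, and "agee" is a deliberate typo), the typed version would be rejected by the compiler, while the untyped version compiles and only fails when Spark analyzes the query.

```scala
import org.apache.spark.sql.SparkSession
import scala.util.Try

case class Student(name: String, dept: String, age: Long)

// Assumption: a local session; in spark-shell, "spark" already exists.
val spark = SparkSession.builder().master("local[*]").appName("typed-vs-untyped").getOrCreate()
import spark.implicits._

val ds = Seq(Student("Alice", "Math", 21L), Student("Bob", "CS", 23L), Student("Carl", "Math", 25L)).toDS()
val df = ds.toDF()

// Typed: the compiler checks that Student has an "age" field of type Long.
val agesTyped: Array[Long] = ds.map(_.age).collect()
// ds.map(_.agee) would not compile, because Student has no such field.

// Untyped: the misspelled column name compiles, but select throws an
// AnalysisException at runtime, which Try captures here.
val badSelect = Try(df.select("agee"))
println(badSelect.isFailure)
```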


This blog post refers to the Scala API.


  • Reading Data In
  • Data Exploration
  • Statistics
  • Functional Transformations
  • Caching
  • Getting Data Out

Reading Data In

Spark supports a number of input formats, including Hive, JDBC, Parquet, CSV, and JSON. Below is an example of reading JSON data into a Dataset.

JSON example

Suppose you have this example JSON data, with one object per line:
{"name":"Alice", "dept":"Math", "age":21}
{"name":"Bob", "dept":"CS", "age":23}
{"name":"Carl", "dept":"Math", "age":25}

To read a JSON data file, first use the SparkSession object as an entry point, and access its DataFrameReader to read data into a DataFrame:

> val df = spark.read.json("/path/to/file.json") // "spark" is a SparkSession object
df: org.apache.spark.sql.DataFrame

Then convert the DataFrame into Dataset[Student]:

case class Student(name: String, dept: String, age: Long)
val ds = df.as[Student]
ds: org.apache.spark.sql.Dataset[Student]
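To try the two steps end to end without preparing a file by hand, the sketch below first writes the three example records to a temporary file and then reads them back. The temporary file, the local session, and the app name are assumptions made for this example.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

case class Student(name: String, dept: String, age: Long)

// Assumption: a local session; in spark-shell, "spark" already exists.
val spark = SparkSession.builder().master("local[*]").appName("json-sketch").getOrCreate()
import spark.implicits._

// Write the example records, one JSON object per line, to a temporary file.
val jsonLines = Seq(
  """{"name":"Alice", "dept":"Math", "age":21}""",
  """{"name":"Bob", "dept":"CS", "age":23}""",
  """{"name":"Carl", "dept":"Math", "age":25}""").mkString("\n")
val path = Files.createTempFile("students", ".json")
Files.write(path, jsonLines.getBytes(StandardCharsets.UTF_8))

// Step 1: the DataFrameReader infers the schema and returns a DataFrame.
val df = spark.read.json(path.toString)

// Step 2: as[Student] matches columns to case class fields by name.
val ds = df.as[Student]
ds.show()
```

The "age" field is declared as Long because the JSON reader infers integer values as long.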

Data Exploration

When you first look into a new data set, you can explore its contents by printing out the schema, counting the number of rows, and displaying some of those rows.

Print Schema

To explore what is in this Dataset, you can print out the schema:
> ds.printSchema()
root
 |-- age: long (nullable = true)
 |-- dept: string (nullable = true)
 |-- name: string (nullable = true)

Count Rows

To count the number of rows:
> ds.count()
res2: Long = 3

Display Rows

To display the first few rows in tabular format:
> ds.show()
+---+----+-----+
|age|dept| name|
+---+----+-----+
| 21|Math|Alice|
| 23|  CS|  Bob|
| 25|Math| Carl|
+---+----+-----+

Sample Rows

To get a sample of the data:

val sample = ds.sample(withReplacement=false, fraction=0.3)
sample: org.apache.spark.sql.Dataset[Student]
+---+----+----+
|age|dept|name|
+---+----+----+
| 25|Math|Carl|
+---+----+----+


Statistics

A number of statistics functions are available for Datasets.

Summary Statistics

To get summary statistics on numerical fields, call "describe":

val summary = ds.describe()
summary: org.apache.spark.sql.DataFrame
+-------+----+
|summary| age|
+-------+----+
|  count|   3|
|   mean|23.0|
| stddev| 2.0|
|    min|  21|
|    max|  25|
+-------+----+

Additional Statistical Functions, Approximate Frequent Items

The "stat" method returns a DataFrameStatFunctions object for statistical functions:

> ds.stat
res11: org.apache.spark.sql.DataFrameStatFunctions

For example, "stat.freqItems" returns approximate frequent items for the given columns:

val approxFreqItems = ds.stat.freqItems(Seq("dept"))
approxFreqItems: org.apache.spark.sql.DataFrame
+--------------+
|dept_freqItems|
+--------------+
|    [CS, Math]|
+--------------+

Functional Transformations

The Dataset API supports functional transformations, such as "filter" and "map", much like the RDD API. These operators transform one Dataset[T] into another Dataset[U], where T and U are user-specified types. These operations have compile-time type safety, in the sense that each row is associated with a Scala object of a fixed type T (or U). This is in contrast to DataFrames, which are untyped. "Reduce" is an action that reduces the elements of a Dataset into a scalar value.
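These operators compose in the usual functional style. The sketch below chains a filter, a map, and a reduce to find the oldest Math student's age; the local session and the inline data are assumptions made for this example.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

case class Student(name: String, dept: String, age: Long)

// Assumption: a local session; in spark-shell, "spark" already exists.
val spark = SparkSession.builder().master("local[*]").appName("functional-sketch").getOrCreate()
import spark.implicits._

val ds = Seq(Student("Alice", "Math", 21L), Student("Bob", "CS", 23L), Student("Carl", "Math", 25L)).toDS()

// filter keeps the element type: Dataset[Student] to Dataset[Student].
val mathStudents: Dataset[Student] = ds.filter(_.dept == "Math")

// map changes the element type: Dataset[Student] to Dataset[Long].
val mathAges: Dataset[Long] = mathStudents.map(_.age)

// reduce is an action: it runs the job and returns a plain Long to the driver.
val oldestMathAge: Long = mathAges.reduce((a: Long, b: Long) => math.max(a, b))
```

Each lambda here receives a Student (or a Long), so a misspelled field or a type mismatch is caught at compile time.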


To filter for rows that satisfy a given predicate:

> val youngStudents = ds.filter($"age" < 22)
youngStudents: org.apache.spark.sql.Dataset[Student]
+---+----+-----+
|age|dept| name|
+---+----+-----+
| 21|Math|Alice|
+---+----+-----+


To map over rows with a given lambda function: 

val names = ds.map { _.name }
names: org.apache.spark.sql.Dataset[String]
+-----+
|value|
+-----+
|Alice|
|  Bob|
| Carl|
+-----+


To reduce the elements of a Dataset with a given reducer function:

val totalAge = ds.map(_.age).reduce(_ + _)
totalAge: Long = 69


You can join two Datasets. Suppose you want to join the "Students" Dataset with a new "Department" Dataset:

case class Department(name: String, building: Int)
val depts = Seq(Department("Math", 125), Department("CS", 110)).toDS()
+----+--------+
|name|building|
+----+--------+
|Math|     125|
|  CS|     110|
+----+--------+

To join the "Students" Dataset with the new "Department" Dataset:

val joined = ds.joinWith(depts, ds("dept") === depts("name"))
joined: org.apache.spark.sql.Dataset[(Student, Department)]
+---------------+----------+
|             _1|        _2|
+---------------+----------+
|[21,Math,Alice]|[Math,125]|
|    [23,CS,Bob]|  [CS,110]|
| [25,Math,Carl]|[Math,125]|
+---------------+----------+

GroupByKey, Aggregation

To group elements of a Dataset and aggregate within each group:

val deptSizes = ds.groupByKey(_.dept).count()
deptSizes: org.apache.spark.sql.Dataset[(String, Long)]
+-----+--------+
|value|count(1)|
+-----+--------+
| Math|       2|
|   CS|       1|
+-----+--------+

Additional aggregation functions are available in the "functions" object. The "avg" function calculates an average for each group:

import org.apache.spark.sql.functions._
val avgAge = ds.groupByKey(_.dept)
               .agg(avg($"age").as[Double])
avgAge: org.apache.spark.sql.Dataset[(String, Double)]
+-----+--------+
|value|avg(age)|
+-----+--------+
| Math|    23.0|
|   CS|    23.0|
+-----+--------+


To order by a given set of fields:

val ordered = ds.orderBy("dept", "name")
ordered: org.apache.spark.sql.Dataset[Student]
+---+----+-----+
|age|dept| name|
+---+----+-----+
| 23|  CS|  Bob|
| 21|Math|Alice|
| 25|Math| Carl|
+---+----+-----+


Caching

To persist a Dataset at the default storage level (memory and disk):
> ds.cache()
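When the default level is not what you want, "persist" takes an explicit storage level, and "unpersist" releases the cached data. The sketch below is a minimal illustration; the local session, the inline data, and the choice of MEMORY_ONLY are assumptions made for this example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

case class Student(name: String, dept: String, age: Long)

// Assumption: a local session; in spark-shell, "spark" already exists.
val spark = SparkSession.builder().master("local[*]").appName("cache-sketch").getOrCreate()
import spark.implicits._

val ds = Seq(Student("Alice", "Math", 21L), Student("Bob", "CS", 23L), Student("Carl", "Math", 25L)).toDS()

// persist only marks the Dataset for caching; nothing is stored yet.
val cached = ds.persist(StorageLevel.MEMORY_ONLY)

// The first action materializes the cache; later actions can reuse it.
val numRows = cached.count()

// Release the cached blocks once they are no longer needed.
cached.unpersist()
```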

Getting Data Out

Into an Array

To collect data into a Scala Array, use "collect". Note that this collects all rows onto the driver node, so it can be a memory- and I/O-intensive operation.

> val studentArr = ds.collect()
studentArr: Array[Student] = Array(Student(Alice,Math,21), Student(Bob,CS,23), Student(Carl,Math,25))

To collect only the first few rows into a Scala Array:
val firstTwo = ds.head(2)
firstTwo: Array[Student] = Array(Student(Alice,Math,21), Student(Bob,CS,23))

Into an RDD

To convert into an RDD:

val studentRdd = ds.rdd
studentRdd: org.apache.spark.rdd.RDD[Student]

Into a File

To write a Dataset into a file, use "write". A number of output formats are supported. Here is an example of writing in JSON format:

> ds.write.json("/path/to/file.json")
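Note that the path given to the writer becomes a directory of part files rather than a single file. The sketch below writes to a temporary directory and reads the data back; the temporary location, the local session, and the "overwrite" save mode are assumptions made for this example.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

case class Student(name: String, dept: String, age: Long)

// Assumption: a local session; in spark-shell, "spark" already exists.
val spark = SparkSession.builder().master("local[*]").appName("write-sketch").getOrCreate()
import spark.implicits._

val ds = Seq(Student("Alice", "Math", 21L), Student("Bob", "CS", 23L), Student("Carl", "Math", 25L)).toDS()

// A throwaway output location under the system temp directory.
val outDir = Files.createTempDirectory("students-out").resolve("json").toString

// mode("overwrite") replaces existing output instead of failing if the path exists.
ds.write.mode("overwrite").json(outDir)

// Reading the directory back picks up all part files.
val roundTrip = spark.read.json(outDir).as[Student]
roundTrip.show()
```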


To read about untyped operations with DataFrames, continue on to Part 2.