Friday, July 29, 2016

Overview of Spark 2.0 Dataset / DataFrame API, Part 2

Introduction

In Part 1 of this series, we examined type-safe operations with Datasets. In Part 2, we will cover untyped operations with DataFrames. Being untyped, DataFrames are well-suited for data exploration, ad-hoc queries, and data munging.

DataFrame

DataFrames are still available in Spark 2.0 and remain mostly unchanged. The biggest change is that they have been merged into the new Dataset API: the DataFrame class no longer exists on its own but is instead defined as a type alias, type DataFrame = Dataset[Row]. All of the functionality from Spark 1.6 is still there.

Outline

  • Example Data
  • DataFrames: Untyped Language-Integrated SQL Queries
  • DataFrames: SQL Queries in SQL
  • DataFrames: Adding Columns, Data Munging

Example Data

We will continue with the example data from Part 1. We have defined a "Student" class as:

case class Student(name: String, dept: String, age: Long)

The example data has been read into a Dataset[Student]:
> ds
ds: org.apache.spark.sql.Dataset[Student]
+---+----+-----+
|age|dept| name|
+---+----+-----+
| 21|Math|Alice|
| 23|  CS|  Bob|
| 25|Math| Carl|
+---+----+-----+

DataFrames: Untyped Language-Integrated SQL Queries

DataFrames support language-integrated SQL queries, such as "select", "where", and "group by".

Convert to DataFrame

To convert a Dataset into a DataFrame:

> val df = ds.toDF()
df: org.apache.spark.sql.DataFrame
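Since DataFrame is just an alias for Dataset[Row], the converted frame exposes untyped Row access, and it can be turned back into a typed Dataset with "as". A minimal sketch, continuing the same session (the value names are just for illustration):

> val firstRow = df.first()                   // each row of a DataFrame is an untyped Row
> val name = firstRow.getAs[String]("name")   // field lookup by column name, typed at the call site
> val ds2 = df.as[Student]                    // back to a typed Dataset[Student]
ds2: org.apache.spark.sql.Dataset[Student]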

Select, Where

To select columns and specify a "where" clause:

> val selected = df.select("name", "age")
                   .where($"age" === 21)
selected: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
+-----+---+
| name|age|
+-----+---+
|Alice| 21|
+-----+---+

Count

To count the number of rows:

> df.count()
res1: Long = 3

GroupBy, Aggregate

To perform a "group by" and aggregate within each group, use "groupBy" and "agg". A number of aggregation functions, such as "avg", are available in the "functions" object. To group by a column and compute the average in each group:

> import org.apache.spark.sql.functions._
> val avgAge2 = df.groupBy("dept")
                  .agg(avg($"age"))
+----+--------+
|dept|avg(age)|
+----+--------+
|Math|    23.0|
|  CS|    23.0|
+----+--------+
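The "agg" method also accepts multiple aggregation expressions at once. As a sketch, using a few more functions from the same "functions" object (output shown for the example data; the "deptStats" name is just illustrative):

> val deptStats = df.groupBy("dept")
                    .agg(avg($"age"), max($"age"), count($"name"))
+----+--------+--------+-----------+
|dept|avg(age)|max(age)|count(name)|
+----+--------+--------+-----------+
|Math|    23.0|      25|          2|
|  CS|    23.0|      23|          1|
+----+--------+--------+-----------+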

Join

You can join two DataFrames with "join". To create a second DataFrame with department info:

> case class Department(deptName: String, building: Int)
> val depts = Seq(Department("Math", 125), Department("CS", 110)).toDF()
+--------+--------+
|deptName|building|
+--------+--------+
|    Math|     125|
|      CS|     110|
+--------+--------+

Then, to join the students DataFrame with the new department DataFrame:

> val joined2 = df.join(depts, df("dept") === depts("deptName"))
+---+----+-----+--------+--------+
|age|dept| name|deptName|building|
+---+----+-----+--------+--------+
| 21|Math|Alice|    Math|     125|
| 23|  CS|  Bob|      CS|     110|
| 25|Math| Carl|    Math|     125|
+---+----+-----+--------+--------+
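"join" also takes an explicit join type as a third argument, such as "inner", "left_outer", or "full_outer". A minimal sketch, reusing the same join condition (the "leftJoined" name is illustrative):

> val leftJoined = df.join(depts, df("dept") === depts("deptName"), "left_outer")

On this example data the result matches the inner join above, since every student's department has a matching row in "depts"; students without a match would instead appear with null department columns.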

Explain

To examine the query plan used to compute a DataFrame:

> joined2.explain()
== Physical Plan ==
*BroadcastHashJoin [dept#134], [deptName#384], Inner, BuildRight
:- *Filter isnotnull(dept#134)
:  +- Scan ExistingRDD[age#133L,dept#134,name#135]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
   +- *Filter isnotnull(deptName#384)
      +- LocalTableScan [deptName#384, building#385]
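Passing true to "explain" prints the parsed, analyzed, and optimized logical plans in addition to the physical plan:

> joined2.explain(true)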

DataFrames: SQL Queries in SQL

You can also query DataFrames with SQL. First create a temp view and then specify SQL queries against that view:

> df.createTempView("StudentTable")

> val sqlResults = spark.sql("SELECT name, dept FROM StudentTable") // "spark" is a SparkSession object
+-----+----+
| name|dept|
+-----+----+
|Alice|Math|
|  Bob|  CS|
| Carl|Math|
+-----+----+
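Any query that Spark SQL supports can be run against the view. For example, the earlier "group by" aggregation can be expressed directly in SQL (a sketch over the same view; output shown for the example data):

> val sqlAvgAge = spark.sql("SELECT dept, avg(age) FROM StudentTable GROUP BY dept")
+----+--------+
|dept|avg(age)|
+----+--------+
|Math|    23.0|
|  CS|    23.0|
+----+--------+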

DataFrames: Adding Columns, Data Munging

DataFrames support creating new columns and data munging. To add a column, use "withColumn" to specify a new column name and an expression for column values. The Column class defines column operations, such as the minus operator shown below. The "functions" object also contains convenient functions for working with columns, such as math, string, and date / time functions.

In this example, the "lit" function, defined in "functions", returns a Column populated with a literal value. The Column class minus operator performs subtraction. The "$" method returns the Column associated with the given column name:

> import org.apache.spark.sql.functions._
> val withCol = df.withColumn("birthYear", lit(2016) - $"age")
+---+----+-----+---------+
|age|dept| name|birthYear|
+---+----+-----+---------+
| 21|Math|Alice|     1995|
| 23|  CS|  Bob|     1993|
| 25|Math| Carl|     1991|
+---+----+-----+---------+

In the next example, the "round" function, defined in "functions", rounds values to the nearest ten; the second argument of -1 specifies rounding to the tens place:

> val rounded = df.withColumn("roundedAge", round($"age", -1))
+---+----+-----+----------+
|age|dept| name|roundedAge|
+---+----+-----+----------+
| 21|Math|Alice|        20|
| 23|  CS|  Bob|        20|
| 25|Math| Carl|        30|
+---+----+-----+----------+
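The string functions in "functions" follow the same pattern. For example, "upper" converts a string column to upper case (a minimal sketch; the "upperName" column name is just illustrative):

> val withUpper = df.withColumn("upperName", upper($"name"))
+---+----+-----+---------+
|age|dept| name|upperName|
+---+----+-----+---------+
| 21|Math|Alice|    ALICE|
| 23|  CS|  Bob|      BOB|
| 25|Math| Carl|     CARL|
+---+----+-----+---------+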

Summary

In Spark 2.0, DataFrames have been merged into the Dataset API: a DataFrame is simply a Dataset[Row], supporting untyped operations. DataFrames provide convenient ways to query data, either through language-integrated queries or SQL, and they are also useful for creating new columns and data munging.
