Introduction
In Part 1 of this series, we examined type-safe operations with Datasets. In Part 2, we will cover untyped operations with DataFrames. Being untyped, DataFrames are well-suited for data exploration, ad-hoc queries, and data munging.
DataFrame
DataFrames are still available in Spark 2.0, and remain mostly unchanged. The biggest change is that they have been merged with the new Dataset API. The DataFrame class no longer exists on its own; instead, it is defined as a specific type of Dataset: type DataFrame = Dataset[Row]. However, all of the functionality from 1.6 is still there.
Outline
- Example Data
- DataFrames: Untyped Language-Integrated SQL Queries
- DataFrames: SQL Queries in SQL
- DataFrames: Adding Columns, Data Munging
Example Data
We will continue with the example data from Part 1. We have defined a "Student" class as:
> case class Student(name: String, dept: String, age: Long)
The example data has been read into a Dataset[Student]:
> ds
ds: org.apache.spark.sql.Dataset[Student]
|age|dept| name|
|age|dept| name|
+---+----+-----+
| 21|Math|Alice|
| 23| CS| Bob|
| 25|Math| Carl|
DataFrames: Untyped Language-Integrated SQL Queries
DataFrames supports language-integrated SQL queries, such as "select", "where", and "group by".Convert to DataFrame
To convert a Dataset into a DataFrame:> val df = ds.toDF()
df: org.apache.spark.sql.DataFrame
Select, Where
To select columns and specify a "where" clause:
.where($"age" === 21)
selected: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
| name|age| +-----+---+ |Alice| 21|
Count
To count the number of rows:
res1: Long = 3
GroupBy, Aggregate
To perform a "group by" and aggregate within each group, use "groupBy" and "agg". A number of aggregation functions, such as "avg", are available in the "functions" object. To group by a column and compute the average in each group:
> val avgAge2 = df.groupBy("dept")
.agg(avg($"age"))
|dept|avg(age)| +----+--------+ |Math| 23.0| | CS| 23.0|
Join
You can join two DataFrames with "join". To create a second DataFrame with department info:
> val depts = Seq(Department("Math", 125), Department("CS", 110)).toDF()
|deptName|building| +--------+--------+ | Math| 125| | CS| 110|
Then, to join the students DataFrame with the new department DataFrame:
> val joined2 = df.join(depts, df("dept") === depts("deptName"))
|age|dept| name|deptName|building| +---+----+-----+--------+--------+ | 21|Math|Alice| Math| 125| | 23| CS| Bob| CS| 110| | 25|Math| Carl| Math| 125|
Explain
To examine the query plan used to compute a DataFrame:
> joined2.explain()
== Physical Plan ==
*BroadcastHashJoin [dept#134], [deptName#384], Inner, BuildRight
:- *Filter isnotnull(dept#134)
: +- Scan ExistingRDD[age#133L,dept#134,name#135]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
+- *Filter isnotnull(deptName#384)
+- LocalTableScan [deptName#384, building#385]DataFrames: SQL Queries in SQL
You can also query DataFrames with SQL. First create a temp view and then specify SQL queries against that view:
> val sqlResults = spark.sql("SELECT name, dept FROM StudentTable") // "spark" is a SparkSession object
| name|dept| +-----+----+ |Alice|Math| | Bob| CS| | Carl|Math|
DataFrames: Adding Columns, Data Munging
DataFrames support creating new columns and data munging. To add a column, use "withColumn" to specify a new column name and an expression for column values. The Column class defines column operations, such as the minus operator shown below. The "functions" object also contains convenient functions for working with columns, such as math, string, and date / time functions.
In this example, the "lit" function, defined in "functions", returns a Column populated with a literal value. The Column class minus operator performs subtraction. The "$" method returns the Column associated with the given column name:
> import org.apache.spark.sql.functions._
> val withCol = df.withColumn("birthYear", lit(2016) - $"age")|age|dept| name|birthYear| +---+----+-----+---------+ | 21|Math|Alice| 1995| | 23| CS| Bob| 1993| | 25|Math| Carl| 1991|
In the next example, the "round" function, defined in "functions", rounds values to the nearest tens digit:
> val rounded = df.withColumn("roundedAge", round($"age", -1))
|age|dept| name|roundedAge| +---+----+-----+----------+ | 21|Math|Alice| 20| | 23| CS| Bob| 20| | 25|Math| Carl| 30|