Introduction
Spark DataFrames were introduced in early 2015, in Spark 1.3. Since then, a lot of new functionality has been added in Spark 1.4, 1.5, and 1.6. More than a year later, Spark's DataFrame API provides a rich set of operations for data munging, SQL queries, and analytics. This post will give an overview of all the major features of Spark's DataFrame API, focusing on the Scala API in 1.6.1.Outline
- Classes and Objects
- Reading Data
- Traditional Dataframe Operations
- Lazy Eval and collect()
- SQL (Relational) Operations
- Data Munging
- Analytics
Classes and Objects
Let us start by reviewing the major classes and objects in the DataFrame API. The main ones are SQLContext, DataFrame, Column, and functions.
SQLContext, DataFrame, and Column Classes
- SQLContext is the main entry point for creating DataFrames.
- DataFrame is the main class representing the DataFrame data and operations.
- The Column class represents an individual column of a DataFrame.
Functions Object
The functions object contains functions for aggregation, math, and date/time and string manipulation that can be applied on DataFrame columns.
Reading Data into a DataFrame
JSON, Parquet, JDBC, Hive, CSV
DataFrames can read from a large number of source data formats, such as JSON, Parquet, JDBC, and Hive. See the DataFrameReader class for some of the natively supported formats and Spark Packages for packages available for other formats, such as CSV and many others.
Reading JSON Example
Here is an example of reading JSON data into a DataFrame. The input file must contain one JSON object on each line:
> val df = sqlContext.read.json("/home/data.json")
df: org.apache.spark.sql.DataFrame = [col1: int, col2: int]
In the above example, sqlContext is of type SQLContext, its read() method returns a DataFrameReader, and the reader's json() method reads the specified data file.
Traditional Dataframe Operations
Spark DataFrames support traditional dataframe operations that you might expect from working with Pandas or R dataframes. You can select columns and rows, create new columns, and apply functions on column values.
Selecting Columns
To select one or more columns:
> df.select("col1")
|col1|
+----+
| 1|
| 2|Selecting Rows
To select rows based on a boolean filter:
> df.filter(df("col1") > 1)
|col1|col2|
+----+----+
| 2| 6|
In the above example, df("col1") is of type Column, and ">" is a method defined in the Column class. Alternatively, a column can be specified with the $"col1" syntax.
See also in class DataFrame: sample
Creating New Columns
To create a new column derived from existing ones, use the withColumn() method:
> df.withColumn("col3", df("col1") + df("col2"))
|col1|col2|col3|
+----+----+----+
| 1| 5| 6|
| 2| 6| 8|
Math Functions on Columns
A number of math functions, defined in the functions object, such as sqrt(), can be applied to column values:
> import org.apache.spark.sql.functions._
> df.select(df("col1"), sqrt(df("col1")))
|col1| SQRT(col1)|
+----+------------------+
| 1| 1.0|
| 2|1.4142135623730951|
You can also define your only functions on columns, via UDFs. See the "UDF" section below. Here are some more predefined math functions (not comprehensive) in functions:
cos, sin, tan, exp, log, pow, cbrt, hypot, toDegrees, toRadians, ceil, floor, round, rint, pmod, shiftLeft, shiftRight
Displaying Data and Schema
To display a DataFrame:
> df.show()
|col1|col2|
+----+----+
| 1| 5|
| 2| 6|
To display a DataFrame's column names and types:
> df.printSchema()
root
|-- col1: integer (nullable = false)
|-- col2: integer (nullable = false)
See also: head, take, count
Lazy Eval and collect()
DataFrames are evaluated lazily, which means that no computation takes place until you perform an action. Any non-action method will thus return immediately, in most cases. An action is any method that produces output that is not a DataFrame, such as displaying data on the console, converting data into Scala Arrays, or saving data into a file or database.
To convert a DataFrame into an array, use the collect() method:
> df.collect()
res0: Array[org.apache.spark.sql.Row] = Array([1,5], [2,6])
To convert only the first n rows, use head or take.
SQL (Relational) Operations
DataFrames also support SQL (relational) operations, such as SELECT, WHERE, GROUP BY, Aggregate, and JOIN. You can also define UDFs (user-defined functions).
SELECT, WHERE
To do a SELECT with a WHERE clause:
> df.select("col1", "col2")
.where($"col1" === 1)
|col1|col2|
+----+----+
| 1| 5|
Note that "===" is a Column method that tests for equality. More methods in class Column:
>, <, !==, isNaN, isNull, isin, like, startsWith, endsWith
>, <, !==, isNaN, isNull, isin, like, startsWith, endsWith
GROUP BY, Aggregate
To do a GROUP BY and aggregation:
> df1.show()
|col1|col2|
+----+----+
| 1| 5|
| 2| 6|
| 2| 7|
> import org.apache.spark.sql.functions._
> df1.groupBy("col1")
.agg(sum("col2").as("sum_col2"))
|col1|sum_col2|
+----+--------+
| 1| 5|
| 2| 13|
Note that the groupBy() method returns a GroupedData object, on which we call the agg() method to perform one or more aggregations. The sum() function is one of the aggregation functions defined in the functions object.
More aggregation functions in functions: approxCountDistinct, avg, corr, count, countDistinct, first, last, max, mean, min, skewness, stddev, sumDistinct, variance (and more)
JOIN
To do a JOIN between two DataFrames:
> people.show()
| id| name|
+---+-----+
| 1|Alice|
| 2| Bob|
> people.join(df, people("id") === df("col1"))
| id| name|col1|col2|
+---+-----+----+----+
| 1|Alice| 1| 5|
| 2| Bob| 2| 6|
More SQL operations
See also in class DataFrame:alias, as, cube, distinct, drop, dropDuplicates, intersect, limit, na, orderBy, repartition, rollup, selectExpr, sort, unionAll, withColumnRenamed
UDFs
To define a UDF, use the udf function:
> import org.apache.spark.sql.functions.udf
val myUdf = udf {(n: Int) => (n * 2) + 1}
You can then apply the UDF on one or more Columns:
> df.select(df("col1"), myUdf(df("col1")))
|col1|UDF(col1)|
+----+---------+
| 1| 3|
| 2| 5|
Functions for Data Munging
There are a variety of functions to simplify data munging on date, timestamp, string, and nested data in DataFrames. These functions are defined in the functions object.Dates and Timestamps
Here is an example of working with dates and timestamps. The date_add() function adds or subtracts days from a given date. The unix_timestamp() function returns a Unix timestamp corresponding to a timestamp string in a specified format:
> import org.apache.spark.sql.functions._
> df6.withColumn("day_before", date_add(df6("date"), -1))
.withColumn("unix_time", unix_timestamp(df6("date"), "yyyy-MM-dd"))
| date|day_before| unix_time|
+----------+----------+----------+
|2016-01-01|2015-12-31|1451606400|
|2016-09-05|2016-09-04|1473033600|
See also (not comprehensive) in functions: current_date, current_timestamp, date_sub, datediff, dayofmonth, dayofyear, from_unixtime, hour, last_day, minute, month, next_day, quarter, second, trunc, weekofyear, year
Strings
There are also a number of functions for working with strings. Here is one example, with the substring() function, which returns a substring given an input string, position, and length:
> df6.withColumn("month_day", substring(df6("date"), 6, 5))
| date|month_day|
+----------+---------+
|2016-01-01| 01-01|
|2016-09-05| 09-05|
See also (not comprehensive) in functions: ascii, concat, decode, encode, format_number, format_string, length, levenshtein, lower, lpad, ltrim, regexp_extract, regexp_replace, repeat, rtrim, split, translate, trim, upper
Nested Data Structures
With certain data formats, such as JSON, it is common to have nested arrays and structs in the schema. The functions object includes functions for working with nested columns. For example, if a column is of type Array, such as "col2" below, you can use the explode() function to flatten the data inside that column:
> df8
|col1| col2|
+----+--------+
| 1|[1a, 1b]|
| 2| [2a]|
> df8.select(df8("col1"), explode(df8("col2")).as("col2_flat"))
|col1|col2_flat|
+----+---------+
| 1| 1a|
| 1| 1b|
| 2| 2a|
The new flattened column, "col2_flat", can now be manipulated as an ordinary top-level column. For more about nested array data, please see my post on the topic.
See also:
Analytics
The DataFrame API includes functionality for analytics, namely, summary statistics, window functions, and pivot tables.
Summary Statistics
The describe() method computes summary statistics for numerical columns and is meant for exploratory data analysis:
> df.describe()
|summary| col1| col2|
+-------+------------------+------------------+
| count| 2| 2|
| mean| 1.5| 5.5|
| stddev|0.7071067811865476|0.7071067811865476|
| min| 1| 5|
| max| 2| 6|
The stat() method returns a DataFrameStatFunctions object, which provides additional statistics functions such as:
corr, cov, crosstab, freqItems, sampleBy
Window Functions
Window functions allow you to perform calculations over a moving window of rows, and are the basis for calculating a moving average or cumulative sum. You can apply window functions on DataFrames by defining a WindowSpec:
> import org.apache.spark.sql.expressions.Window
> val wSpec2 = Window.partitionBy("name").orderBy("date").rowsBetween(-1, 1)
The above window spec for a moving average consists of three components: (1) partition by, (2) order by, and (3) a frame. To use this WindowSpec in a DataFrame, you would apply a window function or aggregation function, such as avg() over this WindowSpec:
> customers.withColumn("movingAvg", avg(customers("amountSpent")).over(wSpec2))
| name| date|amountSpent|movingAvg|
+-----+----------+-----------+---------+
|Alice|2016-05-01| 50.0| 47.5|
|Alice|2016-05-03| 45.0| 50.0|
|Alice|2016-05-04| 55.0| 50.0|
| Bob|2016-05-01| 25.0| 27.0|
| Bob|2016-05-04| 29.0| 27.0|
| Bob|2016-05-06| 27.0| 28.0|
For a list of all the possible window functions and aggregation functions, please see the functions object. For more examples of using window functions, please see my blog post on the topic.
For more window functions, see in functions: cume_dist, lag, lead, ntile, percent_rank, rank, row_number (and more)
Pivot Tables
You can create pivot tables with the pivot() method:
> df7.groupBy("col1").pivot("col2").avg("col3")
|col1| A| B|
+----+----+----+
| 1|10.0|21.0|
| 2|12.0|20.0|
In the above example, the DataFrame is grouped by col1, pivoted along col2, which contains the values "A" and "B", and computes the average of col3 in each group. The pivot() method is defined in the GroupedData class. For more information about pivoting, please see this Databricks article: Reshaping Data with Pivot in Apache Spark.
Summary
By now, you should have a good feel for what is possible with the Spark DataFrame Scala API. From data munging, to SQL, to analytics, this API provides a broad range of functionality for working with big data. For more information about DataFrames, see the Spark programming guide.