Friday, July 29, 2016

Overview of Spark 2.0 Dataset / DataFrame API, Part 2


In Part 1 of this series, we examined type-safe operations with Datasets. In Part 2, we will cover untyped operations with DataFrames. Being untyped, DataFrames are well-suited for data exploration, ad-hoc queries, and data munging.


DataFrames are still available in Spark 2.0, and remain mostly unchanged. The biggest change is that they have been merged with the new Dataset API. The DataFrame class no longer exists on its own; instead, it is defined as a specific type of Dataset: type DataFrame = Dataset[Row]. However, all of the functionality from 1.6 is still there.


  • Example Data
  • DataFrames: Untyped Language-Integrated SQL Queries
  • DataFrames: SQL Queries in SQL
  • DataFrames: Adding Columns, Data Munging

Example Data

We will continue with the example data from Part 1. We have defined a "Student" class as:

case class Student(name: String, dept: String, age: Long)

The example data has been read into a Dataset[Student]:
ds: org.apache.spark.sql.Dataset[Student]
+---+----+-----+
|age|dept| name|
+---+----+-----+
| 21|Math|Alice|
| 23|  CS|  Bob|
| 25|Math| Carl|
+---+----+-----+

DataFrames: Untyped Language-Integrated SQL Queries

DataFrames support language-integrated SQL queries, such as "select", "where", and "group by".

Convert to DataFrame

To convert a Dataset into a DataFrame:

val df = ds.toDF()
df: org.apache.spark.sql.DataFrame

Select, Where

To select columns and specify a "where" clause:

> val selected ="name", "age")
                  .where($"age" === 21)
selected: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
+-----+---+
| name|age|
+-----+---+
|Alice| 21|
+-----+---+


Count

To count the number of rows:

> df.count()
res1: Long = 3

GroupBy, Aggregate

To perform a "group by" and aggregate within each group, use "groupBy" and "agg". A number of aggregation functions, such as "avg", are available in the "functions" object. To group by a column and compute the average in each group:

import org.apache.spark.sql.functions._
> val avgAge2 = df.groupBy("dept").agg(avg("age"))
+----+--------+
|dept|avg(age)|
+----+--------+
|Math|    23.0|
|  CS|    23.0|
+----+--------+


Join

You can join two DataFrames with "join". To create a second DataFrame with department info:

> case class Department(deptName: String, building: Int)
> val depts = Seq(Department("Math", 125), Department("CS", 110)).toDF()
+--------+--------+
|deptName|building|
+--------+--------+
|    Math|     125|
|      CS|     110|
+--------+--------+

Then, to join the students DataFrame with the new department DataFrame:

> val joined2 = df.join(depts, df("dept") === depts("deptName"))
+---+----+-----+--------+--------+
|age|dept| name|deptName|building|
+---+----+-----+--------+--------+
| 21|Math|Alice|    Math|     125|
| 23|  CS|  Bob|      CS|     110|
| 25|Math| Carl|    Math|     125|
+---+----+-----+--------+--------+


Explain

To examine the query plan used to compute a DataFrame:

> joined2.explain()
== Physical Plan ==
*BroadcastHashJoin [dept#134], [deptName#384], Inner, BuildRight
:- *Filter isnotnull(dept#134)
:  +- Scan ExistingRDD[age#133L,dept#134,name#135]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
   +- *Filter isnotnull(deptName#384)
      +- LocalTableScan [deptName#384, building#385]

DataFrames: SQL Queries in SQL

You can also query DataFrames with SQL. First create a temp view and then specify SQL queries against that view:

> df.createTempView("StudentTable")

> val sqlResults = spark.sql("SELECT name, dept FROM StudentTable") // "spark" is a SparkSession object
+-----+----+
| name|dept|
+-----+----+
|Alice|Math|
|  Bob|  CS|
| Carl|Math|
+-----+----+

DataFrames: Adding Columns, Data Munging

DataFrames support creating new columns and data munging. To add a column, use "withColumn" to specify a new column name and an expression for column values. The Column class defines column operations, such as the minus operator shown below. The "functions" object also contains convenient functions for working with columns, such as math, string, and date / time functions.

In this example, the "lit" function, defined in "functions", returns a Column populated with a literal value. The Column class minus operator performs subtraction. The "$" method returns the Column associated with the given column name:

import org.apache.spark.sql.functions._
val withCol = df.withColumn("birthYear", lit(2016) - $"age")
+---+----+-----+---------+
|age|dept| name|birthYear|
+---+----+-----+---------+
| 21|Math|Alice|     1995|
| 23|  CS|  Bob|     1993|
| 25|Math| Carl|     1991|
+---+----+-----+---------+

In the next example, the "round" function, defined in "functions", rounds values to the nearest tens digit:

val rounded = df.withColumn("roundedAge", round($"age", -1))
+---+----+-----+----------+
|age|dept| name|roundedAge|
+---+----+-----+----------+
| 21|Math|Alice|        20|
| 23|  CS|  Bob|        20|
| 25|Math| Carl|        30|
+---+----+-----+----------+


In Spark 2.0, DataFrames have been merged into the Dataset API. A DataFrame is a special type of Dataset whose operations are untyped. DataFrames support convenient ways to query data, either through language-integrated queries or SQL, and they are also useful for creating new columns and data munging.

Overview of Spark 2.0 Dataset / DataFrame API, Part 1


Spark 2.0 features a new Dataset API. Now that Datasets support a full range of operations, you can avoid working with low-level RDDs in most cases. In 2.0, DataFrames no longer exist as a separate class; instead, DataFrame is defined as a special case of Dataset. Here is some example code to get you started with Spark 2.0 Datasets / DataFrames. Part 1 focuses on type-safe operations with Datasets, which provide compile time type safety. Part 2 focuses on DataFrames, which have untyped operations.

Part 1: Datasets: Type-safe operations. (This blog post)
Part 2: DataFrame: Untyped operations. (Next blog post)

Dataset vs. DataFrame

A Dataset[T] is a parameterized type, where the type T is specified by the user and is associated with each element of the Dataset. A DataFrame, on the other hand, has no explicit type associated with it at compile time, from the user's point of view. Internally, a DataFrame is defined as a Dataset[Row], where Row is a generic row type defined by Spark SQL.


This blog post refers to the Scala API.


  • Reading Data In
  • Data Exploration
  • Statistics
  • Functional Transformations
  • Caching
  • Getting Data Out

Reading Data In

Spark supports a number of input formats, including Hive, JDBC, Parquet, CSV, and JSON. Below is an example of reading JSON data into a Dataset.

JSON example

Suppose you have this example JSON data, with one object per line:
{"name":"Alice", "dept":"Math", "age":21}
{"name":"Bob", "dept":"CS", "age":23}
{"name":"Carl", "dept":"Math", "age":25}

To read a JSON data file, first use the SparkSession object as an entry point, and access its DataFrameReader to read data into a DataFrame:

> val df ="/path/to/file.json") // "spark" is a SparkSession object
df: org.apache.spark.sql.DataFrame

Then, convert the DataFrame into a Dataset[Student]:

case class Student(name: String, dept: String, age: Long)
val ds =[Student]
ds: org.apache.spark.sql.Dataset[Student]

Data Exploration

When you first look into a new data set, you can explore its contents by printing out the schema, counting the number of rows, and displaying some of those rows.

Print Schema

To explore what is in this Dataset, you can print out the schema:
> ds.printSchema()
root
 |-- age: long (nullable = true)
 |-- dept: string (nullable = true)
 |-- name: string (nullable = true)

Count Rows

To count the number of rows:
> ds.count()
res2: Long = 3

Display Rows

To display the first few rows in tabular format:
+---+----+-----+
|age|dept| name|
+---+----+-----+
| 21|Math|Alice|
| 23|  CS|  Bob|
| 25|Math| Carl|
+---+----+-----+

Sample Rows

To get a sample of the data:

val sample = ds.sample(withReplacement=false, fraction=0.3)
sample: org.apache.spark.sql.Dataset[Student]
+---+----+----+
|age|dept|name|
+---+----+----+
| 25|Math|Carl|
+---+----+----+


Statistics

A number of statistics functions are available for Datasets.

Summary Statistics

To get summary statistics on numerical fields, call "describe":

val summary = ds.describe()
summary: org.apache.spark.sql.DataFrame
+-------+----+
|summary| age|
+-------+----+
|  count|   3|
|   mean|23.0|
| stddev| 2.0|
|    min|  21|
|    max|  25|
+-------+----+

Additional Statistical Functions, Approximate Frequent Items

The "stat" method returns a DataFrameStatFunctions object for statistical functions:

> ds.stat
res11: org.apache.spark.sql.DataFrameStatFunctions

For example, "stat.freqItems" returns approximate frequent items for the given columns:

val approxFreqItems = ds.stat.freqItems(Seq("dept"))
approxFreqItems: org.apache.spark.sql.DataFrame
+--------------+
|dept_freqItems|
+--------------+
|    [CS, Math]|
+--------------+

Functional Transformations

The Dataset API supports functional transformations, such as "filter" and "map", much like the RDD API. These operators transform one Dataset[T] into another Dataset[U], where T and U are user-specified types. These operations have compile-time type safety, in the sense that each row is associated with a Scala object of a fixed type T (or U). This is in contrast to DataFrames, which are untyped. "Reduce" is an action that reduces the elements of a Dataset into a scalar value.


Filter

To filter for rows that satisfy a given predicate:

> val youngStudents = ds.filter($"age" < 22)
youngStudents: org.apache.spark.sql.Dataset[Student]
+---+----+-----+
|age|dept| name|
+---+----+-----+
| 21|Math|Alice|
+---+----+-----+


Map

To map over rows with a given lambda function:

val names =
names: org.apache.spark.sql.Dataset[String]
+-----+
|value|
+-----+
|Alice|
|  Bob|
| Carl|
+-----+


Reduce

To reduce the elements of a Dataset with a given reducer function:

val totalAge = + _)
totalAge: Long = 69


Join

You can join two Datasets. Suppose you want to join the "Students" Dataset with a new "Department" Dataset:

case class Department(name: String, building: Int)
val depts = Seq(Department("Math", 125), Department("CS", 110)).toDS()
+----+--------+
|name|building|
+----+--------+
|Math|     125|
|  CS|     110|
+----+--------+

To join the "Students" Dataset with the new "Department" Dataset:

val joined = ds.joinWith(depts, ds("dept") === depts("name"))
joined: org.apache.spark.sql.Dataset[(Student, Department)]
+---------------+----------+
|             _1|        _2|
+---------------+----------+
|[21,Math,Alice]|[Math,125]|
|    [23,CS,Bob]|  [CS,110]|
| [25,Math,Carl]|[Math,125]|
+---------------+----------+

GroupByKey, Aggregation

To group elements of a Dataset and aggregate within each group:

val deptSizes = ds.groupByKey(_.dept).count()
deptSizes: org.apache.spark.sql.Dataset[(String, Long)]
+-----+--------+
|value|count(1)|
+-----+--------+
| Math|       2|
|   CS|       1|
+-----+--------+

Additional aggregation functions are available in the "functions" object. The "avg" function calculates an average for each group:

import org.apache.spark.sql.functions._
val avgAge = ds.groupByKey(_.dept)
               .agg(avg($"age").as[Double])
avgAge: org.apache.spark.sql.Dataset[(String, Double)]
+-----+--------+
|value|avg(age)|
+-----+--------+
| Math|    23.0|
|   CS|    23.0|
+-----+--------+


OrderBy

To order by a given set of fields:

val ordered = ds.orderBy("dept", "name")
ordered: org.apache.spark.sql.Dataset[Student]
+---+----+-----+
|age|dept| name|
+---+----+-----+
| 23|  CS|  Bob|
| 21|Math|Alice|
| 25|Math| Carl|
+---+----+-----+


Caching

To persist a Dataset at the default storage level (memory and disk):
> ds.cache()

Getting Data Out

Into an Array

To collect data into a Scala Array, use "collect". Note that this collects all rows onto the driver node, and thus can be a memory- and IO-intensive operation.

> val studentArr = ds.collect()
studentArr: Array[Student] = Array(Student(Alice,Math,21), Student(Bob,CS,23), Student(Carl,Math,25))

To collect only the first few rows into a Scala Array:
val firstTwo = ds.head(2)
firstTwo: Array[Student] = Array(Student(Alice,Math,21), Student(Bob,CS,23))

Into an RDD

To convert into an RDD:

val studentRdd = ds.rdd
studentRdd: org.apache.spark.rdd.RDD[Student]

Into a File

To write a Dataset into a file, use "write". A number of output formats are supported. Here is an example of writing in JSON format:

> ds.write.json("/path/to/file.json")


To read about untyped operations with DataFrames, continue on to Part 2.

Friday, May 20, 2016

Overview of Spark DataFrame API


Spark DataFrames were introduced in early 2015, in Spark 1.3. Since then, a lot of new functionality has been added in Spark 1.4, 1.5, and 1.6. More than a year later, Spark's DataFrame API provides a rich set of operations for data munging, SQL queries, and analytics. This post will give an overview of all the major features of Spark's DataFrame API, focusing on the Scala API in 1.6.1.


  • Classes and Objects
  • Reading Data
  • Traditional Dataframe Operations
  • Lazy Eval and collect()
  • SQL (Relational) Operations
  • Data Munging
  • Analytics

Classes and Objects

Let us start by reviewing the major classes and objects in the DataFrame API. The main ones are SQLContext, DataFrame, Column, and functions.

SQLContext, DataFrame, and Column Classes

  • SQLContext is the main entry point for creating DataFrames.
  • DataFrame is the main class representing the DataFrame data and operations.
  • The Column class represents an individual column of a DataFrame.

Functions Object

The functions object contains functions for aggregation, math, and date/time and string manipulation that can be applied on DataFrame columns.

Reading Data into a DataFrame

JSON, Parquet, JDBC, Hive, CSV

DataFrames can read from a large number of source data formats, such as JSON, Parquet, JDBC, and Hive. See the DataFrameReader class for some of the natively supported formats and Spark Packages for packages available for other formats, such as CSV and many others.

Reading JSON Example 

Here is an example of reading JSON data into a DataFrame. The input file must contain one JSON object on each line:

val df ="/home/data.json")
df: org.apache.spark.sql.DataFrame = [col1: int, col2: int]

In the above example, sqlContext is of type SQLContext, its read() method returns a DataFrameReader, and the reader's json() method reads the specified data file.

Traditional Dataframe Operations

Spark DataFrames support traditional dataframe operations that you might expect from working with Pandas or R dataframes. You can select columns and rows, create new columns, and apply functions on column values.

Selecting Columns

To select one or more columns:
>"col1")
+----+
|col1|
+----+
|   1|
|   2|
+----+

Selecting Rows

To select rows based on a boolean filter:
> df.filter(df("col1") > 1)  
+----+----+
|col1|col2|
+----+----+
|   2|   6|
+----+----+

In the above example, df("col1") is of type Column, and ">" is a method defined in the Column class. Alternatively, a column can be specified with the $"col1" syntax.

More methods in class Column: ===, !==, isNaN, isNull, isin, like, startsWith, endsWith
See also in class DataFrame: sample

Creating New Columns

To create a new column derived from existing ones, use the withColumn() method:
> df.withColumn("col3", df("col1") + df("col2")) 
+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   5|   6|
|   2|   6|   8|
+----+----+----+

More methods in class Column: %, *, -, /, bitwiseAND, bitwiseOR, cast, &&, ||

Math Functions on Columns

A number of math functions, defined in the functions object, such as sqrt(), can be applied to column values:
import org.apache.spark.sql.functions._
>"col1"), sqrt(df("col1")))
+----+------------------+
|col1|        SQRT(col1)|
+----+------------------+
|   1|               1.0|
|   2|1.4142135623730951|
+----+------------------+

You can also define your own functions on columns, via UDFs. See the "UDF" section below. Here are some more predefined math functions (not comprehensive) in functions:
cos, sin, tan, exp, log, pow, cbrt, hypot, toDegrees, toRadians, ceil, floor, round, rint, pmod, shiftLeft, shiftRight

Displaying Data and Schema

To display a DataFrame:
+----+----+
|col1|col2|
+----+----+
|   1|   5|
|   2|   6|
+----+----+

To display a DataFrame's column names and types:
> df.printSchema()
root
 |-- col1: integer (nullable = false)
 |-- col2: integer (nullable = false)

See also: head, take, count

Lazy Eval and collect()

DataFrames are evaluated lazily, which means that no computation takes place until you perform an action. Any non-action method will thus return immediately, in most cases. An action is any method that produces output that is not a DataFrame, such as displaying data on the console, converting data into Scala Arrays, or saving data into a file or database. 
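As an intuition aid only, the same build-then-execute pattern can be seen with a plain Scala collection view (this is not Spark code; the variable names are illustrative): the map below is merely recorded, and no element is computed until a terminal operation forces the result.

```scala
// Plain-Scala analogy to lazy evaluation; not Spark code.
var evaluated = 0
val doubled = (1 to 4).view.map { n =>
  evaluated += 1   // side effect lets us observe when work actually happens
  n * 2
}
println(evaluated)          // still 0: the map was only recorded, like a query plan
val result = doubled.toList // forcing the view plays the role of an action
println(evaluated)          // now 4: every element has been computed
```

In the same way, a chain of DataFrame transformations builds a plan, and only an action triggers computation.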

To convert a DataFrame into an array, use the collect() method:
> df.collect()
res0: Array[org.apache.spark.sql.Row] = Array([1,5], [2,6])

To convert only the first n rows, use head or take.

SQL (Relational) Operations

DataFrames also support SQL (relational) operations, such as SELECT, WHERE, GROUP BY, Aggregate, and JOIN. You can also define UDFs (user-defined functions).


SELECT, WHERE

To do a SELECT with a WHERE clause:
>"col1", "col2")
    .where($"col1" === 1)
+----+----+
|col1|col2|
+----+----+
|   1|   5|
+----+----+

Note that "===" is a Column method that tests for equality. More methods in class Column: >, <, !==, isNaN, isNull, isin, like, startsWith, endsWith

GROUP BY, Aggregate

To do a GROUP BY and aggregation, given a DataFrame df1:
+----+----+
|col1|col2|
+----+----+
|   1|   5|
|   2|   6|
|   2|   7|
+----+----+

import org.apache.spark.sql.functions._
> df1.groupBy("col1").agg(sum("col2").as("sum_col2"))
+----+--------+
|col1|sum_col2|
+----+--------+
|   1|       5|
|   2|      13|
+----+--------+

Note that the groupBy() method returns a GroupedData object, on which we call the agg() method to perform one or more aggregations. The sum() function is one of the aggregation functions defined in the functions object.

More aggregation functions in functions: approxCountDistinct, avg, corr, count, countDistinct, first, last, max, mean, min, skewness, stddev, sumDistinct, variance (and more)


JOIN

To do a JOIN between two DataFrames, suppose you have a second DataFrame, people:
+---+-----+
| id| name|
+---+-----+
|  1|Alice|
|  2|  Bob|
+---+-----+

> people.join(df, people("id") === df("col1"))
+---+-----+----+----+
| id| name|col1|col2|
+---+-----+----+----+
|  1|Alice|   1|   5|
|  2|  Bob|   2|   6|
+---+-----+----+----+

The above example shows an inner join; other join types, such as outer join, are also supported.

More SQL operations

See also in class DataFrame:
alias, as, cube, distinct, drop, dropDuplicates, intersect, limit, na, orderBy, repartition, rollup, selectExpr, sort, unionAll, withColumnRenamed


UDFs

To define a UDF, use the udf function:
import org.apache.spark.sql.functions.udf
val myUdf = udf {(n: Int) => (n * 2) + 1}

You can then apply the UDF on one or more Columns:
>"col1"), myUdf(df("col1")))
+----+---------+
|col1|UDF(col1)|
+----+---------+
|   1|        3|
|   2|        5|
+----+---------+

Functions for Data Munging

There are a variety of functions to simplify data munging on date, timestamp, string, and nested data in DataFrames. These functions are defined in the functions object.

Dates and Timestamps

Here is an example of working with dates and timestamps. The date_add() function adds or subtracts days from a given date. The unix_timestamp() function returns a Unix timestamp corresponding to a timestamp string in a specified format:

import org.apache.spark.sql.functions._
> df6.withColumn("day_before", date_add(df6("date"), -1))
     .withColumn("unix_time", unix_timestamp(df6("date"), "yyyy-MM-dd"))

+----------+----------+----------+
|      date|day_before| unix_time|
+----------+----------+----------+
|2016-01-01|2015-12-31|1451606400|
|2016-09-05|2016-09-04|1473033600|
+----------+----------+----------+
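As a sanity check on the values above, the same arithmetic can be reproduced for one value with plain java.time (not Spark code; this assumes the session time zone is UTC, which is what the Unix timestamps in the table imply):

```scala
// Reproducing date_add(date, -1) and unix_timestamp(date, "yyyy-MM-dd")
// for a single value with java.time; assumes a UTC time zone.
import java.time.{LocalDate, ZoneOffset}

val date = LocalDate.parse("2016-01-01")
val dayBefore = date.plusDays(-1)                               // like date_add(..., -1)
val unixTime = date.atStartOfDay(ZoneOffset.UTC).toEpochSecond  // like unix_timestamp(...)
println(dayBefore) // 2015-12-31
println(unixTime)  // 1451606400
```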

See also (not comprehensive) in functions: current_date, current_timestamp, date_sub, datediff, dayofmonth, dayofyear, from_unixtime, hour, last_day, minute, month, next_day, quarter, second, trunc, weekofyear, year


String Functions

There are also a number of functions for working with strings. Here is one example, with the substring() function, which returns a substring given an input string, position, and length:

> df6.withColumn("month_day", substring(df6("date"), 6, 5))
+----------+---------+
|      date|month_day|
+----------+---------+
|2016-01-01|    01-01|
|2016-09-05|    09-05|
+----------+---------+

See also (not comprehensive) in functions: ascii, concat, decode, encode, format_number, format_string, length, levenshtein, lower, lpad, ltrim, regexp_extract, regexp_replace, repeat, rtrim, split, translate, trim, upper

Nested Data Structures

With certain data formats, such as JSON, it is common to have nested arrays and structs in the schema. The functions object includes functions for working with nested columns. For example, if a column is of type Array, such as "col2" below, you can use the explode() function to flatten the data inside that column:

+----+--------+
|col1|    col2|
+----+--------+
|   1|[1a, 1b]|
|   2|    [2a]|
+----+--------+

>"col1"), explode(df8("col2")).as("col2_flat"))
+----+---------+
|col1|col2_flat|
+----+---------+
|   1|       1a|
|   1|       1b|
|   2|       2a|
+----+---------+

The new flattened column, "col2_flat", can now be manipulated as an ordinary top-level column. For more about nested array data, please see my post on the topic.

See also: 
array_contains, size, sort_array, struct, array in functions.


Analytics

The DataFrame API includes functionality for analytics, namely summary statistics, window functions, and pivot tables.

Summary Statistics

The describe() method computes summary statistics for numerical columns and is meant for exploratory data analysis:

> df.describe()
+-------+------------------+------------------+
|summary|              col1|              col2|
+-------+------------------+------------------+
|  count|                 2|                 2|
|   mean|               1.5|               5.5|
| stddev|0.7071067811865476|0.7071067811865476|
|    min|                 1|                 5|
|    max|                 2|                 6|
+-------+------------------+------------------+

The stat() method returns a DataFrameStatFunctions object, which provides additional statistics functions such as:
corr, cov, crosstab, freqItems, sampleBy

Window Functions

Window functions allow you to perform calculations over a moving window of rows, and are the basis for calculating a moving average or cumulative sum. You can apply window functions on DataFrames by defining a WindowSpec:

import org.apache.spark.sql.expressions.Window
val wSpec2 = Window.partitionBy("name").orderBy("date").rowsBetween(-1, 1)

The above window spec for a moving average consists of three components: (1) partition by, (2) order by, and (3) a frame. To use this WindowSpec in a DataFrame, you would apply a window function or aggregation function, such as avg() over this WindowSpec:

> customers.withColumn("movingAvg", avg(customers("amountSpent")).over(wSpec2))
+-----+----------+-----------+---------+
| name|      date|amountSpent|movingAvg|
+-----+----------+-----------+---------+
|Alice|2016-05-01|       50.0|     47.5|
|Alice|2016-05-03|       45.0|     50.0|
|Alice|2016-05-04|       55.0|     50.0|
|  Bob|2016-05-01|       25.0|     27.0|
|  Bob|2016-05-04|       29.0|     27.0|
|  Bob|2016-05-06|       27.0|     28.0|
+-----+----------+-----------+---------+

For a list of all the possible window functions and aggregation functions, please see the functions object. For more examples of using window functions, please see my blog post on the topic.

For more window functions, see in functions: cume_dist, lag, lead, ntile, percent_rank, rank, row_number (and more)

Pivot Tables

You can create pivot tables with the pivot() method:
> df7.groupBy("col1").pivot("col2").avg("col3")
+----+----+----+
|col1|   A|   B|
+----+----+----+
|   1|10.0|21.0|
|   2|12.0|20.0|
+----+----+----+

In the above example, the DataFrame is grouped by col1, pivoted along col2, which contains the values "A" and "B", and computes the average of col3 in each group. The pivot() method is defined in the GroupedData class. For more information about pivoting, please see this Databricks article: Reshaping Data with Pivot in Apache Spark.
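The shape of that computation can be sketched in plain Scala (not Spark code) over the same four rows; the nested groupBy mirrors "group, then pivot, then aggregate":

```scala
// Plain-Scala sketch of groupBy("col1").pivot("col2").avg("col3"); not Spark code.
val rows = Seq((1, "A", 10.0), (1, "B", 21.0), (2, "A", 12.0), (2, "B", 20.0))

val pivoted: Map[Int, Map[String, Double]] =
  rows.groupBy(_._1).map { case (c1, group) =>
    // within each col1 group, average col3 for each distinct col2 value
    c1 -> group.groupBy(_._2).map { case (c2, cells) =>
      c2 -> cells.map(_._3).sum / cells.size
    }
  }
println(pivoted(1)("B")) // 21.0
```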


By now, you should have a good feel for what is possible with the Spark DataFrame Scala API. From data munging, to SQL, to analytics, this API provides a broad range of functionality for working with big data. For more information about DataFrames, see the Spark programming guide.

Saturday, May 14, 2016

Reading JSON Nested Array in Spark DataFrames

In a previous post on JSON data, I showed how to read nested JSON arrays with Spark DataFrames. Now that I am more familiar with the API, I can describe an easier way to access such data, using the explode() function. All of the example code is in Scala, on Spark 1.6.

Loading JSON data

Suppose you have a file with JSON data, with one JSON object per line:

{"name":"Michael", "schools":[{"sname":"stanford", "year":2010}, {"sname":"berkeley", "year":2012}]}
{"name":"Andy", "schools":[{"sname":"ucsb", "year":2011}]}

You can read it into a DataFrame with the SqlContext read() method:

>> val people ="people.json")
people: org.apache.spark.sql.DataFrame

+-------+--------------------+
|   name|             schools|
+-------+--------------------+
|Michael|[[stanford,2010],...|
|   Andy|       [[ucsb,2011]]|
+-------+--------------------+

Notice that the second column, "schools", is an Array type, and each element of the array is a Struct:

>> people.printSchema()
root
 |-- name: string (nullable = true)
 |-- schools: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- sname: string (nullable = true)
 |    |    |-- year: long (nullable = true)

Nested Array of Struct

Flatten / Explode an Array

If your JSON object contains nested arrays of structs, how will you access the elements of an array? One way is by flattening it. For instance, in the example above, each JSON object contains a "schools" array. We can simply flatten "schools" with the explode() function.

>> import org.apache.spark.sql.functions._
val flattened =$"name", explode($"schools").as("schools_flat"))
flattened: org.apache.spark.sql.DataFrame

+-------+---------------+
|   name|   schools_flat|
+-------+---------------+
|Michael|[stanford,2010]|
|Michael|[berkeley,2012]|
|   Andy|    [ucsb,2011]|
+-------+---------------+

Now each school is on a separate row. The new column "schools_flat" is of type Struct.
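The row-duplication behavior of explode() corresponds to an ordinary flatMap in plain Scala (not Spark code; the case class here is illustrative): one output row per array element, with the other columns repeated.

```scala
// explode() in collection terms; not Spark code.
case class Person(name: String, schools: Seq[String])
val people = Seq(Person("Michael", Seq("stanford", "berkeley")),
                 Person("Andy", Seq("ucsb")))

// Each (name, schools) pair fans out into one (name, school) pair per element.
val exploded = people.flatMap(p => => (, s)))
println(exploded) // List((Michael,stanford), (Michael,berkeley), (Andy,ucsb))
```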

Select into Struct

Now you can select, for instance, all the school names within each struct, by using the DataFrame select() method. The struct has two fields: "sname" and "year". We will select only the school name, "sname":

>> val schools ="name", "schools_flat.sname")
schools: org.apache.spark.sql.DataFrame = [name: string, sname: string]

+-------+--------+
|   name|   sname|
+-------+--------+
|Michael|stanford|
|Michael|berkeley|
|   Andy|    ucsb|
+-------+--------+

There you have it! We have taken data that was nested as structs inside an array column and bubbled it up to a first-level column in a DataFrame. You can now manipulate that column with the standard DataFrame methods.



  1. The DataFrame API:
  2. The explode() function:$

Friday, April 29, 2016

Spark Window Functions for DataFrames and SQL

Introduced in Spark 1.4, Spark window functions improved the expressiveness of Spark DataFrames and Spark SQL. With window functions, you can easily calculate a moving average or cumulative sum, or reference a value in a previous row of a table. Window functions allow you to do many common calculations with DataFrames, without having to resort to RDD manipulation.

Aggregates, UDFs vs. Window functions

Window functions are complementary to existing DataFrame operations: aggregates, such as sum and avg, and UDFs. To review, aggregates calculate one result, a sum or average, for each group of rows, whereas UDFs calculate one result for each row based on only data in that row. In contrast, window functions calculate one result for each row based on a window of rows. For example, in a moving average, you calculate for each row the average of the rows surrounding the current row; this can be done with window functions.
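The cardinality difference can be made concrete in plain Scala (not Spark code): a per-group aggregate yields one value per group, while a window-style calculation yields one value per input row.

```scala
// One result per group vs. one result per row; plain Scala, not Spark.
val spent = Seq(("Alice", 50.0), ("Alice", 45.0), ("Bob", 25.0))

// Aggregate: one result per group (2 groups -> 2 results)
val perGroup = spent.groupBy(_._1).map { case (name, xs) => name -> xs.map(_._2).sum }

// Window-style: one result per row (3 rows -> 3 results); here each row
// is paired with its group's total
val perRow = { case (name, amount) => (name, amount, perGroup(name)) }
println(perGroup.size) // 2
println(perRow.size)   // 3
```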

Moving Average Example

Let us dive right into the moving average example. In this example dataset, there are two customers who have spent different amounts of money each day.

// Building the customer DataFrame. All examples are written in Scala with Spark 1.6.1, but the same can be done in Python or SQL.
val customers = sc.parallelize(List(("Alice", "2016-05-01", 50.00),
                                    ("Alice", "2016-05-03", 45.00),
                                    ("Alice", "2016-05-04", 55.00),
                                    ("Bob", "2016-05-01", 25.00),
                                    ("Bob", "2016-05-04", 29.00),
                                    ("Bob", "2016-05-06", 27.00))).
                               toDF("name", "date", "amountSpent")

// Import the window functions.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Create a window spec.
val wSpec1 = Window.partitionBy("name").orderBy("date").rowsBetween(-1, 1)

In this window spec, the data is partitioned by customer, and each customer’s data is ordered by date. The window frame is defined as starting from -1 (one row before the current row) and ending at 1 (one row after the current row), for a total of 3 rows in the sliding window.

// Calculate the moving average
customers.withColumn("movingAvg",
                     avg(customers("amountSpent")).over(wSpec1)).show()

This code adds a new column, “movingAvg”, by applying the avg function on the sliding window defined in the window spec:
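To make the frame concrete, here is the same moving average traced by hand in plain Scala (not Spark code) for Alice's partition, already ordered by date; each row averages its rowsBetween(-1, 1) neighborhood, with the frame clipped at the partition edges:

```scala
// rowsBetween(-1, 1) traced for one partition; not Spark code.
val aliceSpent = Vector(50.0, 45.0, 55.0) // Alice's amountSpent, ordered by date

val movingAvg = { i =>
  // the frame: previous row through next row, clipped at the edges
  val frame = aliceSpent.slice(math.max(0, i - 1), math.min(aliceSpent.length, i + 2))
  frame.sum / frame.size
}
println(movingAvg) // Vector(47.5, 50.0, 50.0), matching Alice's movingAvg values
```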


Window function and Window Spec definition

As shown in the above example, there are two parts to applying a window function: (1) specifying the window function, such as avg in the example, and (2) specifying the window spec, or wSpec1 in the example. For (1), you can find a full list of the window functions here:$
You can use functions listed under “Aggregate Functions” and “Window Functions”.

For (2) specifying a window spec, there are three components: partition by, order by, and frame.
  1. “Partition by” defines how the data is grouped; in the above example, it was by customer. You have to specify a reasonable grouping because all data within a group will be collected to the same machine. Ideally, the DataFrame has already been partitioned by the desired grouping.
  2. “Order by” defines how rows are ordered within a group; in the above example, it was by date.
  3. “Frame” defines the boundaries of the window with respect to the current row; in the above example, the window ranged between the previous row and the next row.

Cumulative Sum

Next, let us calculate the cumulative sum of the amount spent per customer.

// Window spec: the frame ranges from the beginning (Long.MinValue) to the current row (0).
val wSpec2 = Window.partitionBy("name").orderBy("date").rowsBetween(Long.MinValue, 0)

// Create a new column which calculates the sum over the defined window frame.
customers.withColumn("cumSum",
                     sum(customers("amountSpent")).over(wSpec2)).show()
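The frame rowsBetween(Long.MinValue, 0) is simply a running total, which plain Scala (not Spark code) expresses with scanLeft; here it is traced for Bob's partition:

```scala
// Cumulative sum over one ordered partition; not Spark code.
val bobSpent = Vector(25.0, 29.0, 27.0) // Bob's amountSpent, ordered by date
val cumSum = bobSpent.scanLeft(0.0)(_ + _).tail // drop the initial 0.0 seed
println(cumSum) // Vector(25.0, 54.0, 81.0)
```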


Data from previous row

In the next example, we want to see the amount spent by the customer in their previous visit.

// Window spec. No need to specify a frame in this case.
val wSpec3 = Window.partitionBy("name").orderBy("date")

// Use the lag function to look backwards by one row.
customers.withColumn("prevSpent",
                     lag(customers("amountSpent"), 1).over(wSpec3)).show()



Rank

In this example, we want to know the order of a customer’s visit (whether this is their first, second, or third visit).

// The rank function returns what we want.
customers.withColumn( "rank", rank().over(wSpec3) ).show()



I hope these examples have helped you understand Spark’s window functions. There is more functionality that was not covered here. To learn more, please see the Databricks article on this topic: