Introduced in Spark 1.4, Spark window functions improved the
expressiveness of Spark DataFrames and Spark SQL. With window functions, you
can easily calculate a moving average or cumulative sum, or reference a value
in a previous row of a table. Window functions allow you to do many common
calculations with DataFrames, without having to resort to RDD manipulation.
Aggregates and UDFs vs. Window Functions
Window functions are complementary to existing DataFrame
operations: aggregates, such as sum and avg, and UDFs. To review,
aggregates calculate one result, such as a sum or average, for each group of rows,
whereas UDFs calculate one result for each row based only on data in that row.
In contrast, window functions calculate one result for each row based on a
window of rows. For example, in a moving average you calculate, for each row,
the average of the rows surrounding the current row; this can be done with
window functions.
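To make the contrast concrete, here is a minimal sketch, assuming the customers DataFrame built in the next section, a spark-shell style session, and the imports shown in the moving average example; the roundToDollar UDF and the column names totalSpent and wholeDollars are only illustrative:

// Aggregate: one result per group -- here, one row per customer.
customers.groupBy("name").agg(sum("amountSpent").as("totalSpent")).show()

// UDF: one result per row, using only that row's data.
val roundToDollar = udf((amount: Double) => math.round(amount))
customers.withColumn("wholeDollars", roundToDollar(customers("amountSpent"))).show()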
Moving Average Example
Let us dive right into the moving average example. In this
example dataset, two customers have spent different amounts of money on each
of several days.
// Building the customer DataFrame. All examples are written in Scala with
// Spark 1.6.1, but the same can be done in Python or SQL.
val customers = sc.parallelize(List(
  ("Alice", "2016-05-01", 50.00),
  ("Alice", "2016-05-03", 45.00),
  ("Alice", "2016-05-04", 55.00),
  ("Bob",   "2016-05-01", 25.00),
  ("Bob",   "2016-05-04", 29.00),
  ("Bob",   "2016-05-06", 27.00)
)).toDF("name", "date", "amountSpent")
// Import the window functions.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Create a window spec.
val wSpec1 = Window.partitionBy("name").orderBy("date").rowsBetween(-1, 1)
In this window spec, the data is partitioned by customer, and each customer's
data is ordered by date. The window frame starts at -1 (one row before the
current row) and ends at 1 (one row after the current row), for a total of
three rows in the sliding window.
// Calculate the moving average.
customers.withColumn("movingAvg", avg(customers("amountSpent")).over(wSpec1)).show()
This code
adds a new column, “movingAvg”, by applying the avg function on the sliding window defined in the window spec:
| name  | date     | amountSpent | movingAvg |
|-------|----------|-------------|-----------|
| Alice | 5/1/2016 | 50          | 47.5      |
| Alice | 5/3/2016 | 45          | 50        |
| Alice | 5/4/2016 | 55          | 50        |
| Bob   | 5/1/2016 | 25          | 27        |
| Bob   | 5/4/2016 | 29          | 27        |
| Bob   | 5/6/2016 | 27          | 28        |
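As mentioned above, the same calculation can be expressed in SQL. Here is a rough sketch of an equivalent Spark SQL query; the registerTempTable call and the temporary table name customers_tbl are assumptions on my part, and the query is run through sqlContext as in a 1.6 spark-shell:

// Register the DataFrame as a temporary table so it can be queried with SQL.
customers.registerTempTable("customers_tbl")

sqlContext.sql("""
  SELECT name, date, amountSpent,
         AVG(amountSpent) OVER (PARTITION BY name
                                ORDER BY date
                                ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS movingAvg
  FROM customers_tbl
""").show()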
Window function and Window Spec definition
As shown in the above example, there are two parts to
applying a window function: (1) specifying the window function, such as avg in the example, and (2) specifying
the window spec, or wSpec1 in the
example. For (1), you can find a full list of the window functions here:
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$
You can use functions listed under “Aggregate Functions” and “Window Functions”.
For (2), specifying a window spec, there are three
components: partition by, order by, and frame. (A sketch combining all three follows the list.)
- “Partition by” defines how the data is grouped; in the above example, it was by customer. You have to specify a reasonable grouping because all data within a group will be collected to the same machine. Ideally, the DataFrame has already been partitioned by the desired grouping.
- “Order by” defines how rows are ordered within a group; in the above example, it was by date.
- “Frame” defines the boundaries of the window with respect to the current row; in the above example, the window ranged between the previous row and the next row.
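Here is the sketch combining all three components. It is not part of the example above: the descending sort (via the desc function) and the -2 to 0 frame are chosen only to illustrate that each piece can be varied independently, and the avgRecent3 column name is made up.

// partition by: one group per customer
// order by:     date descending (most recent visit first)
// frame:        the two preceding rows plus the current row, in that ordering
val wSpecDesc = Window.partitionBy("name").orderBy(desc("date")).rowsBetween(-2, 0)

customers.withColumn("avgRecent3", avg(customers("amountSpent")).over(wSpecDesc)).show()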
Cumulative Sum
Next, let us calculate the cumulative sum of the amount
spent per customer.
// Window spec: the frame ranges from the beginning (Long.MinValue) to the current row (0).
val wSpec2 = Window.partitionBy("name").orderBy("date").rowsBetween(Long.MinValue, 0)
// Create a new column which calculates the sum over the defined window frame.
customers.withColumn("cumSum", sum(customers("amountSpent")).over(wSpec2)).show()
| name  | date     | amountSpent | cumSum |
|-------|----------|-------------|--------|
| Alice | 5/1/2016 | 50          | 50     |
| Alice | 5/3/2016 | 45          | 95     |
| Alice | 5/4/2016 | 55          | 150    |
| Bob   | 5/1/2016 | 25          | 25     |
| Bob   | 5/4/2016 | 29          | 54     |
| Bob   | 5/6/2016 | 27          | 81     |
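A side note: newer Spark releases add named boundary constants on the Window object, so the frame above can be written without the Long.MinValue sentinel. A minimal sketch, assuming a Spark version that provides these constants (they are not in 1.6.1, which the examples here use):

// The same cumulative-sum frame, using the named constants from newer Spark versions.
val wSpec2b = Window.partitionBy("name").orderBy("date")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

customers.withColumn("cumSum", sum(customers("amountSpent")).over(wSpec2b)).show()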
Data from previous row
In the next example, we want to see the amount spent by the
customer in their previous visit.
// Window spec. No need to specify a frame in this case.
val wSpec3 = Window.partitionBy("name").orderBy("date")

// Use the lag function to look backwards by one row.
customers.withColumn("prevAmountSpent", lag(customers("amountSpent"), 1).over(wSpec3)).show()
| name  | date     | amountSpent | prevAmountSpent |
|-------|----------|-------------|-----------------|
| Alice | 5/1/2016 | 50          | null            |
| Alice | 5/3/2016 | 45          | 50              |
| Alice | 5/4/2016 | 55          | 45              |
| Bob   | 5/1/2016 | 25          | null            |
| Bob   | 5/4/2016 | 29          | 25              |
| Bob   | 5/6/2016 | 27          | 29              |
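For completeness, the lead function is the mirror image of lag: it looks forward by one row instead of backwards. A small sketch on the same window spec (the nextAmountSpent column name is just for illustration):

// lead returns the value from the following row within each partition,
// or null for each customer's last visit.
customers.withColumn("nextAmountSpent", lead(customers("amountSpent"), 1).over(wSpec3)).show()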
Rank
In this example, we want to know the order of a customer’s
visit (whether this is their first, second, or third visit).
// The rank function returns what we want.
customers.withColumn("rank", rank().over(wSpec3)).show()
| name  | date     | amountSpent | rank |
|-------|----------|-------------|------|
| Alice | 5/1/2016 | 50          | 1    |
| Alice | 5/3/2016 | 45          | 2    |
| Alice | 5/4/2016 | 55          | 3    |
| Bob   | 5/1/2016 | 25          | 1    |
| Bob   | 5/4/2016 | 29          | 2    |
| Bob   | 5/6/2016 | 27          | 3    |
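If two rows could tie on the ordering column (for example, two visits recorded on the same date), rank gives the tied rows the same rank and then skips ahead. The related row_number and dense_rank functions handle ties differently; a small sketch on the same window spec (the rowNum and denseRank column names are just for illustration):

// row_number assigns consecutive numbers even to tied rows;
// dense_rank gives tied rows the same rank but leaves no gaps afterwards.
customers
  .withColumn("rowNum", row_number().over(wSpec3))
  .withColumn("denseRank", dense_rank().over(wSpec3))
  .show()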
Conclusion
I hope these examples have helped you understand Spark’s
window functions. There is more functionality that was not covered here. To
learn more, please see the Databricks article on this topic: https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
Comments

Glad to hear that it's helpful for them.

Great explanations! Thanks.

Is there a way to apply UDFs to those windows/frames of rows instead of just using aggregates?
I would like to ask a query: I am using the window feature of Spark. Is there a way to get the YEAR, MONTH and DAY out of it?
Many thanks in advance.
Carlo
That should be doable. If you look at org.apache.spark.sql.functions, it has functions for extracting the year, month, and day from a date column. See: http://spark.apache.org/docs/latest/api/scala/#org.apache.spark.sql.functions$
You would have to import those functions:
> import org.apache.spark.sql.functions._
For the example in the article:
> customers.select(year($"date"), month($"date"), dayofmonth($"date")).show()
Wanted to thank you for clear directions on how to use the windowing functions. Nice!

Well articulated!!

Can you please share how to specify the sort order for the window function?
E.g.: row_number() over (partition by id order by time desc)
Thanks.

Is that how to specify the sort order? Thanks.
Excellent article, well explained!!!
But for me the data is getting printed in the reverse order; it is not getting sorted by name correctly. Below is the output printed:
+-----+----------+-----------+---------+
| name| date|amountSpent|movingAvg|
+-----+----------+-----------+---------+
| Bob|2016-05-01| 25.0| 27.0|
| Bob|2016-05-04| 29.0| 27.0|
| Bob|2016-05-06| 27.0| 28.0|
|Alice|2016-05-01| 50.0| 47.5|
|Alice|2016-05-03| 45.0| 50.0|
|Alice|2016-05-04| 55.0| 50.0|
+-----+----------+-----------+---------+
Can you explain why it is printing like this?
Great, great. Bookmarking!!!

Is there a way to get the average for the past 2 days only? (Perhaps by limiting the starting row to the first time a 2-day-old transaction is seen.)

Excellent article, very clear explanation. Thanks so much.

Hello, can we implement the tumbling window concept on static data using window functions?
Example: to view data as blocks of date intervals for each category.