Thursday, March 3, 2016

Spark 1.6 Datasets API: Example Usage

Overview

Spark 1.6 introduced a new Datasets API. It is an extension of DataFrames that supports functional processing on a collection of strongly typed objects. Let's take a look at some examples of how to use it. First we'll read a JSON file and a text file into Datasets, applying functional transformations to parse the data. Then we'll run relational queries against a Dataset.

Creating a Dataset from a JSON file

Suppose you have JSON formatted data which you would like to read into a Dataset. Here is an example JSON file:
Contents of "students.json" --
{"name":"Alice", "dept":"Math"}
{"name":"Bob", "dept":"CS"}
{"name":"Carl", "dept":"Math"}
To create a Dataset from this JSON file:
// Define the Student row type.
> case class Student(name: String, dept: String)
// Read JSON objects into a Dataset[Student].
> val studentsFromJSON = sqlContext.read.json("students.json").as[Student]

Creating a Dataset from a Text file

Suppose instead you have data in a text file, in tab-separated (.tsv) format:
Alice<tab>Math<tab>18
Bob<tab>CS<tab>19
Carl<tab>Math<tab>21

To create a Dataset from this text file:
// Read the lines of the file into a Dataset[String].
> val studentsFromText = sqlContext.read.text("students.tsv").as[String]
(result) studentsFromText: org.apache.spark.sql.Dataset[String] = [value: string]


// We want a Dataset of type "Student".
> case class Student(name: String, dept: String, age: Int)

// Functional programming to parse the lines into a Dataset[Student].
> val students = studentsFromText.
    map(line => {
      val cols = line.split("\t") // parse each line
      Student(cols(0), cols(1), cols(2).toInt)
    })
(result) students: org.apache.spark.sql.Dataset[Student] = [name: string, dept: string, age: int]

// Show the contents of the Dataset.
> students.show()
+-----+----+---+
| name|dept|age|
+-----+----+---+
|Alice|Math| 18|
|  Bob|  CS| 19|
| Carl|Math| 21|
+-----+----+---+

Relational queries

Datasets support relational queries, with operations such as select, filter, groupBy, count, avg, and join.

SELECT, FILTER

Get the names of students in the Math department.
// Select two columns and filter on one column.
// Each argument of "select" must be a "TypedColumn".
> students.select($"name".as[String], $"dept".as[String]).
    filter(_._2 == "Math").  // Filter on _2, the second selected column
    collect()
(result) Array((Alice,Math), (Carl,Math))

GROUP BY, COUNT

Count the number of students in each department.
// Group by department and count each group.
> students.groupBy(_.dept).count().collect()
(result) Array((CS,1), (Math,2))

GROUP BY, AVG

Average age in each department.
// Import the "avg" function.
> import org.apache.spark.sql.functions._

// Group and aggregate in each group.
> students.groupBy(_.dept).
    agg(avg($"age").as[Double]).
    collect()
(result) Array((CS,19.0), (Math,19.5))

JOIN

Suppose we have a separate table with department information. We would like to join the department information into our student table.

First, create the department Dataset.
// The Department type.
> case class Department(abbrevName: String, fullName: String)

// Initialize a Seq and convert to a Dataset.
> val depts = Seq(Department("CS", "Computer Science"), Department("Math", "Mathematics")).toDS()

// Show the contents of the Dataset.
> depts.show()

+----------+----------------+
|abbrevName|        fullName|
+----------+----------------+
|        CS|Computer Science|
|      Math|     Mathematics|
+----------+----------------+

Join the students Dataset with the departments Dataset.
// Join two datasets with "joinWith".
> val joined = students.joinWith(depts, $"dept" === $"abbrevName")

// Show the contents of the joined Dataset.
// Note that the original objects are nested into tuples under the _1 and _2 columns.
> joined.show()

+---------------+--------------------+
|             _1|                  _2|
+---------------+--------------------+
|[Alice,Math,18]|  [Math,Mathematics]|
|    [Bob,CS,19]|[CS,Computer Scie...|
| [Carl,Math,21]|  [Math,Mathematics]|
+---------------+--------------------+

Select two columns from the joined Dataset.
// Use "map" to select from the joined Dataset. 
// Notice that the original Dataset types are preserved.
> joined.map(s => (s._1.name, s._2.fullName)).show()

+-----+----------------+
|   _1|              _2|
+-----+----------------+
|Alice|     Mathematics|
|  Bob|Computer Science|
| Carl|     Mathematics|
+-----+----------------+

EXPLAIN

"Explain" prints the query's physical plan for debugging.

// Explain how the join is computed.
// Note that a BroadcastJoin is planned.
> joined.explain()

== Physical Plan ==
Project [struct(name#168163,dept#168164,age#168165) AS _1#168203,struct(abbrevName#168200,fullName#168201) AS _2#168204]
+- BroadcastHashJoin [dept#168164], [abbrevName#168200], BuildRight
   :- ConvertToUnsafe
   :  +- !MapPartitions <function1>, class[value[0]: string], class[name[0]: string, dept[0]: string, age[0]: int], [name#168163,dept#168164,age#168165]
   :     +- ConvertToSafe
   :        +- Scan TextRelation[value#168157] InputPaths: /students.tsv
   +- ConvertToUnsafe
      +- LocalTableScan [abbrevName#168200,fullName#168201], [[0,1800000002,2000000010,5343,72657475706d6f43,65636e6569635320],[0,1800000004,200000000b,6874614d,74616d656874614d,736369]]



Saturday, October 24, 2015

Grace Hopper Conference 2015 Wrap up

Very briefly, highlights from GHC 2015 ...

Susan Wojcicki's (CEO of YouTube) keynote speech. She talked about two reasons why there are so few women in tech: (1) not enough women in the pipeline, which starts in elementary / middle school, and (2) retention of women in tech, a workplace culture that causes women to leave at twice the rate of men. Not enough support for maternity leave and working moms is a problem. Her career advice: "Keep asking to be invited." "Power and influence is passed down from those who have it."

Clara Shih's (CEO of Hearsay Social) career advice: (1) embrace failure, (2) nurture relationships.

Manuela Veloso's (CMU) keynote on symbiotic autonomy for autonomous robots. Robots can navigate around buildings with the help of humans.

Dinner with Duy Loan Le, former TI fellow, and Vietnamese Women in Computing. Follow your own path; don't compare yourself to others. Fail fast. It was nice meeting other Vietnamese women in computing.

My birds-of-a-feather discussion on maternity leave. There were lots of good conversations about fitting maternity leave into a woman's career. Key takeaways: (1) ask for what you need, (2) start building your network now.

Lunch time table topic: Asian American identity in computing. Intersectionality.

All the conversations I had with women from other companies and students. Catching up with former co-workers.

Anita Borg Institute: you belong (in this field).

Hour of Code: teaching computer science in public schools is the best way to increase diversity in CS.

Jo Miller career workshop: for leaders, "delivers results" is a must-have.

What worked: meeting your friends' friends; I met so many women through my former co-worker. Lunch-time table topics led to interesting conversations. Speaking at the conference is a great way to be involved. I had good conversations with people sitting next to me at talks.

For next time (if there is a next time): talk with more students. Stay for the full conference: more time to network. Most technical talks are beginner level, so go to talks to learn something new, and not on topics you already know. After meeting someone, always get their name and / or business card. Stay in lodging as close to the conference as possible. Don't do work if possible.

Saturday, September 12, 2015

Attending Grace Hopper Conference


I will be attending the Grace Hopper Celebration of Women in Computing this October, in Houston, TX. I am looking forward to connecting with other women in data science and big data. I would love to compare notes on Spark, Scala, Databricks, and machine learning. I would also like to meet others doing agile and scrum, or leading data science projects. There will be talks on professional development, and I will be leading a discussion on the impact of maternity leave on a woman's career. Last, but not least, I will be happy to discuss jobs at Samsung SDSA with job seekers.

Thursday, June 18, 2015

Reading JSON data in Spark DataFrames

Overview

Spark DataFrames make it easy to read from a variety of data formats, including JSON. In fact, Spark even automatically infers the JSON schema for you. Once the data is loaded, however, figuring out how to access individual fields is not so straightforward. This post will walk through reading top-level fields as well as JSON arrays and nested objects. The code provided is for Spark 1.4. Update: please see my updated post on an easier way to work with nested array of struct JSON data.

Load the JSON File

Let's begin by loading a JSON file, where each line is a JSON object:

{"name":"Michael", "cities":["palo alto", "menlo park"], "schools":[{"sname":"stanford", "year":2010}, {"sname":"berkeley", "year":2012}]}
{"name":"Andy", "cities":["santa cruz"], "schools":[{"sname":"ucsb", "year":2011}]}
{"name":"Justin", "cities":["portland"], "schools":[{"sname":"berkeley", "year":2014}]}

The Scala code to read a JSON file:

>> val people = sqlContext.read.json("people.json")
people: org.apache.spark.sql.DataFrame

Read a Top-Level Field

With the above command, all of the data is read into a DataFrame. In the following examples, I will show how to extract individual fields into arrays of primitive types. Let's start with the top-level "name" field:

>> val names = people.select('name).collect()
names: Array[org.apache.spark.sql.Row] = Array([Michael], [Andy], [Justin])

>> names.map(row => row.getString(0))
res88: Array[String] = Array(Michael, Andy, Justin)

Use the select() method to specify the top-level field, collect() to collect it into an Array[Row], and the getString() method to access a column inside each Row.

Flatten and Read a JSON Array

Update: please see my updated post on an easier way to work with nested array of struct JSON data.

Next, notice that each Person has an array of "cities". Let's flatten these arrays and read out all their elements.

>> val flattened = people.explode("cities", "city"){c: List[String] => c}
flattened: org.apache.spark.sql.DataFrame

>> val allCities = flattened.select('city).collect()
allCities: Array[org.apache.spark.sql.Row]

>> allCities.map(row => row.getString(0))
res92: Array[String] = Array(palo alto, menlo park, santa cruz, portland)

The explode() method explodes, or flattens, the cities array into a new column named "city". We then use select() to select the new column, collect() to collect it into an Array[Row], and getString() to access the data inside each Row.

Read an Array of Nested JSON Objects, Unflattened

Finally, let's read out the "schools" data, which is an array of nested JSON objects. Each element of the array holds the school name and year:

>> val schools = people.select('schools).collect()
schools: Array[org.apache.spark.sql.Row]

>> val schoolsArr = schools.map(row => row.getSeq[org.apache.spark.sql.Row](0))
schoolsArr: Array[Seq[org.apache.spark.sql.Row]]

>> schoolsArr.foreach(schools => {
>>    schools.map(row => print(row.getString(0), row.getLong(1)))
>>    print("\n")
>> })
(stanford,2010)(berkeley,2012) 
(ucsb,2011) 
(berkeley,2014)

Use select() and collect() to select the "schools" array and collect it into an Array[Row]. Each "schools" element is a Seq[Row], so we read it out with the getSeq[Row]() method. Finally, we can read the information for each individual school by calling getString() for the school name and getLong() for the school year. Phew!

Summary

In this blog post, we have walked through accessing top-level fields, arrays, and nested JSON objects from JSON data. The key classes involved were DataFrame, Array, Row, and Seq. We used the select(), collect(), and explode() DataFrame methods, and the getString(), getLong(), and getSeq[T]() Row methods to read data out into arrays of primitive types.

Saturday, May 23, 2015

GraphX: Graph Computing for Spark

Overview

I've been reading about GraphX, Spark's graph processing library. GraphX provides distributed, in-memory graph computing. The key thing that differentiates it from other large-scale graph processing systems, like Giraph and GraphLab, is that it is tightly integrated within the Spark ecosystem. This allows efficient data pipelines that combine ETL (SQL), machine learning, and graph analysis within one framework (Spark), without the overhead of running multiple systems and copying data between them.

The Spark stack.

Graph Library for the Spark Framework

Graphs in GraphX are directed multigraph property graphs, which means that each vertex and each edge can have properties (attributes) associated with it. GraphX graphs are distributed and immutable. You create a graph in GraphX by providing an RDD of vertices and an RDD of edges. You can then perform OLAP operations on the graph through the API. A Pregel API supports vertex-centric, bulk-synchronous parallel, iterative algorithms.
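
As a rough sketch of the construction step (using the Spark 1.x GraphX API; the user/follows data below is made up for illustration):

```scala
// Build a property graph from a vertex RDD and an edge RDD.
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

// Vertex RDD: (id, property) pairs -- here the property is a user name.
val users: RDD[(VertexId, String)] =
  sc.parallelize(Seq((1L, "Alice"), (2L, "Bob"), (3L, "Carl")))

// Edge RDD: directed edges, each carrying its own property.
val follows: RDD[Edge[String]] =
  sc.parallelize(Seq(Edge(1L, 2L, "follows"), Edge(3L, 1L, "follows")))

// The graph is distributed and immutable once constructed.
val graph = Graph(users, follows)

// OLAP-style operations through the API:
graph.inDegrees.collect()  // in-degree of each vertex
graph.triplets.count()     // edges joined with their endpoint properties
```

Note that `triplets` gives each edge together with the properties of its source and destination vertices, which is the usual starting point for traversal-style queries.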

In-memory indexes speed up graph operations. Edge partitioning (which means vertices can be split across partitions) and vertex data replication speed up edge traversal, which usually involves communication across machines. A 2014 research paper shows performance comparable to other graph systems, Giraph and GraphLab.

GraphX is built on RDDs.

Applications

A couple of recent MLlib algorithms are implemented on GraphX: LDA topic modeling and Power Iteration Clustering. Alibaba's Taobao uses GraphX for data mining in e-commerce, modeling user-item-merchant interactions as a graph. Netflix uses GraphX for movie recommendation, with graph diffusion and LDA clustering algorithms.


Friday, April 24, 2015

Spark for Exploratory Data Analysis?

Python and R have long been known for their data analysis packages and environments. But now that Spark supports DataFrames, will it be possible to do exploratory data analysis with Spark? Assuming the production system is implemented in Spark for scalability, it would be nice to do the initial data exploration within the same framework.

At first glance, all the major components are available. With Spark SQL, you can load a variety of different data formats, such as JSON, Hive, Parquet, and JDBC, and manipulate the data with SQL. Since the data is stored in RDDs (with schema), you can also process it with the original RDD APIs, as well as the algorithms and utilities in MLlib.
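
For instance, a first exploration session might look something like this (a sketch against the Spark 1.x APIs; "events.json" and its columns, country and duration, are hypothetical):

```scala
// Load the data; the JSON schema is inferred automatically.
val df = sqlContext.read.json("events.json")
df.printSchema()               // inspect the inferred schema
df.describe("duration").show() // count, mean, stddev, min, max

// Quick aggregate views, Pandas/R-style.
df.groupBy("country").count().orderBy($"count".desc).show(10)

// The same data is available to SQL and to the RDD API.
df.registerTempTable("events")
sqlContext.sql("SELECT country, AVG(duration) FROM events GROUP BY country").show()
df.rdd.take(3)                 // drop down to the underlying RDD of Rows
```
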

Of course, the details matter, so without having done a real world project in this framework, I have to wonder: what is missing? Is there a critical data frame function in Pandas or R, that is not yet supported in Spark? Are there other missing pieces that are critical to real world data analysis? How difficult is it to patch up those missing pieces by linking in external libraries?

Thursday, December 4, 2014

Lightbeam: Shines a Light on Creepy Ads

Have you ever had a creepy ad experience? Have you gone online to search for lodging in Lake Tahoe, and then two minutes later received an email from Groupon for a "Tahoe weekend getaway"? Sure, many companies advertise online, but Groupon ads seem extremely targeted. So, I was curious: "how much is Groupon tracking me?"

Lightbeam Shows Third-Party Tracking

One way that websites target ads is by tracking a user's online behavior. When a user visits a website, the web page may execute tracking scripts and store cookies, which allow third-party ad networks to track that user. These ad networks can then collect information whenever the user visits another website that partners with them. (More info here.)

To investigate the extent of Groupon's tracking, I installed the Lightbeam add-on to my Firefox browser. This add-on, released by Mozilla about a year ago, shows all the third-party cookies and connections made by websites, in an interactive visualization. After browsing to a few websites (Yahoo, Google, Groupon), I could see that Groupon contacted by far the largest number of third-party domains for ad tracking, 17 in all, 8 of which also created a cookie.

Groupon (the green "G") made the most third-party connections, as shown in Lightbeam.


Third-party Domains

What are these third-party domains that Groupon connected with?

  • Google Tag Manager: "Tags are tiny bits of website code that let you measure traffic and visitor behavior."
  • Criteo: "Personalized retargeting company."
  • Facebook
  • Advertising.com: "The most mature ad optimization and bid management system in the business."
  • Rubicon Project: "Leading the automation of advertising."
  • Doubleclick: Internet ad serving, subsidiary of Google.
  • PubMatic: "Programmatic advertising."
  • Yahoo
  • Channel Intelligence (Google): "Optimize your data and sell more products."
  • Nanigans: "The industry-leading advertising automation platform."
  • Atwola: "Atwola is a cookie that is used to track user browsing history." 

Curious about ad tracking in the websites you visit? Check out the Lightbeam Firefox add-on.