Showing posts with label data science. Show all posts

Friday, May 20, 2016

Overview of Spark DataFrame API

Introduction

Spark DataFrames were introduced in early 2015, in Spark 1.3. Since then, a lot of new functionality has been added in Spark 1.4, 1.5, and 1.6. More than a year later, Spark's DataFrame API provides a rich set of operations for data munging, SQL queries, and analytics. This post will give an overview of all the major features of Spark's DataFrame API, focusing on the Scala API in 1.6.1.

Outline


  • Classes and Objects
  • Reading Data
  • Traditional Dataframe Operations
  • Lazy Eval and collect()
  • SQL (Relational) Operations
  • Data Munging
  • Analytics

Classes and Objects

Let us start by reviewing the major classes and objects in the DataFrame API. The main ones are SQLContext, DataFrame, Column, and functions.

SQLContext, DataFrame, and Column Classes

  • SQLContext is the main entry point for creating DataFrames.
  • DataFrame is the main class representing the DataFrame data and operations.
  • The Column class represents an individual column of a DataFrame.
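Here is a minimal sketch of how these classes fit together in a standalone Spark 1.6 application. The object name, application name, and sample data are hypothetical, and the local master is only an assumption for trying things out on one machine. In spark-shell, `sc` and `sqlContext` are already created for you, so this setup is not needed there.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Column, DataFrame, SQLContext}

// Hypothetical standalone app; names and data are for illustration only.
object DataFrameSetup {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("dataframe-overview")
      .setMaster("local[*]") // assumption: run locally using all cores
    val sc = new SparkContext(conf)

    // SQLContext is the entry point for creating DataFrames.
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._ // enables toDF() and the $"..." syntax

    // A DataFrame built from a local collection.
    val df: DataFrame = Seq((1, 5), (2, 6)).toDF("col1", "col2")

    // A Column is an expression over a DataFrame's columns.
    val total: Column = df("col1") + df("col2")
    df.select(total.as("total")).show()

    sc.stop()
  }
}
```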

Functions Object

The functions object contains functions for aggregation, math, and date/time and string manipulation that can be applied on DataFrame columns.

Reading Data into a DataFrame

JSON, Parquet, JDBC, Hive, CSV

DataFrames can read from a large number of source data formats, such as JSON, Parquet, JDBC, and Hive. See the DataFrameReader class for some of the natively supported formats and Spark Packages for packages available for other formats, such as CSV and many others.

Reading JSON Example 

Here is an example of reading JSON data into a DataFrame. The input file must contain one JSON object on each line:

val df = sqlContext.read.json("/home/data.json")
df: org.apache.spark.sql.DataFrame = [col1: int, col2: int]

In the above example, sqlContext is of type SQLContext, its read method returns a DataFrameReader, and the reader's json() method reads the specified data file.
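Other built-in formats follow the same pattern. The sketch below writes a small DataFrame to Parquet and reads it back, both with the parquet() shortcut and with the generic format(...).load(...) form. It assumes a spark-shell session on Spark 1.6, where `sqlContext` is predefined. The temporary directory and sample data are made up for illustration.

```scala
// Assumes spark-shell (Spark 1.6), where `sqlContext` already exists.
import sqlContext.implicits._

// Hypothetical output location inside a fresh temporary directory.
val out = java.nio.file.Files.createTempDirectory("df-demo")
  .resolve("data.parquet").toString

// Write a small DataFrame as Parquet via the DataFrameWriter.
val original = Seq((1, 5), (2, 6)).toDF("col1", "col2")
original.write.parquet(out)

// Read it back with the format-specific shortcut...
val viaShortcut = sqlContext.read.parquet(out)

// ...or with the generic form, naming the format explicitly.
val viaFormat = sqlContext.read.format("parquet").load(out)
```

The generic form is also how formats from Spark Packages are usually selected, by passing the package's format name to format().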

Traditional Dataframe Operations

Spark DataFrames support traditional dataframe operations that you might expect from working with Pandas or R dataframes. You can select columns and rows, create new columns, and apply functions on column values.

Selecting Columns

To select one or more columns:
> df.select("col1")
+----+
|col1|
+----+
|   1|
|   2|
+----+


Selecting Rows

To select rows based on a boolean filter:
> df.filter(df("col1") > 1)  
+----+----+
|col1|col2|
+----+----+
|   2|   6|
+----+----+
In the above example, df("col1") is of type Column, and ">" is a method defined in the Column class. Alternatively, a column can be specified with the $"col1" syntax.

More methods in class Column: ===, !==, isNaN, isNull, isin, like, startsWith, endsWith
See also in class DataFrame: sample
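The $"col1" syntax only works after importing the SQLContext implicits. Here is a small sketch comparing the two ways of naming a column, assuming a spark-shell session (Spark 1.6) where `sqlContext` is predefined. The sample data mirrors the df used above.

```scala
// Assumes spark-shell (Spark 1.6), where `sqlContext` already exists.
import sqlContext.implicits._ // brings the $"..." column syntax into scope

val df = Seq((1, 5), (2, 6)).toDF("col1", "col2")

// Two equivalent ways to refer to a column in a filter.
val viaApply = df.filter(df("col1") > 1)
val viaDollar = df.filter($"col1" > 1)

// Column methods can be combined into larger boolean expressions.
val combined = df.filter($"col1" > 1 && $"col2".isin(5, 6))
```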

Creating New Columns

To create a new column derived from existing ones, use the withColumn() method:
> df.withColumn("col3", df("col1") + df("col2")) 
+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   5|   6|
|   2|   6|   8|
+----+----+----+

More methods in class Column: %, *, -, /, bitwiseAND, bitwiseOR, cast, &&, ||

Math Functions on Columns

A number of math functions, defined in the functions object, such as sqrt(), can be applied to column values:
import org.apache.spark.sql.functions._
> df.select(df("col1"), sqrt(df("col1")))
+----+------------------+
|col1|        SQRT(col1)|
+----+------------------+
|   1|               1.0|
|   2|1.4142135623730951|
+----+------------------+

You can also define your own functions on columns, via UDFs; see the "UDFs" section below. Here are some more predefined math functions (not comprehensive) in functions:
cos, sin, tan, exp, log, pow, cbrt, hypot, toDegrees, toRadians, ceil, floor, round, rint, pmod, shiftLeft, shiftRight

Displaying Data and Schema

To display a DataFrame:
> df.show()
+----+----+
|col1|col2|
+----+----+
|   1|   5|
|   2|   6|
+----+----+

To display a DataFrame's column names and types:
> df.printSchema()
root
 |-- col1: integer (nullable = false)
 |-- col2: integer (nullable = false)

See also: head, take, count

Lazy Eval and collect()

DataFrames are evaluated lazily, which means that no computation takes place until you perform an action. Any non-action method will thus return immediately, in most cases. An action is any method that produces output that is not a DataFrame, such as displaying data on the console, converting data into Scala Arrays, or saving data into a file or database. 

To convert a DataFrame into an array, use the collect() method:
> df.collect()
res0: Array[org.apache.spark.sql.Row] = Array([1,5], [2,6])

To convert only the first n rows, use head or take.
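The difference between transformations and actions can be sketched as follows, assuming a spark-shell session (Spark 1.6) where `sqlContext` is predefined. The sample data and the column name col3 are illustrative.

```scala
// Assumes spark-shell (Spark 1.6), where `sqlContext` already exists.
import sqlContext.implicits._

val df = Seq((1, 5), (2, 6)).toDF("col1", "col2")

// Transformation: returns a new DataFrame right away; no job runs yet.
val doubled = df.withColumn("col3", df("col1") * 2)

// Actions: these trigger the computation and return local Scala values.
val firstRow = doubled.head()  // a single Row
val firstTwo = doubled.take(2) // Array[Row] with at most 2 rows

// Extract typed values from the Rows by position.
val col3Values = firstTwo.map(_.getInt(2))
```

Since take(n) brings only n rows back to the driver, it is a safer choice than collect() when the DataFrame may be large.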

SQL (Relational) Operations

DataFrames also support SQL (relational) operations, such as SELECT, WHERE, GROUP BY, Aggregate, and JOIN. You can also define UDFs (user-defined functions).

SELECT, WHERE

To do a SELECT with a WHERE clause:
> df.select("col1", "col2")
    .where($"col1" === 1)
+----+----+
|col1|col2|
+----+----+
|   1|   5|
+----+----+

Note that "===" is a Column method that tests for equality. More methods in class Column: >, <, !==, isNaN, isNull, isin, like, startsWith, endsWith

GROUP BY, Aggregate

To do a GROUP BY and aggregation:
> df1.show()
+----+----+
|col1|col2|
+----+----+
|   1|   5|
|   2|   6|
|   2|   7|
+----+----+
import org.apache.spark.sql.functions._
> df1.groupBy("col1")
     .agg(sum("col2").as("sum_col2"))
+----+--------+
|col1|sum_col2|
+----+--------+
|   1|       5|
|   2|      13|
+----+--------+

Note that the groupBy() method returns a GroupedData object, on which we call the agg() method to perform one or more aggregations. The sum() function is one of the aggregation functions defined in the functions object.

More aggregation functions in functions: approxCountDistinct, avg, corr, count, countDistinct, first, last, max, mean, min, skewness, stddev, sumDistinct, variance (and more)
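Since agg() accepts several aggregation expressions at once, a per-group summary can be computed in one pass. Here is a sketch, assuming a spark-shell session (Spark 1.6) where `sqlContext` is predefined. The data matches df1 above, and the output column names are illustrative.

```scala
// Assumes spark-shell (Spark 1.6), where `sqlContext` already exists.
import sqlContext.implicits._
import org.apache.spark.sql.functions.{avg, count, max}

val df1 = Seq((1, 5), (2, 6), (2, 7)).toDF("col1", "col2")

// Several aggregations over the same groups in a single agg() call.
val summary = df1.groupBy("col1").agg(
  count("col2").as("n"),
  avg("col2").as("avg_col2"),
  max("col2").as("max_col2"))

summary.show()
```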

JOIN

To do a JOIN between two DataFrames:
> people.show()
+---+-----+
| id| name|
+---+-----+
|  1|Alice|
|  2|  Bob|
+---+-----+
> people.join(df, people("id") === df("col1"))
+---+-----+----+----+
| id| name|col1|col2|
+---+-----+----+----+
|  1|Alice|   1|   5|
|  2|  Bob|   2|   6|
+---+-----+----+----+

The above example shows an inner join; other join types, such as outer join, are also supported.
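To pick a different join type, pass its name as a third argument to join(). The sketch below shows a left outer join, assuming a spark-shell session (Spark 1.6) where `sqlContext` is predefined. The extra person "Carol" is a made-up row added so that one row has no match.

```scala
// Assumes spark-shell (Spark 1.6), where `sqlContext` already exists.
import sqlContext.implicits._

// "Carol" is a hypothetical row with no matching col1 value in df.
val people = Seq((1, "Alice"), (2, "Bob"), (3, "Carol")).toDF("id", "name")
val df = Seq((1, 5), (2, 6)).toDF("col1", "col2")

// Left outer join: keep every row of `people`, with nulls where df has no match.
val joined = people.join(df, people("id") === df("col1"), "left_outer")

// Rows from `people` that found no partner in df.
val unmatched = joined.filter(joined("col1").isNull)
```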

More SQL operations

See also in class DataFrame:
alias, as, cube, distinct, drop, dropDuplicates, intersect, limit, na, orderBy, repartition, rollup, selectExpr, sort, unionAll, withColumnRenamed

UDFs

To define a UDF, use the udf function:
import org.apache.spark.sql.functions.udf
val myUdf = udf {(n: Int) => (n * 2) + 1}

You can then apply the UDF on one or more Columns:
> df.select(df("col1"), myUdf(df("col1")))
+----+---------+
|col1|UDF(col1)|
+----+---------+
|   1|        3|
|   2|        5|
+----+---------+

Functions for Data Munging

There are a variety of functions to simplify data munging on date, timestamp, string, and nested data in DataFrames. These functions are defined in the functions object.

Dates and Timestamps

Here is an example of working with dates and timestamps. The date_add() function adds or subtracts days from a given date. The unix_timestamp() function returns a Unix timestamp corresponding to a timestamp string in a specified format:

import org.apache.spark.sql.functions._
> df6.withColumn("day_before", date_add(df6("date"), -1))
     .withColumn("unix_time", unix_timestamp(df6("date"), "yyyy-MM-dd"))

+----------+----------+----------+
|      date|day_before| unix_time|
+----------+----------+----------+
|2016-01-01|2015-12-31|1451606400|
|2016-09-05|2016-09-04|1473033600|
+----------+----------+----------+

See also (not comprehensive) in functions: current_date, current_timestamp, date_sub, datediff, dayofmonth, dayofyear, from_unixtime, hour, last_day, minute, month, next_day, quarter, second, trunc, weekofyear, year

Strings

There are also a number of functions for working with strings. Here is one example, with the substring() function, which returns a substring given an input string, position, and length:

> df6.withColumn("month_day", substring(df6("date"), 6, 5))
+----------+---------+
|      date|month_day|
+----------+---------+
|2016-01-01|    01-01|
|2016-09-05|    09-05|
+----------+---------+

See also (not comprehensive) in functions: ascii, concat, decode, encode, format_number, format_string, length, levenshtein, lower, lpad, ltrim, regexp_extract, regexp_replace, repeat, rtrim, split, translate, trim, upper

Nested Data Structures

With certain data formats, such as JSON, it is common to have nested arrays and structs in the schema. The functions object includes functions for working with nested columns. For example, if a column is of type Array, such as "col2" below, you can use the explode() function to flatten the data inside that column:

> df8
+----+--------+
|col1|    col2|
+----+--------+
|   1|[1a, 1b]|
|   2|    [2a]|
+----+--------+

> df8.select(df8("col1"), explode(df8("col2")).as("col2_flat"))
+----+---------+
|col1|col2_flat|
+----+---------+
|   1|       1a|
|   1|       1b|
|   2|       2a|
+----+---------+

The new flattened column, "col2_flat", can now be manipulated as an ordinary top-level column. For more about nested array data, please see my post on the topic.

See also in functions: array_contains, size, sort_array, struct, array

Analytics

The DataFrame API includes functionality for analytics, namely, summary statistics, window functions, and pivot tables.

Summary Statistics

The describe() method computes summary statistics for numerical columns and is meant for exploratory data analysis:

> df.describe()
+-------+------------------+------------------+
|summary|              col1|              col2|
+-------+------------------+------------------+
|  count|                 2|                 2|
|   mean|               1.5|               5.5|
| stddev|0.7071067811865476|0.7071067811865476|
|    min|                 1|                 5|
|    max|                 2|                 6|
+-------+------------------+------------------+

The stat method returns a DataFrameStatFunctions object, which provides additional statistics functions such as:
corr, cov, crosstab, freqItems, sampleBy
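Here is a short sketch of a few of these functions, assuming a spark-shell session (Spark 1.6) where `sqlContext` is predefined. The three-row sample data is made up so that the correlation is not trivially 1.

```scala
// Assumes spark-shell (Spark 1.6), where `sqlContext` already exists.
import sqlContext.implicits._

// Hypothetical sample data.
val df = Seq((1, 5), (2, 6), (3, 8)).toDF("col1", "col2")

// Pearson correlation and sample covariance between two numeric columns.
val pearson: Double = df.stat.corr("col1", "col2")
val covariance: Double = df.stat.cov("col1", "col2")

// Contingency table of value pairs, returned as a DataFrame.
val table = df.stat.crosstab("col1", "col2")
table.show()
```

Note that corr and cov return plain Scala doubles, so they are actions, while crosstab returns another DataFrame.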

Window Functions

Window functions allow you to perform calculations over a moving window of rows, and are the basis for calculating a moving average or cumulative sum. You can apply window functions on DataFrames by defining a WindowSpec:

import org.apache.spark.sql.expressions.Window
val wSpec2 = Window.partitionBy("name").orderBy("date").rowsBetween(-1, 1)

The above window spec for a moving average consists of three components: (1) partition by, (2) order by, and (3) a frame. To use this WindowSpec in a DataFrame, you would apply a window function or aggregation function, such as avg() over this WindowSpec:

> customers.withColumn("movingAvg", avg(customers("amountSpent")).over(wSpec2))
+-----+----------+-----------+---------+
| name|      date|amountSpent|movingAvg|
+-----+----------+-----------+---------+
|Alice|2016-05-01|       50.0|     47.5|
|Alice|2016-05-03|       45.0|     50.0|
|Alice|2016-05-04|       55.0|     50.0|
|  Bob|2016-05-01|       25.0|     27.0|
|  Bob|2016-05-04|       29.0|     27.0|
|  Bob|2016-05-06|       27.0|     28.0|
+-----+----------+-----------+---------+

For a list of all the possible window functions and aggregation functions, please see the functions object. For more examples of using window functions, please see my blog post on the topic.

For more window functions, see in functions: cume_dist, lag, lead, ntile, percent_rank, rank, row_number (and more)
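The same pattern covers ranking functions and cumulative sums. The sketch below assumes a spark-shell session (Spark 1.6) where `sqlContext` is predefined. As far as I know, window functions in Spark 1.x also require `sqlContext` to be a HiveContext, which is what spark-shell provides when Spark is built with Hive support. The new column names are illustrative.

```scala
// Assumes spark-shell (Spark 1.6) with a Hive-enabled `sqlContext`.
import sqlContext.implicits._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lag, row_number, sum}

val customers = Seq(
  ("Alice", "2016-05-01", 50.0),
  ("Alice", "2016-05-03", 45.0),
  ("Alice", "2016-05-04", 55.0),
  ("Bob", "2016-05-01", 25.0),
  ("Bob", "2016-05-04", 29.0),
  ("Bob", "2016-05-06", 27.0)
).toDF("name", "date", "amountSpent")

// Partition and ordering shared by all three window expressions.
val byDate = Window.partitionBy("name").orderBy("date")

// Frame from the first row of the partition up to the current row.
val cumulative = byDate.rowsBetween(Long.MinValue, 0)

val result = customers
  .withColumn("visit", row_number().over(byDate))                          // 1, 2, 3 per customer
  .withColumn("prevAmount", lag(customers("amountSpent"), 1).over(byDate)) // null on first visit
  .withColumn("cumSpent", sum(customers("amountSpent")).over(cumulative))  // running total

result.show()
```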

Pivot Tables

You can create pivot tables with the pivot() method:
> df7.groupBy("col1").pivot("col2").avg("col3")
+----+----+----+
|col1|   A|   B|
+----+----+----+
|   1|10.0|21.0|
|   2|12.0|20.0|
+----+----+----+

In the above example, the DataFrame is grouped by col1 and pivoted along col2, which contains the values "A" and "B", and the average of col3 is computed for each group. The pivot() method is defined in the GroupedData class. For more information about pivoting, please see this Databricks article: Reshaping Data with Pivot in Apache Spark.
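When the distinct pivot values are known in advance, they can be passed to pivot() explicitly, which spares Spark from computing them first. Here is a sketch, assuming a spark-shell session (Spark 1.6) where `sqlContext` is predefined. The input rows are hypothetical, chosen to reproduce the pivot table shown above.

```scala
// Assumes spark-shell (Spark 1.6), where `sqlContext` already exists.
import sqlContext.implicits._

// Hypothetical input rows that yield the pivot table shown above.
val df7 = Seq(
  (1, "A", 10.0), (1, "B", 21.0),
  (2, "A", 12.0), (2, "B", 20.0)
).toDF("col1", "col2", "col3")

// Listing the pivot values fixes the output columns and their order.
val pivoted = df7.groupBy("col1").pivot("col2", Seq("A", "B")).avg("col3")
pivoted.show()
```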

Summary

By now, you should have a good feel for what is possible with the Spark DataFrame Scala API. From data munging, to SQL, to analytics, this API provides a broad range of functionality for working with big data. For more information about DataFrames, see the Spark programming guide.

Thursday, April 28, 2016

Introduction to Spark for Developers and Data Scientists

What is Spark?

Spark is “a fast and general engine for large-scale data processing”. – http://spark.apache.org/
Spark is also one of the most popular open source frameworks for big data, based on number of contributors. Let us find out why this is the case.

When do you use Spark?

Suppose you would like to analyze a data set: perform ETL or data munging, then run SQL queries such as grouping and aggregations against the data, and maybe apply a machine learning algorithm. When the data size is small, everything will run quickly on a single machine, and you can use analysis tools like Pandas (Python), R, or Excel, or write your own scripts. But, for larger data sets, data processing will be too slow on a single machine, and then you will want to move to a cluster of machines. This is when you would use Spark.

You could probably benefit from Spark if:
  • Your data is currently stored in Hadoop / HDFS.
  • Your data set contains more than 100 million rows.
  • Ad-hoc queries take longer than 5 minutes to complete.

What Spark is Not, typical architecture

Spark can be a central component of a big data system, but it is not the only component. It is not a distributed file system: you would typically store your data on HDFS or S3. Nor is Spark a NoSQL database: Cassandra or HBase would be a better place for horizontally scalable table storage. And, it is not a message queue: you would use Kafka or Flume to collect streaming event data. Spark is, however, a compute engine which can take input from, or send output to, all of these other systems.

How do you use Spark?

Implemented in Scala, Spark can be programmed in Scala, Java, Python, SQL, and R. However, not all of the latest functionality is immediately available in all languages.

What kind of operations does Spark support?

Spark SQL. Spark supports batch operations involved in ETL and data munging, via the DataFrame API. It supports parsing different input formats, such as JSON or Parquet. Once the raw data is loaded, you can easily compute new columns from existing columns. You can slice and dice the data by filtering, grouping, aggregating, and joining with other tables. Spark supports relational queries, which you can express in SQL or through the DataFrame API.
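As an illustration of the last point, here is the same grouping query written once in SQL and once with the DataFrame API. This is a sketch that assumes a spark-shell session on Spark 1.6, where `sqlContext` is predefined. The table name `events` and the sample data are made up.

```scala
// Assumes spark-shell (Spark 1.6), where `sqlContext` already exists.
import sqlContext.implicits._
import org.apache.spark.sql.functions.sum

val df = Seq((1, 5), (2, 6), (2, 7)).toDF("col1", "col2")

// Register the DataFrame under a (hypothetical) table name for SQL queries.
df.registerTempTable("events")

// The query expressed in SQL...
val viaSql = sqlContext.sql(
  "SELECT col1, SUM(col2) AS sum_col2 FROM events GROUP BY col1")

// ...and the equivalent query through the DataFrame API.
val viaApi = df.groupBy("col1").agg(sum("col2").as("sum_col2"))
```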

Spark Streaming. Spark also provides scalable stream processing. Given an input data stream, for example, coming from Kafka, Spark allows you to perform operations on the streaming data, such as map, reduce, join, and window.

MLlib. Spark includes a machine learning library, with scalable algorithms for classification, regression, collaborative filtering, clustering, and more. If training a model on your dataset takes too long on a single machine, you might consider cluster computing with Spark.

GraphX. Finally, GraphX is a component in Spark for scalable batch processing on graphs.

As you may have noticed by now, Spark processing is batch oriented. It works best when you want to perform the same operation on all of your data, or a large subset of your data. Even with Spark Streaming, you operate on small batches of the data stream, rather than one event at a time.

Spark vs. Hadoop MapReduce, Hive, Impala

How does Spark compare with other big data compute engines? Unlike Hadoop MapReduce, Spark caches data in memory for huge performance gains when you have ad-hoc queries or iterative workloads, which are common in machine learning algorithms. Hive and Impala both run SQL queries at scale; the advantages of Spark over these systems are (1) the convenience of writing both queries and UDFs in the same language, such as Scala, and (2) support for machine learning algorithms, streaming data, and graph processing within the same system.

Conclusion

This overview has explained what Spark is, when to use it, what kinds of operations it supports, and how it compares with other big data systems. To learn more, please take a look at the Spark website (http://spark.apache.org/).

Sunday, September 28, 2014

Higgs Data Challenge: Particle Physics meets Machine Learning

The HiggsML Challenge on Kaggle

How do you frame a particle physics problem as a data challenge? Will it be accessible to Kagglers? Will data scientists be able to outperform physicists with machine learning, as was the case in the NASA dark matter challenge? The Kaggle Higgs competition recently came to a close with great results.

The HiggsML challenge: when High Energy Physics meets Machine Learning.

Competition summary.

A Particle Physics Data Challenge

Goal: Classify collision events

The goal of the competition was to classify 550,000 proton-proton collision events as either originating from a Higgs boson decay (a "signal" event) or not (a "background" event). This type of analysis can be used in experiments at the Large Hadron Collider to discover decay channels of the Higgs boson.

Data: Tracks of particles produced in each collision

Each proton-proton collision, or event, produces a number of outgoing particles, whose tracks are recorded by a particle detector. The training data consisted of 250,000 labelled events, obtained from simulations of the collisions and detector. For each event, 30 features based on particle track data, such as particle mass, speed, and direction, were provided. The actual experimental data is highly unbalanced (much more background than signal), but the training data provided was fairly balanced.

Particle tracks: a Higgs boson decays into two τ particles, each of which subsequently decays into either an electron (blue line) or a muon (red line). ATLAS Experiment © 2014 CERN.

Evaluation Metric: Significance of discovery

Instead of a standard performance metric, submissions were evaluated by Approximate Median Significance (AMS), a measure of how well a classifier is able to discover new phenomena.

When a classifier labels events (in experimental data, not simulations) as "signal", the p-value is the probability that those events were actually just background events. The corresponding "significance" (Z) is the normal quantile of that p-value, measured in units of sigma. We want a small p-value and large "significance", before claiming a discovery. Finally, AMS is an estimate of the "significance" for a given classifier. For more details, see the Kaggle evaluation page.
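For reference, here is the AMS formula as I recall it from the challenge documentation. Treat the exact form and the regularization constant as something to verify against the Kaggle evaluation page. Here s is the sum of the weights of true signal events that the classifier labels as "signal", and b is the sum of the weights of background events that it labels as "signal":

```latex
\mathrm{AMS} = \sqrt{2\left(\left(s + b + b_{\mathrm{reg}}\right)
  \ln\!\left(1 + \frac{s}{b + b_{\mathrm{reg}}}\right) - s\right)},
\qquad b_{\mathrm{reg}} = 10
```

When b is much larger than s, this is roughly s/√b, the familiar rule of thumb for significance.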

Making Particle Physics Accessible

The Organizers

The contest organizers did a fantastic job of designing and organizing this competition. A group of physicists and machine learning researchers, they took a difficult subject, particle physics, and spent 18 months creating a data challenge that ultimately attracted 1,785 teams (a Kaggle record).

Their success can be attributed to the following:

  • Designing a dataset that could be analyzed without any domain knowledge. 
  • Providing multiple software starter kits that made it easy to get an analysis up and running. 
  • Providing a 16-page document explaining the challenge.
  • Answering questions promptly on the Kaggle discussion forum.

Not needed to win the contest. "Higgs-gluon-fusion" by TimothyRias. Licensed under Creative Commons Attribution-Share Alike 3.0.

The Results

Did machine learning give data scientists an edge? How did the physicists fare?

Cross validation was key

It was not machine learning, but the statistical technique of cross validation, that was key to winning this competition. The AMS metric had a high variance, which made it difficult to know how well a model was performing. You could not rely on the score on the public leaderboard because it was calculated on only a small subset of the data. Therefore, a rigorous cross validation (CV) method was needed for model comparison and parameter tuning. The 1st-place team, Gabor Melis, did 2-fold stratified CV repeated 35 times on random shuffles of the training data. Both the 1st-place and 4th-place teams mentioned on the forum that cross validation was key to a high score.
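To make the idea of repeated stratified CV concrete, here is a generic sketch in plain Scala. It is not the winning team's code. The function names, the seed, and the toy scoring function are all hypothetical. In a real setting, the score function would train a model on the training indices and compute AMS on the held-out fold.

```scala
import scala.util.Random

// Split row indices into k folds so that every fold receives roughly the
// same share of each label (stratification).
def stratifiedFolds(labels: IndexedSeq[Int], k: Int, rng: Random): Vector[Vector[Int]] = {
  val folds = Array.fill(k)(Vector.newBuilder[Int])
  labels.indices.groupBy(i => labels(i)).values.foreach { idxs =>
    // Shuffle within each label, then deal indices round-robin into folds.
    rng.shuffle(idxs.toVector).zipWithIndex.foreach { case (i, pos) =>
      folds(pos % k) += i
    }
  }
  folds.map(_.result()).toVector
}

// Run k-fold CV `repeats` times on fresh shuffles and return the mean and
// (population) standard deviation of the scores.
def repeatedCv(labels: IndexedSeq[Int], k: Int, repeats: Int, seed: Long)
              (score: (Vector[Int], Vector[Int]) => Double): (Double, Double) = {
  val rng = new Random(seed)
  val scores = for {
    _ <- 0 until repeats
    folds = stratifiedFolds(labels, k, rng)
    held <- folds.indices
  } yield {
    val test = folds(held)
    val train = folds.indices.filter(_ != held).flatMap(folds(_)).toVector
    score(train, test)
  }
  val mean = scores.sum / scores.size
  val variance = scores.map(s => math.pow(s - mean, 2)).sum / scores.size
  (mean, math.sqrt(variance))
}

// Toy usage: 100 events, 30% signal. The placeholder score is the signal
// fraction in the held-out fold, which stratification keeps constant.
val labels = Vector.tabulate(100)(i => if (i < 30) 1 else 0)
val (mean, std) = repeatedCv(labels, k = 2, repeats = 35, seed = 42L) { (train, test) =>
  test.count(i => labels(i) == 1).toDouble / test.size
}
```

Averaging over many repeats reduces the noise in the score estimate, which is what makes model comparison possible when a metric such as AMS has high variance.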

Insufficient CV could lead to overfitting and large shake-ups in the final ranking, as can be seen in a meta analysis of the competition scores.

Overfitting led to shake-ups in the final ranking. "Overfitting svg" by Gringer. Licensed under Creative Commons Attribution 3.0

Feature engineering

Feature engineering was less important in this contest, having only a small impact on the score. The reason was that the organizers had already built good features, such as those used in the actual Higgs discovery, into the provided dataset. Participants did come up with good features, such as a feature named “Cake”, which discriminated between the signal and one major source of background events, the Z boson.

Software and Algorithms

The software libraries and algorithms discussed on the discussion forum were mostly standard ones: gradient tree boosting, bagging, Python scikit-learn, and R. The 1st-place entry used a bag of 70 dropout neural networks (overview, model and code). Early in the competition, Tianqi Chen shared a fast gradient boosting library called XGBoost, which was popular among participants.

The 1st-place entry used a bag of 70 dropout neural networks. "Colored neural network" by Glosser.ca. Licensed under Creative Commons Attribution-Share Alike 3.0

Particle Physics meets Machine Learning

The organizers' overall goal was to increase the cross fertilization of ideas between the particle physics and machine learning communities. In that respect, this challenge was a good first step.

Physicists who participated began to appreciate machine learning techniques. This quote from the forum summarizes that sentiment: "The process of attempting as a physicist to compete against ML experts has given us a new respect for a field that (through ignorance) none of us held in as high esteem as we do now."

On the machine learning side, one researcher, Lester Mackey, developed a method to optimize AMS directly. In scikit-learn, support for sample weights was added to the GradientBoostingClassifier, in part due to interest from this competition.

And, much more will be discussed at the upcoming NIPS workshop on particle physics and machine learning. There will also be a workshop on data challenges in machine learning.

There will be a workshop on particle physics and machine learning at the NIPS machine learning conference.

Personal Note

I studied physics at Caltech and Stanford / SLAC (Stanford Linear Accelerator Center), before switching to computer science. Therefore, I could not pass up the opportunity to participate in this challenge myself. This was my first Kaggle competition, in which I ranked in the top 10%.

Further Reading


  • The challenge website, with supplementary information not found on Kaggle: The HiggsML Challenge
  • Tim Salimans, placed 2nd, with a blend of boosted decision tree ensembles: details
  • Courtiol Pierre, placed 3rd, with an ensemble of neural networks: details
  • Lubos Motl, a theoretical physicist who ranked 8th, wrote several blog posts about the competition: initial post
  • "Log0" ranked 23rd by using scikit-learn AdaBoostClassifier and ExtraTreesClassifier: forum post
  • "phunter" ranked 26th with a single XGBoost model: What can a single model do?
  • Darin Baumgartel shared code based on scikit-learn GradientBoostingClassifier: post
  • Trevor Stephens, a data scientist, tried out hundreds of automatically generated features: Armchair Particle Physicist
  • Bios of participants who reached the top of the public leaderboard: Portraits
  • Balazs Kegl, one of the organizers, talked about the challenge in this video: CERN seminar