Introduction to Stream Processing using Apache Spark

In my previous post, we looked at how Apache Spark can be used to ingest and aggregate the data using Spark SQL in a batch mode. There are different ways to create the Dataset from the raw data depending upon whether the schema of the ingested data is already well-known in advance (RDD of Java Beans) or requires a more flexible solution of defining and configuring the schema using Struct Type (RDD of  Rows).

In this post, we’ll look at how Spark Streaming can be used perform computations and aggregations an a stream of data. For the sake of simplicity, we’ll use the Lending Club data provided in the CSV file format and convert it into a stream of data that would then be aggregated using Spark SQL. It’s worth mentioning that Spark Core, Spark SQL and Spark Streaming all work in tandem to create a streaming data pipeline. As we saw in the previous post, Spark SQL (read Dataset and Dataframe) doesn’t replace the need for RDDs, instead it complements it by providing a high level abstraction where necessary. Likewise, Spark Streaming also provides additional capability on top of existing Spark infrastructure to allow for real-time streaming when compared to the batch mode processing provided by Spark Core and Spark SQL.

Let’s now look at a sample Java application demonstrating how Spark Streaming can be used to process high velocity data using distributed processing.

The financial data used in this application is provided by Lending Club. The data has records for all the loans issued and includes the loan amount, funding amount, term, interest rate etc. along with lot of other details pertaining to the loan issued to the customer. In order to demonstrate the aggregation on the streaming data and compare the results with the batch processing discussed in my previous post, the Loan Data has been split into 2 files. These files are ingested using Spark Streaming and the data is aggregated on the file system in Parquet format. Calculations are performed on both the individual file records as well as the aggregated records in the Parquet file to calculate the total loan funding amount for a given State.

The code discussed in this post is available on Github.

I’ll encourage you to first go though my previous post on Spark to make a better sense of what we are going to discuss in the rest of the post below.

First of all, we need to set the Spark Configuration similar to what we have seen earlier for Spark batch processing.

// Set Spark Configuration
SparkConf sparkConf = new SparkConf().setAppName("spark-financial-analysis").setMaster("local[*]");
sparkConf.set("spark.sql.parquet.compression.codec", "snappy");

Create the Spark Streaming Context with a batch duration of 10 seconds. Spark streaming would read the data in time interval of 10 seconds (micro batches) and create an RDD for all the records read in a micro-batch. In this example, we are looking for files on a specific incoming directory with Spark monitoring this directory for any new files and converting it into an RDD. JavaDStreams (Discrete Stream) is an abstraction for the Streaming data with a notion of continuously flowing RDDs in the stream representing the data in a micro-batch.

// Creates Java Streaming Context with batch size of 10 seconds
JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.seconds(10));

JavaDStream<String> loanStatsStream = streamingContext.textFileStream("/bigdata/streaming");

After we get the JavaDStream, we can convert the streams of JavaRDD of Strings to JavaRDD of Java Beans. RDD of JavaBeans can then be converted to a Dataset by adding a schema over the raw data.

// Create Spark Session
SparkSession session = SparkSession.builder().config(conf).getOrCreate();

// Create a Dataset of Rows (Dataframe) from the RDD of LoanDataRecord
Dataset<Row> loanStatFullDataset = session.createDataFrame(loanDataRecordRDD, LoanDataRecord.class);

We can now query the data by registering as a temporary table as demonstrated in my previous post. In this case, we are converting the Dataset to RDD and then filtering the RDD for a specific state and calculating the total funded amount for IL state.

// Calculate the total funding amount for a given State in current file of streaming records
List<String> fundedAmountsForState = loanStatFullDataset.javaRDD().filter(row -> "IL".equalsIgnoreCase(row.getString(0))).map(row -> row.getString(2)).collect();

String totalFundedAmountForState = fundedAmountsForState.stream().reduce((x,y) -> Double.toString(Double.parseDouble(x) + Double.parseDouble(y))).get();

Following images show the number of records processed and the funding amount in the split file #1

SparkStreaming2

SparkStreaming3

Following images show the number of records processed and the funding amount in the split file #2

SparkStreaming4

SparkStreaming5

The incoming raw files processed in the Streaming mode gets appended to the parquet file format which can then be used to calculate the aggregated funded amount for the State. This gives us the benefit to query the processed data stored in the parquet file or move it to some other storage system for future batch processing.

// Read the aggregated data from the parquet File for batch records received so far
    Dataset<Row> aggregatedLoanDataParquet = session.read().parquet("/bigdata/loanStatFullDataset");

    if (!aggregatedLoanDataParquet.rdd().isEmpty()) {

        System.out.println("Streaming Financial Statistics aggregated Record count " + aggregatedLoanDataParquet.count());

        // Calculate the total funding amount for a given State in current file of streaming records
        List<String> fundedAmountsForStateAgg = aggregatedLoanDataParquet.javaRDD().filter(row -> "IL".equalsIgnoreCase(row.getString(0))).map(row -> row.getString(2)).collect();

        String totalFundedAmountForStateAgg = fundedAmountsForStateAgg.stream().reduce((x, y) -> Double.toString(Double.parseDouble(x) + Double.parseDouble(y))).get();

        System.out.println("Aggregated Total Amount funded by Lending Club in IL State : $" + new BigDecimal(totalFundedAmountForStateAgg).toPlainString());
    }
}

Following images show the aggregated amount for both the files read in the streaming context. As you can see, the amount calculated in the streaming data is same as the amount calculated in the batch mode discussed in the previous post.

SparkStreaming6

In my next post, we’ll look at how we can leverage Apache Kafka as an Enterprise Event Bus with distributed message capabilities to provide an even sourcing solution that can scale to millions of events per second. We’ll also look at how Kafka can be integrated with Apache Spark Streaming to ingest data in real-time instead of flat files.

5 thoughts on “Introduction to Stream Processing using Apache Spark

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s