Stream Processing using Storm and Kafka

In my earlier post, we looked at how Kafka can be integrated with Spark Streaming to process the loan data. In that Spark Streaming application, we cleansed the data to remove invalid records before aggregating it. We could instead cleanse the data earlier in the pipeline, before the loan records reach the Spark Streaming application: subscribe to the raw loan records, scrub them, and publish them back to another Topic in the Kafka broker so that Spark Streaming picks up only the cleansed records and continues with further processing. Apache Storm provides task parallel computation, whereas Apache Spark is based on data parallel computation. We could therefore use Storm to perform multiple tasks on the loan data messages in parallel and publish the messages back to Kafka on multiple topics based on logic specific to the use case. For instance, we could split the stream by grouping the states by region and have multiple Spark Streaming processes run computations in data parallel mode (data partitions) with region specific logic.

In this post, we’ll look at the Storm code that subscribes to the Kafka Topic to read the raw loan records and then publishes the cleansed records back to Kafka. It is assumed that Apache Kafka is already running locally as described in the Apache Kafka post.

A Storm Topology consists of Spouts and Bolts that describe how the data is ingested from a source, processed and finally sent to a sink (in this case Kafka). Let’s look at the Spout required to read the raw data from Kafka, the Bolt that cleanses the loan records and finally the Kafka Bolt that publishes the cleansed records. The code discussed in this post is available on GitHub.

Loan Data Kafka Spout

LoanDataKafkaSpout creates a Spout that reads the raw loan data records from the Kafka broker. It converts each Kafka ConsumerRecord into a Tuple (Offset, Loan Record Key, Loan Record Value), which is then sent to the Storm Bolts that scrub the records and publish them back to Kafka.

KafkaSpoutConfig<String, String> loanDataSpoutConfig = KafkaSpoutConfig.builder(kafkaBrokerEndpoint, rawLoanDataInputTopic)
        .setGroupId("loan_data_storm_spout")
        .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
        .setRecordTranslator(consumerRecord -> {
            return Arrays.asList(consumerRecord.offset(), consumerRecord.key(), consumerRecord.value());
        }, new Fields(LoanDataCleansingContants.LOAN_DATA_TUPLE_OFFSET, LoanDataCleansingContants.LOAN_DATA_TUPLE_KEY, LoanDataCleansingContants.LOAN_DATA_TUPLE_LOANRECORD))
        .build();
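The configuration above is then wrapped in a KafkaSpout instance; presumably the createLoanDataKafkaSpout factory referenced in the topology further below does something along these lines (a minimal sketch):

// Build the Spout from the configuration above
// (org.apache.storm.kafka.spout.KafkaSpout from storm-kafka-client)
KafkaSpout<String, String> loanDataKafkaSpout = new KafkaSpout<>(loanDataSpoutConfig);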

Loan Data Cleansing Bolt

LoanDataCleansingBolt gets the raw loan data record Tuple from the Spout and discards the invalid records (header, empty and trailer records). It also reformats each valid loan record by matching and replacing a few characters using a regex pattern. Finally, the Bolt emits a Tuple (Loan Record Key, Loan Record Value) containing the scrubbed loan record for further processing.

// Drop invalid records: empty lines, the header record that defines the attribute names and the trailer record
if (!(loanRecord.isEmpty() || loanRecord.contains("member_id") || loanRecord.contains("Total amount funded in policy code"))) {

    // A few records have emp_title values with embedded commas, which would otherwise get the records rejected
    String scrubbedLoanRecord = loanRecord.replace(", ", "|").replaceAll("[a-z],", "");
    collector.emit(loanRecordTuple, new Values(loanRecordTuple.getString(1), scrubbedLoanRecord));
    collector.ack(loanRecordTuple);

} else {

    System.out.println("Invalid Loan Record dropped at offset --> " + loanRecordTuple.getLong(0) + " | " + loanRecordTuple.getString(2));
    collector.ack(loanRecordTuple);
}
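For the field-name based mapping used by the Kafka writer Bolt in the next section to work, the cleansing Bolt also has to declare output fields with matching names; a sketch of what its declareOutputFields presumably looks like:

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // Field names must line up with the FieldNameBasedTupleToKafkaMapper used by the writer Bolt
    declarer.declare(new Fields(LoanDataCleansingContants.LOAN_DATA_TUPLE_KEY,
            LoanDataCleansingContants.LOAN_DATA_SCRUBBED_TUPLE_LOANRECORD));
}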

Loan Data Kafka Writer Bolt

LoanDataKafkaWriterBolt creates the Bolt for writing the cleansed loan data records back to the Kafka broker. It gets the cleansed loan data record Tuple from the LoanDataCleansingBolt and maps its fields to the key-value pair of a Kafka ProducerRecord. It finally publishes the scrubbed loan record to the Kafka broker.

KafkaBolt<String, String> cleansedLoanDataWriterBolt = new KafkaBolt<String, String>().withProducerProperties(getKafkaProducerProperties(kafkaBrokerEndpoint))
        .withTopicSelector(new DefaultTopicSelector(cleansedLoanDataOutputTopic))
        .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<String, String>(LoanDataCleansingContants.LOAN_DATA_TUPLE_KEY, LoanDataCleansingContants.LOAN_DATA_SCRUBBED_TUPLE_LOANRECORD));
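The getKafkaProducerProperties helper is not shown here; it would typically just point the producer at the broker and configure String serializers, roughly along these lines (the exact properties in the GitHub code may differ):

private static Properties getKafkaProducerProperties(String kafkaBrokerEndpoint) {
    // java.util.Properties holding the standard Kafka producer settings for String key/value messages
    Properties producerProperties = new Properties();
    producerProperties.put("bootstrap.servers", kafkaBrokerEndpoint);
    producerProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProperties.put("acks", "1");
    return producerProperties;
}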

Loan Data Cleansing Topology

LoanDataCleansingTopology creates and runs the Storm topology containing the Spout and Bolts discussed above.

TopologyBuilder loanDataTopologyBuilder = new TopologyBuilder();

// Spout to read the raw loan data from Kafka broker
loanDataTopologyBuilder.setSpout(LoanDataCleansingContants.LOAN_DATA_CLEANSING_SPOUT, LoanDataKafkaSpout.createLoanDataKafkaSpout(kafkaBroker, rawLoanDataInputTopic), 1);
// Bolt to cleanse the loan data record by dropping the invalid records and reformatting the records based on RegEx pattern
loanDataTopologyBuilder.setBolt(LoanDataCleansingContants.LOAN_DATA_CLEANSING_BOLT, new LoanDataCleansingBolt(), 1).shuffleGrouping(LoanDataCleansingContants.LOAN_DATA_CLEANSING_SPOUT);
// Bolt to publish the cleansed loan data record back to Kafka
loanDataTopologyBuilder.setBolt(LoanDataCleansingContants.LOAN_DATA_CLEANSED_WRITER_BOLT, LoanDataKafkaWriterBolt.createLoanDataKafkaWriterBolt(kafkaBroker, cleansedLoanDataOutputTopic)).shuffleGrouping(LoanDataCleansingContants.LOAN_DATA_CLEANSING_BOLT);

LocalCluster localCluster = new LocalCluster();
Config conf = new Config();
conf.setDebug(false);
// submit the topology to local cluster
localCluster.submitTopology("LoanDataCleansingTopology", conf, loanDataTopologyBuilder.createTopology());
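In local mode the topology only runs for as long as the JVM stays up, so the process needs to be kept alive while the records stream through; a minimal sketch (the sleep duration here is arbitrary):

// Keep the JVM alive so the local topology keeps polling Kafka (org.apache.storm.utils.Utils)
Utils.sleep(300000);

// Tear the topology and the local cluster down once done
localCluster.killTopology("LoanDataCleansingTopology");
localCluster.shutdown();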

Finally, run the application by passing the following three command line arguments: the Kafka broker endpoint, the Topic for reading the raw loan records and the Topic for publishing the cleansed loan records. It is assumed that the Kafka broker is already running locally with the Topics created.

localhost:9092 raw_data_storm_in cleansed_data_storm_out

After starting the Storm application polling on the Topic “raw_data_storm_in”, run the Kafka producer that publishes messages to the Topic “raw_data_storm_in” by reading the loan records CSV file provided by Lending Club.

After all the loan data records have been published by the Kafka producer, we can see that a few records (headers, trailers, empty lines) were dropped by the Storm topology based on the Bolt logic.

