Apache Kafka – Producers and Consumers

This post provides a quick overview of how to write a Kafka Producer and a Kafka Consumer against a Kafka broker running locally.

First, let’s set up the Kafka broker locally by downloading the TAR file and running the required scripts. Another option is to run the Kafka broker locally using a Docker image; however, I’ll stick with downloading Kafka and setting it up manually.

Steps to set up the Kafka broker

  • Download the Apache Kafka 0.11 TAR archive from Apache
  • Un-tar the file using the following command. I am using Git Bash on Windows, which allows running Unix commands

  tar -xzvf kafka_2.11-0.11.0.1.tgz

  • Run the following command from the bin/windows directory to start ZooKeeper

./zookeeper-server-start.bat ../../config/zookeeper.properties

  • Run the following command to start the Kafka Broker

./kafka-server-start.bat ../../config/server.properties

  • Run the following command to create the Topic “raw_loan_data_ingest”

./kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic raw_loan_data_ingest

After executing the above commands, we have a Kafka broker running on “localhost:9092” and ZooKeeper running on “localhost:2181”. We also have a Topic with 2 partitions created on the broker.
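As a quick sanity check, the topic can also be inspected programmatically. Below is a minimal sketch using Kafka’s AdminClient API (available since 0.11); the class name TopicCheck and the hard-coded endpoint are only illustrative and are not part of the post’s code.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

public class TopicCheck {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Broker endpoint from the local set-up above (illustrative)
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        AdminClient adminClient = AdminClient.create(props);
        // Describe the topic created earlier and print its partition count
        TopicDescription description = adminClient
                .describeTopics(Collections.singletonList("raw_loan_data_ingest"))
                .all().get().get("raw_loan_data_ingest");
        System.out.println("Topic: " + description.name()
                + " | Partitions: " + description.partitions().size());
        adminClient.close();
    }
}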

The code discussed in this post is available on GitHub. The financial data used in this application is provided by Lending Club. The data has records for all the loans issued and includes the loan amount, funding amount, term, interest rate, etc., along with a lot of other details pertaining to the loan issued to the customer.

Kafka Producer

LoanDataKafkaProducer publishes the loan data to the Topic “raw_loan_data_ingest”. The steps are outlined below, and a consolidated sketch of the producer follows the list.

  • Create the Kafka Producer by setting the following producer configuration properties. The Kafka Broker endpoint is passed as a command line argument.
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerEndpoint);
prop.put(ProducerConfig.CLIENT_ID_CONFIG, "LoanDataKafkaProducer");
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  • Create a CountDownLatch so the main thread can wait until the asynchronously published records have been acknowledged via the callback mechanism
final CountDownLatch countDownLatch = new CountDownLatch(1);
  • Read the input file as a stream of lines
Stream<String> loanDataFileStream = Files.lines(Paths.get(loanDataStatsInputFile));
  • Convert each line of the file into a Kafka ProducerRecord, using a random UUID as the message key
final ProducerRecord<String, String> loanRecord =
        new ProducerRecord<String, String>(loanDataIngestTopic, UUID.randomUUID().toString(), line);
  • Send the loan record to the Kafka Broker in async mode. The callback is invoked once the broker acknowledges the record.
loanDataProducer.send(loanRecord, (metadata, exception) -> {
    if (metadata != null) {
        // Broker acknowledged the record; metadata carries the partition it landed on
        System.out.println("Loan Data Event Sent --> " + loanRecord.key() + " | "
                + loanRecord.value() + " | " + metadata.partition());
    } else {
        // Send failed; the exception argument holds the cause
        System.out.println("Error Sending Loan Data Event --> " + loanRecord.value());
    }
});
  • Run the LoanDataKafkaProducer class by passing the following command line arguments

localhost:9092 raw_loan_data_ingest \bigdata\LoanStats_2017Q2.csv
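Putting the steps above together, here is a minimal sketch of what the producer might look like. It is a simplified illustration, not the exact code from the GitHub repository: the CountDownLatch is replaced with a flush() call to wait for outstanding sends, and argument validation and error handling are omitted.

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;
import java.util.UUID;
import java.util.stream.Stream;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class LoanDataKafkaProducer {

    public static void main(String[] args) throws Exception {
        String kafkaBrokerEndpoint = args[0];      // e.g. localhost:9092
        String loanDataIngestTopic = args[1];      // e.g. raw_loan_data_ingest
        String loanDataStatsInputFile = args[2];   // e.g. the LoanStats CSV file

        Properties prop = new Properties();
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerEndpoint);
        prop.put(ProducerConfig.CLIENT_ID_CONFIG, "LoanDataKafkaProducer");
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> loanDataProducer = new KafkaProducer<>(prop);

        // Read the CSV file line by line and publish each line as a separate record
        try (Stream<String> loanDataFileStream = Files.lines(Paths.get(loanDataStatsInputFile))) {
            loanDataFileStream.forEach(line -> {
                final ProducerRecord<String, String> loanRecord =
                        new ProducerRecord<>(loanDataIngestTopic, UUID.randomUUID().toString(), line);
                loanDataProducer.send(loanRecord, (metadata, exception) -> {
                    if (metadata != null) {
                        System.out.println("Loan Data Event Sent --> " + loanRecord.key()
                                + " | " + loanRecord.value() + " | " + metadata.partition());
                    } else {
                        System.out.println("Error Sending Loan Data Event --> " + loanRecord.value());
                    }
                });
            });
        }

        // Wait for all buffered records to be sent before shutting down
        loanDataProducer.flush();
        loanDataProducer.close();
    }
}

With the broker and Topic from the set-up above, running the class with the three arguments shown earlier should print one “Loan Data Event Sent” line per CSV row.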

Kafka Consumer

LoanDataKafkaConsumer consumes the loan data messages from the Topic “raw_loan_data_ingest”. The steps are outlined below, and a consolidated sketch of the consumer follows the list.

  • Create the Kafka Consumer by setting the following consumer configuration properties. The Kafka Broker endpoint is passed as a command line argument.
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerEndpoint);
prop.put(ConsumerConfig.GROUP_ID_CONFIG, "LoanDataKafkaConsumer");
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  • The Kafka Consumer polls the Topic in a loop for new messages; the 5000 ms argument is the maximum time each poll() call blocks waiting for records
final ConsumerRecords<String, String> loanRecords = loanDataConsumer.poll(5000);
  • Loop through the consumer records and print each message’s key, offset, the Topic partition from which it was consumed, and its value
loanRecords.forEach(loanRecord -> {
    System.out.println("Received Record --> " + loanRecord.key() + " | " + loanRecord.offset() + " | "
            + loanRecord.partition() + " | " + loanRecord.value());
});
  • Run the LoanDataKafkaConsumer class by passing the following arguments

localhost:9092 raw_loan_data_ingest
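For completeness, here is a minimal sketch of what the consumer might look like. Again, this is a simplified illustration rather than the exact code from the GitHub repository (it polls forever and has no shutdown handling).

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class LoanDataKafkaConsumer {

    public static void main(String[] args) {
        String kafkaBrokerEndpoint = args[0];   // e.g. localhost:9092
        String loanDataIngestTopic = args[1];   // e.g. raw_loan_data_ingest

        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerEndpoint);
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "LoanDataKafkaConsumer");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> loanDataConsumer = new KafkaConsumer<>(prop)) {
            // Subscribe to the Topic that the producer publishes to
            loanDataConsumer.subscribe(Collections.singletonList(loanDataIngestTopic));

            while (true) {
                // Block for up to 5 seconds waiting for new records
                ConsumerRecords<String, String> loanRecords = loanDataConsumer.poll(5000);
                loanRecords.forEach(loanRecord ->
                        System.out.println("Received Record --> " + loanRecord.key() + " | "
                                + loanRecord.offset() + " | " + loanRecord.partition() + " | "
                                + loanRecord.value()));
            }
        }
    }
}

Running this while the producer is publishing should show records arriving from both partitions of the Topic.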

To summarize, we installed the Kafka broker locally and then ran the Kafka producer to read the loan data statistics in CSV format from a file and publish the messages to a Kafka Topic. Finally, we ran the Kafka consumer to read the messages from the Kafka Topic partitions and print them to the console.

In the next post, we’ll see how Kafka can be integrated with Apache Spark Streaming to perform analysis on the loan data provided by Lending Club.
