Spark Streaming and Kafka Integration Example

Spark Streaming and Kafka Integration Example

Introduction

        My search online for a good tutorial on Big Data processing using Spark Streaming and Kafka ended up in a bunch of old posts which had very old versions of Spark and Kafka client libraries. They were of no use to me if I wanted to use the newer versions of the libraries. Hence I decided that I should share my experience with the newer versions of the libraries. This was the motivation for me to come up with this post. This post will give you a comprehensive overview of Spark Streaming and Kafka Integration. The use case that I am trying to address in this post is to process streaming data from a topic on Kafka using Spark Streaming. This is a very common use case that many organizations large and small are implementing these days.
        I will be presenting 2 examples in this post. One example will print the stream of data from a Kafka Topic as it receives it and the other example counts the occurrences of a word from a stream of data from a Kafka topic. I have used Scala for coding these 2 examples. So lets get started.

Spark Streaming

Spark Streaming is a streaming data processing framework which is primarily used for real-time data analytics. Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Spark Streaming is widely being used today as Real-time data processing and analytics have become a critical component of the big data strategy for most organizations.

Some of the use cases and organizations where Spark Streaming is being used are:
  1. Fraud Detection in many different industries including banking and financial sectors, insurance, government agencies and law enforcement, and more
  2. Supply Chain Analytics
  3. Real-time telemetry analytics (Uber)
  4. Analytics in E-Commerce websites (Amazon, Flipkart)
  5. Real-time video analytics in many of the video streaming platforms (Netflix)
  6. Predictive Risk & Compliance in many financial organizations
  7. Live Flight Optimization in the airline industry (United Airlines)
  8. Continuous Transaction Optimization in banking industry (Morgan Stanley)
  9. IoT Parcel Tracking in postal and logistics organizations (Royal Mail, UK)
  10. Live Train Time Tables (Dutch Railways)
Here is a pictorial overview of Spark Streaming (Source: Apache Spark Website)


You can read more about Spark Streaming at the Apache Spark Website.

Kafka

        Kafka is a distributed streaming platform. It can be used for communication between applications or micro services. Kafka is ideally used in big data applications or in applications that consume or process huge number of messages.

Some use cases where Kafka can be used:
  1. Messaging
  2. Website activity tracking
  3. Metrics
  4. Log aggregation
  5. Stream processing
  6. Event sourcing
  7. Commit log
Few Advantages of Kafka:
  1. Horizontally scalable
  2. Fault-tolerant
  3. Very fast
  4. Tried, tested and used by many organizations in production.
A few very important concepts are to be known when using Kafka.
  • Kafka is run as a cluster on one or more servers.
  • The Kafka cluster stores streams of records in categories called topics.
  • Each record consists of a key, a value, and a timestamp.
More details about Kafka can be read from their website.
    Requirements to Run the Application:
    1. Intellij IDE with SBT and Scala Plugins
    2. Kafka
    Kafka should be setup and running in your machine. To setup, run and test if the Kafka setup is working fine, please refer to my post on: Kafka Setup.

    In this tutorial I will help you to build an application with Spark Streaming and Kafka Integration in a few simple steps.

    Step 1: Dependencies in build.sbt

    Since the code is in Scala, I will be using sbt for build related setup. sbt (Scala Build Tool, formerly Simple Build Tool) is an open source build tool for Scala and Java projects, similar to Java's Maven and Ant. I am using the following in this tutorial:
    • Scala Version = 2.11.8
    • Spark Version = 2.2.0
    • Kafka Client Version = 0.8.2.1
    These have to be set in the file: build.sbt
    Here is the full build.sbt:
    name := "SparkKafka"
    
    version := "1.0"
    
    organization := "com.aj"
    
    scalaVersion := "2.11.8"
    scalacOptions := Seq("-deprecation", "-unchecked", "-encoding", "utf8", "-Xlint")
    libraryDependencies ++= {
      val sparkVersion = "2.2.0"
      val kafkaClientVersion = "0.8.2.1"
      Seq(
        "org.apache.spark"  %% "spark-core"                      % sparkVersion,
        "org.apache.spark"  % "spark-streaming_2.11"                 % sparkVersion,
        "org.apache.spark"  % "spark-streaming-kafka-0-8_2.11" % sparkVersion,
        "org.apache.spark"  %% "spark-sql"                 % sparkVersion,
        "org.apache.spark"  %% "spark-hive"                % sparkVersion,
        "org.apache.spark"  %% "spark-repl"                % sparkVersion,
        "org.apache.kafka"  % "kafka-clients"              % kafkaClientVersion
      )
    }

    Step 2: Main Object

    Example One: Print stream of data from a Kafka Topic

    Main Object is the Main Class and entry point of the application. Here is the code for SparkKafkaStream.scala:
    package com.aj.sparkkafka
    
    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.kafka._
    
    object SparkKafkaStream {
      def main(args: Array[String]) {
        if (args.length != 2 ) {
          System.err.println("Please pass program arguments as: <brokers> <topics>")
          System.exit(1)
        }
    
        val Array(brokers, topics) = args
        val sparkConf = new SparkConf().setAppName("SparkKafkaStream").setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, Seconds(10))
    
        //Get the Kafka Topics from the program arguments and create a set of Kafka Topics
        val topicsSet = topics.split(",").toSet
        //Set the brokers in the Kafka Parameters
        val kafkaParameters = Map[String, String]("metadata.broker.list" -> brokers)
        //Create a direct Kafka Stream using the Kafka Parameters set and the Kafka Topics
        val messagesFromKafka = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParameters, topicsSet)
    
        //Get the lines
        val lines = messagesFromKafka.map(_._2)
        //Print
        lines.print()
        //Save to Text Files
        val outputLocation = "output/sparkkafkastream"
        lines.saveAsTextFiles(outputLocation)
    
        ssc.start()
        ssc.awaitTermination()
      }
    }
    
    I have put the comments for most of the lines of code above to describe what the code does. Here is an overview of the things being done in the code above:
    1. The main class takes in 2 program arguments which are of type String and are: Kafka Brokers and Kafka Topics. Here is the code used for this:
    def main(args: Array[String]) {
        if (args.length != 2 ) {
          System.err.println("Please pass program arguments as: <brokers> <topics>")
          System.exit(1)
        }
    
    2. For this program to run as a stand alone application, a new Spark Config needs to be created. SparkConf API represents configuration for a Spark application. It is used to set various Spark parameters as key-value pairs. I am setting App Name and Master parameters to it. Then a new Spark Streaming Context needs to be created. StreamingContext API is the main entry point for Spark functionality. A SparkContext represents the connection to a Spark cluster and can be used to create RDDs, accumulators and broadcast variables on the spark cluster. If you observe the code below I am creating a streaming context with an interval of 10 seconds. Hence it will check for data from the Kafka topic every 10 seconds. Here is the code for this:
    val sparkConf = new SparkConf().setAppName("SparkKafkaStream").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    
    3. Then I create a direct Kafka Stream using the Kafka Parameters that I have set and the Kafka Topics passed as input parameters:
    //Get the Kafka Topics from the program arguments and create a set of Kafka Topics
    val topicsSet = topics.split(",").toSet
    //Set the brokers in the Kafka Parameters
    val kafkaParameters = Map[String, String]("metadata.broker.list" -> brokers)
    //Create a direct Kafka Stream using the Kafka Parameters set and the Kafka Topics
    val messagesFromKafka = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParameters, topicsSet)
    
    4. Print messages from the Kafka Topic on the console and the output directory location:
    //Get the lines
    val lines = messagesFromKafka.map(_._2)
    //Print
    lines.print()
    //Save to Text Files
    val outputLocation = "output/sparkkafkastream"
    lines.saveAsTextFiles(outputLocation)
    5. Start and await termination of the Streaming Context created.
    ssc.start()
    ssc.awaitTermination()
    

    Example Two: Count and print the occurrences of a word from the stream of data from a Kafka Topic

    Main Object is the Main Class and entry point of the application. Here is the code for SparkKafkaWordCount.scala:
    package com.aj.sparkkafka
    
    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.kafka._
    
    object SparkKafkaWordCount {
      def main(args: Array[String]) {
        if (args.length != 2 ) {
          System.err.println("Please pass program arguments as: <brokers> <topics>")
          System.exit(1)
        }
    
        val Array(brokers, topics) = args
        val sparkConf = new SparkConf().setAppName("SparkKafkaWordCount").setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, Seconds(10))
    
        //Get the Kafka Topics from the program arguments and create a set of Kafka Topics
        val topicsSet = topics.split(",").toSet
        //Set the brokers in the Kafka Parameters
        val kafkaParameters = Map[String, String]("metadata.broker.list" -> brokers)
        //Create a direct Kafka Stream using the Kafka Parameters set and the Kafka Topics
        val messagesFromKafka = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParameters, topicsSet)
    
        //Get the lines
        val lines = messagesFromKafka.map(_._2)
        //Split the lines into words
        val words = lines.flatMap(_.split(" "))
        //Count the words
        val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
        //Print
        wordCounts.print()
        //Save to Text Files
        val outputLocation = "output/sparkkafkawordcount"
        wordCounts.saveAsTextFiles(outputLocation)
    
        ssc.start()
        ssc.awaitTermination()
      }
    }
    
    I have put the comments for most of the lines of code above to describe what the code does. Here is an overview of the things being done in the code above:
    1. The main class takes in 2 program arguments which are of type String and are: Kafka Brokers and Kafka Topics. Here is the code used for this:
    def main(args: Array[String]) {
        if (args.length != 2 ) {
          System.err.println("Please pass program arguments as: <brokers> <topics>")
          System.exit(1)
        }
    
    2. For this program to run as a stand alone application, a new Spark Config needs to be created. SparkConf API represents configuration for a Spark application. It is used to set various Spark parameters as key-value pairs. I am setting App Name and Master parameters to it. Then a new Spark Streaming Context needs to be created. StreamingContext API is the main entry point for Spark functionality. A SparkContext represents the connection to a Spark cluster and can be used to create RDDs, accumulators and broadcast variables on the spark cluster. If you observe the code below I am creating a streaming context with an interval of 10 seconds. Hence it will check for data from the Kafka topic every 10 seconds. Here is the code for this:
    val sparkConf = new SparkConf().setAppName("SparkKafkaStream").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    
    3. Then I create a direct Kafka Stream using the Kafka Parameters that I have set and the Kafka Topics passed as input parameters:
    //Get the Kafka Topics from the program arguments and create a set of Kafka Topics
    val topicsSet = topics.split(",").toSet
    //Set the brokers in the Kafka Parameters
    val kafkaParameters = Map[String, String]("metadata.broker.list" -> brokers)
    //Create a direct Kafka Stream using the Kafka Parameters set and the Kafka Topics
    val messagesFromKafka = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParameters, topicsSet)
    
    4. Get the lines from the Kafka topic messages, split the lines into words, count the words and print the words and the corresponding count from the Kafka Topic on the console and the output directory location:
    //Get the lines
    val lines = messagesFromKafka.map(_._2)
    //Split the lines into words
    val words = lines.flatMap(_.split(" "))
    //Count the words
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    //Print
    wordCounts.print()
    //Save to Text Files
    val outputLocation = "output/sparkkafkawordcount"
    wordCounts.saveAsTextFiles(outputLocation)
    
    5. Start and await termination of the Streaming Context created.
    ssc.start()
    ssc.awaitTermination()

    Step 3: Configure Logging

    I am configuring the logging to print everything to the console. I am also adding a few settings to not log the third party logs that are too verbose. Here is the file log4j.properties:
    # Log everything to the console
    log4j.rootCategory=WARN, console
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.target=System.err
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
    
    # Settings to not log the third party logs that are too verbose
    log4j.logger.org.eclipse.jetty=WARN
    log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
    log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
    log4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR

    Run Application:

    1. To run the SparkKafkaStream application in your IDE use:
        Main class: com.aj.sparkkafka.SparkKafkaStream
        Set Kafka Host and Port and Kafka Topic in program arguments. I have used the following:
        Program Arguments: localhost:9092 softwaredevelopercentral
    2. To run the SparkKafkaWordCount application in your IDE use:
        Main class: com.aj.sparkkafka.SparkKafkaWordCount
        Set Kafka Host and Port and Kafka Topic in program arguments. I have used the following:
        Program Arguments: localhost:9092 softwaredevelopercentral

    Results:

    Zookeeper and Kafka should be running in your machine.

    I have created a Kafka Topic softwaredevelopercentral and I am using it to send messages to the application. Here are the commands used in Kafka.
    To create Kafka topic softwaredevelopercentral :
    D:\Programs\Apache\kafka_2.11-0.11.0.0>bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic softwaredevelopercentral
    
    To list the topics in Kafka
    D:\Programs\Apache\kafka_2.11-0.11.0.0>bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
    
    To send messages on the topic softwaredevelopercentral, start the producer 
    D:\Programs\Apache\kafka_2.11-0.11.0.0>bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic softwaredevelopercentral

    1. SparkKafkaStream

    Run the program in IDE and send messages from the Kafka topic. The messages are displayed on the console and in the output directory location.
    For the application SparkKafkaStream, the output location will be output/sparkkafkastream-NNNNNNNNNNNNN. You should find a _SUCCESS and a part-00000 file. Following Hadoop conventions, the latter contains the actual data for the "partitions" (in this case just one, the 00000 partition). The _SUCCESS file is empty. It is written when output to the part-NNNNN files is completed, so that other applications watching the directory know the process is done and it's safe to read the data.
    Send messages on the Kafka topic softwaredevelopercentral
    Receive messages and print on console and output location

    2. SparkKafkaWordCount

    Run the program in IDE and send messages from the Kafka topic. The messages are displayed on the console and in the output directory location.
    For the application SparkKafkaWordCount, the output location will be output/sparkkafkawordcount-NNNNNNNNNNNNN. You should find a _SUCCESS and a part-00000 file. Following Hadoop conventions, the latter contains the actual data for the "partitions" (in this case two, the 00000 partition and the 00001 partition). The _SUCCESS file is empty. It is written when output to the part-NNNNN files is completed, so that other applications watching the directory know the process is done and it's safe to read the data.
    Send messages on the Kafka topic softwaredevelopercentral
    Receive messages and print on console and output location

    Conclusion and GitHub link:

        In this post I have shown you how you can perform big data processing by using Spark Streaming with Kafka integration. The code used in this post is available on GitHub.
        Learn the most popular and trending technologies like Machine Learning, Chatbots, Angular 5, Internet of Things (IoT), Akka HTTP, Play Framework, Dropwizard, Docker, Elastic Stack, Netflix Eureka, Netflix Zuul, Spring Cloud, Spring Boot, Flask and RESTful Web Service integration with MongoDB, Kafka, Redis, Aerospike, MySQL DB in simple steps by reading my most popular blog posts at Software Developer Central.
        If you like my post, please feel free to share it using the share button just below this paragraph or next to the heading of the post. You can also tweet with #SoftwareDeveloperCentral on Twitter. To get a notification on my latest posts or to keep the conversation going, you can follow me on Twitter or Instagram. Please leave a note below if you have any questions or comments.

    Comments

    Popular Posts

    REST API using Play Framework with Java

    Asynchronous Processing (@Async) in Spring Boot

    Elasticsearch, Logstash, Kibana Tutorial: Load MySQL Data into Elasticsearch