Beginning with Apache Kafka

What is Apache Kafka?

Apache Kafka is a distributed, publish-subscribe based, append-only messaging platform, originally developed at LinkedIn and later donated to the Apache Software Foundation.

Use Cases

  • Messaging
  • Log aggregation
  • Stream processing
  • Commit log
  • Event sourcing

How Does It Work?

Kafka follows the publish-subscribe model: producers publish messages, and consumers (organized into consumer groups) subscribe to receive them.

A producer publishes data to a specific topic, and consumers subscribed to that topic consume the data. Multiple producers can publish to the same topic, but within a consumer group only one consumer can consume from the topic at a time.

Now you may ask: if only one consumer in the group is allowed to consume from a topic at a time, and thousands of events are coming in, how do we scale?

In that case, we can split the topic into partitions to achieve scalability.

When we partition a topic, each message in a partition gets an offset (a sequential message ID). Suppose we create 3 partitions: each producer sends events to a specific partition, and each consumer reads from a specific partition. If 300 events/sec are being sent, they are divided across the 3 partitions, so each partition handles about 100 messages/sec; this is how we achieve load balancing. Note that when we partition, Kafka only guarantees message order within a partition, not across the whole topic. To preserve ordering across partitions, we have to use a message key: messages with the same key always land in the same partition, as the sketch below shows.
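
To make keyed ordering concrete, here is a minimal producer sketch in Go, using the same confluent-kafka-go client as the consumer example below (the topic name users.notification and the broker address are reused from later in this post; the user IDs are illustrative). Messages that share a key always hash to the same partition, so they stay in order relative to each other:

package main

import (
	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
	if err != nil {
		panic(err)
	}
	defer p.Close()

	topic := "users.notification"
	for _, userID := range []string{"user-1", "user-2", "user-1"} {
		// Same key => same partition => per-user ordering is preserved.
		err = p.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Key:            []byte(userID),
			Value:          []byte("notification for " + userID),
		}, nil)
		if err != nil {
			panic(err)
		}
	}
	p.Flush(15 * 1000) // wait up to 15s for delivery reports before exiting
}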

If the event rate keeps growing, we add more partitions and more consumers so that the load stays balanced.

Once we have processed an event, we commit its offset back to Kafka; that way Kafka knows the event has been handled and won't send it again. If our app crashes in the middle of processing, Kafka will redeliver the event. (The consumer example below commits offsets exactly this way.)

Replication Factor:

The replication factor determines how many Kafka brokers keep a copy of the data. If we choose a replication factor of N = 3, the data is replicated to 2 additional brokers, so Kafka keeps working even if N - 1 brokers go down, because at least one broker with the data is still alive.
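
To make this concrete, here is how a topic with replication factor 3 could be created on a 3-broker cluster using the kafka-topics tool that ships with Kafka (a sketch; the topic name is illustrative, and the single-broker setup below only allows a replication factor of 1):

kafka-topics --create --topic users.notification \
  --partitions 3 --replication-factor 3 \
  --bootstrap-server localhost:9092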

What is Apache Zookeeper?

Apache Zookeeper is a centralized service that exposes a hierarchical key-value store and provides distributed configuration, synchronization, and naming for large distributed systems. Apache Kafka doesn't work without Zookeeper: cluster-management state, such as brokers joining or leaving and new topics being created, is synchronized across the Kafka brokers through Zookeeper.
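
For example, every broker registers itself under /brokers/ids in Zookeeper. With the setup below running, the registered broker IDs can be listed with the zookeeper-shell tool bundled with Kafka (a sketch; the output assumes the single broker with ID 1 from the compose file):

zookeeper-shell localhost:2181 ls /brokers/ids
# [1]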

Spin Up Kafka >

version: '3.1'

services:
  zoo1:
    image: zookeeper:3.4.9
    hostname: zoo1
    ports:
    - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_PORT: 2181
      ZOO_SERVERS: server.1=zoo1:2888:3888
    volumes:
    - ./storage/zoo1/data:/data
    - ./storage/zoo1/datalog:/datalog

  kafka1:
    image: confluentinc/cp-kafka:5.2.1
    hostname: kafka1
    ports:
    - "9092:9092"
    environment:
      # Internal listener for other containers (kafka1:19092); external listener for the host (localhost:9092)
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 # single broker, so the offsets topic cannot be replicated
    volumes:
    - ./storage/kafka1/data:/var/lib/kafka/data
    depends_on:
    - zoo1

docker-compose up
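
With the broker running, the users.notification topic used by the examples below can be created inside the container (a sketch; kafka-topics ships with the cp-kafka image, and with a single broker the replication factor must be 1):

docker-compose exec kafka1 kafka-topics --create \
  --bootstrap-server kafka1:19092 \
  --topic users.notification --partitions 3 --replication-factor 1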

Consumer (Golang) >

package main

import (
	"fmt"
	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"group.id":          "notification_service",
		"auto.offset.reset": "earliest",
		// Commit offsets manually after each message is processed.
		"enable.auto.commit": false,
	})

	if err != nil {
		panic(err)
	}

	if err := c.SubscribeTopics([]string{"users.notification"}, nil); err != nil {
		panic(err)
	}

	for {
		fmt.Println("Waiting for messages...")

		msg, err := c.ReadMessage(-1)
		if err == nil {
			fmt.Println("Topic : ", *msg.TopicPartition.Topic)
			fmt.Println("Partition : ", msg.TopicPartition.Partition)
			fmt.Println("Offset : ", msg.TopicPartition.Offset)
			fmt.Println("Value : ", string(msg.Value))
			// Commit the offset so Kafka knows this message has been processed.
			c.CommitMessage(msg)
		} else {
			// The client will automatically try to recover from all errors.
			fmt.Printf("Consumer error: %v (%v)\n", err, msg)
		}
	}

	c.Close()
}
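
Since this consumer joins the notification_service group, starting a second copy of the same program with the same group.id makes Kafka rebalance the topic's partitions between the two instances; that is exactly the consumer-side load balancing described earlier.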

Producer (Kotlin) >

import org.apache.kafka.clients.producer.*
import java.util.*
import org.apache.kafka.common.serialization.StringSerializer
import java.lang.Exception

fun main(args: Array<String>) {
    val props = Properties()
    props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
    props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
    props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name

    // Key/value types must match the configured serializers (String, String).
    val producer = KafkaProducer<String, String>(props)
    // ProducerRecord(topic, partition, timestamp, key, value): the key is null,
    // so the record goes to the explicitly chosen partition 0.
    producer.send(ProducerRecord("users.notification",
            0, System.currentTimeMillis(), null, "hello"), object : Callback {
        override fun onCompletion(metadata: RecordMetadata?, exception: Exception?) {
            if (exception != null) println("Send failed: $exception")
            else println("Send complete")
        }
    })

    // Wait for outstanding sends to complete, then release resources.
    producer.flush()
    producer.close()
}

Result >

Waiting for messages...
Topic :  users.notification
Partition :  0
Offset :  1
Value :  hello

Resources:

1. github.com/confluentinc/confluent-kafka-go
2. io.opentracing.contrib:opentracing-kafka-client:0.1.2 (Gradle)