
Introduction to Apache Kafka for Python Developers

2025-02-10

Introduction

In this blog post, I'll attempt to condense the knowledge I gained from reading the O'Reilly book "Kafka: The Definitive Guide". We'll introduce Apache Kafka, explain why it's useful, and present its core concepts. Afterwards, we'll dive into a Python application that demonstrates some of these features. By the end of this post, you'll have a solid high-level understanding of Kafka and will know how to start a local Kafka cluster and publish and subscribe to messages via the confluent-kafka-python library.

What is Kafka?

Apache Kafka is a distributed streaming platform designed to handle real-time data feeds with high throughput, scalability, and fault tolerance. At its core, Kafka is a publish-subscribe messaging system that allows applications to send (publish) and receive (subscribe to) messages in real time.

Kafka was originally developed at LinkedIn to solve the problem of processing massive amounts of real-time data. Their legacy systems relied on batch processing and polling mechanisms, which were slow, inefficient, and prone to missing updates. Kafka was designed to address these challenges by providing a real-time, pull-based messaging system that could scale horizontally and retain data for extended periods.

Today, Kafka is used by thousands of companies, including Netflix, Uber, and Airbnb, to power their real-time data pipelines and event-driven architectures.

Why use Kafka?

Kafka is widely used in modern applications for its scalability, fault tolerance, and real-time capabilities. As a Python developer, you can use Kafka to:

  • Build real-time data pipelines for analytics or machine learning.
  • Decouple microservices by enabling asynchronous communication.
  • Process high-volume data streams (e.g., logs, metrics, or events).

Python’s rich ecosystem and Kafka’s Python libraries make it easy to integrate Kafka into your applications.

Kafka Architecture Overview

Here’s a high-level overview of how Kafka works:

  • Producers publish messages to topics.
    • Optional message key determines the destination partition.
    • The partition is chosen by hashing the key and taking the result modulo the number of partitions (see the sketch after this list).
      • selected_partition = hash(key) % #partitions
    • If no key is specified, a round-robin strategy is used to distribute the load.
  • Topics are divided into partitions, which are distributed across brokers.
  • Consumers subscribe to topics and read messages from partitions.
    • Messages from a partition are read in the same order as published (first-in first-out).
    • Consumers can manually select partitions, ensuring that they receive only messages from those partitions.
  • Messages are retained for a configurable period, allowing consumers to replay data if needed.
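
To make the key-to-partition mapping concrete, here is a conceptual sketch in Python. Treat it purely as an illustration: real Kafka clients use their own stable hash functions (murmur2 in the Java client, a CRC32-based one in librdkafka), not Python's hash(), which is randomized per interpreter run.

def select_partition(key, num_partitions, round_robin_counter):
    # Conceptual illustration only; not the hash Kafka clients actually use.
    if key is None:
        # keyless message: spread the load round-robin across partitions
        return round_robin_counter % num_partitions
    # keyed message: the same key always maps to the same partition
    return hash(key) % num_partitions

print(select_partition("bundesliga", 3, 0))  # stable for a given key (within one process)
print(select_partition(None, 3, 7))          # 7 % 3 = 1
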
Simplified example of Apache Kafka architecture

Example: Football Matches

While reading this, keep an eye on the diagram above.

Assume a producer service that processes all live football match data, like goals, fouls, free kicks and so on. The producer wants to propagate this information to the back-end services, so it crafts messages that could look like these:

{ "id": "bundesliga", "timestamp": 1741850999, "type": "goal", "team": "FC Bayern Munich" }
{ "id": "serie-a", "timestamp": 1741850991, "type": "foul", "team": "AC Milan" }

Messages are assigned and published to partitions based on the key.
For example: hash("bundesliga") % 3 = partition 1 and hash("serie-a") % 3 = partition 3

Subscribers could be services responsible for serving web or mobile users, storing events in databases, computing statistics, sending notifications and so on. Since they subscribed to the topic football_events, a connection is made to each partition, from which they receive messages. A subscriber might only be interested in a specific league, so it could manually subscribe to just those partitions. It's important that all events from a single match always go to the same partition; otherwise, the order of messages is not guaranteed. For example, one partition might have already signaled that the match is over, while another, because of congestion, could still be delivering events about the match.
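
As a minimal sketch of that manual route (topic name, partition number, and group ID are illustrative), confluent-kafka lets a consumer skip group assignment and attach to specific partitions with assign():

from confluent_kafka import Consumer, TopicPartition

consumer = Consumer({
    'bootstrap.servers': 'localhost:29092',
    'group.id': 'bundesliga-stats',   # still required, even without group assignment
})
# read only the partition that, in our example, holds the Bundesliga events
consumer.assign([TopicPartition('football_events', 1)])

while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    print(msg.key(), msg.value())
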

Partitions and replication

In the above example and diagram, we glossed over the parts that make Kafka scalable and durable. A common method to create a fault-tolerant system is replication. Kafka lets you specify the replication factor (configurable via replication.factor), which determines how many replicas of each partition should exist. One replica is always the leader, while the others are followers. All produce and consume requests go through the leader replica. The followers only replicate the data on the leader's partition, and in case the leader replica crashes or goes offline, one of them steps in as the new leader.
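
The replication factor is set per topic when the topic is created. Here is a minimal sketch using the AdminClient from confluent-kafka; the topic name and numbers are illustrative, and a replication factor of 3 requires at least three brokers (the single-broker setup later in this post only allows 1):

from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({'bootstrap.servers': 'localhost:29092'})
# three partitions, each kept on three brokers (one leader + two followers)
futures = admin.create_topics(
    [NewTopic('football_events', num_partitions=3, replication_factor=3)]
)
for topic, future in futures.items():
    try:
        future.result()  # raises if creation failed, e.g. not enough brokers
        print(f'Created topic {topic}')
    except Exception as e:
        print(f'Failed to create topic {topic}: {e}')
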

Apache Kafka topic replication

Producers and Consumers

We said before that producers publish messages to topics, or more specifically, to partitions. A topic consists of one or more partitions, where each partition is a kind of serial queue. When a consumer subscribes to a topic, a connection is made to all brokers hosting that topic's partitions. This division enables consumers to process messages from different partitions in parallel. However, there is a potential pitfall if messages that are supposed to be consumed in serial order end up being consumed in parallel because of how they were assigned to partitions. Producers can specify a partition key, which determines the partition on which the message will be published. The exact partition key depends on the use case and must be chosen carefully. Producers can also be configured for different behaviour with parameters.
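
To see the key-to-partition behaviour in practice, here is a small hedged sketch (topic and key are placeholders): every message produced with the same key should be reported as delivered to the same partition.

from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:29092'})

def report(err, msg):
    if err is None:
        # all messages with key "bundesliga" should print the same partition number
        print(f'key={msg.key()} -> partition {msg.partition()}')

for event in ('kickoff', 'goal', 'full-time'):
    producer.produce('football_events', key='bundesliga', value=event, callback=report)

producer.flush()
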

Some notable producer parameters include (see the configuration sketch after this list):

  • retries: the number of times to try resending a message
  • batch.size: the maximum size, in bytes, of a batch of messages sent in a single request
  • compression.type: compression algorithm (snappy, gzip, lz4)
  • acks: the number of broker acknowledgements required before a message is considered successfully written (0, 1, or all in-sync replicas)
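
A hedged configuration sketch with these parameters; the property names below follow librdkafka/confluent-kafka, and their availability and defaults depend on the client version:

from confluent_kafka import Producer

producer = Producer({
    'bootstrap.servers': 'localhost:29092',
    'retries': 5,                  # retry transient send failures
    'batch.size': 64 * 1024,       # maximum batch size in bytes
    'compression.type': 'snappy',  # compress message batches on the wire
    'acks': 'all',                 # wait for all in-sync replicas to acknowledge
})
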

Consumers subscribe to topics and receive messages. Periodically they commit the current message offset (of some partition) back to Kafka on a special internal topic called __consumer_offsets, which gives them durability, for example in case of failure. Furthermore, consumers can be part of consumer groups. A consumer group subscribes to topics and assigns each partition to exactly one consumer within the group. This means that every message will be consumed by only a single consumer in the group. This is how parallelization is done on the consumer side.

There are two notable parameters for consumers, namely fetch.min.bytes and fetch.max.wait.ms, which define the minimum amount of data to receive and the maximum waiting time before messages are returned by the poll method. Usually, increasing these values yields higher throughput at the cost of latency.
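
Here is a hedged sketch of such tuning, together with manual offset commits. Note that librdkafka, which confluent-kafka wraps, spells the wait parameter fetch.wait.max.ms rather than the Java client's fetch.max.wait.ms; the group and topic names are illustrative.

from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:29092',
    'group.id': 'stats-service',
    'enable.auto.commit': False,   # commit offsets ourselves after processing
    'fetch.min.bytes': 64 * 1024,  # wait until at least 64 KiB are available...
    'fetch.wait.max.ms': 500,      # ...or at most 500 ms, whichever comes first
})
consumer.subscribe(['football_events'])

msg = consumer.poll(1.0)
if msg is not None and not msg.error():
    # process the message, then record our position in __consumer_offsets
    consumer.commit(message=msg, asynchronous=False)
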

Lastly, the choice of the partition key is particularly important, as it defines how the data is distributed across the partitions and the order in which it is consumed. There are no ordering guarantees across partitions. This means that special care must be taken when updating shared state, such as a database, to avoid losing changes or corrupting the state.

Apache Kafka architecture with consumer offsets

Synchronous example

This section provides a minimal example of using the confluent-kafka-python library. The GitHub page provides many examples for building sync or async consumers and producers, serialization with JSON or Avro, and more. I also put the examples below on my GitHub page.

Setup services

We'll need the following services:

  1. Zookeeper
    • A fault-tolerant and highly available key-value store
    • Stores cluster metadata such as broker and topic information (modern Kafka keeps consumer offsets in the __consumer_offsets topic rather than in Zookeeper).
  2. Kafka Cluster
  3. Kafka UI

We'll use Docker Compose to quickly set up the needed services.

file: docker-compose.yml

version: "3"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.8.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - "2181:2181"
    # to survive the container restart
    tmpfs: "/zktmp"

  kafka:
    image: confluentinc/cp-kafka:7.8.0
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"  # not recommended for production
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - "9092:9092"
      - "29092:29092"
    volumes:
      - kafka_data:/data/kafka

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    depends_on:
      - kafka
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181

volumes:
  kafka_data:
    driver: local

Start the Docker Compose setup:

docker-compose up -d

Open your browser and navigate to http://localhost:8080.

You’ll see the Kafka UI dashboard, where you can:

  • View all Kafka topics.
  • Inspect partitions and their offsets.
  • Monitor consumer groups.
  • Explore messages in real time.

Producer

We will use the confluent_kafka library. Let's start off by pointing the producer at the external listener of the Kafka broker. Then we define a callback function that triggers upon successful (or failed) message delivery. Finally, we loop a few times, sending messages to the orders topic with a key and a value. The flush() call makes sure all messages are sent and the callbacks are invoked.

import json

from confluent_kafka import Producer

conf = {'bootstrap.servers': 'localhost:29092,localhost:29093,localhost:29094'}
producer = Producer(conf)


def delivery_report(err, msg):
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')


def create_order(user_id: str, amount: int):
    return {
        "order_id": hash(user_id + str(amount)),
        "user_id": user_id,
        "amount": amount,
        "status": "created"
    }


for _ in range(10):
    order = create_order("alice", 15)
    producer.produce('orders', key=order['user_id'], value=json.dumps(order), callback=delivery_report)
    # serve delivery callbacks from previous produce() calls
    producer.poll(0)

# block until all outstanding messages are delivered and callbacks have fired
producer.flush()

Consumer

The consumer has slightly more configuration: group.id defines the consumer group ID and auto.offset.reset defines the starting point for message consumption. Afterwards, it subscribes to the orders topic and enters an endless loop. During the loop, it uses the poll() function to check for new messages and performs some checks.

import json

from confluent_kafka import Consumer

conf = {
    'bootstrap.servers': 'localhost:29092,localhost:29093,localhost:29094',
    'group.id': 'payment-processor',
    'auto.offset.reset': 'earliest'
}

consumer = Consumer(conf)
consumer.subscribe(['orders'])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None: continue
        if msg.error():
            print(f'Error: {msg.error()}')
            continue

        order = json.loads(msg.value())
        print(f"Processing payment for order {order['order_id']}...")

        # Simulate payment processing...done.
        order['status'] = 'paid'
        print(f"Order {order['order_id']} paid successfully.")
except KeyboardInterrupt:
    print('Consumer interrupted')
finally:
    consumer.close()

Asynchronous example

If we are using asyncio, the synchronous poll and flush methods will block the event loop and make our program less efficient. Let's offload that work to another thread and wrap the produce method so that it returns immediately with a future we can await.

Async Producer

I dropped a few comments in the code about the most important parts. Here is a short summary:

  • the poll method of the producer blocks, so it needs to run in a dedicated thread.
  • our produce method must return a future object, so it can be awaited.
  • delivery_report assigns the result (or exception) to that future.
  • call_soon_threadsafe is needed to schedule the future's completion on the event loop's thread, since delivery_report runs in the same thread as poll.

import asyncio
import json
import threading
from random import randint
from typing import Callable

from confluent_kafka import Producer, KafkaException


class AIOProducer:
    def __init__(self, config: dict[str, str], loop=None):
        self._loop = loop or asyncio.get_event_loop()
        self._producer = Producer(config)
        self._cancelled = False
        # start the polling method in a separate thread
        self._poll_thread = threading.Thread(target=self._poll_loop)
        self._poll_thread.start()

    def _poll_loop(self):
        # this runs in a dedicated thread, so it won't block the main loop
        while not self._cancelled:
            self._producer.poll(0.1)

    def close(self):
        self._cancelled = True
        self._poll_thread.join()

    def produce(self, topic: str, key: str, value: str, on_delivery: Callable | None = None):
        # create a future that we can track the future result and return it
        future = self._loop.create_future()

        def delivery_report(err, msg):
            # gets triggered upon completion of produce method
            # set the result or exception on the future above
            # this runs in the same thread as _poll_loop and poll
            # call_soon_threadsafe is needed to safely schedule on main thread
            if err:
                self._loop.call_soon_threadsafe(
                    future.set_exception, KafkaException(err)
                )
            else:
                self._loop.call_soon_threadsafe(future.set_result, msg)

            if on_delivery:
                self._loop.call_soon_threadsafe(on_delivery, err, msg)

        self._producer.produce(
            topic,
            key=key,
            value=value,
            on_delivery=delivery_report,
        )
        return future


def create_order(user_id: str, amount: int):
    return {
        "order_id": hash(user_id + str(amount)),
        "user_id": user_id,
        "amount": amount,
        "status": "created",
    }


async def main():
    conf = {"bootstrap.servers": "localhost:29092,localhost:29093,localhost:29094"}
    producer = AIOProducer(conf)

    order = create_order("alice", 15)
    await producer.produce(
        "orders",
        key=order["user_id"],
        value=json.dumps(order),
    )

    orders = [create_order("bob", randint(1, 10)) for _ in range(10)]
    await asyncio.gather(
        *[
            producer.produce("orders", order["user_id"], json.dumps(order))
            for order in orders
        ]
    )
    producer.close()


if __name__ == "__main__":
    asyncio.run(main())

Async Consumer

The examples page does not show an example of a consumer, but we can apply a similar idea to the consumer. Let's summarize again the main ideas:

  • use a dedicated thread to poll the consumer and append incoming messages to a queue
  • provide an async iterator so messages can be fetched from the queue with async for

import asyncio
import json
import threading
from collections import deque

from confluent_kafka import Consumer


class AIOConsumer:
    def __init__(self, config: dict[str, str]):
        self._consumer = Consumer(config)
        self._cancelled = False
        # a queue that holds the incoming messages
        # (must exist before the poll thread starts, since the thread appends to it)
        self._items = deque()
        # start the polling method in a separate thread
        self._poll_thread = threading.Thread(target=self._poll)
        self._poll_thread.start()

    def close(self):
        self._cancelled = True
        self._poll_thread.join()
        self._consumer.close()

    def _poll(self):
        # this runs in a dedicated thread
        while not self._cancelled:
            # poll for new messages
            msg = self._consumer.poll(0.1)
            if msg is None:
                continue
            if msg.error():
                print(f"Error: {msg.error()}")
                continue

            # append the message to queue
            # append is atomic
            self._items.append(msg)

    def subscribe(self, topics: list[str]):
        self._consumer.subscribe(topics)

    def __aiter__(self):
        # async iterator
        return self

    async def __anext__(self):
        # async iterator's next method returns items from the queue
        while True:
            if self._cancelled:
                raise StopAsyncIteration

            try:
                item = self._items.popleft()
                return item
            except IndexError:
                # queue is empty, try again later
                await asyncio.sleep(0.1)

    def get_next_item(self):
        # synchronous method
        try:
            return self._items.popleft()
        except IndexError:
            return None


async def main():
    conf = {
        "bootstrap.servers": "localhost:29092,localhost:29093,localhost:29094",
        "group.id": "payment-processor",
        "auto.offset.reset": "earliest",
    }
    consumer = AIOConsumer(conf)
    consumer.subscribe(["orders"])

    try:
        async for msg in consumer:
            order = json.loads(msg.value())
            print(f"Processing payment for order {order['order_id']}...")
            # Simulate payment processing
            order["status"] = "paid"
            print(f"Order {order['order_id']} paid successfully.")
    except KeyboardInterrupt:
        print("Consumer interrupted")
    finally:
        consumer.close()


if __name__ == "__main__":
    asyncio.run(main())

Conclusion

In this post, we explored the architecture behind Apache Kafka, explaining the most important concepts like brokers, partitions, producers, consumers and so on. We applied the gained knowledge to write a simple producer-consumer Python application. One important concept we did not cover is serialization: Kafka lets you define message schemas, which are used for serialization when publishing and deserialization when consuming.