kafka-tutorial

Step 3: Load

With your detections in place, it’s time to notify another application by producing a Kafka message.

TODOs

1. Create a Producer

To produce messages, you’ll need to create a KafkaProducer at the top of your code. Reuse the same authentication details from the consumer.
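As a rough sketch of what this could look like with the kafka-python client, the snippet below collects the connection settings in a dictionary and passes them to KafkaProducer. The broker address, the credentials, and the SASL settings are placeholders and assumptions, not values from this tutorial; use the same values your consumer already uses.

```python
def build_producer_config(bootstrap_servers, username, password):
    """Collect the connection settings shared by the consumer and the producer.

    The security settings below are an assumption about how the cluster is
    secured; copy the values your consumer actually uses.
    """
    return {
        "bootstrap_servers": bootstrap_servers,
        "security_protocol": "SASL_SSL",  # assumption
        "sasl_mechanism": "PLAIN",        # assumption
        "sasl_plain_username": username,
        "sasl_plain_password": password,
    }


def create_producer(config):
    """Create a KafkaProducer from the settings (requires kafka-python)."""
    # Imported here so the config helper above works without the package.
    from kafka import KafkaProducer

    return KafkaProducer(**config)


# Placeholder values, not real credentials.
config = build_producer_config("broker.example.com:9092", "my-user", "my-password")
# producer = create_producer(config)  # uncomment once the values are real
```

Keeping the settings in one dictionary makes it easy to pass the same authentication details to both the consumer and the producer.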

Required Parameters for the Producer:


2. Create a Message

Construct a Python dictionary with the following structure:

Key       Value
customer  The email of the customer
type      The type from your team’s row (see Step 2 table)
reason    The reason from your team’s row (see Step 2 table)
team      Your team name

Once the dictionary is ready, convert it to JSON using json.dumps.
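A minimal sketch of this step is shown here. All four values are placeholders (hypothetical, for illustration only); replace them with the detected customer's email and with the type, reason, and team name that apply to your team.

```python
import json

# Placeholder values: replace them with the detected customer's email and
# with the type, reason, and team name from your team's row in Step 2.
message = {
    "customer": "jane.doe@example.com",
    "type": "your-type",
    "reason": "your-reason",
    "team": "your-team",
}

json_message = json.dumps(message)      # dict -> JSON string
payload = bytes(json_message, "utf-8")  # JSON string -> bytes, as Kafka expects
```

The payload variable holds the bytes that you would pass as the value when sending.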


3. Push a Message

Push your data to the Kafka topic named actions.

  1. Convert the JSON string to bytes using:
    bytes(your_json, "utf-8")
    
  2. Pass the bytes to your producer’s send method to send the message.
  3. The send method returns a Future object.

Understanding Asynchronous Sending

The producer.send() method is non-blocking - it returns immediately without waiting for the message to be delivered:

future = producer.send('actions', value=bytes(json_message, 'utf-8'))

The message is queued in an internal buffer and sent asynchronously in the background. This is efficient but means you don’t know immediately if the send succeeded.

Confirming Delivery

To ensure your message was delivered, you can:

  1. Block and wait for confirmation:
    future = producer.send('actions', value=bytes(json_message, 'utf-8'))
    result = future.get(timeout=10)  # Blocks until sent or timeout
    print(f"Message sent to partition {result.partition} at offset {result.offset}")
    
  2. Flush before exiting to ensure all buffered messages are sent:
    producer.flush()  # Blocks until all messages are delivered
    

For this tutorial, calling .get() on each message helps you see immediate feedback. In production systems, you’d typically use callbacks or batch flushing for better performance.

For more details, refer to the KafkaProducer guide.

Why Async Matters

Synchronous (blocking): If we waited for Kafka’s acknowledgment on every message, network latency would limit us to ~100 messages/second:

Send message → Wait 10ms → Send next message → Wait 10ms → ...

Asynchronous (non-blocking): By queuing messages and sending them in batches, we can achieve 100,000+ messages/second:

Queue msg1, msg2, msg3... → Send batch → Queue more while waiting → ...
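The arithmetic behind these two figures can be sketched as follows. This is a deliberately simplified model: it assumes one round trip per acknowledgment and ignores pipelining, compression, and broker limits. The batch size of 1000 is an assumption chosen only for illustration.

```python
def sync_throughput(round_trip_s):
    """Messages per second when each send waits for its acknowledgment."""
    return 1 / round_trip_s


def batched_throughput(round_trip_s, batch_size):
    """Messages per second when one round trip carries a whole batch."""
    return batch_size / round_trip_s


round_trip_s = 0.010  # 10 ms, the figure used above
batch_size = 1000     # assumption, chosen only for illustration

print(f"synchronous: {sync_throughput(round_trip_s):,.0f} messages/second")
print(f"batched:     {batched_throughput(round_trip_s, batch_size):,.0f} messages/second")
```

Under this model, throughput scales linearly with the number of messages carried per round trip, which is why batching matters far more than shaving a few milliseconds off the latency.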

The Future Object

A Future represents a value that will be available later. It’s a common pattern in async programming:

# .send() returns immediately with a Future (the message isn't sent yet)
future = producer.send('topic', value=message)

# Do other work here while the message sends in the background...

# When you need the result, call .get() - this blocks until delivery
try:
    result = future.get(timeout=10)
    print(f"Delivered to {result.topic}:{result.partition}:{result.offset}")
except Exception as e:
    print(f"Delivery failed: {e}")
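The same pattern can be tried without a Kafka broker by using Python's standard concurrent.futures module. This is only an analogy: the future returned by kafka-python is a different class and uses .get(timeout=...) where the standard library uses .result(timeout=...). The slow_delivery function is a hypothetical stand-in for a network send.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_delivery(message):
    """Stand-in for a network send: waits briefly, then reports success."""
    time.sleep(0.1)
    return f"delivered: {message}"


with ThreadPoolExecutor(max_workers=1) as executor:
    # submit() returns a Future right away; the work runs in the background.
    future = executor.submit(slow_delivery, "hello")

    # Other work could happen here while the "send" is in flight.

    # result() blocks until the value is ready or the timeout expires.
    result = future.result(timeout=5)

print(result)
```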

Choosing Your Strategy

Method               Behavior                 Use When
.get() on each send  Blocks per message       Debugging, low volume
.flush() at end      Blocks once, sends all   End of batch processing
Callbacks            No blocking on delivery  High-throughput production
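The callback strategy could look roughly like the sketch below, which relies on the add_callback and add_errback methods of the future returned by kafka-python's send. The helper name send_with_callbacks and the handler names are hypothetical, introduced only for this example.

```python
def on_send_success(record_metadata):
    """Called by the producer once the broker acknowledges the message."""
    print(f"Delivered to {record_metadata.topic}:"
          f"{record_metadata.partition}:{record_metadata.offset}")


def on_send_error(exc):
    """Called by the producer if delivery fails."""
    print(f"Delivery failed: {exc}")


def send_with_callbacks(producer, topic, payload):
    """Send without waiting; the outcome is reported through the callbacks."""
    future = producer.send(topic, value=payload)
    future.add_callback(on_send_success)
    future.add_errback(on_send_error)
    return future


# Usage, assuming `producer` is your KafkaProducer and `payload` is bytes:
# send_with_callbacks(producer, "actions", payload)
# producer.flush()  # before exiting, wait for anything still buffered
```

The callbacks run in the producer's background thread, so they are best kept short. Calling flush() before the program exits is still worthwhile, since messages left in the buffer would otherwise be lost.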

Check Your Work

Leaderboard Shows  Meaning
📤 in Progress     Success! Your filter + producer are working

Ask the instructor to confirm they can see your message!

Next step

Congratulations, you’ve learned how to produce a message in Kafka 🎉

You can now head to step 4!