With your detections in place, it’s time to notify another application by producing a Kafka message.
To produce messages, you’ll need to create a KafkaProducer at the top of your code. Reuse the same authentication details from the consumer.
Required Parameters for the Producer:
security_protocol: SASL_PLAINTEXTsasl_mechanism: SCRAM-SHA-256sasl_plain_username: SASL_USERNAME (same as consumer)sasl_plain_password: SASL_PASSWORD (same as consumer)bootstrap_servers: BOOTSTRAP_SERVERS (same as consumer)client_id: Your team IDConstruct a Python dictionary with the following structure:
| Key | Value |
|---|---|
customer |
The email of the customer |
type |
The type from your team’s row (see Step 2 table) |
reason |
The reason from your team’s row (see Step 2 table) |
team |
Your team name |
Once the dictionary is ready, convert it to JSON using json.dumps.
Use the Kafka topic actions to push your data.
send method from your producer to send the message.bytes(your_json, "utf-8")
The producer.send() method is non-blocking - it returns immediately without waiting for the message to be delivered:
future = producer.send('actions', value=bytes(json_message, 'utf-8'))
The message is queued in an internal buffer and sent asynchronously in the background. This is efficient but means you don’t know immediately if the send succeeded.
To ensure your message was delivered, you can:
future = producer.send('actions', value=bytes(json_message, 'utf-8'))
result = future.get(timeout=10) # Blocks until sent or timeout
print(f"Message sent to partition {result.partition} at offset {result.offset}")
producer.flush() # Blocks until all messages are delivered
For this tutorial, calling .get() on each message helps you see immediate feedback. In production systems, you’d typically use callbacks or batch flushing for better performance.
For more details, refer to the KafkaProducer guide.
Synchronous (blocking): If we waited for Kafka’s acknowledgment on every message, network latency would limit us to ~100 messages/second:
Send message → Wait 10ms → Send next message → Wait 10ms → ...
Asynchronous (non-blocking): By queuing messages and sending them in batches, we can achieve 100,000+ messages/second:
Queue msg1, msg2, msg3... → Send batch → Queue more while waiting → ...
A Future represents a value that will be available later. It’s a common pattern in async programming:
# .send() returns immediately with a Future (the message isn't sent yet)
future = producer.send('topic', value=message)
# Do other work here while the message sends in the background...
# When you need the result, call .get() - this blocks until delivery
try:
result = future.get(timeout=10)
print(f"Delivered to {result.topic}:{result.partition}:{result.offset}")
except Exception as e:
print(f"Delivery failed: {e}")
| Method | Behavior | Use When |
|---|---|---|
.get() on each send |
Blocks per message | Debugging, low volume |
.flush() at end |
Blocks once, sends all | End of batch processing |
| Callbacks | Never blocks | High-throughput production |
| Leaderboard Shows | Meaning |
|---|---|
| 📤 in Progress | Success! Your filter + producer are working |
Ask the instructor to confirm they can see your message!
Congratulations, you learned how to produce a message in Kafka 🎉
You can now head to step 4!