Async messaging with Kafka
Kafka in KRaft mode (no ZooKeeper), a producer and consumer wired via MicroProfile Reactive Messaging, and the two-step pattern that turns synchronous endpoints into eventful ones.
A policy is created. Three things probably need to happen next: send a welcome email, write to a fraud-screening pipeline, update a downstream dashboard. Doing all three inside the HTTP request makes the endpoint slow, brittle, and tightly coupled to systems that may be down. Async messaging decouples them: the HTTP request emits one event, and three independent consumers pick it up at their own pace.
This module adds Kafka as the event bus.
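Before bringing in a broker, the shape of the idea can be sketched in plain Java. This is a hypothetical in-memory stand-in, not Kafka: `FanOutSketch` and its methods are invented for illustration, and delivery here is synchronous, whereas real Kafka consumers read from the log at their own pace. What it does show is that the emitter knows nothing about its subscribers, and that one failing subscriber does not affect the others.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a topic; illustrates decoupling only.
class FanOutSketch {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    // The "HTTP request" side: emit once, without knowing who listens.
    void emit(String event) {
        for (Consumer<String> s : subscribers) {
            try {
                s.accept(event);
            } catch (RuntimeException e) {
                // A failing subscriber must not break the emitter or its peers.
            }
        }
    }

    // One event, three subscribers, the middle one simulating an outage.
    static List<String> demo() {
        List<String> seen = new ArrayList<>();
        FanOutSketch topic = new FanOutSketch();
        topic.subscribe(e -> seen.add("email:" + e));
        topic.subscribe(e -> { throw new IllegalStateException("fraud pipeline down"); });
        topic.subscribe(e -> seen.add("dashboard:" + e));
        topic.emit("policy-created:P-002");
        return seen;
    }
}
```

Calling `FanOutSketch.demo()` returns the email and dashboard entries even though the fraud subscriber threw.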
Sync vs async: when to switch
Pick async when at least one of these is true:
- The downstream work doesn’t need to complete before you respond to the caller (most notification / analytics work).
- The downstream system has different uptime / latency than your service.
- You want replay — the ability to re-process events into a new consumer later.
- Multiple independent consumers care about the same event.
Otherwise, keep it sync. Async introduces eventual consistency and a new class of failure modes (lost or duplicated messages); pay that cost only when you need the benefits.
Kafka in KRaft mode
Kafka used to require ZooKeeper as a separate coordination service: a second container, a second thing to operate, a second failure mode. KRaft mode, production-ready since Kafka 3.3 and the only option since Kafka 4.0 dropped ZooKeeper, runs the controller inside the broker process itself. One container.
podman run -d --replace --name kafka --network insurance-net \
  -p 9092:9092 \
  -e KAFKA_NODE_ID=1 \
  -e KAFKA_PROCESS_ROLES=controller,broker \
  -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@kafka:9093 \
  -e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 \
  -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
  -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
  -e KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
  docker.io/apache/kafka:4.0.2
Note that the image is the official apache/kafka image, not the historically popular bitnami/kafka:3.8; Bitnami’s docker.io distribution disappeared in the 2025 Broadcom reshuffle. The env vars also lose the _CFG_ infix in the move from Bitnami’s wrapper to the upstream entrypoint.
Create a topic:
# The upstream image keeps its scripts under /opt/kafka/bin, which is not on PATH
podman exec -it kafka \
  /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:9092 \
  --create --topic policy-events --partitions 3 --replication-factor 1
Three partitions is more than this app needs; it’s there so you can demonstrate consumer-group parallelism later. Replication factor 1 is dev-only — for production you’d want 3 across separate brokers.
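To make the partition count concrete, here is a minimal sketch of the two mappings involved: key to partition, and partition to consumer. Everything in it is an illustrative assumption. Kafka's default partitioner hashes the serialized key with murmur2, so real partition numbers will differ from this stand-in hash, and the round-robin `assign` method is a simplification of what a real group assignor does. The properties it demonstrates carry over: equal keys always land on the same partition, and a group member beyond the partition count sits idle.

```java
import java.util.ArrayList;
import java.util.List;

class PartitionSketch {
    // Stand-in hash (String.hashCode), NOT Kafka's murmur2.
    // Same key in, same partition out: that is the property that matters.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Hypothetical round-robin spread of partitions over one consumer group.
    // Index i of the result lists the partitions owned by consumer i.
    static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        List<List<Integer>> owned = new ArrayList<>();
        for (int c = 0; c < numConsumers; c++) {
            owned.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            owned.get(p % numConsumers).add(p);
        }
        return owned;
    }
}
```

With three partitions, `assign(3, 2)` gives one consumer two partitions and the other one, while `assign(3, 4)` leaves the fourth consumer with an empty list.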
Liberty side: MicroProfile Reactive Messaging
Direct Kafka client code is fine, but MicroProfile Reactive Messaging is the idiomatic Liberty path. Enable it in server.xml:
<feature>mpReactiveMessaging-3.0</feature>
That feature already includes the liberty-kafka connector, so there is no separate dependency for it. But the connector ships only the plumbing; you still need the Kafka client classes on the WAR’s classpath:
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>3.9.0</version>
  <scope>compile</scope>
</dependency>
Without kafka-clients, Liberty fails app startup with CWMRX1006E: Classes from the kafka-clients JAR file could not be loaded. The scope can be runtime if you only use @Channel + Emitter, but it gets bumped to compile the moment any code imports org.apache.kafka.clients.producer.ProducerRecord directly (which you’ll want for keyed producers against compacted topics, covered in the policy-bind feature chapter).
Configure connections in microprofile-config.properties (under src/main/resources/META-INF/):
mp.messaging.outgoing.policy-events.connector=liberty-kafka
mp.messaging.outgoing.policy-events.topic=policy-events
mp.messaging.outgoing.policy-events.bootstrap.servers=kafka:9092
mp.messaging.outgoing.policy-events.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.policy-events.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.incoming.policy-events-in.connector=liberty-kafka
mp.messaging.incoming.policy-events-in.topic=policy-events
mp.messaging.incoming.policy-events-in.bootstrap.servers=kafka:9092
mp.messaging.incoming.policy-events-in.group.id=insurance-app
mp.messaging.incoming.policy-events-in.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.policy-events-in.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.policy-events-in.auto.offset.reset=earliest
mp.messaging.incoming.policy-events-in.enable.auto.commit=false
Same topic, two channels — one for the producer side, one for the consumer side. The consumer side also needs the deserializer + offset-reset + commit-mode keys; without them, @Incoming deploys without error but silently fails to subscribe, no consumer group is ever registered, and no messages arrive.
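Because a missing consumer key fails silently, a small pre-flight check can be worth having in a test. The sketch below is a hypothetical helper, not part of Liberty or MicroProfile: `IncomingChannelCheck` and its `REQUIRED` list are assumptions that simply mirror the incoming keys shown in the listing above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: reports which incoming-channel keys are absent.
class IncomingChannelCheck {
    // Mirrors the consumer-side keys from the listing above (an assumption,
    // not a list defined by Liberty itself).
    static final List<String> REQUIRED = List.of(
        "connector", "topic", "bootstrap.servers", "group.id",
        "key.deserializer", "value.deserializer",
        "auto.offset.reset", "enable.auto.commit");

    static List<String> missingKeys(Properties config, String channel) {
        String prefix = "mp.messaging.incoming." + channel + ".";
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            if (config.getProperty(prefix + key) == null) {
                missing.add(key);
            }
        }
        return missing;
    }
}
```

A unit test could load microprofile-config.properties into a `Properties` object and assert that `missingKeys(props, "policy-events-in")` is empty.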
Producer: emit an event on write
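import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.json.bind.Jsonb;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

@ApplicationScoped
public class PolicyEventPublisher {
    @Inject @Channel("policy-events")
    Emitter<String> emitter;
    @Inject Jsonb jsonb;

    // Serialize the saved policy to JSON and hand it to the outgoing channel.
    public void published(Policy p) {
        emitter.send(jsonb.toJson(p));
    }
}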
Wire into the create path:
@POST @Consumes(MediaType.APPLICATION_JSON)
public Policy create(Policy p) {
    Policy saved = repo.create(p);               // persist first
    redis.del("policy:" + saved.getPolicyNo());  // invalidate any cached copy
    publisher.published(saved);                  // then emit the event
    return saved;
}
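The publisher discards the value returned by emitter.send. In MicroProfile Reactive Messaging, sending a payload through an Emitter returns a CompletionStage<Void>, so a failed send can at least be observed. The sketch below shows one way to do that without pulling in the messaging API: the `Function` field is a stand-in for `emitter::send`, and `SendOutcomeSketch` and its `failures` counter are hypothetical names chosen for illustration.

```java
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

class SendOutcomeSketch {
    // Counts sends that completed exceptionally.
    final AtomicInteger failures = new AtomicInteger();

    // Stand-in for Emitter<String>::send (an assumption for this sketch).
    private final Function<String, CompletionStage<Void>> send;

    SendOutcomeSketch(Function<String, CompletionStage<Void>> send) {
        this.send = send;
    }

    // Publish and attach a callback instead of dropping the result.
    CompletionStage<Void> publish(String json) {
        return send.apply(json).whenComplete((ok, err) -> {
            if (err != null) {
                // In a real service: log, bump a metric, or schedule a retry.
                failures.incrementAndGet();
            }
        });
    }
}
```

In the real publisher this would be constructed as `new SendOutcomeSketch(emitter::send)`. Whether to retry, alert, or only log on failure is a design choice this sketch leaves open.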
Consumer: react to events
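import java.util.logging.Logger;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.json.bind.Jsonb;
import org.eclipse.microprofile.reactive.messaging.Incoming;

@ApplicationScoped
public class PolicyEventConsumer {
    private static final Logger log = Logger.getLogger(PolicyEventConsumer.class.getName());
    @Inject Jsonb jsonb;

    // Called once per record arriving on the policy-events-in channel.
    @Incoming("policy-events-in")
    public void handle(String json) {
        Policy p = jsonb.fromJson(json, Policy.class);
        log.info(() -> "Saw new policy: " + p.getPolicyNo());
        // dispatch downstream work...
    }
}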
In a real app, the consumer would write to a notification table, call a downstream API, or fan out to another topic. The minimum viable pattern is “log it and move on” — that’s the producer/consumer round trip working.
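Once the consumer does more than log, duplicated messages start to matter, since the same event can be delivered twice. A common answer is to make the handler idempotent. The following is a minimal sketch under stated assumptions: `IdempotentHandlerSketch` is a hypothetical class, the event key (for example the policy number) is assumed to identify an event uniquely, and the processed-key set lives in memory, whereas a real service would keep it in a durable store.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

class IdempotentHandlerSketch {
    // In-memory for illustration only; lost on restart.
    private final Set<String> processed = ConcurrentHashMap.newKeySet();
    private final Consumer<String> downstream;

    IdempotentHandlerSketch(Consumer<String> downstream) {
        this.downstream = downstream;
    }

    // Returns true if the event was processed, false if it was a duplicate.
    boolean handle(String eventKey, String payload) {
        if (!processed.add(eventKey)) {
            return false; // already seen: skip the downstream work
        }
        try {
            downstream.accept(payload);
            return true;
        } catch (RuntimeException e) {
            processed.remove(eventKey); // allow a redelivery to try again
            throw e;
        }
    }
}
```

Handling the same key twice runs the downstream work once; the second call returns false.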
Verifying
# Create a policy (triggers the producer)
curl -X POST http://localhost:9080/api/policies \
  -H "Content-Type: application/json" \
  -d '{"policyNo":"P-002","holderName":"Bob","premium":315.00}'
# Watch the consumer log
podman logs -f insurance-app | grep "Saw new policy"
# Inspect the topic directly (scripts live under /opt/kafka/bin in the upstream image)
podman exec -it kafka \
  /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 \
  --topic policy-events --from-beginning
In SigNoz: the create request shows a producer span; a separate consumer trace shows the message handling. Trace context propagation does work across Kafka if you wire the OpenTelemetry Kafka instrumentation — out of scope for this module, but worth knowing the spans join up cleanly when you do.
The two-step pattern, named
What we just did is the seed of a named pattern, the transactional outbox: write to the database in a transaction, then emit an event afterward. The mistake to avoid is the inverse: emitting before persisting, so that when the DB write fails, a downstream consumer is left reacting to a non-event. Always persist first.
For stronger guarantees (no lost events even on crash between persist and emit), use a real outbox table that a separate worker drains. Out of scope here; the seed pattern is enough to grow into it.
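For orientation, here is a rough in-memory sketch of what the outbox variant looks like. It is not the implementation this course builds later: `OutboxSketch`, its two lists, and the `drain` method are hypothetical, with the lists standing in for a policy table and an outbox table that a real database would update in one transaction.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

class OutboxSketch {
    final List<String> policies = new ArrayList<>(); // stands in for the policy table
    final List<String> outbox = new ArrayList<>();   // stands in for the outbox table

    // In a real database both inserts share one transaction,
    // so they commit or roll back together.
    synchronized void createPolicy(String policyNo) {
        policies.add(policyNo);
        outbox.add("{\"policyNo\":\"" + policyNo + "\"}");
    }

    // The worker: publish the oldest row, delete it only after the publish
    // succeeds, and stop at the first failure so order is preserved.
    synchronized int drain(Predicate<String> publish) {
        int sent = 0;
        while (!outbox.isEmpty() && publish.test(outbox.get(0))) {
            outbox.remove(0);
            sent++;
        }
        return sent;
    }
}
```

If the broker is down, `drain` sends nothing and the rows wait for the next run. A crash after a publish but before the delete would publish that row again, so this gives at-least-once delivery and consumers still need to tolerate duplicates.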
What you have
- Kafka running in KRaft mode on the shared network.
- A topic, partitioned.
- A producer and a consumer in the same Liberty service (in production they’d be different services).
- The persist-then-emit pattern that keeps the database and the event bus in agreement, short of a crash between the two steps.
Module 11 wraps the API in real authentication.