~75 min read · updated 2026-05-18

Async messaging with Kafka

Kafka in KRaft mode (no ZooKeeper), a producer and consumer wired via MicroProfile Reactive Messaging, and the two-step pattern that turns synchronous endpoints into eventful ones.

A policy is created. Three things probably need to happen next: send a welcome email, write to a fraud-screening pipeline, update a downstream dashboard. Doing all three inside the HTTP request makes the endpoint slow, brittle, and tightly coupled to systems that may be down. Async messaging decouples them: the HTTP request emits one event, and three independent consumers pick it up at their own pace.

This module adds Kafka as the event bus.

Sync vs async: when to switch

Pick async when at least one of these is true:

  • The downstream work doesn’t need to complete before you respond to the caller (most notification / analytics work).
  • The downstream system has different uptime / latency than your service.
  • You want replay — the ability to re-process events into a new consumer later.
  • Multiple independent consumers care about the same event.

Otherwise, keep it sync. Async introduces eventual consistency and new failure modes (lost or duplicated messages); pay that cost only when you need the benefits.
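Replay and multiple independent consumers both follow from one idea: the bus is an append-only log, and each consumer group tracks its own read position. The following is a minimal in-memory sketch of that idea using only the JDK. `EventLogSketch` and its methods are invented for illustration and are not Kafka APIs; a real broker adds partitions, persistence, and retention on top.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a single-partition log; not a Kafka API.
class EventLogSketch {
    private final List<String> log = new ArrayList<>();            // append-only events
    private final Map<String, Integer> offsets = new HashMap<>();  // next offset per group

    // Producer side: append and return the offset the event was stored at.
    int append(String event) {
        log.add(event);
        return log.size() - 1;
    }

    // Consumer side: return everything this group has not seen yet, then advance its offset.
    List<String> poll(String group) {
        int from = offsets.getOrDefault(group, 0);
        List<String> batch = new ArrayList<>(log.subList(from, log.size()));
        offsets.put(group, log.size());
        return batch;
    }

    public static void main(String[] args) {
        EventLogSketch bus = new EventLogSketch();
        bus.append("policy-created:P-001");
        bus.append("policy-created:P-002");

        // Two groups read the same events independently.
        System.out.println("email: " + bus.poll("email"));
        System.out.println("fraud: " + bus.poll("fraud"));

        bus.append("policy-created:P-003");
        // "email" only sees what is new to it.
        System.out.println("email: " + bus.poll("email"));
        // A group added later starts at offset 0 and replays all three events.
        System.out.println("dashboard: " + bus.poll("dashboard"));
    }
}
```

The producer never learns who is reading. Adding the "dashboard" group later required no change on the write side, which is the decoupling the list above describes.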

Kafka in KRaft mode

Kafka used to require ZooKeeper as a separate coordination service: a second container, a second thing to operate, a second failure mode. KRaft mode has been production-ready since Kafka 3.3 and lets the controller run inside the broker process; Kafka 4.0 removed ZooKeeper support entirely. One container.

podman run -d --replace --name kafka --network insurance-net \
  -p 9092:9092 \
  -e KAFKA_NODE_ID=1 \
  -e KAFKA_PROCESS_ROLES=controller,broker \
  -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@kafka:9093 \
  -e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 \
  -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
  -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
  -e KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
  docker.io/apache/kafka:4.0.2

Note the image is the official apache/kafka image, not the historically-popular bitnami/kafka:3.8 — Bitnami’s docker.io distribution disappeared in the 2025 Broadcom reshuffle. The env vars also lose the _CFG_ infix in the move from Bitnami’s wrapper to the upstream entrypoint.

Create a topic:

podman exec -it kafka \
  /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:9092 \
  --create --topic policy-events --partitions 3 --replication-factor 1

Three partitions is more than this app needs; it’s there so you can demonstrate consumer-group parallelism later. Replication factor 1 is dev-only — for production you’d want 3 across separate brokers.
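Partitions matter for parallelism because, within one consumer group, each partition is read by at most one member. The sketch below illustrates that with two simplifications that are assumptions of the example rather than Kafka behavior: `String.hashCode()` stands in for Kafka's murmur2 hash of the serialized key, and a plain round-robin stands in for Kafka's configurable partition assignors. `PartitionSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PartitionSketch {
    // Stand-in hash: Kafka hashes the serialized key with murmur2, not String.hashCode().
    // The property that matters is the same: equal keys always map to the same partition.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Simplified round-robin assignment of partitions to the members of one group.
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> plan = new LinkedHashMap<>();
        for (String c : consumers) {
            plan.put(c, new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            plan.get(consumers.get(p % consumers.size())).add(p);
        }
        return plan;
    }

    public static void main(String[] args) {
        System.out.println(assign(List.of("c1"), 3));                   // {c1=[0, 1, 2]}
        System.out.println(assign(List.of("c1", "c2", "c3"), 3));       // {c1=[0], c2=[1], c3=[2]}
        System.out.println(assign(List.of("c1", "c2", "c3", "c4"), 3)); // {c1=[0], c2=[1], c3=[2], c4=[]}
        System.out.println("P-002 -> partition " + partitionFor("P-002", 3));
    }
}
```

With three partitions, a fourth consumer in the same group sits idle. The partition count is the ceiling on consumer-group parallelism.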

Liberty side: MicroProfile Reactive Messaging

Direct Kafka client code is fine, but MicroProfile Reactive Messaging is the idiomatic Liberty path. Enable in server.xml:

<feature>mpReactiveMessaging-3.0</feature>

That feature already includes the liberty-kafka connector — no separate dependency. But the connector ships only the plumbing; you still need the Kafka client classes on the WAR’s classpath:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.9.0</version>
    <scope>compile</scope>
</dependency>

Without kafka-clients, Liberty fails app startup with CWMRX1006E: Classes from the kafka-clients JAR file could not be loaded. The scope can be runtime if you only use @Channel + Emitter, but gets bumped to compile the moment any code imports org.apache.kafka.clients.producer.ProducerRecord directly (which you’ll want for keyed producers against compacted topics — covered in the policy-bind feature chapter).

Configure connections in microprofile-config.properties (under src/main/resources/META-INF/):

mp.messaging.outgoing.policy-events.connector=liberty-kafka
mp.messaging.outgoing.policy-events.topic=policy-events
mp.messaging.outgoing.policy-events.bootstrap.servers=kafka:9092
mp.messaging.outgoing.policy-events.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.policy-events.value.serializer=org.apache.kafka.common.serialization.StringSerializer

mp.messaging.incoming.policy-events-in.connector=liberty-kafka
mp.messaging.incoming.policy-events-in.topic=policy-events
mp.messaging.incoming.policy-events-in.bootstrap.servers=kafka:9092
mp.messaging.incoming.policy-events-in.group.id=insurance-app
mp.messaging.incoming.policy-events-in.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.policy-events-in.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.policy-events-in.auto.offset.reset=earliest
mp.messaging.incoming.policy-events-in.enable.auto.commit=false

Same topic, two channels: one for the producer side, one for the consumer side. The consumer side also needs the deserializer, offset-reset, and commit-mode keys. Without them, @Incoming deploys without error but silently fails to subscribe: no consumer group is ever registered, and no messages arrive.
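Because a missing consumer key fails silently, a small check in a unit test can catch it before deployment. The following is a sketch under stated assumptions: `ChannelConfigCheck` is a hypothetical helper, the required suffixes are simply the keys this module sets on its incoming channel, and channel names are assumed to contain no dots.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

class ChannelConfigCheck {
    static final String PREFIX = "mp.messaging.incoming.";

    // The keys this module sets for its consumer channel (an assumption of this sketch).
    static final List<String> REQUIRED = List.of(
        "connector", "topic", "bootstrap.servers", "group.id",
        "key.deserializer", "value.deserializer",
        "auto.offset.reset", "enable.auto.commit");

    // Returns the fully qualified keys that are missing, sorted; empty means the config is complete.
    static Set<String> missingKeys(Properties props) {
        Set<String> channels = new TreeSet<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(PREFIX)) {
                String rest = key.substring(PREFIX.length());
                int dot = rest.indexOf('.');
                if (dot > 0) {
                    channels.add(rest.substring(0, dot)); // assumes no dots in channel names
                }
            }
        }
        Set<String> missing = new TreeSet<>();
        for (String channel : channels) {
            for (String suffix : REQUIRED) {
                String full = PREFIX + channel + "." + suffix;
                if (!props.containsKey(full)) {
                    missing.add(full);
                }
            }
        }
        return missing;
    }

    // Parses properties-file text; in a test you would load the real file from the classpath instead.
    static Properties parse(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return p;
    }

    public static void main(String[] args) {
        Properties incomplete = parse(
            "mp.messaging.incoming.policy-events-in.connector=liberty-kafka\n"
          + "mp.messaging.incoming.policy-events-in.topic=policy-events\n");
        missingKeys(incomplete).forEach(k -> System.out.println("missing: " + k));
    }
}
```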

Producer: emit an event on write

@ApplicationScoped
public class PolicyEventPublisher {
    @Inject @Channel("policy-events")
    Emitter<String> emitter;

    @Inject Jsonb jsonb;

    public void published(Policy p) {
        emitter.send(jsonb.toJson(p));
    }
}
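`Emitter.send(payload)` returns a `CompletionStage<Void>` that completes when the message is acknowledged and completes exceptionally when it is not. The publisher above discards that stage, so a failed send goes unnoticed. The sketch below shows one way to attach a failure handler. To keep it runnable with only the JDK, a hypothetical `Sender` interface stands in for the injected `Emitter<String>`; the failure counter is likewise illustrative.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

class SendResultSketch {
    // Hypothetical stand-in with the same shape as Emitter<String>.send(String).
    interface Sender {
        CompletionStage<Void> send(String payload);
    }

    private static final Logger log = Logger.getLogger(SendResultSketch.class.getName());

    final AtomicInteger failures = new AtomicInteger();
    private final Sender sender;

    SendResultSketch(Sender sender) {
        this.sender = sender;
    }

    // Publish and record the outcome instead of dropping the returned stage.
    CompletionStage<Void> publish(String json) {
        return sender.send(json).whenComplete((ok, err) -> {
            if (err != null) {
                failures.incrementAndGet();
                log.warning(() -> "Event not acknowledged: " + err.getMessage());
            }
        });
    }

    public static void main(String[] args) {
        // A sender that always succeeds.
        SendResultSketch good = new SendResultSketch(
            payload -> CompletableFuture.completedFuture(null));
        good.publish("{\"policyNo\":\"P-002\"}");

        // A sender that always fails, simulating an unreachable broker.
        SendResultSketch bad = new SendResultSketch(
            payload -> CompletableFuture.failedFuture(new IllegalStateException("broker down")));
        bad.publish("{\"policyNo\":\"P-003\"}");

        System.out.println("failures: good=" + good.failures.get() + ", bad=" + bad.failures.get());
    }
}
```

Logging is the least the handler should do. Whether to retry, alert, or fail the HTTP request is a design decision that depends on how much a lost event costs.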

Wire into the create path:

@POST @Consumes(MediaType.APPLICATION_JSON)
public Policy create(Policy p) {
    Policy saved = repo.create(p);
    redis.del("policy:" + saved.getPolicyNo());
    publisher.published(saved);    // emit event
    return saved;
}

Consumer: react to events

@ApplicationScoped
public class PolicyEventConsumer {
    @Inject Jsonb jsonb;
    private static final Logger log = Logger.getLogger(PolicyEventConsumer.class.getName());

    @Incoming("policy-events-in")
    public void handle(String json) {
        Policy p = jsonb.fromJson(json, Policy.class);
        log.info(() -> "Saw new policy: " + p.getPolicyNo());
        // dispatch downstream work...
    }
}

In a real app, the consumer would write to a notification table, call a downstream API, or fan out to another topic. The minimum viable pattern is “log it and move on” — that’s the producer/consumer round trip working.
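Since messages can be delivered more than once, a handler with real side effects should tolerate seeing the same event twice. The following is a minimal sketch of an idempotent handler. It assumes one event per policy number and keeps its "already seen" set in memory; a real service would need a durable store, such as a unique-keyed table, for that set. `IdempotentHandlerSketch` is a hypothetical name.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

class IdempotentHandlerSketch {
    // In-memory only: lost on restart, so this is a sketch of the idea, not a durable solution.
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    // Records the side effects performed, so the example can show each happens once.
    final List<String> sideEffects = new CopyOnWriteArrayList<>();

    // Returns true if the event was processed, false if it was a duplicate and skipped.
    boolean handle(String policyNo) {
        if (!seen.add(policyNo)) {
            return false; // add() returns false when the key was already present
        }
        sideEffects.add("welcome-email:" + policyNo);
        return true;
    }

    public static void main(String[] args) {
        IdempotentHandlerSketch handler = new IdempotentHandlerSketch();
        System.out.println(handler.handle("P-002")); // true
        System.out.println(handler.handle("P-002")); // false, redelivery is ignored
        System.out.println(handler.sideEffects);     // [welcome-email:P-002]
    }
}
```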

Verifying

# Create a policy (triggers the producer)
curl -X POST http://localhost:9080/api/policies \
  -H "Content-Type: application/json" \
  -d '{"policyNo":"P-002","holderName":"Bob","premium":315.00}'

# Watch the consumer log
podman logs -f insurance-app | grep "Saw new policy"

# Inspect the topic directly
podman exec -it kafka \
  /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 \
  --topic policy-events --from-beginning

In SigNoz, the create request shows a producer span, and the message handling appears as a separate consumer trace. The two join into a single trace once you wire the OpenTelemetry Kafka instrumentation, which propagates trace context through message headers. That wiring is out of scope for this module, but it is worth knowing the spans join up cleanly when you do it.

The two-step pattern, named

What we just did has a name: a dual write, ordered persist-first. It is the seed of the transactional outbox pattern, not the full pattern. Write to the database in a transaction, emit an event afterward. The mistake to avoid is the inverse: emitting before persisting, so that when the DB write fails a downstream consumer has already reacted to a non-event. Always persist first.

For stronger guarantees (no lost events even on crash between persist and emit), use a real outbox table that a separate worker drains. Out of scope here; the seed pattern is enough to grow into it.
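To make the outbox idea concrete, here is a minimal in-memory sketch. The two collections stand in for a policy table and an outbox table written in the same database transaction, and `drain` stands in for the separate worker. All names are hypothetical, and a `synchronized` method is only a stand-in for a real transaction.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class OutboxSketch {
    static final class OutboxRow {
        final String payload;
        boolean sent;
        OutboxRow(String payload) { this.payload = payload; }
    }

    final Map<String, String> policies = new LinkedHashMap<>(); // stands in for the policy table
    final List<OutboxRow> outbox = new ArrayList<>();           // stands in for the outbox table

    // One "transaction": the policy row and the outbox row are written together or not at all.
    synchronized void createPolicy(String policyNo, String json) {
        if (policies.containsKey(policyNo)) {
            throw new IllegalStateException("duplicate " + policyNo);
        }
        policies.put(policyNo, json);
        outbox.add(new OutboxRow(json));
    }

    // Separate worker: publish unsent rows in order, marking each sent only after publish succeeds.
    // Returns how many rows were published in this run.
    synchronized int drain(Consumer<String> publisher) {
        int published = 0;
        for (OutboxRow row : outbox) {
            if (row.sent) {
                continue;
            }
            try {
                publisher.accept(row.payload);
            } catch (RuntimeException e) {
                break; // leave this row and later ones for the next run, preserving order
            }
            row.sent = true;
            published++;
        }
        return published;
    }

    public static void main(String[] args) {
        OutboxSketch db = new OutboxSketch();
        db.createPolicy("P-001", "{\"policyNo\":\"P-001\"}");
        db.createPolicy("P-002", "{\"policyNo\":\"P-002\"}");

        // Broker unavailable: nothing is published, but nothing is lost either.
        int first = db.drain(payload -> { throw new IllegalStateException("broker down"); });

        // Broker back: the worker catches up.
        List<String> topic = new ArrayList<>();
        int second = db.drain(topic::add);

        System.out.println(first + " then " + second + " published: " + topic);
    }
}
```

This gives at-least-once delivery, not exactly-once. A crash after publishing but before marking the row as sent publishes that event again on the next run, so consumers still need to tolerate duplicates.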

What you have

  • Kafka running in KRaft mode on the shared network.
  • A topic, partitioned.
  • A producer and a consumer in the same Liberty service (in production they’d be different services).
  • The persist-first ordering that keeps the database and the event bus in agreement in the common case, with a path to a full outbox when you need stronger guarantees.

Module 11 wraps the API in real authentication.

Next: 11 — Identity with WSO2 IS →