~60 min read · updated 2026-05-16

Feature: Notification

Multi-topic fan-in via a single Kafka consumer, channel-routed mediation through MI to email/SMS/push, and Mailpit as the local SMTP sink. Slice 8 in the insurance-app repo.

Once you have multiple event-emitting features (Quote, Policy, Payment), the natural next move is a single consumer that fans in their events and dispatches notifications. This chapter covers two patterns: multi-topic Kafka consumption in one process, and channel-routed mediation in MI (one synapse <switch> that dispatches the same payload to email, SMS, or push depending on a field in the request).

Companion commit: 0607f12 in insurance-app.

The flow

quote-events    ──┐
policy-events   ──┼─► NotificationConsumer (one Kafka client)
payment-events  ──┘     │
                        ▼ POST /notification/dispatch  {channel, recipient, subject, body}
                  insurance-mi (synapse)

                        ├─ channel=email → POST mailpit:8025/api/v1/send
                        ├─ channel=sms   → POST wiremock:8080/sms-gateway/send
                        └─ channel=push  → POST wiremock:8080/push-gateway/send

One consumer subscribed to three topics. One MI <switch> deciding which backend to call. Liberty never knows about SMTP, just speaks HTTP/JSON to MI.

The consumer — @Startup semantics with CDI

@ApplicationScoped
public class NotificationConsumer {
    private static final List<String> TOPICS = List.of(
        "quote-events", "policy-events", "payment-events");

    @Resource ManagedExecutorService executor;
    @Inject NotificationService service;

    private KafkaConsumer<String, String> consumer;

    void start(@Observes @Initialized(ApplicationScoped.class) Object init) {
        Properties props = …;
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(TOPICS);
        executor.submit(this::pollLoop);
    }

    private void pollLoop() { …  /* poll → for-each handle → commitSync */ }
}

Two non-obvious bits:

@Observes @Initialized(ApplicationScoped.class) for eager init

CDI @ApplicationScoped beans are lazy — they aren’t instantiated until something else injects them. A consumer that nothing else injects sits dormant.

The fix is to make the bean an observer of the @Initialized(ApplicationScoped.class) CDI event, which fires once per app context, on deployment. The observer parameter forces CDI to create the bean.

Same trick is reused in AgentDashboardSubscriber (slice 11) and AuditConsumer (slice 13). Standard Liberty pattern.

ManagedExecutorService — not new Thread()

If you spawn a raw Thread for the poll loop, you’ll discover that @Transactional repository calls fail with NPE inside the UOWCoordinator. Liberty’s transaction system requires JEE thread-context propagation; raw threads don’t carry it.

@Resource ManagedExecutorService executor; + executor.submit(...) gives you a thread that has the JEE context (CDI request scope, JTA, security, JNDI) that interceptors depend on. Same fix applies to anything called from CompletableFuture.runAsync, JDK schedulers, or Kafka client callbacks.

The MI synapse — channel routing

<api context="/notification" name="NotificationAPI">
    <resource methods="POST" uri-template="/dispatch">
        <inSequence>
            <property name="ch"        expression="json-eval($.channel)"   scope="default"/>
            <property name="recipient" expression="json-eval($.recipient)" scope="default"/>
            <property name="subject"   expression="json-eval($.subject)"   scope="default"/>
            <property name="msg"       expression="json-eval($.body)"      scope="default"/>

            <switch source="$ctx:ch">
              <case regex="email">  …payloadFactory + call to mailpit:8025/api/v1/send… </case>
              <case regex="sms">    …payloadFactory + call to wiremock /sms-gateway/send… </case>
              <case regex="push">   …payloadFactory + call to wiremock /push-gateway/send… </case>
              <default> …400… </default>
            </switch>
            <respond/>
        </inSequence>
    </resource>
</api>

Note the property name msg rather than body. body is a reserved name in synapse properties — using it makes payloadFactory substitute the entire SOAP body into the field instead of the JSON value you extracted. Discovered the hard way; captured as gotcha 17 in the repo memory.

Adding a new channel (Slack, Teams, postal mail) becomes one new <case>. No Java rebuild.

Mailpit — the local SMTP sink

The compose stack runs axllent/mailpit exposing:

  • :1025 — SMTP receiver
  • :8025 — Web UI + REST API (also serves POST /api/v1/send for programmatic sending)

MI calls Mailpit’s REST send endpoint (not SMTP) because synapse’s <call> transport is HTTP-native. The web UI at https://mail.insurance-app.comptech-lab.com shows the inbox; every notification the fan-in dispatches appears there in real time.

NotificationService — pending/sent/failed states

public void notify(String topic, String key, NotificationRequest req) {
    Notification n = createPending(topic, key, req);   // @Transactional
    try (Response r = dispatcher.dispatch(req)) {
        if (r.getStatus() / 100 == 2) finalizeSent(n.getId(), …);
        else                          finalizeFailed(n.getId(), …);
    } catch (Exception e) { finalizeFailed(…); }
}

@Transactional                                Notification createPending(…)
@Transactional(Transactional.TxType.REQUIRES_NEW) Notification finalizeSent(…)
@Transactional(Transactional.TxType.REQUIRES_NEW) Notification finalizeFailed(…)

Same transaction-split pattern as slice 7’s payment finalize. A slow or down MI doesn’t hold a Postgres tx open; the audit row records PENDING even if dispatch never returns.

Verify

Fire one of each:

source ~/insurance-app/.wso2is-creds
AT=$()

# Fresh quote (already triggers quote-events fan-in)
QID=$(curl -sS -X POST http://localhost:9080/api/quotes -H "Authorization: Bearer $AT" \
        -H "Content-Type: application/json" \
        -d '{"vehicleVin":"NTF","driverAge":35,"coverageType":"BASIC"}' | jq -r .id)

# Bind → policy-events
POL=$(curl -sS -X POST http://localhost:9080/api/policies -H "Authorization: Bearer $AT" \
        -H "Content-Type: application/json" -d "{\"quoteId\":$QID}" | jq -r .policyNumber)

# Pay → payment-events
curl -sS -X POST http://localhost:9080/api/payments -H "Authorization: Bearer $AT" \
  -H "Content-Type: application/json" -H "Idempotency-Key: nt-$RANDOM" \
  -d "{\"policyNumber\":\"$POL\",\"amount\":100,\"currency\":\"USD\"}" > /dev/null

# Wait a few seconds, then check Mailpit
sleep 4
curl -sS http://localhost:8025/api/v1/messages | jq '.total'

# Or open https://mail.insurance-app.comptech-lab.com in a browser.

Three emails should appear, one per event.

What you have

  • A consumer that subscribes to many topics with one Kafka client.
  • An MI synapse API that routes by channel — the same pattern any multi-channel notification service uses in production.
  • A notification audit log in Postgres that records every dispatch attempt regardless of downstream status.
  • The ManagedExecutorService + @Observes @Initialized pattern for any future eager-init background work in Liberty.

Next: 17 — Feature: Claim filing →