Feature: Notification
Multi-topic fan-in via a single Kafka consumer, channel-routed mediation through MI to email/SMS/push, and Mailpit as the local SMTP sink. Slice 8 in the insurance-app repo.
Once you have multiple event-emitting features (Quote, Policy,
Payment), the natural next move is a single consumer that fans in
their events and dispatches notifications. This chapter covers two
patterns: multi-topic Kafka consumption in one process, and
channel-routed mediation in MI (one synapse <switch> that
dispatches the same payload to email, SMS, or push depending on a
field in the request).
Companion commit: 0607f12 in insurance-app.
The flow
quote-events   ──┐
policy-events  ──┼─► NotificationConsumer (one Kafka client)
payment-events ──┘        │
                          ▼  POST /notification/dispatch {channel, recipient, subject, body}
                   insurance-mi (synapse)
                          │
                          ├─ channel=email → POST mailpit:8025/api/v1/send
                          ├─ channel=sms   → POST wiremock:8080/sms-gateway/send
                          └─ channel=push  → POST wiremock:8080/push-gateway/send
One consumer subscribed to three topics. One MI <switch> deciding
which backend to call. Liberty never knows about SMTP; it only speaks
HTTP/JSON to MI.
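The request Liberty sends to MI can be sketched with the JDK's built-in HTTP client types. This is an illustration, not the repo's dispatcher: the class name, the `MI_BASE_URL` host and port, and the hand-rolled JSON escaping are assumptions. Only the path and the four payload fields come from the flow above.

```java
import java.net.URI;
import java.net.http.HttpRequest;

class DispatchRequestSketch {
    // Assumption: MI is reachable at this host and port inside the compose network.
    static final String MI_BASE_URL = "http://insurance-mi:8290";

    // Minimal JSON string escaping for the sketch; real code would use a JSON library.
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n") + "\"";
    }

    // Serializes the four fields the MI API reads with json-eval.
    static String toJson(String channel, String recipient, String subject, String body) {
        return "{\"channel\":" + quote(channel)
             + ",\"recipient\":" + quote(recipient)
             + ",\"subject\":" + quote(subject)
             + ",\"body\":" + quote(body) + "}";
    }

    // Builds (but does not send) the POST to the dispatch resource.
    static HttpRequest build(String json) {
        return HttpRequest.newBuilder(URI.create(MI_BASE_URL + "/notification/dispatch"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) {
        String json = toJson("email", "customer@example.com", "Policy bound", "Your policy is active.");
        HttpRequest request = build(json);
        System.out.println(request.method() + " " + request.uri());
        System.out.println(json);
    }
}
```

Changing the `channel` value is all the caller does to pick a backend; the URL stays the same for email, SMS, and push.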
The consumer — @Startup semantics with CDI
@ApplicationScoped
public class NotificationConsumer {
    private static final List<String> TOPICS = List.of(
        "quote-events", "policy-events", "payment-events");
    @Resource ManagedExecutorService executor;
    @Inject NotificationService service;
    private KafkaConsumer<String, String> consumer;
    void start(@Observes @Initialized(ApplicationScoped.class) Object init) {
        Properties props = …;
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(TOPICS);
        executor.submit(this::pollLoop);
    }
    private void pollLoop() { … /* poll → for-each handle → commitSync */ }
}
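Because one client receives records from all three topics, the handle step has to branch on each record's topic. Below is a minimal sketch of that branch, assuming Java 17. It uses a stand-in `Event` record instead of Kafka's `ConsumerRecord` so it runs without the Kafka client on the classpath. The subject lines, the fixed `email` channel, and the `recipient` argument are placeholders, not values from the repo.

```java
import java.util.List;

class FanInHandlerSketch {
    // Stand-in for ConsumerRecord<String, String>: only the fields the handler needs.
    record Event(String topic, String key, String value) {}

    // Mirrors the {channel, recipient, subject, body} payload sent to MI.
    record NotificationRequest(String channel, String recipient, String subject, String body) {}

    // One handler for every subscribed topic; the topic name selects the wording.
    static NotificationRequest toRequest(Event e, String recipient) {
        String subject = switch (e.topic()) {
            case "quote-events"   -> "Your quote " + e.key();
            case "policy-events"  -> "Your policy " + e.key();
            case "payment-events" -> "Payment received for " + e.key();
            default -> throw new IllegalArgumentException("Unexpected topic: " + e.topic());
        };
        return new NotificationRequest("email", recipient, subject, e.value());
    }

    public static void main(String[] args) {
        List<Event> batch = List.of(
            new Event("quote-events", "Q-1", "{\"id\":1}"),
            new Event("policy-events", "P-1", "{\"policyNumber\":\"P-1\"}"),
            new Event("payment-events", "P-1", "{\"amount\":100}"));
        for (Event e : batch) {
            System.out.println(toRequest(e, "customer@example.com"));
        }
    }
}
```

Failing loudly on an unknown topic is a deliberate choice in this sketch: a topic added to the subscription list without a matching branch shows up immediately instead of producing blank notifications.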
Two non-obvious bits:
@Observes @Initialized(ApplicationScoped.class) for eager init
CDI @ApplicationScoped beans are lazy: the container hands out a
client proxy and only creates the real instance the first time a
method is called on it. A consumer that nothing else calls sits
dormant.
The fix is to make the bean an observer of the
@Initialized(ApplicationScoped.class) CDI event, which fires once
when the application context is initialized at deployment. To
deliver the event, CDI has to instantiate the bean and call start().
Same trick is reused in AgentDashboardSubscriber (slice 11) and
AuditConsumer (slice 13). Standard Liberty pattern.
ManagedExecutorService — not new Thread()
If you spawn a raw Thread for the poll loop, you’ll discover that
@Transactional repository calls fail with NPE inside the
UOWCoordinator. Liberty’s transaction system requires JEE
thread-context propagation; raw threads don’t carry it.
@Resource ManagedExecutorService executor; + executor.submit(...)
gives you a managed thread that carries the application’s component
context (class loader, JNDI namespace, security identity), which is
what the JTA and CDI interceptors depend on. The same fix applies
to work started from CompletableFuture.runAsync, JDK schedulers, or
Kafka client callbacks: hand it to the managed executor instead of
letting it run on an unmanaged thread.
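The difference can be illustrated with plain JDK types. In this analogy a `ThreadLocal` stands in for the container's thread context, and a small wrapper captures it on the submitting thread and restores it on the worker, which is roughly the service a managed executor performs for you. It is not Liberty's implementation, and every name here is hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ContextPropagationSketch {
    // Stand-in for container thread context (class loader, JNDI, security).
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // Captures the caller's context now and re-establishes it around the task later.
    static <T> Callable<T> withCapturedContext(Callable<T> task) {
        String captured = CONTEXT.get();
        return () -> {
            String previous = CONTEXT.get();
            CONTEXT.set(captured);
            try {
                return task.call();
            } finally {
                if (previous == null) CONTEXT.remove(); else CONTEXT.set(previous);
            }
        };
    }

    // Runs a context-reading task on a fresh worker thread, with or without propagation.
    static String runOnWorker(boolean propagate) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> readContext = () -> String.valueOf(CONTEXT.get());
            Callable<String> task = propagate ? withCapturedContext(readContext) : readContext;
            return pool.submit(task).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        CONTEXT.set("app-context");
        System.out.println("raw worker:     " + runOnWorker(false)); // prints null
        System.out.println("wrapped worker: " + runOnWorker(true));  // prints app-context
    }
}
```

The raw worker sees nothing because the context lives on the submitting thread only. Code that assumes the context is present, as the transaction interceptor does, fails in the same way on an unmanaged thread.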
The MI synapse — channel routing
<api context="/notification" name="NotificationAPI">
  <resource methods="POST" uri-template="/dispatch">
    <inSequence>
      <property name="ch" expression="json-eval($.channel)" scope="default"/>
      <property name="recipient" expression="json-eval($.recipient)" scope="default"/>
      <property name="subject" expression="json-eval($.subject)" scope="default"/>
      <property name="msg" expression="json-eval($.body)" scope="default"/>
      <switch source="$ctx:ch">
        <case regex="email"> …payloadFactory + call to mailpit:8025/api/v1/send… </case>
        <case regex="sms"> …payloadFactory + call to wiremock /sms-gateway/send… </case>
        <case regex="push"> …payloadFactory + call to wiremock /push-gateway/send… </case>
        <default> …400… </default>
      </switch>
      <respond/>
    </inSequence>
  </resource>
</api>
Note the property name msg rather than body. body is a
reserved name in synapse properties — using it makes payloadFactory
substitute the entire SOAP body into the field instead of the JSON
value you extracted. Discovered the hard way; captured as gotcha 17
in the repo memory.
Adding a new channel (Slack, Teams, postal mail) becomes one new
<case>. No Java rebuild.
Mailpit — the local SMTP sink
The compose stack runs axllent/mailpit exposing:
- Port 1025: SMTP receiver
- Port 8025: Web UI + REST API (also serves POST /api/v1/send for programmatic sending)
MI calls Mailpit’s REST send endpoint (not SMTP) because synapse’s
<call> transport is HTTP-native. The web UI at
https://mail.insurance-app.comptech-lab.com shows the inbox; every
notification the fan-in dispatches appears there in real time.
NotificationService — pending/sent/failed states
public void notify(String topic, String key, NotificationRequest req) {
    Notification n = createPending(topic, key, req); // @Transactional
    try (Response r = dispatcher.dispatch(req)) {
        if (r.getStatus() / 100 == 2) finalizeSent(n.getId(), …);
        else finalizeFailed(n.getId(), …);
    } catch (Exception e) { finalizeFailed(…); }
}
@Transactional Notification createPending(…)
@Transactional(Transactional.TxType.REQUIRES_NEW) Notification finalizeSent(…)
@Transactional(Transactional.TxType.REQUIRES_NEW) Notification finalizeFailed(…)
Same transaction-split pattern as slice 7’s payment finalize. A slow or down MI doesn’t hold a Postgres tx open; the audit row records PENDING even if dispatch never returns.
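The three-step lifecycle can be sketched without a database. In this in-memory stand-in each method plays the role of one transaction: the PENDING row is stored before dispatch is attempted, and the final state is written separately afterwards. The `Dispatcher` interface, the `Status` enum, the method names, and the map-backed store are illustrative assumptions, not the repo's classes.

```java
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class NotificationLifecycleSketch {
    enum Status { PENDING, SENT, FAILED }

    // Hypothetical dispatcher: returns the HTTP status code from MI, or throws.
    interface Dispatcher { int dispatch(String payload) throws Exception; }

    private final Map<Long, Status> rows = new ConcurrentHashMap<>();
    private final AtomicLong ids = new AtomicLong();
    private final Dispatcher dispatcher;

    NotificationLifecycleSketch(Dispatcher dispatcher) { this.dispatcher = dispatcher; }

    long notifyRecipient(String payload) {
        long id = createPending();               // first "transaction": row exists before any I/O
        try {
            int status = dispatcher.dispatch(payload);
            finalizeAs(id, status / 100 == 2 ? Status.SENT : Status.FAILED);
        } catch (Exception e) {
            finalizeAs(id, Status.FAILED);       // dispatch threw: record the failure
        }
        return id;
    }

    long createPending() {
        long id = ids.incrementAndGet();
        rows.put(id, Status.PENDING);
        return id;
    }

    // Second "transaction": overwrite PENDING with the final state.
    void finalizeAs(long id, Status status) { rows.put(id, status); }

    Status statusOf(long id) { return rows.get(id); }

    public static void main(String[] args) {
        var ok = new NotificationLifecycleSketch(p -> 200);
        var rejected = new NotificationLifecycleSketch(p -> 500);
        var down = new NotificationLifecycleSketch(p -> { throw new IOException("MI down"); });
        System.out.println(ok.statusOf(ok.notifyRecipient("a")));             // SENT
        System.out.println(rejected.statusOf(rejected.notifyRecipient("b"))); // FAILED
        System.out.println(down.statusOf(down.notifyRecipient("c")));         // FAILED
    }
}
```

If the process died between `createPending` and `finalizeAs`, the row would stay PENDING, which is the trace the audit log relies on.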
Verify
Fire one of each:
source ~/insurance-app/.wso2is-creds
AT=$(…)
# Fresh quote (already triggers quote-events fan-in)
QID=$(curl -sS -X POST http://localhost:9080/api/quotes -H "Authorization: Bearer $AT" \
  -H "Content-Type: application/json" \
  -d '{"vehicleVin":"NTF","driverAge":35,"coverageType":"BASIC"}' | jq -r .id)
# Bind → policy-events
POL=$(curl -sS -X POST http://localhost:9080/api/policies -H "Authorization: Bearer $AT" \
  -H "Content-Type: application/json" -d "{\"quoteId\":$QID}" | jq -r .policyNumber)
# Pay → payment-events
curl -sS -X POST http://localhost:9080/api/payments -H "Authorization: Bearer $AT" \
  -H "Content-Type: application/json" -H "Idempotency-Key: nt-$RANDOM" \
  -d "{\"policyNumber\":\"$POL\",\"amount\":100,\"currency\":\"USD\"}" > /dev/null
# Wait a few seconds, then check Mailpit
sleep 4
curl -sS http://localhost:8025/api/v1/messages | jq '.total'
# Or open https://mail.insurance-app.comptech-lab.com in a browser.
Three new emails should appear, one per event. The total is higher if Mailpit already held messages from earlier runs.
What you have
- A consumer that subscribes to many topics with one Kafka client.
- An MI synapse API that routes by channel — the same pattern any multi-channel notification service uses in production.
- A notification audit log in Postgres that records every dispatch attempt regardless of downstream status.
- The ManagedExecutorService + @Observes @Initialized pattern for any future eager-init background work in Liberty.