Feature: Audit trail
The same domain event written to two Kafka topics — one log-compacted, one retention-based — and the side-by-side endpoint that shows the difference. Slice 13 in the insurance-app repo.
This is the chapter that finally makes the difference between compaction and retention obvious. The same claim state transition is written to two Kafka topics: one configured for log compaction, the other for time-based retention. A contrast endpoint queries both for the same claim id and shows the two shapes side by side. Students see in one screenshot that compaction = “current state per key” and retention = “every event for N days.”
Companion commit: ec2d9d7 in insurance-app.
The two topics
| Topic | cleanup.policy | What you get |
|---|---|---|
| audit-events | compact | One record per claim:<id> key; the latest action wins after log compaction. |
| claim-events | delete, retention.ms=2592000000 (30 d) | Every state transition kept for 30 days, regardless of key. |
Both are created by the kafka-init one-shot in the compose stack.
The double-publish
Every call to ClaimService.publishToAuditAndClaimEvents(claim, action)
writes to both topics:
private void publishToAuditAndClaimEvents(Claim c, String action) {
    String state = String.format(
        "{\"id\":%d,\"policyNumber\":\"%s\",\"status\":\"%s\"}",
        c.getId(), c.getPolicyNumber(), c.getStatus());
    audit.publish(new AuditEvent("claim", String.valueOf(c.getId()), action,
        "system", state, OffsetDateTime.now().toString()));
    claimEvents.publish(c.getId(), action, c);
}
AuditPublisher and ClaimEventPublisher are both raw
KafkaProducers keyed by the claim id. The key choice is what makes
compaction work — multiple writes with the same key let the log
cleaner collapse them.
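To make the role of the key concrete, here is a toy model of the two cleanup policies applied to the same list of records. It is only a sketch: Rec and TopicModel are illustrative names that do not exist in the repo, and a real broker compacts lazily and leaves the active segment alone, so duplicates can linger for a while.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** One record in a toy, single-partition log (hypothetical type, not from the repo). */
record Rec(String key, String value, long timestampMs) {}

class TopicModel {
    /** Compaction: keep only the most recent record for each key. */
    static List<Rec> compact(List<Rec> log) {
        Map<String, Rec> latest = new LinkedHashMap<>();
        for (Rec r : log) {
            latest.remove(r.key()); // drop the older record so the survivor keeps its log position
            latest.put(r.key(), r);
        }
        return new ArrayList<>(latest.values());
    }

    /** Retention: keep every record no older than retentionMs, regardless of key. */
    static List<Rec> retain(List<Rec> log, long nowMs, long retentionMs) {
        List<Rec> kept = new ArrayList<>();
        for (Rec r : log) {
            if (nowMs - r.timestampMs() <= retentionMs) {
                kept.add(r);
            }
        }
        return kept;
    }
}
```

Feeding both methods a FILED and an APPROVED record for the same key gives one survivor from compact and two from retain, as long as both are inside the retention window. If the two writes used different keys, compact would have nothing to collapse.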
Hooked into:
- ClaimService.file(...): publishes action="FILED"
- ClaimService.approve(id): publishes action="APPROVED"
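A new POST /api/claims/{id}/approve endpoint flips the status
and re-publishes. That gives two writes per claim (FILED, then
APPROVED), the minimum needed for the two topics to diverge: the
compacted one can collapse to the latest record, the retention one
keeps both.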
Audit emission across the other three services
Claims are the showcase for the dual-topic pattern, but they aren’t
the only state transitions worth auditing. QuoteService,
PolicyService, and PaymentService each emit a one-shot record
to audit-events (compacted) at the moment their core write
commits — calling the same AuditPublisher.publish(AuditEvent)
helper, keyed by the domain entity:
| Service | Hook | Audit type / key | Action |
|---|---|---|---|
| QuoteService.createQuote() | after repo.save(q) and the calculated-event publish | "quote" / q.getId() | "CALCULATED" |
| PolicyService.bind() | after repo.save(p) inside the redlock | "policy" / p.getPolicyNumber() | "BOUND" |
| PaymentService.charge() | after the gateway charge succeeds | "payment" / paymentId | "CHARGED" |
| ClaimService.file/approve() | as above | "claim" / c.getId() | "FILED" / "APPROVED" |
Two things to notice:
- The non-claim services emit only to audit-events (the compacted topic). The dual-publish dance is specific to claims because the demo’s contrast endpoint shows both shapes side by side. Adopting the compacted topic on its own is a perfectly valid stopping point: it gives you the “current state per entity” projection that audit consumers care about most of the time.
- Each emit is wrapped in a try/catch so an audit-side failure never breaks the core write. The same four-defense pattern from chapter 17 (ClaimService.file → MinIO upload → MI OCR → mTLS partner lookup → audit) applies here: enrichments must not be able to fail the transaction.
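The try/catch wrapping in the second point can be factored into a small helper. The sketch below is one possible shape, not the code in the repo: BestEffort and its run method are hypothetical names, and it uses java.util.logging only to stay dependency-free.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical helper: run an enrichment without letting it fail the caller. */
class BestEffort {
    private static final Logger LOG = Logger.getLogger(BestEffort.class.getName());

    /**
     * Runs the side effect and swallows any RuntimeException after logging it.
     * Returns true if the side effect completed, false if it threw.
     */
    static boolean run(String what, Runnable sideEffect) {
        try {
            sideEffect.run();
            return true;
        } catch (RuntimeException e) {
            LOG.log(Level.WARNING, what + " failed; core write is unaffected", e);
            return false;
        }
    }
}
```

A service would call it after its core write commits, for example BestEffort.run("audit emit", () -> audit.publish(event)). The boolean is there so a caller can count failures if it wants to; ignoring it is fine.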
This wider coverage shipped as part of Phase 5 of the QA roadmap
(see chapter 31). The AuditCompletenessTest integration test in
src/test/java/com/example/insurance/audit/ enumerates which
services have to emit and fails CI if a new domain service is
added without an audit hook.
AuditConsumer — rebuilding state from a compacted topic
@ApplicationScoped
public class AuditConsumer {
    @Inject AuditSnapshot snapshot;
    @Resource ManagedExecutorService executor;

    void start(@Observes @Initialized(ApplicationScoped.class) Object init) {
        Properties props = …;
        props.put(ConsumerConfig.GROUP_ID_CONFIG,
            "insurance-audit-" + UUID.randomUUID()); // ← fresh per process!
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(List.of("audit-events"));
        executor.submit(this::pollLoop);
    }

    private void pollLoop() {
        while (running.get()) {
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String,String> r : records) {
                AuditEvent e = jsonb.fromJson(r.value(), AuditEvent.class);
                snapshot.put(e);
            }
        }
    }
}
The non-obvious bit: fresh-UUID group.id per process. The whole
point of audit-events is “rehydrate the projection on boot.” That
only works if the consumer always reads from the earliest offset,
and AUTO_OFFSET_RESET=earliest only kicks in when no committed
offset exists for the group. A persistent group would resume from
“next after last commit” on a cold restart, so the fresh in-memory
snapshot would miss every record written before that commit.
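The rule is easy to misread, so here is a toy model of how a starting offset gets chosen under auto.offset.reset=earliest. OffsetModel is a made-up class for illustration and does not use the Kafka client API; it only encodes the "committed offset wins" rule described above.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of per-group committed offsets for a single partition. */
class OffsetModel {
    // group.id -> next offset that group would read
    private final Map<String, Long> committed = new HashMap<>();

    void commit(String groupId, long nextOffset) {
        committed.put(groupId, nextOffset);
    }

    /**
     * A committed offset always wins. The "earliest" reset policy is only
     * consulted when the group has never committed anything.
     */
    long startOffset(String groupId, long earliestOffset) {
        return committed.getOrDefault(groupId, earliestOffset);
    }
}
```

With a group that previously committed offset 42, startOffset returns 42 and the first 42 records are never replayed. With a group id that has never been seen, such as "insurance-audit-" plus a new UUID, it returns the earliest offset and the whole topic is read.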
AuditSnapshot is just a ConcurrentMap<String, AuditEvent> —
key = <type>:<id> (claim:<id> for claims), value = the latest record.
No backing DB row. The compacted topic IS the durable state.
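A minimal sketch of such a snapshot follows, assuming the put(event) and get(type, id) shapes used elsewhere in this chapter. The class names carry a Sketch suffix because they are not the repo's classes, and every AuditEvent field name except action is a guess based on the constructor call shown earlier.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Stand-in for AuditEvent; field names other than 'action' are assumptions. */
record AuditEventSketch(String type, String id, String action,
                        String actor, String state, String timestamp) {}

class AuditSnapshotSketch {
    private final ConcurrentMap<String, AuditEventSketch> latest = new ConcurrentHashMap<>();

    private static String key(String type, String id) {
        return type + ":" + id;
    }

    /** Last write wins, mirroring what compaction does to the topic. */
    void put(AuditEventSketch e) {
        latest.put(key(e.type(), e.id()), e);
    }

    /** Returns the latest event for the entity, or null if none has been seen. */
    AuditEventSketch get(String type, String id) {
        return latest.get(key(type, id));
    }
}
```

One design point to watch in this sketch: get returns null for an unknown entity, and Map.of(...) rejects null values with a NullPointerException, so a caller that builds a response map that way needs to handle the missing case first.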
The contrast endpoint
@GET
@Path("/contrast/{claimId}")
public Map<String, Object> contrast(@PathParam("claimId") String claimId) {
    return Map.of(
        "claimId", claimId,
        "snapshot", snapshot.get("claim", claimId),   // 1 record (compacted)
        "events", readClaimEvents(claimId));          // every action (retention)
}
readClaimEvents scans the claim-events topic ad-hoc with a
one-shot consumer (5 s deadline, fresh UUID group). For a 30-day
retention topic, keeping a permanent projection in memory would be
wasteful; scanning for the claim id on each request is fine for the demo.
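The scan-with-deadline idea can be sketched without a broker by abstracting the poll call as a Supplier. Everything here is hypothetical (ClaimEventRec, DeadlineScan, and the rule that an empty batch means "caught up"); with a real KafkaConsumer an empty poll does not prove the end of the topic has been reached, which is one reason a hard deadline is useful.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Stand-in for a consumed record: key is the claim id, action is the transition. */
record ClaimEventRec(String key, String action) {}

class DeadlineScan {
    /**
     * Pulls batches from 'poll' until it returns an empty batch or the
     * deadline passes, keeping only records whose key matches wantedKey.
     */
    static List<ClaimEventRec> scan(Supplier<List<ClaimEventRec>> poll,
                                    String wantedKey, Duration deadline) {
        long stopAt = System.nanoTime() + deadline.toNanos();
        List<ClaimEventRec> hits = new ArrayList<>();
        while (System.nanoTime() - stopAt < 0) {   // overflow-safe time comparison
            List<ClaimEventRec> batch = poll.get();
            if (batch.isEmpty()) {
                break;                              // treated as "caught up" in this toy model
            }
            for (ClaimEventRec r : batch) {
                if (wantedKey.equals(r.key())) {
                    hits.add(r);
                }
            }
        }
        return hits;
    }
}
```

The cost is linear in the size of the topic for every request, which is why this only suits a demo or a low-traffic debugging endpoint.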
Verify — the moment of teaching
source ~/insurance-app/.wso2is-creds
AT=$(…)
# 1. File a fresh claim (already adds action=FILED via the dashboard hook).
QID=$(curl -sS -X POST http://localhost:9080/api/quotes -H "Authorization: Bearer $AT" -H "Content-Type: application/json" \
-d '{"vehicleVin":"AUD","driverAge":35,"coverageType":"BASIC"}' | jq -r .id)
POL=$(curl -sS -X POST http://localhost:9080/api/policies -H "Authorization: Bearer $AT" -H "Content-Type: application/json" \
-d "{\"quoteId\":$QID}" | jq -r .policyNumber)
dd if=/dev/urandom of=/tmp/x.jpg bs=1024 count=4 status=none
CLAIM_ID=$(curl -sS -X POST http://localhost:9080/api/claims -H "Authorization: Bearer $AT" \
-F "policyNumber=$POL" -F "description=demo" -F "attachment=@/tmp/x.jpg" | jq -r .id)
sleep 2
# After FILED:
curl -sS "http://localhost:9080/api/audit/contrast/$CLAIM_ID" | jq '{snapshot: .snapshot.action, events: [.events[].action]}'
# → {"snapshot": "FILED", "events": ["FILED"]}
# 2. Approve it.
curl -sS -X POST -H "Authorization: Bearer $AT" "http://localhost:9080/api/claims/$CLAIM_ID/approve" > /dev/null
sleep 2
# After APPROVED:
curl -sS "http://localhost:9080/api/audit/contrast/$CLAIM_ID" | jq '{snapshot: .snapshot.action, events: [.events[].action]}'
# → {"snapshot": "APPROVED", "events": ["FILED", "APPROVED"]}
That second curl is the punchline. The compacted-topic projection has one entry per claim (the latest action). The retention topic has every action for the claim. Same domain event, two different shapes; pick by use case.
When to pick which
Compaction = “current state” projection. Use when:
- Consumers ask “what’s the latest known status of X?”
- A new consumer that joins later should NOT replay every state transition since launch.
- Storage is bounded by number of entities, not time.
Retention = “event log.” Use when:
- Consumers do time-based analytics (“how many claims were FILED yesterday?”)
- Forensics / debug — you need the trail of every transition.
- Storage is bounded by time × volume.
Most real systems do both for important domain events. The slice 13 pattern, the same event written to two topics with different cleanup policies, is a common way for production deployments to handle that.
What you have
- One write call → two Kafka topics with different cleanup semantics.
- An in-memory audit snapshot rebuilt from the compacted topic on every Liberty boot.
- A teaching endpoint that shows both shapes for the same claim id, side by side.
- The fresh-UUID-group.id pattern for any consumer whose job is “project the entire topic into local state.”