~60 min read · updated 2026-05-18

Feature: Audit trail

The same domain event written to two Kafka topics — one log-compacted, one retention-based — and the side-by-side endpoint that shows the difference. Slice 13 in the insurance-app repo.

This is the chapter that finally makes the difference between compaction and retention obvious. The same claim state transition is written to two Kafka topics: one configured for log compaction, the other for time-based retention. A contrast endpoint queries both for the same claim id and shows the two shapes side by side. Students see in one screenshot that compaction = “current state per key” and retention = “every event for N days.”

Companion commit: ec2d9d7 in insurance-app.

The two topics

| Topic | cleanup.policy | What you get |
| --- | --- | --- |
| audit-events | compact | One record per claim:<id> key; the latest action wins after log compaction. |
| claim-events | retention.ms=2592000000 (30 d) | Every state transition kept for 30 days, regardless of key. |

Both are created by the kafka-init one-shot in the compose stack.
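The difference between the two cleanup policies can be modeled without a broker. The sketch below is a simplified in-memory model, not Kafka's log cleaner: `compact` keeps the newest entry per key, and `retain` keeps every entry newer than a cutoff. The class, record, and method names are illustrative and do not come from insurance-app.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of the two cleanup policies; illustrative only, not Kafka's implementation. */
final class CleanupPolicySketch {

    /** One log entry: key, value, and append time in epoch milliseconds. */
    record Entry(String key, String value, long timestampMs) {}

    /** Models cleanup.policy=compact: after cleaning, only the newest entry per key survives. */
    static List<Entry> compact(List<Entry> log) {
        Map<String, Entry> latest = new LinkedHashMap<>();
        for (Entry e : log) {
            latest.remove(e.key());   // drop the older entry so the survivor keeps its later position
            latest.put(e.key(), e);
        }
        return new ArrayList<>(latest.values());
    }

    /** Models time-based retention: entries older than retentionMs are dropped, whatever their key. */
    static List<Entry> retain(List<Entry> log, long nowMs, long retentionMs) {
        List<Entry> kept = new ArrayList<>();
        for (Entry e : log) {
            if (nowMs - e.timestampMs() <= retentionMs) {
                kept.add(e);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        long thirtyDaysMs = Duration.ofDays(30).toMillis();   // 2592000000, the value in the table
        long now = 100L * 24 * 60 * 60 * 1000;                // arbitrary "now" for the demo
        List<Entry> log = List.of(
            new Entry("claim:1", "FILED", now - 2_000),
            new Entry("claim:1", "APPROVED", now - 1_000));
        System.out.println(compact(log));                     // one entry, the APPROVED one
        System.out.println(retain(log, now, thirtyDaysMs));   // both entries
    }
}
```

A real broker is less tidy than this model. Compaction runs in the background and skips the active segment, so a compacted topic can briefly hold several records for one key. Retention deletes whole log segments rather than individual records.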

The double-publish

Every call to ClaimService.publishToAuditAndClaimEvents(claim, action) writes to both topics:

private void publishToAuditAndClaimEvents(Claim c, String action) {
    String state = String.format(
        "{\"id\":%d,\"policyNumber\":\"%s\",\"status\":\"%s\"}",
        c.getId(), c.getPolicyNumber(), c.getStatus());

    audit.publish(new AuditEvent("claim", String.valueOf(c.getId()), action,
        "system", state, OffsetDateTime.now().toString()));
    claimEvents.publish(c.getId(), action, c);
}

AuditPublisher and ClaimEventPublisher are both raw KafkaProducers keyed by the claim id. The key choice is what makes compaction work: the log cleaner keeps only the most recent record per key, so repeated writes for the same claim collapse to one.

Hooked into:

  • ClaimService.file(...) — publishes action="FILED"
  • ClaimService.approve(id) — publishes action="APPROVED"

A new POST /api/claims/{id}/approve endpoint flips the status and re-publishes. Two writes per claim → the contrast lands in the right shape.

Audit emission across the other three services

Claims are the showcase for the dual-topic pattern, but they aren’t the only state transitions worth auditing. QuoteService, PolicyService, and PaymentService each emit a one-shot record to audit-events (compacted) at the moment their core write commits — calling the same AuditPublisher.publish(AuditEvent) helper, keyed by the domain entity:

| Service | Hook | Audit type / key | Action |
| --- | --- | --- | --- |
| QuoteService.createQuote() | after repo.save(q) and the calculated-event publish | "quote" / q.getId() | "CALCULATED" |
| PolicyService.bind() | after repo.save(p) inside the redlock | "policy" / p.getPolicyNumber() | "BOUND" |
| PaymentService.charge() | after the gateway charge succeeds | "payment" / paymentId | "CHARGED" |
| ClaimService.file/approve() | as above | "claim" / c.getId() | "FILED" / "APPROVED" |

Two things to notice:

  • The non-claim services emit only to audit-events (the compacted topic). The dual-publish dance is specific to claims because the demo’s contrast endpoint shows both shapes side by side. Adopting the compacted topic on its own is a perfectly valid stopping point — it gives you the “current state per entity” projection that audit consumers care about most of the time.
  • Each emit is wrapped in a try/catch so an audit-side failure never breaks the core write. The same four-defense pattern from chapter 17 (ClaimService.file → MinIO upload → MI OCR → mTLS partner lookup → audit) applies here: enrichments must not be able to fail the transaction.
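The try/catch wrapping described in the second bullet might look like the sketch below. `AuditGuard` and `emitSafely` are hypothetical names introduced for illustration; the services in the repo may simply inline the try/catch around AuditPublisher.publish.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical helper: runs an audit emit and swallows any failure so the core write survives. */
final class AuditGuard {
    private static final Logger LOG = Logger.getLogger(AuditGuard.class.getName());

    /** Returns true if the emit succeeded, false if it failed and was logged. */
    static boolean emitSafely(String what, Runnable emit) {
        try {
            emit.run();
            return true;
        } catch (RuntimeException ex) {
            // Audit is an enrichment: log and continue rather than fail the caller's transaction.
            LOG.log(Level.WARNING, "audit emit failed for " + what, ex);
            return false;
        }
    }

    public static void main(String[] args) {
        // The core write is assumed to have committed before this point.
        boolean ok = emitSafely("claim:42", () -> {
            throw new IllegalStateException("broker down");
        });
        System.out.println("audit emitted: " + ok);   // prints "audit emitted: false"
    }
}
```

Returning a boolean instead of rethrowing keeps the decision with the caller: it can count failures in a metric without ever rolling back the business write.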

This wider coverage shipped as part of Phase 5 of the QA roadmap (see chapter 31). The AuditCompletenessTest integration test in src/test/java/com/example/insurance/audit/ enumerates which services have to emit and fails CI if a new domain service is added without an audit hook.

AuditConsumer — rebuilding state from a compacted topic

@ApplicationScoped
public class AuditConsumer {
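    @Inject  AuditSnapshot snapshot;
    @Resource ManagedExecutorService executor;

    // Fields used below but not declared in the excerpt; types are inferred from how they are used.
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final Jsonb jsonb = JsonbBuilder.create();
    private KafkaConsumer<String, String> consumer;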

    void start(@Observes @Initialized(ApplicationScoped.class) Object init) {
        Properties props = …;
        props.put(ConsumerConfig.GROUP_ID_CONFIG,
                  "insurance-audit-" + UUID.randomUUID());     // ← fresh per process!
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(List.of("audit-events"));
        executor.submit(this::pollLoop);
    }

    private void pollLoop() {
        while (running.get()) {
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String,String> r : records) {
                AuditEvent e = jsonb.fromJson(r.value(), AuditEvent.class);
                snapshot.put(e);
            }
        }
    }
}

The non-obvious bit: a fresh-UUID group.id per process. The whole point of audit-events is “rehydrate the projection on boot.” That only works if the consumer always reads from the earliest offset, and auto.offset.reset=earliest only kicks in when the group has no committed offset. A persistent group would resume from the offset after its last commit on a cold restart, so the rebuilt snapshot would contain only records published since then; every entity that has not changed in the meantime would be missing.
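The rule in the paragraph above reduces to a small decision. The sketch below is a simplified model of how a consumer picks its starting position on one partition; it ignores edge cases such as a committed offset that has fallen out of range, and all names in it are hypothetical.

```java
import java.util.OptionalLong;

/** Simplified model of where a Kafka consumer starts reading one partition. */
final class StartOffsetSketch {

    /**
     * @param committed       the group's committed offset for the partition, if any
     * @param resetToEarliest true models auto.offset.reset=earliest, false models latest
     * @param logStart        first offset still present in the partition
     * @param logEnd          offset the next produced record will receive
     */
    static long startOffset(OptionalLong committed, boolean resetToEarliest,
                            long logStart, long logEnd) {
        if (committed.isPresent()) {
            return committed.getAsLong();   // the reset policy is not consulted at all
        }
        return resetToEarliest ? logStart : logEnd;
    }

    public static void main(String[] args) {
        // Fresh UUID group: no committed offset, so the whole topic is replayed.
        System.out.println(startOffset(OptionalLong.empty(), true, 0, 500));   // 0
        // Persistent group that committed 500 before the restart: nothing is replayed.
        System.out.println(startOffset(OptionalLong.of(500), true, 0, 500));   // 500
    }
}
```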

AuditSnapshot is just a ConcurrentMap<String, AuditEvent> — key = claim:<id>, value = the latest record. No backing DB row. The compacted topic IS the durable state.
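A minimal sketch of such a projection follows. It is not the repo's AuditSnapshot: the record's component names are assumptions that follow the argument order of the AuditEvent constructor call shown earlier, and `size()` is added only for the demonstration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Sketch of an in-memory projection keyed by "<type>:<id>"; names are assumptions. */
final class AuditSnapshotSketch {

    /** Stand-in for the app's AuditEvent; component names are guessed from the constructor call. */
    record AuditEvent(String type, String id, String action,
                      String actor, String state, String timestamp) {}

    private final ConcurrentMap<String, AuditEvent> latest = new ConcurrentHashMap<>();

    /** Last write wins, mirroring what compaction eventually does on the topic. */
    void put(AuditEvent e) {
        latest.put(e.type() + ":" + e.id(), e);
    }

    /** Returns the latest event for the entity, or null if none has been seen. */
    AuditEvent get(String type, String id) {
        return latest.get(type + ":" + id);
    }

    /** Number of distinct entities currently in the projection. */
    int size() {
        return latest.size();
    }
}
```

Because records with the same key land on the same partition and are replayed in offset order, the map ends up holding the newest record per key even when the log cleaner has not yet removed the older ones.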

The contrast endpoint

@GET
@Path("/contrast/{claimId}")
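public Map<String, Object> contrast(@PathParam("claimId") String claimId) {
    // Map.of rejects null values, so a claim id with no snapshot entry would throw
    // a NullPointerException; a mutable java.util.LinkedHashMap tolerates it.
    Map<String, Object> body = new LinkedHashMap<>();
    body.put("claimId",  claimId);
    body.put("snapshot", snapshot.get("claim", claimId));    // 1 record (compacted)
    body.put("events",   readClaimEvents(claimId));          // every action (retention)
    return body;
}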

readClaimEvents scans the claim-events topic ad hoc with a one-shot consumer (5 s deadline, fresh UUID group). For a 30-day retention topic, keeping a permanent projection in memory would be wasteful; scanning the topic for one claim id per request is fine for the demo.
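The one-shot scan can be pictured as a polling loop with a deadline. The sketch below hides the Kafka consumer behind a Supplier so it runs without a broker; `Rec`, `scan`, and the stop conditions are assumptions, not the repo's readClaimEvents.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Sketch of a deadline-bounded topic scan; the poll source stands in for KafkaConsumer.poll. */
final class OneShotScanSketch {

    /** Minimal record shape: the key is the claim id, the value is the action. */
    record Rec(String key, String action) {}

    /** Collects actions for one claim until the poll source runs dry or the deadline passes. */
    static List<String> scan(String claimId, Supplier<List<Rec>> poll, Duration deadline) {
        long stopAt = System.nanoTime() + deadline.toNanos();
        List<String> actions = new ArrayList<>();
        while (System.nanoTime() - stopAt < 0) {
            List<Rec> batch = poll.get();
            if (batch.isEmpty()) {
                break;   // assumed end-of-topic signal for this sketch
            }
            for (Rec r : batch) {
                if (r.key().equals(claimId)) {
                    actions.add(r.action());
                }
            }
        }
        return actions;
    }
}
```

With a real consumer an empty poll does not prove the end of the topic has been reached; comparing the consumer's position against KafkaConsumer.endOffsets is a more reliable stop condition. Either way the scan reads the whole topic on every request, which is why it suits a demo rather than a hot path.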

Verify — the moment of teaching

source ~/insurance-app/.wso2is-creds
AT=$()

# 1. File a fresh claim (ClaimService.file already publishes action=FILED).
QID=$(curl -sS -X POST http://localhost:9080/api/quotes -H "Authorization: Bearer $AT" -H "Content-Type: application/json" \
        -d '{"vehicleVin":"AUD","driverAge":35,"coverageType":"BASIC"}' | jq -r .id)
POL=$(curl -sS -X POST http://localhost:9080/api/policies -H "Authorization: Bearer $AT" -H "Content-Type: application/json" \
        -d "{\"quoteId\":$QID}" | jq -r .policyNumber)
dd if=/dev/urandom of=/tmp/x.jpg bs=1024 count=4 status=none
CLAIM_ID=$(curl -sS -X POST http://localhost:9080/api/claims -H "Authorization: Bearer $AT" \
            -F "policyNumber=$POL" -F "description=demo" -F "attachment=@/tmp/x.jpg" | jq -r .id)
sleep 2

# After FILED:
curl -sS "http://localhost:9080/api/audit/contrast/$CLAIM_ID" | jq '{snapshot: .snapshot.action, events: [.events[].action]}'
# → {"snapshot": "FILED", "events": ["FILED"]}

# 2. Approve it.
curl -sS -X POST -H "Authorization: Bearer $AT" "http://localhost:9080/api/claims/$CLAIM_ID/approve" > /dev/null
sleep 2

# After APPROVED:
curl -sS "http://localhost:9080/api/audit/contrast/$CLAIM_ID" | jq '{snapshot: .snapshot.action, events: [.events[].action]}'
# → {"snapshot": "APPROVED", "events": ["FILED", "APPROVED"]}

That second contrast call is the punchline. The compacted-topic projection has one entry per claim (the latest action). The retention topic has every action for the claim. Same domain event, two different shapes; pick by use case.

When to pick which

Compaction = “current state” projection. Use when:

  • Consumers ask “what’s the latest known status of X?”
  • A new consumer that joins later should NOT replay every state transition since launch.
  • Storage is bounded by number of entities, not time.

Retention = “event log.” Use when:

  • Consumers do time-based analytics (“how many claims were FILED yesterday?”)
  • Forensics / debug — you need the trail of every transition.
  • Storage is bounded by time × volume.

Most real systems do both for important domain events. The slice 13 pattern, the same event written to two topics with different cleanup policies, is a common way for production deployments to get both.

What you have

  • One write call → two Kafka topics with different cleanup semantics.
  • An in-memory audit snapshot rebuilt from the compacted topic on every Liberty boot.
  • A teaching endpoint that shows both shapes for the same claim id, side by side.
  • The fresh-UUID-group.id pattern for any consumer whose job is “project the entire topic into local state.”
