Feature: Policy bind
Binding a Quote into a Policy: Redlock distributed locking for idempotency, and a log-compacted Kafka topic for the resulting policy-state projection. Slice 6 in the insurance-app repo.
A Quote sitting in Postgres isn’t insurance — it’s a quotation. A Policy is what the customer ends up with after accepting it. This chapter walks through binding one to the other, introducing two enterprise patterns that show up everywhere once you start looking: distributed locking (Redlock) and log-compacted topics.
Companion commit: 9617ccc in insurance-app.
The two patterns
| Pattern | Why for this feature |
|---|---|
| Redlock | Two concurrent POSTs with the same quoteId must NOT both succeed; the lock serializes them. |
| Log-compacted Kafka topic | The policy-events topic keeps only the latest record per policyNumber, so consumers replaying it see a current-state projection rather than every state transition. |
API shape
POST /api/policies {quoteId: 123}
→ 201 Created {policyNumber: "POL-A1B2C3D4", quoteId, status: "BOUND"}
→ 200 OK on idempotent re-bind (same quoteId → same policyNumber)
→ 409 Conflict if a concurrent bind holds the Redlock
Three response codes, one POST. The 201/200 distinction is the client-visible contract for idempotency; the 409 is the rare-but- real “another request is in flight, retry in a moment” case.
Redlock — the SET-NX + Lua-CAD pattern
public class Redlock {
private static final String RELEASE_LUA =
"if redis.call('get', KEYS[1]) == ARGV[1] then "
+ " return redis.call('del', KEYS[1]) "
+ "else "
+ " return 0 "
+ "end";
public String tryAcquire(String key, Duration ttl) {
String token = UUID.randomUUID().toString();
String r = redis.set(key, token, SetArgs.Builder.nx().px(ttl.toMillis()));
return "OK".equals(r) ? token : null;
}
public boolean release(String key, String token) {
return redis.eval(RELEASE_LUA, ScriptOutputType.INTEGER,
new String[]{key}, token) instanceof Long n && n == 1;
}
}
Two non-obvious bits:
SET key value NX PX ttl— atomic check-and-set with expiry.NX= only-if-not-exists;PX= TTL in ms. One round trip; no race window between the GET and the SET.- Lua-scripted release — the script compares the stored value
to the token before deleting. Without this, a slow holder whose
lock has already expired could
DELa key now held by a new acquirer. The script keeps that DEL atomic with the comparison.
Single-node Redis. The classic “Redlock” recipe is N independent Redis nodes with a quorum; for our demo’s single Redis, that collapses to this one-node form. Read Martin Kleppmann + antirez’s exchange on Redlock if you ever need to deploy this for real money.
Service flow
@Transactional
public BindResult bind(Long quoteId) {
// 1. Idempotency hot path — DB hit avoids the lock entirely.
Policy existing = repo.findByQuoteId(quoteId);
if (existing != null) return new BindResult(existing, false);
Quote quote = quoteRepo.findById(quoteId);
if (quote == null) throw new NotFoundException("quote " + quoteId);
// 2. Acquire the lock. Failure = another bind in flight.
String lockKey = "lock:policy:quote:" + quoteId;
String token = redlock.tryAcquire(lockKey, Duration.ofSeconds(10));
if (token == null) throw new WebApplicationException(409);
try {
// 3. Re-check inside the lock — the holder may have just committed.
existing = repo.findByQuoteId(quoteId);
if (existing != null) return new BindResult(existing, false);
Policy p = new Policy();
p.setPolicyNumber("POL-" + UUID.randomUUID().toString().substring(0, 8).toUpperCase());
p.setQuoteId(quoteId);
p.setStatus("BOUND");
Policy saved = repo.save(p);
publisher.publishStateChange(saved);
return new BindResult(saved, true);
} finally {
redlock.release(lockKey, token);
}
}
Three checks against the same race, in decreasing optimism:
- DB-level check before the lock (covers the obvious “already bound” case — most calls).
- Redlock (covers the “two concurrent first-time binds” case).
- DB-level check inside the lock (covers the “we lost the race for the lock, the winner committed, we get the lock next”).
Plus a DB UNIQUE(quote_id) constraint as a fourth-line backstop
in the schema. Defense in depth — if any one layer fails,
the others still keep state consistent.
The compacted topic side
<!-- compose/infra/kafka — kafka-init step -->
--topic policy-events --partitions 3 --replication-factor 1
--config cleanup.policy=compact
The key on each record is the policyNumber. Subsequent updates to the same policy (e.g. status transitions) write the same key, and Kafka’s log cleaner eventually drops the older records — leaving one entry per policy that represents its current state.
Why this matters: a new consumer that joins next year and reads
from the beginning of policy-events sees one record per existing
policy (current state), not every state transition since launch.
That’s the compacted-topic-as-key-value-store pattern.
PolicyPublisher uses a raw KafkaProducer rather than
MicroProfile Reactive Messaging’s Emitter.send(payload) — Emitter
is round-robin keyless, and a compacted topic needs the key to
deduplicate. Bumped kafka-clients to <scope>compile</scope> in
pom.xml so ProducerRecord is visible at compile time.
Verify
# Set up: get a token, fire a quote, capture its id.
source ~/insurance-app/.wso2is-creds
AT=$(curl -k -sS -X POST -u "$WSO2IS_CLIENT_ID:$WSO2IS_CLIENT_SECRET" \
"$WSO2IS_TOKEN_URL" -d "grant_type=client_credentials" \
| jq -r .access_token)
QID=$(curl -sS -X POST http://localhost:9080/api/quotes \
-H "Authorization: Bearer $AT" -H "Content-Type: application/json" \
-d '{"vehicleVin":"PB","driverAge":35,"coverageType":"BASIC"}' \
| jq -r .id)
# First bind → 201
curl -i -X POST http://localhost:9080/api/policies \
-H "Authorization: Bearer $AT" -H "Content-Type: application/json" \
-d "{\"quoteId\":$QID}"
# Idempotent re-bind → 200, same policyNumber
curl -i -X POST http://localhost:9080/api/policies \
-H "Authorization: Bearer $AT" -H "Content-Type: application/json" \
-d "{\"quoteId\":$QID}"
# Concurrent burst → 5 parallel POSTs, exactly 1 policy row in the DB
for i in 1 2 3 4 5; do
curl -sS -o /dev/null -X POST http://localhost:9080/api/policies \
-H "Authorization: Bearer $AT" -H "Content-Type: application/json" \
-d "{\"quoteId\":$QID}" &
done; wait
podman exec postgres psql -U insurance -d insurance -t \
-c "SELECT count(*) FROM policy WHERE quote_id = $QID"
Observe
Open Kafka UI (https://kafka.insurance-app.comptech-lab.com) →
topic policy-events → check Configs for
cleanup.policy=compact. Records are keyed by policyNumber; you’ll
see new entries per bind. Run the same demo twice with two different
quotes and watch how compaction is policy-scoped, not topic-scoped.
In RedisInsight, watch the key lock:policy:quote:<quoteId> appear
for ~10 seconds during the bind and disappear when the lock releases.
What you have
After this chapter:
- A distributed-lock-protected idempotent endpoint.
- A compacted topic projecting current policy state, ready for any downstream consumer that wants “tell me about every active policy.”
- The shape of every subsequent “X-events” topic in the curriculum (compacted for current-state projections, retention-based for history — slice 13 contrasts both side-by-side).
Next: 15 — Feature: Payment →