Feature: Search
Debezium CDC reads Postgres logical replication, publishes to Kafka, and a Liberty consumer indexes into OpenSearch. Slice 14 in the insurance-app repo.
This chapter introduces Change Data Capture: every INSERT / UPDATE
/ DELETE on the claim table flows out of Postgres’s write-ahead log,
through Debezium as a Kafka Connect source connector, into a Kafka
topic, picked up by a Liberty consumer, and indexed into OpenSearch.
By the end, search hits land within ~5 seconds of a POST /api/claims.
Companion commit: f19d7c1 in insurance-app.
The pipeline
Postgres (wal_level=logical)
  └─► Debezium PostgresConnector
        (Kafka Connect plugin: pgoutput
         publication: dbz_publication
         slot: insurance_claim_slot)
        │
        └─► Kafka topic dbserver1.public.claim
              │
              └─► SearchIndexer (Liberty, raw KafkaConsumer)
                    │
                    └─► OpenSearch
                          index: claims
                          PUT /claims/_doc/{id}
                          DELETE /claims/_doc/{id} (on op=d)
Prerequisites Postgres needs
Postgres needs wal_level=logical (already set in the postgres:17-alpine
config in the compose file) and a user with the REPLICATION privilege
(already true for insurance because it’s bootstrapped as a superuser).
If either is missing, Debezium connector startup fails with a clear error.
Debezium connector config
{
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "tasks.max": "1",
  "database.hostname": "postgres",
  "database.user": "insurance",
  "database.password": "insurance",
  "database.dbname": "insurance",
  "topic.prefix": "dbserver1",
  "schema.include.list": "public",
  "table.include.list": "public.claim",
  "plugin.name": "pgoutput",
  "publication.autocreate.mode": "filtered",
  "slot.name": "insurance_claim_slot",
  "tombstones.on.delete": "false",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter.schemas.enable": "false",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false",
  "transforms": "unwrap",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.drop.tombstones": "false",
  "transforms.unwrap.delete.handling.mode": "rewrite"
}
Two things to call out:
- pgoutput = the modern Postgres logical decoding plugin (built into Postgres ≥ 10, no wal2json install needed).
- ExtractNewRecordState SMT unwraps Debezium’s envelope ({before, after, source, op}) into just the after row. The indexer Java code stays simple: it gets plain row JSON, not envelope metadata. Deletes are signalled by __deleted: true on the unwrapped record.
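To make the unwrapping concrete, here is a hand-written approximation of what the transform does to one envelope. It is an illustration only: the real SMT runs inside Kafka Connect, and EnvelopeUnwrapSketch and unwrap are hypothetical names, not Debezium API. It assumes the rewrite mode copies the before values into a delete record and adds __deleted as the string "true" or "false", which is how I understand the behavior and why the indexer checks for both a boolean and a string.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: approximates ExtractNewRecordState with
// delete.handling.mode=rewrite. Class and method names are hypothetical.
final class EnvelopeUnwrapSketch {

    @SuppressWarnings("unchecked")
    static Map<String, Object> unwrap(Map<String, Object> envelope) {
        boolean deleted = "d".equals(envelope.get("op"));
        // On a delete, "after" is null, so the row values come from "before".
        Object row = deleted ? envelope.get("before") : envelope.get("after");
        Map<String, Object> flat = new LinkedHashMap<>();
        if (row instanceof Map) {
            flat.putAll((Map<String, Object>) row);
        }
        // Assumption: the marker is a string, not a JSON boolean.
        flat.put("__deleted", String.valueOf(deleted));
        return flat;
    }

    public static void main(String[] args) {
        Map<String, Object> update = new LinkedHashMap<>();
        update.put("op", "u");
        update.put("before", Map.of("id", 7, "status", "OPEN"));
        update.put("after", Map.of("id", 7, "status", "CLOSED"));
        System.out.println(unwrap(update));

        Map<String, Object> delete = new LinkedHashMap<>();
        delete.put("op", "d");
        delete.put("before", Map.of("id", 7));
        delete.put("after", null);
        System.out.println(unwrap(delete));
    }
}
```

With Postgres's default replica identity, the before image of a delete carries only the primary key, which is all the indexer needs to issue the DELETE by id.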
Registering on demand — scripts/register-debezium.sh
#!/usr/bin/env bash
set -euo pipefail
CONNECT_URL=http://localhost:8083
NAME=insurance-claim-cdc
CONFIG_BODY=$(jq -c '.config' compose/infra/connectors/debezium-postgres-claim.json)
# PUT /connectors/<name>/config is an upsert, so it is safe to re-run.
curl -sS -X PUT -H 'Content-Type: application/json' \
  "$CONNECT_URL/connectors/$NAME/config" --data "$CONFIG_BODY"
# Auto-restart if state isn't RUNNING: Kafka coordinator wobble
# sometimes leaves the connector UNASSIGNED while tasks stay RUNNING.
STATUS=$(curl -sS "$CONNECT_URL/connectors/$NAME/status")
# An if block (not `[ ... ] && curl`) keeps the exit status 0
# when the connector is already RUNNING.
if [ "$(echo "$STATUS" | jq -r '.connector.state')" != "RUNNING" ]; then
  curl -sS -X POST "$CONNECT_URL/connectors/$NAME/restart?includeTasks=true" >/dev/null
fi
The script is idempotent: the smoke test calls it before the first search test, and you can re-run it at any time.
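If a test harness needs to do the same thing from Java rather than from the shell, the two requests and the restart decision could look roughly like the sketch below. ConnectorRegistrarSketch and its method names are hypothetical; the endpoints are the ones the script above calls, and the status shape ({connector: {state: ...}}) is the one the script reads with jq. The sketch only builds requests and leaves sending them to the caller.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.Map;

// Hypothetical helper mirroring register-debezium.sh; not part of the repo.
final class ConnectorRegistrarSketch {

    // PUT /connectors/<name>/config creates or updates the connector.
    static HttpRequest upsertRequest(String connectUrl, String name, String configJson) {
        return HttpRequest.newBuilder()
                .uri(URI.create(connectUrl + "/connectors/" + name + "/config"))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(configJson))
                .build();
    }

    // POST /connectors/<name>/restart?includeTasks=true restarts connector and tasks.
    static HttpRequest restartRequest(String connectUrl, String name) {
        return HttpRequest.newBuilder()
                .uri(URI.create(connectUrl + "/connectors/" + name + "/restart?includeTasks=true"))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    // Same rule as the script: restart unless the connector itself reports RUNNING.
    static boolean needsRestart(Map<String, Object> status) {
        Object connector = status.get("connector");
        Object state = (connector instanceof Map<?, ?> m) ? m.get("state") : null;
        return !"RUNNING".equals(state);
    }
}
```

A missing or malformed status counts as "needs restart", which matches the script's behavior when jq prints null.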
The Liberty indexer
@ApplicationScoped
public class SearchIndexer {
    private static final String CDC_TOPIC = "dbserver1.public.claim";

    @Inject OpenSearchClient os;
    @Resource ManagedExecutorService executor;

    // JSON-B instance used to parse the unwrapped row JSON
    private final Jsonb jsonb = JsonbBuilder.create();
    // Created in start(), then used only by the pollLoop thread
    private KafkaConsumer<String, String> consumer;

    void start(@Observes @Initialized(ApplicationScoped.class) Object init) {
        Properties props = new Properties();
        // Fresh group per process: no committed offsets, so "earliest" replays the topic
        props.put(GROUP_ID_CONFIG, "insurance-search-" + UUID.randomUUID());
        props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
        // …
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(List.of(CDC_TOPIC));
        // pollLoop (not shown) polls the consumer and passes each record to handle()
        executor.submit(this::pollLoop);
    }

    @SuppressWarnings("unchecked")
    private void handle(ConsumerRecord<String,String> r) {
        if (r.value() == null) return; // tombstone or empty value: nothing to index
        Map<String,Object> after = jsonb.fromJson(r.value(), Map.class);
        Object id = after.get("id");
        if (id == null) return;
        String docId = String.valueOf(id);
        // __deleted may arrive as a boolean or as the string "true"
        if (Boolean.TRUE.equals(after.get("__deleted")) ||
                "true".equals(String.valueOf(after.get("__deleted")))) {
            os.delete("claims", docId);
            return;
        }
        Map<String,Object> doc = new LinkedHashMap<>();
        // project just the searchable columns
        doc.put("id", after.get("id"));
        doc.put("policy_number", after.get("policy_number"));
        doc.put("description", after.get("description"));
        doc.put("other_party_vin", after.get("other_party_vin"));
        doc.put("other_party_carrier", after.get("other_party_carrier"));
        doc.put("ocr_text", after.get("ocr_text"));
        doc.put("status", after.get("status"));
        doc.put("filed_at", after.get("filed_at"));
        os.put("claims", docId, doc);
    }
}
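Each process uses a fresh UUID group.id (same pattern as
AuditConsumer in chapter 20). OpenSearch is a derived projection,
so a Liberty restart should re-index everything from the earliest
retained offset of the topic: the new group has no committed offsets,
so auto.offset.reset=earliest applies. Postgres remains the source of truth.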
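The consumer settings behind this pattern can be sketched on their own. The version below uses the plain string keys that the ConsumerConfig constants resolve to, so it compiles without the Kafka client on the classpath. RebuildingConsumerProps and freshGroupProps are hypothetical names, and the bootstrap address is passed in because the original elides it.

```java
import java.util.Properties;
import java.util.UUID;

// Hypothetical helper: builds consumer properties for a "rebuild on every boot" consumer.
final class RebuildingConsumerProps {

    static Properties freshGroupProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // A never-before-seen group has no committed offsets...
        props.put("group.id", "insurance-search-" + UUID.randomUUID());
        // ...so this policy decides where reading starts: the oldest retained record.
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // "kafka:9092" is an assumed address, not taken from the chapter.
        System.out.println(freshGroupProps("kafka:9092").getProperty("group.id"));
    }
}
```

One consequence of this design is that every restart leaves an abandoned consumer group behind on the broker. That is usually acceptable in a teaching setup, but it is worth knowing about.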
OpenSearchClient — JDK HttpClient + Jsonb
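@ApplicationScoped
public class OpenSearchClient {
    private static final String BASE = "http://opensearch:9200";
    private final HttpClient http = HttpClient.newBuilder()
            .connectTimeout(Duration.ofSeconds(3)).build();
    private final Jsonb jsonb = JsonbBuilder.create();

    public void put(String index, String id, Map<String,Object> doc) {
        HttpRequest req = HttpRequest.newBuilder()
                .uri(URI.create(BASE + "/" + index + "/_doc/" + id))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(jsonb.toJson(doc)))
                .build();
        try {
            // send() throws checked exceptions; translate them to unchecked ones
            http.send(req, HttpResponse.BodyHandlers.ofString());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while indexing " + id, e);
        }
    }

    public Map<String,Object> search(String index, Map<String,Object> query) { … }
    public void refresh(String index) { … }
}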
mpRestClient would also work, but OpenSearch’s responses are deeply
nested (hits.hits[]._source...); typed DTOs would add JSON binding
work that isn’t worth it for one endpoint. Plain JDK
HttpClient + Jsonb-as-Map is direct and easy to debug.
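To show what the Map-based approach costs in practice, here is a minimal sketch of walking hits.hits[]._source in a response that has already been parsed into nested Maps and Lists. SearchHitsSketch and its Hit record are hypothetical and are not the chapter's SearchResponse types. The sketch assumes Java 16 or later for records and pattern matching.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: pulls _id and _source out of a parsed search response.
final class SearchHitsSketch {

    record Hit(String id, Map<String, Object> source) {}

    static List<Hit> unpack(Map<String, Object> raw) {
        List<Hit> out = new ArrayList<>();
        // Outer "hits" is an object; inner "hits" is the array of matches.
        if (!(raw.get("hits") instanceof Map<?, ?> outer)) return out;
        if (!(outer.get("hits") instanceof List<?> inner)) return out;
        for (Object o : inner) {
            if (o instanceof Map<?, ?> hit && hit.get("_source") instanceof Map<?, ?> src) {
                Map<String, Object> copy = new LinkedHashMap<>();
                src.forEach((k, v) -> copy.put(String.valueOf(k), v));
                out.add(new Hit(String.valueOf(hit.get("_id")), copy));
            }
        }
        return out;
    }
}
```

Anything that does not have the expected shape is skipped rather than thrown on, so an empty or error response yields an empty list.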
The search endpoint
@GET
@Path("/claims")
public SearchResponse search(@QueryParam("q") String q) {
    if (q == null || q.isBlank()) {
        // Map.of rejects null values, so fail fast with a 400 instead of an NPE
        throw new BadRequestException("query parameter q is required");
    }
    os.refresh("claims"); // force a refresh so the freshly-CDC'd doc is searchable
    Map<String,Object> body = Map.of(
        "size", 20,
        "query", Map.of(
            "multi_match", Map.of(
                "query", q,
                "fields", List.of(
                    "description^2", "ocr_text", "other_party_vin",
                    "other_party_carrier", "policy_number", "status"))));
    Map<String,Object> raw = os.search("claims", body);
    // unpack hits.hits[]._source into typed Hit records
}
The os.refresh("claims") call is intentional. OpenSearch defaults to a
1-second refresh interval; without an explicit refresh on the query
path, a freshly-CDC’d doc could be invisible for up to that second and
the smoke test would flake. The refresh costs ~5 ms, which is worth it here.
Why Liberty does the sink (not a Kafka Connect sink plugin)
Debezium 2.7.3’s image ships only source plugins. Wiring an OpenSearch sink would mean swapping the Connect image (Confluent / Aiven) or hot-installing JARs into the running container — both fragile under rootless podman, and the Kafka Connect teaching is fully covered on the source side anyway.
Liberty consuming the CDC topics and PUT-ing to OpenSearch covers the same teaching capability (Kafka Connect for the source; CDC events as the contract) without the plugin-management distraction.
Verify
# Register the connector (idempotent — safe to re-run)
~/insurance-app/scripts/register-debezium.sh
# Connector + task should be RUNNING
curl -sS http://localhost:8083/connectors/insurance-claim-cdc/status \
| jq '{conn: .connector.state, task: .tasks[0].state}'
# File a claim with a unique marker word
MARKER="dent-marker-$RANDOM"
# (assume POL + AT from earlier chapters, plus /tmp/photo.jpg)
CID=$(curl -sS -X POST http://localhost:9080/api/claims \
-H "Authorization: Bearer $AT" \
-F "policyNumber=$POL" \
-F "description=$MARKER" \
-F "attachment=@/tmp/photo.jpg" \
| jq -r .id)
sleep 6 # CDC propagation + OpenSearch indexing
# Search by the marker
curl -sS "http://localhost:9080/api/search/claims?q=$MARKER" | jq '{total, ids: [.hits[].id]}'
You should see total >= 1, and ids includes $CID. Open
OpenSearch Dashboards (https://search.insurance-app.comptech-lab.com)
→ Discover → index claims to see the same data OpenSearch-side.
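The fixed sleep 6 in the steps above is the simplest way to wait for propagation. If the same check is ever automated in Java, polling with a deadline is a common alternative that finishes as soon as the document shows up. The helper below is a generic sketch under that assumption; AwaitSketch and waitUntil are hypothetical names, and the condition would wrap whatever search call the test makes.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical helper: poll a condition until it holds or a timeout expires.
final class AwaitSketch {

    static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration interval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) return true;
            if (System.nanoTime() >= deadline) return false;
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

The condition is always checked at least once, so a zero timeout still returns true if the document is already searchable.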
What you have
- A full CDC pipeline: Postgres logical replication → Debezium → Kafka → Liberty → OpenSearch.
- The pgoutput plugin + ExtractNewRecordState SMT pattern that most production Debezium + Postgres setups use today.
- Idempotent connector registration via PUT /connectors/<name>/config, plus auto-restart on UNASSIGNED.
- A multi_match search endpoint over indexed claims.
- The “fresh-UUID group.id + rebuild on every boot” pattern, reused from the audit chapter, applied to a different downstream (OpenSearch instead of an in-memory map).