~75 min read · updated 2026-05-16

Feature: Search

Debezium CDC reads Postgres logical replication, publishes to Kafka, and a Liberty consumer indexes into OpenSearch. Slice 14 in the insurance-app repo.

This chapter introduces Change Data Capture (CDC): every INSERT / UPDATE / DELETE on the claim table flows out of Postgres’s write-ahead log, through Debezium running as a Kafka Connect source connector, and into a Kafka topic. A Liberty consumer picks up each event and indexes it into OpenSearch. By the end, a new claim is searchable within ~5 seconds of a POST /api/claims.

Companion commit: f19d7c1 in insurance-app.

The pipeline

Postgres (wal_level=logical)
  └─► Debezium PostgresConnector
        (Kafka Connect plugin: pgoutput
         publication: dbz_publication
         slot: insurance_claim_slot)

        └─► Kafka topic dbserver1.public.claim

              └─► SearchIndexer (Liberty, raw KafkaConsumer)

                    └─► OpenSearch
                          index: claims
                          PUT /claims/_doc/{id}
                          DELETE /claims/_doc/{id}  (on delete events, flagged __deleted)

Postgres prerequisites

Debezium needs two things from Postgres: wal_level=logical (already set in the postgres:17-alpine config in the compose), and a user with the REPLICATION privilege (already true for insurance because it’s bootstrapped as a superuser). If either is missing, the Debezium connector fails at startup with a clear error.

Debezium connector config

{
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "tasks.max": "1",
  "database.hostname": "postgres",
  "database.user":     "insurance",
  "database.password": "insurance",
  "database.dbname":   "insurance",
  "topic.prefix":      "dbserver1",
  "schema.include.list": "public",
  "table.include.list":  "public.claim",
  "plugin.name":              "pgoutput",
  "publication.autocreate.mode": "filtered",
  "slot.name":                "insurance_claim_slot",
  "tombstones.on.delete":     "false",

  "key.converter":   "org.apache.kafka.connect.json.JsonConverter",
  "key.converter.schemas.enable":   "false",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false",

  "transforms":                              "unwrap",
  "transforms.unwrap.type":                  "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.drop.tombstones":       "false",
  "transforms.unwrap.delete.handling.mode":  "rewrite"
}

Two things to call out:

  1. pgoutput = the modern Postgres logical decoding plugin (built into Postgres ≥ 10, no wal2json install needed).
  2. The ExtractNewRecordState SMT unwraps Debezium’s envelope ({before, after, source, op}) into a flat row, so the indexer Java code stays simple: it gets plain row JSON, not envelope metadata. For inserts and updates that row is the after state. For deletes, delete.handling.mode=rewrite emits the before state and sets __deleted to true on the unwrapped record.
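To make the unwrap step concrete, here is a minimal sketch that mimics the flattening on a plain Map. It is an illustration, not Debezium's implementation: the class name UnwrapSketch and the sample rows are made up, and the __deleted marker is modelled as the string "true" / "false" (the indexer later accepts both a boolean and a string).

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative stand-in for the unwrap SMT; hypothetical, not Debezium code. */
final class UnwrapSketch {

    /** Flattens an envelope-shaped map the way the rewrite delete mode is described above. */
    @SuppressWarnings("unchecked")
    static Map<String, Object> unwrap(Map<String, Object> envelope) {
        boolean isDelete = "d".equals(envelope.get("op"));
        // Deletes carry the row in "before"; inserts and updates carry it in "after".
        Map<String, Object> row =
            (Map<String, Object>) envelope.get(isDelete ? "before" : "after");
        Map<String, Object> flat = new LinkedHashMap<>(row);
        // Assumption: the marker is a string, so consumers should not rely on a boolean.
        flat.put("__deleted", isDelete ? "true" : "false");
        return flat;
    }

    public static void main(String[] args) {
        Map<String, Object> update = Map.of(
            "op", "u",
            "before", Map.of("id", 42, "status", "OPEN"),
            "after", Map.of("id", 42, "status", "APPROVED"));

        // Map.of rejects null values, so the delete envelope uses a mutable map.
        Map<String, Object> delete = new LinkedHashMap<>();
        delete.put("op", "d");
        delete.put("before", Map.of("id", 42));
        delete.put("after", null);

        System.out.println(unwrap(update));
        System.out.println(unwrap(delete));
    }
}
```

The delete case is the one to watch: the record still has an id, so the consumer can derive the document to remove without ever seeing the envelope.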

Registering on demand — scripts/register-debezium.sh

#!/usr/bin/env bash
CONNECT_URL=http://localhost:8083
NAME=insurance-claim-cdc
CONFIG_BODY=$(jq -c '.config' compose/infra/connectors/debezium-postgres-claim.json)

# PUT /connectors/<name>/config is upsert — safe to re-run.
curl -sS -X PUT -H 'Content-Type: application/json' \
  "$CONNECT_URL/connectors/$NAME/config" --data "$CONFIG_BODY"

# Auto-restart if state isn't RUNNING: Kafka coordinator wobble
# sometimes leaves the connector UNASSIGNED while tasks stay RUNNING.
# An if-block (rather than `[ ... ] && ...`) keeps the script's exit
# status 0 when the connector is already healthy.
STATUS=$(curl -sS "$CONNECT_URL/connectors/$NAME/status")
if [ "$(echo "$STATUS" | jq -r '.connector.state')" != "RUNNING" ]; then
  curl -sS -X POST "$CONNECT_URL/connectors/$NAME/restart?includeTasks=true" >/dev/null
fi

Idempotent. Smoke calls it before the first search test; you can re-run it any time.

The Liberty indexer

@ApplicationScoped
public class SearchIndexer {
    private static final String CDC_TOPIC = "dbserver1.public.claim";

    @Inject OpenSearchClient os;
    @Resource ManagedExecutorService executor;

    void start(@Observes @Initialized(ApplicationScoped.class) Object init) {
        Properties props = new Properties();
        props.put(GROUP_ID_CONFIG, "insurance-search-" + UUID.randomUUID());
        props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
        // …
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(List.of(CDC_TOPIC));
        executor.submit(this::pollLoop);
    }

    private void handle(ConsumerRecord<String,String> r) {
        if (r.value() == null) return;
        Map<String,Object> after = jsonb.fromJson(r.value(), Map.class);
        Object id = after.get("id");
        if (id == null) return;
        String docId = String.valueOf(id);

        if (Boolean.TRUE.equals(after.get("__deleted")) ||
            "true".equals(String.valueOf(after.get("__deleted")))) {
            os.delete("claims", docId);
            return;
        }
        Map<String,Object> doc = new LinkedHashMap<>();
        // project just the searchable columns
        doc.put("id",                  after.get("id"));
        doc.put("policy_number",       after.get("policy_number"));
        doc.put("description",         after.get("description"));
        doc.put("other_party_vin",     after.get("other_party_vin"));
        doc.put("other_party_carrier", after.get("other_party_carrier"));
        doc.put("ocr_text",            after.get("ocr_text"));
        doc.put("status",              after.get("status"));
        doc.put("filed_at",            after.get("filed_at"));
        os.put("claims", docId, doc);
    }
}

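Fresh UUID group.id per process (same pattern as AuditConsumer in chapter 20): a brand-new group has no committed offsets, so with auto.offset.reset=earliest every Liberty restart re-reads the topic from its earliest retained offset and re-indexes everything. That is fine because OpenSearch is a derived projection; Postgres remains the source of truth.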
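The rebuild-on-boot behaviour comes entirely from two consumer settings, which this sketch isolates using plain string keys so it runs without the Kafka client on the classpath. The class name IndexerConsumerProps and the bootstrap address kafka:9092 are assumptions for illustration; the real indexer's remaining properties are elided in the chapter.

```java
import java.util.Properties;
import java.util.UUID;

/** Hypothetical helper showing the "fresh group per process" settings. */
final class IndexerConsumerProps {

    /** Builds consumer settings whose group.id is new on every call. */
    static Properties fresh(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // A never-seen-before group has no committed offsets...
        props.put("group.id", "insurance-search-" + UUID.randomUUID());
        // ...so this setting decides the start position: the oldest retained record.
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Assumption: the broker is reachable as kafka:9092 inside the compose network.
        Properties first = fresh("kafka:9092");
        Properties second = fresh("kafka:9092");
        System.out.println(first.getProperty("group.id"));
        System.out.println(second.getProperty("group.id"));
    }
}
```

Two calls yield two different group ids, which is exactly what a restart looks like to the broker: a new group with nothing committed.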

OpenSearchClient — JDK HttpClient + Jsonb

@ApplicationScoped
public class OpenSearchClient {
    private static final String BASE = "http://opensearch:9200";
    private final HttpClient http = HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(3)).build();

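    public void put(String index, String id, Map<String,Object> doc) {
        HttpRequest req = HttpRequest.newBuilder()
            .uri(URI.create(BASE + "/" + index + "/_doc/" + id))
            .header("Content-Type", "application/json")
            .PUT(HttpRequest.BodyPublishers.ofString(jsonb.toJson(doc)))
            .build();
        try {
            // send() throws checked exceptions; wrap them so callers stay simple
            http.send(req, HttpResponse.BodyHandlers.ofString());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }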

    public Map<String,Object> search(String index, Map<String,Object> query) { … }
    public void refresh(String index) { … }
}

MicroProfile Rest Client (mpRestClient) would also work, but OpenSearch’s responses are deeply nested (hits.hits[]._source...); typed DTOs would add JSON binding work that isn’t worth it for one endpoint. Plain JDK HttpClient + Jsonb-as-Map is direct and easy to debug.
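Walking the nested response as a Map takes only a few lines. The sketch below assumes the standard search response shape (a top-level hits object containing a hits array whose entries carry _source); the Hit record and its three fields are hypothetical, since the chapter's real Hit type is not shown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: pulls hits.hits[]._source out of a Map-shaped response. */
final class HitsUnpacker {

    /** Illustrative result type; field selection is an assumption. */
    record Hit(String id, String description, String status) {}

    @SuppressWarnings("unchecked")
    static List<Hit> unpack(Map<String, Object> raw) {
        List<Hit> out = new ArrayList<>();
        // Outer "hits" is an object holding totals plus the inner "hits" array.
        Object outer = raw.get("hits");
        if (!(outer instanceof Map)) return out;
        Object inner = ((Map<String, Object>) outer).get("hits");
        if (!(inner instanceof List)) return out;

        for (Object entry : (List<Object>) inner) {
            Map<String, Object> hit = (Map<String, Object>) entry;
            Map<String, Object> src = (Map<String, Object>) hit.get("_source");
            if (src == null) continue;
            // String.valueOf copes with ids that arrive as numbers.
            out.add(new Hit(
                String.valueOf(src.get("id")),
                (String) src.get("description"),
                (String) src.get("status")));
        }
        return out;
    }
}
```

The defensive instanceof checks mean an error body or an empty index produces an empty list rather than a ClassCastException.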

The search endpoint

@GET
@Path("/claims")
public SearchResponse search(@QueryParam("q") String q) {
    os.refresh("claims");   // force a refresh so the freshly-CDC'd doc is searchable
    Map<String,Object> body = Map.of(
        "size", 20,
        "query", Map.of(
            "multi_match", Map.of(
                "query",  q,
                "fields", List.of(
                    "description^2", "ocr_text", "other_party_vin",
                    "other_party_carrier", "policy_number", "status"))));
    Map<String,Object> raw = os.search("claims", body);
    // unpack hits.hits[]._source into typed Hit records
}

The os.refresh("claims") call is intentional. OpenSearch defaults to a 1-second refresh interval; without an explicit refresh on the query path, a freshly-CDC’d doc could stay invisible for up to a second and the smoke test would flake. The refresh costs ~5 ms here; worth it.
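For reference, an explicit refresh is a single POST to the index's _refresh endpoint. The chapter elides the body of refresh(), so the following is only a sketch of what such a request could look like; the class name RefreshRequestSketch is hypothetical and the base URL is copied from OpenSearchClient. It builds the request without sending it, so it runs without a cluster.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical sketch of the refresh request; not the chapter's actual code. */
final class RefreshRequestSketch {
    // Assumption: same base URL as the chapter's OpenSearchClient.
    private static final String BASE = "http://opensearch:9200";

    /** Builds (but does not send) POST /{index}/_refresh. */
    static HttpRequest refreshRequest(String index) {
        return HttpRequest.newBuilder()
            .uri(URI.create(BASE + "/" + index + "/_refresh"))
            .POST(HttpRequest.BodyPublishers.noBody())
            .build();
    }

    public static void main(String[] args) {
        HttpRequest req = refreshRequest("claims");
        // Prints: POST http://opensearch:9200/claims/_refresh
        System.out.println(req.method() + " " + req.uri());
    }
}
```

Sending it would go through the same HttpClient as put(); the request carries no body because the endpoint needs none.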

Why Liberty does the sink (not a Kafka Connect sink plugin)

Debezium 2.7.3’s Connect image does not ship an OpenSearch sink plugin. Wiring one in would mean swapping the Connect image (Confluent / Aiven) or hot-installing JARs into the running container. Both are fragile under rootless podman, and the Kafka Connect teaching is fully covered on the source side anyway.

Liberty consuming the CDC topic and PUT-ing to OpenSearch covers the same teaching capability (Kafka Connect for the source; CDC events as the contract) without the plugin-management distraction.

Verify

# Register the connector (idempotent — safe to re-run)
~/insurance-app/scripts/register-debezium.sh

# Connector + task should be RUNNING
curl -sS http://localhost:8083/connectors/insurance-claim-cdc/status \
  | jq '{conn: .connector.state, task: .tasks[0].state}'

# File a claim with a unique marker word
MARKER="dent-marker-$RANDOM"
# (assume POL + AT from earlier chapters, plus /tmp/photo.jpg)
CID=$(curl -sS -X POST http://localhost:9080/api/claims \
        -H "Authorization: Bearer $AT" \
        -F "policyNumber=$POL" \
        -F "description=$MARKER" \
        -F "attachment=@/tmp/photo.jpg" \
      | jq -r .id)

sleep 6   # CDC propagation + OpenSearch indexing

# Search by the marker
curl -sS "http://localhost:9080/api/search/claims?q=$MARKER" | jq '{total, ids: [.hits[].id]}'

You should see total >= 1, and ids includes $CID. Open OpenSearch Dashboards (https://search.insurance-app.comptech-lab.com) → Discover → index claims to see the same data OpenSearch-side.

What you have

  • A full CDC pipeline: Postgres logical replication → Debezium → Kafka → Liberty → OpenSearch.
  • The pgoutput plugin + ExtractNewRecordState SMT pattern that most production Debezium + Postgres setups use today.
  • Idempotent connector registration via PUT /connectors/<name>/config, plus auto-restart on UNASSIGNED.
  • A multi_match search endpoint over indexed claims.
  • The “fresh-UUID group.id + rebuild on every boot” pattern, reused from the audit chapter, applied to a different downstream (OpenSearch instead of an in-memory map).
