~60 min read · updated 2026-05-17

Live claims feed (WebSocket)

A live, scroll-back-capable claims feed served over WebSocket. Redis Pub/Sub for fan-out, Redis Streams for the durable backlog, all bound to one @ServerEndpoint on Liberty. Slice 11 in the insurance-app repo.

This chapter teaches the WebSocket + Redis live-feed pattern — the plumbing under any “ops dashboard” that needs both live updates and a short scroll-back history. The actual operator-facing UI (a React + Express dashboard with one-click claim approval) is its own chapter much later in the track (28 — Agent dashboard). What follows is the substrate that dashboard reads from.

Insurance agents (or any operations user) want live signal about what’s happening, with the ability to scroll back through the last few minutes if they joined late. The natural pair of Redis primitives: Pub/Sub for live fan-out, Streams for the rewindable log. This chapter wires both behind one WebSocket endpoint.

Companion commit: f7a0be8 in insurance-app.

The dual-channel pattern

ClaimService.publishToDashboard(event)
  ├─► Pub/Sub:  PUBLISH dashboard:claims <json>          (live, no history)
  └─► Stream:   XADD dashboard:stream MAXLEN ~200 …      (durable, capped)

Browser
  └─► WebSocket /ws/dashboard
       ├─ @OnOpen: server reads last 20 entries from the Stream
       │  and sends them as one INITIAL_STATE message
       └─ AgentDashboardSubscriber forwards every Pub/Sub message live

Pub/Sub is the “phone ringing” channel — every subscriber gets a copy in real time, but anyone who wasn’t connected when a message published misses it. Streams is the “voicemail” log — capped at ~200 entries, durable across Redis restarts, replayable.

Together they let a late-joining dashboard hydrate AND get live updates without ever querying Postgres for “what happened in the last 10 minutes.”

Lettuce: a separate pub/sub connection

The standard CDI-injected RedisCommands<String,String> runs sync commands. It cannot be in subscribed mode at the same time as it runs XADD or SET. So Pub/Sub needs its own dedicated Lettuce client + connection:

@ApplicationScoped
public class AgentDashboardSubscriber {
    private RedisClient client;
    private StatefulRedisPubSubConnection<String, String> connection;

    void start(@Observes @Initialized(ApplicationScoped.class) Object init) {
        client = RedisClient.create("redis://redis:6379");
        connection = client.connectPubSub();
        connection.addListener(new RedisPubSubAdapter<>() {
            @Override public void message(String channel, String payload) {
                if ("dashboard:claims".equals(channel))
                    AgentDashboardSocket.broadcast(payload);
            }
        });
        connection.sync().subscribe("dashboard:claims");
    }

    @PreDestroy void stop() { /* close connection + shutdown client */ }
}

Eagerly started via the @Observes @Initialized trick (same as NotificationConsumer in chapter 16).

The WebSocket endpoint

@ApplicationScoped
@ServerEndpoint("/ws/dashboard")
public class AgentDashboardSocket {
    private static final Set<Session> SESSIONS = new CopyOnWriteArraySet<>();

    @OnOpen
    public void onOpen(Session session) {
        SESSIONS.add(session);
        // Send INITIAL_STATE: last 20 stream entries, oldest-first.
        session.getBasicRemote().sendText(jsonb.toJson(Map.of(
            "type", "INITIAL_STATE",
            "history", readRecentHistory())));
    }

    @OnClose public void onClose(Session s)              { SESSIONS.remove(s); }
    @OnError public void onError(Session s, Throwable t) { SESSIONS.remove(s); }

    /** Static — called from a Lettuce I/O thread that has no CDI context. */
    static void broadcast(String message) {
        for (Session s : SESSIONS) {
            if (!s.isOpen()) continue;
            try { s.getBasicRemote().sendText(message); }
            catch (Exception e) { SESSIONS.remove(s); }
        }
    }

    private List<String> readRecentHistory() {
        // XREVRANGE + reverse so the client renders oldest-first
        return redis.xrevrange("dashboard:stream", Range.unbounded(),
                               Limit.create(0, 20)).stream()
            .map(m -> m.getBody().getOrDefault("payload", "{}"))
            .toList().reversed();
    }
}

Why broadcast is static: the subscriber callback runs in a Lettuce I/O thread that has no CDI context. Plain static broadcast() avoids needing BeanManager dispatch from there.

The CopyOnWriteArraySet<Session> registry is intentionally simple — fine for a single-Liberty deployment. A multi-instance setup needs each instance to subscribe to Pub/Sub independently (already true: each AgentDashboardSubscriber opens its own connection), so fan-out scales horizontally.

Tying it into the claim flow

ClaimService.publishToDashboard(claim) runs after OCR + partner enrichment, so the dashboard event carries the fully-populated state:

private void publishToDashboard(Claim c) {
    String snippet = (c.getOcrText() == null) ? null
        : c.getOcrText().substring(0, Math.min(80, c.getOcrText().length())) + "…";
    dashboard.publish(new AgentDashboardEvent(
        "CLAIM_FILED", c.getId(), c.getPolicyNumber(),
        c.getDescription(), snippet,
        c.getOtherPartyCarrier(), c.getFiledAt().toString()));
}

The browser side — scripts/ws-probe.py

The companion repo’s scripts/ws-probe.py is a pure-stdlib WebSocket client (~150 lines). It hand-rolls the upgrade handshake

  • frame parser using socket, base64, hashlib. Use it as the reference smoke client (the VM doesn’t have websocat or the websockets Python package installed).

For a real browser, JavaScript’s native WebSocket is one line:

const ws = new WebSocket("ws://localhost:9080/ws/dashboard");
ws.onmessage = ev => console.log(JSON.parse(ev.data));

Auth — intentionally absent for slice 11

WebSocket auth in Liberty + mpJwt is awkward: browsers can’t set arbitrary Authorization headers on the initial GET that becomes the upgrade. Workarounds (query-param token, sec-websocket-protocol negotiation) all add ~50 lines of CDI / filter glue. Slice 11 intentionally ships the endpoint unauthenticated as a known limitation — the dashboard is read-only and the demo VM is on a private bridge. Documented as a TODO on the endpoint.

Chapter 27 (real human OIDC) closes this gap end-to-end on the customer side; the same approach applies to this endpoint once a UI fronts it.

Verify

# One terminal: connect via the probe + leave it open
python3 ~/insurance-app/scripts/ws-probe.py "$AT" "$POL" /tmp/photo.jpg

# It prints {"INITIAL_STATE": true, "LIVE_FRAME": true, "claimId": N}
# after firing a claim through the side-channel curl it does internally.

# Or in a browser tab open to https://app.insurance-app.comptech-lab.com,
# paste in DevTools console:
const ws = new WebSocket("wss://app.insurance-app.comptech-lab.com/ws/dashboard");
ws.onmessage = e => console.log(JSON.parse(e.data));

Open RedisInsight, key dashboard:stream — every CLAIM_FILED event appears there with XADD. Refresh and watch it grow.

What you have

  • Live + replayable feed of claim activity over WebSocket.
  • The Pub/Sub + Streams pattern that turns Redis into both a fan-out hub AND a short-history log.
  • Lettuce’s dedicated pub/sub connection pattern alongside the normal sync commands.
  • A @ServerEndpoint Liberty WebSocket binding (no extra config beyond webSocket-2.1 from webProfile-10.0).

Next: 19 — Feature: Reporting →