~75 min read · updated 2026-05-18

Feature: Reporting

Kafka Streams windowed aggregation inside Liberty + a scheduled MI synapse task that snapshots the rolling totals into a report_run table. Slice 12 in the insurance-app repo.

This chapter introduces two enterprise patterns that pair naturally: Kafka Streams for stateful windowed aggregation, and scheduled MI tasks (synapse Quartz) for periodic side-effects. Liberty hosts the streaming topology in-process; MI fires the snapshotter on a 30-second cron-like cadence.

Companion commit: 910825a in insurance-app.

The shape

payment-events  ──► Kafka Streams (in Liberty)
                      groupBy(status from JSON)
                      .windowedBy(1 min tumbling)
                      .count(Materialized.as("payment-counts"))

                            ▼ local state store
                      ReadOnlyWindowStore<String, Long>

                            ▼ exposed via REST
                      GET /api/reports/payment-stats

MI synapse task                          POST /api/reports/snapshot
  every 30s ──► insurance-app:9080/api/reports/snapshot
                                         persists a report_run row
                                         with current totals

Two query paths over the same data: real-time via the state store (GET /api/reports/payment-stats), historical via the report_run table (GET /api/reports/runs). Both are fed by the same Streams topology.

Kafka Streams — the topology

@ApplicationScoped
public class PaymentReportStreams {
    public static final String STORE_NAME = "payment-counts";
    private KafkaStreams streams;

    void start(@Observes @Initialized(ApplicationScoped.class) Object init) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG,    "insurance-app-payment-report");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,   Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        StreamsBuilder b = new StreamsBuilder();
        b.<String, String>stream("payment-events")
            .filter((k, v) -> v != null && !v.isBlank())
            .groupBy((k, v) -> "status:" + extractStatus(v),
                     Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
            .count(Materialized.as(STORE_NAME));

        streams = new KafkaStreams(b.build(), props);
        streams.setUncaughtExceptionHandler(ex -> SHUTDOWN_CLIENT);
        streams.start();
    }

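    // Guard against a failed start(): streams stays null if startup threw before assignment.
    @PreDestroy void stop() { if (streams != null) streams.close(Duration.ofSeconds(5)); }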
}

Three pieces:

  • APPLICATION_ID_CONFIG is both the consumer group and the prefix for the internal -changelog + -repartition topics Kafka Streams auto-creates. Unique per app, must be stable across restarts (otherwise state rebuilds from scratch).
  • Tumbling window = non-overlapping fixed-duration buckets. 1 minute here; a payment recorded at 12:00:30 lands in the bucket [12:00:00, 12:01:00).
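  • Materialized.as("payment-counts") names the window store the REST endpoint queries. Each window/key combination is its own entry. With the configuration shown, Kafka Streams backs it with its default RocksDB store on local disk plus a changelog topic; a purely in-memory store would need Materialized.as(Stores.inMemoryWindowStore(...)) instead.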
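
The topology groups by extractStatus(v), which is not shown above. A minimal sketch of what such a helper could look like follows. It assumes the payment event is a flat JSON object with a string field named "status"; the class name StatusExtractor and the regex approach are illustrative assumptions, and the real helper in insurance-app may well use a proper JSON parser instead.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: pulls the "status" field out of a payment-event JSON string. */
final class StatusExtractor {
    // Matches "status" : "VALUE"; assumes a flat JSON object with a string-valued field.
    private static final Pattern STATUS =
        Pattern.compile("\"status\"\\s*:\\s*\"([A-Za-z_]+)\"");

    private StatusExtractor() {}

    static String extractStatus(String json) {
        if (json == null) {
            return "UNKNOWN";
        }
        Matcher m = STATUS.matcher(json);
        if (!m.find()) {
            return "UNKNOWN";
        }
        String status = m.group(1).toUpperCase(Locale.ROOT);
        // Collapse anything unexpected so the set of grouping keys stays bounded.
        if (status.equals("SUCCEEDED") || status.equals("FAILED")) {
            return status;
        }
        return "UNKNOWN";
    }
}
```

Mapping every unrecognised value to UNKNOWN keeps the key space to the three statuses that the REST query iterates over, so a malformed event cannot create a key that no reader ever fetches.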

Adding kafka-streams to the WAR

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.9.0</version>
</dependency>

Use the same version as kafka-clients. The kafka-streams JAR pulls in its default state store engine (RocksDB, via the rocksdbjni dependency) transitively, so there is nothing to install beyond the Maven dependency.

Querying the state store from REST

public List<WindowedCount> countByStatusLastHour() {
    if (streams == null || streams.state() != KafkaStreams.State.RUNNING) return List.of();

    ReadOnlyWindowStore<String, Long> store = streams.store(
        StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.windowStore()));
    Instant to   = Instant.now();
    Instant from = to.minus(Duration.ofHours(1));
    List<WindowedCount> out = new ArrayList<>();
    for (String status : new String[]{"SUCCEEDED", "FAILED", "UNKNOWN"}) {
        try (WindowStoreIterator<Long> it = store.fetch("status:" + status, from, to)) {
            while (it.hasNext()) {
                var kv = it.next();
                out.add(new WindowedCount(status, kv.key, kv.key + 60_000, kv.value));
            }
        }
    }
    return out;
}

Note the early return when state() != RUNNING: the store is not reliably queryable while the topology is starting up or rebalancing, so REST callers polling through a slow boot get an empty list instead of an exception.
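
The fetch loop treats kv.key as the window start in epoch milliseconds and adds 60_000 to get the window end. The same arithmetic in isolation is sketched below, for non-negative timestamps and a 1-minute tumbling window. The class and method names are hypothetical; this mirrors how a tumbling window aligns to multiples of its size rather than reproducing Kafka Streams' own code.

```java
import java.time.Duration;

/** Illustrative arithmetic for 1-minute tumbling windows (names are hypothetical). */
final class TumblingWindowMath {
    static final long WINDOW_MS = Duration.ofMinutes(1).toMillis(); // 60_000

    private TumblingWindowMath() {}

    /** Start of the bucket containing timestampMs: the timestamp rounded down to a window multiple. */
    static long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, WINDOW_MS);
    }

    /** Exclusive end of the bucket that starts at windowStartMs. */
    static long windowEnd(long windowStartMs) {
        return windowStartMs + WINDOW_MS;
    }
}
```

For an event at 12:00:30 UTC, windowStart gives 12:00:00 and windowEnd gives 12:01:00, which is the bucket [12:00:00, 12:01:00). An event exactly on a boundary starts a new bucket, because the end is exclusive.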

MI synapse task — Quartz cron in 28 lines

mi/synapse-config/tasks/ReportTask.xml:

<task name="ReportTask"
      class="org.apache.synapse.startup.tasks.MessageInjector"
      group="synapse.simple.quartz"
      xmlns="http://ws.apache.org/ns/synapse">
    <trigger interval="30"/>
    <property name="injectTo"     value="sequence"/>
    <property name="sequenceName" value="ReportSnapshotSeq"/>
    <property name="message">
        <message xmlns="http://ws.apache.org/ns/synapse">
            <tick>now</tick>
        </message>
    </property>
</task>

mi/synapse-config/sequences/ReportSnapshotSeq.xml:

<sequence name="ReportSnapshotSeq" xmlns="http://ws.apache.org/ns/synapse">
    <call>
        <endpoint>
            <http method="post"
                  uri-template="http://insurance-app:9080/api/reports/snapshot?source=mi-scheduled-task"/>
        </endpoint>
    </call>
    <drop/>
</sequence>

MessageInjector is a stock synapse task class that injects a synthetic message into a named sequence. The interval attribute of <trigger> is in seconds, so interval="30" fires every 30 seconds; a production cadence would more likely be 5–15 minutes.

The MI Containerfile got two new COPY directives:

COPY synapse-config/sequences/ /home/wso2carbon/wso2mi-4.4.0/repository/deployment/server/synapse-configs/default/sequences/
COPY synapse-config/tasks/     /home/wso2carbon/wso2mi-4.4.0/repository/deployment/server/synapse-configs/default/tasks/

ReportSnapshotResource — the endpoint MI hits

@POST
@Path("/snapshot")
@Transactional
public ReportRun snapshot(@QueryParam("source") String source) {
    Map<String, Long> totals = streams.totalsByStatusLastHour();
    ReportRun r = new ReportRun();
    r.setSucceededCount(totals.getOrDefault("SUCCEEDED", 0L));
    r.setFailedCount(   totals.getOrDefault("FAILED",    0L));
    r.setUnknownCount(  totals.getOrDefault("UNKNOWN",   0L));
    r.setSource(source != null && !source.isBlank() ? source : "mi-scheduled-task");
    r.setCreatedAt(OffsetDateTime.now());
    return repo.save(r);
}
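
The resource calls streams.totalsByStatusLastHour(), which is not shown in this chapter. One plausible shape, assuming it simply sums the per-window counts returned by countByStatusLastHour(), is sketched below. The WindowedCount record and its field names are guesses based on the four-argument constructor call used earlier, and ReportTotals is a hypothetical holder class.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical aggregation step: collapses per-window counts into one total per status. */
final class ReportTotals {
    /** Assumed shape of WindowedCount; the field names are illustrative. */
    record WindowedCount(String status, long windowStartMs, long windowEndMs, long count) {}

    private ReportTotals() {}

    static Map<String, Long> totalsByStatus(List<WindowedCount> windows) {
        Map<String, Long> totals = new LinkedHashMap<>();
        for (WindowedCount w : windows) {
            // merge() inserts the count for a new status, or adds it to the running sum.
            totals.merge(w.status(), w.count(), Long::sum);
        }
        return totals;
    }
}
```

A status with no windows in the last hour is simply absent from the map, which is why the resource reads each total with getOrDefault(..., 0L).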

No auth. Trust comes from the network boundary: MI is the only caller on insurance-net, and the endpoint isn’t published through HAProxy. The trade-off is simpler scheduler operations against less defense in depth. For a single-VM teaching artifact that is fine; for a multi-tenant production system you’d want at minimum mutual TLS or a shared secret here.
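
If the shared-secret option were chosen, the check on the Liberty side could be as small as the sketch below. It is not part of the companion commit. How the secret reaches the endpoint (for example a request header set by the MI sequence) and where the expected value is stored are left as assumptions; the class name SharedSecretCheck is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

/** Hypothetical shared-secret comparison for an internal scheduler-to-app call. */
final class SharedSecretCheck {
    private SharedSecretCheck() {}

    /**
     * Returns true only when both values are present and byte-for-byte equal.
     * MessageDigest.isEqual compares in constant time, which avoids leaking
     * how many leading characters of a guess were correct.
     */
    static boolean matches(String presented, String expected) {
        if (presented == null || expected == null || expected.isEmpty()) {
            return false; // an unset secret must never authenticate anyone
        }
        return MessageDigest.isEqual(
            presented.getBytes(StandardCharsets.UTF_8),
            expected.getBytes(StandardCharsets.UTF_8));
    }
}
```

The snapshot resource would call matches(...) before doing any work and reject the request when it returns false. Treating an empty expected value as a failure means a missing configuration entry closes the endpoint rather than opening it.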

Verify

source ~/insurance-app/.wso2is-creds
AT=$()

# Fire a few payments so the state store has data.
# Requires $AT (a bearer access token) and $POL (an existing policy number) to be set.
for i in 1 2 3; do
  curl -sS -o /dev/null -X POST http://localhost:9080/api/payments \
    -H "Authorization: Bearer $AT" -H "Content-Type: application/json" \
    -H "Idempotency-Key: rpt-$i-$RANDOM" \
    -d "{\"policyNumber\":\"$POL\",\"amount\":100,\"currency\":\"USD\"}"
done
sleep 4

# Live counts from the Streams state store
curl -sS http://localhost:9080/api/reports/payment-stats | jq

# Wait 35s for MI to fire at least once
sleep 35
curl -sS "http://localhost:9080/api/reports/runs?limit=3" | jq '[.[] | {source, succeededCount, failedCount}]'

The latest report_run row should have source: "mi-scheduled-task" and counts matching the live state store.

What you have

  • A stateful Kafka Streams topology inside Liberty, with a queryable local state store.
  • An MI scheduled task that fires on a fixed interval.
  • A report_run audit table with one row per task tick.
  • The “REST endpoint on insurance-net = trust by network boundary” pattern for internal scheduler → app calls.
