Feature: Reporting
Kafka Streams windowed aggregation inside Liberty + a scheduled MI synapse task that snapshots the rolling totals into a report_run table. Slice 12 in the insurance-app repo.
This chapter introduces two enterprise patterns that pair naturally: Kafka Streams for stateful windowed aggregation, and scheduled MI tasks (synapse Quartz) for periodic side-effects. Liberty hosts the streaming topology in-process; MI fires the snapshotter on a 30-second cron-like cadence.
Companion commit: 910825a in insurance-app.
The shape
payment-events ──► Kafka Streams (in Liberty)
                     groupBy(status from JSON)
                       .windowedBy(1 min tumbling)
                       .count(Materialized.as("payment-counts"))
                            │
                            ▼ local state store
                     ReadOnlyWindowStore<String, Long>
                            │
                            ▼ exposed via REST
                     GET /api/reports/payment-stats

MI synapse task                POST /api/reports/snapshot
  every 30s        ──►         insurance-app:9080/api/reports/snapshot
                               persists a report_run row
                               with current totals
Two query paths over the same data: real-time via the state store
(GET payment-stats), historical via the report_run table (GET
/reports/runs). Both useful; both fed by the same single Streams
topology.
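The groupBy(status from JSON) step needs some way to pull a status out of each event payload. The chapter does not show that helper, so the following is only a minimal sketch under stated assumptions: the payload is assumed to carry a top-level "status" string field, the class name StatusExtractor is hypothetical, and anything other than SUCCEEDED or FAILED is mapped to UNKNOWN so the key space stays bounded. A real implementation would more likely use the JSON library already on the classpath.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper; the payload shape ("status" as a string field) is an assumption. */
final class StatusExtractor {
    // Deliberately naive: matches the first "status":"VALUE" pair anywhere in the text.
    // This is not a JSON parser and would also match a nested "status" field.
    private static final Pattern STATUS =
            Pattern.compile("\"status\"\\s*:\\s*\"([A-Za-z_]+)\"");

    static String extractStatus(String json) {
        if (json == null) {
            return "UNKNOWN";
        }
        Matcher m = STATUS.matcher(json);
        if (!m.find()) {
            return "UNKNOWN";
        }
        String status = m.group(1).toUpperCase(Locale.ROOT);
        // Collapse unexpected values so only three grouping keys can ever exist.
        return (status.equals("SUCCEEDED") || status.equals("FAILED")) ? status : "UNKNOWN";
    }
}
```

Bounding the set of keys matters because the REST query later fetches each status by name; a key that is not in that fixed list would be counted but never read.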
Kafka Streams — the topology
@ApplicationScoped
public class PaymentReportStreams {
    public static final String STORE_NAME = "payment-counts";
    private KafkaStreams streams;

    // Runs once when the CDI application scope is initialized.
    void start(@Observes @Initialized(ApplicationScoped.class) Object init) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "insurance-app-payment-report");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        StreamsBuilder b = new StreamsBuilder();
        b.<String, String>stream("payment-events")
            // Skip tombstones and empty payloads.
            .filter((k, v) -> v != null && !v.isBlank())
            // Re-key by status; this forces a repartition topic.
            .groupBy((k, v) -> "status:" + extractStatus(v),
                     Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
            .count(Materialized.as(STORE_NAME));

        streams = new KafkaStreams(b.build(), props);
        // SHUTDOWN_CLIENT is statically imported from
        // StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.
        streams.setUncaughtExceptionHandler(ex -> SHUTDOWN_CLIENT);
        streams.start();
    }

    @PreDestroy
    void stop() {
        // start() may have failed before assigning the field.
        if (streams != null) streams.close(Duration.ofSeconds(5));
    }
}
Three pieces:
- APPLICATION_ID_CONFIG is both the consumer group ID and the prefix for the internal -changelog and -repartition topics that Kafka Streams auto-creates. It must be unique per app and stable across restarts (otherwise state rebuilds from scratch).
- Tumbling window = non-overlapping fixed-duration buckets. 1 minute here; a payment counted at 12:00:30 lives in the bucket [12:00:00, 12:01:00).
- Materialized.as("payment-counts") names the window store the REST endpoint queries. Each window/key combination is its own entry. Unless the store type is overridden, Kafka Streams backs a named store like this with RocksDB on local disk plus a changelog topic, so treat it as local to the Liberty process rather than strictly in-memory.
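The bucket arithmetic behind the tumbling-window example can be sketched in a few lines. This is an illustration rather than Kafka Streams' own code: it assumes windows aligned to the Unix epoch (which is how TimeWindows lays them out), and the class and method names are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

/** Illustrative only: computes which epoch-aligned tumbling window a timestamp falls into. */
final class TumblingBucket {

    /** Inclusive start of the window containing ts. */
    static Instant windowStart(Instant ts, Duration size) {
        long sizeMs = size.toMillis();
        long t = ts.toEpochMilli();
        // Round down to the nearest multiple of the window size.
        return Instant.ofEpochMilli(t - Math.floorMod(t, sizeMs));
    }

    /** Exclusive end of the window containing ts. */
    static Instant windowEnd(Instant ts, Duration size) {
        return windowStart(ts, size).plus(size);
    }

    public static void main(String[] args) {
        Instant paid = Instant.parse("2024-01-01T12:00:30Z");
        Duration oneMinute = Duration.ofMinutes(1);
        // prints [2024-01-01T12:00:00Z, 2024-01-01T12:01:00Z)
        System.out.println("[" + windowStart(paid, oneMinute) + ", " + windowEnd(paid, oneMinute) + ")");
    }
}
```

A record stamped exactly 12:01:00 lands in the next bucket, because the end bound is exclusive.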
Adding kafka-streams to the WAR
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.9.0</version>
</dependency>
Use the same Kafka version as kafka-clients. Streams brings its
state store implementations with it (the default RocksDB-backed one
arrives as the transitive rocksdbjni dependency); nothing to install
beyond the JARs.
Querying the state store from REST
public List<WindowedCount> countByStatusLastHour() {
    // The store cannot be queried until the topology is running.
    if (streams == null || streams.state() != KafkaStreams.State.RUNNING) return List.of();

    ReadOnlyWindowStore<String, Long> store = streams.store(
        StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.windowStore()));

    Instant to = Instant.now();
    Instant from = to.minus(Duration.ofHours(1));
    List<WindowedCount> out = new ArrayList<>();

    for (String status : new String[]{"SUCCEEDED", "FAILED", "UNKNOWN"}) {
        // fetch() returns one entry per window; the iterator must be closed.
        try (WindowStoreIterator<Long> it = store.fetch("status:" + status, from, to)) {
            while (it.hasNext()) {
                var kv = it.next();
                // kv.key is the window start in epoch millis; windows are 1 minute wide.
                out.add(new WindowedCount(status, kv.key, kv.key + 60_000, kv.value));
            }
        }
    }
    return out;
}
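Notice the state() != RUNNING early return: the store is unusable
during startup, and querying it too early fails with an
InvalidStateStoreException (or a subclass). With the guard in place,
REST callers can poll during a slow boot and simply get an empty list
instead of an error.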
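The per-window entries returned above are often more useful collapsed into one total per status. As a minimal sketch, assuming WindowedCount is a simple four-field value type (its real definition is not shown in this chapter, and the record below requires Java 16 or later), the roll-up could look like this; the class name WindowTotals is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative roll-up of per-window counts into one total per status. */
final class WindowTotals {

    /** Assumed shape of the DTO built in countByStatusLastHour(). */
    record WindowedCount(String status, long windowStartMs, long windowEndMs, long count) {}

    static Map<String, Long> totalsByStatus(List<WindowedCount> windows) {
        // LinkedHashMap keeps statuses in first-seen order for stable output.
        Map<String, Long> totals = new LinkedHashMap<>();
        for (WindowedCount w : windows) {
            // merge() inserts the count or adds it to the running total.
            totals.merge(w.status(), w.count(), Long::sum);
        }
        return totals;
    }
}
```

A status with no windows in the last hour is simply absent from the map, so callers should read it with getOrDefault(status, 0L).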
MI synapse task — Quartz cron in 28 lines
mi/synapse-config/tasks/ReportTask.xml:
<task name="ReportTask"
      class="org.apache.synapse.startup.tasks.MessageInjector"
      group="synapse.simple.quartz"
      xmlns="http://ws.apache.org/ns/synapse">
    <trigger interval="30"/>
    <property name="injectTo" value="sequence"/>
    <property name="sequenceName" value="ReportSnapshotSeq"/>
    <property name="message">
        <message xmlns="http://ws.apache.org/ns/synapse">
            <tick>now</tick>
        </message>
    </property>
</task>
mi/synapse-config/sequences/ReportSnapshotSeq.xml:
<sequence name="ReportSnapshotSeq" xmlns="http://ws.apache.org/ns/synapse">
    <call>
        <endpoint>
            <http method="post"
                  uri-template="http://insurance-app:9080/api/reports/snapshot?source=mi-scheduled-task"/>
        </endpoint>
    </call>
    <drop/>
</sequence>
MessageInjector is a stock synapse task class that injects a
synthetic message into a named sequence. The interval in
<trigger interval="30"/> is in seconds; production cadences would
more likely be 5–15 minutes.
The MI Containerfile got two new COPY directives:
COPY synapse-config/sequences/ /home/wso2carbon/wso2mi-4.4.0/repository/deployment/server/synapse-configs/default/sequences/
COPY synapse-config/tasks/ /home/wso2carbon/wso2mi-4.4.0/repository/deployment/server/synapse-configs/default/tasks/
ReportSnapshotResource — the endpoint MI hits
@POST
@Path("/snapshot")
@Transactional
public ReportRun snapshot(@QueryParam("source") String source) {
    Map<String, Long> totals = streams.totalsByStatusLastHour();
    ReportRun r = new ReportRun();
    r.setSucceededCount(totals.getOrDefault("SUCCEEDED", 0L));
    r.setFailedCount(totals.getOrDefault("FAILED", 0L));
    r.setUnknownCount(totals.getOrDefault("UNKNOWN", 0L));
    r.setSource(source != null && !source.isBlank() ? source : "mi-scheduled-task");
    r.setCreatedAt(OffsetDateTime.now());
    return repo.save(r);
}
No auth. Trust comes from the network boundary: MI is the only
caller on insurance-net, and the endpoint isn’t published through
HAProxy. The trade-off is simpler scheduler ops versus less defense
in depth. For a single-VM teaching artifact, that is fine; for a
multi-tenant production system, you’d want at minimum mutual TLS or
a shared secret here.
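As an illustration of the shared-secret option, here is a minimal sketch of the comparison step. Everything around it is an assumption and not part of the companion commit: the MI sequence would need to send the secret in a request header (a hypothetical X-Snapshot-Token, say), and the resource would read it, call a check like this, and reject the request on mismatch.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

/** Hypothetical helper: constant-time comparison of a presented token against the expected one. */
final class SharedSecretCheck {

    static boolean matches(String presented, String expected) {
        // Fail closed: a missing header or an unconfigured secret never authenticates.
        if (presented == null || expected == null || expected.isEmpty()) {
            return false;
        }
        // MessageDigest.isEqual compares in constant time, so response timing
        // does not reveal how many leading bytes of the guess were correct.
        return MessageDigest.isEqual(
                presented.getBytes(StandardCharsets.UTF_8),
                expected.getBytes(StandardCharsets.UTF_8));
    }
}
```

The expected value would come from configuration or an environment variable rather than source code. A plain String.equals would also work functionally, but it can return sooner on an early mismatch, which is why the constant-time variant is the usual choice for secrets.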
Verify
source ~/insurance-app/.wso2is-creds
AT=$(…)
# Fire a few payments so the state store has data.
for i in 1 2 3; do
  curl -sS -o /dev/null -X POST http://localhost:9080/api/payments \
    -H "Authorization: Bearer $AT" -H "Content-Type: application/json" \
    -H "Idempotency-Key: rpt-$i-$RANDOM" \
    -d "{\"policyNumber\":\"$POL\",\"amount\":100,\"currency\":\"USD\"}"
done
sleep 4
# Live counts from the local state store
curl -sS http://localhost:9080/api/reports/payment-stats | jq
# Wait 35s for MI to fire at least once
sleep 35
curl -sS "http://localhost:9080/api/reports/runs?limit=3" | jq '[.[] | {source, succeededCount, failedCount}]'
The latest report_run row should have source: "mi-scheduled-task"
and counts matching the live state store.
What you have
- A stateful Kafka Streams topology inside Liberty, with a queryable local state store.
- An MI scheduled task that fires on a fixed interval.
- A report_run audit table with one row per task tick.
- The “REST endpoint on insurance-net = trust by network boundary” pattern for internal scheduler → app calls.