Module 13: Advanced Async Patterns

Cross-turn handlers and advanced reactive patterns.

What You’ll Learn

  • Cross-turn message handlers for logging and metrics
  • Error handling patterns in reactive streams
  • Resource management for long-lived clients
  • When to use advanced patterns vs simple TurnSpec

Prerequisites

This module assumes you’ve completed Module 04: ClaudeAsyncClient and understand the TurnSpec pattern.

Cross-Turn Message Handlers

Register handlers that receive messages across all conversation turns:
ClaudeAsyncClient client = ClaudeClient.async()
    .workingDirectory(Path.of("."))
    .build();

// These handlers receive ALL messages, across ALL turns
client.onMessage(msg -> {
    logger.info("Message received: {}", msg.getType());
});

client.onResult(result -> {
    metrics.recordCost(result.totalCostUsd());
    metrics.recordTurns(result.numTurns());
});
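
The `metrics` object in the snippet above is not defined in this module. Because cross-turn handlers may be invoked from whichever thread delivers a message, any collector they call should be safe for concurrent use. The following is a minimal sketch of such a collector using only the JDK. The class name `TurnMetrics` and its getter methods are hypothetical, and it assumes `result.totalCostUsd()` and `result.numTurns()` return values compatible with `double` and `int`.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.DoubleAdder;

/** Hypothetical metrics collector; the tutorial's real `metrics` type is not shown. */
class TurnMetrics {
    // DoubleAdder and AtomicLong allow concurrent updates without explicit locking
    private final DoubleAdder totalCostUsd = new DoubleAdder();
    private final AtomicLong totalTurns = new AtomicLong();

    /** Adds the cost of one result to the running total. */
    void recordCost(double costUsd) {
        totalCostUsd.add(costUsd);
    }

    /** Adds the number of turns reported by one result. */
    void recordTurns(int turns) {
        totalTurns.addAndGet(turns);
    }

    double totalCostUsd() {
        return totalCostUsd.sum();
    }

    long totalTurns() {
        return totalTurns.get();
    }

    public static void main(String[] args) {
        TurnMetrics metrics = new TurnMetrics();
        // Simulate two results arriving, as an onResult handler would report them
        metrics.recordCost(0.25);
        metrics.recordTurns(3);
        metrics.recordCost(0.5);
        metrics.recordTurns(2);
        System.out.println("cost=" + metrics.totalCostUsd() + " turns=" + metrics.totalTurns());
    }
}
```

An instance like this would be created once, next to the client, so that totals accumulate across every turn the handlers observe.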

Key Reactive Operators

Operator      Purpose
doOnNext      Side effect for each element
doOnSuccess   Side effect when a Mono completes successfully
flatMap       Chain to the next operation (enables multi-turn)
filter        Select specific message types
doOnError     Side effect when an error occurs (for example, logging); does not recover from it
subscribe()   Start the stream (nothing happens until you subscribe)
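
The "nothing happens until you subscribe" rule can be tried out without any reactive dependency. The sketch below is an analogy, not Reactor code: it uses the JDK's `java.util.stream`, where `peek` plays roughly the role of `doOnNext`, `filter` matches `filter`, and the terminal operation stands in for `subscribe()`. The message type strings are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyPipelineSketch {
    /** Builds a pipeline that records every element it sees. Nothing runs yet. */
    static Stream<String> buildPipeline(List<String> seen) {
        return Stream.of("system", "assistant", "result")   // hypothetical message types
            .peek(seen::add)                                 // side effect per element
            .filter(type -> !type.equals("system"));         // keep selected types only
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        Stream<String> pipeline = buildPipeline(seen);

        // The pipeline is assembled, but no element has flowed through it
        System.out.println("before terminal operation: " + seen);

        // The terminal operation starts processing, like subscribe() on a Flux
        List<String> kept = pipeline.collect(Collectors.toList());

        System.out.println("after terminal operation:  " + seen);
        System.out.println("kept: " + kept);
    }
}
```

The analogy is limited to laziness. A JDK stream is pulled synchronously on the calling thread, whereas a reactive stream delivers elements asynchronously after subscription.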

Error Handling

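client.query(prompt).messages()
    // Log the failure; doOnError only observes the error and does not handle it
    .doOnError(error -> {
        logger.error("Stream error", error);
    })
    // Recover by completing the stream with no further elements
    .onErrorResume(error -> Flux.empty())
    .subscribe();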
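
The snippet above has two separate steps: observe the error, then recover from it. The same shape can be sketched with the JDK's `CompletableFuture` as a dependency-free stand-in. This is an analogy rather than the SDK's or Reactor's API: `whenComplete` observes the failure much as `doOnError` does, and `exceptionally` substitutes a fallback much as `onErrorResume` does. The class and method names are hypothetical, and the empty string is an arbitrary fallback chosen for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class ErrorRecoverySketch {
    /**
     * Logs a failure of the source, then recovers with an empty string.
     * The log list stands in for logger.error(...).
     */
    static String runWithFallback(CompletableFuture<String> source, List<String> log) {
        return source
            // Observe only: the failure still propagates to the next stage
            .whenComplete((value, error) -> {
                if (error != null) {
                    log.add("Stream error: " + error.getMessage());
                }
            })
            // Recover: replace the failure with a fallback value
            .exceptionally(error -> "")
            .join();
    }

    public static void main(String[] args) {
        List<String> log = new CopyOnWriteArrayList<>();
        CompletableFuture<String> failing =
            CompletableFuture.failedFuture(new IllegalStateException("connection lost"));

        String result = runWithFallback(failing, log);
        System.out.println("result is empty: " + result.isEmpty());
        System.out.println("logged entries: " + log.size());
    }
}
```

Without the logging step the caller would see only an empty result and no trace of what went wrong, which is why the recovery step is paired with one.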

Use Case: Spring WebFlux SSE Endpoint

One common use case is streaming responses via Server-Sent Events:
@RestController
public class ChatController {
    private final ClaudeAsyncClient client;

    public ChatController() {
        this.client = ClaudeClient.async()
            .workingDirectory(Path.of("."))
            .permissionMode(PermissionMode.BYPASS_PERMISSIONS)
            .build();
    }

    @GetMapping(value = "/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> chat(@RequestParam String message) {
        return client.query(message).textStream();
    }
}
This pattern applies to any reactive web framework, not just Spring WebFlux.

When to Use ClaudeAsyncClient

Scenario                       Recommendation
Reactive web application       ClaudeAsyncClient
SSE streaming to browser       ClaudeAsyncClient
High-concurrency server        ClaudeAsyncClient
CLI tool or script             ClaudeSyncClient
Traditional blocking web app   ClaudeSyncClient
Simpler debugging needs        ClaudeSyncClient

Tradeoffs and Limitations

  • Complexity: Reactive programming has a steeper learning curve. Stack traces are harder to read, and debugging requires understanding the reactive execution model.
  • Error handling: Errors propagate differently in reactive streams. Forgetting to handle errors in the reactive chain can cause silent failures.
  • Blocking operations: Mixing blocking calls (e.g., JDBC) with reactive streams defeats the purpose and can cause thread starvation. Use reactive-compatible libraries throughout.
  • Testing: Unit testing reactive code requires StepVerifier or similar utilities. Standard JUnit assertions don’t work directly with Flux and Mono.
  • Resource management: The async client is typically a long-lived managed bean (not created per-request). Close it during application shutdown, not after each request.
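
The resource management point can be illustrated with a small JDK-only sketch: one instance is created at startup, shared by every request, and closed once at shutdown. `FakeClient` is a hypothetical stand-in rather than the SDK's client type, and a JVM shutdown hook is used here only because the sketch has no application container. In a managed environment the container's own shutdown callback would take its place.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class ClientLifecycleSketch {
    /** Hypothetical stand-in for a long-lived client; not the SDK's type. */
    static class FakeClient implements AutoCloseable {
        private final AtomicBoolean closed = new AtomicBoolean();

        String handle(String request) {
            if (closed.get()) {
                throw new IllegalStateException("client already closed");
            }
            return "handled: " + request;
        }

        boolean isClosed() {
            return closed.get();
        }

        @Override
        public void close() {
            closed.set(true);
        }
    }

    public static void main(String[] args) {
        // Created once at startup, not once per request
        FakeClient client = new FakeClient();

        // Closed once, when the JVM shuts down
        Runtime.getRuntime().addShutdownHook(new Thread(client::close));

        // Every request reuses the same instance
        for (String request : new String[] {"first", "second", "third"}) {
            System.out.println(client.handle(request));
        }
    }
}
```

Closing the client after each request instead would make the next request fail, which is the failure mode the guard in `handle` makes visible.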

Source Code

View on GitHub

Running the Example

mvn compile exec:java -pl module-13-async-advanced

Next Module

Module 14: Permission Callbacks (coming soon) - Dynamic control over tool execution.