Hatchgrid
for Astrum
Understanding Reactive Programming with Spring WebFlux: Benefits and Use Cases

Technology
reactive-programming, spring-boot, backend, engineering

Reactive programming has become increasingly important in modern backend development, especially when dealing with high-concurrency applications and I/O-intensive operations. Spring WebFlux brings reactive programming to the Spring ecosystem, offering a non-blocking alternative to traditional servlet-based applications.

What is Reactive Programming?

Reactive programming is a programming paradigm that deals with asynchronous data streams and the propagation of changes. Instead of blocking threads while waiting for I/O operations, reactive systems process data as it becomes available, making efficient use of system resources.

Key Concepts

Asynchronous and Non-blocking

Traditional blocking I/O keeps threads waiting:

// Blocking approach
public User getUserById(Long id) {
    // Thread waits here until database responds
    return userRepository.findById(id);
}

Reactive approach frees up threads:

// Reactive approach
public Mono<User> getUserById(Long id) {
    // Thread is freed immediately, result comes later
    return userRepository.findById(id);
}
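
To see the same idea without any Spring or Reactor dependency, here is a minimal plain-JDK sketch that uses CompletableFuture as a rough stand-in for Mono. The class name NonBlockingSketch, the lookupUser helper, and the simulated 200 ms delay are assumptions made for illustration; they are not part of the service shown above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class NonBlockingSketch {

    // Hypothetical lookup: the delayed executor simulates a slow database call.
    static CompletableFuture<String> lookupUser(long id) {
        return CompletableFuture.supplyAsync(
            () -> "user-" + id,
            CompletableFuture.delayedExecutor(200, TimeUnit.MILLISECONDS));
    }

    public static void main(String[] args) {
        // Assembling the pipeline returns at once; the caller thread is not parked.
        CompletableFuture<String> greeting =
            lookupUser(42L).thenApply(name -> "Hello, " + name);

        System.out.println("Pipeline assembled, done yet? " + greeting.isDone());

        // join() blocks, so it belongs only at the edge of a demo or a test.
        System.out.println(greeting.join());
    }
}
```

One difference worth keeping in mind: a CompletableFuture starts working as soon as it is created, whereas a Mono does nothing until something subscribes to it.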

Backpressure

Reactive systems let a slow consumer signal how much data it can handle, so a faster producer does not overwhelm it:

@Service
public class UserService {
    
    public Flux<User> getAllUsers() {
        return userRepository.findAll()
            .limitRate(100) // Request at most 100 items at a time from upstream
            .delayElements(Duration.ofMillis(10)); // Artificial delay that emulates a slow consumer (demo only)
    }
}
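
Underneath, backpressure is a demand signal: the subscriber calls request(n) and the publisher may emit at most n items before it has to wait. The sketch below shows that handshake with the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams contract that Reactor implements. The DemandSketch class, the range publisher, and the batch size are hypothetical, and the publisher is deliberately simplified (single-threaded, no validation of n).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class DemandSketch {

    // Hypothetical publisher: emits 1..count, never more than has been requested.
    static Flow.Publisher<Integer> range(int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 1;
            private long demand;
            private boolean emitting;
            private boolean done;

            @Override
            public void request(long n) {
                if (n <= 0 || done) return;
                demand += n;
                if (emitting) return; // re-entrant call from onNext: the loop below picks it up
                emitting = true;
                while (!done && demand > 0) {
                    if (next > count) {
                        done = true;
                        subscriber.onComplete();
                        break;
                    }
                    demand--;
                    subscriber.onNext(next++);
                }
                emitting = false;
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Subscribes with a fixed batch size and records every signal in order.
    static List<String> run(int count, int batchSize) {
        List<String> log = new ArrayList<>();
        range(count).subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;
            private int receivedInBatch;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                log.add("request " + batchSize);
                s.request(batchSize);
            }

            @Override
            public void onNext(Integer item) {
                log.add("item " + item);
                if (++receivedInBatch == batchSize) { // batch consumed: ask for the next one
                    receivedInBatch = 0;
                    log.add("request " + batchSize);
                    subscription.request(batchSize);
                }
            }

            @Override
            public void onError(Throwable t) {
                log.add("error " + t);
            }

            @Override
            public void onComplete() {
                log.add("complete");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        run(5, 2).forEach(System.out::println);
    }
}
```

With five items and a batch size of two, the log alternates between a request line and at most two item lines. Operators such as limitRate shape these request(n) signals on your behalf.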

Spring WebFlux vs. Spring MVC

When to Choose WebFlux

WebFlux is ideal for:

  • High-concurrency applications (thousands of simultaneous connections)
  • I/O-intensive operations (database queries, external API calls)
  • Streaming data applications
  • Microservices with many external dependencies

Stick with MVC for:

  • CPU-intensive operations
  • Blocking dependencies that can’t be made reactive
  • Teams unfamiliar with reactive programming
  • Simple CRUD applications with low concurrency
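
As a rough illustration of why non-blocking I/O suits high-concurrency workloads, the plain-JDK sketch below keeps thousands of simulated calls in flight on just two threads. The ConcurrencySketch class, the call count, and the 100 ms delay are arbitrary assumptions; a timer stands in for a network response, so this is a toy model rather than a benchmark.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ConcurrencySketch {

    // Starts `calls` simulated I/O operations and waits until all have completed.
    static int completeAll(int calls, long delayMillis) {
        ScheduledExecutorService timers = Executors.newScheduledThreadPool(2);
        try {
            List<CompletableFuture<Integer>> inFlight = IntStream.range(0, calls)
                .mapToObj(i -> {
                    CompletableFuture<Integer> response = new CompletableFuture<>();
                    // The timer plays the role of a response arriving later;
                    // no thread sleeps while the call is pending.
                    timers.schedule(() -> { response.complete(i); },
                        delayMillis, TimeUnit.MILLISECONDS);
                    return response;
                })
                .collect(Collectors.toList());

            CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0])).join();
            return (int) inFlight.stream().filter(CompletableFuture::isDone).count();
        } finally {
            timers.shutdown();
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        int completed = completeAll(5_000, 100);
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println(completed + " simulated calls finished in about "
            + elapsedMs + " ms on 2 threads");
    }
}
```

A thread-per-request design would need one parked thread for each of those pending calls. CPU-bound work gains nothing from this approach, which is why it sits on the MVC side of the lists above.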

Performance Comparison

// MVC Controller (blocking)
@RestController
public class UserController {
    
    @GetMapping("/users/{id}")
    public ResponseEntity<User> getUser(@PathVariable Long id) {
        User user = userService.findById(id); // Blocks thread
        return ResponseEntity.ok(user);
    }
}

// WebFlux Controller (non-blocking)
@RestController
public class ReactiveUserController {
    
    @GetMapping("/users/{id}")
    public Mono<ResponseEntity<User>> getUser(@PathVariable Long id) {
        return userService.findById(id)
            .map(ResponseEntity::ok)
            .defaultIfEmpty(ResponseEntity.notFound().build());
    }
}

Core Reactive Types in WebFlux

Mono<T> - Single Value or Empty

Mono represents 0 or 1 element:

@Service
public class UserService {
    
    public Mono<User> createUser(User user) {
        return userRepository.save(user)
            .doOnSuccess(savedUser -> 
                log.info("User created: {}", savedUser.getId()))
            .doOnError(error -> 
                log.error("Failed to create user", error));
    }
    
    public Mono<User> getUserByEmail(String email) {
        return userRepository.findByEmail(email)
            .switchIfEmpty(Mono.error(
                new UserNotFoundException("User not found: " + email)));
    }
}

Flux<T> - Multiple Values

Flux represents 0 to N elements:

@Service
public class UserService {
    
    public Flux<User> getUsersByDepartment(String department) {
        return userRepository.findByDepartment(department)
            .filter(user -> user.isActive())
            .map(this::enrichUserData)
            // sort() buffers every element and emits only after the source completes
            .sort(Comparator.comparing(User::getLastName));
    }
    
    public Flux<UserDto> streamUsers() {
        return userRepository.findAll()
            .map(this::convertToDto)
            .delayElements(Duration.ofMillis(100)); // Simulate streaming
    }
}

Practical Implementation Examples

1. Reactive REST API

@RestController
@RequestMapping("/api/users")
public class ReactiveUserController {
    
    private final UserService userService;
    
    @GetMapping
    public Flux<UserDto> getAllUsers(
            @RequestParam(defaultValue = "0") int page,
            @RequestParam(defaultValue = "20") int size) {
        
        return userService.findAllUsers(page, size)
            .map(this::toDto);
    }
    
    @GetMapping("/{id}")
    public Mono<ResponseEntity<UserDto>> getUser(@PathVariable Long id) {
        return userService.findById(id)
            .map(this::toDto)
            .map(ResponseEntity::ok)
            .defaultIfEmpty(ResponseEntity.notFound().build());
    }
    
    @PostMapping
    public Mono<ResponseEntity<UserDto>> createUser(
            @Valid @RequestBody CreateUserRequest request) {
        
        return userService.createUser(request)
            .map(this::toDto)
            .map(user -> ResponseEntity.status(HttpStatus.CREATED).body(user))
            .onErrorResume(ValidationException.class, 
                ex -> Mono.just(ResponseEntity.badRequest().build()));
    }
    
    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<UserDto> streamUsers() {
        return userService.streamAllUsers()
            .map(this::toDto);
    }
}

2. Reactive Database Access with R2DBC

@Repository
public class ReactiveUserRepository {
    
    private final R2dbcEntityTemplate template;
    
    public Mono<User> save(User user) {
        return template.insert(user);
    }
    
    public Mono<User> findById(Long id) {
        return template.selectOne(
            Query.query(where("id").is(id)), 
            User.class
        );
    }
    
    public Flux<User> findByDepartment(String department) {
        return template.select(
            Query.query(where("department").is(department)), 
            User.class
        );
    }
    
    public Mono<Long> countActiveUsers() {
        return template.count(
            Query.query(where("active").is(true)), 
            User.class
        );
    }
}

3. Combining Multiple Reactive Streams

@Service
public class UserAggregationService {
    
    public Mono<UserProfileDto> getUserProfile(Long userId) {
        Mono<User> userMono = userService.findById(userId);
        Mono<List<Order>> ordersMono = orderService.findByUserId(userId);
        Mono<UserPreferences> preferencesMono = 
            preferencesService.findByUserId(userId);
        
        return Mono.zip(userMono, ordersMono, preferencesMono)
            .map(tuple -> UserProfileDto.builder()
                .user(tuple.getT1())
                .orders(tuple.getT2())
                .preferences(tuple.getT3())
                .build());
    }
    
    public Flux<UserActivity> getUserActivityStream(Long userId) {
        Flux<Order> orders = orderService.findByUserIdStream(userId);
        Flux<LoginEvent> logins = loginService.findByUserIdStream(userId);
        
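        return Flux.merge(
            orders.map(order -> UserActivity.fromOrder(order)),
            logins.map(login -> UserActivity.fromLogin(login))
        // sort() emits only after both sources complete, so this suits
        // finite result sets, not unbounded live streams
        ).sort(Comparator.comparing(UserActivity::getTimestamp));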
    }
}

4. Error Handling in Reactive Streams

@Service
public class RobustUserService {
    
    public Mono<User> findUserWithFallback(Long id) {
        return primaryUserService.findById(id)
            .timeout(Duration.ofSeconds(3))
            .onErrorResume(TimeoutException.class, 
                ex -> secondaryUserService.findById(id))
            .onErrorResume(Exception.class, 
                ex -> cacheService.getCachedUser(id))
            .switchIfEmpty(Mono.error(
                new UserNotFoundException("User not found: " + id)));
    }
    
    public Mono<User> createUserWithRetry(User user) {
        return userRepository.save(user)
            .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
                // Retry only transient failures; a constraint violation would fail again on every attempt
                .filter(throwable -> throwable instanceof TransientDataAccessException)
                .doBeforeRetry(signal -> 
                    log.warn("Retrying user creation, attempt: {}", 
                        signal.totalRetries() + 1)));
    }
}
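
To get a feel for what an exponential backoff schedule looks like, the sketch below computes the nominal delays for three retries starting at one second. It is plain arithmetic, not Reactor code: the BackoffSketch class and nominalDelays helper are hypothetical, and the sketch ignores the random jitter that Reactor's backoff strategy adds on top of each delay.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

class BackoffSketch {

    // Nominal schedule: the first delay equals minBackoff, each later delay doubles.
    static List<Duration> nominalDelays(int maxRetries, Duration minBackoff) {
        List<Duration> delays = new ArrayList<>();
        Duration delay = minBackoff;
        for (int i = 0; i < maxRetries; i++) {
            delays.add(delay);
            delay = delay.multipliedBy(2);
        }
        return delays;
    }

    public static void main(String[] args) {
        // Three retries starting at one second, without jitter.
        System.out.println(nominalDelays(3, Duration.ofSeconds(1))); // prints [PT1S, PT2S, PT4S]
    }
}
```

In the worst case the caller therefore waits roughly seven seconds (1 + 2 + 4) before the final failure surfaces, which is worth weighing against any upstream request timeout.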

Testing Reactive Applications

Unit Testing with StepVerifier

@ExtendWith(MockitoExtension.class) // Mockito's extension is what initializes @Mock and @InjectMocks
class UserServiceTest {
    
    @Mock
    private UserRepository userRepository;
    
    @InjectMocks
    private UserService userService;
    
    @Test
    void shouldReturnUserWhenFound() {
        // Given
        User user = new User(1L, "john@example.com", "John Doe");
        when(userRepository.findById(1L)).thenReturn(Mono.just(user));
        
        // When & Then
        StepVerifier.create(userService.findById(1L))
            .expectNext(user)
            .verifyComplete();
    }
    
    @Test
    void shouldReturnEmptyWhenUserNotFound() {
        // Given
        when(userRepository.findById(999L)).thenReturn(Mono.empty());
        
        // When & Then
        StepVerifier.create(userService.findById(999L))
            .verifyComplete();
    }
    
    @Test
    void shouldHandleMultipleUsers() {
        // Given
        List<User> users = Arrays.asList(
            new User(1L, "john@example.com", "John"),
            new User(2L, "jane@example.com", "Jane")
        );
        when(userRepository.findAll()).thenReturn(Flux.fromIterable(users));
        
        // When & Then
        StepVerifier.create(userService.findAll())
            .expectNext(users.get(0))
            .expectNext(users.get(1))
            .verifyComplete();
    }
}

Integration Testing

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@TestPropertySource(properties = "spring.r2dbc.url=r2dbc:h2:mem:///testdb")
class UserControllerIntegrationTest {
    
    @Autowired
    private WebTestClient webTestClient;
    
    @Autowired
    private UserRepository userRepository;
    
    @Test
    void shouldCreateUser() {
        CreateUserRequest request = new CreateUserRequest("john@example.com", "John Doe");
        
        webTestClient.post()
            .uri("/api/users")
            .contentType(MediaType.APPLICATION_JSON)
            .bodyValue(request)
            .exchange()
            .expectStatus().isCreated()
            .expectBody(UserDto.class)
            .value(user -> {
                assertThat(user.getEmail()).isEqualTo("john@example.com");
                assertThat(user.getName()).isEqualTo("John Doe");
            });
    }
    
    @Test
    void shouldStreamUsers() {
        // Given
        userRepository.saveAll(Arrays.asList(
            new User("user1@example.com", "User 1"),
            new User("user2@example.com", "User 2")
        )).blockLast();
        
        // When & Then
        webTestClient.get()
            .uri("/api/users/stream")
            .accept(MediaType.TEXT_EVENT_STREAM)
            .exchange()
            .expectStatus().isOk()
            .returnResult(UserDto.class)
            .getResponseBody()
            .take(2)
            .as(StepVerifier::create)
            .expectNextCount(2)
            .verifyComplete();
    }
}

Performance Considerations and Best Practices

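1. Connection Pool Configuration

# application.yml
# Spring Boot exposes the R2DBC connection pool through spring.r2dbc.pool.*.
# The Reactor Netty connection pool used by WebClient is typically configured
# in code instead, through a ConnectionProvider passed to HttpClient
# (for example maxConnections(1000) and a maxIdleTime of 30 seconds).
spring:
  r2dbc:
    pool:
      initial-size: 10
      max-size: 50
      max-idle-time: 30m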

2. Avoid Blocking Operations

// ❌ Bad: Blocking in reactive stream
public Mono<User> processUser(User user) {
    return Mono.just(user)
        .map(u -> {
            // This blocks the reactive thread!
            Thread.sleep(1000);
            return u;
        });
}

// ✅ Good: Use reactive alternatives
public Mono<User> processUser(User user) {
    return Mono.just(user)
        .delayElement(Duration.ofSeconds(1))
        .map(u -> {
            // Non-blocking transformation
            return enrichUser(u);
        });
}

3. Memory Management

@Service
public class OptimizedUserService {
    
    public Flux<UserDto> getAllUsersOptimized() {
        return userRepository.findAll()
            .buffer(100) // Process in batches
            .flatMap(users -> 
                Flux.fromIterable(users)
                    .map(this::toDto)
                    .subscribeOn(Schedulers.parallel())
            );
    }
    
    public Flux<User> streamLargeDataset() {
        return userRepository.findAll()
            .limitRate(50) // Control backpressure
            .onBackpressureBuffer(1000) // Buffer up to 1000 items
            .share(); // Share among multiple subscribers
    }
}

Common Pitfalls and How to Avoid Them

1. Blocking in Reactive Chains

// ❌ Wrong
public Mono<String> processData(String data) {
    return webClient.get()
        .retrieve()
        .bodyToMono(String.class)
        .map(result -> {
            // This blocks!
            return blockingService.process(result);
        });
}

// ✅ Correct
public Mono<String> processData(String data) {
    return webClient.get()
        .retrieve()
        .bodyToMono(String.class)
        .flatMap(result -> 
            reactiveService.process(result)
        );
}
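
When no reactive version of a service exists, the usual fallback is to run the blocking call on a separate pool so that event-loop threads stay free. In Reactor that is commonly written as Mono.fromCallable(() -> blockingService.process(result)).subscribeOn(Schedulers.boundedElastic()). The plain-JDK sketch below shows the same offloading idea with CompletableFuture; the OffloadSketch class, the blockingProcess helper, the pool size, and the thread name are assumptions for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class OffloadSketch {

    // Hypothetical legacy call that blocks its thread for a while.
    static String blockingProcess(String input) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return input.toUpperCase();
    }

    // Runs the blocking call on the given pool and reports which thread did the work.
    static CompletableFuture<String> processOffloaded(String input, ExecutorService blockingPool) {
        return CompletableFuture.supplyAsync(
            () -> blockingProcess(input) + " on " + Thread.currentThread().getName(),
            blockingPool);
    }

    public static void main(String[] args) {
        // A small dedicated pool caps how many blocking calls run at once.
        ExecutorService blockingPool = Executors.newFixedThreadPool(4, runnable -> {
            Thread worker = new Thread(runnable, "blocking-worker");
            worker.setDaemon(true);
            return worker;
        });
        try {
            System.out.println(processOffloaded("payload", blockingPool).join());
            // prints "PAYLOAD on blocking-worker"
        } finally {
            blockingPool.shutdown();
        }
    }
}
```

Offloading contains the damage rather than removing it: each blocking call still occupies a thread, so the pool needs an upper bound.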

2. Not Subscribing to Reactive Streams

// ❌ Wrong: Nothing happens without subscription
public void updateUser(User user) {
    userRepository.save(user); // This does nothing!
}

// ✅ Correct: Subscribe to execute
public Mono<User> updateUser(User user) {
    return userRepository.save(user); // Return Mono for subscription
}

3. Improper Error Handling

// ❌ Wrong: Errors kill the stream
public Flux<User> processUsers() {
    return userRepository.findAll()
        .map(user -> {
            if (user.isInvalid()) {
                throw new IllegalArgumentException("Invalid user");
            }
            return user;
        });
}

// ✅ Correct: Handle errors gracefully
public Flux<User> processUsers() {
    return userRepository.findAll()
        .flatMap(user -> {
            if (user.isInvalid()) {
                return Mono.empty(); // Skip invalid users
            }
            return Mono.just(user);
        })
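        // onErrorContinue is honored only by operators that support it,
        // so treat it as a safety net rather than the main error strategy
        .onErrorContinue((error, user) -> 
            log.warn("Skipping user after processing error: {}", user, error));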
}

Real-World Use Cases

1. High-Frequency Trading System

@Service
public class TradingService {
    
    public Flux<Trade> streamTrades() {
        return marketDataService.getPriceStream()
            .buffer(Duration.ofMillis(100))
            .flatMap(prices -> 
                tradingAlgorithm.generateTrades(prices))
            .flatMap(trade -> 
                tradeExecutor.execute(trade))
            .share();
    }
}

2. Real-Time Chat Application

@Controller
public class ChatController {
    
    @MessageMapping("/chat")
    public Flux<ChatMessage> handleChat(Flux<ChatMessage> messages) {
        return messages
            .flatMap(message -> 
                chatService.processMessage(message))
            .share();
    }
}

3. IoT Data Processing

@Service
public class IoTDataProcessor {
    
    public Flux<ProcessedData> processIoTStream() {
        return sensorDataRepository.streamData()
            .window(Duration.ofMinutes(1))
            .flatMap(window -> 
                window.reduce(this::aggregateData))
            .flatMap(aggregated -> 
                anomalyDetector.detect(aggregated));
    }
}

Conclusion

Spring WebFlux and reactive programming offer powerful tools for building high-performance, scalable applications. While the learning curve can be steep, the benefits in terms of resource utilization and scalability are significant for the right use cases.

Key takeaways:

  • Use reactive programming for I/O-intensive, high-concurrency applications
  • Master Mono and Flux operations for effective stream processing
  • Avoid blocking operations in reactive chains
  • Test thoroughly with StepVerifier and WebTestClient
  • Monitor performance and tune thread pools appropriately

When to consider WebFlux:

  • Building microservices with many external dependencies
  • Processing streaming data
  • Creating real-time applications
  • Scaling to thousands of concurrent connections

Start small with reactive programming: convert one service at a time and measure the performance impact. With proper understanding and implementation, reactive programming can significantly improve your application’s scalability and responsiveness.


Ready to go reactive? Start by identifying the most I/O-intensive parts of your current application and consider converting them to reactive patterns. The investment in learning will pay dividends in application performance and scalability.
