Reactive Programming with Spring WebFlux

Reactive Programming

Recently we are frequently hearing about a new paradigm of coding that is gaining more and more attention, the so mentioned Reactive Programming. The flow of information on the internet regarding what this “hype” is all about is getting noisier, lots and lots of information explaining this model is flowing throughout several technology channels.

Reactive Programming is a model of coding where the communication mainly happens throughout a non-blocking stream of data.  This model of programming, turn your code to be “reactive”, reacting to change and not being blocked, as performing operations that read and wait for responses from a database or file. Hence, in this manner, we enter in a mode where we are reacting to events, as data are becoming available. Its principles are based on the Reactive Manifesto.

Backpressure

One important aspect, worth to mention, regarding this non-blocking processing, is the concept of the backpressure. This is a mechanism that must ensure producers don’t overwhelm consumers. So, for instance, if the consumer can’t handle more than 100 HTTP Requests by second, if eventually, this occurs, the consumer must slow down the flow or even stop for a moment, until the situation gets normalized again.

Hence, in order to deal with an enormous flow of data, this is an important aspect to take into consideration, this way we can ensure the producers won’t overwhelm the consumers.

Specification, Implementations, Libraries and Spring WebFlux

Reactive Programming, Reactive Streams, and Reactor. What do they have to do with each other?  At the start, as we faced this terms reading about the subject, it might be confused figure out what is the difference between them, as we are always seeing those words being used interchangeably. But, actually, they are not exactly the same thing. So,  Just to get a clearer view of them, let’s summarize:

  • Reactive Programming: this is the paradigm, the model that dictates and the reason for the existence of the others below
  • Reactive Streams: this is a specification that defines how should work an API that implements and follows the Reactive Programming paradigm. By the way, JDK 9 it comes already prepared with it.
  • Reactor: this is a Java implementation of the Reactive Streams specification, consequently bring to Java the Reactive Programming model.

Spring WebFlux, is the “reaction” of the Spring for this paradigm to use on web applications. It is a web framework that brings the support for the reactive programming model. The Spring WebFlux it is implemented using the Project Reactor, the library chosen by Spring.

The WebFlux It is not a replacement for the Spring MVC, actually, they can complement each other, working together at the same solution. We are going to use it in our sample to better understand how everything works.

The Problem and the Solution, Hands On!

Let’s travel a little bit to the future, not so very far, we are kind of in this future already. Imagine a Highway, those with a heavy traffic, with a flow of vehicles that could reach something like 5.000 vehicles by hour at the highest peak of a day, even more on holidays. In this future, all vehicles are obligated to have installed an RFID that transmits some data calculated by the vehicle’s computer. In some places in this highway, we have receptors installed, receiving all information of the vehicles, like plate number, weight, speed, color, etc.

reactive-programming-spring-webflux---highway

That information is flowing as a data stream all the time around to a host of the company, that makes all this available throughout its server, with Spring WebFlux Endpoints. Every one interested could connect a WebClient Spring WebFlux to receive this information in real time, as the vehicles go by.

 The Spring WebFlux Server

There are two programming modes to use WebFlux, by annotated controllers (like in Spring MVC) or by functional endpoints. We are using the last one, taking advantage of the Java 8 lambda expressions, implementing through the functional programming model.

First, let’s define in this server a router to receive the requests of the clients interested on the highway vehicles traffic.


@Component
public class HighwayRouter {

    @Bean
    public RouterFunction route(HighwayHandler highwayHandler) {
        return RouterFunctions
        		.route(RequestPredicates.GET("/vehicles")
        				.and(RequestPredicates.accept(MediaType.APPLICATION_STREAM_JSON)),
        				highwayHandler::vehicleDetected);
    }

}

Through a RouterFunctions we set up a GET Endpoint named “/vehicles”, answering in the format of a stream of JSON objects.  The stream is coming from the method vehicleDetected on the handler class HighwayHandler.

@Component
public class HighwayHandler {

    @Autowired
    HighwayTraffic highwayTraffic;

    public Mono vehicleDetected(ServerRequest request) {
        return ServerResponse.ok()
            .contentType(MediaType.APPLICATION_STREAM_JSON)
            .body(highwayTraffic.flowTraffic(),Vehicle.class);

    }

}

The class responsible to handle the requests have to get the information from some source, as we are dealing with a flow, usually, the source must be capable of generating a data stream. Spring also is adding to Spring Data framework the feature of Reactive Repositories, this way it will be possible to have the source, like a Mongo database, generating in a non-blocking reactive model as well. In our sample, we are simulating this traffic flow in memory.

At the core of this in-memory simulator we have a method that generates the traffic, here is where we create a Flux object. There are basically two types of data object that Reactor provides, Mono and Flux, depends on the cardinality we have to choose one or another.

As we have a response with 0…N registers (vehicles), we use Flux object. Let’s simulate a hot stream, that is, a flow of data that happens uninterruptedly. Different from a cold stream, that is expected to have a finite collection of data.


Cold Stream: it is a flow of data with a fixed length, they are finite, we know previously the size of it, like for example, a list of cities, countries, etc.

Hot Stream: this is more complex to deal with, they are unpredictable, it happens infinitely. They are always running like a constant stream of data that can be subscribed at any point in time.


public Flux<Vehicle> flowTraffic() {
		LocalDateTime startTime = LocalDateTime.now();

		return Flux.<Vehicle>create(fluxSink -> {
			boolean inFrameTime = true;
			int index = 1;
			while (index <= 30000 && inFrameTime && !fluxSink.isCancelled() ) {
				fluxSink.next(HighwayUtilities.simulateTraffic());
				index++;

				long timeMinutesHighwayOpened = startTime.until(LocalDateTime.now(), 
						ChronoUnit.MILLIS);
				if (timeMinutesHighwayOpened > 30000) {
					LOGGER.info("TrafficSimulator finish --> With timer");
					inFrameTime = false;
				}
			}
		}).share();
}

Inside the creation of the Flux object (from line 33 to 41), we have a loop statement where we read the vehicles goes by the highway (our in-memory utility method HighwayUtilities.simulateTraffic(), simulates a car passing by in the highway sending an RFID signal data). In order to not let run infinitely our sample, we configure two ways to stop it, either after 30.000 vehicles passed or 30 seconds ran, what it comes first.

One important thing to pay attention here is that the consumer will be receiving information every time the fluxSink.next( Vehicle ) method is called, while the loop statement is running, not after only it has finished. In the sequence of calls in those two methods of the Flux object, after the method create(…), we call share(), where it returned a new Flux that will multicasts the original. I said will multicasts, because another characteristic of share() is that, the Flux will be emitting data only when at least one subscriber is connected, when there’s no client subscribed the Flux it will be canceled.

The Spring WebFlux Client

Now it is time, as a consumer, to “plug” our WebClient with the server data stream endpoint of the highway concessionary.  We are interested in listening all the vehicles flowing through it. In order to achieve this, let’s use the Spring Webflux WebClient also part of its framework. It is a non-blocking reactive client to execute HTTP requests against reactive streams. It also provides some features like control the backpressure. Let’s create an instance connected to the server.


private WebClient webClient = WebClient.builder().baseUrl("http://localhost:8080").build();

And then we start the connection subscribing to its endpoint when this method below was called.

public Disposable vehicleDetected() {
        AtomicInteger counter = new AtomicInteger(0);
        return webClient.get()
                .uri("/vehicles")
                .accept(MediaType.APPLICATION_STREAM_JSON)
                .exchange()
                .publishOn(Schedulers.single())
                .flatMapMany(response -> response.bodyToFlux(Vehicle.class))
                .delayElements(Duration.ofMillis(1000))
                .subscribe(s -> {
                        System.out.println(counter.incrementAndGet() + " >>>>>>>>>> " + s);
                    },
                    err -> System.out.println("Error on Vehicle Stream: " + err),
                    () -> System.out.println("Vehicle stream stoped!"));
}

Some points to check here:

  • At line 7, we are asking that the functions operate by the Flux (onNext, onComplete and onError) be performed on this supplied thread.
  • At line 9, we are dealing with the backpressure. As the server might stream lots of vehicles by each second, we as a client can’t afford process all this volume in this short period, so we use the delayElements, setting up for 1 vehicle per second max.
  • At line 10, finally here, at the end of the sequence of methods, we subscribe to the endpoint, receiving and work with the data. At this moment the server “open the gate” and let the data flow.

Notice that this is a non-blocking operation, that is, the main thread does not get stuck after the call to those WebClient’s functions, however the lambda expression inside the subscribe method keep receiving information as long as the server is still returning data.

There are more options available for the WebClient, for instance, we can do some filter with the stream, like receive only the vehicles with a speed higher than 120 Km/h, check the line 8 below. Also, in this case as the flow of vehicles usually be smaller, we “relieve” the backpressure a little, delaying only 250 milliseconds.

public Disposable vehicleHigherThen120Detected() {
        AtomicInteger counter = new AtomicInteger(0);
        return webClient.get()
                .uri("/vehicles")
                .accept(MediaType.APPLICATION_STREAM_JSON)
                .exchange()
                .flatMapMany(response -> response.bodyToFlux(Vehicle.class))
                .filter(v -> v.getSpeed() > 120)
                .delayElements(Duration.ofMillis(250))
                .subscribe(s -> {
                        System.out.println(counter.incrementAndGet() + " >>>>>>>>>> " + s);
                    },
                    err -> System.out.println("Error on Vehicle Stream: " + err),
                    () -> System.out.println("Vehicle stream stoped!"));
}

So, that way, we can catch every vehicle that is above the normal speed at this point of the highway, the point is filtering what you really need to process while receiving the continuous flow of the data stream, very useful.

Here the result (part of it) we get in our WebClient subscription to this webflux endpoint data stream.


********************************************************************************
     __  ___       __                           _____
    / / / (_)___ _/ /_ _      ______ ___  __   / ___/___  ______   _____  _____
   / /_/ / / __ `/ __ \ | /| / / __ `/ / / /   \__ \/ _ \/ ___/ | / / _ \/ ___/
  / __  / / /_/ / / / / |/ |/ / /_/ / /_/ /   ___/ /  __/ /   | |/ /  __/ /
 /_/ /_/_/\__, /_/ /_/|__/|__/\__,_/\__, /   /____/\___/_/    |___/\___/_/
         /____/                    /____/                                      

********************************************************************************

2018-04-22 12:43:57 [main] INFO  o.s.b.w.r.c.AnnotationConfigReactiveWebServerApplicationContext - Refreshing org.springframework.boot.web.reactive.context.AnnotationConfigReactiveWebServerApplicationContext@335b5620
2018-04-22 12:44:02 [main] INFO  o.s.b.w.e.netty.NettyWebServer - Netty started on port(s): 8080

13 >>>> Vehicle [carPlateNumber=FCK 8997, weight=1040, speed=137, color=Orange, modelYear=2014, gasType=Gasoline]
14 >>>> Vehicle [carPlateNumber=EWA 3102, weight=1438, speed=125, color=Silver, modelYear=1993, gasType=Gas]
15 >>>> Vehicle [carPlateNumber=DSR 1722, weight=327, speed=149, color=Black, modelYear=1976, gasType=Diesel]
16 >>>> Vehicle [carPlateNumber=VQQ 1013, weight=1344, speed=135, color=Orange, modelYear=1989, gasType=Gasoline]
17 >>>> Vehicle [carPlateNumber=MSZ 7605, weight=3990, speed=168, color=Orange, modelYear=2010, gasType=Gas]
18 >>>> Vehicle [carPlateNumber=QSL 5740, weight=2404, speed=144, color=Blue, modelYear=2014, gasType=Diesel]
19 >>>> Vehicle [carPlateNumber=FLJ 1025, weight=631, speed=119, color=Black, modelYear=2002, gasType=Gas]
20 >>>> Vehicle [carPlateNumber=AQQ 4943, weight=2104, speed=155, color=Black, modelYear=2007, gasType=Diesel]
21 >>>> Vehicle [carPlateNumber=SRW 9566, weight=3428, speed=134, color=Orange, modelYear=2008, gasType=Eletric]

...

Conclusion

In this article, we have taken a peek at what is this new paradigm of reactive programming, this new model of thinking on how to develop solutions that could take advantage of it. We use the Spring WebFlux, a Spring framework for web reactive system development, getting a view working in a sample.

The fact that this model has a non-blocking execution flow takes us to think a little bit different, it is not the same as the blocking execution model, where every call is blocked until the response is presented. This is the same paradigm that faces newcomers developing solution in NodeJS, for instance. It is important to be smart in your choice, if your problem doesn’t fit in a non-blocking model programming, don’t use it just because it is fancy. That’s why, as always, it must be well analyzed when and how it could be applied to contribute to a good solution.

As always get focus on the problem you are in charge to solve, and frequently recall the main objective when the things get fuzzier.

The source code of the sample application is available at GitHub.