Spring Spring Boot Webflux

Developers guide to Webflux

Intro

This article aims to shed light on the building blocks of Webflux, only scratching the surface of the most important concepts.

Spring Boot 2.0 just got it's RELEASE badge, introducing Webflux support to Kotlin.

Spring frameworks already contain lots of awesome features that make developers' life easier. We can already make web based services/applications with great performance so the following question needs to be raised: why should we bother at all?

Let's see why Webflux with the new release is a big deal!

Webflux is a reactive model-based web framework. Well, basically that's all. Let's break this sentence into smaller sections to understand what's actually hidden behind it!

Webflux vs MVC

Webflux is a web framework. That's clear, but we already have the Spring MVC web framework. Why do we need a new one then?
The core of Spring MVC web communication is the Servlet API and it's built upon servlet containers that implement it, like Tomcat or Jetty. This API is quite strict on how communication goes from client to server and does not leave much room for flexibility.
Webflux doesn't depend on the Servlet API, meaning that it can use both servers that's built and not built (Netty, Undertow) around the Servlet API.

Netty

Spring Boot's Webflux module comes with Netty right out of the box.
"Netty is an asynchronous event-driven network application framework for rapid development and maintainable high performance".
Sounds as good as any other one-liner but what does the "asynchronous event-driven" part stand for?
You can think of it as non-blocking IO. However, if this doesn't sound familiar to you, let me help.

First, let's wrap up how a server works!
Before making any request, both the client and the server needs to bind itself to a socket (port) to establish a connection. The server is listening on the socket, while the client sends a request for the connection. After the connection has been established, the client starts to send data, the server processes it and sends a response. Following that, the connection can be closed.
Most of the time is spent on waiting for the connection to be established and the data to be received.

There is one more thing left before understanding what's non-blocking IO and that's understanding blocking IO.

Blocking IO

In the case of blocking IO there is a thread that handles every connection from opening until closing. This means there can only be one single open connection, as the thread is busy managing that connection. While others try to connect, nothing will happen, until the opened connection is closed. That's where the term 'blocking' comes from.

blocking-io-1

Threaded-Blocking IO

There is a solution to that called Threaded-blocking IO. We bind every new connection to a new socket and create a new thread to process the data. When a client requests a connection from the server socket it sends back a new socket to communicate with. That's good enough because quite many threads can be created based on the CPU and OS limits. Now multiple connections can be handled until the limit of the maximum number of threads is reached. Each thread requires memory allocation, so while the concurrent connection goes up, the performance of the server goes down.
A thread pool could also be used here but that would still limit the number of connections, although without OutOfMemoryExceptions.
threaded-blocking-io-1

Non-blocking IO

That's how we end up with Non-blocking IO.
Instead of binding the thread to a connection, a notification is sent when the data is ready to read from a buffer. NIO uses buffers instead of streams to read/write data, because data is processed when it's already there, not when it's actually being received. With this mechanism, multiple connections can be handled by one thread. That's how finally no threads are blocked while waiting on the IO.
The thread is only responsible for reading the buffers when it's notified about the arrival of any new data. This mechanism is called the 'event loop'.
nio-1
Netty makes use of 2 event loops. The first one is called the 'boss' and the second is the 'worker'. The boss accepts incoming connections and registers them for the worker. When the data is available, the worker is notified and processes the data. Usually multiple threads are used as boss and worker. In this case they are called 'event loop groups'.

Following that logic, lots of concurrent connections can be handled with just a couple of threads. That's why NIO-based servers are so awesome, and serve as our first building blocks 🚀.

Now, we've got a great non-blocking server, but we also need an application layer that doesn't block. After all, it makes no difference having a non-blocking code when you block the non-blocking code, does it?
It can happen mostly when reaching out to external resources like reading from files, making HTTP calls or accessing a database. This can ring a bell, since we already discussed the same topic concerning NIO. The goal is the same, not to block our thread while waiting for external resources. Instead of waiting and doing nothing while the data is streaming, we would just want to be notified that the data has arrived.

So how should we solve the problem?

Callbacks

Callbacks could be used in a way but that would just be boilerplate, and it would result in lack of functionality. Adding a couple of more inner callbacks would make understanding the code more difficult.

  fun callAsync(id: String, completion: (Post) -> Unit) {
    getPost(id) { response ->
      insertPost(response) { post ->
        completion(post)
      }
    }
  }

  fun getPost(id: String, completion: (Post) -> Unit) {
    //DO HTTP request
  }

  fun insertPost(item: Post, completion: (Post) -> Unit) {
    //DO Database insert
  }

Reactive

Reactive programming can save us in this situation. By definition, reactive programming is about using asynchronous, non-blocking building blocks with a functional style of coding.
There are a couple of frameworks that make use of this approach like Reactor Core, RxJava2, and Akka Streams. To make them interoperable, a standard has been created called Reactive Streams. The API of Reactive Streams contains only 4 interfaces. Publisher, Subscriber, Subscription and Processor.

For now, it's enough to understand the Publisher and Subscriber.

interface Publisher<T> { 
  fun subscribe(s: Subscriber<in T>)
}

interface Subscriber<T> {
    fun onSubscribe(Subscription s);
    fun onNext(T t);
    fun onError(Throwable t);
    fun onComplete();
}

The Publisher can emit either zero or more elements or it can result in an error. To make any use of the emitted data we need to subscribe to the Publisher by a Subscriber. The Subscriber will receive emitted data via the onNext method or in case of an error the onError method will be called. If the emitting has ended without any error the onComplete method is called.

An HTTP call can be described as a Publisher that creates an HTTP request, makes the call, then emits the response to the Subscriber when the response has arrived. It's up to the Subscriber what happens to the emitted response.

Based on the Reactive Streams API it would look like this:

val publisher: Publisher<String> = httpRequest()
publisher.subscribe(object : Subscriber<String> {
  override fun onComplete() {}
  override fun onError(t: Throwable) {}
  override fun onSubscribe(s: Subscription) {}
  override fun onNext(response: String) { println(response)} 
})

The actual HTTP call isn't made by the httpRequest(). It's just a blueprint how the HTTP request can be called. It will only be fired by subscribing to the Publisher.

Reactive Core

Webflux is built on top of the Reactor Core from the Reactive Streams implementations. There are 2 implementations of the Publisher interface in Reactor Core. Flux can emit 0 to n elements and Mono can emit 0 or 1 element.

Emitting a string element and printing it to the console can be done by a Mono type:

Mono.just("one").subscribe { println(it) }
// one

To emit a list of strings and print each one to the console, Flux can be used:

Flux.fromIterable(listOf("one", "two", "three"))
  .subscribe { println(it) }
// one
// two
// three

Let's see one more example to understand how it works in the background:

Mono.just("reactive code").subscribe { println(it) }
println("iterative code")
// reactive code
// iterative code

It's obvious that this reactive code blocks the iterative code from being printed and therefore it needs to wait while the reactive code terminates first. By default it uses the current thread. Reactor doesn't enforce using a concurrency model, it rather leaves it up to the developer. However, it provides an easy and smart way to manage and control threads.

Mono.just("reactive code")
  .subscribeOn(Schedulers.single())
  .subscribe { println(it) }
println("iterative code")
// iterative code
// reactive code

By calling the subscribeOn(Schedulers.single()), reactive code runs on a new thread rather than the current one, hence not blocking the iterative code anymore.

There is one more thing left to understand about how reactive code can be composed: with operators.
Most operators operate on a Publisher and return with a new Publisher. This allows us to apply these operators one after the other, in a chain. Each operator in the chain adds behavior to the Publisher that results from the operation of the previous operator.
The whole chain is thus linked, so that data originates from the first Publisher and moves down the chain, transformed by each link. Eventually, a Subscriber finishes the process.

Mono.just(1).flatMap { Mono.just("$it + two") }.subscribe { println(it) }
// 1 + two

flatMap() is one of the most used operators. It transforms the item emitted by the Mono into a new Mono instance. You're probably asking now why this is useful? The new Mono can emit a different type of element than the original or can also be empty or throw errors. It can also be a chain of Monos.

A server application can be imagined as a Publisher emitting a request when a new connection is registered. Operators can be chained one after the other to process the request and understand what the input data is, what to do with it and to generate a response. At the end of the chain there is a Subscriber in order to send the response back to the Client.

Simply put, this is what Webflux is about. It pairs a non-blocking server implementation with a reactive framework to handle the request and response in a non-blocking way 🚀.

Application

All of that sounds nice but let's see what we can achieve with this.

@Document
data class Post(@Id val id: Int, val userId: Int, val title: String, val body: String)

interface PostRespository : ReactiveMongoRepository<Post, Int> {}

@RestController
class PostController(private val postRespository: PostRespository) {

  @GetMapping("/posts/{id}")
  fun get(@PathVariable id: String) = getPost(id)
    .flatMap { insertPost(it) }

  fun getPost(id: String) = WebClient
    .create("https://jsonplaceholder.typicode.com/posts/$id")
    .get()
    .retrieve()
    .bodyToMono(Post::class.java)

  fun insertPost(post: Post) =
    postRepository.findById(post.id)
    .switchIfEmpty(postRepository.insert(post))
}

That's all the code that we need in order to have a REST API server with a /posts/{id} endpoint that makes an HTTP request, parses the response into an object, stores it in a database and returns the response of the database in a JSON format.

Let's break the code to understand what's happening:

  • 1-2 Post class can be persisted to the MongoDB by annotating with the @Document annotation.
  • 4 Implementing the ReactiveMongoRepository can access reactive domain specific CRUD methods.
  • 6-7 @RestController is a short-cut for @Controller and @ResponseBody annotations.
  • 9-11 @GetMapping creates a /posts/{id} HTTP GET endpoint.
    getPost returns a Mono<Post> that emits the Post with the given id. If such Post is found the insertPost will store it in the MongoDB and returns a new Mono<Post> that has been stored in the database.
  • 13-17 Creates a reactive HTTP GET request to the given URL and parses the response to a Mono<Post> type.
  • 19-21 Checks if Post with the given id can be found in the database. If it does returns it otherwise inserts the post to the MongoDB in a reactive way.

If you have used Spring MVC before, most of the code should be familiar to you, because Webflux uses the same annotations. The main differences are based on the return type of the functions - it needs to be Reactive based, either Mono or Flux.

Only 4 more lines of codes are necessary to fire the server up.

@SpringBootApplication
class AwesomeWebfluxApplication

fun main(args: Array<String>) {
  runApplication<AwesomeWebfluxApplication>(*args)
}

In my opinion, that's quite a minimal code compared to how much it is capable of in reality 💪.

The code of the application can be found on GitHub.

The server can be tested locally by a calling localhost:8080/posts/1 endpoint. Calling it will returns with the Post containing the id 1 in a JSON format.

Summary

With the help of non-blocking, asynchronous code, we can use less threads for the same amount of job to be done. More stability and scalability can be attained. Webflux provides a way to achieve this via configurable, minimal and clean code.
It cannot solve all of our problems, but can be useful in the following situations:

  • remote server needed to be called but the response time is high;
  • high number of clients with slow internet;
  • every time when using Spring MVC 🙊

As mentioned before, this article was only scratching the surface of the topic to better understand what goes on under the hood, and why Webflux can be useful. Lots of great features were left out of the article to keep it somewhat short, feel free to explore them for yourself.