Tuesday, November 19, 2019

Reactive Programming: Blocking(Sync) vs Non-Blocking(Async) Request Processing



Blocking Request Processing:

Most of the time, when we expose an endpoint to the outside world, the request internally follows the flow below:

controller <-> service <-> dao <-> DB

Traditional servlet frameworks assign a separate thread to each request, and that thread stays blocked for that user until the response is sent or the processing fails with an error. Each request is processed synchronously.
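To make the cost of thread-per-request concrete, here is a minimal sketch in plain Java. It is not servlet code: the class BlockingRequestDemo, the method slowDaoCall and the pool sizes are hypothetical stand-ins, with Thread.sleep playing the role of a slow DB call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class BlockingRequestDemo {

    // Hypothetical stand-in for the dao/DB call: it holds the calling thread while it waits.
    static String slowDaoCall(int requestId, long delayMillis) {
        try {
            Thread.sleep(delayMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return "response-" + requestId;
    }

    // Serves `requests` requests on a pool of `threads` workers; returns elapsed milliseconds.
    static long serveAll(int requests, int threads, long delayMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        long start = System.nanoTime();
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < requests; i++) {
                final int id = i;
                // Each request occupies one worker thread for the whole slow call.
                futures.add(pool.submit(() -> slowDaoCall(id, delayMillis)));
            }
            for (Future<String> future : futures) {
                future.get(); // wait until every response is ready
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        // 4 requests on 2 threads: the last 2 requests queue until a thread is free.
        System.out.println("elapsed ms: " + serveAll(4, 2, 200));
    }
}
```

With more requests than threads, the extra requests simply wait for a free thread, which is the behaviour the paragraph above describes.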


Blocking request processing


Non-Blocking Request Processing:

This is the newer style of programming. The flow of processing stays the same as shown below, but each request is handled through event handlers and callback functions.

controller <-> service <-> dao <-> DB

When a request arrives, the newer generation of web servers (such as Netty and Undertow, or containers implementing Servlet 3.1+) still assigns a thread to it. However, as soon as the request has to wait for a slow process to finish, it is parked in a queue via handlers and the thread is freed up to serve new requests. When the slow process finishes, a callback is invoked with the received data. Requests are therefore processed asynchronously, and threads are not blocked.
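The same idea can be sketched in plain Java with CompletableFuture. This is only an illustration under assumptions, not how Netty or Undertow are implemented: the class NonBlockingRequestDemo and the method slowDaoCallAsync are hypothetical, and a timer stands in for the slow process.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class NonBlockingRequestDemo {

    // Hypothetical slow process: the future is completed later by a timer,
    // so no thread sits waiting while the "DB" is working.
    static CompletableFuture<String> slowDaoCallAsync(
            ScheduledExecutorService timer, int requestId, long delayMillis) {
        CompletableFuture<String> result = new CompletableFuture<>();
        timer.schedule(() -> { result.complete("response-" + requestId); },
                delayMillis, TimeUnit.MILLISECONDS);
        return result;
    }

    // Starts all requests, registers a callback on each, then collects the responses.
    static List<String> serveAll(int requests, long delayMillis) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            List<CompletableFuture<String>> pending = new ArrayList<>();
            for (int i = 0; i < requests; i++) {
                // thenApply is the callback: it runs once the data has arrived.
                pending.add(slowDaoCallAsync(timer, i, delayMillis)
                        .thenApply(String::toUpperCase));
            }
            List<String> responses = new ArrayList<>();
            for (CompletableFuture<String> future : pending) {
                responses.add(future.join()); // the demo waits here only to gather results
            }
            return responses;
        } finally {
            timer.shutdown();
        }
    }

    public static void main(String[] args) {
        // 50 slow requests are served by a single timer thread.
        System.out.println(serveAll(50, 100).size() + " responses");
    }
}
```

All requests are in flight at the same time even though only one thread completes them, because waiting no longer ties up a thread.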

Non-blocking request processing

image courtesy: https://howtodoinjava.com/spring-webflux/spring-webflux-tutorial/

Saturday, November 16, 2019

Spring 5: WebClient vs RestTemplate

In today's micro-services world, where almost everything is exposed as REST endpoints, we are bound to call these REST services.

How do we call them?

Spring provides a convenient class called RestTemplate. However, it is synchronous in nature and follows the thread-per-request model: the thread stays blocked for that user until the request completes and the response is sent back, or the service fails with an error.

Now imagine this in an application with heavy traffic and slow responses. Within seconds the app will start returning errors (such as 500 or 503) or timing out because it cannot take on any more requests, and the large number of in-flight requests (threads) will degrade the performance of the application.


@GetMapping("/employees")
public List<Employee> getEmployeesBlocking() {
    log.info("Starting BLOCKING Controller!");
    final String uri = getSlowServiceUri();

    // exchange() blocks the calling thread until the slow service responds
    RestTemplate restTemplate = new RestTemplate();
    ResponseEntity<List<Employee>> response = restTemplate.exchange(
      uri, HttpMethod.GET, null,
      new ParameterizedTypeReference<List<Employee>>(){});

    List<Employee> result = response.getBody();
    result.forEach(employee -> log.info(employee.toString()));
    log.info("Exiting BLOCKING Controller!");
    return result;
}

Is it worth keeping a thread blocked while it waits for a response from the called endpoint?

Can't we free up the thread and queue the task, to be picked up again when the response arrives?

With RestTemplate the answer is no, because it is synchronous in nature. However, Spring 5 introduced a new class called WebClient, which can do the job for us.

WebClient is built on an asynchronous, non-blocking model. It is part of the WebFlux project, which is the reactive programming extension provided by Spring.

Behind the scenes, it treats each request as a task and queues the task while it waits for a response. The serving thread is released to serve other requests, and once the response for a task is received, WebClient resumes the task by assigning a thread to it and finishes the rest of the execution as programmed.


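@GetMapping(value = "/employees",
            produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Employee> getEmployeesNonBlocking() {
    log.info("Starting NON-BLOCKING Controller!");
    Flux<Employee> employeeFlux = WebClient.create()
      .get()
      .uri(getSlowServiceUri())
      .retrieve()
      .bodyToFlux(Employee.class)
      // Log each element as it passes through. Calling subscribe() here as well
      // would trigger a second HTTP call when the framework subscribes to the returned Flux.
      .doOnNext(employee -> log.info(employee.toString()));

    // Logged immediately: nothing is requested until the framework subscribes
    log.info("Exiting NON-BLOCKING Controller!");
    return employeeFlux;
}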

As of Spring 5.0, RestTemplate is in maintenance mode: only minor changes and bug fixes are accepted, and the Spring documentation recommends WebClient instead. WebClient supports not only asynchronous but also synchronous calls, so we can use WebClient as a replacement for RestTemplate.

Reactive Programming: What is it ?

Reactive means: anything that responds to events.

Going by the name, 'Reactive Programming' is a programming technique where things get executed in reaction to something; in programming terms, events drive the execution.

Aren't we already familiar with event-based execution?

1. We have queues, where the arrival of a message causes some code to be executed, or
2. We have listeners that continuously listen for events, such as the arrival of a message on a web socket, and process them.

So aren't we already writing reactive programs? What is new here?

What is new is the level of complexity involved in the way we write reactive programs:

1. In the old way of reactive coding, we need a dedicated external system, such as a queue, for publishing and subscription. We also need some sort of dedicated listener that takes care of every message arrival.

2. In plain Java event-based programming, we have to block one thread, which continuously listens for events and does whatever is needed.

In a nutshell, the old way of programming needs many components, or wastes precious resources, to achieve reactive behaviour.
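A minimal sketch of that old style in plain Java, assuming an in-memory BlockingQueue in place of an external message queue. The class DedicatedListenerDemo and the STOP marker are hypothetical names used only for illustration.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

class DedicatedListenerDemo {

    // Hypothetical marker message that tells the listener to stop.
    static final String STOP = "__stop__";

    static List<String> run() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> handled = new CopyOnWriteArrayList<>();

        // One thread is dedicated to listening; it is parked in take() whenever the queue is empty.
        Thread listener = new Thread(() -> {
            try {
                while (true) {
                    String message = queue.take();
                    if (STOP.equals(message)) {
                        return;
                    }
                    handled.add("handled:" + message);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        listener.start();

        queue.add("m1");
        queue.add("m2");
        queue.add(STOP);

        try {
            listener.join(); // wait for the listener to drain the queue
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The listener thread exists only to wait, which is the resource cost point 2 above refers to.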

The new reactive libraries do away with these complexities and provide built-in APIs that do the job of reacting :) to events.

Whatever the paradigm, reactive programming needs the following three actors:

1. Observables: emit events; also called Publishers
2. Observers: consume events; also called Subscribers
3. Scheduler: schedules or controls the flow of events
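The three actors can be sketched in a few lines of plain Java. This is a hand-rolled illustration, not the API of any reactive library: the classes ThreeActorsDemo and EventSource are hypothetical, and an ExecutorService plays the scheduler role.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class ThreeActorsDemo {

    // Observable (publisher): keeps track of its observers and pushes events to them.
    static class EventSource<T> {
        private final List<Consumer<T>> observers = new CopyOnWriteArrayList<>();
        private final ExecutorService scheduler; // Scheduler: decides where delivery runs

        EventSource(ExecutorService scheduler) {
            this.scheduler = scheduler;
        }

        // Observer (subscriber): any Consumer that wants to receive events.
        void subscribe(Consumer<T> observer) {
            observers.add(observer);
        }

        void emit(T event) {
            for (Consumer<T> observer : observers) {
                scheduler.execute(() -> observer.accept(event));
            }
        }
    }

    static List<String> run() {
        ExecutorService scheduler = Executors.newSingleThreadExecutor();
        EventSource<String> source = new EventSource<>(scheduler);
        List<String> received = new CopyOnWriteArrayList<>();

        source.subscribe(event -> received.add("got:" + event));
        source.emit("order-created");
        source.emit("order-paid");

        scheduler.shutdown(); // already queued deliveries still run
        try {
            scheduler.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(received);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

A single-threaded scheduler is used here so that events are delivered in the order they were emitted; a real library offers several scheduling strategies.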




image courtesy: https://medium.com/@kevalpatel2106/what-is-reactive-programming-da37c1611382


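Java 9 added a Reactive Streams API to support reactive programming. It ships as the class java.util.concurrent.Flow, whose nested interfaces (Flow.Publisher, Flow.Subscriber, Flow.Subscription and Flow.Processor) mirror the four interfaces of the Reactive Streams specification shown below: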

Publisher.java

public interface Publisher<T>
{
    public void subscribe(Subscriber<? super T> s);
}

Subscriber.java

public interface Subscriber<T>
{
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

Subscription.java

public interface Subscription
{
    public void request(long n);
    public void cancel();
}

Processor.java

public interface Processor<T, R> extends Subscriber<T>, Publisher<R>
{
}
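As a rough usage sketch, the JDK's java.util.concurrent.Flow counterparts of these interfaces can be combined with SubmissionPublisher, a ready-made Publisher that ships with the JDK since Java 9. The class names FlowDemo and CollectingSubscriber are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class FlowDemo {

    // A Subscriber that asks for one item at a time and collects what it receives.
    static class CollectingSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // signal demand for the first item
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            subscription.request(1); // ask for the next item only after handling this one
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    static List<Integer> run() {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        // close() at the end of the try block signals onComplete after buffered items are delivered.
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= 3; i++) {
                publisher.submit(i);
            }
        }
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The calls to request(1) are how the subscriber controls the pace: the publisher may not deliver more items than the subscriber has asked for.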