Java 8, released in 2014, brought us the Stream API: a pull-based standard for processing data as a chain/sequence of operations. Reactive programming is a different model, defined by the separate Reactive Streams specification and implemented by libraries such as Project Reactor, that allows software developers to deliver highly responsive user experiences. These two models should not be confused.

We will discuss the advantages of developing applications based on the reactive model and show a few examples based on what we do at Pismo. Let’s start by seeing how the Reactive Manifesto, from 2013, describes reactive systems:

This architecture allows developers to build systems that are event-driven, scalable, resilient and responsive. They deliver highly responsive user experiences with a real-time feel. They are backed by a scalable and resilient application stack, ready to be deployed on multicore and cloud computing architectures.

What kind of advantage do we gain by using the reactive model? A query on a database sometimes returns an enormous amount of data. In microservices-based applications, this may cause failures such as the dreaded OutOfMemoryError, which happens when the data returned by the query requires more heap space than the amount reserved for the Java Virtual Machine (JVM).

When we use the reactive model, the risk of such errors significantly decreases, since data is retrieved in sequence and the subscriber controls how much it requests at a time (backpressure). Thus we can avoid receiving an excessive amount of data that could cause failures and stop data treatment. If a failure does occur, we can handle it explicitly instead of crashing the application. Without such errors, we use less memory, processing time, and I/O capacity.

How reactive programming works

To show how we do this, let’s define a project using the Gradle build automation tool and Java 11. To make the task easier, we will use the Reactor Core library, which implements the Reactive Streams specification. While we create the project using Gradle, let’s add all the required dependencies.

// build.gradle:

plugins {
    id 'java'
    id 'io.spring.dependency-management' version '1.0.7.RELEASE'
}

group 'org.example'
version '1.0-SNAPSHOT'

repositories {
    mavenCentral()
}

dependencies {
    implementation platform('io.projectreactor:reactor-bom:2020.0.10')
    implementation 'io.projectreactor:reactor-core'

    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.7.0'
    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.7.0'
}

test {
    useJUnitPlatform()
}

Using a Flux object

Using Reactor Core, we can work with two main kinds of objects:

  • Flux, with zero or N items
  • Mono, with zero or one item

To show how this works, we created a test class (src/test/java) called io.pismo.reactor.ExemploTest. Initially, we will add the following content to the class:

// Java:

package io.pismo.reactor;

import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;

public class ExemploTest {

    @Test
    public void fluxTest() {
        Flux<String> stringFlux = Flux
                .just("One", "Two", "Three", "Four")
                .log();

        stringFlux.subscribe(System.out::println,
                (e) -> System.err.println(e),
                () -> System.out.println("Completed!"));
    }
}

In the above example, we created a Flux object, starting with a 4-item list. When we run this code, we get the following output:

[ INFO] (Test worker) | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[ INFO] (Test worker) | request(unbounded)
[ INFO] (Test worker) | onNext(One)
One
[ INFO] (Test worker) | onNext(Two)
Two
[ INFO] (Test worker) | onNext(Three)
Three
[ INFO] (Test worker) | onNext(Four)
Four
[ INFO] (Test worker) | onComplete()
Completed!

The subscribe() method triggers the processing. It receives three parameters:

  • How each item should be treated. In our example, it should just be printed (System.out::println).
  • What should happen in case of error ((e) -> System.err.println(e)).
  • What should be executed when the sequence completes successfully (() -> System.out.println("Completed!")).

If the subscribe() method is not called, the system doesn’t process the list. We added the log() method at the end to show that the Flux object treats one value at a time (onNext), exemplifying the sequenced data treatment. Note that the reactive model uses the Observer design pattern. So we have one or more listeners detecting event changes.
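The request(unbounded) line in the log above shows that our simple subscriber asked for everything at once. A sketch of how a subscriber can bound its demand instead, using Reactor’s BaseSubscriber (the request size of 1 is an arbitrary choice for illustration):

```java
import org.reactivestreams.Subscription;

import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

public class BoundedDemandExample {
    public static void main(String[] args) {
        Flux.range(1, 5)
                .log()
                .subscribe(new BaseSubscriber<Integer>() {
                    @Override
                    protected void hookOnSubscribe(Subscription subscription) {
                        request(1); // ask for a single item instead of an unbounded amount
                    }

                    @Override
                    protected void hookOnNext(Integer value) {
                        System.out.println(value);
                        request(1); // pull the next item only after treating this one
                    }
                });
    }
}
```

With this subscriber, the log shows request(1) before each onNext instead of request(unbounded): this demand control is the backpressure mechanism that keeps memory usage bounded.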

The Mono object

Now we can do the same thing with the Mono object:

// Java:

// add to the imports of the test class:
// import reactor.core.publisher.Mono;

@Test
public void monoTest() {
    Mono<String> stringMono = Mono
            .just("One")
            .log();

    stringMono.subscribe(System.out::println,
            (e) -> System.err.println(e),
            () -> System.out.println("Completed!"));
}

The main difference between using Flux and Mono is the number of values with which we can work.
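Reactor also lets us convert between the two kinds of objects. A small sketch using standard Reactor operators (collectList, next and flux); block() is used here only to read the results in a plain main method:

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class MonoFluxConversion {
    public static void main(String[] args) {
        // Collapse a Flux of N items into a Mono holding the whole list
        Mono<List<String>> all = Flux.just("One", "Two", "Three").collectList();
        System.out.println(all.block()); // [One, Two, Three]

        // Keep only the first item as a Mono
        Mono<String> first = Flux.just("One", "Two", "Three").next();
        System.out.println(first.block()); // One

        // Expand a Mono back into a single-item Flux
        Flux<String> again = Mono.just("One").flux();
        System.out.println(again.blockLast()); // One
    }
}
```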

Handling errors

Now let’s see a demonstration of how we treat errors when there is a large volume of data.

// Java:

@Test
public void fluxTestRange() {
    Flux<Integer> integerFlux = Flux
            .range(1, 1000)
            .map(i -> {
                if (i <= 10) return i;
                throw new RuntimeException("Number is larger than 10");
            })
            .log();

    integerFlux.subscribe(System.out::println,
            (e) -> System.err.println(e),
            () -> System.out.println("Completed!"));
}

The above code treats a data range with values from 1 to 1000. However, during execution, when the number surpasses 10, an exception occurs. Thus only the numbers 1 to 10 are printed, and then the error consumer prints the exception. The completion callback never runs, because an error terminates the sequence, so “Completed!” is not printed.
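Since an error terminates the sequence, Reactor provides recovery operators such as onErrorReturn and onErrorResume. A minimal sketch of the same range with a fallback value (the fallback -1 is an arbitrary choice for illustration):

```java
import reactor.core.publisher.Flux;

public class FluxErrorRecovery {
    public static void main(String[] args) {
        Flux<Integer> integerFlux = Flux
                .range(1, 1000)
                .map(i -> {
                    if (i <= 10) return i;
                    throw new RuntimeException("Number is larger than 10");
                })
                .onErrorReturn(-1); // swap the error for a fallback value and complete normally

        integerFlux.subscribe(System.out::println,
                (e) -> System.err.println(e),
                () -> System.out.println("Completed!"));
        // prints 1 to 10, then -1, then "Completed!" - the sequence now ends without an error
    }
}
```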

As happens when we work with a Java Stream, we can also filter the data, convert values, concatenate lists, and repeat sequences, among other operations.
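A short sketch of a few of those operations chained together (the values are arbitrary):

```java
import java.util.List;

import reactor.core.publisher.Flux;

public class FluxOperators {
    public static void main(String[] args) {
        Flux<Integer> evens = Flux.range(1, 10)
                .filter(i -> i % 2 == 0)   // keep 2, 4, 6, 8, 10
                .map(i -> i * 2);          // convert each value to its double

        List<Integer> result = Flux.concat(Flux.just(0), evens) // concatenate two sequences
                .collectList()
                .block();

        System.out.println(result); // [0, 4, 8, 12, 16, 20]
    }
}
```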

Processing data sequentially

The following examples are based on methods that we use at Pismo. They receive an object from a request and then treat the data: each step may enrich the object before passing it on to the next step in the chain. At the end, we return the object with all the information filled in.

Initially, let’s use the .map() method as an example.

// Java:

// add to the imports of the test class:
// import java.util.Arrays;
// import java.util.List;

@Test
public void convertMapFlux() {
    List<String> names = Arrays.asList("adam", "anna", "jack", "jenny");
    Flux<Integer> integerFlux = Flux.fromIterable(names)
            .map(s -> s.length())
            .log();

    integerFlux.subscribe(System.out::println,
            (e) -> System.err.println(e),
            () -> System.out.println("Completed!"));
}

In the above example, we converted a list of values into a Flux. The .map() method then returns the length of each name, transforming the result into a Flux of integers.

An example with .flatMap()

In the following example we will use .flatMap():

// Java:

class People {

    private String name;
    private String address;
    private Integer age;

    // getters and setters

    @Override
    public String toString() {
        return "People{" +
                "name='" + name + '\'' +
                ", address='" + address + '\'' +
                ", age=" + age +
                '}';
    }
}

@Test
public void convertFlatMapMono() {
    Mono<People> monoPeople = Mono.just(new People())
            .flatMap(people -> {
                people.setName("John Doe");
                return Mono.just(people);
            })
            .flatMap(people -> {
                people.setAddress("Street from nowhere");
                return Mono.just(people);
            })
            .flatMap(people -> {
                people.setAge(30);
                return Mono.just(people);
            })
            .log();

    monoPeople.subscribe(System.out::println,
            (e) -> System.err.println(e),
            () -> System.out.println("Completed!"));
}

In the above example, which is a bit more complex than the previous ones, each flatMap step sets a value on the People object and wraps it in a new Mono, which is handed to the next step, and so on successively. In the end, we have the People object with all the values filled in by the chain of calls.

An important thing to notice is that, in this example, Reactor follows the specified sequence. While using this standard at Pismo, we execute database queries, send messages and run other kinds of calls sequentially. This process generates the following output:

[ INFO] (Test worker) | onSubscribe([Fuseable] MonoFlatMap.FlatMapMain)
[ INFO] (Test worker) | request(unbounded)
[ INFO] (Test worker) | onNext(People{name='John Doe', address='Street from nowhere', age=30})
People{name='John Doe', address='Street from nowhere', age=30}
Completed!
[ INFO] (Test worker) | onComplete()
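The same chaining pattern extends to real asynchronous sources. A sketch of how sequential calls might look, where findUser and publishEvent are hypothetical stand-ins for a database query and a message publication (the names and return values are ours for illustration, not Pismo’s actual API):

```java
import reactor.core.publisher.Mono;

public class SequentialCalls {
    // Hypothetical stand-in for a repository lookup
    static Mono<String> findUser(int id) {
        return Mono.fromCallable(() -> "user-" + id);
    }

    // Hypothetical stand-in for publishing a message about the user
    static Mono<String> publishEvent(String user) {
        return Mono.fromCallable(() -> "published event for " + user);
    }

    public static void main(String[] args) {
        String result = findUser(42)
                .flatMap(SequentialCalls::publishEvent) // runs only after findUser emits
                .block();
        System.out.println(result); // published event for user-42
    }
}
```

Because each flatMap subscribes to the Mono returned by the previous step, the second call starts only after the first one has produced its value, which is exactly the ordering guarantee we rely on.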

At Pismo, we adopted the reactive programming model using the Spring Boot/WebFlux and Vert.x frameworks. In both cases, we apply the model to microservices-based systems running in containers. Thus we can enrich data along the call execution chain and optimise data treatment and queries on relational and non-relational databases.

