Reactive Programming – Part 2

I continue to expound on reactive programming concepts in this note, by diving deeper into the reactor module. Remember in the previous note, I outlined different implementations of the Reactive Specification; Reactor being one of them. This note presents some of the fundamental concepts in the reactor module and how they facilitate programming in a reactive manner.


At the core of the Reactor implementation are the concepts of Publishers and Subscribers. Both concepts are originally developed in the reactive streams module. Reactor builds on top of that. Publishers are responsible for emitting data. Subscribers indicate interest in data emitted by Publishers and execute logic in response to items they receive. So far so good.


Here’s a look into reactive stream’s Publisher interface:

public interface Publisher<T> { 
    void subscribe(Subscriber<? super T> var1);
}

The publisher’s interface is pretty straightforward. A single subscribe method for attaching the Subscriber to the Publisher instance so the Subscriber can receive updates from the Publisher. Internally, the Publisher orchestrates a call to the subscriber’s onNext method (more on this later). Reactor has different Publisher implementations which I will delve into later in this note.


And then, there’s the Subscriber interface: Subscribers expose different methods that plug into a reactive pipeline. These methods are essentially callbacks that include logic in response to specific stream events. I provide comments for some clarity.

public interface Subscriber<T> {

    public void onSubscribe(Subscription s); // invoked after successfully subscribing
    public void onNext(T t); // invoked when an item is emitted from the Publisher upstream
    public void onError(Throwable t); // invoked when an error is encountered
    public void onComplete(); // invoked after all items have been emitted

}

As you might recall from my previous note, the process of reactive programming will typically involve assembling a pipeline where publishers expel streams of data with those streams consumed by subscribers. There are different methods of building these streams depending on the use case and particular scenario. However, Reactor ships with 3 types of publishers:

  1. Mono
  2. Flux
  3. Processor

Let’s look into each of them…

Mono – tone us ?

A Mono is a special kind of publisher that emits zero or one item and then closes the stream. The stream closing eventually invokes onComplete on all associated subscribers on the Mono instance. In other words, a Mono discharges at most one item. Mind you, a Mono can also emit an error. The Mono class contains a bunch of static utility methods that enable the instantiation of a Mono publisher. Here’s some sample code utilizing Mono:


I created a simple implementation of Subscriber interface that basically logs debug statements on the console in response to stream events.

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.logging.Level;
import java.util.logging.Logger;


public class TestSubscriber<T> implements Subscriber<T> {

    private static final Logger logger = Logger.getAnonymousLogger();

    private final String subscriberName;

    public TestSubscriber(String subscriberName) {
        this.subscriberName = subscriberName;
    }
    
    @Override
    public void onSubscribe(Subscription s) {
        logger.info(
            String.format(
                "%s >> Subscription started. Waiting for data!",
                subscriberName
            )
        );

        // publisher will not emit any items until subscriber explicitly issues request. 
        // Passing Long.MAX_VALUE means potentially infite number of items 
        // are being requested
        s.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(T data) {
        logger.info(
            String.format(
                "%s >> New Data: %s", subscriberName, data.toString()
            )
        );
    }

    @Override
    public void onError(Throwable t) {
        logger.log(
            Level.SEVERE, 
            String.format("%s >> %s", subscriberName, t.getMessage()), t
        );
    }

    @Override
    public void onComplete() {
        logger.info(
            String.format("%s >> Subscription completed.", subscriberName)
        );
    }

}

Now we instantiate a Mono to test the publisher.

import reactor.core.publisher.Mono;


public class Monos {

    public static void main(String[] args) throws Exception {
        var mono = Mono.just(2);

        var subscriber1 = new TestSubscriber<>("Subscriber 1");
        var subscriber2 = new TestSubscriber<>("Subscriber 2");

        mono.subscribe(subscriber1);
        mono.subscribe(subscriber2);
    }

}


The code snippet above instantiates two subscribers and subscribes to the Mono which emits only one item to both subscribers. The output looks like this:

INFO: Subscriber 1 >> Subscription started. Waiting for events!

INFO: Subscriber 1 >> New Data: 2

INFO: Subscriber 1 >> Subscription completed.

INFO: Subscriber 2 >> Subscription started. Waiting for events!

INFO: Subscriber 2 >> New Data: 2

INFO: Subscriber 2 >> Subscription completed.

There are several other methods for instantiating a Mono. The just method I used earlier assumes the data already exists. In some cases, you may want to defer the creation of data to be emitted to the subscribers. In which case, you could use methods like fromSupplier, fromFuture, or other static from methods available on Mono.

What the Flux?

A Flux is different from a Mono in that zero or many items are emitted before the stream is closed. Just like Mono, Flux also has several static utility methods to facilitate easy instantiation. Here’s an example of flux usage:

import com.julianduru.learning.reactive.mono.TestSubscriber;
import reactor.core.publisher.Flux;

public class Fluxes {


    public static void main(String[] args) throws Exception {
        var flux = Flux.just(2, 5, 6);
        
        var subscriber1 = new TestSubscriber<>("Subscriber 1");
        var subscriber2 = new TestSubscriber<>("Subscriber 2");
        
        flux.subscribe(subscriber1);
        flux.subscribe(subscriber2);
    }


}

And the output:

INFO: Subscriber 1 >> Subscription started. Waiting for events!

INFO: Subscriber 1 >> New Data: 2

INFO: Subscriber 1 >> New Data: 5

INFO: Subscriber 1 >> New Data: 6

INFO: Subscriber 1 >> Subscription completed.

INFO: Subscriber 2 >> Subscription started. Waiting for events!

INFO: Subscriber 2 >> New Data: 2

INFO: Subscriber 2 >> New Data: 5

INFO: Subscriber 2 >> New Data: 6

INFO: Subscriber 2 >> Subscription completed.

Processor – Xavier

A Processor is both a subscriber and a publisher. In fact, if you look into the Processor interface definition, it literally extends both Subscriber and Publisher:

interface definition, it literally extends both Subscriber and Publisher:

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {

}

Meaning all methods available on Subscriber and Publisher are also available on Processor…

Hot and Cold Publishers

A hot publisher has a single source of data for all the subscribers. What this means is that a hot publisher serves as a kind of multicaster which publishes data to all subscribers and will keep emitting data as long as there is an interested subscriber. The multicasting of the single publisher will mean that late subscribers are likely to miss data. An example of a Hot Publisher is a Television or Radio Station; If you miss the broadcast, it doesn’t get replayed for you. But anytime you join in, you receive the updates from the single publisher.

import com.julianduru.learning.reactive.mono.TestSubscriber;
import com.julianduru.learning.reactive.util.Util;
import reactor.core.publisher.Flux;

import java.time.Duration;
import java.util.stream.Stream;


public class HotPublisher {


    public static void main(String[] args) throws Exception {
        var flux = Flux.fromStream(HotPublisher::getStream)
            .delayElements(Duration.ofSeconds(2)) // slow down emission
            .share()                              // multicast
            .cache(1);                            // maintain history of last 1 item

        flux.subscribe(new TestSubscriber<>("Hot Publisher Subscriber 1"));
        Thread.sleep(5000L); 

        flux.subscribe(new TestSubscriber<>("Hot Publisher Subscriber 2"));
        Thread.sleep(15000L);
    }


    private static Stream<String> getStream() {
        return Stream.of(
            "Item 1",
            "Item 2",
            "Item 3",
            "Item 4",
            "Item 5",
            "Item 6"
        );
    }
    

}

And here’s the output from running main. You can see the 2nd subscriber was late to the party, hence missed out on the prior events. Also, because we maintain a hot cache of size 1. The last event is replayed for the new subscriber before they start receiving new updates.

INFO: Hot Publisher Subscriber 1 >> Subscription started. Waiting for events!

INFO: Hot Publisher Subscriber 1 >> New Data: Item 1

INFO: Hot Publisher Subscriber 1 >> New Data: Item 2

INFO: Hot Publisher Subscriber 1 >> New Data: Item 3

INFO: Hot Publisher Subscriber 2 >> Subscription started. Waiting for events!

INFO: Hot Publisher Subscriber 2 >> New Data: Item 3

INFO: Hot Publisher Subscriber 1 >> New Data: Item 4

INFO: Hot Publisher Subscriber 2 >> New Data: Item 4

INFO: Hot Publisher Subscriber 1 >> New Data: Item 5

INFO: Hot Publisher Subscriber 2 >> New Data: Item 5

INFO: Hot Publisher Subscriber 1 >> New Data: Item 6

INFO: Hot Publisher Subscriber 2 >> New Data: Item 6

INFO: Hot Publisher Subscriber 1 >> Subscription completed.

INFO: Hot Publisher Subscriber 2 >> Subscription completed.

A Cold Publisher on the other hand creates a producer for each subscriber. Hence late subscribers will have the stream replayed for them and they will see all events. An example of a Cold Publisher is a site like Youtube. The video can be served to you anytime you join in and you don’t miss any information.

package com.julianduru.learning.reactive.publisher;

import com.julianduru.learning.reactive.mono.TestSubscriber;
import com.julianduru.learning.reactive.util.Util;
import reactor.core.publisher.Flux;

import java.time.Duration;
import java.util.stream.Stream;

/**
 * created by julian on 19/02/2022
 */
public class ColdPublisher {


    public static void main(String[] args) throws Exception{
        var flux = Flux.fromStream(ColdPublisher::getStream)
            .delayElements(Duration.ofSeconds(2));

        flux.subscribe(new TestSubscriber<>("Cold Publisher Subscriber 1"));
        Thread.sleep(7000L);

        flux.subscribe(new TestSubscriber<>("Cold Publisher Subscriber 2"));
        Thread.sleep(15000L);
    }


    private static Stream<String> getStream() {
        return Stream.of(
            "Item 1",
            "Item 2",
            "Item 3",
            "Item 4",
            "Item 5",
            "Item 6"
        );
    }


}

Inspecting the logs will reveal that all events were received by both subscribers.


INFO: Cold Publisher Subscriber 1 >> Subscription started. Waiting for events!

INFO: Cold Publisher Subscriber 1 >> New Data: Item 1

INFO: Cold Publisher Subscriber 1 >> New Data: Item 2

INFO: Cold Publisher Subscriber 1 >> New Data: Item 3

INFO: Cold Publisher Subscriber 2 >> Subscription started. Waiting for events!

INFO: Cold Publisher Subscriber 1 >> New Data: Item 4

INFO: Cold Publisher Subscriber 2 >> New Data: Item 1

INFO: Cold Publisher Subscriber 1 >> New Data: Item 5

INFO: Cold Publisher Subscriber 2 >> New Data: Item 2

INFO: Cold Publisher Subscriber 1 >> New Data: Item 6

INFO: Cold Publisher Subscriber 1 >> Subscription completed.

INFO: Cold Publisher Subscriber 2 >> New Data: Item 3

INFO: Cold Publisher Subscriber 2 >> New Data: Item 4

INFO: Cold Publisher Subscriber 2 >> New Data: Item 5

INFO: Cold Publisher Subscriber 2 >> New Data: Item 6

INFO: Cold Publisher Subscriber 2 >> Subscription completed.

Things are starting to get interesting. Subsequent notes on Reactive Programming will start to delve into more advanced concepts like threading, scheduling, overflow, backpressure. The end goal is to orchestrate entire reactive pipelines within microservice architectures. Pipelines that are robust, resilient and scalable.