SmallRye Reactive Stream Operators is an implementation of the Eclipse MicroProfile Reactive Stream Operators specification (version 1.0).


IMPORTANT: Project in maintenance mode

This repository is in maintenance mode. No new features will be implemented.

Another implementation of MicroProfile Reactive Streams Operators is available in Mutiny. It is strongly recommended to switch to this implementation.

Reactive Converters have been migrated to https://github.com/smallrye/smallrye-reactive-utils.

If you have any questions, send a message to https://groups.google.com/forum/#!forum/smallrye.


The MicroProfile Reactive Stream Operators specification define a set of operators for Reactive Streams. You can:

  • Create Reactive Streams

  • Process the data transiting in the streams

  • Accumulate results

The idea behind the specification is to provide the equivalent of java.util.stream however, for Reactive Stream, so, inherently asynchronous, supporting back-pressure and with error and completion signals propagation. The following code snippet shows how close the API is:

// java.util.stream version:
Stream.of("hello", "world")
        .filter(word -> word.length() <= 5)
        .map(String::toUpperCase)
        .findFirst()
        .ifPresent(s -> System.out.println("Regular Java stream result: " + s));
// reactive stream operator version:
ReactiveStreams.of("hello", "world")
        .filter(word -> word.length() <= 5)
        .map(String::toUpperCase)
        .findFirst()
        .run() // Run the stream (start publishing)
        // Retrieve the result asynchronously, using a CompletionStage
        .thenAccept(res -> res
                .ifPresent(s -> System.out.println("Reactive Stream result: " + s)));

The SmallRye implementation is based on RX Java 2.

  • The Javadoc of the specification is available here.

  • The Javadoc of SmallRye Reactive Streams Operators is available here.

The code is released under the Apache Software License 2.0 and is available on Github.

1. Getting Started

1.1. Quickstart

The easiest to start using SmallRye Reactive Stream Operators is to start it directly in a main class. You only need to put smallrye-reactive-streams-operators in your CLASSPATH to use it.

Creates a Maven project, and include the following dependency in your pom.xml:

<dependency>
    <groupId>io.smallrye.reactive</groupId>
    <artifactId>smallrye-reactive-streams-operators</artifactId>
    <version>1.0.8-SNAPSHOT</version>
</dependency>

Once created, create a class file with a public static void main(String…​ args) method:

package io.smallrye.reactive.operators.quickstart;

import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;

public class QuickStart {

    public static void main(String[] args) {
        // Create a stream of words
        ReactiveStreams.of("hello", "from", "smallrye", "reactive", "stream", "operators")
                .map(String::toUpperCase) // Transform the words
                .filter(s -> s.length() > 4) // Filter items
                .forEach(word -> System.out.println(">> " + word)) // Terminal operation
                .run(); // Run it (create the streams, subscribe to it...)
    }


}

Once everything is set up, you should be able to run the application using:

mvn compile exec:java -Dexec.mainClass=io.smallrye.reactive.operators.quickstart.QuickStart

Running the previous example should give the following output:

>> HELLO
>> SMALLRYE
>> REACTIVE
>> STREAM
>> OPERATORS

The Reactive Streams Operator is intended to be used in other software and not as a standalone api. However, to give you a better overview the 2 following quickstart explains how to use it in Eclipse Vert.x applications and Apache Camel applications.

1.2. Using Reactive Streams Operators in a Vert.x application

Eclipse Vert.x is a toolkit to create reactive and distributed systems. In addition to the bare Vert.x API, you can also use Vert.x using RX Java 2. As a consequence, you can wrap Vert.x streams and use Reactive Stream Operators to manipulate them:

package io.smallrye.reactive.operators.quickstart;

import io.vertx.core.Future;
import io.vertx.core.json.JsonObject;
import io.vertx.reactivex.core.AbstractVerticle;
import io.vertx.reactivex.core.eventbus.Message;
import io.vertx.reactivex.core.eventbus.MessageConsumer;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;

public class DataProcessor extends AbstractVerticle {

    private static final int PORT = 8080;

    @Override
    public void start(Future<Void> done) {
        vertx.createHttpServer()
                .requestHandler(request -> {
                    // Consume messages from the Vert.x event bus
                    MessageConsumer<JsonObject> consumer = vertx.eventBus().consumer("data");
                    // Wrap the stream and manipulate the data
                    ReactiveStreams.fromPublisher(consumer.toFlowable())
                            .limit(5) // Take only 5 messages
                            .map(Message::body) // Extract the body
                            .map(json -> json.getInteger("value")) // Extract the value
                            .peek(i -> System.out.println("Got value: " + i)) // Print it
                            .reduce(0, (acc, value) -> acc + value)
                            .run() // Begin to receive items
                            .whenComplete((res, err) -> {
                                // When the 5 items has been consumed, write the result to the
                                // HTTP response:
                                if (err != null) {
                                    request.response().setStatusCode(500).end(err.getMessage());
                                } else {
                                    request.response().end("Result is: " + res);
                                }
                            });
                })
                .listen(PORT, ar -> done.handle(ar.mapEmpty()));

    }
}

This example creates an HTTP server and for each request to collect 5 messages sent by another component on the Vert.x event bus. It computes the sum of these 5 elements and writes the result to the HTTP response. It’s important to notice that the messages coming from the event bus are sent asynchronously. So, it would not be possible to write the previous code using java.util.streams.

When used in a Vert.x application, Reactive Stream Operators can be used to processed data and compute an asynchronous result.

1.3. Using Reactive Streams Operators in a Camel application

Apache Camel is a toolkit to define routing and mediation rules, mainly used to integrate systems, using enterprise integration patterns. Apache Camel provides more than 200+ components so that it can integrate virtually with anything.

You can combine Reactive Stream Operators and Apache Camel thanks to the Camel Reactive Stream Component.

package io.smallrye.reactive.operators.quickstart;

import org.apache.camel.CamelContext;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
import org.apache.camel.impl.DefaultCamelContext;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.reactivestreams.Subscriber;

import java.io.File;
import java.nio.file.Files;

public class QuickStart {

    public static void main(String[] args) throws Exception {
        CamelContext context = new DefaultCamelContext();
        CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);
        Subscriber<String> subscriber = camel
                .subscriber("file:./target?fileName=values.txt&fileExist=append", String.class);

        ReactiveStreams.of("hello", "from", "smallrye", "reactive", "stream", "operators",
                "using", "Apache", "Camel")
                .map(String::toUpperCase) // Transform the words
                .filter(s -> s.length() > 4) // Filter items
                .peek(System.out::println)
                .map(s -> s + " ")
                .to(subscriber)
                .run();
        context.start();

        // Just wait until it's done.
        Thread.sleep(1000);

        Files.readAllLines(new File("target/values.txt").toPath())
                .forEach(s -> System.out.println("File >> " + s));
    }

}

You can also use Camel to create Reactive Streams Publisher and transform the items using Reactive Streams Operators.

2. Operators

As mentioned before, the Reactive Streams API is an asynchronous version of java.util.stream for Reactive Streams. This section list the operators that are provided.

The Reactive Streams Operators introduce a set of types to allow creating Reactive Streams:

Reactive Streams Reactive Stream Operators Termination

Publisher

PublisherBuilder

build()

Processor

ProcessorBuilder

build()

Subscriber

SubscriberBuilder

build()

Besides, the Reactive Streams Operators introduce CompletionRunner that triggers the emission of the items and provides a way to retrieve the asynchronously computed result.

2.1. Creating streams

The first part of the API allows to create PublisherBuilder. A Reactive Streams Publisher can be created from the builder using the .buildRS method.

2.1.1. Creating empty streams
  • Operator: empty

  • Description: Creates an empty stream.

empty
  • Example:

        PublisherBuilder<T> empty = ReactiveStreams.empty();
2.1.2. Creating streams from elements
  • Operator: of, ofNullable

  • Description: Creates a stream of 0, 1 or n elements.

of single
of many
  • Example:

        PublisherBuilder<T> streamOfOne = ReactiveStreams.of(t1);
        PublisherBuilder<T> streamOfThree = ReactiveStreams.of(t1, t2, t3);
        PublisherBuilder<T> streamOfOneOrEmpty = ReactiveStreams.ofNullable(maybeNull);
2.1.3. Creating failing streams
  • Operator: failed

  • Description: Creates a failed stream.

failed
  • Example:

        PublisherBuilder<T> failed = ReactiveStreams.failed(new Exception("BOOM!"));
2.1.4. Creating streams from CompletionStage
  • Operator: fromCompletionStage, fromCompletionStageNullable

  • Description: Creates a stream of 0 or 1 element emitted when the passed CompletionStage is completed.

fromCompletionStage
fromCompletionStageNullable
  • Example:

        PublisherBuilder<T> streamOfOne = ReactiveStreams.fromCompletionStage(cs);
        // If the redeemed value is `null`, an error is propagated in the stream.
        PublisherBuilder<T> streamOfOne = ReactiveStreams.fromCompletionStageNullable(cs);
        // If the redeemed value is `null`, the stream is completed immediately.
2.1.5. Creating streams from collections
  • Operator: fromIterable

  • Description: Creates a stream emitting the elements from the passed iterable and then send the completion signal.

fromIterable
  • Example:

        PublisherBuilder<T> stream = ReactiveStreams.fromIterable(iterable);
        // If the iterable does not contain elements, the resulting stream is empty.
2.1.6. Wrapping a Reactive Stream Publisher
  • Operator: fromPublisher

  • Description: Creates a stream emitting the elements from the passed Publisher.

  • Example:

        PublisherBuilder<T> stream = ReactiveStreams.fromPublisher(publisher);
        // If the publisher does not emit elements, the resulting stream is empty.
2.1.7. Generating infinite streams
  • Operator: generate, iterate

  • Description: Creates a stream using the generator method. The number of generated elements depends on the request.

generate
iterate
  • Example:

        AtomicInteger counter = new AtomicInteger();
        PublisherBuilder<Integer> stream = ReactiveStreams
                .generate(() -> counter.getAndIncrement());
        // The resulting stream is an infinite stream.
        PublisherBuilder<Integer> stream = ReactiveStreams
                .iterate(0, last -> last + 1);
        // The resulting stream is an infinite stream.

2.2. Processing streams

These operators transform the items transiting on the streams.

2.2.1. Creating a processor

A processor is a Reactive Streams component that is both a Publisher and a Subscriber. It consumes and emits elements.

  • Example:

        ProcessorBuilder<I, O> builder = ReactiveStreams.<I>builder()
                .map(i -> (O) i); // Emit element of type O
2.2.2. Filtering elements
  • Operator: filter, distinct, dropWhile, skip, limit, takeWhile

  • Description: These operators filter items transiting on the stream:

    • filter - select the element using a predicate

    • distinct - remove similar element (Attention: do not use on large or unbounded streams)

    • dropWhile - drop elements until the predicate returns true

    • skip - ignore x elements

    • takeWhile - forward elements until the predicate returns true

    • limit - pick x elements

filter
distinct
dropWhile
skip
takeWhile
limit
  • Example:

        ReactiveStreams.of(1, 2, 3)
                .filter(i -> i > 2); // (1, 2)

        ReactiveStreams.of(2, 2, 3, 3, 2, 1)
                .distinct(); // (2, 3, 1)

        ReactiveStreams.of(2, 2, 3, 3, 2, 1)
                .dropWhile(i -> i == 2); // (3, 3, 2, 1)

        ReactiveStreams.of(2, 2, 3, 3, 2, 1)
                .skip(3); // (3, 2, 1)

        ReactiveStreams.of(2, 2, 3, 3, 2, 1)
                .limit(3); // (2, 2, 3)

        ReactiveStreams.of(2, 2, 3, 3, 2, 1)
                .takeWhile(i -> i == 2); // (2, 2)
2.2.3. Composing asynchronous actions
  • Operator: flatMap, flatMapCompletionStage, flatMapIterable, flatMapRsPublisher

  • Description: Produces a stream for each element of the stream. The return stream is flatten (serialized) in the returned stream

    • flatMap - Returns a PublisherBuilder and serialize the elements in the returned stream.

    • flatMapCompletionStage - Produces a CompletionStage. When completed, the result is passed to the returned stream.

    • flatMapIterable - Produces an Iterable and flatten the element into the returned stream. This flatMap method is not asynchronous.

    • flatMapRSPublisher - Like flatMap but return a Reactive Streams Publisher

flatMap
flatMapCompletionStage
flatMapIterable
flatMapRsPublisher
  • Example:

        ReactiveStreams.of(1, 2)
                .flatMap(i -> ReactiveStreams.of(i, i)); // (1, 1, 2, 2)

        ReactiveStreams.of(1, 2)
                .flatMapIterable(i -> Arrays.asList(i, i)); // (1, 1, 2, 2)

        ReactiveStreams.of(1, 2)
                .flatMap(i -> ReactiveStreams.of(i, i)); // (1, 1, 2, 2)

        ReactiveStreams.of(1, 2)
                .flatMapCompletionStage(i -> invokeAsyncService(i));
The produced value can be emitted asynchronously, except for flatMapIterable.
The CompletionStage returned by flatMapCompletionStage must not redeem null, as null is an invalid value for Reactive Streams. So, you cannot use CompletionStage<Void>.
2.2.4. Transforming items
  • Operator: map

  • Description: Produces a value synchronously

map
  • Example:

        ReactiveStreams.of(1, 2, 3)
                .map(i -> i + 1); // (2, 3, 4)
2.2.5. Combining a Processor
  • Operator: via

  • Description: Forward the items to a Processor or ProcessorBuilder

  • Example:

        ProcessorBuilder<Integer, String> processor = ReactiveStreams
                .<Integer>builder().map(i -> Integer.toString(i));

        ReactiveStreams.of(1, 2)
                .via(processor); // ("1", "2")

2.3. Action operators

These operators give you the ability to react to the different events happening in the streams.

  • Operator: peek, onComplete, onTerminate, onError

  • Description: These operators let you react to various events such as when an element is received, an error is propagated or when the stream completes.

    • peek - called for each element

    • onComplete - called when the stream completes

    • onError - called when an error is propagated in the stream

    • onTerminate - called either when an error is propagated or when the stream completes

peek
onComplete
onError
onTerminate

2.4. Error management operators

These operators allow recovering after a failure. Because you handle asynchronous streams of data, you can’t use try/catch, so these operators provide a similar feature.

  • Operator: onErroResume, onErrorResumeWith, onErrorResumeWithRsPublisher

  • Description: These operators let you react to various events such as when a element is received, an error is propagated or when the stream completes.

onErrorResume
onErrorResumeWith
onErrorResumeWithRSPublisher
2.4.1. Terminal operator and computing asynchronous result

These operators act as subscribers and produce a result. As the result is computed asynchronously, you retrieve a CompletionStage object.

2.4.2. Cancelling a stream
  • Operator: cancel

  • Description: Cancel the subscription to a stream. No more items will be received.

2.4.3. Ignoring elements
  • Operator: ignore

  • Description: ignore the elements transiting on the streams. The elements are still emitted but ignored.

ignore
  • Example:

        ReactiveStreams.of(1, 2, 3)
                .peek(i -> System.out.println("Receiving: " + i))
                .ignore()
                .run()
                .thenAccept(x -> System.out.println("Done!"));
2.4.4. Collecting results
  • Operator: collect, reduce, toList

  • Description: These operators allows accumulating items or intermediary results to compute a final value.

collect
reduce
toList
  • Example:

        ReactiveStreams.of(1, 2, 3)
                .collect(Collectors.summingInt(i -> i))
                .run()
                // Produce 6
                .thenAccept(res -> System.out.println("Result is: " + res));

        ReactiveStreams.of(1, 2, 3)
                .collect(() -> new AtomicInteger(1), AtomicInteger::addAndGet)
                .run()
                // Produce 7
                .thenAccept(res -> System.out.println("Result is: " + res));

        ReactiveStreams.of(1, 2, 3)
                .reduce((acc, item) -> acc + item)
                .run()
                // Produce Optional(6)
                .thenAccept(res -> res.ifPresent(sum ->
                        System.out.println("Result is: " + sum)));

        ReactiveStreams.of(1, 2, 3)
                .toList()
                .run()
                // Produce [1, 2, 3]
                .thenAccept(res -> System.out.println("Result is: " + res));
2.4.5. Get the first item of a stream
  • Operator: findFirst

  • Description: Retrieves the first item of a stream if any

findFirst
  • Example:

        ReactiveStreams.of(1, 2, 3)
                .findFirst()
                .run()
                // Produce Optional[1]
                .thenAccept(maybe -> System.out.println(maybe));
2.4.6. Execute a method for each element
  • Operator: forEach

  • Description: Execute a method for each element of a stream. Unlike peek, this is a terminal operation.

forEach
  • Example:

        ReactiveStreams.of(1, 2, 3)
                .forEach(
                        i -> System.out.println("Receiving " + i)
                )
                .run();
2.4.7. Pass to a Reactive Streams Subscriber
  • Operator: to

  • Description: Forward the elements of a stream to a given Subscriber or SubscriberBuilder.

  • Example:

        SubscriberBuilder<Integer, Optional<Integer>> subscriber
                = ReactiveStreams.<Integer>builder()
                    .map(i -> i + 1)
                    .findFirst();

        ReactiveStreams.of(1, 2, 3)
                .to(subscriber)
                .run()
                // Produce Optional[2]
                .thenAccept(optional ->
                        optional.ifPresent(i -> System.out.println("Result: " + i)));

3. Execution Model

SmallRye Reactive Stream Operators provides a way to control on which thread are the different callbacks invoked. By default it uses the caller thread.

If you are building a Vert.x application, add the following dependency to your project so enforce the Vert.x execution model:

<dependency>
  <groupId>io.smallrye</groupId>
    <artifactId>smallrye-reactive-streams-vertx-execution-model</artifactId>
  <version>1.0.8-SNAPSHOT</version>
</dependency>

With this dependency, if you are calling ReactiveStreams.x from a Vert.x thread, the same thread is used to call the different callbacks and pass the result.

4. Reactive Type Converters

The reactive type converters are a set of modules not directly related to MicroProfile Reactive Streams Operators. These converters adapts reactive types from different reactive programming libraries. The main interface is:

public interface ReactiveTypeConverter<T> {

    <X> CompletionStage<X> toCompletionStage(T instance);

    <X> Publisher<X> toRSPublisher(T instance);

    <X> T fromCompletionStage(CompletionStage<X> cs);

    <X> T fromPublisher(Publisher<X> publisher);

    // ...

You can use converters to convert types provided by different reactive programming libraries to Publisher and CompletionStage, and the opposite:

converters

To use the converter you need:

  1. Add the converter api dependency:

<dependency>
  <groupId>io.smallrye</groupId>
  <artifactId>smallrye-reactive-converter-api</artifactId>
  <version>1.0.8-SNAPSHOT</version>
</dependency>
  1. For each reactive programming, add the associated dependency:

<dependency>
    <groupId>io.smallrye</groupId>
    <artifactId>smallrye-reactive-converter-reactive-streams-operators</artifactId>
    <version>1.0.8-SNAPSHOT</version>
</dependency>

<dependency>
    <groupId>io.smallrye</groupId>
    <artifactId>smallrye-reactive-converter-reactor</artifactId>
    <version>1.0.8-SNAPSHOT</version>
</dependency>

<dependency>
    <groupId>io.smallrye</groupId>
    <artifactId>smallrye-reactive-converter-rxjava1</artifactId>
    <version>1.0.8-SNAPSHOT</version>
</dependency>

<dependency>
    <groupId>io.smallrye</groupId>
    <artifactId>smallrye-reactive-converter-rxjava2</artifactId>
    <version>1.0.8-SNAPSHOT</version>
</dependency>
  1. In your code, lookup for a converter and apply the conversion. For instance:

CompletionStage cs = ...
ReactiveTypeConverter<Completable> converter = Registry.lookup(Completable.class)
    .orElseThrow(() -> new AssertionError("Completable converter should be found"));
Completable converted = converter.fromCompletionStage(cs);

The conversion rules are detailed in the javadoc.