Architecture & DesignReactive Programming with RxJava

Reactive Programming with RxJava

Developer.com content and product recommendations are editorially independent. We may make money when you click on links to our partners. Learn More.

Reactive programming is a style of programming that helps simplify the synchronous processing of long running operations. When using reactive programming, you take advantage of asynchronous data streams. You can write reactive programs by using .NET, Java, Scala, Clojure, JavaScript, Ruby, Python, C++, or even Objective-C. In reactive programming, the consumer actually reacts to the data as soon as the data arrives. RxJava is a Java implementation of ReactiveX (also known as Reactive Extensions) and provides the necessary API that you can use to write asynchronous, event-based programs. This article provides a discussion on RxJava with code examples to illustrate the concepts.

What Is RxJava and Why Should I Care?

Essentially, RxJava is a port of .NET Rx to JVM. The key concepts in RxJava are: seamless concurrency, event based approach, a nice way to escape the callback hell problem, asynchronous execution support, and a reactive approach. In this programming model, you can define the source of the data and also the consumer of the data. Once the data’s source and the data’s consumer have been defined and they are connected to each other, the RxJava framework can be used to push the data that is generated by the source to the consumer.

It should be noted that the Observable class in RxJava leverages the concepts of the GOF Observable pattern. Added to what the GOF has stated, the Observable concept in RxJava adds a few missing capabilities. These capabilities include the ability to use the OnCompleted event handler of the producer to notify that there isn’t any more data available. The producer also can signal that an error has occurred in the OnError event handler.

The Building Blocks

The main concept in RxJava is events; you can take advantage of events to trigger procedures, trigger other events, and so forth. Note that events also can be awaited. The building blocks for RxJava include the following:

  • Observables that are used to represent the data. Note that the observables start producing data as soon as a subscriber is connected and starts listening.
  • Subscribers that listen to the observables. It should be noted that an observable can have one or many subscribers.
  • A collection of methods that can be used to compose or edit the data.

Programming RxJava

And now, let’s take a quick look at how we can use RxJava in our programs. The following code snippet shows how you can define the source of the data.

Observable<Integer> source = Observable.range(1, 10);

Now that the source of the data has been defined, we need to define the consumer of the data. The following code snippet shows how you can achieve this. Note that I make a template here; the full source code is available for you later in this section.

Subscriber<Integer> consumer = new Subscriber<Integer>() {
   // Write your code here
};

To define the data’s consumer, you would need to leverage the Subscriber<T> type. To push the data that is produced at the source of the data to the consumer, you should use the onNext(T data) method on the subscriber instance. If you have multiple subscribers, as soon as a new item is emitted by the source of the data, the onNext() method is called on each subscriber that is connected to the observable.

Once the data has been pushed, a completion event is raised and you can take advantage of the onComplete() method on the subscriber instance to signal completion. If an error has occurred while the data was being emitted and pushed to the consumer, you can take advantage of the onError(Throwable e) method on the subscriber instance to display an appropriate message. Finally, to connect the consumer of the data to the data source, you should make a call to the subscribe method, as shown next.

source.subscribe(consumer);

Here’s a complete code listing for the consumer.

Observable.range(1, 10).subscribe(
   ctr -> System.out.println(ctr),
   error -> System.out.println("An error occurred..."),
   () -> System.out.println("Done")
);

Summary

This article presented an overview on reactive programming concepts and how we can work with RxJava with code examples. We have learnt the concepts of reactive programming, its uses, and how to leverage the features of this exciting new programming paradigm.

Get the Free Newsletter!

Subscribe to Developer Insider for top news, trends & analysis

Latest Posts

Related Stories