Basic components of ReactiveX with Android

Mohammed Rampurawala
7 min read · Aug 22, 2018

I will use RxJava to explain the basic concepts of ReactiveX, with Kotlin as the programming language.

In the next article, I will write about how to develop a simple application with Android design patterns, RxJava, RxAndroid, and RxKotlin, using the concepts explained below.

Here we go:

ReactiveX: the best of functional programming and design patterns!

What is RxJava?

RxJava can be thought of as a library that gives us an easy-to-use observer pattern. This means we have a way to be notified when data changes, when a task completes, or when an error happens along the way. Java offers a couple of options for this pattern, such as PropertyChangeListener or java.util.Observable, but both have issues: they are complex, there are few good examples online, and there is no clear pattern for using them in layered architectures.

RxJava can simplify a lot of this and provide a common pattern to respond to user events, model data changes, and map those values to new results. It also allows us to chain complex logic through the different architectural layers while delaying execution and easily working in a threaded environment. This just scratches the surface of what RxJava can do.
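To make the idea of mapping values to new results and delaying execution concrete, here is a minimal sketch, assuming RxJava 2 is on the classpath. The function name describeEvens is hypothetical and only used for illustration. Nothing in the chain runs until subscribe is called.

```kotlin
import io.reactivex.Observable

// Hypothetical helper: keeps the even numbers and maps each one to a label.
fun describeEvens(numbers: List<Int>): List<String> {
    val results = mutableListOf<String>()

    // Building the chain does no work yet; it only describes what we want.
    val chain = Observable.fromIterable(numbers)
        .filter { it % 2 == 0 }
        .map { "even: $it" }

    // Execution starts here. Without a scheduler, it runs on the calling thread.
    chain.subscribe { results.add(it) }
    return results
}
```

Calling describeEvens(listOf(1, 2, 3, 4)) collects the two labels for 2 and 4, in that order.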

RxJava is just one part of a whole family. There are Rx implementations for Java, C#, Kotlin, and Swift, among others. So once you learn these patterns in one language, such as Kotlin, you can apply the same principles elsewhere, for example if you also develop for iOS.

Observables and Observers

Observables and Observers are at the core of Rx.

They’re two closely related concepts. You can think of an Observable as a list of values that you can iterate over, or as a list of single events carrying those values. Observables are the things that you are watching. This list of values or events changes over time, and you can react to that.

Observers are the things that are doing the watching; they are also called Subscribers or Consumers. Observers only care about when an observable’s data or state changes. An Observable can be many things. It could be a continually growing list of UI events, such as taps or button clicks. It could be a single variable that notifies you of changes to it over time. It could be a task of complex code that notifies you when it’s finished.
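The relationship between the two can be sketched as follows, assuming RxJava 2. The helper name watch is hypothetical; it attaches a full Observer to any Observable and records every callback it receives.

```kotlin
import io.reactivex.Observable
import io.reactivex.Observer
import io.reactivex.disposables.Disposable

// Hypothetical helper: subscribes an Observer and records each callback.
fun watch(source: Observable<String>): List<String> {
    val log = mutableListOf<String>()
    source.subscribe(object : Observer<String> {
        override fun onSubscribe(d: Disposable) { log.add("subscribed") }
        override fun onNext(value: String) { log.add("next: $value") }
        override fun onError(e: Throwable) { log.add("error: ${e.message}") }
        override fun onComplete() { log.add("complete") }
    })
    return log
}
```

For example, watch(Observable.just("tap", "click")) records the subscription, then the two values, then the completion.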

Types of Observables

There are three types of observables to work with:

  1. Relays
  2. Subjects
  3. Observables

Deep down inside, they’re all just special cases of Observable.

Relays are the easiest to work with and subjects are a little more difficult, while a plain Observable is typically only needed for complex tasks such as chaining dependent network calls.

Relays

Relays are the easiest piece to work with and often are the best solution for 90% of your needs.

They give us the ability to be notified whenever a variable’s value changes. Relays are a type of hot observable, which means events may already have happened before you subscribe. With a BehaviorRelay you won’t get all of the previous events, but you will get the most recent value, or the default value, when you first subscribe. The last thing to know is that relays never error out or complete, because onError and onComplete cannot even be triggered on a Relay; it only exposes accept(). In that sense relays are stateless: they have no terminal state to get stuck in.

Behavior Relay

Imperative Behavior Relay

Imperative just means you ask for the value yourself: you can read the relay’s current value at any time, exactly when you want it.

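import android.util.Log
import com.jakewharton.rxrelay2.BehaviorRelay

fun simpleValues() {
    // Start the relay with a default value of "1". "Rx" is an arbitrary log tag.
    val behaviorRelay = BehaviorRelay.createDefault("1")
    Log.d("Rx", "Info ${behaviorRelay.value}")

    // Push a new value, then read it back directly.
    behaviorRelay.accept("2")
    Log.d("Rx", "Relay Value ${behaviorRelay.value}")
}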

Declarative Behavior Relay

RxJava is declarative in nature, which means we tell RxJava what we want, but not how to get it. In this case we want to react to the event, but we are not going to ask for the value explicitly as we did in the previous Imperative Behavior Relay example. Instead we call behaviorRelay.subscribe, which just means "I want to see each new value as it comes through." The subscriber immediately receives the most recent value that was set, and then keeps receiving new values until the subscription is disposed.

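import android.util.Log
import com.jakewharton.rxrelay2.BehaviorRelay

fun simpleValues() {
    val behaviorRelay = BehaviorRelay.createDefault("1")

    // The subscriber first receives the current value ("1"), then every new one.
    // "Rx" is an arbitrary log tag.
    behaviorRelay.subscribe { newValue ->
        Log.d("Rx", "Relay value has changed: $newValue")
    }

    behaviorRelay.accept("3")
}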
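The "hot" behavior described for relays can be illustrated with a late subscriber. This is a minimal sketch, assuming the RxRelay 2 artifact listed in the dependencies; the function names are hypothetical. Values are collected into a list instead of being logged so the snippet does not depend on Android.

```kotlin
import com.jakewharton.rxrelay2.BehaviorRelay
import com.jakewharton.rxrelay2.PublishRelay

// Hypothetical demo: subscribe to a BehaviorRelay after values were already accepted.
fun lateBehaviorSubscriber(): List<String> {
    val received = mutableListOf<String>()
    val relay = BehaviorRelay.createDefault("1")
    relay.accept("2")
    relay.accept("3")

    // Only the most recent value ("3") is replayed; "1" and "2" are gone.
    relay.subscribe { received.add(it) }
    relay.accept("4")
    return received
}

// Hypothetical demo: a PublishRelay replays nothing to a late subscriber.
fun latePublishSubscriber(): List<String> {
    val received = mutableListOf<String>()
    val relay = PublishRelay.create<String>()
    relay.accept("1")

    relay.subscribe { received.add(it) }
    relay.accept("2")
    return received
}
```

The BehaviorRelay subscriber ends up with "3" and "4", while the PublishRelay subscriber only sees "2".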

Subjects

Subjects are a little more complex. Unlike relays, they can error or complete, which means subscriptions will no longer be triggered after an error or a complete event. A Subject is both an Observer and an Observable, meaning it can subscribe to one observable and pass that output on as input for its own subscribers. Subjects are also hot observables, so a new subscriber won’t necessarily get all of the earlier events; depending on the type of subject you are using, it will get a certain number of them. Subjects are stateful: once they reach a terminal state, they stay there.
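The dual role of a Subject can be sketched like this, assuming RxJava 2. The function name forwardThroughSubject is hypothetical. The subject is handed to another Observable as its Observer, and at the same time feeds its own subscriber.

```kotlin
import io.reactivex.Observable
import io.reactivex.subjects.PublishSubject

// Hypothetical demo: a subject used as an Observer of one stream and as the source of another.
fun forwardThroughSubject(): List<String> {
    val log = mutableListOf<String>()
    val subject = PublishSubject.create<Int>()

    // The subject acts as an Observable here.
    subject.map { it * 10 }.subscribe(
        { log.add("next: $it") },
        { log.add("error: ${it.message}") },
        { log.add("complete") }
    )

    // The subject acts as an Observer here; it also receives the upstream completion.
    Observable.just(1, 2, 3).subscribe(subject)

    // The subject is now in a terminal state, so this value is dropped.
    subject.onNext(4)
    return log
}
```

The last call shows the stateful part: once the upstream completes, the subject is finished and the value 4 never reaches the subscriber.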

There are three main types of Subjects:

  1. Behavior (A new subscriber to a behavior subject receives the last event, or the default value if there are no events yet.)
  2. Publish (A publish subject begins with no value, default or otherwise. Subscribers only get new events.)
  3. Replay (A replay subject keeps a buffer of events that it shares with new subscribers. When you create it, you can set the buffer size, which is the number of elements replayed to a new subscriber.)

Note: Subscribers to any type of subject will receive its onError and onComplete events.
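The difference between the three types comes down to what a late subscriber sees. Here is a minimal sketch, assuming RxJava 2; the helper lateSubscriber is hypothetical. It pushes three values, subscribes, pushes a fourth, and returns what the subscriber received.

```kotlin
import io.reactivex.subjects.BehaviorSubject
import io.reactivex.subjects.PublishSubject
import io.reactivex.subjects.ReplaySubject
import io.reactivex.subjects.Subject

// Hypothetical helper: emits 1, 2, 3, then subscribes, then emits 4.
fun lateSubscriber(subject: Subject<Int>): List<Int> {
    val received = mutableListOf<Int>()
    subject.onNext(1)
    subject.onNext(2)
    subject.onNext(3)

    subject.subscribe { received.add(it) }
    subject.onNext(4)
    return received
}

// Behavior: last value plus new ones. Publish: new ones only. Replay(2): last two plus new ones.
fun behaviorResult() = lateSubscriber(BehaviorSubject.create<Int>())
fun publishResult() = lateSubscriber(PublishSubject.create<Int>())
fun replayResult() = lateSubscriber(ReplaySubject.createWithSize<Int>(2))
```

ReplaySubject.createWithSize bounds the buffer; the plain ReplaySubject.create() would replay everything emitted so far.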

Behavior Subject

Behavior Subject emits the most recent item it has observed and all subsequent observed items to each subscribed Observer.

The different types of subjects are really similar. Just remember that what makes the difference is how many of the elements emitted before you subscribed you still receive.

BehaviorSubject does not operate by default on a particular Scheduler and the Observers get notified on the thread the respective onXXX methods were invoked.

When the onError(Throwable) is called, the BehaviorSubject enters into a terminal state and emits the same Throwable instance to the last set of Observers. During this emission, if one or more Observers dispose their respective Disposables, the Throwable is delivered to the global error handler via RxJavaPlugins.onError(Throwable) (multiple times if multiple Observers cancel at once). If there were no Observers subscribed to this BehaviorSubject when the onError() was called, the global error handler is not invoked.

import android.util.Log
import io.reactivex.subjects.BehaviorSubject

fun subjects() {
    val behaviorSubject = BehaviorSubject.createDefault(24)

    // "Rx" is an arbitrary log tag.
    val disposable = behaviorSubject.subscribe({ newValue -> // onNext
        Log.d("Rx", "BehaviorSubject subscription: $newValue")
    }, { error -> // onError
        Log.d("Rx", "Error: ${error.localizedMessage}")
    }, { // onComplete
        Log.d("Rx", "Completed")
    }, { _ -> // onSubscribe
        Log.d("Rx", "Subscribed")
    })

    behaviorSubject.onNext(34)
    behaviorSubject.onNext(48)
    behaviorSubject.onNext(48) // duplicates show as new events by default

    // 1 onError
    // val someException = IllegalArgumentException("some fake error")
    // behaviorSubject.onError(someException)
    // behaviorSubject.onNext(109) // will never show

    // 2 onComplete
    behaviorSubject.onComplete()
    behaviorSubject.onNext(10983) // will never show
}
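The error path that is commented out above can be sketched without Android logging. This is a minimal illustration, assuming RxJava 2; the function name errorIsTerminal is hypothetical. It also shows that a subscriber arriving after the error receives only the error, not the last value.

```kotlin
import io.reactivex.subjects.BehaviorSubject

// Hypothetical demo: what subscribers see before and after a BehaviorSubject errors.
fun errorIsTerminal(): List<String> {
    val log = mutableListOf<String>()
    val subject = BehaviorSubject.createDefault(24)

    subject.subscribe(
        { log.add("first next: $it") },
        { log.add("first error: ${it.message}") }
    )

    subject.onError(IllegalArgumentException("some fake error"))
    subject.onNext(109) // dropped: the subject is already in a terminal state

    // A late subscriber gets the terminal event only.
    subject.subscribe(
        { log.add("late next: $it") },
        { log.add("late error: ${it.message}") }
    )
    return log
}
```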

Traits

These are just one-off tasks that can be wrapped in a single observable. They are Single, Completable, and Maybe.

Single will receive only one onSuccess (the equivalent of a single onNext) or one onError event. It has no separate completion event.

import io.reactivex.Single
import io.reactivex.rxkotlin.addTo

// Creating a Single
val single = Single.create<String> { emitter ->
    // do some logic here
    val success = true

    if (success) { // return a value
        emitter.onSuccess("nice work!")
    } else {
        val someException = IllegalArgumentException("some fake error")
        emitter.onError(someException)
    }
}

// Observing the Single with a subscription
single.subscribe({ result ->
    // do something with result
    println("single: $result")
}, { error ->
    // do something for error
}).addTo(container) // addTo is RxKotlin's extension for adding to a CompositeDisposable

Completable will receive only one onComplete or one onError event. It never receives onNext, since it carries no value.

import io.reactivex.Completable
import io.reactivex.rxkotlin.addTo

// Creating a Completable
val completable = Completable.create { emitter ->
    // do logic here
    val success = true

    if (success) {
        emitter.onComplete()
    } else {
        val someException = IllegalArgumentException("some fake error")
        emitter.onError(someException)
    }
}

// Observing the Completable with a subscription
completable.subscribe({
    // handle on complete
    println("Completable completed")
}, { error ->
    // do something for error
}).addTo(container) // addTo is RxKotlin's extension for adding to a CompositeDisposable

Maybe will receive exactly one of three events: onSuccess with a value, onComplete without a value, or onError. It never receives both a value and a completion.

import io.reactivex.Maybe
import io.reactivex.rxkotlin.addTo

// Creating a Maybe
val maybe = Maybe.create<String> { emitter ->
    // do something
    val success = true
    val hasResult = true

    if (success) {
        if (hasResult) {
            emitter.onSuccess("some result")
        } else {
            emitter.onComplete()
        }
    } else {
        val someException = IllegalArgumentException("some fake error")
        emitter.onError(someException)
    }
}

// Observing the Maybe with a subscription
maybe.subscribe({ result ->
    // do something with result
    println(" Maybe - result: $result")
}, { error ->
    // do something with the error
}, {
    // do something about completing
    println(" Maybe - completed")
}).addTo(container) // addTo is RxKotlin's extension for adding to a CompositeDisposable

In the code above, container is a CompositeDisposable.

But deep down inside, they all behave like Observables restricted to a fixed set of events. In fact, whenever you see a one-off call, such as a network call or anything that wraps a single result, it’s a great place to use a trait instead of an Observable.
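As an illustration of using a trait for a one-off call, here is a minimal sketch, assuming RxJava 2. The function fetchGreeting is a hypothetical stand-in for a network call and does no real I/O. The second subscription uses toObservable() to show how a Single maps onto the Observable events.

```kotlin
import io.reactivex.Single

// Hypothetical stand-in for a network call that yields exactly one result.
fun fetchGreeting(name: String): Single<String> =
    Single.fromCallable { "Hello, $name" }

// Hypothetical demo: consume the Single directly, then as an Observable.
fun greetingLog(): List<String> {
    val log = mutableListOf<String>()

    fetchGreeting("Rx")
        .map { it.length }
        .subscribe({ log.add("length: $it") }, { log.add("failed") })

    // As an Observable, the single result becomes one onNext followed by onComplete.
    fetchGreeting("Rx")
        .toObservable()
        .subscribe({ log.add("next: $it") }, { log.add("failed") }, { log.add("complete") })

    return log
}
```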

CompositeDisposable

A CompositeDisposable is a container that holds a bunch of Disposables. And what is a Disposable? When we create a subscription, we’re given back something like a pointer to that subscription. By adding it to a CompositeDisposable, we can clean up all of the contained subscriptions at once when the owning class reaches the end of its life. In Android, the CompositeDisposable is usually cleared in onDestroy() of the Android component.
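The clean-up described above can be sketched as follows, assuming RxJava 2. The function name clearStopsDelivery is hypothetical; in an Android component the clear() call would typically sit in onDestroy().

```kotlin
import io.reactivex.disposables.CompositeDisposable
import io.reactivex.subjects.PublishSubject

// Hypothetical demo: values stop arriving once the container is cleared.
fun clearStopsDelivery(): List<Int> {
    val received = mutableListOf<Int>()
    val container = CompositeDisposable()
    val subject = PublishSubject.create<Int>()

    // Keep the Disposable returned by subscribe in the container.
    container.add(subject.subscribe { received.add(it) })
    subject.onNext(1)

    // Disposes every contained subscription; the container itself stays reusable.
    container.clear()
    subject.onNext(2) // nobody is listening anymore
    return received
}
```

Note the difference between clear() and dispose(): after dispose(), the container is permanently disposed and any Disposable added later is disposed immediately, whereas clear() leaves it ready for new subscriptions.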

Add the following dependencies to your build.gradle to get started with reactive programming in Android.

implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
implementation 'io.reactivex.rxjava2:rxjava:2.1.13'
implementation 'io.reactivex.rxjava2:rxkotlin:2.2.0'
implementation 'com.jakewharton.rxrelay2:rxrelay:2.0.0'
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:0.22.5'
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:0.22.5'

If you like it then put a clap (👏 ) on it.

Part 2 coming soon.
