How to multicast an observable ?
Multicast means to be able to trasmit the same data to multiple subscribers.
A plain Observable
in RxJs is unicast by default, which means only one subscriber can read the data
stream on each subscription. What if we want to share the same data stream with multiple subscribers ?
RxJs, provides multicasting through a components called Subjects
. A Subject
is capable of receiving
the data like an observer and has the ablity to share the data with multiple subscribers.
So, the simplest solution to our problem is to feed the outcome of our observable
into a multicasting Subject
.
And, with a multicasting subject at the end, our observable will be capable of sharing it's data with multiple
subscribers.
Observables are Unicast by Default
Let's say we have an observable that generates a random number. If we subscribe it multiple times, we will get a different numbers on each subscription.
Stackblitz
import { Observable } from 'rxjs';
const randomValue = () => Math.round(Math.random() * 10);
const unicastByDefault = new Observable((observer) => {
observer.next(randomValue());
observer.complete();
});
//Each subscription is likely to get different output
unicastByDefault.subscribe(v => console.log(v)), //9
unicastByDefault.subscribe(v => console.log(v)), //7
unicastByDefault.subscribe(v => console.log(v)); //3
Now, let's see how to make our observable multicast, so that all our subscribers can share the same data.
Convert an Observable into Multicast
Earlier RxJs used to provide multicast
operator to do the job. But, in version 7, it has been
replaced with simpler operators like : connectable
and share
.
The basic idea is to attach a multicasting component, such as a Subject
as a subscriber, so that
it can receive the incoming data and share the same with multiple subscribers.
1. Multicast using connectable
Stackblitzimport { Observable, Subject, connectable } from 'rxjs';
const randomValue = () => Math.round(Math.random() * 10);
const unicastByDefault = new Observable((observer) => {
observer.next(randomValue());
observer.complete();
});
const multicastWithConnectable = connectable(
unicastByDefault,
{ connector: () => new Subject()}
);
//Now all the subscriptions will share the same data stream
multicastWithConnectable.subscribe((v) => console.log(v)), //4
multicastWithConnectable.subscribe((v) => console.log(v)), //4
multicastWithConnectable.subscribe((v) => console.log(v)); //4
//connect will start streaming the data
multicastWithConnectable.connect();
The multicastWithConnectable
has a subject subscribing to our unicastByDefault
observable.
The Subject
being capable of multicast, is able to share the input data stream.
As specified as sample output, all 3 subscriptions will now have the same output, when
we call multicastWithConnectable.connect()
.
2. Multicast using share
Here the syntax looks more simpler, without the need to connect separately. But, key
syntax to rememember here is to use comma(,
) and not semi-colon(;
) between the
subscriptions.
Analysis of the Output
Case -1 : Subscriptions share the emits from source
As we can see, both the subscriptions are receiving data from the same emits from source. Moreover, since the subscription2 has joined late, it has missed the first emitted data.
Case -2 : share
makes it multicast
If we comment out - share()
- We should see separately emitted data for each subscription, as then the source will no more multicast.
Case -3 : zero active subscription count will close the shared subscription
If we update the subscription1, to take only 1 input as below. The shared subscription should close immediately as number of subscription count becomes zero.
source.pipe(take(1)).subscribe((v) => console.log('subscription1: ', v)),
In this case, subscription2 subscribing afterwards will subscribe the source separately. Hence, it will have it's own set of data and the output would be as follows:
Emits: 0
subscription1: 0
Emits: 0
subscription2: 0
Emits: 1
subscription2: 1
Case -4 : We can configure using ShareConfig
Moreover, the share
operator is a configuarable one. In case we are looking for advanced usecases,
here is the snippet of the interface taken from the RxJS official website.
interface ShareConfig<T> {
connector?: () => SubjectLike<T>
resetOnError?: boolean | ((error: any) => ObservableInput<any>)
resetOnComplete?: boolean | (() => ObservableInput<any>)
resetOnRefCountZero?: boolean | (() => ObservableInput<any>)
}
For instance, we can still make 'subscription2' to share the subscription, in case 3, if we delay the
reset by 1sec after the subscription count becomes zero by replacing share()
with :
share({ resetOnRefCountZero: () => timer(1000) })