How to multicast an observable ?


Unicast vs Multicast Observables


Multicast means to be able to trasmit the same data to multiple subscribers.

A plain Observable in RxJs is unicast by default, which means only one subscriber can read the data stream on each subscription. What if we want to share the same data stream with multiple subscribers ?

RxJs, provides multicasting through a components called Subjects. A Subject is capable of receiving the data like an observer and has the ablity to share the data with multiple subscribers.

So, the simplest solution to our problem is to feed the outcome of our observable into a multicasting Subject. And, with a multicasting subject at the end, our observable will be capable of sharing it's data with multiple subscribers.

Observables are Unicast by Default


Let's say we have an observable that generates a random number. If we subscribe it multiple times, we will get a different numbers on each subscription.


Stackblitz
import { Observable } from 'rxjs';

const randomValue = () => Math.round(Math.random() * 10);

const unicastByDefault = new Observable((observer) => {
  observer.next(randomValue());
  observer.complete();
});

//Each subscription is likely to get different output
unicastByDefault.subscribe(v => console.log(v)), //9
unicastByDefault.subscribe(v => console.log(v)), //7
unicastByDefault.subscribe(v => console.log(v)); //3

Now, let's see how to make our observable multicast, so that all our subscribers can share the same data.

Convert an Observable into Multicast


Earlier RxJs used to provide multicast operator to do the job. But, in version 7, it has been replaced with simpler operators like : connectable and share.

The basic idea is to attach a multicasting component, such as a Subject as a subscriber, so that it can receive the incoming data and share the same with multiple subscribers.

1. Multicast using connectable

Stackblitz
import { Observable, Subject, connectable } from 'rxjs';

const randomValue = () => Math.round(Math.random() * 10);

const unicastByDefault = new Observable((observer) => {
  observer.next(randomValue());
  observer.complete();
});

const multicastWithConnectable = connectable(
    unicastByDefault,
    { connector: () => new Subject()}
);

//Now all the subscriptions will share the same data stream
multicastWithConnectable.subscribe((v) => console.log(v)),  //4
multicastWithConnectable.subscribe((v) => console.log(v)),  //4
multicastWithConnectable.subscribe((v) => console.log(v));  //4
//connect will start streaming the data
multicastWithConnectable.connect();

The multicastWithConnectable has a subject subscribing to our unicastByDefault observable.

The Subject being capable of multicast, is able to share the input data stream.

As specified as sample output, all 3 subscriptions will now have the same output, when we call multicastWithConnectable.connect().

2. Multicast using share

Here the syntax looks more simpler, without the need to connect separately. But, key syntax to rememember here is to use comma(,) and not semi-colon(;) between the subscriptions.



Analysis of the Output

Case -1 : Subscriptions share the emits from source

As we can see, both the subscriptions are receiving data from the same emits from source. Moreover, since the subscription2 has joined late, it has missed the first emitted data.

Case -2 : share makes it multicast

If we comment out - share()

  • We should see separately emitted data for each subscription, as then the source will no more multicast.
Case -3 : zero active subscription count will close the shared subscription

If we update the subscription1, to take only 1 input as below. The shared subscription should close immediately as number of subscription count becomes zero.

source.pipe(take(1)).subscribe((v) => console.log('subscription1: ', v)),

In this case, subscription2 subscribing afterwards will subscribe the source separately. Hence, it will have it's own set of data and the output would be as follows:

Emits: 0
subscription1: 0
Emits: 0
subscription2: 0
Emits: 1
subscription2: 1
Case -4 : We can configure using ShareConfig

Moreover, the share operator is a configuarable one. In case we are looking for advanced usecases, here is the snippet of the interface taken from the RxJS official website.

interface ShareConfig<T> {
connector?: () => SubjectLike<T>
resetOnError?: boolean | ((error: any) => ObservableInput<any>)
resetOnComplete?: boolean | (() => ObservableInput<any>)
resetOnRefCountZero?: boolean | (() => ObservableInput<any>)
}

For instance, we can still make 'subscription2' to share the subscription, in case 3, if we delay the reset by 1sec after the subscription count becomes zero by replacing share() with :

share({ resetOnRefCountZero: () => timer(1000) })