What are hot and cold observables?
One of the most interesting analogy available on the internet, on hot vs cold observables is the relationship between - A Netflix Subscription vs A Movie in a Theater.
Cold
We call an observable as cold if it creates a new instance of its source upon each subscription. Which again means a cold observable can not share it's source stream with multiple subscribers. Going by the analogy,
- A Nextflix subscription represents a cold observable as each subscriber will have his own instance of the movie stream.
- We can not share a single stream of the movie with multiple subscribers.
Hot
On the otherhand, the source instance of a hot observable is independent of its subscribers. This in turn, allows multiple subscribers to move in or move out and share the same instance of the source among them.
- It's like multiple subscribers watching a common show of a movie.
- The subscribers can even move in or move out at different times to share the movie instance.
Besides having independent life-cycle wrt it's subscribers, a movie in a theater catering to many is, alos, an example of a multicast. Thus, the source instance of a hot observable being shareable with multiple subscribers, makes it multicast.
Having the concepts in the mind, let's watch it through code.
Observables are cold by default
A plain Observable
in RxJs is cold by default, which means for each subscription a new instance
of the source sends the data. For example, as we can see below, each subscription has separate emits
from the source.
**Stackblitz**
import { interval, timer, Subject } from 'rxjs';
import { take, tap } from 'rxjs';
const source = interval(1000).pipe(
take(1),
tap((x) => console.log('Emits: ', x)),
);
source.subscribe((v) => console.log('subscription1: ', v)),
source.subscribe((v) => console.log('subscription2: ', v));
Output : The source emits separately for every subscription.
Emits: 0
subscription1: 0
Emits: 0
subscription2: 0
Now let's see how can we make the source instance exist independent of it's subscription, so that we can feed the same source to multiple subscribers.
Convert cold into hot using share
RxJs provides multicasting through a component called Subject
. A Subject
is capable of receiving
the data like an observer and has the ablity to share the data with multiple subscribers.
The share()
operator, attaches a subject to separate the source instance of our observable from its
subscribers, making it hot and shareable.
Analysis of the Output
1. Is our observable
hot now?
Emits: 0
subscription1: 0
Emits: 1
subscription1: 1
subscription2: 1
As we can see in the output, the Subscription1 which is joining 1.5 sec late, is missing out the 1st emit but, sharing the 2nd emitted value with subscription1.
Thus, clearly the source instance is now independent of it's subscriptions and the subscribers can come in or go out, with the source continuing on it's own. This means our observable is now hot.
2. Test it with zero subscription count
By design the source resets and gets ready for the next subscription when the subscription count is zero.
The share
operator is configuarable through its input ShareConfig
.
interface ShareConfig<T> {
connector?: () => SubjectLike<T>
resetOnError?: boolean | ((error: any) => ObservableInput<any>)
resetOnComplete?: boolean | (() => ObservableInput<any>)
resetOnRefCountZero?: boolean | (() => ObservableInput<any>)
}
a. Let's update the subscription2 to complete after taking just 1 item, so that we have zero subscription after 1 sec and before subscription1 joins at 1.5 sec. (use the following commented code in the demo)
source.pipe(take(1)).subscribe((v) => console.log('subscription2: ', v));
b. The share
operator is configuarable through its input ShareConfig
.
interface ShareConfig<T> {
connector?: () => SubjectLike<T>
resetOnError?: boolean | ((error: any) => ObservableInput<any>)
resetOnComplete?: boolean | (() => ObservableInput<any>)
resetOnRefCountZero?: boolean | (() => ObservableInput<any>)
}
Lets's delay the reset by 1 sec on zero subscription count by replacing share()
as follows:
//share()
share({ resetOnRefCountZero: () => timer(1000) })
Output : Now, the resultant output should look like below
Emits: 0
subscription2: 0
Emits: 1
subscription1: 1
From the output we can verify that the source instance was still available for subscription1, even if there were no subscriptions between 1sec to 1.5 sec.
That means the hot movie continued to play in the theater even when there were no viewers!