
Every Angular developer encounters RxJS in their codebase. A frequently common challenge for new Angular developers involves grasping RxJS concepts, including Observable, Observer, and Subject.
Working with RxJS might not feel intuitive initially, requiring time to understand the underlying mindset of the library. This article is called to make understanding RxJS much easier by providing some analogies from real life. One such relatable analogy can be drawn with household water pipes.
Imagine a house or a flat. Each property requires water pipe installations. In the world of RxJS, we can represent pipe installation as usage of operators like from or of as those that create a stream of water.
export class WaterStreamsComponent {
stream$ = of("water")
}
The stream$ itself does nothing, it simply exists, much like water pipes in your property. The water flowing through these pipes represents our data, which could be an object, string, number, array, event, or any other data type.
Imagine a scenario where you’re feeling thirsty and reach for a glass of water. Just as you turn the tap to let water flow through the pipes, in RxJS we use the subscribe method to start receiving data from an Observable stream.
export class WaterStreamsComponent implements OnInit {
stream$ = of("water")
ngOnInit() {
this.stream$.subscribe(water => console.log('drink it!', water))
}
}
In our water analogy, leaving the water tap open can result in either flooding your neighbor’s apartment below or receiving a massive water bill. The essential lesson to be learned from this analogy underscores the importance of unsubscribing from a stream when it’s no longer needed. This helps prevent memory leaks that can significantly slow down your application.โ
export class WaterStreamsComponent implements OnInit, OnDestroy {
stream$ = of('water')
waterPipe$?: Subscription
ngOnInit() {
this.waterPipe$ = this.stream$.subscribe(
water => console.log('drink it!', water)
)
}
ngOnDestroy() {
this.waterPipe$?.unsubscribe() // <-- close water tap
}
}
Transformation & Filtering
If your water isn’t clean enough, you’d probably opt for a water filter to purify the tap water, removing metals or solids. In RxJS, this is a job for filtering operators, with the filter being the most popular choice. The logic here is similar to the JavaScript Array filter operator. To employ the RxJs filter operator, we need to use the pipe function, which allows us to use and combine various operators.
import { filter } from 'rxjs/operators';
// ...
this.waterPipe$ = this.stream$.pipe(
filter((water) => water === 'water') // <-- filtering water
).subscribe(
cleanWater => console.log('drink it!', cleanWater)
)
// ...
Or, imagine that after a long day of coding in Angular and immersing ourselves in RxJS, you may be eagerly anticipating a relaxing hot shower. Just as you rely on a water boiler or heater to warm the cold water in your home, think of this process in terms of RxJS transformation operators, like the map operator.
import { map } from 'rxjs/operators';
export class WaterStreamsComponent implements OnInit {
// ...
warmUp = (water: string) => `${water}-warm`
ngOnInit() {
this.waterPipe$ = this.stream$
.pipe(
map((water) => this.warmUp(water)) // <-- data transformation
).subscribe((warmWater) =>
console.log('take a shower!', warmWater));
}
}
We can stack different operators together. Donโt forget the comma after each operator.
// ...
this.waterPipe$ = this.stream$
.pipe(
filter(water => water === 'water'), <-- filter first
map(water => this.warmUp(water)) <-- then warm up
).subscribe(
warmWater => console.log('take a shower!', warmWater)
)
// ...
It also happens, that we want to perform neither transformation nor filtering. Instead, we want to use the water “outside” for a while and bring it back into a pipe without any modifications. For example, we would need it to make a water meter work. This is a so-called “side-effect” and in RxJS such side effects can be performed by the operator like “tap”.
import { tap } from 'rxjs/operators';
// ...
this.waterPipe$ = this.stream$
.pipe(
tap(
water => console.log('count water consumption', water)
)
).subscribe(
water => console.log('Drink it!', warmWater)
)
// ...
Dealing With Errors
Dealing with an aging pipe system can often lead to unexpected water leaks or even pipe bursts. In such challenging situations, making informed decisions is crucial. One smart approach is to promptly shut off the water supply to prevent further damage.
In the world of RxJS, the error raised in one of the operators completes the stream, so the data doesn’t “flow” downstream through other operators. Instead, the error goes to error-handling operators that gracefully manage and recover from errors in your code flow. These include (catchError, retry, retryWhen)
// ...
this.waterPipe$ = this.stream$
.pipe(
map(water => { throw Error("Break the Pipe") }),
catchError(error => {
// Handle the error...
console.error('Error:', error);
// fallback value
return of('') }),
filter(water => water === 'water'), // won't execute
map(water => this.warmUp(water)) // won't execute
).subscribe(
water => console.log('No water :(', water) //<-- water is ''
)
Combining Streams
Now that we’re familiar with how the water pipe system works in our household, let’s take our imagination a step further. Visualize how the water utility system is interconnected with other houses and the city’s larger water infrastructure.
In RxJS, it’s like a symphony of combining, connecting, and extracting water (data) from one stream to another, achieved through a combination of transforming operators like (switchMap, concatMap) and join creation operators (forkJoin, merge). This mix of operators allows you to orchestrate streams in many ways.
export class WaterStreamsComponent implements OnInit {
stream$ = of('water')
homeWaterPipe$?: Subscription
cityWaterPipe$ = of('city water')
warmUp = (water: string) => `${water}-warm`
ngOnInit() {
this.cityWaterPipe$
.pipe(
filter((water) => water === "city water")
)
this.homeWaterPipe$ = this.stream$
.pipe(
switchMap(() => this.cityWaterPipe$), // <-- connection
map(water => this.warmUp(water)))
.subscribe(
warmWater => console.log('take a shower!', warmWater)
)
}
//...
}
In this example, the filtered water is delivered by the cityWaterPipe$ to the homeWaterPipe$. The connection between 2 streams is possible thanks to the switchMap operator that under the hood subscribes to a water stream from cityWaterPipe$ and lets data (water) flow down to the map operator and eventually ends up in the subscribe handler.
That’s it!
I hope You enjoyed our RxJS streams water pipes analogy journey. Just as a friendly suggestion, there’s no need to go to the extreme of dismantling your own home to employ the tap operator and assess the purity of your water.๐
Which analogy helped you to understand or explain the observables?
