Demo: retry
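The retry operator resubscribes to the source observable when the source emits an error, up to the given number of times. Each resubscription restarts the source from the beginning; once the retries are used up, the error is passed downstream.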
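// RxJS 7.2+; in older versions import the operators from 'rxjs/operators'
import { timer, of, throwError, concatMap, retry, catchError } from 'rxjs';

// Emit 0, 1, 2, ... (first after 500 ms, then every 1000 ms); when v === 3, emit an error instead of the value
const input = timer(500, 1000).pipe(concatMap(v =>
  (v === 3) ? throwError(() => new Error("E")) : of(v)
));
// On error, resubscribe to the source up to 2 times
const process1 = retry(2);
// Catch any remaining error and replace it with the given observable
const process2 = catchError(err => of('e'));
// Prints 0 1 2 three times (initial attempt plus 2 retries), then e
const output = input.pipe(process1, process2)
  .subscribe(v => console.log(v));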
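
To make the resubscription behaviour easier to see, here is a minimal sketch in plain JavaScript with no RxJS dependency. It is not how RxJS implements retry; the names makeSource, retrySketch and catchErrorSketch are hypothetical helpers invented for this illustration, and an "observable" is assumed to be just a function that pushes values synchronously to an observer.

```javascript
// Hypothetical stand-in for an observable: a function that takes an observer
// and pushes values to it synchronously. This is NOT the RxJS implementation.
function makeSource(log) {
  return function subscribe(observer) {
    log.subscriptions += 1; // count how often the source is (re)started
    for (const v of [0, 1, 2, 3]) {
      if (v === 3) {
        observer.error(new Error("E")); // fail instead of emitting 3
        return;
      }
      observer.next(v);
    }
    observer.complete();
  };
}

// Resubscribe to `source` on error, at most `count` extra times.
function retrySketch(source, count) {
  return function subscribe(observer) {
    let remaining = count;
    const attempt = () => {
      source({
        next: (v) => observer.next(v),
        error: (err) => {
          if (remaining > 0) {
            remaining -= 1;
            attempt(); // start the source again from the beginning
          } else {
            observer.error(err); // retries used up: pass the error on
          }
        },
        complete: () => observer.complete(),
      });
    };
    attempt();
  };
}

// Replace an error with a fallback value, then complete.
function catchErrorSketch(source, fallback) {
  return function subscribe(observer) {
    source({
      next: (v) => observer.next(v),
      error: () => {
        observer.next(fallback);
        observer.complete();
      },
      complete: () => observer.complete(),
    });
  };
}

const log = { subscriptions: 0 };
const received = [];
catchErrorSketch(retrySketch(makeSource(log), 2), "e")({
  next: (v) => received.push(v),
  error: (err) => received.push("unhandled: " + err.message),
  complete: () => received.push("done"),
});
console.log(received.join(" ")); // 0 1 2 0 1 2 0 1 2 e done
console.log(log.subscriptions);  // 3
```

The source is started three times in total: the initial subscription plus the two retries. Only the third failure reaches the error handler, which replaces it with the fallback value.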