Cascading Observables

Mike Berryman

In my Observables in Parallel blog post, I discussed how I’ve been moving away from Promises in favor of Observables and how to translate functionality from promise-based solutions to observable-based solutions. That post covered how to wait for multiple Observables to complete before performing some action. In that scenario all the Observables were independent of each other, meaning none of them depended on the results of another.

In this blog post I’m going to discuss how to use Observables that do depend on another Observable completing before they execute, AKA Cascading Observables. One such use case is when you need to call a web service to get some data, then use that data to call another web service, all in a single operation.

Like my previous post, I’ve made a Plunker for demonstration purposes. This example generates a random number between 0 and 100, multiplies that number by 2, then multiplies that result by 3. Each of these 3 actions runs in a setTimeout() block with a 1 second delay, one after another, to simulate asynchronous operations. The “Trigger” and “Trigger and Wait” buttons perform the same action but demonstrate different ways to utilize the concept of Cascading Observables (which I’ll go into in a moment). Here’s the code:

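// RxJS 5-style imports, assumed from the .do() operator used below.
import { Component } from '@angular/core';
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/operator/do';

@Component({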
  selector: 'my-app',
  template: `
<div>
<h1>Cascading Async Operations</h1>

<hr />

<div>
<table>
<tr>
<td style='white-space: nowrap;'>Random Number from first Observable:</td>
<td>{{observableRandResult}}</td>
</tr>
<tr>
<td style='text-align: right;'>x 2:</td>
<td>{{observableCalcResult1}}</td>
</tr>
<tr>
<td style='text-align: right;'>x 3:</td>
<td>{{observableCalcResult2}}</td>
</tr>
</table>
Top-Level Observable Status: <span [style.color]="mainObservableStatus === 'Idle' ? 'Blue' : 'Red'">{{mainObservableStatus}}</span>
<input type="button" [disabled]="calculating" value="Trigger" (click)="doCascadingAsyncOperations(false)" />
<input type="button" [disabled]="calculating" value="Trigger and Wait" (click)="doCascadingAsyncOperations(true)" />
</div>
</div>
`,
})
export class App {
  observableRandResult: number;
  observableCalcResult1: number;
  observableCalcResult2: number;
  mainObservableStatus: string = "Idle";
  calculating: boolean = false;
  constructor() {}

  doCascadingAsyncOperations(waitForAll: boolean): void {
    this.calculating = true;
    this.observableRandResult = null;
    this.observableCalcResult1 = null;
    this.observableCalcResult2 = null;
    this.mainObservableStatus = "Running";
    if (waitForAll) {
      this._cascadingAsyncOperationsWorker2().subscribe((result) => {
        this.mainObservableStatus = "Idle";
      });
    }
    else {
      this._cascadingAsyncOperationsWorker1().subscribe((result) => {
        this.mainObservableStatus = "Idle";
      });
    }
  }

  _cascadingAsyncOperationsWorker1(): Observable<number> {
    // Top-level Observable: emits a random integer from 0 to 100 after 1 second.
    return new Observable<number>(mainObserver => {
      setTimeout(() => {
        // Math.round() keeps the value a number; toFixed() would return a string.
        mainObserver.next(Math.round(Math.random() * 100));
        mainObserver.complete();
      }, 1000);
    })
    .do(mainObservableResult => {
      this.observableRandResult = mainObservableResult;
      // Second Observable: doubles the random number after 1 second.
      (new Observable<number>(innerObserver1 => {
        setTimeout(() => {
          innerObserver1.next(mainObservableResult * 2);
          innerObserver1.complete();
        }, 1000);
      })
      .do(innerObservable1Result => {
        this.observableCalcResult1 = innerObservable1Result;
        // Third Observable: triples the previous result after 1 second.
        (new Observable<number>(innerObserver2 => {
          setTimeout(() => {
            innerObserver2.next(innerObservable1Result * 3);
            innerObserver2.complete();
          }, 1000);
        })
        .do(innerObservable2Result => {
          this.observableCalcResult2 = innerObservable2Result;
          this.calculating = false;
        })).subscribe();
      })).subscribe();
    });
  }

  _cascadingAsyncOperationsWorker2(): Observable<void> {
    // Wrapper Observable: emits only after the last Observable in the chain has finished.
    return new Observable<void>(wrapperObserver => {
      (new Observable<number>(mainObserver => {
        setTimeout(() => {
          // Math.round() keeps the value a number; toFixed() would return a string.
          mainObserver.next(Math.round(Math.random() * 100));
          mainObserver.complete();
        }, 1000);
      })
      .do(mainObservableResult => {
        this.observableRandResult = mainObservableResult;
        (new Observable<number>(innerObserver1 => {
          setTimeout(() => {
            innerObserver1.next(mainObservableResult * 2);
            innerObserver1.complete();
          }, 1000);
        })
        .do(innerObservable1Result => {
          this.observableCalcResult1 = innerObservable1Result;
          (new Observable<number>(innerObserver2 => {
            setTimeout(() => {
              innerObserver2.next(innerObservable1Result * 3);
              innerObserver2.complete();
            }, 1000);
          })
          .do(innerObservable2Result => {
            this.observableCalcResult2 = innerObservable2Result;
            this.calculating = false;
            // The whole chain is done: now notify the outer subscriber.
            wrapperObserver.next();
            wrapperObserver.complete();
          })).subscribe();
        })).subscribe();
      })).subscribe();
    });
  }
}

The meat of the functionality is located in the 2 _cascadingAsyncOperationsWorker() functions.  They’re both very similar so we’ll focus on _cascadingAsyncOperationsWorker1() first.

When you create an Observable, you pass it a function that receives an observer parameter. Inside the Observable you can call the .next(), .error() or .complete() functions on that parameter to emit a value, report a failure or signal completion. The .next() function is what allows us to pass values around; specifically to our .do() function that’s attached to each Observable. The .next() function is also what triggers the subscriber’s callback, while .complete() tells the subscriber that no more values are coming.
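To make the roles of .next(), .complete() and .do() a bit more concrete, here’s a rough sketch that doesn’t depend on any library. TinyObservable and TinyObserver are hypothetical stand-ins I made up for illustration; they are not the RxJS classes and only mimic the small part of the behavior discussed above.

```typescript
// Hypothetical stand-in for the observer parameter; NOT the RxJS type.
interface TinyObserver<T> {
  next(value: T): void;
  error(err: unknown): void;
  complete(): void;
}

// Hypothetical stand-in for an Observable; NOT the RxJS class.
class TinyObservable<T> {
  private producer: (observer: TinyObserver<T>) => void;

  constructor(producer: (observer: TinyObserver<T>) => void) {
    this.producer = producer;
  }

  // Runs a side effect for each value, then passes the value on unchanged.
  do(sideEffect: (value: T) => void): TinyObservable<T> {
    return new TinyObservable<T>(observer =>
      this.producer({
        next: value => { sideEffect(value); observer.next(value); },
        error: err => observer.error(err),
        complete: () => observer.complete(),
      })
    );
  }

  // Nothing runs until subscribe() is called.
  subscribe(onNext: (value: T) => void, onComplete: () => void = () => {}): void {
    this.producer({
      next: onNext,
      error: err => { throw err; },
      complete: onComplete,
    });
  }
}

const log: string[] = [];

const numberSource = new TinyObservable<number>(observer => {
  observer.next(21);    // emit a value
  observer.complete();  // signal that no more values will follow
});

numberSource
  .do(value => log.push(`do saw ${value}`))
  .subscribe(
    value => log.push(`subscriber got ${value}`),
    () => log.push('completed')
  );

console.log(log);
```

The side effect in do() sees the value first, then the subscriber’s callback receives it, and the completion callback fires last.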

As a side-note, in this example you don’t necessarily need a .do() function in order to do something with the values being generated in each Observable.  This is because we have access to the blocks of code inside the Observables where the values are being created.  In a lot of situations this may not be the case, so using the .do() (or .map()) function allows us to get the result of the Observable and do something with it outside of the subscriber’s callback.

So the first Observable generates a random number after 1 second, then passes that number to the second Observable, which multiplies it by 2 after 1 second and passes THAT product to a final Observable that waits 1 second and multiplies it by 3. If you click “Trigger” you can see this in action.

The 2 inner Observables are subscribed to as soon as they’re created in order to trigger their respective functionality, but the top-level Observable doesn’t run until it is subscribed to, which is what happens when you click “Trigger”. However, you’ll notice that when you click “Trigger” the top-level Observable status changes to “Running”, but then changes back to “Idle” before the 2nd and 3rd Observables in the chain have completed!

This is because the top-level Observable is triggering its subscriber’s callback as soon as it emits a value by calling the .next() function.  In a lot of situations this isn’t ideal – you want to wait until all the Observables in the chain have completed before performing some action.  “Trigger and Wait” will do just that.
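The ordering problem can be reproduced without timers or a library. In this rough sketch, TinyObservable, later and multiplyLater are hypothetical helpers of my own: later() queues work on a manual task list in place of setTimeout(), and the fixed value 7 stands in for the random number so the result is repeatable.

```typescript
// Hypothetical stand-in for setTimeout(): tasks run when the queue is drained.
const tasks: Array<() => void> = [];
const later = (task: () => void): void => { tasks.push(task); };

// Hypothetical stand-in for an Observable; NOT the RxJS class.
class TinyObservable<T> {
  private producer: (next: (value: T) => void) => void;

  constructor(producer: (next: (value: T) => void) => void) {
    this.producer = producer;
  }

  // Side effect per value; the value is passed on unchanged.
  do(sideEffect: (value: T) => void): TinyObservable<T> {
    return new TinyObservable<T>(next =>
      this.producer(value => { sideEffect(value); next(value); })
    );
  }

  subscribe(onNext: (value: T) => void = () => {}): void {
    this.producer(onNext);
  }
}

const events: string[] = [];

// Async step: emits input * factor on a later tick.
const multiplyLater = (input: number, factor: number): TinyObservable<number> =>
  new TinyObservable<number>(next => later(() => next(input * factor)));

const chain = multiplyLater(7, 1) // 7 stands in for the random number
  .do(first => {
    events.push(`first: ${first}`);
    multiplyLater(first, 2)
      .do(second => {
        events.push(`x2: ${second}`);
        multiplyLater(second, 3)
          .do(third => events.push(`x3: ${third}`))
          .subscribe();
      })
      .subscribe();
  });

chain.subscribe(() => events.push('outer subscriber called'));

// Drain the queue, as the event loop would do for real timers.
while (tasks.length > 0) {
  tasks.shift()!();
}

console.log(events);
```

The outer subscriber’s entry lands right after the first value and before the x2 and x3 entries, which matches the status flipping back to “Idle” too early.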

_cascadingAsyncOperationsWorker2() is called by “Trigger and Wait” and is identical to _cascadingAsyncOperationsWorker1() except that the top-level Observable is wrapped in a dummy Observable. We’ll use this wrapper to wait until the final Observable in the chain has completed before triggering the subscriber’s callback. Notice that the wrapperObserver parameter from the now-top-level Observable isn’t used until the last Observable in the chain. This is how we force the subscriber to wait until everything’s completed.
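Here’s a rough, library-free sketch of the wrapper idea. As before, TinyObservable, later and multiplyLater are hypothetical helpers I made up for illustration (a manual task queue replaces setTimeout(), and 7 replaces the random number); only the shape of the wrapper mirrors the code above.

```typescript
// Hypothetical stand-in for setTimeout(): tasks run when the queue is drained.
const tasks: Array<() => void> = [];
const later = (task: () => void): void => { tasks.push(task); };

// Hypothetical stand-in for an Observable; NOT the RxJS class.
class TinyObservable<T> {
  private producer: (next: (value: T) => void) => void;

  constructor(producer: (next: (value: T) => void) => void) {
    this.producer = producer;
  }

  // Side effect per value; the value is passed on unchanged.
  do(sideEffect: (value: T) => void): TinyObservable<T> {
    return new TinyObservable<T>(next =>
      this.producer(value => { sideEffect(value); next(value); })
    );
  }

  subscribe(onNext: (value: T) => void = () => {}): void {
    this.producer(onNext);
  }
}

const events: string[] = [];

// Async step: emits input * factor on a later tick.
const multiplyLater = (input: number, factor: number): TinyObservable<number> =>
  new TinyObservable<number>(next => later(() => next(input * factor)));

// The wrapper's own next callback is only invoked by the LAST step in the chain.
const waitForAll = new TinyObservable<void>(wrapperNext => {
  multiplyLater(7, 1) // 7 stands in for the random number
    .do(first => {
      events.push(`first: ${first}`);
      multiplyLater(first, 2)
        .do(second => {
          events.push(`x2: ${second}`);
          multiplyLater(second, 3)
            .do(third => {
              events.push(`x3: ${third}`);
              wrapperNext(undefined); // chain finished: notify the outer subscriber
            })
            .subscribe();
        })
        .subscribe();
    })
    .subscribe();
});

waitForAll.subscribe(() => events.push('outer subscriber called'));

// Drain the queue, as the event loop would do for real timers.
while (tasks.length > 0) {
  tasks.shift()!();
}

console.log(events);
```

This time the outer subscriber’s entry is the last one in the list, after all three results.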

And now you can utilize Observables in a cascading fashion in order to have an Observable depend on the results of another, all in one call!
