Observables in Parallel

Mike Berryman

Since observables are being heavily pushed in Angular 2, I’ve been spending some time getting acquainted with how to use them.  Specifically, I’ve been “translating” promise-based solutions into observable-based ones for functionality I had previously built with promises.  In this blog post I’m going to address how to use observables in parallel.

When I say “observables in parallel”, I mean multiple observables that are started all at once, with the calling code waiting for every one of them to complete before continuing.  With promises you would create an array of promises, then use the Promise.all() function to wait until all of them have completed before moving on.  Each individual promise would use the .then() function to do whatever needs to be done with that promise’s result.

The most relevant use-case for this scenario in my opinion is to fetch data from multiple web service endpoints before continuing, so that’s what my examples will revolve around.  But the underlying concept would be the same for any use-case where multiple asynchronous operations need to be performed at the same time and all must be completed before moving on.

I’ve made a Plunker to demonstrate performing 3 asynchronous operations in parallel using promises and observables.  When you click the “Trigger” button, each demo generates 3 random numbers in 3 different asynchronous operations, with the operations starting at the same time but taking 1, 2, and 3 seconds to “complete”.  Once every operation has completed, a “wrapper” that is waiting for everything to finish updates the overall status.

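import { Component } from '@angular/core';
// In RxJS 5, importing from 'rxjs/Rx' pulls in every operator, including forkJoin and do.
import { Observable } from 'rxjs/Rx';

@Component({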
  selector: 'my-app',
  template: `
<div>
<h1>Multiple Asynchronous Operations in Parallel</h1>

<hr />

<div>
<h3>Promises</h3>
<div>Random # from first Promise: {{promiseRandResult1}}</div>
<div>Random # from second Promise: {{promiseRandResult2}}</div>
<div>Random # from third Promise: {{promiseRandResult3}}</div>
<div>All Promises Status: {{allPromisesStatus}}</div>
<input type="button" [disabled]="allPromisesStatus === 'Running'" value="Trigger" (click)="doMultipleAsnycOperationsWithPromises()" /></div>

<hr />

<div>
<h3>Observables</h3>
<div>Random # from first Observable: {{observableRandResult1}}</div>
<div>Random # from second Observable: {{observableRandResult2}}</div>
<div>Random # from third Observable: {{observableRandResult3}}</div>
<div>All Observables Status: {{allObservablesStatus}}</div>
<input type="button" [disabled]="allObservablesStatus === 'Running'" value="Trigger" (click)="doMultipleAsnycOperationsWithObservables()" /></div>
</div>
`,
})
export class App {
  public promiseRandResult1: number = 0;
  public promiseRandResult2: number = 0;
  public promiseRandResult3: number = 0;
  public allPromisesStatus: string = "Idle";
  public observableRandResult1: number = 0;
  public observableRandResult2: number = 0;
  public observableRandResult3: number = 0;
  public allObservablesStatus: string = "Idle";
  constructor() {
  }

  public doMultipleAsnycOperationsWithPromises(): void {
    this.promiseRandResult1 = 0;
    this.promiseRandResult2 = 0;
    this.promiseRandResult3 = 0;
    this.allPromisesStatus = "Running";

    this._multiplePromisesWorkerAsync().then((result) => {
      this.allPromisesStatus = "Idle";
    });
  }

  public doMultipleAsnycOperationsWithObservables(): void {
    this.observableRandResult1 = 0;
    this.observableRandResult2 = 0;
    this.observableRandResult3 = 0;
    this.allObservablesStatus = "Running";

    this._multipleObservablesWorkerAsync().subscribe((result) => {
      this.allObservablesStatus = "Idle";
    });
  }

  private _multiplePromisesWorkerAsync(): Promise<any[]> {
    let promises: Promise<any>[] = [];

    // Each promise resolves after its delay, produces a random integer from 0 to 100,
    // and stores that number in a page variable from its own .then().
    promises.push(new Promise<any>(resolve => setTimeout(resolve, 1000))
      .then(() => Math.round(Math.random() * 100))
      .then((randNum) => this.promiseRandResult1 = randNum));

    promises.push(new Promise<any>(resolve => setTimeout(resolve, 2000))
      .then(() => Math.round(Math.random() * 100))
      .then((randNum) => this.promiseRandResult2 = randNum));

    promises.push(new Promise<any>(resolve => setTimeout(resolve, 3000))
      .then(() => Math.round(Math.random() * 100))
      .then((randNum) => this.promiseRandResult3 = randNum));

    // Resolves once all three promises have resolved.
    return Promise.all(promises);
  }

  private _multipleObservablesWorkerAsync(): Observable<any[]> {
    let observables: Observable<any>[] = [];

    // Each observable emits one random integer from 0 to 100 after its delay and then
    // completes; .do() stores the number in a page variable and passes it on unchanged.
    observables.push(new Observable<number>(observer => {
      setTimeout(() => {
        observer.next(Math.round(Math.random() * 100));
        observer.complete();
      }, 1000);
    }).do((randNum) => {
      this.observableRandResult1 = randNum;
    }));

    observables.push(new Observable<number>(observer => {
      setTimeout(() => {
        observer.next(Math.round(Math.random() * 100));
        observer.complete();
      }, 2000);
    }).do((randNum) => {
      this.observableRandResult2 = randNum;
    }));

    observables.push(new Observable<number>(observer => {
      setTimeout(() => {
        observer.next(Math.round(Math.random() * 100));
        observer.complete();
      }, 3000);
    }).do((randNum) => {
      this.observableRandResult3 = randNum;
    }));

    // Emits a single array once all three observables have completed.
    return Observable.forkJoin(observables);
  }
}

The utility functions that make this possible are Promise.all() and Observable.forkJoin().  Each takes an array of promises/observables and combines them into one Promise/Observable that can be used to wait until all the individual promises/observables have completed.  Note that forkJoin() only emits once every source observable has completed, which is why each observable in the demo calls observer.complete() after observer.next().

One of the challenges of performing multiple asynchronous operations at once is doing something with each individual operation’s result.  With the Promise.all() and Observable.forkJoin() functions each individual operation’s result is included in an array that is returned to the caller.  I find this clunky and messy since you now have to keep track of what order the operations were added to the array passed to Promise.all()/Observable.forkJoin() to know which value in the overall resulting array corresponds to which asynchronous operation.  I prefer to do whatever I need to do with each operation’s result “within” that operation.
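
To make the ordering issue concrete, here is a minimal sketch using plain promises.  The `delayedValue` and `loadAll` helpers and the "users"/"orders"/"products" values are hypothetical and not part of the Plunker; they only stand in for three web service calls.  The point is that the array produced by Promise.all() follows the order of the input array, not the order in which the promises finish:

```typescript
// Hypothetical helper: resolves with `value` after `ms` milliseconds.
function delayedValue<T>(value: T, ms: number): Promise<T> {
  return new Promise<T>(resolve => setTimeout(() => resolve(value), ms));
}

// The slowest promise is listed first and the fastest last.
function loadAll(): Promise<string[]> {
  return Promise.all([
    delayedValue("users", 30),
    delayedValue("orders", 20),
    delayedValue("products", 10),
  ]);
}

loadAll().then(results => {
  // Results follow input order, so the caller has to remember that
  // index 0 is "users", index 1 is "orders", and index 2 is "products".
  console.log(results);
});
```

Adding, removing, or reordering an entry in the input array silently shifts every index the caller relies on, which is the bookkeeping I would rather avoid.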

For promises, this means using the .then() chain function.  With a promise, .then() is how you get to the promise’s result.  The nice thing about .then() is that it returns a new Promise whose value is whatever the callback passed to .then() returns.  Since I’d rather not have the code that calls Promise.all() “know” about each promise’s result, I’m using .then() to have each individual promise do whatever is necessary with its own result.  In the demo, each promise that is added to the promises array ends with a .then() that sets one of the page variables to the result.
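
A stripped-down sketch of that pattern, outside of Angular, might look like the following.  The `page` object, the `delay` helper, and the fixed values 7 and 42 are assumptions made for illustration; the demo uses component fields and random numbers instead.

```typescript
// Hypothetical stand-in for page variables such as promiseRandResult1.
const page = { first: 0, second: 0 };

// Hypothetical helper: resolves (with no value) after `ms` milliseconds.
function delay(ms: number): Promise<void> {
  return new Promise<void>(resolve => setTimeout(resolve, ms));
}

// Each chain produces a value, then stores it in its own .then().
const first = delay(20)
  .then(() => 7) // the next .then() receives 7
  .then(value => { page.first = value; });

const second = delay(10)
  .then(() => 42) // the next .then() receives 42
  .then(value => { page.second = value; });

// The wrapper only needs to know that everything has finished.
const allDone: Promise<void> = Promise.all([first, second]).then(() => {
  console.log("All done:", page.first, page.second);
});
```

The callback attached to Promise.all() never looks at the result array, so the order in which the promises were added no longer matters.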

For observables, there are 2 chain functions (that I know of at least) that operate similarly to a promise’s .then().  The .do() and .map() functions both let you perform some operation on the result; the difference is that .do() keeps sending the original result down the observable chain, whereas .map() lets you manipulate the result and sends the manipulated version down the chain.  Since I have no reason to change the result in any way, I’m using .do() in the example to set the page variables.
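
The difference between the two can be illustrated without RxJS at all.  The sketch below uses a hypothetical `Pipeline` class, a toy stand-in that only mimics the pass-through and transform behavior described above; it is not the RxJS API and not how RxJS implements .do() and .map().

```typescript
// Toy stand-in for an observable chain (hypothetical, not the RxJS API).
class Pipeline<T> {
  constructor(private readonly run: (emit: (value: T) => void) => void) {}

  // Like .do(): runs a side effect, then passes the ORIGINAL value on.
  do(sideEffect: (value: T) => void): Pipeline<T> {
    return new Pipeline<T>(emit =>
      this.run(value => { sideEffect(value); emit(value); }));
  }

  // Like .map(): passes on whatever the callback RETURNS.
  map<R>(transform: (value: T) => R): Pipeline<R> {
    return new Pipeline<R>(emit =>
      this.run(value => emit(transform(value))));
  }

  // Starts the chain and hands each emitted value to `next`.
  subscribe(next: (value: T) => void): void {
    this.run(next);
  }
}

// A source that emits the single value 21.
const source = new Pipeline<number>(emit => emit(21));

let seenByDo = 0;
let afterDo = 0;
let afterMap = 0;

source.do(v => { seenByDo = v; }).subscribe(v => { afterDo = v; });
source.map(v => v * 2).subscribe(v => { afterMap = v; });

console.log(seenByDo, afterDo, afterMap); // prints: 21 21 42
```

The subscriber after do() still receives 21, while the subscriber after map() receives the doubled value.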

So by using the Observable.forkJoin() aggregator you can perform as many observable actions in parallel as you need and still wait until all of them have completed.
