Antoine Antoine - 1 year ago 189
Javascript Question

Hot and shared Observable from an EventEmitter

Is there a way to have a hot observable from an EventEmitter (or an equivalent available in Angular 2 alpha 46 / RxJS 5 alpha)? That is, if we subscribe after the value is resolved, the subscription fires with the previously resolved value, similar to what we get when always returning the same promise.

Ideally, only using Angular 2 objects (I read somewhere a light RxJS would be embedded later to remove the dependency), otherwise importing RxJS is fine. AsyncSubject seems to match my need, but it is not available in RxJS 5 alpha.

I tried the following, without success (never triggers). Any idea about how to use it?

let emitter = new EventEmitter<MyObj>();
setTimeout(() => { emitter.next(new MyObj()); });
this.observable = emitter;
return this.observable.share();

Full plunker here comparing hot and cold

Use case: fetch some async objects only once (for example a series of HTTP calls merged/wrapped in a new observable), but provide the resolved async object to any service/component subscribing to it, even if it subscribes after the object is resolved (the HTTP responses are received).

EDIT: the question is not about how to merge HTTP responses, but about how to get a (hot?) observable from EventEmitter, or any equivalent available with Angular 2 alpha 46 / RxJS 5 alpha, that allows subscribing after the async result is retrieved/resolved (HTTP is just an example of an async origin). myEventEmitter.share() does not work (cf. plunker above), although it works with the Observable returned by HTTP (cf. plunker from @Eric Martinez). As of Angular 2 alpha 46, the .toRx() method no longer exists; the EventEmitter is itself the observable and subject.

This works well with promises as long as we always return the same promise object. Since the Angular 2 HTTP services introduce observables, I would like to avoid mixing promises and observables (and observables are said to be more powerful than promises, so they should allow what is easy with promises).

Specs about share() (I haven't found docs for version 5 alpha, the version used by Angular 2): it works on the Observable returned by the Angular 2 HTTP service, but not on EventEmitter.

EDIT: clarified why not using the Observable returned by HTTP and added that not using RxJS directly would be even better.

EDIT: changed description: the concern is about multiple subscriptions, not merging the HTTP results.


Answer Source

ReplaySubject does what I was looking for. @robwormald provided a working example on Gitter, which I slightly modified to better demonstrate it.

Exposing HTTP response:

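import {Injectable} from 'angular2/angular2';
import {Http} from 'angular2/http';
import {ReplaySubject} from '@reactivex/rxjs/dist/cjs/Rx'

@Injectable()
export class PeopleService {
  constructor(http:Http) {
    // Buffer size 1: late subscribers receive the last emitted value
    this.people = new ReplaySubject(1);

    // The request line was lost from this snippet; the URL is a placeholder
    http.get('api/people.json')
      .map(res => res.json())
      // Forward the parsed response into the subject
      .subscribe(this.people);
  }
}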

Subscribing multiple times:

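// ... annotations
export class App {
  constructor(peopleService:PeopleService) {
    // Reconstructed: the snippet lost this assignment and the callback bodies
    const people = peopleService.people;

    people.subscribe(v => console.log('first', v));

    //some time later

    setTimeout(() => {
      // Late subscribers still receive the replayed value
      people.subscribe(v => console.log('second', v));
      people.subscribe(v => console.log('third', v));
    }, 2000); // arbitrary delay, for illustration only
  }
}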

Full plunker
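
To make the replay behaviour concrete without depending on a particular RxJS version, here is a minimal hand-rolled sketch. The class TinyReplaySubject and the sample data are hypothetical and are not the RxJS implementation; they only illustrate the idea that a subscriber arriving after the value was emitted still receives it from a buffer.

```typescript
// Hypothetical stand-in for a replaying subject; NOT the RxJS class.
type Listener<T> = (value: T) => void;

class TinyReplaySubject<T> {
  private buffer: T[] = [];
  private listeners: Listener<T>[] = [];
  private bufferSize: number;

  // bufferSize = how many past values a late subscriber receives.
  constructor(bufferSize: number = 1) {
    this.bufferSize = bufferSize;
  }

  next(value: T): void {
    this.buffer.push(value);
    if (this.buffer.length > this.bufferSize) {
      this.buffer.shift(); // drop the oldest buffered value
    }
    // Iterate over a copy so listeners added during delivery are not called twice.
    this.listeners.slice().forEach(listener => listener(value));
  }

  subscribe(listener: Listener<T>): () => void {
    // Replay buffered values first, then register for future ones.
    this.buffer.forEach(value => listener(value));
    this.listeners.push(listener);
    // Return an unsubscribe function.
    return () => {
      this.listeners = this.listeners.filter(l => l !== listener);
    };
  }
}

// Usage: one early subscriber and one late subscriber.
const people = new TinyReplaySubject<string[]>(1);
const received: string[] = [];

people.subscribe(v => received.push('early:' + v.join(',')));
people.next(['Ann', 'Bob']); // stands in for the HTTP response arriving
people.subscribe(v => received.push('late:' + v.join(',')));
```

Both subscribers end up with the same value, which is the behaviour a shared promise gives. A plain subject or emitter keeps no buffer, so the late subscriber would receive nothing.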

EDIT: BehaviorSubject is an alternative. In this use case, the difference is that it takes an initial value, which is useful, for example, to display content from a cache before updating it with the HTTP response.
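
The initial-value difference can be sketched in the same hand-rolled style. TinyBehaviorSubject and the cached data below are hypothetical, not the RxJS class; the sketch assumes the cache content is available synchronously when the subject is created.

```typescript
// Hypothetical stand-in for a subject with an initial value; NOT the RxJS class.
class TinyBehaviorSubject<T> {
  private current: T;
  private listeners: Array<(value: T) => void> = [];

  // An initial value is mandatory, so every subscriber gets something at once.
  constructor(initial: T) {
    this.current = initial;
  }

  next(value: T): void {
    this.current = value;
    this.listeners.slice().forEach(listener => listener(value));
  }

  subscribe(listener: (value: T) => void): void {
    listener(this.current); // emits immediately: the initial or the latest value
    this.listeners.push(listener);
  }
}

// Usage: show cached content first, then the fresh response.
const subject = new TinyBehaviorSubject<string[]>(['Ann (cached)']);
const shown: string[] = [];
subject.subscribe(v => shown.push(v.join(',')));

subject.next(['Ann', 'Bob']); // stands in for the HTTP response arriving

// A subscriber arriving after the response only sees the latest value.
const lateShown: string[] = [];
subject.subscribe(v => lateShown.push(v.join(',')));
```

The early subscriber sees the cached value and then the update, while the late one sees only the update.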
