Marcus Riemer Marcus Riemer - 21 days ago 5
TypeScript Question

How do I ensure that a call to subscribe on the observer initially receives the most recent value?

I am using

rxjs
together with Angular 2 and Typescript. I would like to share a common web-resource (a "project" in the context of my app, essentially a JSON document) between multiple components. To achieve this I introduced a service that exposes an observable, which will be shared by all clients:

/**
* Handed out to clients so they can subscribe to something.
*/
private _observable : Observable<Project>;

/**
* Used to emit events to clients.
*/
private _observer : Observer<Project>;

constructor(private _http: Http) {
// Create observable and observer once and for all. These instances
// are not allowed to changed as they are passed on to every subscriber.
this._observable = Observable.create( (obs : Observer<Project>) => {
this._observer = obs;
});
}


Clients now simply get a reference to that one
_observable
and subscribe to it.

/**
* Retrieves an observable that always points to the active
* project.
*/
get ActiveProject() : Observable<Project> {
return (this._observable);
}


When some component decides to actually load a project, it calls the following method:

/**
* @param id The id of the project to set for all subscribers
*/
setActiveProject(id : string) {
// Projects shouldn't change while other requests are in progress
if (this._httpRequest) {
throw { "err" : "HTTP request in progress" };
}

this._httpRequest = this._http.get('/api/project/' + id)
.catch(this.handleError)
.map(res => new Project(res.json()));

this._httpRequest.subscribe(res => {
// Cache the project
this._cachedProject = res;
// Show that there are no more requests
this._httpRequest = null;
// Inform subscribers
this._observer.next(this._cachedProject)

console.log("Got project");
});
}


It basically does a HTTP request, transforms the JSON document into a "proper" instance and calls
this._observer.next()
to inform all subscribers about the change.

But if something subscribes after the HTTP request has already taken place, the see nothing until a new HTTP request is issued. I have found out that there is some kind of caching (or replay?) mechanism in
rxjs
that seems to adress this, but I couldn't figure out how to use it.

tl;dr: How do I ensure that a call to
subscribe
on the observer initially receives the most recent value?

Extra question: By "pulling the observer out of the observable" (in the constructor), have I essentially created a subject?

Answer

That's what BehaviorSubject does

import { BehaviorSubject } from 'rxjs/subject/BehaviorSubject';
...
obs=new BehaviourSubject(4);
obs.subscribe(); //prints 4
obs.next(3); //prints 3
obs.subscribe(); //prints 3