Encapsulates an asynchronous process that emits a sequence of output values (think: event source). Their output can terminate normally or abnormally, and they can be cancelled (unsubscribed).
const observable = Observable.of(1, 2, 3, 4);
const subscription = observable.subscribe({ // the observer
next(value) { ... } // 0 or more times
error(err) { ... } // at most once
complete() { ... } // at most once
});
subscription.unsubscribe();
next
.error
or complete
.error
or complete
there will be no more callbacks.unsubscribe
there will be no more callbacks.For example, you can define an observable to make a single AJAX call: it will emit exactly one value:
function ajax(options) {
return new Observable(observer => {
// the subscriber function (awful name)
const xhr = nanoajax.ajax(options, (code, text) => {
if (code === 200) {
observer.next({ code, text, xhr });
observer.complete();
}
else observer.error(new HttpError(code, xhr));
});
return () => { xhr.abort(); }
});
}
Single-value Observables are very similar to Promises -- but Promises don't support cancellation.
Take for example this code for the search box in md-admin:
onSuggestionsFetchRequested({value}) {
this.version += 1; // ****
const version = this.version; // ****
microajax({
url: `/api/search?q=${value}`,
success(data) {
if (this.version === version) // ****
this.setState({ suggestions: data });
}
});
}
Contrast with subscribing to an observable:
onSuggestionsFetchRequested({value}) {
if (this._subscription) this._subscription.unsubscribe();
this._subscription = ajax({ url: `/api/search?q=${value}` })
.subscribe(
data => this.setState({ suggestions: data })
);
}
componentWillUnmount() {
if (this._subscription) this._subscription.unsubscribe();
}
Old search requests whose data will be ignored are now actively cancelled.
The naming of "subscribe", "subscription" etc may prompt people to do hosuekeeping. The original version didn't do any- XHR slots are limited on browsers, and congestion happens.
In TIMWebUI the search request was delayed for some time, meaning more resources to manage:
onSuggestionsFetchRequested({value}) {
if (this._timeout) clearTimeout(this._timeout);
this._timeout = setTimeout(() => {
this.version += 1;
const version = this.version;
microajax({
url: `/ui/api/search?q=${value}`,
success(data) {
if (version === this.version)
this.setState({ suggestions: data });
}
}
}, debounceTimeout);
}
Neither the timer nor the XHR were cleaned up on unmount.
Observables compose very well. They have a concat
method, so you can simply
concatenate the ajax call onto a delay for the debounce timeout:
onSuggestionsFetchRequested({value}) {
if (this._subscription) this._subscription.unsubscribe();
this._subscription = delayMillis(debounceTimeout) // ***
.concat(ajax({ url: `/api/search?q=${value}` }))
.subscribe(
data => this.setState({ suggestions: data })
);
}
The behaviour of delaying for the debounce timeout has been encapsulated into the observable/subscription. Unsubscribing will deal with cleaning up either the timer or the XHR, as appropriate.
export function delayMillis(millis) {
return new Observable(observer => {
const timer = setTimeout(() => {
observer.complete();
}, millis);
return () => {
clearTimeout(timer);
};
});
}
Observables are not a replacement for Redux, they can be used together (TOTF does this).