Observables for everything!

What's an Observable?

An Observable encapsulates an asynchronous process that emits a sequence of output values (think: event source). The sequence can terminate normally or abnormally, and the process can be cancelled (unsubscribed).

const observable = Observable.of(1, 2, 3, 4);
const subscription = observable.subscribe({ // the observer
	next(value) { ... }, // 0 or more times
	error(err) { ... }, // at most once
	complete() { ... } // at most once
});
subscription.unsubscribe();

Semantics

  1. After subscribing, the subscription is in a "running" state, during which it may call next any number of times.
  2. At some point the process may finish, calling exactly one of error or complete.
  3. After calling one of error or complete there will be no more callbacks.
  4. After calling unsubscribe there will be no more callbacks.
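
The four rules above can be sketched with a tiny stand-in class. MiniObservable is a hypothetical name used only for illustration; it is not the API of any real Observable library, and real implementations handle more edge cases (exceptions thrown by observers, for instance).

```javascript
// Hypothetical minimal stand-in for an Observable library, for illustration only.
class MiniObservable {
	constructor(subscriber) {
		this._subscriber = subscriber;
	}

	subscribe(observer) {
		let closed = false; // true once error, complete or unsubscribe has happened
		let cleanup;

		const close = () => {
			closed = true;
			if (cleanup) cleanup();
			cleanup = undefined;
		};

		// Wraps the caller's observer so that rules 3 and 4 are enforced.
		const guarded = {
			next(value) {
				if (!closed && observer.next) observer.next(value);
			},
			error(err) {
				if (closed) return;
				close();
				if (observer.error) observer.error(err);
			},
			complete() {
				if (closed) return;
				close();
				if (observer.complete) observer.complete();
			},
		};

		cleanup = this._subscriber(guarded);
		if (closed) close(); // finished synchronously: run the cleanup now
		return { unsubscribe() { if (!closed) close(); } };
	}
}

// Rules 1 to 3: callbacks after complete are dropped.
const log = [];
const source = new MiniObservable(observer => {
	observer.next(1);
	observer.next(2);
	observer.complete();
	observer.next(3); // ignored
	observer.error(new Error('late')); // ignored
	return () => log.push('cleanup');
});
source.subscribe({
	next: v => log.push(`next ${v}`),
	error: e => log.push(`error ${e.message}`),
	complete: () => log.push('complete'),
});
// log: ['next 1', 'next 2', 'complete', 'cleanup']

// Rule 4: callbacks after unsubscribe are dropped.
const log2 = [];
let emit;
const manual = new MiniObservable(observer => {
	emit = v => observer.next(v);
	return () => log2.push('cleanup');
});
const sub = manual.subscribe({ next: v => log2.push(`next ${v}`) });
emit('a');
sub.unsubscribe();
emit('b'); // ignored
// log2: ['next a', 'cleanup']
```

The order in which this sketch runs the cleanup relative to complete is a choice made here, not a statement about any particular library.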

Single-value observables are still useful

For example, you can define an observable that makes a single AJAX call and emits exactly one value:

function ajax(options) {
	return new Observable(observer => {
		// the subscriber function (awful name)
		const xhr = nanoajax.ajax(options, (code, text) => {
			if (code === 200) {
				observer.next({ code, text, xhr });
				observer.complete();
			}
			else observer.error(new HttpError(code, xhr));
		});
		return () => { xhr.abort(); }
	});
}

Single-value Observables are very similar to Promises -- but Promises don't support cancellation.

Why would you? 1.1

Take for example this code for the search box in md-admin:

onSuggestionsFetchRequested({value}) {
	this.version += 1; // ****
	const version = this.version; // ****
	microajax({
		url: `/api/search?q=${value}`,
		success: data => { // arrow function keeps the component's `this`
			if (this.version === version) // ****
				this.setState({ suggestions: data });
		}
	});
}

Why would you? 1.2

Contrast with subscribing to an observable:

onSuggestionsFetchRequested({value}) {
	if (this._subscription) this._subscription.unsubscribe();
	this._subscription = ajax({ url: `/api/search?q=${value}` })
		.subscribe(
			data => this.setState({ suggestions: data })
		);
}

componentWillUnmount() {
	if (this._subscription) this._subscription.unsubscribe();
}

Old search requests whose data will be ignored are now actively cancelled.

The naming of "subscribe", "subscription", etc. may prompt people to do housekeeping. The original version didn't do any, which matters because XHR slots are limited in browsers and congestion happens.

Why would you? 2.1

In TIMWebUI the search request was delayed by a debounce timer, which meant one more resource to manage:

onSuggestionsFetchRequested({value}) {
	if (this._timeout) clearTimeout(this._timeout);
	this._timeout = setTimeout(() => {
		this.version += 1;
		const version = this.version;
		microajax({
			url: `/ui/api/search?q=${value}`,
			success: data => { // arrow function keeps the component's `this`
				if (version === this.version)
					this.setState({ suggestions: data });
			}
		});
	}, debounceTimeout);
}

Neither the timer nor the XHR was cleaned up on unmount.

Why would you? 2.2

Observables compose very well. They have a concat method, so you can simply concatenate the ajax call onto a delay for the debounce timeout:

onSuggestionsFetchRequested({value}) {
	if (this._subscription) this._subscription.unsubscribe();
	this._subscription = delayMillis(debounceTimeout) // ***
		.concat(ajax({ url: `/api/search?q=${value}` }))
		.subscribe(
			data => this.setState({ suggestions: data })
		);
}

The behaviour of delaying for the debounce timeout has been encapsulated into the observable/subscription. Unsubscribing will deal with cleaning up either the timer or the XHR, as appropriate.
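
To see why a single unsubscribe is enough, here is a rough sketch of how a concat could track whichever stage is currently running. It is a free function over duck-typed sources (any object with subscribe(observer) returning { unsubscribe() }), not the concat method of a real library, and manualSource is a hypothetical test double standing in for delayMillis and ajax.

```javascript
// Illustrative sketch only, not a library implementation.
function concat(first, second) {
	return {
		subscribe(observer) {
			let active = null; // subscription for whichever stage is running
			let cancelled = false;
			const firstSubscription = first.subscribe({
				next: value => observer.next(value),
				error: err => observer.error(err),
				complete() {
					// The first stage finished normally: start the second one.
					if (!cancelled) active = second.subscribe(observer);
				},
			});
			// If the first stage has not already completed, it is the active stage.
			if (active === null) active = firstSubscription;
			return {
				unsubscribe() {
					cancelled = true;
					active.unsubscribe();
				},
			};
		},
	};
}

// Hypothetical test double: records start/cleanup and exposes its observer.
function manualSource(name, log) {
	const source = {
		observer: null,
		subscribe(observer) {
			source.observer = observer;
			log.push(`${name} started`);
			return { unsubscribe: () => log.push(`${name} cleaned up`) };
		},
	};
	return source;
}

const noop = () => {};

// Case 1: unsubscribe while the delay is still pending.
const earlyLog = [];
const earlyDelay = manualSource('delay', earlyLog);
const earlyRequest = manualSource('ajax', earlyLog);
concat(earlyDelay, earlyRequest)
	.subscribe({ next: noop, error: noop, complete: noop })
	.unsubscribe();
// earlyLog: ['delay started', 'delay cleaned up']

// Case 2: the delay completes, then unsubscribe while the request is in flight.
const lateLog = [];
const lateDelay = manualSource('delay', lateLog);
const lateRequest = manualSource('ajax', lateLog);
const subscription = concat(lateDelay, lateRequest)
	.subscribe({ next: noop, error: noop, complete: noop });
lateDelay.observer.complete();
subscription.unsubscribe();
// lateLog: ['delay started', 'ajax started', 'ajax cleaned up']
```

In case 1 the request is never started, so only the timer needs cleaning up; in case 2 only the request does.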

Implementation

export function delayMillis(millis) {
	return new Observable(observer => {
		const timer = setTimeout(() => {
			observer.complete();
		}, millis);
		return () => {
			clearTimeout(timer);
		};
	});
}

Summary

  • Observable - asynchronous supplier of a sequence of values (possibly just 0 or 1) that supports cancellation
  • Simple to write Observables that encapsulate a delay timer, interval timer, HTTP request
  • Can be composed easily to transform, concatenate, merge result sequences etc.
  • Encapsulation of tasks and processes into Observables supports separation of concerns; TOTF's subscription actions are a prime example
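
As a small illustration of the "transform" point, here is a sketch of a map step that parses the response text before it reaches the observer. Both map (written as a free function over duck-typed sources) and fakeAjax (a canned stand-in for the ajax function shown earlier) are assumptions made for this example, and the response body is invented sample data.

```javascript
// Illustrative sketch only. A "source" is any object with subscribe(observer).
function map(source, fn) {
	return {
		subscribe(observer) {
			// Forward everything unchanged except next, which is transformed.
			return source.subscribe({
				next: value => observer.next(fn(value)),
				error: err => observer.error(err),
				complete: () => observer.complete(),
			});
		},
	};
}

// Hypothetical stand-in for ajax(): emits one canned response, then completes.
function fakeAjax(text) {
	return {
		subscribe(observer) {
			observer.next({ code: 200, text });
			observer.complete();
			return { unsubscribe() {} };
		},
	};
}

const received = [];
map(fakeAjax('["apple","apricot"]'), response => JSON.parse(response.text))
	.subscribe({
		next: suggestions => received.push(suggestions),
		error: err => received.push(err),
		complete: () => received.push('complete'),
	});
// received: [['apple', 'apricot'], 'complete']
```

A fuller version would also catch exceptions thrown by fn and route them to observer.error.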

Observables are not a replacement for Redux; the two can be used together (TOTF does this).