Rolling Your Own Creation Operators in RxJS

johnlindquist

September 27, 2019


The Value of Writing Your Own Operators

When you import {Observable} from "rxjs", you open up a world of possibility. Granted, RxJS ships a lot of operators that handle a lot of edge cases, but we've seen many people abandon lodash/underscore to write their own array utils, so why not do the same with RxJS? You could even argue for saving on file size: why bring in the fromEvent code that supports Node events if you're only setting up a DOM click event? Hopefully I can help take the edge off of rolling your own.

As with all Operators, you begin with:

import { Observable } from 'rxjs'

📝 Someday, TC39 might add an Observable primitive and we wouldn't even have to install RxJS.

The Creation Operator Template

The following is a template you can use whenever you want to make a creation operator:

const creationOperatorTemplate = config => {
  const source = new Observable(observer => {
    return () => {
      //unsubscribe logic
    }
  })
  return source
}

The main thing to notice is you're creating and returning an Observable.
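
To see when each part of the template runs, here is a minimal sketch that fills it in with logging. The small Observable class is a stand-in I'm assuming so the snippet runs in plain Node (it is not RxJS's real implementation), and config.name and the log Array are hypothetical names used only for illustration.

```javascript
// Stand-in for RxJS's Observable (an assumption, so the sketch runs in plain Node).
// With RxJS installed, delete this class and import { Observable } from 'rxjs'.
class Observable {
  constructor(setup) {
    this.setup = setup
  }
  subscribe(observer) {
    const noop = () => {}
    const callbacks = typeof observer === 'function' ? { next: observer } : observer
    // Fill in any callbacks the caller left out, then run the setup function.
    const teardown = this.setup({ next: noop, error: noop, complete: noop, ...callbacks })
    return { unsubscribe: () => teardown && teardown() }
  }
}

const log = []

// The template, filled in just enough to show when each part runs.
const creationOperatorTemplate = config => {
  const source = new Observable(observer => {
    log.push(`setup ${config.name}`) // runs once per subscribe call
    observer.next(config.name)
    return () => {
      log.push(`teardown ${config.name}`) // unsubscribe logic
    }
  })
  return source
}

const source = creationOperatorTemplate({ name: 'demo' })
log.push('created') // the setup function has not run yet

const subscription = source.subscribe({ next: value => log.push(`next ${value}`) })
subscription.unsubscribe()

console.log(log)
```

The log shows 'created' first: calling the creation operator only builds the Observable, and the setup function waits for subscribe.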

of - push a single value

of takes a value:

const of = value =>

then sends that value through observer.next and completes immediately:

observer.next(value) //push out one value
observer.complete() //and done

Full operator:

const of = value => {
  const source = new Observable(observer => {
    observer.next(value) //push out one value
    observer.complete() //and done
  })
  return source
}
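
A quick usage sketch for this of, assuming a small stand-in Observable class so it runs without installing anything (swap in the real import if you have RxJS). The received Array is just a hypothetical place to collect what the subscriber sees.

```javascript
// Stand-in for RxJS's Observable (an assumption, so the sketch runs in plain Node).
// With RxJS installed, delete this class and import { Observable } from 'rxjs'.
class Observable {
  constructor(setup) {
    this.setup = setup
  }
  subscribe(observer) {
    const noop = () => {}
    const callbacks = typeof observer === 'function' ? { next: observer } : observer
    // Fill in any callbacks the caller left out, then run the setup function.
    const teardown = this.setup({ next: noop, error: noop, complete: noop, ...callbacks })
    return { unsubscribe: () => teardown && teardown() }
  }
}

const of = value => {
  const source = new Observable(observer => {
    observer.next(value) // push out one value
    observer.complete() // and done
  })
  return source
}

const received = []
of('hello').subscribe({
  next: value => received.push(value),
  complete: () => received.push('done'),
})

console.log(received) // 'hello' arrives first, then 'done'
```

Note that RxJS's own of accepts any number of values; this version handles exactly one.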

fromEvent - push events

fromEvent takes the target and eventType (e.g., document and "click"):

const fromEvent = (target, eventType) =>

Then uses observer.next as the event handler:

const next = observer.next.bind(observer)
target.addEventListener(eventType, next)

Lastly, unsubscribe will remove the event handler:

target.removeEventListener(eventType, next)

The full fromEvent operator:

const fromEvent = (target, eventType) => {
  const source = new Observable(observer => {
    const next = observer.next.bind(observer)
    target.addEventListener(eventType, next)
    return () => {
      target.removeEventListener(eventType, next)
    }
  })
  return source
}
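
Here is a small sketch of this fromEvent in use. To keep it runnable outside a browser, I'm assuming a plain EventTarget in place of document (Node provides EventTarget and Event as globals) and a stand-in Observable class in place of the RxJS import. The types Array is a hypothetical collector.

```javascript
// Stand-in for RxJS's Observable (an assumption, so the sketch runs in plain Node).
// With RxJS installed, delete this class and import { Observable } from 'rxjs'.
class Observable {
  constructor(setup) {
    this.setup = setup
  }
  subscribe(observer) {
    const noop = () => {}
    const callbacks = typeof observer === 'function' ? { next: observer } : observer
    // Fill in any callbacks the caller left out, then run the setup function.
    const teardown = this.setup({ next: noop, error: noop, complete: noop, ...callbacks })
    return { unsubscribe: () => teardown && teardown() }
  }
}

const fromEvent = (target, eventType) => {
  const source = new Observable(observer => {
    const next = observer.next.bind(observer)
    target.addEventListener(eventType, next)
    return () => {
      target.removeEventListener(eventType, next)
    }
  })
  return source
}

const target = new EventTarget() // in a browser this could be document
const types = []

const subscription = fromEvent(target, 'click').subscribe({
  next: event => types.push(event.type),
})

target.dispatchEvent(new Event('click')) // delivered to the subscriber
subscription.unsubscribe() // removes the listener
target.dispatchEvent(new Event('click')) // nobody is listening anymore

console.log(types.length) // 1
```

Keeping a reference to the bound next matters here: removeEventListener only removes a handler when it is given the same function that was added.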

interval - push an incremented number every X milliseconds

interval takes the delay in milliseconds:

const interval = delay =>

Uses the delay in a setInterval:

setInterval(callback, delay)

setInterval will call the callback, so we'll call next inside the callback and increment an i so we have a value to pass to next. You can pass whatever you want to next, but an incrementing number makes sense for an interval:

let i = 0
const callback = () => {
  next(i++)
}

Now to unsubscribe, we need the id so we can run clearInterval:

const id = setInterval(callback, delay)
return () => {
  clearInterval(id)
}

The full interval operator:

const interval = delay => {
  const source = new Observable(observer => {
    const next = observer.next.bind(observer)
    let i = 0
    const callback = () => {
      next(i++)
    }
    const id = setInterval(callback, delay)
    return () => {
      clearInterval(id)
    }
  })
  return source
}
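
The sketch below uses this interval and unsubscribes after three values, which is what triggers clearInterval. The Observable class is a stand-in I'm assuming so the snippet runs in plain Node, and ticks is a hypothetical collector.

```javascript
// Stand-in for RxJS's Observable (an assumption, so the sketch runs in plain Node).
// With RxJS installed, delete this class and import { Observable } from 'rxjs'.
class Observable {
  constructor(setup) {
    this.setup = setup
  }
  subscribe(observer) {
    const noop = () => {}
    const callbacks = typeof observer === 'function' ? { next: observer } : observer
    // Fill in any callbacks the caller left out, then run the setup function.
    const teardown = this.setup({ next: noop, error: noop, complete: noop, ...callbacks })
    return { unsubscribe: () => teardown && teardown() }
  }
}

const interval = delay => {
  const source = new Observable(observer => {
    const next = observer.next.bind(observer)
    let i = 0
    const callback = () => {
      next(i++)
    }
    const id = setInterval(callback, delay)
    return () => {
      clearInterval(id)
    }
  })
  return source
}

const ticks = []
const subscription = interval(10).subscribe({
  next: value => {
    ticks.push(value)
    if (ticks.length === 3) {
      subscription.unsubscribe() // runs clearInterval, so the timer stops
      console.log(ticks) // ticks holds 0, 1, 2 at this point
    }
  },
})
```

Without the unsubscribe call the timer would keep firing forever, because this interval never calls complete on its own.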

merge - push values from all observables

merge takes any number of observables, gathered into an Array by the rest parameter:

const merge = (...observables) =>

Runs subscribe on all of them:

observables.forEach(observable => {
  observable.subscribe(newObserver)
})

But, to unsubscribe from all the observables, we'll need all of their subscriptions, so we'll need to use map:

const subscriptions = observables.map(observable => {
  const subscription = observable.subscribe(newObserver)
  return subscription
})

Then we can unsubscribe from all of them by looping through the subscriptions:

return () => {
  subscriptions.forEach(subscription => {
    subscription.unsubscribe()
  })
}

The edge case for merge is that we want to wait until all the observables call complete before calling complete on the main observer. So we can count the observables:

let active = observables.length

Then decrement the count each time one of them completes. If the count is 0, then call the main complete:

complete: () => {
  active--
  if (active === 0) {
    complete()
  }
}

The full merge operator:

const merge = (...observables) => {
  const source = new Observable(observer => {
    const next = observer.next.bind(observer)
    const complete = observer.complete.bind(observer)
    const error = observer.error.bind(observer)
    let active = observables.length
    const subscriptions = observables.map(observable => {
      const newObserver = {
        next,
        error,
        complete: () => {
          active--
          if (active === 0) {
            complete()
          }
        },
      }
      const subscription = observable.subscribe(newObserver)
      return subscription
    })
    return () => {
      subscriptions.forEach(subscription => {
        subscription.unsubscribe()
      })
    }
  })
  return source
}
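
To check the completion counting, this sketch merges two single-value observables built with the of from earlier and records what the subscriber sees. As before, the Observable class is an assumed stand-in so the snippet runs without RxJS, and received is a hypothetical collector.

```javascript
// Stand-in for RxJS's Observable (an assumption, so the sketch runs in plain Node).
// With RxJS installed, delete this class and import { Observable } from 'rxjs'.
class Observable {
  constructor(setup) {
    this.setup = setup
  }
  subscribe(observer) {
    const noop = () => {}
    const callbacks = typeof observer === 'function' ? { next: observer } : observer
    // Fill in any callbacks the caller left out, then run the setup function.
    const teardown = this.setup({ next: noop, error: noop, complete: noop, ...callbacks })
    return { unsubscribe: () => teardown && teardown() }
  }
}

const of = value =>
  new Observable(observer => {
    observer.next(value)
    observer.complete()
  })

const merge = (...observables) =>
  new Observable(observer => {
    const next = observer.next.bind(observer)
    const complete = observer.complete.bind(observer)
    const error = observer.error.bind(observer)
    let active = observables.length // how many sources are still running
    const subscriptions = observables.map(observable =>
      observable.subscribe({
        next,
        error,
        complete: () => {
          active--
          if (active === 0) {
            complete() // only the last source to finish completes the output
          }
        },
      })
    )
    return () => {
      subscriptions.forEach(subscription => subscription.unsubscribe())
    }
  })

const received = []
merge(of('a'), of('b')).subscribe({
  next: value => received.push(value),
  complete: () => received.push('done'),
})

console.log(received) // 'a', 'b', then a single 'done'
```

Both sources complete, but 'done' shows up only once, after the second source finishes.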

combineLatest - Grab the latest value from each Observable, push them all through as an Array

We'll pass in the Observables, gathered into an Array by the rest parameter:

const combineLatest = (...observables) =>

Subscribe to all of them and get their subscriptions (just like we did with merge):

const subscriptions = observables.map((observable, i) => {
  const subscription = observable.subscribe(value => {})
  return subscription
})

Then we can create an Array of values that matches the length of the Array of Observables:

const values = Array.from({ length: observables.length })

To wait for each Observable to emit a value, we'll fill our Array of values with placeholder symbols:

const waiting = Symbol('waiting')
const values = Array.from({ length: observables.length }).fill(waiting)

Then inside next, we'll only push to observer.next if all of the values no longer contain that placeholder:

values[i] = value
if (values.every(value => value !== waiting)) {
  observer.next(values)
}

Below is the full implementation:

const combineLatest = (...observables) => {
  return new Observable(observer => {
    const waiting = Symbol('waiting')
    const values = Array.from({ length: observables.length }).fill(waiting)
    const subscriptions = observables.map((observable, i) => {
      const subscription = observable.subscribe(value => {
        values[i] = value
        if (values.every(value => value !== waiting)) {
          observer.next(values)
        }
      })
      return subscription
    })
    return () => {
      subscriptions.forEach(subscription => {
        subscription.unsubscribe()
      })
    }
  })
}
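
The placeholder symbol earns its keep when sources emit falsy values. The sketch below combines of(0) and of(false) using the symbol check described above. It assumes a stand-in Observable class so it runs in plain Node, and emitted is a hypothetical collector.

```javascript
// Stand-in for RxJS's Observable (an assumption, so the sketch runs in plain Node).
// With RxJS installed, delete this class and import { Observable } from 'rxjs'.
class Observable {
  constructor(setup) {
    this.setup = setup
  }
  subscribe(observer) {
    const noop = () => {}
    const callbacks = typeof observer === 'function' ? { next: observer } : observer
    // Fill in any callbacks the caller left out, then run the setup function.
    const teardown = this.setup({ next: noop, error: noop, complete: noop, ...callbacks })
    return { unsubscribe: () => teardown && teardown() }
  }
}

const of = value =>
  new Observable(observer => {
    observer.next(value)
    observer.complete()
  })

const combineLatest = (...observables) =>
  new Observable(observer => {
    const waiting = Symbol('waiting') // marks slots that have not emitted yet
    const values = Array.from({ length: observables.length }).fill(waiting)
    const subscriptions = observables.map((observable, i) =>
      observable.subscribe(value => {
        values[i] = value
        if (values.every(value => value !== waiting)) {
          observer.next(values)
        }
      })
    )
    return () => {
      subscriptions.forEach(subscription => subscription.unsubscribe())
    }
  })

const emitted = []
combineLatest(of(0), of(false)).subscribe(values => {
  emitted.push([...values]) // copy, because the same Array is reused for every emission
})

console.log(emitted) // one emission containing 0 and false
```

A truthiness check such as values.every(value => value) would never emit here, since 0 and false are both falsy.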