What Does Pipe Do Anyway?
Let’s take a quick look at the most common RxJS example. This code will log out
MouseEvents from clicking on the documuent:
import { fromEvent } from "rxjs"fromEvent(document, "click").subscribe(value => {console.log(value)})
So what happens when we add a pipe into the mix:
fromEvent(document, "click").pipe() //what happens here?.subscribe(value => {console.log(value)})
As it turns out, our MouseEvents are still logged out. This is the exact same behavior
as before. But why?
Pipe Returns an Observable
To demonstrate, the code belows shows that pipe returns its own observable:
const observable = fromEvent()console.log(observable === observable.pipe()) //true
So What’s an Operator?
An operator is a function you pass into a pipe. And pipe returns its own observable. So let’s think about what that means:
- An
operatorhas the original observable as the first argument - An
operatorreturns an observable
This most basic operator we can write looks like this:
const operator = observable => {//return the original observablereturn observable}fromEvent(document, "click").pipe(operator) //our operator only passes the observable through.subscribe(value => {console.log(value)})
Observable In, Observable Out
Since returning the original observable does nothing, let’s try returning a different observable. Pay special attention to the following:
- The
clickobservable never calls subscribe! It’s simply ignored by the operator - We
subscribeto thehiobservable
In fact:
console.log(click.pipe(() => hi) === hi) //true!
This isn’t at all what we want, but it proves “Observable in, Observable out”
Always subscribe to the Original Observable Inside of an Operator
The previous examples were simply to prove a point: Operators receive the original Observable return an Observable.
But the purpose of operators is to subscribe to the original Observable then change the behavior of the observer:
The simplest example I can think of involves subscribing and logging out “hi”.
const operator = observable => {observable.subscribe(value => {console.log("hi")})return observable}
With this operator in place, our demo will log out both "hi" and the MouseEvent.
The Operator’s True Purpose: Intercepting Values from Observables
Herein lies the secret sauce of operators:
- Create a new Observable inside the Operator
subscribeto the original Observable- Pass different values to
next
const operator = observable => {const newObservable = { //1. Create a new Observablesubscribe: next => {observable.subscribe(value => { //2. Subscribe to the originalnext("hi") //3. Pass a different value to `next`})}}return newObservable}
This opens the door to do anything inside an operator!
❗️ RxJS has APIs for creating new Observables (e.g.,
new Observable). It’s important to use the API instead of the plain object we wrote above to handle completion, errors, and many more cases.
Adding Arguments to Operators
Let’s extract the "hi" from our previous example to be an argument in our operator:
//beforeconst operator = observable => {...next("hi")//afterconst operator = message => observable => {...next(message)
Now we can pass "hi" as the first argument to our operator.
observable.pipe(operator("hi"))
operator(message) creates a function
that’s passed back to pipe which then passes in the Observable. If this is unfamiliar, it may help to
see it written out in long-form, then refactored step-by-step:
Long Version
observable.pipe(observable => {const newOperator = operator("hi")return newOperator(observable)})
Medium Version
observable.pipe(observable => {return operator("hi")(observable)})
Short Version
observable.pipe(operator("hi"))
All three versions are the same. It only depends on your exposure to these coding patterns for which version is the most comfortable to you. I’d recommend becoming familiar with the short version, because that’s what all the RxJS docs use.
You now have unlimited customization options. You can pass in values, functions, observables, or anything you want to customize how your new Observable will behave. The power is in your hands! 💪
Pipe Internals from Scratch
Let’s strip down our RxJS patterns to the bare minimum required to “push”
values to a next function. Here’s our next function:
const next = value => {console.log(value)}
Next, we’ll create a barebones Observable; an Object with a subscribe method
which takes next as a function and invokes it:
const observable = {subscribe: next => {next("hello")}}
Finally, invoke subscribe with next and you should see “hello” in the console:
Here Comes the Pipe
[Insert “ceci n’est pas une pipe” joke here]
The pipe method will sit in-between the Observable and the Observer allowing
us to operate on what happens between the beginning and the end:
observable.pipe(/*do something awesome*/).subscribe(next)
To create a pipe method, we need to pass the Observable itself (AKA this in JavaScript)
down through the pipe so it has access to the internals:
pipe(operator){operator(this)}
We can drop that pipe method directly on to the Observable:
const observable = {subscribe: next => {next("hello")},pipe(operator){return operator(this)}}
Let’s create an operator that does nothing:
const operator = observable => {return observable}
And now drop the operator into the pipe:
observable.pipe(operator).subscribe(next)
You’ll see that we get the same "hello" output as before. The Observable
is going in the function and out the function unchanged:
pipe Can Take Multiple Operators
If you’ve seen many pipe demos, you’ve probably seen:
.pipe(map, filter, scan)
Multiple arguments is simply an API choice for convenience by the RxJS team. If they would have
limited pipe to one argument, you would have to chain pipe like this:
.pipe(map).pipe(filter).pipe(scan)
To enable multiple operators in our demo, we have to treat them as an Array. We can use
the ... array syntax to pull in every operator as an Array. Then use reduce on that
Array to apply each operator to the observable:
pipe(...operators) {return operators.reduce((observable, operator) => {return operator(observable)}, this)}
Now we’re free to pass in as many operators as we want: