From Transducers to Conduits and Back Again
Not too long ago, I wrote a blog post claiming that conduits and transducers are one and the same. In that post, I said that a conduit-like library would be interesting to look at in Clojure, if only for readability. Well, I have just implemented such a library: clj-conduit.
The general idea of a conduit-like library is to have two “primitives” you can use: `await` and `yield`. The naming is rather arbitrary; they might as well have been named `receive` and `send`. As you would expect, `await` will wait for and get back a value from a “stream”, whereas `yield` will send a value to a “stream”, if that stream wants the value.
If you replace “stream” with “channel” above, you more or less end up with core.async. The biggest difference is that we can only `await` on a single stream, and we can only `yield` to a single stream.
Note that the function `await` already exists in clojure.core, so if you want to `:refer` it, you should also exclude it from core like so:

```clojure
(ns ...
  (:refer-clojure :exclude [await]))
```
`(await)` will read the next value from the source, and `(yield val)` will emit a value to the sink. Both will “block” until they’ve either received or sent a single value. If there are no more values to read, `await` “blocks” indefinitely, and if the sink doesn’t want any more values, `yield` will “block” indefinitely as well.
To create a conduit, you wrap the `conduit` macro around the code that is to be executed. Unfortunately, just like the `go` macro in core.async, the `await` and `yield` calls have to be visible to the macro:

```clojure
(defn await-n
  [n]
  (dotimes [_ n]
    (await))) ;; <- not ok, even if called inside a conduit
```
The return value of a `conduit` block is the transducer this conduit represents.
If the conduit block has run to completion, the conduit will signal that it won’t process more values from upstream. That is, when it’s converted to a transducer, it will return a `reduced` value.
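In plain clojure.core terms, returning a `reduced` value is what lets a reduction stop early. A small illustration using only clojure.core (no conduit involved): `take` wraps its result in `reduced` once it has seen enough inputs, so transducing over an infinite `(range)` still terminates.

```clojure
;; `take` wraps its result in `reduced` after n inputs, which makes
;; `transduce` stop reading from the (infinite) source.
(def first-three (transduce (take 3) conj [] (range)))
;; first-three => [0 1 2]

;; `ensure-reduced` wraps a value in `reduced` unless it already is one;
;; `unreduced` unwraps it again.
(def wrapped (ensure-reduced [1 2]))
(reduced? wrapped)    ;; => true
(unreduced wrapped)   ;; => [1 2]
```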
… and that is really all you have to know for basic usage. Let’s see how we can implement the transducers for `map` and `take` with conduit:
```clojure
(ns my.lib
  (:refer-clojure :exclude [await])
  (:require [com.hypirion.conduit :refer [await yield conduit]]))

(defn mapping [f]
  (conduit
    (while true
      (yield (f (await))))))

(defn taking [n]
  (conduit
    (dotimes [_ n]
      (yield (await)))))
```

```clojure
user=> (sequence (mapping #(+ 10 %)) (range 10))
(10 11 12 13 14 15 16 17 18 19)
user=> (sequence (comp (mapping inc) (taking 10)) (range 10 200))
(11 12 13 14 15 16 17 18 19 20)
```
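For comparison, the same two pipelines written with clojure.core’s built-in `map` and `take` transducers should give the same output, which is a cheap way to sanity-check a conduit port against the original:

```clojure
;; Built-in transducers, same inputs as the conduit REPL session.
(def mapped (sequence (map #(+ 10 %)) (range 10)))
;; mapped => (10 11 12 13 14 15 16 17 18 19)

(def mapped-then-taken (sequence (comp (map inc) (take 10)) (range 10 200)))
;; mapped-then-taken => (11 12 13 14 15 16 17 18 19 20)
```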
`mapping` just reads a value with `await`, applies `f` to it, then sends the result downstream with `yield`. If there are no more values upstream, we stop at the `await` call, and if the sink doesn’t want more values, we stop at the `yield` call. We wrap this in a `while true` loop to do this for all the elements we can send.
`taking` just sends values from the source down to the sink `n` times, then stops. Since the conduit has stopped, no more values are passed down.
`taking` shows part of the value of conduits. It is much more compact and (in my opinion) more readable than the transducer version of `take` as implemented in clojure.core:
```clojure
(defn take
  ([n]
   (fn [rf]
     (let [nv (volatile! n)]
       (fn
         ([] (rf))
         ([result] (rf result))
         ([result input]
          (let [n @nv
                nn (vswap! nv dec)
                result (if (pos? n)
                         (rf result input)
                         result)]
            (if (not (pos? nn))
              (ensure-reduced result)
              result)))))))
  ...)
```
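To see that hand-rolled state machine at work, we can call the reducing function produced by clojure.core’s `take` transducer directly, one input at a time. This is only an illustration; `conj` stands in for the downstream reducing function.

```clojure
;; Build the reducing function: (take 2) is the transducer, conj the sink.
(def take-rf ((take 2) conj))

;; Each call decrements the hidden volatile counter inside take-rf.
(def step-1 (take-rf [] :a))      ;; => [:a], still accepting input
(def step-2 (take-rf step-1 :b))  ;; counter hits zero, result wrapped in reduced

(reduced? step-1)  ;; => false
(reduced? step-2)  ;; => true
@step-2            ;; => [:a :b]
```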
Nonblocking Awaits
The first problem with conduits shows up when we try to implement `partition-all`. It needs some way of learning that there is no more input, but we have no way to express that with the current primitives.
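For comparison, a hand-written transducer learns that the input is exhausted only through its 1-arity completion call. The sketch below is a simplified, hypothetical `partition-all-xf` (not clojure.core’s actual implementation, which buffers into a `java.util.ArrayList`); it buffers inputs and flushes any leftovers when completion is called.

```clojure
(defn partition-all-xf
  "Hypothetical hand-written transducer: groups inputs into vectors of
  size n, flushing a shorter final vector when the input runs out."
  [n]
  (fn [rf]
    (let [buf (volatile! [])]
      (fn
        ([] (rf))
        ;; Completion arity: the only signal that upstream is exhausted.
        ([result]
         (let [vs @buf
               result (if (seq vs)
                        (do (vreset! buf [])
                            (unreduced (rf result vs)))
                        result)]
           (rf result)))
        ;; Step arity: buffer the input, emit once the buffer is full.
        ([result input]
         (let [vs (vswap! buf conj input)]
           (if (= n (count vs))
             (do (vreset! buf [])
                 (rf result vs))
             result)))))))

(into [] (partition-all-xf 3) (range 7))  ;; => [[0 1 2] [3 4 5] [6]]
```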
This can be introduced in a couple of ways. I decided to implement it via a new function `await!`, which can be considered a “nonblocking” version of `await`:
`await!` works like `await`, but will return `nil` when there are no more values to read. You can also provide a default value to `await!`, which will be returned if there are no more values.
`mapping` could be implemented with `await!` if we wanted to:

```clojure
(defn mapping [f]
  (conduit
    (loop [v (await! ::eos)]
      (when-not (identical? v ::eos)
        (yield (f v))
        (recur (await! ::eos))))))
```

but I think it is slightly harder to read this way.
With `await!`, `partitioning-all` is relatively straightforward to implement:

```clojure
(defn partitioning-all
  [n]
  (conduit
    (loop [vs [(await)]]
      (if (= n (count vs))
        (do (yield vs)
            (recur [(await)]))
        (let [v (await! ::eos)]
          (if-not (identical? v ::eos)
            (recur (conj vs v))
            (yield vs)))))))
```
`partitioning-all` uses `await` to read the first value to put in the vector. If the vector has the desired size, we yield it; if not, we try to read more values with `await!`. We pass `::eos` as the default value so we can tell whether we got a new value or not, and if there are no more values to read, we yield the final vector and stop.
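The reason for a sentinel like `::eos` instead of `nil` is that `nil` can be a perfectly good value in the stream. Here is a small plain-Clojure analogue, with a `java.util.Iterator` standing in for the conduit’s upstream; `pull!`, `map-all` and `map-until-nil` are hypothetical helpers made up for this illustration, not part of clj-conduit.

```clojure
(defn pull!
  "Hypothetical helper: returns the iterator's next element, or `default`
  when it is exhausted (loosely mirroring what await! does upstream)."
  ([it] (pull! it nil))
  ([^java.util.Iterator it default]
   (if (.hasNext it) (.next it) default)))

(defn map-all
  "Applies f to every element, stopping only at the ::eos sentinel."
  [f coll]
  (let [it (.iterator ^Iterable coll)]
    (loop [acc [] v (pull! it ::eos)]
      (if (identical? v ::eos)   ; keywords are interned, so identical? works
        acc
        (recur (conj acc (f v)) (pull! it ::eos))))))

(defn map-until-nil
  "Same loop, but treats nil as end of stream, so it stops at the first nil."
  [f coll]
  (let [it (.iterator ^Iterable coll)]
    (loop [acc [] v (pull! it)]
      (if (nil? v)
        acc
        (recur (conj acc (f v)) (pull! it))))))

(map-all identity [1 nil 3])        ;; => [1 nil 3]
(map-until-nil identity [1 nil 3])  ;; => [1]
```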
Since it’s common to exit quickly when `await!` returns no value, we can add two macros that help us with that: `if-let-await!` and `when-let-await!`. They work in the same fashion as `if-let` and `when-let`:

```clojure
(defn partitioning-all
  [n]
  (conduit
    (loop [vs [(await)]]
      (if (= n (count vs))
        (do (yield vs)
            (recur [(await)]))
        (if-let-await! v
          (recur (conj vs v))
          (yield vs))))))
```
Nonblocking Yields
`await!` makes sense if you want to flush out some values downstream before exiting; in some sense it’s equivalent to the completion part of a transducer, the 1-ary function.
In certain cases, it may also make sense to have a nonblocking yield: sometimes you want to ensure that a final piece of code is run before the conduit finishes, for example to finalize resource usage[1].
Consider the strange case where you also want to send the values you receive to a core.async channel, then close the channel when you’re done. You might attempt to do it like this:

```clojure
(defn yield-to-channel [c]
  (conduit
    (loop []
      (when-let-await! v
        (async/>!! c v)
        (yield v)
        (recur)))
    (async/close! c)))
```
However, remember that if the sink doesn’t want any more values, `yield` will block indefinitely, and we will never end up closing the channel.
The answer to this one is rather straightforward: we add a new primitive called `yield!`. `yield!` is a “nonblocking” `yield`, which returns `true` if the sink still wants more input, and `false` otherwise.
Now we can ensure that the channel is closed by checking the value returned by `yield!`:

```clojure
(defn yield-to-channel [c]
  (conduit
    (loop []
      (when-let-await! v
        (async/>!! c v)
        (when (yield! v)
          (recur))))
    (async/close! c)))
```
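In a hand-written transducer, the equivalent cleanup usually lives in the 1-arity completion function, which `transduce` calls even when a downstream step stopped early with `reduced`. A hypothetical sketch, with an atom standing in for the core.async channel so that the example needs nothing beyond clojure.core:

```clojure
(defn tapping
  "Hypothetical transducer: records each input it forwards into the `log`
  atom (standing in for a channel) and marks the log closed on completion."
  [log]
  (fn [rf]
    (fn
      ([] (rf))
      ([result]
       ;; Cleanup: runs when the reduction finishes, even after an early stop.
       (swap! log assoc :closed? true)
       (rf result))
      ([result input]
       (swap! log update :seen conj input)
       (rf result input)))))

(def tap-log (atom {:seen [] :closed? false}))

;; (take 2) stops the reduction early, yet the completion arity still runs.
(def tapped (transduce (comp (tapping tap-log) (take 2)) conj [] (range 10)))
;; tapped   => [0 1]
;; @tap-log => {:seen [0 1], :closed? true}
```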
Implementation Details
Alright, how would one even go about making a library like this? The bad news is that getting this implemented properly is hard and complex: you have to implement macros that walk the entire structure of the code, convert it into a state machine, then provide escape hatches for certain functions, like `await` and `yield`.
The good news is that there is a really good library named core.async with powerful macro internals that does exactly this work for us. For an explanation of the internals, have a look at the YouTube videos Deep Walking Macros and Core Async Go Macro Internals (Part I and Part II). They are very thorough and are worth a watch if you’re interested in how core.async works internally.
Since we can just piggyback on the macro system in core.async, all the code for clj-conduit is about 140-150 lines if we strip away comments and documentation. If you’ve got a grasp of how `ioc_macros.clj` works, the code should be relatively straightforward to read.
The Good Parts
We’ve seen that a conduit version of `take` is more readable than `take` implemented in the usual transducer “style”. However, I think the effect is even more visible for transducers that consume or emit more than one value in one “go”.
`interpose` is a small example of this. It’s implemented like this with conduit:

```clojure
(defn interposing
  [sep]
  (conduit
    (yield (await))
    (while true
      (let [val (await)]
        (yield sep)
        (yield val)))))
```

and the actual version in clojure.core looks like this:

```clojure
(defn interpose
  [sep]
  (fn [rf]
    (let [started (volatile! false)]
      (fn
        ([] (rf))
        ([result] (rf result))
        ([result input]
         (if @started
           (let [sepr (rf result sep)]
             (if (reduced? sepr)
               sepr
               (rf sepr input)))
           (do
             (vreset! started true)
             (rf result input))))))))
```
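The `reduced?` check in the clojure.core version matters when a downstream step stops early: if the separator was the last value the sink accepted, `interpose` must not call `rf` again. A quick demonstration with clojure.core only:

```clojure
;; Plain use: separators between elements.
(def spaced (into [] (interpose :sep) [1 2 3]))
;; spaced => [1 :sep 2 :sep 3]

;; With (take 2) downstream, the separator is the last accepted value, so
;; interpose sees a reduced result and returns it without emitting 2.
(def cut-short (into [] (comp (interpose :sep) (take 2)) [1 2 3]))
;; cut-short => [1 :sep]
```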
It’s perhaps not surprising that conduits solve the same problems as core.async, considering that both are built on the same inversion-of-control macros. If you want the complete rationale, you should have a look at the videos on the motivation behind core.async, although some parts do not apply to conduits. As a short summary, what you get with conduits is[2]:
- Less overhead: All of the transducers in clojure.core start with `([] (rf))`. All but 2 continue with `([result] (rf result))`. No need for that with conduits.
- Separation of concerns: You don’t have to peek into the result of `yield`, which you may need to do with transducers (in this case via `reduced?`).
- Recursion vs mutation: By making the transducer into a process, you avoid mutable state and the need to create a manual state machine (as seen in `interpose` through dispatching on and mutating `started`).
The Bad Parts
It may seem like conduits are vastly superior to implementing transducers manually, but there is no such thing as a free lunch.
Because conduits are built on the core.async macro system, it’s not possible to use `await` and `yield` as higher-order functions, nor inside other functions that you may want to call. It’s somewhat frustrating, and I’m not sure we can do anything (realistic) about that.
The bigger “problem” is that conduits will be slower than hand-written transducers. Transducers were designed with efficiency in mind, and the implementation of conduits doesn’t create the most efficient transducers. I don’t know when this turns into an issue, but I would assume it’s not a problem unless you have high throughput in the code region where conduits are used. Benchmark and tune to find out what “high throughput” means in your case, and whether you should use conduits or not.
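As a starting point for such a benchmark, a deliberately crude timing helper (the hypothetical `avg-ms`, using only `System/nanoTime`) could look like the following. For real measurements, a dedicated tool such as Criterium, which handles JIT warm-up and statistics, is the better choice.

```clojure
(defn avg-ms
  "Hypothetical helper: runs xf over (range n) `reps` times with `+` as the
  reducing function and returns the average wall-clock time in milliseconds.
  Crude: no JIT warm-up and no statistics."
  [xf n reps]
  (let [start (System/nanoTime)]
    (dotimes [_ reps]
      (transduce xf + 0 (range n)))
    (/ (- (System/nanoTime) start) 1e6 reps)))

;; Usage: compare a built-in transducer with its conduit counterpart, e.g.
;; (avg-ms (map inc) 1000000 10)
;; (avg-ms (mapping inc) 1000000 10)  ; `mapping` from the conduit example
```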
Summary
Conduits provide a new interface for creating transducers. I think they capture the essence of transducers much better than the original interface: you read inputs from an upstream source, and send values down to a downstream sink. When you decide to read or send values is up to you, so you can think of your logic as an independent process.
That being said, there are some limitations with conduits. Most importantly, the performance of a conduit will be worse than that of a well-written transducer doing the same thing. I don’t think this will be an issue unless you use transducers in high-performance code, but one should be aware of it.
For more information and examples of conduits, take a look at the following pages:
- clj-conduit for the library readme and source code
- clojure.core ports for an implementation of all the transducers in clojure.core with the conduit library. Good for examples.
- examples for relatively basic conduit examples
- A more complex conduit example which implements simple moving averages. (I’d guess this would be hard to implement correctly with transducers).
[1] This is a bad example, but I’ve never seen transducers manually handling non-GCable resources, so I’m not 100% sure if `yield!` will ever be used.
[2] The last two points are “stolen” directly from the Clojure core.async video (43:21), because they also apply here.