From Transducers to Conduits and Back Again
Not too long ago, I wrote a blog post that made the following claim: conduits and transducers are one and the same. In that blog post, I said that a Conduit-like library would be interesting to look at in Clojure, if only for readability. Well, I have just implemented such a library: clj-conduit.
The general idea of a conduit-like library is to have two “primitives” you can
use: await and yield. The naming is rather arbitrary; yield, for instance, might
as well have been named send. As you would expect, await will wait and get back
a value from a “stream”, whereas yield will send a value to a “stream”, if that
stream wants the value.
If you replace “stream” with channel above, you more or less end up with
core.async. The biggest difference is that we can only
await on a single
stream, and we can only
yield on a single stream.
Note that the function
await already exists in clojure.core, so if you want to
:refer it, you should also exclude it from core like so:
(ns ... (:refer-clojure :exclude [await]))
(await) will read the next value from the source, and (yield val) will emit
a value to the sink. Both will “block” until they’ve either received or sent a
single value. If there are no more values to read, await will “block”
indefinitely, and if the sink doesn’t want any more values, yield will “block”
indefinitely as well.
To create a conduit, you wrap the macro conduit around the code that is to be
executed. Unfortunately, just like with the go macro in core.async, the await
and yield calls have to be visible to the macro:
(defn await-n [n] (dotimes [_ n] (await))) ;; <- not ok, even if called inside a conduit
The return value of a conduit block is a transducer. If the conduit block has run to completion, the conduit signals that it won’t process any more values from upstream. That is, once it’s converted to a transducer, it will pass a Reduced value back.
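To make the Reduced signal concrete, here is a minimal sketch in plain clojure.core, with no conduit library involved. The name stop-after-first is made up for illustration. It is roughly the behaviour you would expect from a conduit whose whole body is (yield (await)): pass one value downstream, then tell upstream that we are done.

```clojure
;; Hypothetical hand-written transducer, for illustration only.
;; It forwards the first input and then wraps the result in `reduced`,
;; which tells the reducing process to stop feeding it values.
(defn stop-after-first [rf]
  (fn
    ([] (rf))
    ([result] (rf result))
    ([result input]
     (ensure-reduced (rf result input)))))

(into [] stop-after-first (range))
;; => [0]
```

Because the step function returns a reduced value, the infinite (range) is never consumed past its first element.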
… and that is really all you have to know for basic usage. Let’s see how we
can implement the transducers for map and take in conduit:
    (ns my.lib
      (:refer-clojure :exclude [await])
      (:require [com.hypirion.conduit :refer [await yield conduit]]))

    (defn mapping [f]
      (conduit
        (while true
          (yield (f (await))))))

    (defn taking [n]
      (conduit
        (dotimes [_ n]
          (yield (await)))))

    user=> (sequence (mapping #(+ 10 %)) (range 10))
    (10 11 12 13 14 15 16 17 18 19)
    user=> (sequence (comp (mapping inc) (taking 10)) (range 10 200))
    (11 12 13 14 15 16 17 18 19 20)
mapping just reads a value with await, applies f to it, then sends the result
downstream with yield. If we don’t have more values upstream, we will stop at
the await call, and if the sink doesn’t want more values, we’ll stop at the
yield call. Then we wrap this in a while true loop to do this with all the
elements we can send.
taking just sends values from the source down to the sink
n times, then
stops. Since the conduit is stopped, no more values are passed down.
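For comparison, this is a sketch of what mapping looks like when written by hand as a transducer, using only clojure.core. The name mapping-xf is mine and not part of any library. All three arities have to be spelled out, even though only the last one does any real work.

```clojure
;; Hypothetical hand-written equivalent of `mapping`, for comparison only.
(defn mapping-xf [f]
  (fn [rf]
    (fn
      ([] (rf))                       ; init: delegate downstream
      ([result] (rf result))          ; completion: delegate downstream
      ([result input]                 ; step: transform, then pass on
       (rf result (f input))))))

(sequence (mapping-xf #(+ 10 %)) (range 10))
;; => (10 11 12 13 14 15 16 17 18 19)
```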
taking shows part of the value of conduits. It is much more compact and (in my
opinion) readable compared to the transducer version of take as implemented in
clojure.core:

    (defn take
      ([n]
       (fn [rf]
         (let [nv (volatile! n)]
           (fn
             ([] (rf))
             ([result] (rf result))
             ([result input]
              (let [n @nv
                    nn (vswap! nv dec)
                    result (if (pos? n)
                             (rf result input)
                             result)]
                (if (not (pos? nn))
                  (ensure-reduced result)
                  result)))))))
      ...)
The first problem with conduits is something we hit on when we try to implement
partition-all. It needs to be signalled in some way that we have no more
input, but we have no way to do that with the current primitives.
This can be introduced in a couple of ways. I decided to implement it via a new
primitive, await!, which can be considered a “nonblocking” version of await.
await! works like await, but will send nil back when there are no more
values to read. You can also provide a default value to await!, which will be
returned if there are no more values.
mapping could be implemented with
await! if we wanted to:
    (defn mapping [f]
      (conduit
        (loop [v (await! ::eos)]
          (when-not (identical? v ::eos)
            (yield (f v))
            (recur (await! ::eos))))))
but I think it is slightly harder to read this way.
partitioning-all is relatively straightforward to implement:
    (defn partitioning-all [n]
      (conduit
        (loop [vs [(await)]]
          (if (= n (count vs))
            (do (yield vs)
                (recur [(await)]))
            (let [v (await! ::eos)]
              (if-not (identical? v ::eos)
                (recur (conj vs v))
                (yield vs)))))))
We first use await to read the first value to put in the vector. If the vector
has the desired size, we yield it, and if not, we try to read more with await!.
We use ::eos as the value to check whether we had a new value or not, and if
there are no more values to read, we yield the final vector.
Since it’s common to exit quickly when await! has no value to return, we can
add two macros that help us with that: if-let-await! and when-let-await!.
They work in the same fashion as if-let and when-let:

    (defn partitioning-all [n]
      (conduit
        (loop [vs [(await)]]
          (if (= n (count vs))
            (do (yield vs)
                (recur [(await)]))
            (if-let-await! v
              (recur (conj vs v))
              (yield vs))))))
await! makes sense if you want to flush out some values downstream before
exiting – it’s in some sense equivalent to the complete part of a transducer,
the 1-ary function.
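To show what that “complete part” looks like on the transducer side, here is a small sketch in plain clojure.core. pairing is a made-up name, not something from clojure.core or clj-conduit. It groups inputs two at a time and uses the 1-arity function to flush a leftover element, which is the job await! takes over in a conduit.

```clojure
;; Hypothetical transducer, for illustration only: groups inputs into
;; vectors of two and flushes an incomplete group on completion.
(defn pairing [rf]
  (let [buf (volatile! [])]
    (fn
      ([] (rf))
      ([result]
       ;; Completion arity: emit whatever is still buffered, then
       ;; let the downstream reducing function complete as well.
       (let [b @buf]
         (vreset! buf [])
         (rf (if (seq b)
               (unreduced (rf result b))
               result))))
      ([result input]
       (let [b (vswap! buf conj input)]
         (if (= 2 (count b))
           (do (vreset! buf [])
               (rf result b))
           result))))))

(into [] pairing (range 5))
;; => [[0 1] [2 3] [4]]
```

Without the flush in the 1-arity function, the trailing [4] would be silently dropped.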
In certain cases, it may also make sense to have a nonblocking yield: Sometimes you want to ensure that a final piece of code is run before the conduit finishes, for example finalizing resource usage.
Consider the strange case where you also want to send the values you receive to a go channel, then close the channel when you’re done. You may attempt to do it in this fashion:
    ;; assumes clojure.core.async is required with the alias async
    (defn yield-to-channel [c]
      (conduit
        (loop []
          (when-let-await! v
            (async/>!! c v)
            (yield v)
            (recur)))
        (async/close! c)))
However, remember that if the sink doesn’t want any more values, then yield
will block indefinitely, and we will never end up closing the channel.
The answer for this one is rather straightforward: We add a new primitive called
yield!. yield! is a “nonblocking” yield, which will return true if the
sink still wants more input, false otherwise.
Now we can ensure that the channel is closed by checking the value returned by
yield!:

    ;; assumes clojure.core.async is required with the alias async
    (defn yield-to-channel [c]
      (conduit
        (loop []
          (when-let-await! v
            (async/>!! c v)
            (when (yield! v)
              (recur))))
        (async/close! c)))
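For contrast, here is a sketch of the same “always run the cleanup” idea written as a plain transducer, using only clojure.core. with-cleanup and its cleanup! argument are hypothetical names chosen for this example. Where the conduit checks the return value of yield!, the hand-written version has to inspect the downstream result with reduced? and also handle the completion arity.

```clojure
;; Hypothetical transducer, for illustration only: passes inputs through
;; unchanged and calls cleanup! exactly once, either when downstream
;; signals that it wants no more input or when the input runs out.
(defn with-cleanup [cleanup!]
  (fn [rf]
    (let [done? (volatile! false)
          finish! (fn []
                    (when-not @done?
                      (vreset! done? true)
                      (cleanup!)))]
      (fn
        ([] (rf))
        ([result]
         (finish!)
         (rf result))
        ([result input]
         (let [r (rf result input)]
           (when (reduced? r)
             (finish!))
           r))))))

(def cleanup-calls (atom 0))

(into [] (comp (with-cleanup #(swap! cleanup-calls inc)) (take 2)) (range 10))
;; => [0 1]
@cleanup-calls
;; => 1
```

Here the cleanup is triggered by take returning a reduced value after the second element. The later call to the completion arity does not run it a second time.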
Alright, how would one even go about making a library like this? The bad news is
that the work to get this implemented properly is pretty hard and complex: You
have to implement macros that walk the entire structure of the code, convert it
into a state machine, then provide escape hatches for certain functions, like
await and yield.
The good news is that there is a really good library named core.async with powerful macro internals that does exactly this work for us. For an explanation of the internals, have a look at the YouTube videos Deep Walking Macros and Core Async Go Macro Internals - Part I and Part II. They are very thorough and worth a watch if you’re interested in how core.async works internally.
Since we can just piggyback onto the macro system in core.async, all the code
for clj-conduit is about 140-150 lines if we strip away comments and
documentation. If you’ve got a grasp of how
ioc_macros.clj works, then the
code should be relatively straightforward to read.
The Good Parts
We’ve seen that a conduit version of take is more readable than take
implemented in the usual transducer “style”. However, I think the effect is more
visible for any type of transducer that works on one or more inputs/outputs
in one “go”.
interpose is a small example of this. It’s implemented like this with conduit:
    (defn interposing [sep]
      (conduit
        (yield (await))
        (while true
          (let [val (await)]
            (yield sep)
            (yield val)))))
and the actual version in clojure.core looks like this:
    (defn interpose [sep]
      (fn [rf]
        (let [started (volatile! false)]
          (fn
            ([] (rf))
            ([result] (rf result))
            ([result input]
             (if @started
               (let [sepr (rf result sep)]
                 (if (reduced? sepr)
                   sepr
                   (rf sepr input)))
               (do (vreset! started true)
                   (rf result input))))))))
It’s perhaps not surprising that conduits solve the same problems as core.async, considering the core of both is the inversion-of-control macros. If you want to grab the complete rationale, you should have a look at the videos of the motivation behind core.async, although some parts do not apply to conduits. As a short summary, what you get with conduits is:
- Less overhead: All of the transducers in clojure.core start with
([] (rf)). All but 2 continue with
([result] (rf result)). No need for that with conduits.
- Separation of concerns: You don’t have to peek into the result of
yield, which you may need to do with transducers (in this case via reduced?).
- Recursion vs mutation: By making the transducer into a process, you avoid
mutable state and the need to create a manual state machine (as seen in
interpose through dispatching on and mutating started).
The Bad Parts
It may seem like conduits are vastly superior to implementing transducers manually, but there is no such thing as a free lunch.
By using the core.async macro system to implement conduits, it’s not possible
to use await and yield as higher order functions, nor in other functions
that you may want to call. It’s somewhat frustrating, and I’m not sure we can do
anything (realistic) about that.
The bigger “problem” is that conduits will be slower than transducers. Transducers were designed with efficiency in mind, and the implementation of conduits doesn’t create the most efficient transducers. I don’t know when this turns into an issue, but I would assume it’s not going to be a problem unless you have high throughput in the code region where conduits are used. Benchmark and tune to find out what “high throughput” is for your case, and if you should use conduits or not.
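As a starting point for that kind of measurement, here is a crude timing sketch using only clojure.core and System/nanoTime. time-xform-ms is a hypothetical helper, not part of any library, and it ignores JIT warm-up and garbage collection, so a dedicated benchmarking library such as Criterium will give more trustworthy numbers.

```clojure
;; Hypothetical helper, for illustration only: runs
;; (transduce xf + 0 coll) `runs` times and returns the average
;; wall-clock time per run in milliseconds.
(defn time-xform-ms [xf coll runs]
  (let [start (System/nanoTime)]
    (dotimes [_ runs]
      (transduce xf + 0 coll))
    (/ (- (System/nanoTime) start) 1e6 runs)))

;; Time a built-in transducer; pass a conduit-based transducer as xf
;; to compare the two on the same input.
(time-xform-ms (map inc) (range 100000) 20)
```

Running the helper on the same collection with a conduit-based transducer and with its hand-written counterpart gives a rough idea of the difference for your workload.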
Conduits provide a new interface to create transducers. I think they capture the essence of transducers much better than the original interface: You read inputs from an upstream source, and send values down to a downstream sink. When you decide to read or send values is up to you, and so you can think of your logic as an independent process.
That being said, there are some limitations with conduits. Most importantly, the performance of a conduit will be worse than a well-written transducer doing the same thing. I don’t think this will be an issue unless you use transducers in high performance code, but one should be aware of it.
For more information and examples of conduits, take a look at the following pages:
- clj-conduit for the library readme and source code
- clojure.core ports for an implementation of all the transducers in clojure.core with the conduit library. Good for examples.
- examples for relatively basic conduit examples
- A more complex conduit example which implements simple moving averages. (I’d guess this would be hard to implement correctly with transducers).