hyPiRion

From Transducers to Conduits and Back Again


Illustration of an excavator machine on rails with a conveyor belt from the 1880s

Not too long ago, I wrote a blog post claiming that conduits and transducers are one and the same. In that post, I said that a Conduit-like library would be interesting to look at in Clojure, if only for readability. Well, I have now implemented such a library: clj-conduit.

The general idea of a conduit-like library is to have two “primitives” you can use: await and yield. The naming is rather arbitrary; they might as well have been named receive and send. As you would expect, await will wait and get back a value from a “stream”, whereas yield will send a value to a “stream”, if that stream wants the value.

If you replace “stream” with “channel” above, you more or less end up with core.async. The biggest difference is that a conduit can only await on a single stream, and can only yield to a single stream.

Note that the function await already exists in clojure.core, so if you want to :refer it, you should also exclude it from core like so:

(ns ...
  (:refer-clojure :exclude [await]))

(await) will read the next value from the source, and (yield val) will emit a value to the sink. Both will “block” until they’ve either received or sent a single value. If there are no more values to read, await “blocks” indefinitely, and if the sink doesn’t want any more values, yield will “block” indefinitely as well.

To create a conduit, you wrap the conduit macro around the code that is to be executed. Unfortunately, just like the go macro in core.async, the await and yield calls have to be visible to the macro, so you cannot move them into a helper function:

(defn await-n
 [n]
 (dotimes [_ n]
   (await))) ;; <- not ok, even if called inside a conduit

The return value of a conduit block is the transducer this conduit represents.

If the conduit block runs to completion, the conduit signals that it won’t process any more values from upstream. That is, when it’s converted to a transducer, it returns a reduced value.
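In clojure.core terms, returning a reduced value means the transducer’s 2-ary step function wraps its result with reduced, which tells whatever drives the reduction (transduce, into, sequence) to stop feeding it input. As a rough sketch, a conduit that yields one value and then finishes, such as (conduit (yield (await))), should behave like the hypothetical hand-written transducer taking-one below. This only illustrates the protocol; it is not the code clj-conduit generates.

```clojure
(defn taking-one
  "Hypothetical transducer: passes the first input downstream, then
  returns a reduced value so the reduction stops, much like a conduit
  whose body has run to completion."
  []
  (fn [rf]
    (fn
      ([] (rf))
      ([result] (rf result))
      ([result input]
       ;; ensure-reduced avoids double-wrapping if rf itself already
       ;; returned a reduced value.
       (ensure-reduced (rf result input))))))

;; The reduction stops after the first element, even on an infinite input.
(transduce (taking-one) conj [] (range))
;; => [0]
```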

… and that is really all you have to know for basic usage. Let’s see how we can implement the transducers for map and take with conduit:

(ns my.lib
 (:refer-clojure :exclude [await])
 (:require [com.hypirion.conduit :refer [await yield conduit]]))

(defn mapping [f]
  (conduit
    (while true
      (yield (f (await))))))

(defn taking [n]
  (conduit
    (dotimes [_ n]
      (yield (await)))))

user=> (sequence (mapping #(+ 10 %)) (range 10))
(10 11 12 13 14 15 16 17 18 19)

user=> (sequence (comp (mapping inc) (taking 10)) (range 10 200))
(11 12 13 14 15 16 17 18 19 20)

mapping just reads a value with await, applies f to it, then sends the result downstream with yield. If there are no more values upstream, we stop at the await call, and if the sink doesn’t want more values, we stop at the yield call. Wrapping this in a while true loop repeats it for every element we can send.

taking just sends values from the source down to the sink n times, then stops. Once the conduit has stopped, no more values are passed down.

taking shows part of the value of conduits. It is much more compact and (in my opinion) more readable than the transducer version of take as implemented in clojure.core:

(defn take
  ([n]
     (fn [rf]
       (let [nv (volatile! n)]
         (fn
           ([] (rf))
           ([result] (rf result))
           ([result input]
              (let [n @nv
                    nn (vswap! nv dec)
                    result (if (pos? n)
                             (rf result input)
                             result)]
                (if (not (pos? nn))
                  (ensure-reduced result)
                  result)))))))
   ...)

Nonblocking Awaits

The first problem with conduits shows up when we try to implement partition-all. It needs some way of being told that there is no more input, so that it can emit the last, partially filled vector, but the current primitives give us no way to find that out.

This can be introduced in a couple of ways. I decided to implement it via a new function, await!, which can be considered a “nonblocking” version of await: await! works like await, but returns nil when there are no more values to read. You can also provide a default value to await!, which is returned instead of nil if there are no more values.

mapping could be implemented with await! if we wanted to:

(defn mapping [f]
  (conduit
    (loop [v (await! ::eos)]
      (when-not (identical? v ::eos)
        (yield (f v))
        (recur (await! ::eos))))))

but I think it is slightly harder to read this way.

With await!, partitioning-all is relatively straightforward to implement:

(defn partitioning-all
  [n]
  (conduit
   (loop [vs [(await)]]
     (if (= n (count vs))
       (do (yield vs)
           (recur [(await)]))
       (let [v (await! ::eos)]
         (if-not (identical? v ::eos)
           (recur (conj vs v))
           (yield vs)))))))

partitioning-all uses await to read the first value to put in the vector. If the vector has the desired size, we yield it; if not, we try to read more values with await!. Passing ::eos as the default lets us tell whether we got a new value, and if there are no more values to read, we yield the final vector and stop.

Since bailing out as soon as await! has no value is a common pattern, we can add two macros that help with it: if-let-await! and when-let-await!. They work in the same fashion as if-let and when-let:

(defn partitioning-all
  [n]
  (conduit
   (loop [vs [(await)]]
     (if (= n (count vs))
       (do (yield vs)
           (recur [(await)]))
       (if-let-await! v
         (recur (conj vs v))
         (yield vs))))))
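For comparison, here is a sketch of partitioning-all written directly as a transducer, loosely following the shape of clojure.core’s partition-all (the name partitioning-all-xf is hypothetical). The unfinished vector has to live in mutable state, and the last partial vector is flushed in the 1-ary completion arity, which is the job await! does in the conduit version.

```clojure
(defn partitioning-all-xf
  "Hypothetical hand-written equivalent of partitioning-all: groups
  inputs into vectors of size n; the last vector may be shorter."
  [n]
  (fn [rf]
    (let [buf (volatile! [])]
      (fn
        ([] (rf))
        ([result]
         ;; Completion: flush a non-empty partial vector, then finish.
         (let [vs @buf
               result (if (seq vs)
                        (do (vreset! buf [])
                            (unreduced (rf result vs)))
                        result)]
           (rf result)))
        ([result input]
         ;; Step: buffer the input and emit only once the vector is full.
         (let [vs (vswap! buf conj input)]
           (if (= n (count vs))
             (do (vreset! buf [])
                 (rf result vs))
             result)))))))

(into [] (partitioning-all-xf 3) (range 8))
;; => [[0 1 2] [3 4 5] [6 7]]
```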

Nonblocking Yields

await! makes sense if you want to flush out some values downstream before exiting – it’s in some sense equivalent to the completion step of a transducer, the 1-ary function.

In certain cases, it may also make sense to have a nonblocking yield: sometimes you want to ensure that a final piece of code is run before the conduit finishes, for example to clean up a resource.[1]

Consider the strange case where you also want to send the values you receive to a core.async channel, then close the channel when you’re done. You may attempt to do it in this fashion:

(defn yield-to-channel [c]
  (conduit
   (loop []
     (when-let-await! v
       (async/>!! c v)
       (yield v)
       (recur)))
   (async/close! c)))

However, remember that if the sink doesn’t want any more values, yield will block indefinitely, and we will never get around to closing the channel.

The fix is rather straightforward: we add a new primitive called yield!. yield! is a “nonblocking” yield, which returns true if the sink still wants more input, and false otherwise.

Now we can ensure that the channel is closed by checking the value returned by yield!:

(defn yield-to-channel [c]
  (conduit
   (loop []
     (when-let-await! v
       (async/>!! c v)
       (when (yield! v)
         (recur))))
   (async/close! c)))
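For comparison, a hand-written transducer would typically put this kind of cleanup in its 1-ary completion arity, which transduce calls once at the end, including after a downstream step has stopped the reduction early with a reduced value. The sketch below is hypothetical: teeing, send! and close! are illustrative names, and plain functions stand in for the core.async channel so the example runs with clojure.core alone.

```clojure
(defn teeing
  "Hypothetical transducer: calls (send! x) for every input x, passes x
  downstream unchanged, and calls (close!) when the reduction completes."
  [send! close!]
  (fn [rf]
    (fn
      ([] (rf))
      ([result]
       ;; Completion arity: transduce calls this once at the end, also
       ;; after a downstream step has returned a reduced value.
       (close!)
       (rf result))
      ([result input]
       (send! input)
       (rf result input)))))

(let [sent   (atom [])
      closed (atom 0)
      result (transduce (comp (teeing #(swap! sent conj %)
                                      #(swap! closed inc))
                              (take 3))
                        conj [] (range 10))]
  [result @sent @closed])
;; => [[0 1 2] [0 1 2] 1]
```

This relies on whoever drives the transducer actually calling the completion arity; a lazy sequence built with sequence that is never fully consumed, for example, never reaches it.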

Implementation Details

Alright, how would one even go about making a library like this? The bad news is that implementing this properly is hard and complex: you have to write macros that walk the entire structure of the code, convert it into a state machine, and provide escape hatches for certain functions, like await and yield.

The good news is that there is a really good library named core.async whose powerful macro internals do exactly this work for us. For an explanation of the internals, have a look at the YouTube videos Deep Walking Macros and Core Async Go Macro Internals - Part I and Part II. They are very thorough and are worth watching if you’re interested in how core.async works internally.

Since we can just piggyback onto the macro system in core.async, all the code for clj-conduit is about 140-150 lines if we strip away comments and documentation. If you’ve got a grasp of how ioc_macros.clj works, then the code should be relatively straightforward to read.

The Good Parts

We’ve seen that a conduit version of take is more readable than take written in the usual transducer “style”. However, I think the difference is even more visible for transducers that consume or emit more than one value in one “go”.

interpose is a small example of this. It’s implemented like this with conduit:

(defn interposing
  [sep]
  (conduit
   (yield (await))
   (while true
     (let [val (await)]
       (yield sep)
       (yield val)))))

and the actual version in clojure.core looks like this:

(defn interpose
  [sep]
  (fn [rf]
    (let [started (volatile! false)]
      (fn
        ([] (rf))
        ([result] (rf result))
        ([result input]
         (if @started
           (let [sepr (rf result sep)]
             (if (reduced? sepr)
               sepr
               (rf sepr input)))
           (do
             (vreset! started true)
             (rf result input))))))))
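The (reduced? sepr) check in the clojure.core version is needed because the separator and the next input are sent in the same step, and the sink may ask to stop between the two. Composing interpose with (take 2), which stops the reduction right after the first separator, exercises exactly that path, using only clojure.core:

```clojure
;; take 2 returns a reduced value after receiving :sep, so interpose must
;; hand sepr back as-is instead of also sending the next input.
(into [] (comp (interpose :sep) (take 2)) [1 2 3])
;; => [1 :sep]

;; Without early termination, every separator sits between two inputs.
(into [] (interpose :sep) [1 2 3])
;; => [1 :sep 2 :sep 3]
```

In the conduit version the same situation needs no special handling: the second yield simply “blocks” once the sink is done.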

It’s perhaps not surprising that conduits solve the same problems as core.async, considering that both are built on the same inversion-of-control macros. For the complete rationale, have a look at the videos on the motivation behind core.async, although some parts do not apply to conduits. As a short summary, what you get with conduits is[2]:

  • Less overhead: All of the transducers in clojure.core start with ([] (rf)). All but 2 continue with ([result] (rf result)). No need for that with conduits.
  • Separation of concerns: You don’t have to peek into the result of yield, which you may need to do with transducers (in this case via reduced?).
  • Recursion vs mutation: By turning the transducer into a process, you avoid mutable state and the need to build a state machine by hand (as interpose does by dispatching on, and mutating, started).
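To make the first point concrete, here is roughly what mapping looks like when written directly as a transducer, modelled on the transducer arity of clojure.core’s map (mapping-xf is a hypothetical name). Only the 2-ary step carries any logic, yet the 0-ary init and 1-ary completion arities still have to be spelled out:

```clojure
(defn mapping-xf
  "Hypothetical hand-written equivalent of the conduit-based mapping."
  [f]
  (fn [rf]
    (fn
      ([] (rf))                                 ; init: just delegate
      ([result] (rf result))                    ; completion: just delegate
      ([result input] (rf result (f input)))))) ; step: the actual logic

(sequence (mapping-xf #(+ 10 %)) (range 10))
;; => (10 11 12 13 14 15 16 17 18 19)
```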

The Bad Parts

It may seem like conduits are vastly superior to implementing transducers manually, but there is no such thing as a free lunch.

Because conduits are built on the core.async macro system, await and yield can’t be used as higher-order functions, nor inside other functions you call from a conduit. It’s somewhat frustrating, and I’m not sure we can do anything (realistic) about that.

The bigger “problem” is that conduits will be slower than transducers. Transducers were designed with efficiency in mind, and the implementation of conduits doesn’t create the most efficient transducers. I don’t know when this turns into an issue, but I would assume it’s not going to be a problem unless you have high throughput in the code region where conduits are used. Benchmark and tune to find out what “high throughput” is for your case, and if you should use conduits or not.

Summary

Conduits provide a new interface to create transducers. I think they capture the essence of transducers much better than the original interface: you read inputs from an upstream source and send values down to a downstream sink. When you read or send values is up to you, so you can think of your logic as an independent process:

Diagram of a conduit: An arrow in describing inputs, the conduit logic as a box, and an arrow out explaining outputs

That being said, there are some limitations with conduits. Most importantly, the performance of a conduit will be worse than a well-written transducer doing the same thing. I don’t think this will be an issue unless you use transducers in high performance code, but one should be aware of it.

For more information and examples of conduits, take a look at the following pages:

  • clj-conduit for the library readme and source code
  • clojure.core ports for an implementation of all the transducers in clojure.core with the conduit library. Good for examples.
  • examples for relatively basic conduit examples
  • A more complex conduit example which implements simple moving averages. (I’d guess this would be hard to implement correctly with transducers).
  1. This is a bad example, but I’ve never seen transducers manually handling non-GCable resources, so I’m not 100% sure yield! will ever be used.

  2. The last two points are “stolen” directly from the Clojure core.async video (43:21), because they also apply here.