Clojure's Transducers in Haskell

An illustration of a mining pump used in the 1890s

Clojure's transducers are an intellectual curiosity. Not only are they interesting from a usage perspective, people (perhaps mostly Haskellers?) also try to understand what their types are. Trying to derive some sort of type for transducers is not as easy as one would think, partly because of state, mutability and IO. Franklin Chen wrote a great blog post about this, but I feel it doesn't capture the entirety of transducers: it is close, but the implementation lacks short-circuiting and state, which I believe are important characteristics of transducers. Try to implement take with those transducer types, for example.

If you've done a lot of Haskell, you'll notice that what follows isn't exactly idiomatic Haskell. Monads, Comonads, RankNTypes and probably other things would make the code a lot easier to work with, but they would make it harder for non-Haskellers to follow. So I'll leave those out here, and refer you to the haskell-transducers library I wrote for a more idiomatic implementation of the code here.

Reduce

To get a good grip on transducers, we really have to understand how reducers in Clojure work. Their first "iteration" was the reduce function, which has evolved slightly over the years.

On May 4, 2009, Rich Hickey released version 1.0 of Clojure. Version 1.0 contained a well-known function in functional communities – reduce. For Haskellers, reduce can be considered a combination of the following three functions:

reduce :: (b -> a -> b) -> b -> [a] -> b
reduce f z []     = z
reduce f z (x:xs) = reduce f (f z x) xs

-- Also known as foldl'.
-- Example: reduce (+) 0 [1, 2, 3] => 6

reduce1 :: (a -> a -> a) -> [a] -> a
reduce1 f (x:xs) = reduce f x xs

-- Also known as foldl1'
-- Example: reduce1 (+) [1, 2, 3] => 6

reduce0 :: (() -> a) -> [b] -> a
reduce0 f [] = f ()

-- (A strange case indeed)

Whenever Clojure's reduce is provided with three arguments, it will use the Haskell version reduce. If it is provided with two arguments, it will peek into the sequence provided. If the sequence is empty, it will attempt to call reduce0, otherwise it will call reduce1.

Combining them all into a single function is possible, but that's unnecessary overhead for our purposes. Hence we'll just stick with the Haskell reduce version.

The version of reduce provided with 1.0 stayed that way for a long time, and wasn't changed until Clojure 1.5 was released in early 2013. With 1.5, you were provided a new type, Reduced, which could be used to tell reduce that the result was computed and that it doesn't have to read more of the sequence:

data Reduced a = Continue a
               | Reduced a
               deriving (Eq, Ord, Show, Read)

reduce :: (b -> a -> Reduced b) -> b -> [a] -> b
reduce f z []     = z
reduce f z (x:xs) = case f z x of
                      (Reduced  res) -> res
                      (Continue res) -> reduce f res xs

let rplus a b = Continue (a + b) in
  reduce rplus 0 [1, 2, 3] -- => 6

let takeFirst a b = Reduced a in
  reduce takeFirst 42 [1..] -- => 42
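
As a slightly less contrived sketch of what Reduced buys us, here is a reduction that stops reading as soon as a running total reaches a limit. The name sumUntil is made up for this illustration and is not something Clojure provides; the Reduced type and reduce are the ones shown above.

```haskell
data Reduced a = Continue a
               | Reduced a
               deriving (Eq, Ord, Show, Read)

reduce :: (b -> a -> Reduced b) -> b -> [a] -> b
reduce _ z []     = z
reduce f z (x:xs) = case f z x of
                      Reduced  res -> res
                      Continue res -> reduce f res xs

-- Hypothetical helper: keep adding numbers until the running total
-- reaches the limit, then stop without looking at the rest of the input.
sumUntil :: Int -> [Int] -> Int
sumUntil limit = reduce stepFn 0
  where stepFn acc x
          | acc + x >= limit = Reduced (acc + x)
          | otherwise        = Continue (acc + x)
```

Because the step function returns Reduced, sumUntil 10 [1..] terminates on an infinite list and gives 10 (1 + 2 + 3 + 4).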

Enhancing Reducing Functions

To make a transducer variant in Haskell, we need to understand what a transducer is in the first place. The terminology according to clojure.org/transducers is a bit hand-wavy in my opinion:

;;reducing function signature
whatever, input -> whatever

;;transducer signature
(whatever, input -> whatever) -> (whatever, input -> whatever)

Which, translated literally, is the following Haskell type (if we include Reduced):

type Reducer w i = w -> i -> Reduced w
type Transducer w i j = (Reducer w i) -> (Reducer w j)
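
With just these two types, stateless transducers such as map and filter can already be written. The following is a minimal sketch; mappingT, filteringT and rplus are names picked for the illustration only.

```haskell
data Reduced a = Continue a
               | Reduced a

type Reducer w i = w -> i -> Reduced w
type Transducer w i j = Reducer w i -> Reducer w j

reduce :: Reducer b a -> b -> [a] -> b
reduce _ z []     = z
reduce f z (x:xs) = case f z x of
                      Reduced  res -> res
                      Continue res -> reduce f res xs

-- Transform every input before handing it to the downstream reducer.
mappingT :: (j -> i) -> Transducer w i j
mappingT f rf = \acc x -> rf acc (f x)

-- Only hand inputs that satisfy the predicate to the downstream reducer.
filteringT :: (i -> Bool) -> Transducer w i i
filteringT p rf = \acc x -> if p x then rf acc x else Continue acc

-- A plain reducing function: addition that never short-circuits.
rplus :: Reducer Int Int
rplus a b = Continue (a + b)
```

For instance, reduce (filteringT even (mappingT (* 10) rplus)) 0 [1..5] keeps 2 and 4, multiplies them by 10, and sums them to 60. What this representation has no room for is a counter, which is why take is awkward to express with it.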

However, this doesn't really capture the entirety of reducers. You see, a reducing function passed to transduce and friends must also provide a 0-arity and 1-arity function. In a way, it is three separate functions merged into one:

-- import Prelude hiding (init)

data Reducer a b = Reducer { init :: () -> b
                           , complete :: b -> b
                           , step :: b -> a -> Reduced b
                           }

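The semantics behind it are as follows: init produces the initial value when none is supplied, step consumes one input and returns the updated result, and complete is called once with the final result when the reduction is over.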

As we started off by saying that we will ignore the case where the initial value is not supplied, we will just drop init, because it will never be used in our case.

If we were in a language where closures could contain mutable objects (OCaml comes to mind), this definition would probably be sufficient, although slightly confusing to work with: This type doesn't capture the potential state a reducing function may contain.

A big catch with stateful reducing functions is that you have to be careful when you use them by yourself: since they are stateful, you have to create a fresh one for every reduction and must not reuse it in a different location later on:

;; (take 10) is a transducer
;; a transducer is reducer -> reducer, so vec10 is a reducer
user=> (def vec10 ((take 10) conj))
#'user/vec10
user=> (reduce vec10 [] (range 20))
[0 1 2 3 4 5 6 7 8 9]
;; state used
user=> (reduce vec10 [] (range 20))
[]
;; Since vec10 has already been used, we get a weird result

Adding State to Reducing Functions

If we add immutable state into the type in Haskell, we can avoid this problem and make it easier to not mess up reducer usage [1].

To add state to the type, we add a new parameter s, which represents the state of a reducer. If the reducer doesn't use any, we can just put the unit type, (), as the type. We also need to know the initial state, so we add in the value initState to the type.

data Reducer s a b = Reducer { initState :: s
                             , complete :: s -> b -> b
                             , step :: s -> b -> a -> (s, Reduced b)
                             }

Now, the step and complete functions take the state s as a parameter, and initState is the initial state. Step also needs to produce the new state and emit it.

While this doesn't feel entirely identical to how reducers in Clojure actually work (as mentioned, the state is contained in a closure over the reducer), they are semantically equivalent.

Of course, now we need to fix reduce to fit the new definition of a reducing function:

reduce' :: (s -> b -> a -> (s, Reduced b)) -> s -> b -> [a]
           -> (s, b)
reduce' _ state acc [] = (state, acc)
reduce' f state acc (x:xs) 
  = case f state acc x of
    (s, Reduced v) -> (s, v)
    (s, Continue v) -> reduce' f s v xs

reduce :: Reducer s a b -> b -> [a] -> b
reduce (Reducer is c f) z as
  = let (state, res) = reduce' f is z as
    in c state res

This isn't too different from the previous version: reduce' is a helper that takes the step function, and walks over the values in the same fashion as before. The only difference is that it has to handle the reducing function state as well. reduce explodes the Reducer into initial state (is), the complete function (c) and the step function (f), and ensures they are used in the right way.

We've done way too much now without an example though, so let's use this in practice. We can begin with a simple function, like plus:

let rplus = Reducer { initState = ()
                    , complete = \s b -> b
                    , step = \s b a -> (s, Continue (b + a))
                    } in
  reduce rplus 0 [1..3]

We set initState to the unit value, as we don't need state. Complete doesn't do any magic, it just returns the result itself. And step ignores the state (s) and sums up b and a.

This is a lot of overhead to get the functionality of a basic reduce, so we can make a helper function that takes the kind of function the original Clojure 1.0 version of reduce worked with and produces a reducing function for us:

stateless :: (b -> a -> b) -> Reducer () a b
stateless f = Reducer { initState = (),
                        complete = \_ x -> x,
                        step = \_ x y -> ((), Continue (f x y))
                      }

Now we can sum numbers much more easily again:

reduce (stateless (+)) 0 [1..3]

Of course, if the goal was to sum numbers, we've just made an extremely overengineered version of foldl. But summing numbers needs neither state, nor any magic during complete, nor short-circuiting when a certain condition is met.

A more sophisticated example would be taking only the first n numbers and summing them up. We can do that by setting initState to n, then counting down as we walk over the numbers. When the counter is 0 or less, we return Reduced instead of Continue:

takeNPlus :: Int -> Reducer Int Int Int
takeNPlus n = Reducer { initState = n,
                        complete = \_ x -> x,
                        step = theStep
                      }
  where theStep curN sum num
          | curN <= 0 = (0, Reduced sum)
          | otherwise = (curN - 1, Continue (sum + num))

But this is, by definition, complex! We've now merged take and rplus into a single function. It would be much better if we could make take and rplus independent of each other, and compose them back together later on.

If we only had to work on lists, then these two expressions would be equal:

foldl (+) 0 (take n myList)

reduce (takeNPlus n) 0 myList

Since (+) and take are independent in the previous expression, we could swap out (+) with some other function. We cannot do that with takeNPlus, and that is, of course, why we would like to use transducers.
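
The claimed equality is easy to spot-check. The sketch below assumes that takeNPlus starts its counter at n and that the guard looks at the running counter; agrees is a helper name invented for the check.

```haskell
data Reduced a = Continue a
               | Reduced a

data Reducer s a b = Reducer { initState :: s
                             , complete :: s -> b -> b
                             , step :: s -> b -> a -> (s, Reduced b)
                             }

reduce' :: (s -> b -> a -> (s, Reduced b)) -> s -> b -> [a] -> (s, b)
reduce' _ state acc [] = (state, acc)
reduce' f state acc (x:xs)
  = case f state acc x of
      (s, Reduced v)  -> (s, v)
      (s, Continue v) -> reduce' f s v xs

reduce :: Reducer s a b -> b -> [a] -> b
reduce (Reducer is c f) z as
  = let (state, res) = reduce' f is z as
    in c state res

-- Sum at most n numbers; the counter lives in the reducer state.
takeNPlus :: Int -> Reducer Int Int Int
takeNPlus n = Reducer { initState = n
                      , complete = \_ x -> x
                      , step = theStep
                      }
  where theStep curN total num
          | curN <= 0 = (0, Reduced total)
          | otherwise = (curN - 1, Continue (total + num))

-- Hypothetical helper: do the list version and the reducer version agree?
agrees :: Int -> [Int] -> Bool
agrees n xs = foldl (+) 0 (take n xs) == reduce (takeNPlus n) 0 xs
```

agrees 10 [1..100] and agrees 3 [1, 2] both hold, and reduce (takeNPlus 10) 0 [1..] returns 55 even though the input is infinite.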

Making Basic Transducers

Transducers come in all kinds of shapes and types. Some can call the reducing function more than once, others can remove input before passing it downstream, and some may short-circuit early on.

One of the important things about a transducer is that it must not assume anything about the output type, nor the state type of the reducer it receives.

To begin lightly and see how this applies, we can start with filtering. It doesn't use state, so it's relatively straightforward to look at:

filtering :: (a -> Bool) -> Reducer s a b -> Reducer s a b
filtering pred (Reducer is c stepFn)
  = Reducer { initState = is,
              complete = c,
              step = filterStep
            }
  where filterStep s acc x
          | pred x = stepFn s acc x
          | otherwise = (s, Continue acc)

The b's in the Reducers are untouched, and are only used/produced via the functions within the Reducer passed in (stepFn and c).

The filterStep function calls the step function from the reducing function given, but only if the predicate function returns true. Otherwise it just returns the current state and the original value. Using it in real life would look like this:

*Main> reduce (stateless (+)) 0 [1..10]
55
*Main> reduce (filtering even (stateless (+))) 0 [1..10]
30
*Main> reduce (filtering odd (stateless (+))) 0 [1..10]
25

It's sort of in the "opposite" order of what one would expect, so to clarify: The way to read these chains of transducer calls is left to right: First we filter, then we sum.
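
The examples further down use a mapping transducer that is never spelled out in this post. A plausible definition in the same style as filtering could look like the sketch below; treat it as an assumption about its shape rather than the code from the library.

```haskell
data Reduced a = Continue a
               | Reduced a

data Reducer s a b = Reducer { initState :: s
                             , complete :: s -> b -> b
                             , step :: s -> b -> a -> (s, Reduced b)
                             }

reduce' :: (s -> b -> a -> (s, Reduced b)) -> s -> b -> [a] -> (s, b)
reduce' _ state acc [] = (state, acc)
reduce' f state acc (x:xs)
  = case f state acc x of
      (s, Reduced v)  -> (s, v)
      (s, Continue v) -> reduce' f s v xs

reduce :: Reducer s a b -> b -> [a] -> b
reduce (Reducer is c f) z as
  = let (state, res) = reduce' f is z as
    in c state res

stateless :: (b -> a -> b) -> Reducer () a b
stateless f = Reducer { initState = ()
                      , complete = \_ x -> x
                      , step = \_ x y -> ((), Continue (f x y))
                      }

filtering :: (a -> Bool) -> Reducer s a b -> Reducer s a b
filtering p (Reducer is c stepFn)
  = Reducer { initState = is, complete = c, step = filterStep }
  where filterStep s acc x
          | p x       = stepFn s acc x
          | otherwise = (s, Continue acc)

-- Assumed definition: apply f to every input, then pass the result on.
-- State and completion are taken unchanged from the downstream reducer.
mapping :: (a -> b) -> Reducer s b r -> Reducer s a r
mapping f (Reducer is c stepFn)
  = Reducer { initState = is
            , complete = c
            , step = \s acc x -> stepFn s acc (f x)
            }
```

With this, reduce (mapping (* 5) (stateless (+))) 0 [1..10] gives 275, and reduce (filtering even (mapping (* 5) (stateless (+)))) 0 [1..10] gives 150: first filter, then multiply, then sum.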

Transducers and State

Since filtering doesn't use any state, it just takes the state from the Reducer passed in. However, if we had some state we wanted to use, we could add it by using a tuple. A perfectly valid implementation of filtering would be this:

filtering :: (a -> Bool) -> Reducer s a b -> Reducer ((), s) a b
filtering pred (Reducer is c stepFn)
  = Reducer { initState = ((), is),
              complete = c . snd,
              step = filterStep
            }
  where filterStep (myState, s) acc x
          | pred x = let (s', res) = stepFn s acc x in
                       ((myState, s'), res)
          | otherwise = ((myState,s), Continue acc)

The trick is to unwrap the tuple into your own state (myState) and the downstream reducer's state (s). Whenever you call the downstream reducer's step function, you pass in s and the value you want. Then you also have to wrap it back up again when you return the result.

A small note here: Since we still don't have any completion logic of our own, we just use c . snd. This is just a shortcut notation for

\(_, s) val -> c s val

Which unwraps our own state and calls the downstream complete with the state and value as it expects.

Since we've now looked at how we can apply state, let's see it in action with taking, the function we wanted earlier when we set out to split takeNPlus up into composable parts:

taking :: Int -> Reducer s a b -> Reducer (Int, s) a b
taking n (Reducer is c f)
  = Reducer { initState = (n, is),
              complete = c . snd,
              step = takeStep
            }
  where takeStep (curN, s) res x
          | 0 < curN = let (s', v) = f s res x in
                      ((curN - 1, s'), v)
          | otherwise = ((curN, s), Reduced res)

Here, we call the downstream reducer until we've done so n times, then we return Reduced. We keep the state in an Int, and do the unwrapping and rewrapping of state as mentioned earlier. Using it is just as easy as using filtering:

*> reduce (taking 10 (stateless (+))) 0 [1..]
55
-- read: take 10, then filter even, then sum together
*> reduce (taking 10 (filtering even (stateless (+)))) 0 [1..]
30
-- read: filter even, then take 10, then sum together
*> reduce (filtering even (taking 10 (stateless (+)))) 0 [1..]
110
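
This is also where the immutable state pays off compared to the Clojure session with vec10 earlier: the counter is threaded through reduce instead of living inside the reducer, so the same value can be reused. A small sketch (vec10, firstRun and secondRun are illustrative names mirroring the Clojure example):

```haskell
data Reduced a = Continue a
               | Reduced a

data Reducer s a b = Reducer { initState :: s
                             , complete :: s -> b -> b
                             , step :: s -> b -> a -> (s, Reduced b)
                             }

reduce' :: (s -> b -> a -> (s, Reduced b)) -> s -> b -> [a] -> (s, b)
reduce' _ state acc [] = (state, acc)
reduce' f state acc (x:xs)
  = case f state acc x of
      (s, Reduced v)  -> (s, v)
      (s, Continue v) -> reduce' f s v xs

reduce :: Reducer s a b -> b -> [a] -> b
reduce (Reducer is c f) z as
  = let (state, res) = reduce' f is z as
    in c state res

stateless :: (b -> a -> b) -> Reducer () a b
stateless f = Reducer { initState = ()
                      , complete = \_ x -> x
                      , step = \_ x y -> ((), Continue (f x y))
                      }

taking :: Int -> Reducer s a b -> Reducer (Int, s) a b
taking n (Reducer is c f)
  = Reducer { initState = (n, is), complete = c . snd, step = takeStep }
  where takeStep (curN, s) res x
          | 0 < curN  = let (s', v) = f s res x in ((curN - 1, s'), v)
          | otherwise = ((curN, s), Reduced res)

-- The Haskell counterpart of ((take 10) conj): collect 10 items in a list.
vec10 :: Reducer (Int, ()) Int [Int]
vec10 = taking 10 (stateless (\acc x -> acc ++ [x]))

-- Every reduction starts again from initState, so reuse is harmless.
firstRun, secondRun :: [Int]
firstRun  = reduce vec10 [] [0..19]
secondRun = reduce vec10 [] [0..19]
```

Both firstRun and secondRun evaluate to [0,1,2,3,4,5,6,7,8,9].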

Complete it!

So far, we haven't seen any use of the complete part of a reducer. It's a rarely used thing, but is handy every now and then. Say, for example, that you want to partition the results based on some predicate function, sort of like this:

-- You have a pipeline of numbers:
85 38 64 92 36 0 44 87 58 22 58 72 17 50 25 69 84 48 35 55
-- 'partitionBy even' will convert the pipeline into this:
[85] [38, 64, 92, 36, 0, 44] [87] [58, 22, 58, 72] [17] ...

The partitionBy function may hold a list of elements that it has accumulated so far. If we're at the end of the list above, we have [35, 55] stored in our state. But if we just stopped normally, then those values wouldn't be passed down to the reducer we used! Hence the need for complete: Even if we're at the end of the stream/pipeline of values, we may have values we haven't yet sent down to the reducing function downstream.

partitionBy is a nasty beast though. It has to do a lot of stuff, so let's first look at the entire thing, then piece by piece. (You can skip this section if you'd like to, the point is to show how one would utilise complete)

partitionBy :: Eq x => (a -> x) -> Reducer s [a] b 
                    -> Reducer (Maybe ([a], x), s) a b
partitionBy pfn (Reducer is c stepFn) 
  = Reducer { initState = (Nothing, is),
              complete = partitionComplete,
              step = partitionStep
            }
  where partitionComplete (Nothing, s) x = c s x
        partitionComplete (Just (xs, _), s) acc
          = let (state', res) = stepFn s acc xs
            in c state' (extract res)
        partitionStep (Nothing, s) acc x
          = ((Just ([x], pfn x), s), Continue acc)
        partitionStep (Just (as, cmp), s) acc x
          | cmp == pfn x = ((Just (as ++ [x], cmp), s),
                            Continue acc)
          | otherwise = let (state', acc') = stepFn s acc as
                        in ((Just ([x], pfn x), state'), acc')

-- Extract is a utility function for getting the result out of a
-- Reduced value.
extract :: Reduced a -> a
extract (Continue x) = x
extract (Reduced x) = x

partitionBy takes a function pfn that converts a to x. If the x value is equal for multiple values in a row, we store them in the same list. When the x value changes, we send the list we've gathered so far downstream.

Our state is therefore Maybe ([a], x), where [a] is the list of values we've gathered so far, and x is the output of pfn for all the values in the list. The reason why this is put in a Maybe is because we don't have any x value to compare with at the beginning of a reduction (it is nothing in that case).

If we're at the beginning, we know that our state is Nothing. In that case, we just put the value we receive into a new list, and we also store the x result. We don't call or do anything with the reducer function downstream:

        partitionStep (Nothing, s) acc x
          = ((Just ([x], pfn x), s), Continue acc)

If it's not Nothing, we know we have some values. If cmp – the x value we've stored this far – is equal to the value of pfn x, we put the value we got into the list by doing as ++ [x] (Yes, it's inefficient, but it does the job)

        partitionStep (Just (as, cmp), s) acc x
          | cmp == pfn x = ((Just (as ++ [x], cmp), s),
                            Continue acc)

Otherwise, we have to send the list we've stored so far to the reducer downstream and create a new list.

          | otherwise = let (state', acc') = stepFn s acc as
                        in ((Just ([x], pfn x), state'), acc')

And so, for the final part: If we're at the end of the stream of values and have no list, we just call our downstream reducer's complete function:

  where partitionComplete (Nothing, s) x = c s x

But when we have a list, we need to send it down via the step function. Then we take the result of the step function (the state and the value), and pass that to the downstream reducer's complete function:

        partitionComplete (Just (xs, _), s) acc
          = let (state', res) = stepFn s acc xs
            in c state' (extract res)

All in all pretty confusing and hard to grok, but fortunately the need for this is pretty rare. You should be fine just knowing how to manipulate state and calling the reducers you get as input.
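
To see the flush in complete at work, here is partitionBy from above run over the pipeline of numbers from the start of this section. The downstream reducer simply collects the partitions; collect and pipeline are names introduced for this sketch.

```haskell
data Reduced a = Continue a
               | Reduced a

data Reducer s a b = Reducer { initState :: s
                             , complete :: s -> b -> b
                             , step :: s -> b -> a -> (s, Reduced b)
                             }

reduce' :: (s -> b -> a -> (s, Reduced b)) -> s -> b -> [a] -> (s, b)
reduce' _ state acc [] = (state, acc)
reduce' f state acc (x:xs)
  = case f state acc x of
      (s, Reduced v)  -> (s, v)
      (s, Continue v) -> reduce' f s v xs

reduce :: Reducer s a b -> b -> [a] -> b
reduce (Reducer is c f) z as
  = let (state, res) = reduce' f is z as
    in c state res

stateless :: (b -> a -> b) -> Reducer () a b
stateless f = Reducer { initState = ()
                      , complete = \_ x -> x
                      , step = \_ x y -> ((), Continue (f x y))
                      }

extract :: Reduced a -> a
extract (Continue x) = x
extract (Reduced x) = x

partitionBy :: Eq x => (a -> x) -> Reducer s [a] b
                    -> Reducer (Maybe ([a], x), s) a b
partitionBy pfn (Reducer is c stepFn)
  = Reducer { initState = (Nothing, is)
            , complete = partitionComplete
            , step = partitionStep
            }
  where partitionComplete (Nothing, s) x = c s x
        -- Flush the partition we are still holding, then complete.
        partitionComplete (Just (xs, _), s) acc
          = let (state', res) = stepFn s acc xs
            in c state' (extract res)
        partitionStep (Nothing, s) acc x
          = ((Just ([x], pfn x), s), Continue acc)
        partitionStep (Just (as, cmp), s) acc x
          | cmp == pfn x = ((Just (as ++ [x], cmp), s), Continue acc)
          | otherwise = let (state', acc') = stepFn s acc as
                        in ((Just ([x], pfn x), state'), acc')

-- Hypothetical downstream reducer: append each partition to the result.
collect :: Reducer () [a] [[a]]
collect = stateless (\acc part -> acc ++ [part])

pipeline :: [Int]
pipeline = [85, 38, 64, 92, 36, 0, 44, 87, 58, 22,
            58, 72, 17, 50, 25, 69, 84, 48, 35, 55]
```

reduce (partitionBy even collect) [] pipeline yields [[85],[38,64,92,36,0,44],[87],[58,22,58,72],[17],[50],[25,69],[84,48],[35,55]]. The last partition, [35,55], only shows up because complete sends it downstream.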

Reduce, Sequence and Conduits

One of the cool things about transducers is that, since they should never specify constraints on the output type of the reducer, they can be used for many other things than just "normal" reductions. sequence is such a function, which effectively takes a list and transforms it into another list via a transducer. It's sort of like a beefed up version of map. The type signature is a bit weird though:

--import Prelude hiding (sequence)

sequence :: (Reducer () b [b] -> Reducer t a [b]) -> [a] -> [b]

Here we leak a bit of internal information about how sequence is implemented, and we can fix it with a Haskell extension. We won't do that here, though. Just remember that a transducer should not care about the output type at all, and the input state is not something we care about either. It would be better if the type signature looked like this instead, because that's what we really mean:

sequence :: (Reducer s b z -> Reducer t a z) -> [a] -> [b]

In contrast to reduce, sequence is lazy and you can consume the values that have been produced so far right away (and discard the ones you don't need):

*> take 10 (sequence (mapping (* 5)) [1..])
[5,10,15,20,25,30,35,40,45,50]

In fact, you cannot implement sequence via a reducer passed to reduce! We could try by implementing append, but it wouldn't work:

*> let append = stateless (\x y -> x ++ [y]) in 
     reduce (mapping (* 5) append) [] [1..10]
[5,10,15,20,25,30,35,40,45,50]
*> let append = stateless (\x y -> x ++ [y]) in 
     take 10 (reduce (mapping (* 5) append) [] [1..])
-- will never return
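
The implementation of sequence isn't shown in this post, so here is one way it could be written lazily, as a sketch under assumptions rather than the library's actual code. The idea is to hand every step an empty accumulator, so that whatever the transducer emits for one input can be returned right away and the rest of the output is produced on demand. It relies on transducers never inspecting the accumulator they are given. The definitions of mapping and taking are repeated to keep the block self-contained.

```haskell
import Prelude hiding (sequence)

data Reduced a = Continue a
               | Reduced a

data Reducer s a b = Reducer { initState :: s
                             , complete :: s -> b -> b
                             , step :: s -> b -> a -> (s, Reduced b)
                             }

stateless :: (b -> a -> b) -> Reducer () a b
stateless f = Reducer { initState = ()
                      , complete = \_ x -> x
                      , step = \_ x y -> ((), Continue (f x y))
                      }

mapping :: (a -> b) -> Reducer s b r -> Reducer s a r
mapping f (Reducer is c stepFn)
  = Reducer { initState = is
            , complete = c
            , step = \s acc x -> stepFn s acc (f x)
            }

taking :: Int -> Reducer s a b -> Reducer (Int, s) a b
taking n (Reducer is c f)
  = Reducer { initState = (n, is), complete = c . snd, step = takeStep }
  where takeStep (curN, s) res x
          | 0 < curN  = let (s', v) = f s res x in ((curN - 1, s'), v)
          | otherwise = ((curN, s), Reduced res)

-- Assumed implementation: run the transducer on top of a reducer that
-- appends to a list, one input at a time, starting each step from [].
sequence :: (Reducer () b [b] -> Reducer t a [b]) -> [a] -> [b]
sequence xform = go is
  where
    Reducer is c f = xform (stateless (\acc x -> acc ++ [x]))
    -- End of input: let complete flush whatever is still buffered.
    go s []     = c s []
    go s (x:xs) = case f s [] x of
      -- Emit this step's output, then lazily continue with the rest.
      (s', Continue out) -> out ++ go s' xs
      -- Short-circuit: emit, flush via complete, and stop reading input.
      (s', Reduced out)  -> out ++ c s' []
```

With this definition, take 10 (sequence (mapping (* 5)) [1..]) returns [5,10,15,20,25,30,35,40,45,50], and sequence (taking 3 . mapping (* 5)) [1..] stops by itself with [5,10,15].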

Although lazy vs. eager is probably not the most inspiring example for Haskellers, it means that the transducers themselves are independent of evaluation strategies. For example, you can use transducers as Conduits from the conduit library, by transforming them:

import Data.Conduit

-- Yields the bs it receives
conduitYielder :: Monad m => Reducer () b (Conduit a m b)
conduitYielder = stateless run
  where run m x = m >> yield x

-- awaits the as upstream, then feed them into the
-- reducing function
conduitAwaiter :: Monad m => (Reducer s a (Conduit a m b)) 
                          -> Conduit a m b
conduitAwaiter (Reducer is c f) = go is
  where go s = do mval <- await
                  case mval of
                    (Just val) -> feed s val
                    Nothing -> feedLast s
        feed s val = case f s (return ()) val of
                       -- run comp immediately before continuing
                       (s', Reduced comp) -> comp >> feedLast s'
                       (s', Continue comp) -> comp >> go s'
        feedLast s = c s (return ())

-- merging them
toConduit :: Monad m => (Reducer () b (Conduit a m b) 
                          -> Reducer s a (Conduit a m b))
                        -> Conduit a m b
toConduit xform = conduitAwaiter (xform conduitYielder)

It's not important to understand the code here, the important thing is that you can make a transducer into a conduit with the function toConduit.

myXform = taking 10 . mapping f . partitionBy (`mod` 3)
  where f x = 3 * x^3 - 2 * x^2 + 2 * x

Now we can use myXform on whatever thing we'd like to, regardless of whether it's lists or conduits:

import Data.Conduit
import qualified Data.Conduit.List as CL

main :: IO ()
main = do
  mapM_ print (sequence myXform [1..])
  CL.sourceList [1..] $$ toConduit myXform =$ CL.mapM_ print

-- both expressions print the same lines

Monads and Conduits

Alright, so I said that I wouldn't use monads and the like, but the previous section brings up an interesting question which necessarily introduces monads: Can we convert a conduit into a transducer? To answer that question, we need to have a brief look at the Conduit type:

--      +--- We can read/await values of type a
--      |
--      |  +--- And use the monad m (if we want to)
--      |  |
--      |  |  +--- To produce/yield values of type b
--      |  |  |
--      v  v  v
Conduit a  m  b

So the question becomes, what is really the equivalent transducer, if it exists?

Awaiting values of type a inside a Conduit doesn't require the monad, so the "input type", whatever that is for a transducer, will still be the same. Yielding, on the other hand, may use the monad to retrieve information necessary to produce bs. And since a transducer has this inverse ordering, we theoretically end up with

Reducer s (m b) z -> Reducer t a z

The full signature of a potential fromConduit seems then to be

fromConduit :: Monad m => Conduit a m b -> Reducer s (m b) z
                                        -> Reducer t a z

I am pretty sure you can implement this fromConduit function. But if that is the case, aren't Conduits and Transducers then equivalent (isomorphic)? I am pretty sure they are. You probably have to tweak or make another variant of toConduit that also expects a monad, so that you get the laws

fromConduit . toConduit = id

toConduit . fromConduit = id

but again, it seems doable.

This makes sense informally too. If we get rid of the monad used over a Conduit for the time being, we can see the similarity: await is used to consume values, which will return Just a or Nothing. The Just a case is equivalent to a call to the step function of a reducing function, and Nothing is equivalent to a call to complete. yield is used to produce values, which is the same as calling the step function on the reducing function you have been passed in.

What about the state? Transducers as I have implemented them will have to explicitly manage state. Conduits, being monads, can contain state implicitly via recursion. As an example, here we have take for conduits:

ctake :: Monad m => Int -> Conduit a m a
ctake = loop
  where loop n
          | n <= 0 = return ()
          | otherwise = do mval <- await
                           case mval of
                             (Just val) -> do yield val
                                              loop (n - 1)
                             Nothing -> return ()

My initial reaction is that Conduits feel cleaner and a bit more straightforward to use compared to transducers, especially if you have to make them yourself. And if they are equivalent, then it seems unnecessary to use transducers in Haskell.

Transducers in Clojure are easier to make and write than the Haskell ones, once you've gotten the hang of it. Most of that is because you don't have to think about the underlying reducer's state. You just call init, step and complete where it makes sense to do so. In addition, the transducers in Clojure focus heavily on performance and mutability, and the transducer abstraction, which effectively just calls functions, seems hard to beat.

That's not to say that a Conduit-like library in Clojure isn't valuable, though: it may well be worth the effort to explore and implement one. The goal of such a library wouldn't be performance but rather readability, which, in many cases, seems like a good tradeoff.

My haskell-transducers repository contains a working example of all the code explained here, but cleaned up (taking -> take and so on) and is at least a bit more idiomatic.






[1] Presumably at the cost of performance, but I would assume it's possible to bypass this for performance critical code.

Tagged with: clojure, haskell.