Clojure's Transducers in Haskell
posted
Clojure’s transducers are an intellectual curiosity. Not only are they
interesting from a usage perspective, people (perhaps mostly Haskellers?) also
try to understand what their types are. Trying to derive some sort of type for
transducers is not as easy as one would think, partly because of state,
mutability and IO. Franklin Chen wrote a great blog post about
this, unfortunately, I feel it doesn’t capture the entirety of transducers: It
is close, but the implementation lacks short-circuiting and state, which I
believe are important characteristics of transducers. Try to implement take
with those transducer types, for example.
If you’ve done a lot of Haskell, you’ll notice that what follows isn’t exactly idiomatic Haskell. Monads, Comonads, RankNTypes and probably other things would make the code a lot easier to work with, but it would make it harder for non-Haskellers to follow. So I’ll leave those out here, and refer you to the haskell-transducers library I wrote for more idiomatic implementation of the code here.
Reduce
To get a good grip on transducers, we really have to understand how reducers in
Clojure work. Their first “iteration” was via the reduce
function, which has
evolved slightly over the years.
On May 4, 2009, Rich Hickey released version 1.0 of Clojure. Version 1.0
contained a well-known function in functional communities – reduce
. For
Haskellers, reduce can be considered a combination of the following three
functions:
reduce :: (b -> a -> b) -> b -> [a] -> b
reduce f z [] = z
reduce f z (x:xs) = reduce f (f z x) xs
-- Also known as foldl'.
-- Example: reduce (+) 0 [1, 2, 3] => 6
reduce1 :: (a -> a -> a) -> [a] -> a
reduce1 f (x:xs) = reduce f x xs
-- Also known as foldl1'
-- Example: reduce1 (+) [1, 2, 3] => 6
reduce0 :: (() -> a) -> [b] -> a
reduce0 f [] = f ()
-- (A strange case indeed)
Whenever Clojure’s reduce is provided with three arguments, it will use the
Haskell version reduce
. If it is provided with two arguments, it will peek
into the sequence provided. If the sequence is empty, it will attempt to call
reduce0
, otherwise, it will call reduce1
.
Combining them all into a single function is possible, but that’s unnecessary
overhead for our purposes. Hence we’ll just stick with the Haskell reduce
version.
The version of reduce provided with 1.0 stayed that way for a long time and
wasn’t changed before Clojure 1.5 was released in early 2013. With 1.5, you were
provided with a new type, Reduced
, which could be used to tell reduce that the
result was computed and that one doesn’t have to read more of the sequence:
data Reduced a = Continue a
| Reduced a
deriving (Eq, Ord, Show, Read)
reduce :: (b -> a -> Reduced b) -> b -> [a] -> b
reduce f z [] = z
reduce f z (x:xs) = case f z x of
(Reduced res) -> res
(Continue res) -> reduce f res xs
let rplus a b = Continue (a + b) in
reduce rplus 0 [1, 2, 3] -- => 6
let takeFirst a b = Reduced a in
reduce takeFirst 42 [1..] -- => 42
Enhancing Reducing Functions
To make a transducer variant in Haskell, we need to understand what a transducer is in the first place. The terminology according to the clojure.org/transducers is a bit hand-wavy in my opinion:
;;reducing function signature
whatever, input -> whatever
;;transducer signature
(whatever, input -> whatever) -> (whatever, input -> whatever)
Which, translated literally, is the following Haskell type (if we include Reduced):
type Reducer w i = w -> i -> Reduced w
type Transducer w i j = (Reducer w i) -> (Reducer w j)
However, this doesn’t really capture the entirety of reducers. You see, a
reducing function passed to transduce
and friends must also provide a 0-arity
and 1-arity function. In a way, it is three separate functions merged into one:
-- import Prelude hiding (init)
data Reducer a b = { init :: () -> b
, complete :: b -> b
, step :: b -> a -> Reduced b
}
The semantics behind it is as follows:
- If reduce/transduce or friends aren’t passed an initial value,
init
will be called. complete
is called when the reducer will not be fed any more input.step
is called for each input (when the result is not short-circuited)
As we started off by saying that we will ignore the case where the initial value
is not supplied, we will just drop init
, because it will never be used in our
case.
If we were in a language where closures could contain mutable objects (OCaml comes to mind), this definition would probably be sufficient, although slightly confusing to work with: This type doesn’t capture the potential state a reducing function may contain.
A big catch with stateful reducing functions is that you have to be careful when you use them by yourself: They are stateful, and as such you have to create them and not reuse them in a different location later on:
;; (take 10) is a transducer
;; a transducer is reducer -> reducer, so vec10 is a reducer
user=> (def vec10 ((take 10) conj))
#'user/vec10
user=> (reduce vec10 [] (range 20))
[0 1 2 3 4 5 6 7 8 9]
;; state used
user=> (reduce vec10 [] (range 20))
[]
;; Since vec10 has already been used, we get a weird result
Adding State to Reducing Functions
If we add immutable state to the type in Haskell, we can avoid this problem and make it easier to not mess up reducer usage1.
To add state to the type, we add a new parameter s
, which represents the state
of a reducer. If the reducer doesn’t use any, we can just put the unit type,
()
, as the type. We also need to know the initial state, so we add in the
value initState
to the type.
data Reducer s a b = { initState :: s
, complete :: s -> b -> b
, step :: s -> b -> a -> (s, Reduced b)
}
Now, the step
and complete
functions take the state s
as a parameter, and
initState
is the initial state. Step also needs to produce the new state and
emit it.
While this isn’t doesn’t feel entirely identical to how Reducers in Clojure actually works (As mentioned, the state is contained in a closure over the reducer), they are semantically equivalent.
Of course, now we need to fix reduce to fit the new definition of a reducing function:
reduce' :: (s -> b -> a -> (s, Reduced b)) -> s -> b -> [a]
-> (s, b)
reduce' _ state acc [] = (state, acc)
reduce' f state acc (x:xs)
= case f state acc x of
(s, Reduced v) -> (s, v)
(s, Continue v) -> reduce' f s v xs
reduce :: Reducer s a b -> b -> [a] -> b
reduce (Reducer is c f) z as
= let (state, res) = reduce' f is z as
in c state res
This isn’t too different from the previous version: reduce'
is a helper that
takes the step
function, and walks over the values in the same fashion as
before. The only difference is that it has to handle the reducing function state
as well. reduce
explodes the Reducer into the initial state (is
), the complete
function (c
) and the step function (f
), and ensures they are used in the
right way.
We’ve done way too much now without an example though, so let’s use this in practice. We can begin with a simple function, like plus:
let rplus = Reducer { initState = ()
, complete = \s b -> b
, step = \s b a -> (s, Continue (b + a))
} in
reduce rplus 0 [1..3]
We set initState
to the unit value, as we don’t need state. Complete doesn’t
do any magic, it just returns the result itself. And step ignores the state
(s
) and sums up b and a.
This is a lot of overhead to get the functionality of a basic reduce, so we can make a helper function that produces reducing functions for us that worked with the original Clojure 1.0 version of reduce:
stateless :: (b -> a -> b) -> Reducer () a b
stateless f = Reducer { initState = (),
complete = \_ x -> x,
step = \_ x y -> ((), Continue (f x y))
}
Now we can sum numbers much easier again:
reduce (stateless (+)) 0 [1..3]
Of course, if the goal was to sum numbers, we’ve just made an extremely
over-engineered version of foldl
. But we don’t need state to sum numbers. Nor
do we need any magic during complete, and we have no need for short-circuiting
when a certain condition is met.
A more sophisticated example would be only taking the first n
numbers and sum
them up. We can do that by setting initState to n
, then count down when we
walk over the numbers. When we reach n <= 0
, we return Reduced
instead of
Continue
:
takeNPlus :: Int -> Reducer Int Int Int
takeNPlus n = Reducer { initState = 1,
complete = \_ x -> x,
step = theStep
}
where theStep curN sum num
| n <= 0 = (0, Reduced sum)
| otherwise = (curN - 1, Continue (sum + num))
But this is, by definition, complex! We’ve now merged take
and rplus
into a
single function. It would be much better if we could make take
and rplus
independent of each other, and compose them back in later on.
If we only had to work on lists, then these two expressions would be equal:
foldl (+) 0 (take n myList)
reduce (takeNPlus 10) 0 myList
Since (+)
and take
are independent in the previous expression, we could swap
out (+)
with some other function. We cannot do that with takeNPlus
, and that
is, of course, why we would like to use transducers.
Making Basic Transducers
Transducers come in all kinds of shapes and types. Some can call the reducing function more than once, others can remove input before passing it downstream, and some may short-circuit early on.
One of the important things about a transducer is that it must not assume anything about the output type, nor the state type of the reducer it receives.
To begin lightly and see how this applies, we can start with filtering
. It
doesn’t use state, so it’s relatively straightforward to look at:
filtering :: (a -> Bool) -> Reducer s a b -> Reducer s a b
filtering pred (Reducer is c stepFn)
= Reducer { initState = is,
complete = c,
step = filterStep
}
where filterStep s acc x
| pred x = stepFn s acc x
| otherwise = (s, Continue acc)
The b
’s in the Reducers are untouched and is only used/produced via the
functions within the Reducer passed in (stepFn
and c
).
The filterStep
function calls the step function from the reducing
function given, but only if the predicate function returns true. Otherwise it
just returns the current state and the original value. Using it in real life
would look like this:
*Main> reduce (stateless (+)) 0 [1..10]
55
*Main> reduce (filtering even (stateless (+))) 0 [1..10]
30
*Main> reduce (filtering odd (stateless (+))) 0 [1..10]
25
It’s sort of in the “opposite” order of what one would expect, so to clarify: The way to read these chains of transducer calls is left to right: First we filter, then we sum.
Transducers and State
Since filtering
doesn’t use any state, it just takes the state from the
Reducer passed in. However, if we had some state we wanted to use, we could add
it by using a tuple. A perfectly valid implementation of filtering would be
this:
filtering :: (a -> Bool) -> Reducer s a b -> Reducer ((), s) a b
filtering pred (Reducer is c stepFn)
= Reducer { initState = ((), is),
complete = c . snd,
step = filterStep
}
where filterStep (myState, s) acc x
| pred x = let (s', res) = stepFn s acc x in
((myState, s'), res)
| otherwise = ((myState,s), Continue acc)
The trick is to unwrap the tuple into your own state (myState
) and the
downstream reducer’s state (s
). Whenever you call the downstream reducer’s
step function, you pass in s
and the value you want. Then you also have to
wrap it back up again when you return the result.
A small note here: Since we still don’t have any complete function, we just use
c . snd
. This is just a shortcut notation for
\(_, s) val -> c s val
Which unwraps our own state and calls the downstream complete with the state and value as it expects.
Since we’ve now looked at how we can apply state, let’s see it in action with
taking
, the function we wanted up earlier where we wanted to split takeNPlus
up into composable parts:
taking :: Int -> Reducer s a b -> Reducer (Int, s) a b
taking n (Reducer is c f)
= Reducer { initState = (n, is),
complete = c . snd,
step = takeStep
}
where takeStep (curN, s) res x
| 0 < curN = let (s', v) = f s res x in
((curN - 1, s'), v)
| otherwise = ((curN, s), Reduced res)
Here, we call the downstream reducer until we’ve done so n
times, then we call
reduced. We keep the state in an Int, and do the unwrapping and rewrapping of
the state as mentioned earlier. Using it is just as easy as using filtering:
*> reduce (taking 10 (stateless (+))) 0 [1..]
55
-- read: take 10, then filter even, then sum together
*> reduce (taking 10 (filtering even (stateless (+)))) 0 [1..]
30
-- read: filter even, then take 10, then sum together
*> reduce (filtering even (taking 10 (stateless (+)))) 0 [1..]
110
Complete it!
So far, we haven’t seen any use of the complete
part of a reducer. It’s a
rarely used thing but is handy every now and then. If you, for example, want to
partition the results based on some predicate function, sort of like this:
-- You have a pipeline of numbers:
85 38 64 92 36 0 44 87 58 22 58 72 17 50 25 69 84 48 35 55
-- 'partitionBy even' will convert the pipeline into this:
[85] [38, 64, 92, 36, 0, 44] [87] [58, 22, 58, 72], [17], ...
the partitionBy
function may hold a list of elements that it has accumulated
so far. If we’re at the end of the list above, we have [35, 55]
stored in our
state. But if we just stopped normally, then those values wouldn’t be passed
down to the reducer we used! Hence the need of complete
: Even if we’re at the
end of the stream/pipeline of values, we may have values we haven’t yet sent
down to the reducing function downstream.
partitionBy
is a nasty beast though. It has to do a lot of stuff, so let’s
first look at the entire thing, then piece by piece. (You can skip this section
if you’d like to, the point is to show how one would utilise complete)
partitionBy :: Eq x => (a -> x) -> Reducer s [a] b
-> Reducer (Maybe ([a], x), s) a b
partitionBy pfn (Reducer is c stepFn)
= Reducer { initState = (Nothing, is),
complete = partitionComplete,
step = partitionStep
}
where partitionComplete (Nothing, s) x = c s x
partitionComplete (Just (xs, _), s) acc
= let (state', res) = stepFn s acc xs
in c state' (extract res)
partitionStep (Nothing, s) acc x
= ((Just ([x], pfn x), s), Continue acc)
partitionStep (Just (as, cmp), s) acc x
| cmp == pfn x = ((Just (as ++ [x], cmp), s),
Continue acc)
| otherwise = let (state', acc') = stepFn s acc as
in ((Just ([x], pfn x), state'), acc')
-- Extract is a utility function for getting the result out of a
-- Reduced value.
extract :: Reduced a -> a
extract (Continue x) = x
extract (Reduced x) = x
partitionBy
takes a function pfn
that converts a
to x
. If the x
value
is equal for multiple values in a row, we store them in the same list. When the
x
value change, we send the list we’ve gathered so far downstream.
Our state is therefore Maybe ([a], x)
, where [a]
is the list of values we’ve
gathered so far, and x
is the output of pfn
for all the values in the list.
The reason why this is put in a Maybe
is because we don’t have any x
value
to compare with at the beginning of a reduction (it is nothing in that case).
If we’re at the beginning, we know that our state is Nothing
. In that case, we
just put the value we get in into a new list, and we also store the x
result.
We don’t call or do anything with the reducer function downstream:
partitionStep (Nothing, s) acc x
= ((Just ([x], pfn x), s), Continue acc)
If it’s not Nothing
, we know we have some values. If cmp
– the x
value
we’ve stored this far – is equal to the value of pfn x
, we put the value we
got into the list by doing as ++ [x]
(Yes, it’s inefficient, but it does the
job)
partitionStep (Just (as, cmp), s) acc x
| cmp == pfn x = ((Just (as ++ [x], cmp), s),
Continue acc)
Otherwise, we have to send the list we’ve stored so far to the reducer downstream and create a new list.
| otherwise = let (state', acc') = stepFn s acc as
in ((Just ([x], pfn x), state'), acc')
And so, for the final part: If we’re at the end of the stream of values and have no list, we just call our downstream reducer’s complete function:
where partitionComplete (Nothing, s) x = c s x
But when we have a list, we need to send it down via the step function. Then we take the result of the step function (the state and the value), and pass that to the downstream reducer’s complete function:
partitionComplete (Just (xs, _), s) acc
= let (state', res) = stepFn s acc xs
in c state' (extract res)
All in all pretty confusing and hard to grok, but fortunately, the need for this is pretty rare. You should be fine just knowing how to manipulate state and calling the reducers you get as input.
Reduce, Sequence and Conduits
One of the cool things about transducers is that, since they should never
specify constraints on the output type of the reducer, they can be used for many
other things than just “normal” reductions. sequence
is such a function, which
effectively takes a list and transforms it into another list via a transducer.
It’s sort of like a beefed-up version of map
. The type signature is a bit
weird though:
--import Prelude hiding (sequence)
sequence :: (Reducer () b [b] -> Reducer t a [b]) -> [a] -> [b]
Here we leak a bit of internal information about how sequence
is implemented,
and we can fix it with a Haskell extension. We’ll not do it here though, just
remember that a transducer should not care about the output type at all, and the
input statue is not something we care about either. It would be better if the
type signature looked like this instead because that’s what we really mean:
sequence :: (Reducer s b z -> Reducer t a z) -> [a] -> [b]
In contrast to reduce
, sequence
is lazy and you can consume the values that
have been produced so far right away (and discard the ones you don’t need):
*> take 10 (sequence (mapping (* 5)) [1..])
[5,10,15,20,25,30,35,40,45,50]
In fact, you cannot implement sequence
via a reducer passed to reduce
! We
could try by implementing append
, but it wouldn’t work:
*> let append = stateless (\x y -> x ++ [y]) in
reduce (mapping (* 5) append) [] [1..10]
[5,10,15,20,25,30,35,40,45,50]
*> let append = stateless (\x y -> x ++ [y]) in
take 10 (reduce (mapping (* 5) append) [] [1..])
-- will never return
Although lazy vs. eager is probably not the most inspiring example for Haskellers, it means that the transducers themselves are independent of evaluation strategies. For example, you can use transducers as Conduits from the Conduits library, by transforming them:
import Data.Conduit
-- Yields the bs it receives
conduitYielder :: Monad m => Reducer () b (Conduit a m b)
conduitYielder = stateless run
where run m x = m >> yield x
-- awaits the as upstream, then feed them into the
-- reducing function
conduitAwaiter :: Monad m => (Reducer s a (Conduit a m b))
-> Conduit a m b
conduitAwaiter (Reducer is c f) = go is
where go s = do mval <- await
case mval of
(Just val) -> feed s val
Nothing -> feedLast s
feed s val = case f s (return ()) val of
-- run comp immediately before continuing
(s', Reduced comp) -> comp >> feedLast s'
(s', Continue comp) -> comp >> go s'
feedLast s = c s (return ())
-- merging them
toConduit :: Monad m => (Reducer () b (Conduit a m b)
-> Reducer s a (Conduit a m b))
-> Conduit a m b
toConduit xform = conduitAwaiter (xform conduitYielder)
It’s not important to understand the code here, the important thing is that you
can make a transducer into a conduit with the function toConduit
.
myXform = taking 10 . mapping f . partitionBy (`mod` 3)
where f x = 3 * x^3 - 2 * x^2 + 2 * x
Now we can use myXform
on whatever thing we’d like to, regardless of whether
it’s lists or conduits:
import Data.Conduit
import qualified Data.Conduit.List as CL
main :: IO ()
main = do
mapM_ print (sequence myXform [1..])
CL.sourceList [1..] $$ toConduit myXform =$ CL.mapM_ print
-- both expressions print the same lines
Monads and Conduits
Alright, so I said that I wouldn’t use monads and the like, but the previous section brings up an interesting question which necessarily introduces monads: Can we convert a conduit into a transducer? To answer that question, we need to have a brief look at the Conduit type:
-- +--- We can read/await values of type a
-- |
-- | +--- And use the monad m (if we want to)
-- | |
-- | | +--- To produce/yield values of type b
-- | | |
-- v v v
Conduit a m b
So the question becomes, what is really the equivalent transducer, if it exists?
Awaiting values of type a inside a Conduit doesn’t require the monad, so the “input type”, whatever that is for a transducer, will still be the same. Yielding, on the other hand, may use the monad to retrieve information necessary to produce bs. And since a transducer has this inverse ordering, we theoretically end up with
Reducer s (m b) z -> Reducer t a z
The full signature of a potential fromConduit
seems then to be
fromConduit :: Monad m => Conduit a m b -> Reducer s (m b) z
-> Reducer t a z
I am pretty sure you can implement this fromConduit
function. But if that is
the case, aren’t Conduits and Transducers then equivalent (isomorphic)? I am
pretty sure they are. You probably have to tweak or make another variant of
toConduit
that also expects a monad, so that you get the laws
fromConduit . toConduit = id
toConduit . fromConduit = id
but again, it seems doable.
This makes sense informally too. If we get rid of the monad used over a Conduit
for the time being, we can see the similarity: await
is used to consume
values, which will return Just a
or Nothing
. The Just a
case is equivalent
to a call to the step function of a reducing function, and Nothing
is
equivalent to a call to complete. yield
is used to produce values, which is
the same as calling the step function on the reducing function you have been
passed in.
What about the state? Transducers, as I have implemented them will, have to
explicitly manage state. Conduits, being monads, can contain state implicitly
via recursion. As an example, here we have take
for conduits:
ctake :: Monad m => Int -> Conduit a m a
ctake = loop
where loop n
| n <= 0 = return ()
| otherwise = do mval <- await
case mval of
(Just val) -> do yield val
loop (n - 1)
Nothing -> return ()
My initial reaction is that Conduits feels cleaner and a bit more straightforward to use compared to transducers, especially if you have to make them yourself. And if they are equivalent, then it seems unnecessary to use transducers in Haskell.
Transducers in Clojure are easier to make and write than the Haskell ones, once
you’ve gotten the hang of it. Most of that is because you don’t have to think
about the underlying reducer’s state. You just call init
, step
and
complete
where it makes sense to do so. In addition, the transducers in
Clojure focus heavily on performance and mutability, and the transducer
abstraction, which effectively just calls functions, seems hard to beat.
That’s not to say that a Conduit-like library in Clojure isn’t valuable though, it may well be worth the effort to explore and implement one. The goal of such a library wouldn’t be performance but rather readability. Which, in many cases, seems like a good tradeoff.
My haskell-transducers
repository contains a working example of all the code explained here, but
cleaned up (taking
-> take
and so on) and is at least a bit more idiomatic.
-
Presumably at the cost of performance, but I would assume it’s possible to bypass this for performance critical code. ↩