Well this got big. This might be way too much for a design document. I can write a followup that is a concise explanation of the current design. But I came to the current design passing through many failed attempts, and reasonable looking approaches can fail in subtle ways. So I thought it would be helpful (to future me if nobody else) to work up to the current design by a sequence of simpler versions and seeing where they fall short.
The main design goals are:
- Support finalization of streams, so we can add code to a stream that is guaranteed to be run exactly once, and no attempt will be made to pull elements from the stream after the finalization code is run.
- Support grouping operations like
groupBy
. The previous stream abstraction makes this difficult. I acheive this by making the element type of streams completely generic, so thatStream[Stream[A]]
poses no problems. - As much as possible, try to generate bytecode that’s as good as what I would write by hand, or at least see a path forward to get there in the future without substantially changing the streaming model.
- Make both the compile-time and run-time control flow as transparent as possible.
Warmup: Missingness
To warm up, I’ll consider the special case of a stream with 0 or 1 element. Such a stream represents a possibly missing value, and could be used as an alternative to EmitTriplet.
In the current Emit
, you set up an environment consisting of values for all free variables, a region to produce results into, and maybe some extra context to support aggregations. Then you recur into a child ir, which gives you back an EmitTriplet
consisting of three pieces of code. You repeat for your other children, and assemble all the pieces into your own EmitTriplet
.
case class EmitTriplet(setup: Code[Unit], m: Code[Boolean], v: Code[_])
If EmitTriplet
were an unstaged type, it would look something like
class EmitTriplet[A] {
def apply(): (Boolean, A)
}
where calling apply()
does any setup work, before giving you a pair (isMissing: Boolean, value: A)
, with the understanding that you shouldn’t read value
unless isMissing
is false.
Of course, a possibly missing value can also be represented by an Option[A]
.
abstract class Option[A]
case class None[A] extends Option[A]
case class Some[A](a: A) extends Option[A]
How might we encode a staged Option
? The trick is to focus not on what it means to construct a value opt: Option[A]
, but rather to encode what it means to consume such a value. The primitive way to consume opt
—which can be used to define all possible consumers—is by pattern matching:
opt match {
case None => noneBody()
case Some(a) => someBody(a)
}
This is analogous to how the primitive way to consume a Boolean
is with an if
statement with two branches.
We can encode a staged Option
as something that requires some extra context from its environment. In addition to the values of all free variables, a region to produce results into, etc., the option also requires the two branch bodies—or said differently, two continuations—before it can generate code. We can represent the dependence on the two continuations as follows, along with conversions to and from EmitTriplet
:
type Bot = Code[Ctrl]
abstract class COption[A] { self =>
def apply(none: Bot, some: A => Bot): Bot
def matchOpt[B: TypeInfo](none: Code[B], some: A => Code[B]): Code[B] =
JoinPoint.CallCC[Code[B]] { (jb, ret) =>
matchBot(ret(none), a => ret(some(a)))
}
def flatMap[B](f: A => COption[B]): COption[B] = new COption[B] {
def apply(none: Bot, some: B => Bot): Bot = {
val noneJP = joinPoint()
noneJP.define(_ => none)
self.apply(
none = noneJP(()),
some = a => f(a).apply(
none = noneJP(()),
some = some))
}
}
}
object COption {
def fromEmitTriplet[A](et: EmitTriplet): COption[Code[A]] = new COption[Code[A]] {
def matchBot(none: Bot, some: Code[A] => Bot): Bot = {
Code(et.setup, et.m.mux(none, some(coerce[A](et.v)))
}
}
def toEmitTriplet[A: TypeInfo](opt: COption[Code[A]], mb: MethodBuilder): EmitTriplet = {
val m = mb.newLocal[Boolean]
val v = mb.newLocal[A]
val setup = opt.match[Unit](
none = Code(m := true, v := Code.defaultValue[A]),
some = a => Code(m := false, v := a))
EmitTriplet(setup, m, v.load())
}
}
(Because I’ll be using Code[Ctrl]
so much, I made an alias type Bot = Code[Ctrl]
. Code[Ctrl]
is used in the JoinPoint
package to represent code that doesn’t return a value, but instead jumps away to some label/continuation in the environment. Bot
is short for bottom, as you can think of this as the bottom type: Bot
has no values, so an expression or function of type Bot
can never return. I’m also going to gloss over the bookkeeping of threading MethodBuilder
s and JoinPointBuilder
s through everywhere.)
COption
is like a lazy version of EmitTriplet
. Note that the round-trip from COption
to EmitTriplet
and back has the effect of forcing the evaluation of the lazy COption
, storing the result in local variables.
The main benefit of COption
is the bytecode that is generated when working with multiple optional values. flatMap
abstracts the common pattern of working with multiple optional values, where if any of them are missing we want to jump to the same spot. For example, if emit
and PArray.loadElement
both returned COption
, ArrayRef
would be written (omitting bounds checking)
emit(arrayIR).flatMap { array =>
emit(idxIR).flatMap { idx =>
arrayType.loadElement(array, idx)
}
}
which generates optimal bytecode. Compared to the EmitTriplet
based implementation, which computes three local boolean variables for array/idx/element missingness, this needs no local booleans, and one fewer branch.
In COption
, and later in streams, to avoid code duplication we stick to the rule that continuations (none
and some
) must never be used more than once. If the implementation of a COption
wants to use a continuation multiple times, such as if there are multiple ways the value might end up missing (as is common), it needs to define a join point, as in val noneJP = joinPoint(); eosJP.define(_ => none)
. This is completely analogous to how we avoid using Code
arguments multiple times, assigning them to locals instead.
Notice that COption[A]
puts no restrictions on the type A
(only requiring a TypeInfo[A]
to convert to an EmitTriplet
). This allows composing staged abstractions. For example, this allows keeping missingness separate from the core stream abstraction, representing a stream which might be entirely missing simply as COption[Stream[A]]
, even though Stream
isn’t a first class type (Code[Stream[A]]
doesn’t make sense).
Streams: Take 1
We can view the Option
abstraction as a communication protocol between the producer and the consumer of an Option
value: the Option
can handle a single message “evaluate()”, and responds by sending either a “missing()” or a “present(value)” message back to the consumer.
Thinking in terms of communication protocols like that has turned out to be the most effective way for me to understand streams. What is the protocol between a stream producer and consumer? Like with Option
, the consumer starts the exchange by sending a “pull()” message. Also as before the producer can respond by sending one of two messages: “EOS()” (end of stream) or “push(value)”.
So far this looks exactly the same as Option
. The main difference is that the consumer is allowed to send multiple “pull()” messages. We will also specify that the consumer is not allowed to send “pull()” messages after it has received an “EOS()” message (we could also require the producer to continue responding with “EOS()” to all further “pull()” requests, but this adds complexity to the producers).
Let’s make this precise. We can write a staged stream following this protocol exactly like we did for Option
:
abstract class Stream[A] {
def apply(eos: Bot, push: A => Bot): Bot
}
The idea is that as the consumer of a stream, you must provide to the stream your eos
and push
continuations, and the producer will then provide you with the implementation of pull
.
Note: As with COption
, there is no restriction on the type A
. This was one of the design goals of the new stream implementation, to allow grouping methods to return Stream[Stream[A]]
. It also lets us handle missingness easily with Stream[COption[A]]
.
Using this, we can implement a consumer like fold
easily enough:
def fold[A, S](
stream: Stream[A],
s0: S,
f: (A, S) => S,
ret: S => Bot
): Bot = {
val s = newLocal[S]
val pullJP = joinPoint()
pullJP.define(_ =>
stream(
eos = ret(s),
push = a => Code(s := f(a, s), pullJP(()))))
Code(s := s0, pullJP(()))
}
But we run into trouble trying to implement a producer like unfold
. (If you’re unfamiliar with ParameterPack
, it means S
is either a Code
or a tuple of Code
s, and lets you make locals or fields to store them.)
def unfold[A, S: ParameterPack](
s0: S,
f: S => COption[(A, S)]
): Stream[A] = new Stream[A] {
def apply(eos: Bot, push: A => Bot): Bot = {
val state = newLocal[S]
f(state)(
none = eos,
some = { case (a, s1) =>
Code(state := s1, push(a))
}
)
}
}
The problem is, Stream.apply
returns the implementation of pull, but has nowhere to put the initialization s := s0
. This is easily fixed by allowing Stream.apply
to return both “pull” and “setup” blocks of code.
There’s another more subtle error in the above unfold
. Suppose we try implementing range
using unfold
:
def range(start: Code[Int], stop: Code[Int]): Stream[Code[Int]] =
unfold[Code[Int], Code[Int]](
s0 = start,
f = s => new COption[(Code[Int], Code[Int])] {
def apply(none: Bot, some: (Code[Int], Code[Int]) => Bot): Bot =
(s < stop).mux(
some(s, s + 1),
none)
})
Conceptually, we want to maintain the unfold state as the next number to push. But note that the s
that unfold
passes to us in f
is a local variable reference. When we use the some
branch, pushing a = s
and s1 = s + 1
, unfold
first updates the state state := s1
and then pushes a
to the stream consumer. But a
is a really a reference to state
, which just changed.
The issue is basically that passing Code
s around is a lazy evaluation model, and it is difficult to control the order of evaluation to force computing a = state
before computing state = state + 1
. The way to avoid this problem in general is to treat values of type Code
linearly (more precisely, affinely), i.e. never use a Code
somebody passed you more than once. The f
in the range
above uses its argument s
three times, which besides just being a possible source of code duplication (if s
is big, not just a local reference), also leads to the evaluation order problem.
A way to make order of evaluation completely explicit is using continuation passing style. In this case, we change the type of f
in unfold
to (S, COption[(A, S)] => Bot) => Bot
.
Streams: Take 2
Incorporating the new changes we have:
case class Source(setup: Code[Unit], close: Code[Unit], pull: Bot)
abstract class Stream[A] {
def apply(eos: Bot, push: A => Bot): Source
}
def fold[A, S](
stream: Stream[A],
s0: S,
f: (A, S) => S,
ret: S => Bot
): Bot = {
val s = newLocal[S]
val pullJP = joinPoint()
val source = stream(
eos = ret(s),
push = a => Code(s := f(a, s), pullJP(())))
pullJP.define(_ => source.pull)
Code(s := s0, source.setup, pullJP(()))
}
def unfold[A, S: ParameterPack](
s0: S,
f: (S, COption[(A, S)] => Bot) => Bot
): Stream[A] = new Stream[A] {
def apply(eos: Bot, push: A => Bot): Source = {
val state = newLocal[S]
Source(
setup = state := s0,
close = Code._empty,
pull = f(state, _(
none = eos,
some = { case (a, s1) =>
Code(state := s1, push(a))
})))
}
}
def range(start: Code[Int], stop: Code[Int]): Stream[Code[Int]] =
unfold[Code[Int], Code[Int]](
s0 = start,
f = (s, ret) => {
val temp = newLocal[Code[Int]]
Code(
temp := s, // Only use of s! Forces evaluation of s.
ret(new COption[(Code[Int], Code[Int])] {
def apply(none: Bot, some: (Code[Int], Code[Int]) => Bot): Bot =
(temp < stop).mux(
some(temp, temp + 1),
none)
}))
})
While we were adding setup
to streams, I also snuck in close
, as a hook to put memory deallocations and other resource freeing. It’s easy enough to have producers clean up after themselves before sending “EOS”, but without close
there would be no way to clean up a producer when the producer stops pulling, such as in a take
or a zip
(when one stream sends “EOS”, we need to close the other one). The invariant we must enforce is that the setup
and close
of any stream are always perfectly paired.
This raises a new ambiguity in the producer/consumer protocol: when a producer sends “EOS”, is the producer responsible for closing itself beforehand, or is it the consumer’s responsibility to close the producer after recieving “EOS”? Both seem reasonable, but we’ll see later that flatmap makes the later difficult, so I’ll stick to close-before-EOS. (This turns out to complicate zip, and it’s a recurring theme that the interaction of flatmap and zip creates intrinsic complexity that has to end up somewhere.)
So let’s get to flatmap and zip, and see where Take 2 breaks down.
def flatMap[A](outerStream: Stream[Stream[A]]): Stream[A] = new Stream[A] {
def apply(eosOut: Bot, pushOut: A => Bot): Source = {
val outerPullJP = joinPoint
val innerPullJP = joinPoint()
// We only find out the inner stream after the outer stream calls push.
// We need somewhere to store the inner stream to use outside of push.
var innerSource: Source = null
val outerSource = outerStream(
// when the outer stream ends, the flatMap ends
eos = eosOut,
push = innerStream => {
innerSource = innerStream(
// when the inner stream ends, we need to pull a new element from the outer stream
eos = outerPullJP(()),
// when the inner stream pushes, we forward the value to our consumer
push = pushOut)
innerPullJP.define(_ => innerSource.pull)
// when the outer stream pushes, we initialize and pull from the inner stream
Code(innerSource.setup, innerPullJP(()))
})
outerPullJP.define(_ => outerSource.pull)
val firstPull = newLocal[Code[Boolean]]
Source(
setup = Code(firstPull := true, outerSource.setup),
// our consumer will only use close after we have pushed, meaning there is
// an inner stream that needs to be closed.
close = Code(innerSource.close, outerSource.close),
pull = Code(
// we have to direct the first pull to the outer stream, and all other pulls to the inner stream
firstPull.mux(
Code(firstPull := false, outerPullJP(())),
innerPullJP(()))))
}
}
Other than the complication of needing to recieve the inner stream from our producer when the call push, and needing to use it after our producer has given us their Source
, this is hopefully pretty clear. One tricky piece is that when our consumer requests the first value, we need to start by pulling from the outer stream (which has been wired to pull from the inner stream when it pushes), but every other time we need to pull from the inner stream, letting it continue from where it left off.
Actually, this has a subtle bug. Note that innerSource.setup
needs to be run every time the outer stream pushes a new value. Say innerSource.setup
assigns to a local variable innerVar
. When our consumer calls pull
, we have to branch, jumping either to the outer pull or the inner pull. On the first pull, innerVar
won’t have been assigned yet, so the JVM will consider innerVar
to be unassigned at the top of the pull
label. But then on later pulls, we will try to jump to the inner pull, which will try to read from innerVar
, causing a bytecode verification error.
To fix this, the inner stream needs to give us a second piece of setup to be run before the entire flatmap, to ensure all of its state variables are given dummy values. And we should pair a second piece of cleanup with the outer setup (this would let us make optimizations like allocating and deallocating a region once for a nested stream that is run many times).
(There’s another bug that I’ll gloss over, because it’s easy to fix. If the outer stream is statically empty, it may never call push
at compile time, so innerSource
may remain null when we try to use it at the end. It’s simple to check for that and handle that case correctly, but it obscures the code a bit.)
Streams: Take 3
The last version of Stream
adds the second setup/close pair. I’ll only show the new flatMap
; the others contain no surprises, they just need to use setup0
to assign dummy values to all their state variables.
case class Source(setup0: Code[Unit], close0: Code[Unit], setup: Code[Unit], close: Code[Unit], pull: Bot)
abstract class Stream[A] {
def apply(eos: Bot, push: A => Bot): Source
}
def flatMap[A](outerStream: Stream[Stream[A]]): Stream[A] = new Stream[A] {
def apply(eosOut: Bot, pushOut: A => Bot): Source = {
val outerPullJP = joinPoint
val innerPullJP = joinPoint()
var innerSource: Source = null
val outerSource = outerStream(
eos = eosOut,
push = innerStream => {
innerSource = innerStream(
eos = outerPullJP(()),
push = pushOut)
innerPullJP.define(_ => innerSource.pull)
Code(innerSource.setup, innerPullJP(()))
})
outerPullJP.define(_ => outerSource.pull)
val firstPull = newLocal[Code[Boolean]]
Source(
setup0 = Code(firstPull := true, outerSource.setup0, innerSource.setup0),
close0 = Code(innerSource.close0, outerSource.close0),
setup = Code(firstPull := true, outerSource.setup),
close = Code(innerSource.close, outerSource.close),
pull = Code(
firstPull.mux(
Code(firstPull := false, outerPullJP(())),
innerPullJP(()))))
}
}
The last stream combinator I’ll look at is zip
, because the combination of zip
and flatMap
seems to be the real test of a stream fusion system. With this version of Stream
, zip
turns out to be straightfoward, and surprisingly similar to flatMap
.
def zip[A, B](left: Stream[A], right: Stream[B]): Stream[(A, B)] = new Stream[(A, B)] {
def apply(eosOut: Bot, pushOut: ((A, B)) => Bot): Source = {
def eosJP = joinPoint()
def leftEosJP = joinPoint()
def rightEosJP = joinPoint()
var rightSource: Source = null
val leftSource = left(
eos = leftEosJP(()),
push = a => {
// similar to flatMap, we need to wait until left calls push (at compile time)
// before we call into right.
rightSource = right(
eos = rightEosJP(()),
push = b => pushOut((a, b)) )
// when left pushes an A, we need to pull a B
rightSource.pull
})
leftEosJP.define(_ => Code(rightSource.close, eosJP(())))
rightEosJP.define(_ => Code(leftSource.close, eosJP(())))
eosJP.define(_ => eosOut)
Source(
setup0 = Code(leftSource.setup0, rightSource.setup0),
close0 = Code(leftSource.close0, rightSource.close0),
setup = Code(leftSource.setup, rightSource.setup),
close = Code(leftSource.close, rightSource.close),
pull = leftSource.pull)
}
}
The only complication is that, because of our decision that producers must cleanup before sending EOS, when one child stream sends us EOS we must call close
on the other. If it were the consumer’s responsibility to close after getting EOS, zip
would be simpler. But consider the consumer of a flatMap. When they get an EOS, the flatMap is in the outer stream, and there is no inner stream to close. But when they decide to close the flatMap early, the inner stream must be closed. So a single close
wouldn’t be sufficient, or flatMap
would have to implement close
to check a local variable to know if the inner close must be run. Again, the combination of zip and flatMap creates complexity that has to end up somewhere.
Final thoughts
There is one last optimization I’ve made in my branch, that I want to mention but not go into detail. Source
has one more field, firstPull: Option[Bot]
. When it is defined, the consumer must use firstPull
for the first pull, and pull
for the rest. This lets flatMap
define firstPull
to pull from the outer stream, and pull
to pull from the inner stream, eliminating the local flag and branch. The tradeoff is that zip
now needs a local flag to track whether the right stream has been pulled from yet. But that is only necessary when both left
and right
have implemented firstPull
; otherwise make the one with no firstPull
the right stream. This way, the common case of zipping two streams where not both are nested generates code without any unecessary branches.
It turns out to be simple to convert from the existing streams to this one, but the other direction is impossible. So my migration plan is to start by implementing the consumers in the new model, calling into the old stream emitter for unsupported nodes and wrapping it in the converter. Now that I’ve got the details of the stream representation worked out, things are starting to go faster. I expect to start making PRs next week, first migrating to the new model, then adding in region management. And at some point I’ll write a more concise design doc to check in.