New stream design proposal

Well, this got big. This might be way too much for a design document. I can write a follow-up that concisely explains the current design. But I reached the current design through many failed attempts, and reasonable-looking approaches can fail in subtle ways. So I thought it would be helpful (to future me, if nobody else) to work up to the current design through a sequence of simpler versions, seeing where each one falls short.

The main design goals are:

  • Support finalization of streams, so we can add code to a stream that is guaranteed to be run exactly once, and no attempt will be made to pull elements from the stream after the finalization code is run.
  • Support grouping operations like groupBy. The previous stream abstraction makes this difficult. I achieve this by making the element type of streams completely generic, so that Stream[Stream[A]] poses no problems.
  • As much as possible, try to generate bytecode that’s as good as what I would write by hand, or at least see a path forward to get there in the future without substantially changing the streaming model.
  • Make both the compile-time and run-time control flow as transparent as possible.

Warmup: Missingness

To warm up, I’ll consider the special case of a stream with 0 or 1 element. Such a stream represents a possibly missing value, and could be used as an alternative to EmitTriplet.

In the current Emit, you set up an environment consisting of values for all free variables, a region to produce results into, and maybe some extra context to support aggregations. Then you recur into a child IR, which gives you back an EmitTriplet consisting of three pieces of code. You repeat for your other children, and assemble all the pieces into your own EmitTriplet.

case class EmitTriplet(setup: Code[Unit], m: Code[Boolean], v: Code[_])

If EmitTriplet were an unstaged type, it would look something like

abstract class EmitTriplet[A] {
  def apply(): (Boolean, A)
}

where calling apply() does any setup work, before giving you a pair (isMissing: Boolean, value: A), with the understanding that you shouldn’t read value unless isMissing is false.

Of course, a possibly missing value can also be represented by an Option[A].

sealed abstract class Option[A]
case class None[A]() extends Option[A]
case class Some[A](a: A) extends Option[A]

How might we encode a staged Option? The trick is to focus not on what it means to construct a value opt: Option[A], but rather to encode what it means to consume such a value. The primitive way to consume opt (which can be used to define all possible consumers) is by pattern matching:

opt match {
  case None() => noneBody()
  case Some(a) => someBody(a)
}

This is analogous to how the primitive way to consume a Boolean is with an if statement with two branches.

We can encode a staged Option as something that requires some extra context from its environment. In addition to the values of all free variables, a region to produce results into, etc., the option also requires the two branch bodies—or said differently, two continuations—before it can generate code. We can represent the dependence on the two continuations as follows, along with conversions to and from EmitTriplet:

type Bot = Code[Ctrl]

abstract class COption[A] { self =>
  def apply(none: Bot, some: A => Bot): Bot
  
  def matchOpt[B: TypeInfo](none: Code[B], some: A => Code[B]): Code[B] =
    JoinPoint.CallCC[Code[B]] { (jb, ret) =>
      apply(ret(none), a => ret(some(a)))
    }
  
  def flatMap[B](f: A => COption[B]): COption[B] = new COption[B] {
    def apply(none: Bot, some: B => Bot): Bot = {
      val noneJP = joinPoint()
      noneJP.define(_ => none)
      self.apply(
        none = noneJP(()),
        some = a => f(a).apply(
          none = noneJP(()),
          some = some))
    }
  }
}

object COption {
  def fromEmitTriplet[A](et: EmitTriplet): COption[Code[A]] = new COption[Code[A]] {
    def apply(none: Bot, some: Code[A] => Bot): Bot = {
      Code(et.setup, et.m.mux(none, some(coerce[A](et.v))))
    }
  }
  
  def toEmitTriplet[A: TypeInfo](opt: COption[Code[A]], mb: MethodBuilder): EmitTriplet = {
    val m = mb.newLocal[Boolean]
    val v = mb.newLocal[A]
    val setup = opt.matchOpt[Unit](
      none = Code(m := true, v := Code.defaultValue[A]),
      some = a => Code(m := false, v := a))
    EmitTriplet(setup, m, v.load())
  }
}

(Because I’ll be using Code[Ctrl] so much, I made an alias type Bot = Code[Ctrl]. Code[Ctrl] is used in the JoinPoint package to represent code that doesn’t return a value, but instead jumps away to some label/continuation in the environment. Bot is short for bottom, as you can think of this as the bottom type: Bot has no values, so an expression or function of type Bot can never return. I’m also going to gloss over the bookkeeping of threading MethodBuilders and JoinPointBuilders through everywhere.)

COption is like a lazy version of EmitTriplet. Note that the round-trip from COption to EmitTriplet and back has the effect of forcing the evaluation of the lazy COption, storing the result in local variables.

The main benefit of COption is the bytecode that is generated when working with multiple optional values. flatMap abstracts the common pattern of working with multiple optional values, where if any of them are missing we want to jump to the same spot. For example, if emit and PArray.loadElement both returned COption, ArrayRef would be written (omitting bounds checking)

emit(arrayIR).flatMap { array =>
  emit(idxIR).flatMap { idx =>
    arrayType.loadElement(array, idx)
  }
}

which generates optimal bytecode. Compared to the EmitTriplet-based implementation, which computes three local boolean variables for array/idx/element missingness, this needs no local booleans and one fewer branch.
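For intuition, here is an unstaged sketch of the same continuation-passing encoding in plain Scala, with a generic result type R standing in for Bot. The names COpt, present, missing, fromOption, toOption, loadElement and arrayRef are hypothetical; because nothing is staged, this only shows the shape of the protocol and of flatMap, not the bytecode savings.

```scala
// Unstaged sketch: R stands in for Bot, and `none` is by-name so the
// missing branch is only evaluated when it is taken.
trait COpt[A] { self =>
  def apply[R](none: => R, some: A => R): R

  // If either option is missing, both paths land in the same `none`.
  def flatMap[B](f: A => COpt[B]): COpt[B] = new COpt[B] {
    def apply[R](none: => R, some: B => R): R =
      self.apply(none, a => f(a).apply(none, some))
  }
}

object COpt {
  def present[A](a: A): COpt[A] = new COpt[A] {
    def apply[R](none: => R, some: A => R): R = some(a)
  }

  def missing[A]: COpt[A] = new COpt[A] {
    def apply[R](none: => R, some: A => R): R = none
  }

  // Plays the role of fromEmitTriplet: wraps an already-computed result.
  def fromOption[A](o: Option[A]): COpt[A] = o match {
    case Some(a) => present(a)
    case None    => missing[A]
  }

  // Plays the role of toEmitTriplet: forces the COpt and stores the result.
  def toOption[A](c: COpt[A]): Option[A] = c.apply[Option[A]](None, a => Some(a))
}

// Hypothetical ArrayRef analogue: the array, the index and the element may each be missing.
def loadElement(arr: Vector[Option[Int]], idx: Int): COpt[Int] = COpt.fromOption(arr(idx))

def arrayRef(arr: COpt[Vector[Option[Int]]], idx: COpt[Int]): COpt[Int] =
  arr.flatMap(a => idx.flatMap(i => loadElement(a, i)))

val elems = COpt.present(Vector(Some(1), None))
```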

In COption, and later in streams, to avoid code duplication we stick to the rule that continuations (none and some) must never be used more than once. If the implementation of a COption wants to use a continuation multiple times, such as when there are multiple ways the value might end up missing (as is common), it needs to define a join point, as in val noneJP = joinPoint(); noneJP.define(_ => none). This is completely analogous to how we avoid using Code arguments multiple times, assigning them to locals instead.
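To make the duplication concrete, here is a toy sketch in which generated code is just a String. Gen, SOpt, load, flatMapNaive and flatMapJP are hypothetical names for illustration, not the JoinPoint API; the point is only that a naive flatMap emits the none continuation twice, while the join-point version emits it once under a label and has both missing branches jump to it.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical toy code generator: generated code is plain text, and a value
// of type Bot is a block of text that ends by jumping somewhere else.
type Bot = String

final class Gen {
  private var nLabels = 0
  val labels = ListBuffer.empty[String]

  // Emits `body` once under a fresh label and returns a jump to that label.
  def joinPoint(body: Bot): Bot = {
    val label = s"L$nLabels"
    nLabels += 1
    labels += s"$label: $body"
    s"goto $label"
  }
}

// A staged option: needs both continuations before it can emit anything.
trait SOpt { def apply(none: Bot, some: String => Bot): Bot }

// Loads a possibly-missing variable, branching on its missing bit at run time.
def load(v: String): SOpt = new SOpt {
  def apply(none: Bot, some: String => Bot): Bot =
    s"if (missing($v)) { $none } else { ${some(v)} }"
}

// Naive flatMap: the `none` continuation is spliced into the output twice.
def flatMapNaive(x: SOpt, f: String => SOpt): SOpt = new SOpt {
  def apply(none: Bot, some: String => Bot): Bot =
    x(none, a => f(a)(none, some))
}

// flatMap with a join point: `none` is emitted once, and both branches jump to it.
def flatMapJP(g: Gen, x: SOpt, f: String => SOpt): SOpt = new SOpt {
  def apply(none: Bot, some: String => Bot): Bot = {
    val noneJP = g.joinPoint(none)
    x(noneJP, a => f(a)(noneJP, some))
  }
}

def occurrences(text: String, sub: String): Int = text.sliding(sub.length).count(_ == sub)

val handler = "handleMissing()"
val useY = (y: String) => s"use($y)"
val naive = flatMapNaive(load("x"), _ => load("y"))(handler, useY)
val g = new Gen
val withJP = flatMapJP(g, load("x"), _ => load("y"))(handler, useY)
```

In the join-point version, the handler text appears only once, in g.labels, and withJP contains two short jumps instead of two copies of the handler.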

Notice that COption[A] puts no restrictions on the type A (only requiring a TypeInfo[A] to convert to an EmitTriplet). This allows composing staged abstractions. For example, this allows keeping missingness separate from the core stream abstraction, representing a stream which might be entirely missing simply as COption[Stream[A]], even though Stream isn’t a first class type (Code[Stream[A]] doesn’t make sense).

Streams: Take 1

We can view the Option abstraction as a communication protocol between the producer and the consumer of an Option value: the Option can handle a single message “evaluate()”, and responds by sending either a “missing()” or a “present(value)” message back to the consumer.

Thinking in terms of communication protocols like that has turned out to be the most effective way for me to understand streams. What is the protocol between a stream producer and consumer? Like with Option, the consumer starts the exchange by sending a “pull()” message. Also as before the producer can respond by sending one of two messages: “EOS()” (end of stream) or “push(value)”.

So far this looks exactly the same as Option. The main difference is that the consumer is allowed to send multiple “pull()” messages. We will also specify that the consumer is not allowed to send “pull()” messages after it has received an “EOS()” message (we could also require the producer to continue responding with “EOS()” to all further “pull()” requests, but this adds complexity to the producers).

Let’s make this precise. We can write a staged stream following this protocol exactly like we did for Option:

abstract class Stream[A] {
  def apply(eos: Bot, push: A => Bot): Bot
}

The idea is that as the consumer of a stream, you must provide to the stream your eos and push continuations, and the producer will then provide you with the implementation of pull.

Note: As with COption, there is no restriction on the type A. This was one of the design goals of the new stream implementation, to allow grouping methods to return Stream[Stream[A]]. It also lets us handle missingness easily with Stream[COption[A]].

Using this, we can implement a consumer like fold easily enough:

def fold[A, S](
  stream: Stream[A],
  s0: S,
  f: (A, S) => S,
  ret: S => Bot
): Bot = {
  val s = newLocal[S]
  val pullJP = joinPoint()
  pullJP.define(_ =>
    stream(
      eos = ret(s),
      push = a => Code(s := f(a, s), pullJP(()))))
  
  Code(s := s0, pullJP(()))
}

But we run into trouble trying to implement a producer like unfold. (If you’re unfamiliar with ParameterPack, it means S is either a Code or a tuple of Codes, and lets you make locals or fields to store them.)

def unfold[A, S: ParameterPack](
  s0: S,
  f: S => COption[(A, S)]
): Stream[A] = new Stream[A] {
  def apply(eos: Bot, push: A => Bot): Bot = {
    val state = newLocal[S]
    f(state)(
      none = eos,
      some = { case (a, s1) =>
        Code(state := s1, push(a))
      }
    )
  }
}

The problem is, Stream.apply returns the implementation of pull, but has nowhere to put the initialization s := s0. This is easily fixed by allowing Stream.apply to return both “pull” and “setup” blocks of code.

There’s another more subtle error in the above unfold. Suppose we try implementing range using unfold:

def range(start: Code[Int], stop: Code[Int]): Stream[Code[Int]] =
  unfold[Code[Int], Code[Int]](
    s0 = start,
    f = s => new COption[(Code[Int], Code[Int])] {
      def apply(none: Bot, some: ((Code[Int], Code[Int])) => Bot): Bot =
        (s < stop).mux(
          some((s, s + 1)),
          none)
    })

Conceptually, we want to maintain the unfold state as the next number to push. But note that the s that unfold passes to f is a reference to the local variable state. When we use the some branch, pushing a = s and s1 = s + 1, unfold first updates the state (state := s1) and then pushes a to the stream consumer. But a is really a reference to state, which has just changed, so the consumer receives s + 1 instead of s.

The issue is basically that passing Codes around is a lazy evaluation model, and it is difficult to control the order of evaluation to force computing a = state before computing state = state + 1. The way to avoid this problem in general is to treat values of type Code linearly (more precisely, affinely), i.e. never use a Code somebody passed you more than once. The f in the range above uses its argument s three times, which besides just being a possible source of code duplication (if s is big, not just a local reference), also leads to the evaluation order problem.

A way to make order of evaluation completely explicit is using continuation passing style. In this case, we change the type of f in unfold to (S, COption[(A, S)] => Bot) => Bot.
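The evaluation-order pitfall can be reproduced without any bytecode by modeling a Code[Int] as a thunk that is re-run every time it is used. This is a hypothetical, unstaged model (the Code alias here is a plain function type, not the real Code class): rangeBuggy mirrors the Take 1 unfold and range, and rangeFixed mirrors the fix of binding s to a temporary exactly once, which the continuation-passing f makes possible.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical unstaged model: a "Code[Int]" is a thunk that re-reads its
// inputs every time it is used, just as a staged expression is re-emitted.
type Code[A] = () => A

// Mirrors the Take 1 unfold + range: `a` and `s1` are both built from a
// reference to the state, and the state is updated before `a` is read.
def rangeBuggy(start: Int, stop: Int): List[Int] = {
  var state = start
  val out = ListBuffer.empty[Int]
  val s: Code[Int] = () => state      // what unfold passes to f
  while (s() < stop) {
    val a: Code[Int] = s              // a = s
    val s1: Code[Int] = () => s() + 1 // s1 = s + 1
    state = s1()                      // state := s1
    out += a()                        // push(a) now sees the updated state
  }
  out.toList
}

// The fix: use `s` exactly once, forcing it into a temporary,
// and build both the pushed value and the next state from the temporary.
def rangeFixed(start: Int, stop: Int): List[Int] = {
  var state = start
  val out = ListBuffer.empty[Int]
  val s: Code[Int] = () => state
  var running = true
  while (running) {
    val temp = s()                    // temp := s, the only use of s
    if (temp < stop) {
      state = temp + 1                // state := s1
      out += temp                     // push(a)
    } else running = false
  }
  out.toList
}
```

With start = 0 and stop = 3, the buggy version pushes 1, 2, 3 and the fixed version pushes 0, 1, 2.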

Streams: Take 2

Incorporating the new changes we have:

case class Source(setup: Code[Unit], close: Code[Unit], pull: Bot)

abstract class Stream[A] {
  def apply(eos: Bot, push: A => Bot): Source
}

def fold[A, S](
  stream: Stream[A],
  s0: S,
  f: (A, S) => S,
  ret: S => Bot
): Bot = {
  val s = newLocal[S]
  val pullJP = joinPoint()
  val source =  stream(
    eos = ret(s),
    push = a => Code(s := f(a, s), pullJP(())))
    
  pullJP.define(_ => source.pull)
  
  Code(s := s0, source.setup, pullJP(()))
}

def unfold[A, S: ParameterPack](
  s0: S,
  f: (S, COption[(A, S)] => Bot) => Bot
): Stream[A] = new Stream[A] {
  def apply(eos: Bot, push: A => Bot): Source = {
    val state = newLocal[S]
    Source(
      setup = state := s0,
      close = Code._empty,
      pull = f(state, _(
        none = eos,
        some = { case (a, s1) =>
          Code(state := s1, push(a))
        })))
  }
}

def range(start: Code[Int], stop: Code[Int]): Stream[Code[Int]] =
  unfold[Code[Int], Code[Int]](
    s0 = start,
    f = (s, ret) => {
      val temp = newLocal[Code[Int]] 
      Code(
        temp := s, // Only use of s! Forces evaluation of s.
        ret(new COption[(Code[Int], Code[Int])] {
          def apply(none: Bot, some: ((Code[Int], Code[Int])) => Bot): Bot =
            (temp < stop).mux(
              some((temp, temp + 1)),
              none)
        }))
    })
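As a sanity check on the protocol, here is an unstaged sketch of Take 2 in which every jump becomes an ordinary call: a producer’s pull must call exactly one of eos or push and then return, and fold’s while loop plays the role of pullJP. Source, Stream, unfold, range and fold mirror the names above, but this is a model under those assumptions, not the staged implementation.

```scala
// Unstaged sketch of Take 2: each jump becomes an ordinary call. A producer's
// pull must call exactly one of eos or push and then return to its caller.
final case class Source(setup: () => Unit, close: () => Unit, pull: () => Unit)

trait Stream[A] {
  def apply(eos: () => Unit, push: A => Unit): Source
}

// f answers Some((a, nextState)) or None. Unstaged, `state` is read by value,
// so the evaluation-order issue cannot arise here.
def unfold[A, S](s0: S, f: S => Option[(A, S)]): Stream[A] = new Stream[A] {
  def apply(eos: () => Unit, push: A => Unit): Source = {
    var state = s0
    Source(
      setup = () => { state = s0 },
      close = () => (),
      pull = () => f(state) match {
        case None => eos()
        case Some((a, s1)) =>
          state = s1
          push(a)
      })
  }
}

def range(start: Int, stop: Int): Stream[Int] =
  unfold[Int, Int](start, s => if (s < stop) Some((s, s + 1)) else None)

// The while loop plays the role of pullJP: each push or eos returns to it.
def fold[A, S](stream: Stream[A], s0: S)(f: (A, S) => S): S = {
  var s = s0
  var done = false
  val source = stream(
    eos = () => { done = true },
    push = a => { s = f(a, s) })
  source.setup()
  while (!done) source.pull()
  s
}
```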

While we were adding setup to streams, I also snuck in close, as a hook for memory deallocation and other resource cleanup. It’s easy enough to have producers clean up after themselves before sending “EOS”, but without close there would be no way to clean up a producer when its consumer stops pulling early, such as in a take or a zip (when one stream sends “EOS”, we need to close the other one). The invariant we must enforce is that the setup and close of any stream are always perfectly paired.

This raises a new ambiguity in the producer/consumer protocol: when a producer sends “EOS”, is the producer responsible for closing itself beforehand, or is it the consumer’s responsibility to close the producer after receiving “EOS”? Both seem reasonable, but we’ll see later that flatMap makes the latter difficult, so I’ll stick to close-before-EOS. (This turns out to complicate zip, and it’s a recurring theme that the interaction of flatMap and zip creates intrinsic complexity that has to end up somewhere.)
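Here is a sketch of why close is needed and how close-before-EOS plays out, in the same unstaged model where jumps are modeled as calls. take is a hypothetical combinator that stops pulling early, and Counters is hypothetical instrumentation for checking that every setup is paired with exactly one close, whether take stops first or its producer runs out first.

```scala
import scala.collection.mutable.ListBuffer

// Unstaged sketch of the Take 2 protocol (jumps modeled as calls).
final case class Source(setup: () => Unit, close: () => Unit, pull: () => Unit)
trait Stream[A] { def apply(eos: () => Unit, push: A => Unit): Source }

// Hypothetical instrumentation for checking that setup and close are paired.
final class Counters { var setups = 0; var closes = 0 }

// A range that follows close-before-EOS: it closes itself, then sends EOS.
def countedRange(start: Int, stop: Int, c: Counters): Stream[Int] = new Stream[Int] {
  def apply(eos: () => Unit, push: Int => Unit): Source = {
    var i = start
    val close = () => { c.closes += 1 }
    Source(
      setup = () => { i = start; c.setups += 1 },
      close = close,
      pull = () =>
        if (i < stop) { val a = i; i += 1; push(a) }
        else { close(); eos() })
  }
}

// take(n) may stop pulling before its producer ends, so it must close the
// producer itself before sending EOS. If the producer ends first, it has
// already closed itself, so take just forwards the EOS.
def take[A](n: Int, upstream: Stream[A]): Stream[A] = new Stream[A] {
  def apply(eos: () => Unit, push: A => Unit): Source = {
    var remaining = n
    val src = upstream(eos = eos, push = a => { remaining -= 1; push(a) })
    Source(
      setup = () => { remaining = n; src.setup() },
      close = src.close,
      pull = () =>
        if (remaining > 0) src.pull()
        else { src.close(); eos() })
  }
}

// Consumer that runs a stream to completion (it never closes early).
def toList[A](stream: Stream[A]): List[A] = {
  val out = ListBuffer.empty[A]
  var done = false
  val src = stream(eos = () => { done = true }, push = a => { out += a })
  src.setup()
  while (!done) src.pull()
  out.toList
}

val c1 = new Counters
val firstTwo = toList(take(2, countedRange(0, 10, c1)))
val c2 = new Counters
val all = toList(take(5, countedRange(0, 3, c2)))
```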

So let’s get to flatMap and zip, and see where Take 2 breaks down.

def flatMap[A](outerStream: Stream[Stream[A]]): Stream[A] = new Stream[A] {
  def apply(eosOut: Bot, pushOut: A => Bot): Source = {
    val outerPullJP = joinPoint()
    val innerPullJP = joinPoint()
    
    // We only find out the inner stream after the outer stream calls push.
    // We need somewhere to store the inner stream to use outside of push.
    var innerSource: Source = null
    
    val outerSource = outerStream(
      // when the outer stream ends, the flatMap ends
      eos = eosOut,
      push = innerStream => {
        innerSource = innerStream(
          // when the inner stream ends, we need to pull a new element from the outer stream
          eos = outerPullJP(()),
          // when the inner stream pushes, we forward the value to our consumer
          push = pushOut)
        
        innerPullJP.define(_ => innerSource.pull)
        
        // when the outer stream pushes, we initialize and pull from the inner stream
        Code(innerSource.setup, innerPullJP(()))
      })
    
    outerPullJP.define(_ => outerSource.pull)
    
    val firstPull = newLocal[Code[Boolean]]
    
    Source(
      setup = Code(firstPull := true, outerSource.setup),
      // our consumer will only use close after we have pushed, meaning there is
      // an inner stream that needs to be closed.
      close = Code(innerSource.close, outerSource.close),
      pull = Code(
        // we have to direct the first pull to the outer stream, and all other pulls to the inner stream
        firstPull.mux(
          Code(firstPull := false, outerPullJP(())),
          innerPullJP(()))))
  }
}

Other than the complication of needing to receive the inner stream from our producer when it calls push, and needing to use it after our producer has given us its Source, this is hopefully pretty clear. One tricky piece is that when our consumer requests the first value, we need to start by pulling from the outer stream (which has been wired to pull from the inner stream when it pushes), but every other time we need to pull from the inner stream, letting it continue from where it left off.

Actually, this has a subtle bug. Note that innerSource.setup needs to be run every time the outer stream pushes a new value. Say innerSource.setup assigns to a local variable innerVar. When our consumer calls pull, we have to branch, jumping either to the outer pull or the inner pull. On the first pull, innerVar won’t have been assigned yet, so the JVM will consider innerVar to be unassigned at the top of the pull label. But then on later pulls, we will try to jump to the inner pull, which will try to read from innerVar, causing a bytecode verification error.

To fix this, the inner stream needs to give us a second piece of setup, run before the entire flatMap, that gives all of its state variables dummy values. And we should pair it with a second piece of cleanup, run after the entire flatMap (this would let us make optimizations like allocating and deallocating a region once for a nested stream that is run many times).

(There’s another bug that I’ll gloss over, because it’s easy to fix. If the outer stream is statically empty, it may never call push at compile time, so innerSource may remain null when we try to use it at the end. It’s simple to check for that and handle that case correctly, but it obscures the code a bit.)
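In the unstaged model (jumps as calls, which sidesteps the JVM verification issue entirely), the flatMap routing looks like the sketch below: the first pull goes to the outer stream, later pulls go to the current inner stream, and an inner EOS becomes a pull on the outer stream. fromList and toList are hypothetical helpers, and the null guard in close covers a consumer that closes before ever pulling.

```scala
import scala.collection.mutable.ListBuffer

// Unstaged sketch of the Take 2 protocol (jumps modeled as calls).
final case class Source(setup: () => Unit, close: () => Unit, pull: () => Unit)
trait Stream[A] { def apply(eos: () => Unit, push: A => Unit): Source }

def fromList[A](xs: List[A]): Stream[A] = new Stream[A] {
  def apply(eos: () => Unit, push: A => Unit): Source = {
    var rest = xs
    Source(
      setup = () => { rest = xs },
      close = () => (),
      pull = () => rest match {
        case a :: tail =>
          rest = tail
          push(a)
        case Nil => eos()
      })
  }
}

def flatMap[A](outer: Stream[Stream[A]]): Stream[A] = new Stream[A] {
  def apply(eosOut: () => Unit, pushOut: A => Unit): Source = {
    var inner: Source = null // replaced each time the outer stream pushes
    var firstPull = true
    var outerSrc: Source = null
    outerSrc = outer(
      eos = eosOut, // outer ends => flatMap ends
      push = innerStream => {
        // inner ends => pull the next inner stream from the outer stream
        inner = innerStream(eos = () => outerSrc.pull(), push = pushOut)
        inner.setup()
        inner.pull()
      })
    Source(
      setup = () => { firstPull = true; inner = null; outerSrc.setup() },
      close = () => { if (inner != null) inner.close(); outerSrc.close() },
      // First pull goes to the outer stream, every later pull to the inner one.
      pull = () =>
        if (firstPull) { firstPull = false; outerSrc.pull() }
        else inner.pull())
  }
}

def toList[A](stream: Stream[A]): List[A] = {
  val out = ListBuffer.empty[A]
  var done = false
  val src = stream(eos = () => { done = true }, push = a => { out += a })
  src.setup()
  while (!done) src.pull()
  out.toList
}

val nested = fromList(List(fromList(List(1, 2)), fromList(List.empty[Int]), fromList(List(3))))
val flat = toList(flatMap(nested))
```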

Streams: Take 3

This version of Stream adds the second setup/close pair. I’ll only show the new flatMap; the others contain no surprises: they just need to use setup0 to assign dummy values to all their state variables.

case class Source(setup0: Code[Unit], close0: Code[Unit], setup: Code[Unit], close: Code[Unit], pull: Bot)

abstract class Stream[A] {
  def apply(eos: Bot, push: A => Bot): Source
}

def flatMap[A](outerStream: Stream[Stream[A]]): Stream[A] = new Stream[A] {
  def apply(eosOut: Bot, pushOut: A => Bot): Source = {
    val outerPullJP = joinPoint()
    val innerPullJP = joinPoint()
    
    var innerSource: Source = null
    
    val outerSource = outerStream(
      eos = eosOut,
      push = innerStream => {
        innerSource = innerStream(
          eos = outerPullJP(()),
          push = pushOut)
        
        innerPullJP.define(_ => innerSource.pull)
        
        Code(innerSource.setup, innerPullJP(()))
      })
    
    outerPullJP.define(_ => outerSource.pull)
    
    val firstPull = newLocal[Code[Boolean]]
    
    Source(
      setup0 = Code(firstPull := true, outerSource.setup0, innerSource.setup0),
      close0 = Code(innerSource.close0, outerSource.close0),
      setup = Code(firstPull := true, outerSource.setup),
      close = Code(innerSource.close, outerSource.close),
      pull = Code(
        firstPull.mux(
          Code(firstPull := false, outerPullJP(())),
          innerPullJP(()))))
  }
}

The last stream combinator I’ll look at is zip, because the combination of zip and flatMap seems to be the real test of a stream fusion system. With this version of Stream, zip turns out to be straightforward, and surprisingly similar to flatMap.

def zip[A, B](left: Stream[A], right: Stream[B]): Stream[(A, B)] = new Stream[(A, B)] {
  def apply(eosOut: Bot, pushOut: ((A, B)) => Bot): Source = {
    val eosJP = joinPoint()
    val leftEosJP = joinPoint()
    val rightEosJP = joinPoint()
    
    var rightSource: Source = null
    
    val leftSource = left(
      eos = leftEosJP(()),
      push = a => {
        // similar to flatMap, we need to wait until left calls push (at compile time)
        // before we call into right.
        rightSource = right(
          eos = rightEosJP(()),
          push = b => pushOut((a, b)) )
        
        // when left pushes an A, we need to pull a B
        rightSource.pull
      })
    
    leftEosJP.define(_ => Code(rightSource.close, eosJP(())))
    rightEosJP.define(_ => Code(leftSource.close, eosJP(())))
    eosJP.define(_ => eosOut)
    
    Source(
      setup0 = Code(leftSource.setup0, rightSource.setup0),
      close0 = Code(leftSource.close0, rightSource.close0),
      setup = Code(leftSource.setup, rightSource.setup),
      close = Code(leftSource.close, rightSource.close),
      pull = leftSource.pull)
  }
}

The only complication is that, because of our decision that producers must clean up before sending EOS, when one child stream sends us EOS we must call close on the other. If it were the consumer’s responsibility to close after getting EOS, zip would be simpler. But consider the consumer of a flatMap. When it gets an EOS, the flatMap is in the outer stream, and there is no inner stream to close. But when it decides to close the flatMap early, the inner stream must be closed. So either a single close wouldn’t be sufficient, or flatMap would have to implement close by checking a local variable to know whether the inner close must be run. Again, the combination of zip and flatMap creates complexity that has to end up somewhere.
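An unstaged sketch of zip under close-before-EOS, again modeling jumps as calls: whichever child sends EOS has already closed itself, so zip closes the other one before forwarding EOS. countedList and Counters are hypothetical instrumentation for checking that each child’s setup and close stay paired in both the left-shorter and right-shorter cases.

```scala
import scala.collection.mutable.ListBuffer

// Unstaged sketch of the protocol (jumps modeled as calls).
final case class Source(setup: () => Unit, close: () => Unit, pull: () => Unit)
trait Stream[A] { def apply(eos: () => Unit, push: A => Unit): Source }
final class Counters { var setups = 0; var closes = 0 }

// Hypothetical instrumented list stream; closes itself before sending EOS.
def countedList[A](xs: List[A], c: Counters): Stream[A] = new Stream[A] {
  def apply(eos: () => Unit, push: A => Unit): Source = {
    var rest = xs
    val close = () => { c.closes += 1 }
    Source(
      setup = () => { rest = xs; c.setups += 1 },
      close = close,
      pull = () => rest match {
        case a :: tail =>
          rest = tail
          push(a)
        case Nil =>
          close()
          eos()
      })
  }
}

def zip[A, B](left: Stream[A], right: Stream[B]): Stream[(A, B)] = new Stream[(A, B)] {
  def apply(eosOut: () => Unit, pushOut: ((A, B)) => Unit): Source = {
    var currentA: Option[A] = None // the A waiting for its matching B
    var leftSrc: Source = null
    val rightSrc = right(
      // right has already closed itself, so close left, then end
      eos = () => { leftSrc.close(); eosOut() },
      push = b => pushOut((currentA.get, b)))
    leftSrc = left(
      // left has already closed itself, so close right, then end
      eos = () => { rightSrc.close(); eosOut() },
      // when left pushes an A, pull a B from right
      push = a => { currentA = Some(a); rightSrc.pull() })
    Source(
      setup = () => { leftSrc.setup(); rightSrc.setup() },
      close = () => { leftSrc.close(); rightSrc.close() },
      pull = () => leftSrc.pull())
  }
}

def toList[A](stream: Stream[A]): List[A] = {
  val out = ListBuffer.empty[A]
  var done = false
  val src = stream(eos = () => { done = true }, push = a => { out += a })
  src.setup()
  while (!done) src.pull()
  out.toList
}

val cl = new Counters
val cr = new Counters
val zipped = toList(zip(countedList(List(1, 2, 3), cl), countedList(List("a", "b"), cr)))
```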

Final thoughts

There is one last optimization I’ve made in my branch that I want to mention but not go into in detail. Source has one more field, firstPull: Option[Bot]. When it is defined, the consumer must use firstPull for the first pull, and pull for the rest. This lets flatMap define firstPull to pull from the outer stream, and pull to pull from the inner stream, eliminating the local flag and branch. The tradeoff is that zip now needs a local flag to track whether the right stream has been pulled from yet. But that is only necessary when both left and right have implemented firstPull; otherwise, make the one with no firstPull the right stream. This way, the common case of zipping two streams where at most one is nested generates code without any unnecessary branches.

It turns out to be simple to convert from the existing streams to this one, but the other direction is impossible. So my migration plan is to start by implementing the consumers in the new model, calling into the old stream emitter for unsupported nodes and wrapping it in the converter. Now that I’ve got the details of the stream representation worked out, things are starting to go faster. I expect to start making PRs next week, first migrating to the new model, then adding in region management. And at some point I’ll write a more concise design doc to check in.

Patrick, thanks for the write-up.

There is one last optimization I’ve made in my branch

What’s your branch? It isn’t linked here and there isn’t a PR up.

I’m not happy with the design and want to reframe some of the goals. I think some of the complexity comes from trying to address issues here that are better solved by changing other parts of the code.

As much as possible, try to generate bytecode that’s as good as what I would write by hand, or at least see a path forward to get there in the future without substantially changing the streaming model.

This should not be a goal. The quality of bytecode is generally irrelevant to performance on the JVM. It goes through a sophisticated optimizing compiler before being run (or isn’t run enough to matter for performance). When I was developing Code, I made a number of changes to improve the quality of local bytecode generation, and they made no difference. For example:

Compared to the EmitTriplet based implementation, which computes three local boolean variables for array/idx/element missingness, this needs no local booleans, and one fewer branch.

This is wrong. Code has two implementations of Code[Boolean], one of which is implemented as a pair of jump targets and will generate exactly the code you want. See CodeConditional and CodeBoolean. It also had no measurable impact on performance. A compiler can easily clean this stuff up.

The issue is basically that passing Codes around is a lazy evaluation model

There seems to be continual confusion in the group about the nature of code. This is surely my fault, both for failing to communicate the intention and for the bad ergonomics of the library (e.g. Code[T] looks like a value of type T, and I mimicked higher-level constructs like while loops to make it a bit easier to write).

Code[T] represents a block of code that leaves a value of type T on the stack. Code is built by composing blocks together. The execution model is the execution of the final generated code. I think it is a category error to confuse the parameter passing semantics of the compiler language with the execution semantics of the generated code. Treating code as something else leads, as you say, to subtle bugs.

Separately, I am going to propose some changes to Code to improve the ergonomics. This will make code lower level and more explicit, and I think will improve the transparency of the generated code.

I think we should drop ParameterPack and not support nested streams in this way. That means not supporting nested streams in the initial version. I will make a separate proposal called PValues that will address nested streams. I think we should also drop JoinPoint. I think it has poor readability, I’m not compelled by the safety benefits, and since I’m likely to make the proposed Code changes, I don’t want to maintain it. We’re generating bytecode; using operations at the level of the bytecode makes the generated code much more transparent.

Let me summarize the new goals:

  • Emit code for streams of EmitTriplets with support for all operations, but no support for nested streams. This means group by and explode will have to work over arrays and won’t be deforested for the moment.
  • Generate the right structure but don’t be overly worried about the byte code quality. I think firstPull is also in this category. No reason a compiler can’t peel off a loop iteration to specialize a conditional.

Speaking of optimization, don’t let the perfect be the enemy of the good (or OK, or even just working). This has been a long time coming, and making minor improvements to code in the stream generator (which can always be revisited) at the expense of delaying impactful changes elsewhere in the stack (like whole-stage code generation) is not a good trade-off. For optimization to be most effective, it should be done globally. If you’re doing performance optimization without a specific benchmark or an evaluation of where the most impact can be made, the time could probably be better spent elsewhere. I’m tempted to ask for a completely straightforward iterator-based model to start, to get other projects unblocked, but I don’t think we have to go that far yet.

I’ll reach out to you and @tpoterba to talk about the timeline for this. You should start by proposing a revised design.

I will follow up with proposals for Code and PValue that will explain how nested streams will work, but roughly: streams will become streams of PValues, there is a PValue for each (concrete) physical type, and the PStreamValue will carry code to implement the nested streams. PValue will be the place to store multi-component objects obviating ParameterPack.

What’s your branch? It isn’t linked here and there isn’t a PR up.

I have a branch I’ve been using to iterate on the design. I have conversion from the old streams working, plus emitters for ArrayFold and ArrayForEach working, which I was planning on breaking off and PRing today.

The quality of bytecode is generally irrelevant to performance on the JVM.

I’m somewhat nervous about this in the context of the stream code generation, after John’s digging into bad performance in the NDArray code. The code generated for processing a partition will live in a method that is only called once, and that contains all labels defining the high level control flow of the stream pipeline (maybe calling into other methods for the per-value work). As I currently understand the JIT, this top level method won’t qualify for JIT compilation because of its low call count. The JIT may compile pieces of the method using OSR, but that seems to be much less effective.

But I do take your point about premature optimization.

The issue is basically that passing Codes around is a lazy evaluation model

There seems to be continual confusion in the group about the nature of code. This is surely my fault, both for failing to communicate the intention and for the bad ergonomics of the library (e.g. Code[T] looks like a value of type T, and I mimicked higher-level constructs like while loops to make it a bit easier to write).

Code[T] represents a block of code that leaves a value of type T on the stack. Code is built by composing blocks together. The execution model is the execution of the final generated code. I think it is a category error to confuse the parameter passing semantics of the compiler language with the execution semantics of the generated code. Treating code as something else leads, as you say, to subtle bugs.

Sorry, I wasn’t being very clear. My point is that if you read code using Code as a staged program, where a Code[T] is a “value of type T at the later stage”, then Code[T] acts like a lazy type, because using it multiple times repeats the computation that creates the value. Of course I also understand the lower level mechanics (a block of code that leaves a value of type T on the stack). It’s helpful to me to sometimes think at the higher level of abstraction, sometimes the lower. I’m interested in finding the patterns that let us write correct Code while also making the behavior of the generated code easy to reason about. Treating Code[T] arguments as lazy values, which you should bind to a local (“forcing the evaluation”) if you need to use it multiple times, is one simple such pattern.

I think we should drop ParameterPack and not support nested streams in this way. That means not supporting nested streams in the initial version. I will make a separate proposal called PValues that will address nested streams. I think we should also drop JoinPoint. I think it has poor readability, I’m not compelled by the safety benefits, and since I’m likely to make the proposed Code changes, I don’t want to maintain it. We’re generating bytecode; using operations at the level of the bytecode makes the generated code much more transparent.

I am definitely in favor of replacing ParameterPack with something better integrated with PTypes. But I don’t understand the connection with nested streams. I’m curious to see what you’re proposing. And I don’t have strong feelings about dropping joinPoint in favor of something lower level, but streams definitely need to use arbitrary labels and jumps, and I don’t think replacing joinPoint with something else would have a large effect on the stream emitter.

I have nested streams working now, and we can’t lower all TableIR without that. And while it was difficult finding something that works with both zip and flatMap, and finalization of streams, in the end I don’t think supporting nested streams is adding much complexity to the design.

To support concrete discussion, I can write up a design doc of the current design I have, and PR the first pieces of the emitter (conversion from old streams and stream consumers), unless there’s a different next step you want me to take. I’ll try to do those both today, or tomorrow at the latest (I have two interviews today). I anticipate writing the rest of the stream emitter and memory management to go quickly now.

I will follow up with proposals for Code and PValue that will explain how nested streams will work, but roughly: streams will become streams of PValues, there is a PValue for each (concrete) physical type, and the PStreamValue will carry code to implement the nested streams. PValue will be the place to store multi-component objects obviating ParameterPack.

I’m very interested to see what you have in mind.

My point is that if you read code using Code as a staged program, where a Code[T] is a “value of type T at the later stage”, then Code[T] acts like a lazy type, because using it multiple times repeats the computation that creates the value.

I think this is misleading and wrong.

Code[T] is a “value of type T at the later stage”

Code[T] is not a value at a later stage! Code[T] is a code expression of type T at a later stage.

then Code[T] acts like a lazy type, because using it multiple times repeats the computation that creates the value

I’m not sure what a lazy type is. You said evaluation model before, I’ll stick with that. I think you mean call by name, not lazy. Lazy is memoized call by name.

This is also a category error: you’re mixing the evaluation model with the staged substitution model (escape in metaml, or splice in scheme, say), which are completely different things.

I also think you’re wrong about the semantics of substitution in staged languages. Using metaml syntax, with x: int code = <5 + 7>, <~x + ~x> evaluates to <(5 + 7) + (5 + 7)>. What else could it mean? metaml has explicit facilities for managing expressions vs values (bracket vs lift). There’s no free lunch.

(What else could it mean? <F[~x]> could mean <let t = ~x in F[t]>, I suppose. I’m not sure whether metaml is purely functional; if it is, the distinction is at most one of efficiency, and if it is not, you couldn’t control the order of evaluation in staged code.

<F[x]> could mean <F[f()]> where f() is a thunk that implements lazy evaluation of x, and again you can’t control the order of evaluation.)
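The substitution point can be checked with a toy expression tree standing in for metaml code values (Expr, Lit, Add, Var, Let, eval and run are hypothetical names, not metaml itself): splicing x twice copies the tree for 5 + 7, so evaluation performs three additions, while an explicit let performs two. Both give 24; in this pure toy language only the amount of work differs.

```scala
// Hypothetical toy model of staged code values as expression trees.
sealed trait Expr
final case class Lit(n: Int) extends Expr
final case class Add(l: Expr, r: Expr) extends Expr
final case class Var(name: String) extends Expr
final case class Let(name: String, bound: Expr, body: Expr) extends Expr

// Evaluates an expression, counting how many additions are executed.
def eval(e: Expr, env: Map[String, Int], adds: Array[Int]): Int = e match {
  case Lit(n) => n
  case Var(x) => env(x)
  case Add(l, r) =>
    adds(0) += 1
    eval(l, env, adds) + eval(r, env, adds)
  case Let(x, b, body) => eval(body, env + (x -> eval(b, env, adds)), adds)
}

// Returns (value, number of additions performed).
def run(e: Expr): (Int, Int) = {
  val adds = Array(0)
  val value = eval(e, Map.empty, adds)
  (value, adds(0))
}

val x: Expr = Add(Lit(5), Lit(7))                         // x: int code = <5 + 7>
val spliced: Expr = Add(x, x)                             // <~x + ~x> is <(5 + 7) + (5 + 7)>
val letBound: Expr = Let("t", x, Add(Var("t"), Var("t"))) // <let t = ~x in t + t>
```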

It seems backwards to me to use staged programming, an admittedly obscure and underdeveloped area of language design, to model a classical, well-understood and concrete process like codegen, but hey, we should take our insights where we can get them.