TableAggregate lowering proposal

This post describes a design proposal for how to lower TableAggregate to CollectDistributedArray with minimal changes to the current nodes.

The new functionality must support the control flow of the interpretation rule for TableAggregate:

  1. For each partition:
    a. initialize the aggregator state
    b. call seqOp on each record
    c. serialize aggregator state, and collect serialized bytes (tree aggregate not supported initially)
  2. On driver:
    a. deserialize and call combOp on each serialized aggregator state
    b. call resultOp on the combined aggregator state
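
As a rough sketch of that control flow in Scala (using a hypothetical Aggregator interface, not Hail’s actual classes):

// Hypothetical aggregator interface; names are illustrative, not Hail's actual API.
trait Aggregator[A, S, R] {
  def init(): S                        // fresh aggregator state
  def seqOp(state: S, record: A): S    // fold one record into the state
  def combOp(left: S, right: S): S     // merge two partial states
  def resultOp(state: S): R            // finalize
  def serialize(state: S): Array[Byte]
  def deserialize(bytes: Array[Byte]): S
}

// 1. On each partition: initialize state, seqOp each record, serialize the state.
def aggregatePartition[A, S, R](agg: Aggregator[A, S, R], partition: Iterator[A]): Array[Byte] =
  agg.serialize(partition.foldLeft(agg.init())(agg.seqOp))

// 2. On the driver: deserialize and combOp each collected state, then resultOp.
def aggregateOnDriver[A, S, R](agg: Aggregator[A, S, R], collected: Seq[Array[Byte]]): R =
  agg.resultOp(collected.foldLeft(agg.init()) { (acc, bytes) =>
    agg.combOp(acc, agg.deserialize(bytes))
  })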

Existing IR nodes do not support either 1) or 2). However, ArrayAgg is somewhat similar to each, so we would need two new nodes based on ArrayAgg (note that the interfaces are rough and certainly subject to improvement):

SerializeAggsToBytes()

Node of type TBinary. Serializes existing agg state to TBinary representation.

RunAgg(init: IR, seqs: IR, result: IR)

This node is necessary to support nested ArrayAggs, I think. The type of the node is the type of result. The init arg initializes the agg state, either through a deserialize or an init op. seqs is a void-typed arg that can mutate the agg state, like an ArrayFor of SeqOps. The result arg uses the agg state to return something, like ResultOp or SerializeAggsToBytes.
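
As a very rough Scala sketch of these two nodes (placeholder types and fields, not the actual Hail IR classes):

// Placeholder base types for the sketch; the real IR and Type hierarchies differ.
sealed trait Type
case object TBinary extends Type
case object TVoid extends Type

sealed trait IR { def typ: Type }

// Serializes the current agg state to its TBinary representation.
final case class SerializeAggsToBytes() extends IR {
  def typ: Type = TBinary
}

// init sets up the agg state (an init op or a deserialize), seqs is a
// void-typed IR that mutates the state, and result reads the state
// (a ResultOp or SerializeAggsToBytes).
final case class RunAgg(init: IR, seqs: IR, result: IR) extends IR {
  require(seqs.typ == TVoid)    // seqs only mutates agg state
  def typ: Type = result.typ    // the node's type is the type of result
}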

ArrayAggAndSerialize(stream: IR, binding: String, init: IR, seq: IR)

This node requires that the aggregators have been extracted/lowered to init, seq, comb already. It has type TBinary.

This node lowers roughly to the emittable nodes:

(RunAgg
    init,
    (ArrayFor
        stream,
        binding,
        seq),
    SerializeAggsToBytes())
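
In Scala, a sketch of that lowering rule might look like the following (placeholder constructors, not the real implementation):

// Placeholder IR constructors for the sketch; not Hail's actual classes.
sealed trait IR
final case class ArrayFor(stream: IR, binding: String, body: IR) extends IR
final case class RunAgg(init: IR, seqs: IR, result: IR) extends IR
final case class SerializeAggsToBytes() extends IR
final case class ArrayAggAndSerialize(stream: IR, binding: String, init: IR, seq: IR) extends IR

// Hypothetical lowering rule mirroring the snippet above.
def lower(node: ArrayAggAndSerialize): IR =
  RunAgg(node.init, ArrayFor(node.stream, node.binding, node.seq), SerializeAggsToBytes())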

CombineSerializedAggsAndResult(init: IR, serializedAggs: IR, combOp: IR, resultOp: IR)

This node has the type of resultOp (I think? need to revisit current aggs). It lowers roughly into the emittable nodes:

(RunAgg
    init,
    (ArrayFor
        serializedAggs,
        element,
        (CombOp (DeserializeAggs (Ref element)))),
    resultOp)

Lowering TableAggregate

The entire TableAggregate lowers first into:

// already extracted into init, seq, comb, result
(CombineSerializedAggsAndResult
    init,
    (CollectDistributedArray
        ... context and stuff...
        (ArrayAggAndSerialize childStream, binding, init, seq)),
    combOp,
    resultOp)
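
Put together, a hypothetical Scala sketch of this lowering (again with placeholder constructors; in particular the CollectDistributedArray fields here are a simplified stand-in for the real node) might read:

// Placeholder IR constructors for the sketch; not Hail's actual classes.
sealed trait IR
final case class CollectDistributedArray(contexts: IR, body: IR) extends IR  // simplified stand-in; the real node carries more than this
final case class ArrayAggAndSerialize(stream: IR, binding: String, init: IR, seq: IR) extends IR
final case class CombineSerializedAggsAndResult(init: IR, serializedAggs: IR,
                                                combOp: IR, resultOp: IR) extends IR

// Hypothetical end-to-end lowering, assuming the aggs are already extracted
// into init, seq, combOp, resultOp, and that contexts describes the partitions.
def lowerTableAggregate(contexts: IR, childStream: IR, binding: String,
                        init: IR, seq: IR, combOp: IR, resultOp: IR): IR =
  CombineSerializedAggsAndResult(
    init,
    CollectDistributedArray(
      contexts,
      ArrayAggAndSerialize(childStream, binding, init, seq)),
    combOp,
    resultOp)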

This is close to what I was imagining. I’m thinking of it this way:

What do we do with aggregators? We initialize them, we fold in values (seqOp), we fold in other aggregator states (combOp), we finalize their values (resultOp), and we capture their intermediate state during distributed aggregations.

RunAgg feels right. It will need something about the aggregator signatures, and maybe init: IR needs to be a ctor: Array[Array[IR]] and init: Array[Array[IR]], the constructor and initialization arguments for each aggregator?

The implementation of aggregation over columns (once per row) allocated the aggregators once (called the ctor) and then initialized them once per row. I don’t quite see how to do that in the current proposals, so we could fold the ctor args into the aggregator args. Alternatively, we could have separate nodes for constructing and initializing aggregators.
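
For concreteness, the per-row pattern I mean looks roughly like this sketch (made-up names, not the actual column-aggregation code):

// Hypothetical mutable aggregator; names are illustrative only.
trait RowAggregator {
  def initOp(row: Any): Unit     // (re)initialize state; init args may depend on the row
  def seqOp(value: Any): Unit    // fold one column entry into the state
  def result(): Any              // read out the per-row result
}

def aggregateOverColumns(ctor: () => RowAggregator, rows: Iterator[IndexedSeq[Any]]): Iterator[Any] = {
  val agg = ctor()               // allocated once (the ctor args)
  rows.map { row =>
    agg.initOp(row)              // initialized once per row
    row.foreach(agg.seqOp)       // seqOp once per column entry
    agg.result()
  }
}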

However,

SerializeAggsToBytes

seems to unnecessarily bake in serialization. I think we should just have a way to get the aggregator state as a Hail value. That could be binary (serialized bytes) or just an int64 in the case of count, say. So I’d propose AggStateValue(i: Int), which turns the state of aggregator i into a Hail value, with the type determined by the aggregator.

CombineSerializedAggsAndResult

To make this more modular, I think you can just have a CombOp(i: Int, s: T) which folds the state represented by s into aggregator i. Then on the driver the table aggregation just gets lowered into:

  (RunAgg
     (init)
     (ArrayFor (CollectDistributedArray ...) x
       (CombOp i (... function of x ...)))
     (... result ops ...))

So I think you just need: RunAgg, CombOp, and AggStateValue.
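
A rough sketch of the two new pieces as placeholder case classes (the fields are my guesses at the interface, not a spec):

// Placeholder types for the sketch; not Hail's actual IR classes.
sealed trait Type
case object TVoid extends Type

sealed trait IR { def typ: Type }

// Reads the state of aggregator i out as a Hail value; the value's type is
// determined by the aggregator (e.g. binary for serialized state, int64 for count).
final case class AggStateValue(i: Int, stateValueType: Type) extends IR {
  def typ: Type = stateValueType
}

// Folds the state represented by s into aggregator i; void-typed, used in seqs.
final case class CombOp(i: Int, s: IR) extends IR {
  def typ: Type = TVoid
}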

Ah, interesting. I like this.

I’ve been working on implementing my aggregators proposal, which meshes pretty well with this. If it looks like there’s a faster path to getting lowering working, I can pause that to do the quicker change.

I don’t fully understand your version of RunAgg. Could you explain how a nested ArrayAgg or ArrayAggScan is lowered to RunAgg?

In my version, (ArrayAggScan array eltName agg) lowers to

(RunAgg
  init
  (AggArrayDo array eltName agg))

where agg lowers to (init: AggInitArgs, loweredAgg: AggIR). AggInitArgs packs together the appropriate init args for the agg state type, and AggIR has both an agg state type and a result type resType: Type. Here, init and loweredAgg have equal agg state types, and loweredAgg.resType == agg.typ.

Here, RunAgg is defined as RunAgg(init: AggInitArgs, agg: AggIR). AggArrayDo is basically ArrayAggScan, but allowed to be nested.

(ArrayAgg array eltName agg) lowers to

(RunAgg
  init
  (AggDo
    [(AggArrayDo array eltName seq)
     ("result" result)]
    (Ref "result")))

where loweredAgg is further split into (seq: AggIR, result: AggIR), with seq and result having the same agg state types, result.resType == agg.typ, and seq.resType void. AggDo runs a sequence of AggIRs with the same state types in order, binding some of the results for use in a result IR (not an AggIR).
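
A minimal type-level sketch of this variant, where every name and field is an assumption for illustration:

// Placeholder types; not the real implementation.
sealed trait Type
case object TVoid extends Type

sealed trait AggStateType

sealed trait IR { def typ: Type }
final case class Ref(name: String, typ: Type) extends IR

// Packs the init args appropriate for a given agg state type.
final case class AggInitArgs(stateType: AggStateType, args: IndexedSeq[IR])

// An aggregation IR: it carries both an agg state type and a result type.
sealed trait AggIR {
  def stateType: AggStateType
  def resType: Type
}

// ArrayAggScan-like iteration over array, but allowed to be nested.
final case class AggArrayDo(array: IR, eltName: String, body: AggIR) extends AggIR {
  def stateType: AggStateType = body.stateType
  def resType: Type = body.resType
}

// Runs a sequence of AggIRs (all over the same agg state) in order, optionally
// binding their results, then evaluates a plain (non-agg) result IR.
final case class AggDo(steps: IndexedSeq[(Option[String], AggIR)], result: IR) extends AggIR {
  def stateType: AggStateType = steps.head._2.stateType
  def resType: Type = result.typ
}

// RunAgg in this variant: init args plus a single AggIR body.
final case class RunAgg(init: AggInitArgs, agg: AggIR) extends IR {
  def typ: Type = agg.resType
}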

Could you explain how a nested ArrayAgg or ArrayAggScan is lowered to RunAgg?

I think a nested ArrayAgg just turns into a nested RunAgg. Is there a problem with that? I think we need a RunAggScan to lower ArrayAggScan since RunAgg needs to return a realizable type.

The emitter needs to also handle nested RunAggs. I’m not sure if it works for nested ArrayAggs right now.

I was confused about where the array went in the lowered form. I understand now for ArrayAgg: the array goes into an ArrayFor in the seqs IR.

But I guess RunAggScan would need to take an array explicitly? Something like

RunAggScan(array: IR, name: String, init: IR, seqs: IR, result: IR)

where now seqs and result will be executed once per element?

This is really just ArrayAggScan with the aggregation split into seq and result phases, and with init args extracted. Doing the same thing for ArrayAgg is probably sufficient, instead of RunAgg.
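
As a reference-semantics sketch of what I have in mind (hypothetical names, and glossing over whether result runs before or after seqs for each element):

// Hypothetical mutable agg state; just enough to illustrate the shape.
trait AggState

// RunAggScan(array, name, init, seqs, result): init runs once, then seqs and
// result run once per element, and the per-element results form the output array.
def runAggScan[A, B](array: IndexedSeq[A],
                     init: () => AggState,
                     seqs: (AggState, A) => Unit,
                     result: (AggState, A) => B): IndexedSeq[B] = {
  val state = init()
  array.map { elt =>
    seqs(state, elt)        // fold the element into the agg state (void)
    result(state, elt)      // read the state for this element
  }
}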

I think your RunAggScan interface looks roughly right, yeah.

Doing the same thing for ArrayAgg is probably sufficient, instead of RunAgg

Do you mean instead of implementing RunAgg?

Having both nodes makes sense – we lower from ArrayAgg to RunAgg.

I just mean having RunAggScan take an array/stream, but not RunAgg, is weirdly asymmetric. Is

RunAgg(array: IR, name: String, init: IR, seqs: IR, result: IR)

any less expressive?

oh, I see. Either seems fine.

is weirdly asymmetric

What’s the violated symmetry? As conceived here, RunAgg is more flexible than RunAggScan, so its interface should be more flexible. In particular, its body doesn’t need to be an iteration over an array: now that we have a loop construct in the IR, the elements could be generated by a loop, for example.

One way to make RunAggScan look more like RunAgg (which I prefer to the converse) is to add a void-valued yield operation that emits a result of the scan and then continues with the iteration.