This post describes a design proposal for lowering TableAggregate to CollectDistributedArray with minimal changes to the current nodes.
The new functionality must support the control flow of the interpretation rule for TableAggregate:
1. For each partition:
   a. initialize the aggregator state
   b. call seqOp on each record
   c. serialize the aggregator state and collect the serialized bytes (tree aggregate not supported initially)
2. On the driver:
   a. deserialize each serialized aggregator state and call combOp on it
   b. call resultOp on the combined aggregator state
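For reference, the interpretation rule above can be sketched as plain Python running in a single process. This is a minimal sketch, not Hail's implementation: partitions are ordinary lists, the aggregator is a hypothetical count-and-sum aggregator (so resultOp yields a mean), and the struct module stands in for whatever serialization the real aggregators use. The step labels in the comments match the list above.

```python
import struct
from typing import List, Optional

# Hypothetical aggregator whose state is [count, sum] and whose result is the mean.
def init_op() -> list:
    return [0, 0.0]

def seq_op(state: list, x: float) -> None:
    state[0] += 1
    state[1] += x

def comb_op(state: list, other: list) -> None:
    state[0] += other[0]
    state[1] += other[1]

def result_op(state: list) -> Optional[float]:
    return state[1] / state[0] if state[0] else None

# Stand-in wire format: little-endian int64 count followed by a float64 sum.
def serialize(state: list) -> bytes:
    return struct.pack("<qd", state[0], state[1])

def deserialize(buf: bytes) -> list:
    return list(struct.unpack("<qd", buf))

def table_aggregate(partitions: List[List[float]]) -> Optional[float]:
    # Step 1: once per partition (on the workers, in a distributed backend).
    collected = []
    for rows in partitions:
        state = init_op()                    # 1a. initialize the state
        for row in rows:
            seq_op(state, row)               # 1b. seqOp on each record
        collected.append(serialize(state))   # 1c. serialize and collect
    # Step 2: on the driver.
    state = init_op()
    for buf in collected:
        comb_op(state, deserialize(buf))     # 2a. deserialize, then combOp
    return result_op(state)                  # 2b. resultOp on the combined state
```

With this sketch, table_aggregate([[1.0, 2.0], [3.0], []]) returns 2.0; the empty partition contributes a serialized zero state, which combOp absorbs harmlessly.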
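Existing IR nodes support neither step 1 nor step 2. However, ArrayAgg is somewhat similar to each step, so we would need two new nodes based on ArrayAgg, ArrayAggAndSerialize and CombineSerializedAggsAndResult, along with the emittable nodes SerializeAggsToBytes and RunAgg that they lower into (note that the interfaces are rough and certainly subject to improvement):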
SerializeAggsToBytes()
A node of type TBinary that serializes the current aggregator state to its TBinary representation.
RunAgg(init: IR, seqs: IR, result: IR)
This node is necessary to support nested ArrayAggs, I think. The type of the node is the type of result. The init arg initializes the state, either through a deserialize or an init. The seqs arg is void and can mutate the agg state, like ArrayFor…SeqOp. The result arg uses the agg state to return something, like ResultOp or SerializeAggsToBytes.
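One way to read the RunAgg semantics is as a function that owns a fresh aggregator state for the duration of one evaluation. The sketch below assumes that model and makes the state explicit as a hypothetical AggState object (in the IR it is implicit); init_zero, init_from_bytes, seq_all, and serialize_aggs_to_bytes are illustrative helper names, not proposed nodes. Because each call allocates its own state, an inner RunAgg inside seqs cannot clobber the outer one, which is the nesting property mentioned above.

```python
import struct
from typing import Callable, Iterable, TypeVar

T = TypeVar("T")

class AggState:
    """Hypothetical state for a single count-and-sum aggregator."""
    def __init__(self) -> None:
        self.count = 0
        self.total = 0

def run_agg(init: Callable[[AggState], None],
            seqs: Callable[[AggState], None],
            result: Callable[[AggState], T]) -> T:
    # Each RunAgg owns a fresh state, so a nested RunAgg cannot clobber it.
    state = AggState()
    init(state)           # initialize, either from scratch or by deserializing
    seqs(state)           # void: only mutates the state
    return result(state)  # the node's value has the type of `result`

def init_zero(state: AggState) -> None:
    state.count, state.total = 0, 0

def init_from_bytes(buf: bytes) -> Callable[[AggState], None]:
    def init(state: AggState) -> None:
        state.count, state.total = struct.unpack("<qq", buf)
    return init

def seq_all(values: Iterable[int]) -> Callable[[AggState], None]:
    def seqs(state: AggState) -> None:
        for v in values:
            state.count += 1
            state.total += v
    return seqs

def serialize_aggs_to_bytes(state: AggState) -> bytes:
    return struct.pack("<qq", state.count, state.total)

# Nested use: the outer RunAgg aggregates per-row sums computed by inner RunAggs.
rows = [[1, 2], [3, 4, 5]]

def outer_seqs(state: AggState) -> None:
    for row in rows:
        row_sum = run_agg(init_zero, seq_all(row), lambda s: s.total)
        seq_all([row_sum])(state)

blob = run_agg(init_zero, outer_seqs, serialize_aggs_to_bytes)
```

Here blob holds the serialized outer state (two values, total 15), and passing it to init_from_bytes exercises the deserialize form of init.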
ArrayAggAndSerialize(stream: IR, binding: String, init: IR, seq: IR)
This node requires that the aggregators have already been extracted/lowered into init, seq, and comb. It has type TBinary.
This node lowers roughly to the emittable nodes:
(RunAgg
  init
  (ArrayFor
    stream
    binding
    seq)
  (SerializeAggsToBytes))
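Since ArrayAggAndSerialize is only a local rewrite into existing and emittable nodes, the lowering can be sketched as a tree-building function. This assumes a toy IR in which a node is a Python tuple headed by its name and a leaf is a string; it is a stand-in for the real IR classes, and init and seq are treated as opaque, already-extracted subtrees.

```python
from typing import Tuple, Union

# Toy IR: a leaf is a str; a node is a tuple whose first element is its name.
IR = Union[str, Tuple]

def render(ir: IR) -> str:
    """Print a toy IR tree as an s-expression."""
    if isinstance(ir, str):
        return ir
    return "(" + " ".join(render(child) for child in ir) + ")"

def lower_array_agg_and_serialize(stream: IR, binding: str, init: IR, seq: IR) -> IR:
    # ArrayAggAndSerialize(stream, binding, init, seq) becomes
    # RunAgg(init, ArrayFor(stream, binding, seq), SerializeAggsToBytes()).
    return ("RunAgg",
            init,
            ("ArrayFor", stream, binding, seq),
            ("SerializeAggsToBytes",))
```

For example, render(lower_array_agg_and_serialize("stream", "row", "init", "seq")) produces (RunAgg init (ArrayFor stream row seq) (SerializeAggsToBytes)), the same shape as the lowering above.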
CombineSerializedAggsAndResult(init: IR, serializedAggs: IR, combOp: IR, resultOp: IR)
This node has the type of resultOp (I think? need to revisit current aggs). It lowers roughly into the emittable nodes:
(RunAgg
  init
  (ArrayFor
    serializedAggs
    element
    (CombOp (DeserializeAggs (Ref element))))
  resultOp)
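The CombineSerializedAggsAndResult lowering can be sketched with a toy tuple IR (a stand-in, not the real IR classes: a node is a name-headed tuple, a leaf is a string). One assumption is needed to make it concrete: combOp is modeled as the head of a node applied to the deserialized state, matching the (CombOp (DeserializeAggs ...)) form above. The loop variable name defaults to element here; a real pass would need to generate a fresh name.

```python
from typing import Tuple, Union

# Toy IR: a leaf is a str; a node is a tuple whose first element is its name.
IR = Union[str, Tuple]

def render(ir: IR) -> str:
    """Print a toy IR tree as an s-expression."""
    if isinstance(ir, str):
        return ir
    return "(" + " ".join(render(child) for child in ir) + ")"

def lower_combine_serialized_aggs_and_result(init: IR, serialized_aggs: IR,
                                             comb_op: str, result_op: IR,
                                             element: str = "element") -> IR:
    # Loop over the collected byte strings, deserialize each one, and fold it
    # into the running state with combOp; the node's value is then resultOp.
    # `element` must be a fresh name; a real pass would generate a unique one.
    combine = (comb_op, ("DeserializeAggs", ("Ref", element)))
    return ("RunAgg",
            init,
            ("ArrayFor", serialized_aggs, element, combine),
            result_op)
```

The init argument is reused unchanged, so the combined state starts from the same initial state as each partition's state did.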
Lowering TableAggregate
The entire TableAggregate lowers first into:
// aggregators already extracted into init, seq, comb, and result
(CombineSerializedAggsAndResult
  init
  (CollectDistributedArray
    ... context and stuff...
    (ArrayAggAndSerialize childStream binding init seq))
  combOp
  resultOp)
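Putting the pieces together, lowering TableAggregate is a two-stage rewrite: first build the tree above, then lower each of the two new nodes into emittable ones. The sketch below assumes a toy IR (nodes are name-headed tuples, leaves are strings), models combOp as the head of a node applied to the deserialized state, and uses a single placeholder leaf for the CollectDistributedArray arguments that the proposal elides. first_stage, lower_new_nodes, and node_names are hypothetical names for illustration.

```python
from typing import Tuple, Union

# Toy IR: a leaf is a str; a node is a tuple whose first element is its name.
IR = Union[str, Tuple]

def render(ir: IR) -> str:
    """Print a toy IR tree as an s-expression."""
    if isinstance(ir, str):
        return ir
    return "(" + " ".join(render(child) for child in ir) + ")"

def first_stage(child_stream: IR, binding: str, init: IR, seq: IR,
                comb_op: str, result_op: IR, cda_args: IR = "...") -> IR:
    # cda_args is a placeholder for the CollectDistributedArray arguments
    # ("context and stuff") that the proposal leaves unspecified.
    return ("CombineSerializedAggsAndResult",
            init,
            ("CollectDistributedArray", cda_args,
             ("ArrayAggAndSerialize", child_stream, binding, init, seq)),
            comb_op,
            result_op)

def lower_new_nodes(ir: IR) -> IR:
    """Bottom-up pass that rewrites the two new nodes into emittable ones."""
    if isinstance(ir, str):
        return ir
    head, *children = ir
    args = [lower_new_nodes(child) for child in children]
    if head == "ArrayAggAndSerialize":
        stream, binding, init, seq = args
        return ("RunAgg", init, ("ArrayFor", stream, binding, seq),
                ("SerializeAggsToBytes",))
    if head == "CombineSerializedAggsAndResult":
        init, serialized, comb_op, result_op = args
        element = "element"  # assumed fresh; a real pass would generate one
        combine = (comb_op, ("DeserializeAggs", ("Ref", element)))
        return ("RunAgg", init, ("ArrayFor", serialized, element, combine),
                result_op)
    return (head, *args)

def node_names(ir: IR) -> set:
    """Collect the names of all nodes (not leaves) in a toy IR tree."""
    if isinstance(ir, str):
        return set()
    return {ir[0]}.union(*(node_names(child) for child in ir[1:]))

staged = first_stage("childStream", "row", "init", "seq", "CombOp", "resultOp")
lowered = lower_new_nodes(staged)
```

After lower_new_nodes runs, node_names(lowered) contains only RunAgg, ArrayFor, CollectDistributedArray, SerializeAggsToBytes, CombOp, DeserializeAggs, and Ref, so no ArrayAggAndSerialize or CombineSerializedAggsAndResult node survives into emission.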