ShuffleStart(
keyFields Array<String>,
rowType (virtual) Struct,
rowEType EType
): id UUID
ShuffleWrite(
id UUID,
partitionId PInt64,
attemptId PInt64,
rows Stream<Struct>
): Unit
ShuffleWritingFinished(
id UUID,
outPartitions PInt64
): Array<rowType.select(keyFields)>
ShuffleRead(
id UUID,
partitionId PInt64
): Stream<Struct>
ShuffleDelete(
id UUID
): Unit
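To make the five-RPC lifecycle concrete, here is a toy in-memory stand-in for the service (illustrative Python; `MockShuffle` and its method names are hypothetical, not Hail's implementation, and it assumes one successful attempt per partition and at least one row):

```python
import uuid

class MockShuffle:
    # Hypothetical in-memory sketch of the Start/Write/WritingFinished/
    # Read/Delete lifecycle described above; not Hail's implementation.
    def __init__(self):
        self._shuffles = {}

    def start(self, key_fields):
        sid = uuid.uuid4()
        self._shuffles[sid] = {"key": key_fields, "rows": []}
        return sid

    def write(self, sid, partition_id, attempt_id, rows):
        # A real service would track attempts separately; here we
        # assume a single successful attempt per partition.
        self._shuffles[sid]["rows"].extend(rows)

    def writing_finished(self, sid, out_partitions):
        s = self._shuffles[sid]
        keyf = lambda r: tuple(r[f] for f in s["key"])
        s["sorted"] = sorted(s["rows"], key=keyf)
        n = len(s["sorted"])
        # Slice indices giving out_partitions roughly equal partitions.
        s["cuts"] = [i * n // out_partitions for i in range(out_partitions + 1)]
        # out_partitions + 1 key bounds; partition i covers
        # [bounds[i], bounds[i+1]) (modulo duplicate keys at a cut).
        return [keyf(s["sorted"][min(c, n - 1)]) for c in s["cuts"]]

    def read(self, sid, partition_id):
        s = self._shuffles[sid]
        return iter(s["sorted"][s["cuts"][partition_id]:s["cuts"][partition_id + 1]])

    def delete(self, sid):
        del self._shuffles[sid]

svc = MockShuffle()
sid = svc.start(["k"])
svc.write(sid, 0, 0, [{"k": 3}, {"k": 1}])
svc.write(sid, 1, 0, [{"k": 2}])
bounds = svc.writing_finished(sid, 2)   # 2 output partitions -> 3 bounds
parts = [list(svc.read(sid, p)) for p in range(2)]
svc.delete(sid)
```

Note the rows come back globally sorted across partitions, which is the whole point of lowering TableOrderBy to a write stage followed by a read stage.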
How does IR manage resources? Should Shuffle be a new type that performs a
network call when it is freed? My current thinking is that ShuffleDelete can be
inserted by the TableIR lowerer. I believe it will see all potential reads of
the shuffle (effectively, the transitive closure of the parents of the TableKeyBy or TableOrderBy that is being lowered to a pair of stages using a
ShuffleWrite and a ShuffleRead).
ShuffleWritingFinished returns an array of length outPartitions + 1. Partition i's bounds are [result[i], result[i+1]).
ShuffleRead may be called with any partitionId in [0, outPartitions). The same
partition may be read multiple times, including by multiple actors. The stream
returned for each partition contains only records whose keys fall within that
partition's bounds (left-inclusive, right-exclusive) as returned by
ShuffleWritingFinished. Each partition will contain roughly the same number of
records.
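One way to picture the bounds array: given the outPartitions + 1 bounds, the partition containing a key is found with a right-exclusive binary search (illustrative Python, not from the proposal; integer keys stand in for struct keys):

```python
import bisect

def partition_of(key, bounds):
    # bounds has length outPartitions + 1; partition i covers
    # [bounds[i], bounds[i+1]), left-inclusive, right-exclusive.
    if not bounds[0] <= key < bounds[-1]:
        raise ValueError("key outside the shuffle's key range")
    # bisect_right finds the first bound strictly greater than key;
    # subtracting 1 yields the left-inclusive partition index.
    return bisect.bisect_right(bounds, key) - 1

bounds = [0, 10, 20, 30]               # outPartitions == 3
assert partition_of(0, bounds) == 0
assert partition_of(10, bounds) == 1   # right-exclusive: 10 starts partition 1
assert partition_of(29, bounds) == 2
```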
Thanks, Dan! Aside from the keyFields change (String => SortField), I think we
can use these nodes as-is to lower TableIR, but my ideal nodes would probably
look like:
ShuffleStart(
keyFields Array<SortField>,
rowType (virtual) Struct,
rowEType EType
): id UUID
ShuffleWrite(
id UUID,
partitionId PInt64,
// attemptId PInt64 -- can we lose this param, or is that too hard from your side?
rows Stream<Struct>
): Unit
ShuffleWritingFinished(
id UUID,
): Void
ShuffleGetPartitionBounds(
id UUID,
n_partitions Int
): Array[Struct] // n_partitions + 1 bounds
ShuffleRead(
id UUID,
keyRange IR // type TInterval of key
rowEType EType // same as the one in ShuffleStart, but including it helps bookkeeping
): Stream<Struct>
ShuffleDelete(
id UUID
): Unit
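With keyRange replacing partitionId, a read becomes an interval query over the service's sorted rows. Roughly (illustrative Python; `read_key_range` is a hypothetical name, and the interval is taken as left-inclusive, right-exclusive to match the bounds semantics above):

```python
import bisect

def read_key_range(sorted_rows, keys, lo, hi):
    # Interval-based ShuffleRead sketch: return the rows whose key lies
    # in [lo, hi), mirroring the proposed TInterval keyRange parameter.
    # `keys` holds the (sorted) key of each row in sorted_rows.
    return sorted_rows[bisect.bisect_left(keys, lo):bisect.bisect_left(keys, hi)]

rows = [{"k": 1}, {"k": 2}, {"k": 3}, {"k": 4}]
keys = [r["k"] for r in rows]
assert read_key_range(rows, keys, 2, 4) == [{"k": 2}, {"k": 3}]
```

A nice property of this formulation is that readers no longer need to agree on a fixed partitioning: any reader can ask for any interval, and the bounds from ShuffleGetPartitionBounds are just one convenient choice of intervals.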
I can elide the attempt id, but I thought that was important for Hail? Is it
possible for two active workers to have the same partition id? If both workers
successfully send all their data, does it matter to Hail which one I keep?
Cotton had proposed client-side filtering of unsuccessful attempts. I'm also
happy to accept a list of successful or unsuccessful attempts on
ShuffleWritingFinished.
Long term, I think ShuffleGetPartitionBounds really ought to be
ShuffleGetApproxCDF, but our approximate CDF implementation currently operates
on doubles.
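For intuition on the CDF connection: given any estimate of the key distribution, n_partitions + 1 bounds are just evenly spaced quantiles. A sketch using a full sample in place of a real approximate-CDF structure (illustrative only; a production ShuffleGetApproxCDF would use a streaming sketch rather than sorting everything):

```python
def bounds_from_sample(sample, n_partitions):
    # Approximate partition bounds as evenly spaced quantiles of a
    # sorted sample of keys. A real approximate-CDF implementation
    # would use a quantiles sketch instead of a full sort.
    s = sorted(sample)
    n = len(s)
    return [s[min(i * n // n_partitions, n - 1)] for i in range(n_partitions + 1)]

sample = [5.0, 1.0, 3.0, 2.0, 4.0]
assert bounds_from_sample(sample, 2) == [1.0, 3.0, 5.0]
```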
ShuffleStart(
keyFields Array<SortField>,
rowType (virtual) Struct,
rowEType EType
): id UUID
ShuffleWrite(
id UUID,
partitionId PInt64,
rows Stream<Struct>
): UUID
ShuffleWritingFinished(
successfulIds Array[UUID],
id UUID,
): Void
ShuffleGetPartitionBounds(
id UUID,
n_partitions Int
): Array[Struct] // n_partitions + 1 bounds
ShuffleRead(
id UUID,
keyRange IR // type TInterval of key
rowEType EType // same as the one in ShuffleStart, but including it helps bookkeeping
): Stream<Struct>
ShuffleDelete(
id UUID
): Unit
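The attempt handling in this final version can be sketched as follows: each ShuffleWrite mints a fresh attempt UUID, rows are tagged with it server-side, and ShuffleWritingFinished's successfulIds tells the service which attempts to keep (hypothetical names; `finish_writing` is not Hail's API):

```python
import uuid

def finish_writing(rows_by_attempt, successful_ids):
    # Keep only rows written by attempts the client declared successful,
    # so duplicate data from retried partition writes is discarded.
    return [row
            for attempt, rows in rows_by_attempt.items()
            if attempt in successful_ids
            for row in rows]

a1, a2 = uuid.uuid4(), uuid.uuid4()
rows_by_attempt = {a1: [{"k": 1}], a2: [{"k": 1}]}  # a2 is a retry of a1
kept = finish_writing(rows_by_attempt, {a2})
assert kept == [{"k": 1}]  # exactly one copy survives
```

This answers the duplicate-attempt question operationally: the service keeps whichever attempts the client blesses, so it never has to guess which of two successful writers wins.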