Proposal: Shuffler IR

Shuffler IR Design

UUID is probably a string

ShuffleStart(
  keyFields Array<String>,
  rowType (virtual) Struct,
  rowEType EType
): id UUID

ShuffleWrite(
  id UUID,
  partitionId PInt64,
  attemptId PInt64,
  rows Stream<Struct>
): Unit

ShuffleWritingFinished(
  id UUID,
  outPartitions PInt64
): Array<rowType.select(keyFields)>

ShuffleRead(
  id UUID,
  partitionId PInt64
): Stream<Struct>

ShuffleDelete(
  id UUID
): Unit

How does IR manage resources? Should Shuffle be a new type that performs a
network call when it is freed? My current thinking is that ShuffleDelete can be
inserted by the TableIR lowerer. I believe it will see all potential reads of
the shuffle (effectively, the transitive closure of the parents of the
TableKeyBy or TableOrderBy that is being lowered to a pair of stages using a
ShuffleWrite and a ShuffleRead).

ShuffleWritingFinished returns an array of length outPartitions + 1. Partition i's bounds are [result[i], result[i+1]).

ShuffleRead accepts any partitionId in [0, outPartitions). The same partition
may be read multiple times, by multiple actors. The stream returned for each
partition contains only records whose keys fall within that partition's bounds
(left-inclusive, right-exclusive) as returned by ShuffleWritingFinished. Each
partition will contain roughly the same number of records.
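As a sketch of the bounds contract (hypothetical Python, not part of the proposal): given the outPartitions + 1 bounds returned by ShuffleWritingFinished, the partition that owns a key can be found by binary search.

```python
import bisect

def partition_for_key(bounds, key):
    # bounds has outPartitions + 1 entries; partition i owns
    # the half-open interval [bounds[i], bounds[i+1]).
    assert bounds[0] <= key < bounds[-1], "key outside the shuffled key range"
    # bisect_right finds the first bound strictly greater than key;
    # subtracting 1 gives the left-inclusive owning partition.
    return bisect.bisect_right(bounds, key) - 1

bounds = [0, 10, 20, 30]  # 3 partitions: [0,10), [10,20), [20,30)
```

Keys on a bound (e.g. 10) land in the partition to the right, matching the left-inclusive, right-exclusive convention above.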

Thanks, Dan! Aside from the keyFields change (string => SortField), I think we can use these nodes as-is to lower TableIR, but my ideal nodes would probably look like:

ShuffleStart(
  keyFields Array<SortField>,
  rowType (virtual) Struct,
  rowEType EType 
): id UUID

ShuffleWrite(
  id UUID,
  partitionId PInt64,
  // attemptId PInt64, Comment: can we lose this param, or is that too hard from your side?
  rows Stream<Struct>
): Unit

ShuffleWritingFinished(
  id UUID,
): Void

ShuffleGetPartitionBounds(
  id UUID,
  n_partitions Int
): Array[Struct] // n_partitions + 1 bounds

ShuffleRead(
  id UUID,
  keyRange IR // type TInterval of key
  rowEType: EType // same as the one in ShuffleStart, but it helps with bookkeeping
): Stream<Struct>

ShuffleDelete(
  id UUID
): Unit

I can implement this.

I can elide the attempt id, but I thought that was important for Hail? Is it possible for two active workers to have the same partition id? If both workers successfully send all their data, does it matter to Hail which one I keep? Cotton had proposed client-side filtering of unsuccessful attempts. I'm also happy to accept a list of successful or unsuccessful attempts on ShuffleWritingFinished.
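To make the attempt-filtering idea concrete, here is a hedged in-memory sketch (hypothetical Python; `ShuffleServerSketch` and its method names are illustrative, not a real API): each ShuffleWrite gets its own UUID, and ShuffleWritingFinished keeps only the attempts the client vouches for, so a duplicate attempt for the same partition cannot double-count rows.

```python
import uuid

class ShuffleServerSketch:
    def __init__(self):
        self.writes = {}  # per-write UUID -> (partitionId, buffered rows)

    def shuffle_write(self, partition_id, rows):
        # Every attempt gets a fresh id, even for the same partition.
        write_id = uuid.uuid4()
        self.writes[write_id] = (partition_id, list(rows))
        return write_id

    def shuffle_writing_finished(self, successful_ids):
        # Drop data from attempts the client does not vouch for.
        keep = set(successful_ids)
        self.writes = {w: v for w, v in self.writes.items() if w in keep}

server = ShuffleServerSketch()
a = server.shuffle_write(0, [1, 2])
b = server.shuffle_write(0, [1, 2])   # retry of the same partition
server.shuffle_writing_finished([b])  # client keeps only the second attempt
```

This is the client-side filtering Cotton proposed, expressed server-side: the server buffers everything and discards unvouched attempts at finish time.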

Long term, I think ShuffleGetPartitionBounds really ought to be ShuffleGetApproxCDF, but our approximate CDF implementation currently operates on doubles.

Actually, can we do:

ShuffleWrite(
  id UUID,
  partitionId PInt64,
  rows Stream<Struct>
): UUID

ShuffleWritingFinished(
  successfulIds: Array[UUID],
  id UUID,
): Void

The final outcome of this discussion:

ShuffleStart(
  keyFields Array<SortField>,
  rowType (virtual) Struct,
  rowEType EType 
): id UUID

ShuffleWrite(
  id UUID,
  partitionId PInt64,
  rows Stream<Struct>
): UUID

ShuffleWritingFinished(
  successfulIds: Array[UUID],
  id UUID,
): Void

ShuffleGetPartitionBounds(
  id UUID,
  n_partitions Int
): Array[Struct] // n_partitions + 1 bounds

ShuffleRead(
  id UUID,
  keyRange IR // type TInterval of key
  rowEType: EType // same as the one in ShuffleStart, but it helps with bookkeeping
): Stream<Struct>

ShuffleDelete(
  id UUID
): Unit
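To tie the final protocol together, here is a hedged end-to-end sketch against a hypothetical in-memory client (the class and its method names mirror the IR nodes above but are illustrative only; keys are bare integers and each row is its own key, for simplicity):

```python
import bisect
import itertools
import uuid

class MockShuffleClient:
    """In-memory stand-in for the shuffle service; illustrates the node
    lifecycle (start -> write -> finished -> bounds -> read -> delete),
    not the wire protocol."""

    def __init__(self):
        self.shuffles = {}

    def shuffle_start(self, key_fields):
        sid = uuid.uuid4()
        self.shuffles[sid] = {"key_fields": key_fields, "writes": {}}
        return sid

    def shuffle_write(self, sid, partition_id, rows):
        # Returns a per-attempt UUID, as in the final ShuffleWrite.
        wid = uuid.uuid4()
        self.shuffles[sid]["writes"][wid] = list(rows)
        return wid

    def shuffle_writing_finished(self, sid, successful_ids):
        # Keep only vouched attempts, then sort all rows by key.
        s = self.shuffles[sid]
        keep = set(successful_ids)
        rows = itertools.chain.from_iterable(
            r for w, r in s["writes"].items() if w in keep)
        s["rows"] = sorted(rows)

    def shuffle_get_partition_bounds(self, sid, n_partitions):
        rows = self.shuffles[sid]["rows"]
        n = len(rows)
        lowers = [rows[(i * n) // n_partitions] for i in range(n_partitions)]
        return lowers + [rows[-1] + 1]  # exclusive upper bound (integer keys)

    def shuffle_read(self, sid, key_range):
        lo, hi = key_range  # half-open [lo, hi), mirroring the TInterval
        rows = self.shuffles[sid]["rows"]
        return rows[bisect.bisect_left(rows, lo):bisect.bisect_left(rows, hi)]

    def shuffle_delete(self, sid):
        del self.shuffles[sid]

c = MockShuffleClient()
sid = c.shuffle_start(["x"])
w0 = c.shuffle_write(sid, 0, [5, 1, 9])
w1 = c.shuffle_write(sid, 1, [3, 7])
c.shuffle_writing_finished(sid, [w0, w1])
bounds = c.shuffle_get_partition_bounds(sid, 2)
part0 = c.shuffle_read(sid, (bounds[0], bounds[1]))
part1 = c.shuffle_read(sid, (bounds[1], bounds[2]))
c.shuffle_delete(sid)
```

Note that, as in the final design, reads are addressed by key range rather than by write-side partition id, so the read-side partitioning is decoupled from how writers chunked their data.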

UUID will probably be an array of bytes.