I’m working on putting matrix joins into MatrixIR
. It was straightforward for TableIR
, but there are only a few particular kinds of join already implemented in MatrixTable
and I want to take the opportunity to try to design a more general framework of joins to use in the IR. I suspect having a mathematically grounded and symmetric join interface will make rewrite rules for optimization simpler, and will help pave the way for “TQL”. But just because this is mathematically nice doesn’t mean it’s the right design for the IR, and I would welcome discussion on that front.
I’m only talking about join semantics from the perspective of the IR right now (and the high level IR, that isn’t concerned with what is distributed vs. local). At this level, boiling all the behavior we want to a nice, compact, symmetric set of primitives seems worthwhile. The user-facing join api can certainly have lots of chrome on top of this, or hide some of it.
Table-Table joins
I’m going to ignore globals throughout, because they’re easy: in any kind of join, just concatenate the left and right global schemas and values. What I’m proposing is different from SQL joins, though SQL joins can be easily implemented using these. I’ll write Table[K, V]
for a table with key type K
and value type V
. There are four kinds of join of two tables Table[K, V1]
and Table[K, V2]
:
- The inner join produces a table with schema
Table[K, (V1, V2)]
. If a given key occurs x times in the left table and y times in the right, it will occur x*y times in the join, with all possible value pairs. This covers the case where x or y is 0. - The left join produces a table with schema
Table[K, (V1, Array[V2])]
. There are the same number of rows in the resulting table as there were in the left, and for each row in the left, if that key occurs y times in the right table, then theArray[V2]
field has length y. - The right join is just the mirror of the left join.
- “Outer” join in this framework is really cogroup, producing a table with schema
Table[K, (Array[V1], Array[V2])]
. In fact, all the previous joins can easily be implemented in terms of cogroup, at the cost of making implicit information that the optimizer might want to make use of. For example, in a left join, it’s reasonable to give the result the same partitioner as the left table, but that isn’t the best general strategy for an outer join.
The “left join distinct” behavior can be represented by mapping over the array in this left join, taking empty array to missing, and non-empty arrays to their head element. We should take care that this case is optimized well.
Another possibility is to make the IR more explicit by recording when we know a table has unique keys, such as when it was the result of a groupByKey
, makeDistinct
, or cogroup
. I like this option, because I think of general tables and tables with unique keys as semantically very different. Under this approach, all joins produce Table[K, (V1, V2)]
, with the restriction that in a left join, the right table has unique keys; in a right join, the left table has unique keys; and in an outer join, both tables have unique keys. The current leftJoinDistinct
could then be represented in the IR by makeDistinct
on the right followed by left join. The joins listed above result from these unique-key joins by inserting groupByKey
where needed.
Union can also be represented using cogroup. If two tables have the same schema, Table[K, V]
, the cogroup gives Table[K, (Array[V], Array[V])]
, where the length of the two arrays count how many times the row K
occurred in the left and right tables. If we map the value field by array concatenation, giving Table[K, Array[V]]
, and then explode the value field, we get the union of the two tables.
Matrix-Table joins
Next are Matrix-Table joins. I’ll write Matrix[(RK, R), (CK, C), E]
for a matrix with row-key type RK
, row-value type R
, and so on. Given such a matrix, and a table Table[K, V]
, if K = RK
then the same four join types work, joining rows of the table to rows of the matrix. If K = CK
, there are another four joins of table rows to matrix columns.
- The inner join produces a matrix with schema
Matrix[(RK, (R, V)), (CK, C), E]
. If the table has unique keys, and emptyV
, this is afilterRowsTable
. - The left join produces a matrix with schema
Matrix[(RK, (R, Array[V])), (CK, C), E]
. The “left join distinct” modification can be done here too, which givesannotateRowsTable
. - The right join produces a matrix with schema
Matrix[(RK, (Array[R], V)), (CK, C), Array[E]]
. This is the first case where we see interesting behavior at the entries level. Both theArray[R]
values and theArray[E]
values will have length y, where y is the number of times the corresponding row key occurs in the right hand table. - Finally, cogroup produces a matrix with schema
Matrix[(RK, (Array[R], Array[V])), (CK, C), Array[E]]
.
Matrix-Matrix joins
Now things really get interesting. Consider two matrices with signatures Matrix[(RK, R1), (CK, C1), E1]
and Matrix[(RK, R2), (CK, C2), E2]
. We are joining rows-on-the-left with rows-on-the-right, and cols-on-the-left with cols-on-the-right. So two joins are occurring, and we need to choose one of the four join types for each, giving 16 possibilities (ignoring the distinct/non-distinct axis).
- The inner*inner join produces a matrix with schema
Matrix[(RK, (R1, R2)), (CK, (C1, C2)), (E1, E2)]
. - The left*inner join (left join on rows, inner on cols) produces a matrix with schema
Mat[(RK, (R1, Array[R2])), (CK, (C1, C2)), (E1, Array[E2])]
. Because we did a left join on rows, each row of the result corresponds to one row of the left and a collection of rows of the right, whereas because we did an inner join on columns, each column of the result corresponds to one col of the left and one of the right. Combining these, each entry (row-col pair) of the result corresponds to one row-col pair of the left, and a collection of row-col pairs (fixed col, collection of rows) of the right. - The cogroup*inner join produces a matrix with schema
Matrix[(RK, (Array[R1], Array[R2])), (CK, (C1, C2)), (Array[E1], Array[E2])]
. For each entry of the result, theArray[E1]
and theArray[R1]
will have the same length, and likewise for theArray[E2]
and theArray[R2]
. IfR1 = R2
andE1 = E2
, then by concatenating both pairs of arrays, and jointly exploding them (I see how to implement this but won’t get in to it here), we can recover the existingunionRows
. But I would be curious to know if the exploded version is ever what is really wanted, not the grouped version.
This is probably enough to convey the idea. It gets a bit more complicated when neither the row-join nor the col-join is inner, because then one or both of the entry fields becomes 2-dimensional, e.g. (Array[E1], Matrix[E2]). But these cases aren’t needed to replicate any existing functionality.
Now that I’ve written all this out, I think the complexity of the matrix-matrix joins is a strong argument for the approach I mentioned in the table-table join section, where some joins require unique keys in some of their arguments. That gets rid of all arrays and matrices in the result schemas, instead putting them in the groupByKey
operations, where there is only one table/matrix to reason about.
Final thoughts
This is a lot of different types of join. I have some thoughts about managing the complexity. In fact, I think this is where the symmetry of this scheme really helps. Basically, I’m envisioning a lower level IR which is concerned only with 1-dimensional collections—RVD
in the distributed case and iterators in the local case. I think all of the higher-dimensional joins can be reduced to this lower-level IR in a way in which the axes are treated more-or-less indepentently, and such that the same implementation works regardless of which axes are local and which are distributed. But this post is plenty long already, so I’ll write those thoughts up separately.