Faster Filtering with an Index

I’ve been thinking a bit about how we could do faster filtering. We’re all pretty disappointed with the user experience of the hack I wrote up for Caitlin. Here are some of my not fully developed thoughts.

As suggested by Cotton, I think a good interface would be to have RDDs contain an iterator that can advance to the next record with a matching key. Unfortunately, the Java Iterator interface isn’t well suited to this because it has no advance method. Using a flip book iterator we could add:

def advance(
  rv: RegionValue, 
  t: OrderingView[RegionValue], 
  orderingRelevantFields: Array[String]
): Unit

which advances the iterator to the next value which is equal to rv under t, if one exists. If no such value exists, the next call to isValid will return false. This could be leveraged by joins.
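
To make the intended semantics concrete, here is a minimal Python sketch of an iterator with advance over key-sorted records. The class and method names are hypothetical and this is not the flip book iterator itself. It assumes the records are sorted by the key, and it keeps its position after a miss so that a later advance with a larger key can continue from there:

```python
class AdvancingIterator:
    """Hypothetical iterator over key-sorted records that supports advance()."""

    def __init__(self, records, key):
        self._it = iter(records)
        self._key = key            # function extracting the ordering-relevant key
        self._current = None
        self._has_current = False
        self._step()
        self._matched = self._has_current

    def _step(self):
        # Move to the next underlying record, if there is one.
        try:
            self._current = next(self._it)
            self._has_current = True
        except StopIteration:
            self._current = None
            self._has_current = False

    def is_valid(self):
        return self._has_current and self._matched

    def value(self):
        if not self.is_valid():
            raise ValueError("iterator is not positioned on a matching record")
        return self._current

    def advance(self, target):
        # Because the input is sorted, everything with a smaller key can be passed over.
        while self._has_current and self._key(self._current) < target:
            self._step()
        # If we landed on a larger key (or ran out), there is no match for target.
        self._matched = self._has_current and self._key(self._current) == target


def probe(records, key, targets):
    """Collect the records matching each target key (targets must be sorted)."""
    it = AdvancingIterator(records, key)
    hits = []
    for t in targets:
        it.advance(t)
        if it.is_valid():
            hits.append(it.value())
    return hits
```

A join would drive this with the keys from the smaller side, for example probe(rows, lambda r: r[0], sorted_keys). Handling several records with the same key would need an additional plain "next" step, which this sketch leaves out.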

I see two options to do this efficiently when the ordering view only looks at a subset of the fields:

  1. If the source can produce the subset of fields more efficiently than producing the full row, then we can clearly scan past irrelevant records without fully decoding them.

  2. If the source has an index with a superset of the relevant fields, we can look up the location of the next matching record in the index and seek to that position.


Reading A Subset of Fields

We’d want to add a decoder that can read bytes of type T into a record of type U. Ideally, we would be able to retroactively add the unread fields when we realize the record matches. This would be easy with physical types because we can place the “key” fields first even if they are not first in the virtual type. Even without physical types, we could allocate space for the full record and populate the fields only when needed (i.e. the key fields first and the rest if the key matches), but this seems easier to get wrong.

But how does the source skip over an irrelevant record efficiently? I think we could write a method to skip the encoded bytes of a type more efficiently than we could read it because we don’t need to look at the bytes of a string or the bytes of an array of fixed-size things.
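
As a rough illustration of why skipping can be cheaper than reading, here is a Python sketch over a made-up encoding (a 4-byte length prefix followed by the payload). This is an assumption for illustration only and not our actual encoding. The skip function only looks at the length prefixes and never touches the string bytes or the array elements:

```python
import struct
from typing import List, Tuple


def encode_record(name: str, values: List[float]) -> bytes:
    """Made-up layout: [u32 len][utf-8 bytes][u32 count][count * f64]."""
    raw = name.encode("utf-8")
    return (struct.pack("<I", len(raw)) + raw
            + struct.pack("<I", len(values))
            + struct.pack("<%dd" % len(values), *values))


def read_record(buf: bytes, off: int) -> Tuple[Tuple[str, List[float]], int]:
    """Fully decode one record; returns (record, offset of the next record)."""
    (n,) = struct.unpack_from("<I", buf, off)
    off += 4
    name = buf[off:off + n].decode("utf-8")   # touches every string byte
    off += n
    (m,) = struct.unpack_from("<I", buf, off)
    off += 4
    values = list(struct.unpack_from("<%dd" % m, buf, off))  # touches every element
    off += 8 * m
    return (name, values), off


def skip_record(buf: bytes, off: int) -> int:
    """Step over one record by reading only the two length prefixes."""
    (n,) = struct.unpack_from("<I", buf, off)
    off += 4 + n
    (m,) = struct.unpack_from("<I", buf, off)
    off += 4 + 8 * m                           # fixed-size elements: pure arithmetic
    return off
```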


A Matrix Table Index

When a matrix table is written, it could optionally produce an index by tracking the offsets and keys of each record in a partition as the records are written. After writing all the records, the keys and the offsets are sorted and written to a separate file, one for each partition. This seems to work better if the dataset is already sorted by the key. If there are enough records in a partition to make it worth it, we can create a b-tree for that partition. To produce a dataset-wide indexing data structure, we’d want the write to be a tree-reduce that builds the b-tree’s internal nodes as it reduces. Assuming the data of the index is quite large, we’d want to write each internal node of the b-tree as a separate GS file. There’s a balance here between file size and the amount of data we need to collect on a single machine. Having a b-tree distributed across a bunch of GS files seems complicated too.
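
A minimal Python sketch of the per-partition part of this idea, with all names and the record layout invented for illustration. It records (key, offset) pairs while writing and uses a sorted list with binary search as a stand-in for the b-tree. Integer keys and in-memory buffers are assumptions to keep it short:

```python
import bisect
import io
import struct

HEADER = struct.Struct("<qI")  # hypothetical record header: i64 key, u32 payload length


def write_partition(records):
    """Write (key, payload) records; return (partition bytes, sorted index)."""
    out = io.BytesIO()
    index = []
    for key, payload in records:
        index.append((key, out.tell()))       # remember where this record starts
        out.write(HEADER.pack(key, len(payload)))
        out.write(payload)
    index.sort()                              # a no-op if the data is already key-sorted
    return out.getvalue(), index


def lookup(data, index, key):
    """Find key in the index, seek to its offset, and read just that record."""
    i = bisect.bisect_left(index, (key,))     # first entry with a key >= the target
    if i == len(index) or index[i][0] != key:
        return None
    f = io.BytesIO(data)
    f.seek(index[i][1])
    stored_key, n = HEADER.unpack(f.read(HEADER.size))
    assert stored_key == key
    return f.read(n)
```

In a real version the index would go to its own file next to the partition, and the seek would be against the partition file rather than an in-memory buffer.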

With the current encoding, you’re not going to get much advantage from skipping unused fields
rather than decoding them. Apart from Boolean, Float, and Double, everything else is variable
sized, and the variable-length Int/Long encoding doesn’t let you find out the size without inspecting
every byte. So you still do maybe 75% of the work even if you’re skipping.
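
To illustrate the point about variable-length integers, here is a Python sketch assuming a LEB128-style encoding (7 data bits per byte, high bit set on every byte except the last). Whether that matches our encoding exactly is an assumption. The skip function saves the shifts and ORs, but it still has to test the continuation bit of every byte:

```python
def encode_varint(n: int) -> bytes:
    """Encode a non-negative int, 7 bits per byte, least significant group first."""
    if n < 0:
        raise ValueError("this sketch only handles non-negative values")
    out = bytearray()
    while True:
        b = n & 0x7F
        n >>= 7
        if n:
            out.append(b | 0x80)   # more bytes follow
        else:
            out.append(b)          # last byte: high bit clear
            return bytes(out)


def read_varint(buf: bytes, off: int):
    """Decode one value; returns (value, offset after it)."""
    result = 0
    shift = 0
    while True:
        b = buf[off]
        off += 1
        result |= (b & 0x7F) << shift
        shift += 7
        if not (b & 0x80):
            return result, off


def skip_varint(buf: bytes, off: int) -> int:
    """Skip one value; the length is only known after inspecting each byte."""
    while buf[off] & 0x80:
        off += 1
    return off + 1
```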

With seeking, you would still be doing the work of decompressing, but that can go quite fast.

I’m getting fairly close to having the C++ decoder: I’m generating type-specific code which compiles,
and now I have to plumb it in to something that ends up looking like a PackDecoder. So we might want
to see how that performs once it’s working.

All these indexing designs only work if you have a sufficiently fast seek on the main input file.
If you assume the index-lookup is instantaneous, then there’s still the question of whether
doing N iterations of { seek; read } is faster than just reading everything sequentially. And
that is going to depend on how many items you access, the sequential bandwidth, and the
seek cost. Off the top of my head it’s probably much more likely to be useful when reading
from local SSD (seek time ~ 20-50usec) than from HDD (uncached seek ~ 8-12ms) or cloud
object store (I generally reckon latency about 80msec, but it’s going to vary between cloud providers
and it may have got faster in the last couple of years).

You also may be able to amortize the latency by queuing several simultaneous requests, but
then that complicates anything like indexing where the sequence of requests may not be easily
predictable.

The problem - at least for data exploration - is that you may not know in advance which keys
(or key-tuples) are going to need acceleration with indexes. In the SQL world this is traditionally
part of the job of the database admin. At Oracle/Endeca, they indexed every single column (but
didn’t have any way to accelerate key-tuple lookups).

I think my preference is to have something which figures out useful indexes on the fly from looking
at query plans and performance - and which can automatically cache and rebuild indexes as necessary,
so that the user doesn’t have to manage them explicitly. But that may be hard. The trouble with exposing
indexes is that hardly anyone will understand what they are and what they do …

OK, let’s link up when that’s in and do some experiments.

Addressing this out of order, but the two examples I have in mind are both covered by an index on the key fields. In particular, both examples are joins of a table against a matrix table where the table has ~1% of the matrix table’s variants.

Hmm. I am worried about index-lookup cost, not sure how worried I should be.

I’m particularly thinking of the common case of a table-matrix-table join on the matrix-table’s row keys. In this case, the positions to which we seek are in order, so we’re not seeking so much as skipping bytes. This seems like we’d have a clear win over decoding the in-between records.

To be concrete, I’m thinking of stuff like:

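import hail as hl

mt = hl.read_matrix_table('data.mt')
scores = hl.read_table('scores.t')
# Join the scores table onto the matrix table's rows by row key.
mt = mt.annotate_rows(score = scores[mt.row_key])
# Keep only the rows that have a matching score.
mt = mt.filter_rows(hl.is_defined(mt.score))
mt = mt.annotate_cols(prs_risk_score = hl.agg.sum(mt.score * mt.beta))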

EDIT: forgot to put the filter in

With the current format, I believe we do compression on a whole partition
(rather than subdividing it into smaller compressed chunks). One
consequence is that to read the byte at offset X, you need to have read
the whole file from 0…X-1 to have the correct decompression state.

So as long as we have that constraint, the skipping doesn’t save us
anything in terms of IO and decompression - it only saves the cpu time
for decoding from packed format to RegionValue.
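
Here is a small Python sketch of that constraint, using zlib purely as a stand-in codec (an assumption, not what our files use). With one compressed stream per partition, reaching offset X means decompressing everything before it. With independently compressed chunks, only the chunks that overlap the requested range need to be decompressed:

```python
import zlib


def read_at_whole_stream(compressed: bytes, offset: int, length: int):
    """Return (data, bytes decompressed) when the partition is one stream."""
    d = zlib.decompressobj()
    out = bytearray()
    pending = compressed
    want = offset + length
    # There is no way to jump into the middle: inflate from byte 0 up to the end of the range.
    while len(out) < want and pending:
        out += d.decompress(pending, want - len(out))
        pending = d.unconsumed_tail
    return bytes(out[offset:offset + length]), len(out)


def compress_chunked(plain: bytes, chunk_size: int):
    """Compress each fixed-size chunk of the plaintext independently."""
    return [zlib.compress(plain[i:i + chunk_size])
            for i in range(0, len(plain), chunk_size)]


def read_at_chunked(chunks, chunk_size: int, offset: int, length: int):
    """Return (data, bytes decompressed), touching only the overlapping chunks."""
    first = offset // chunk_size
    last = (offset + length - 1) // chunk_size
    plain = b"".join(zlib.decompress(c) for c in chunks[first:last + 1])
    start = offset - first * chunk_size
    return plain[start:start + length], len(plain)
```

The chunked reader still needs to know where each compressed chunk starts in the file, which is one more thing an index would have to store.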

If that cpu time is less than the file-reading time, then it may already be
completely overlapped and eliminating the pack-decode time for most of
the rows might not give any speedup at all - but instead just mean we
spent more time with cores/thread idle waiting for input.

So the “clear win” is only real if we know that cpu is the bottleneck. I don’t have evidence either way on that question. Just the hope that even if it’s true now, it might be less true, or not true at all, with the C++
pack-decode.

Looking a little further into the future, we could consider changing the
file format to have a smaller compression granularity. But that would
only be a win if we can access the underlying storage in a way that
makes the sparse-access-to-a-few-compressed-chunks significantly
faster than streaming-access-to-the-whole-partition. Which is another
thing we should benchmark. Off the top of my head, if we take a wild guess of 80msec latency and 100MB/sec bandwidth for (each thread) reading from object store, then the seek-latency of 80msec corresponds
to about 8MB of streamed-data, so if the mean-skip-size is < 8MB, we’d
probably be faster reading everything, and if the mean-skipped-data is
much greater than 8MB then skipping is a win. But that’s taking numbers which are just guesses, and then doing a rough back-of-envelope calculation, and ignoring possibilities to amortize latency even in the
skipping case.
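
The back-of-envelope calculation can be written down as a tiny Python cost model. The 80msec and 100MB/sec figures are the same guesses as above, the function names are made up, and like the text it assumes the index lookup is free and ignores any overlapping of requests:

```python
def break_even_bytes(latency_s: float, bandwidth_bytes_per_s: float) -> float:
    """Amount of data that could be streamed in the time one seek costs."""
    return latency_s * bandwidth_bytes_per_s


def sequential_time(total_bytes: float, bandwidth_bytes_per_s: float) -> float:
    """Time to stream the whole partition."""
    return total_bytes / bandwidth_bytes_per_s


def seek_read_time(n_reads: int, bytes_per_read: float,
                   latency_s: float, bandwidth_bytes_per_s: float) -> float:
    """Time for n_reads iterations of { seek; read }, one after another."""
    return n_reads * (latency_s + bytes_per_read / bandwidth_bytes_per_s)


def seeking_wins(total_bytes, n_reads, bytes_per_read,
                 latency_s=0.080, bandwidth_bytes_per_s=100e6) -> bool:
    """True if sparse access is estimated to beat reading everything."""
    return (seek_read_time(n_reads, bytes_per_read, latency_s, bandwidth_bytes_per_s)
            < sequential_time(total_bytes, bandwidth_bytes_per_s))
```

With these guesses, a 1GB partition takes about 10 seconds to stream. Ten 1MB reads cost about 0.9 seconds, so seeking wins. A thousand 10KB reads cost about 80 seconds, so streaming everything wins.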

Agreed.

I benchmarked this for a scenario like the above code snippet where the scores table has 1/100th the rows of the full matrix table. In this setting, the profiler showed time dominated by region value builder decoding. It is definitely possible that the cost here is in the RVB’s state management (the type stack, index stack, etc.) which is completely eliminated by the compiled decoder.

When the compiled decoder lands, we should retry Caitlin’s original pipeline to see if all my index work was basically bypassing the cost of RVB state management.

Some good measurements of the different cloud-storage systems here (2.5 years old) http://blog.zachbjornson.com/2015/12/29/cloud-storage-performance.html