From Offsets to Pointers

Current Status

Hail-lang values are represented in memory with a custom format, commonly referred to as the Region Value representation. For more details on that representation, see the classes in is.hail.annotations; in particular, take a look at RegionValueBuilder.

Recall that arrays inside other data structures are represented as offsets that point to memory holding the length and the missing bits.

There is no way to refer to memory in a different region.

The Problem

In the current state, there is no easy way to execute a hail-lang expression that accesses data stored in two regions, for example, an expression that reads data from the globals and from the row or entry fields.

The current solution is to copy all necessary data into one region and execute the hail-lang expression with the single region.

Possible Solutions

To avoid all copies we must design a way to point to data in different regions.

  1. Use a few bits of the offset to mark the source region. Dereferencing memory then requires a mask to extract the region identifier, a lookup of said identifier, and a second mask to get an offset in that region (see the sketch below).
  2. Use actual memory addresses instead of offsets. Dereferencing memory is a single machine operation. This requires that we use the Long-pointer methods in sun.misc.Unsafe rather than the Object-pointer-plus-Long-offset methods.
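
A minimal sketch of option 1's dereference path, assuming 8 region-identifier bits and a Region.loadInt accessor (the region table and the bit split are assumptions, not an existing Hail API):

import is.hail.annotations.Region

// Sketch of option 1: the top 8 bits of a "pointer" select a region,
// the low 56 bits are an offset into that region.
object TaggedOffsets {
  val RegionBits = 8
  val OffsetMask: Long = (1L << (64 - RegionBits)) - 1

  def regionId(p: Long): Int = (p >>> (64 - RegionBits)).toInt
  def offset(p: Long): Long = p & OffsetMask

  // Every dereference pays for a shift, a table lookup, and a mask;
  // option 2 (raw addresses) is a single machine load instead.
  def loadInt(regions: Array[Region], p: Long): Int =
    regions(regionId(p)).loadInt(offset(p))
}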

Both of the proposed solutions require that the referenced region live longer than the referencing region. If not, you have pointers to garbage or non-existent memory.

Both of the proposed solutions present deserialization challenges. Hail currently doesn’t permit users to generate cyclic data, so we, at least for now, need not concern ourselves with that. Some relevant work on super-fast deserialization is Frank McSherry’s Abomonation, which serializes by literally copying the bits of the data. In the case of Rust’s Vec<T> (which is essentially std::vector<T>), it’s a bit unclear, but I think he writes the length as 4 bytes followed by the data, in-line.

The simplest, correct thing for us to do is to simply chase pointers recursively writing the contents out in-line. For example,

{ a: int32
, b: array<int32>
, c: int32
}

is written as:

  1. first four bytes encode a
  2. next four bytes encode the length of b
  3. the contents of b written in-line; in all, 4 * (length of b) bytes
  4. final four bytes encode c
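
As a sketch, that walk-through in code, writing to a DataOutputStream. The field offsets (0, 8, 16) and the array layout (length followed by data) are illustrative assumptions, not Hail's exact representation:

import java.io.DataOutputStream
import is.hail.annotations.Memory

// Sketch: serialize struct { a: int32, b: array<int32>, c: int32 } by
// chasing the array pointer and writing its contents in-line.
def writeStruct(out: DataOutputStream, addr: Long): Unit = {
  out.writeInt(Memory.loadInt(addr))            // 1. four bytes for a
  val b = Memory.loadLong(addr + 8)             //    b is stored as a pointer
  val len = Memory.loadInt(b)
  out.writeInt(len)                             // 2. four bytes for b's length
  var i = 0
  while (i < len) {                             // 3. b's contents: 4 * len bytes
    out.writeInt(Memory.loadInt(b + 4 + 4L * i))
    i += 1
  }
  out.writeInt(Memory.loadInt(addr + 16))       // 4. four bytes for c
}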

Region.copyFrom is safe when copying from a longer-lived buffer to a shorter-lived buffer. Copying from a shorter-lived buffer to a longer-lived buffer is only safe if the pointers point to data whose lifetime is equal to or longer than the target buffer’s. In general, we should avoid Region.copyFrom from a shorter-lived buffer to a longer-lived one. The same advice applies transitively to RegionValueBuilder.addRegionValue. copyFrom does not seem to be used elsewhere, which eases the necessary work.

Proposed Solution

I (in consultation with @cseed) propose we use actual memory addresses.

Aside: Eliminating RegionValue and Inverting the Flow of Regions

This change also allows us to remove RegionValue in favor of Long, if we can guarantee the memory pointed-to by the Long is valid when we access it.

We further propose that the consumer of an RDD passes a Region to the RDD. RDDs are, of course, permitted to create their own Regions and pass them to sub-computations. This creates a nested series of Region lifetimes much like properly matched parentheses.

Note that if an RDD transformation produces a new Region Value, it must place the Region Value in a region with lifetime longer than the transformation. The Region provided by the RDD consumer is always a valid place to put Region Values.

The practical effect of this is that instead of RDD[RegionValue] we have:

class ContextRDD[C, T: ClassTag](rdd: RDD[C => Iterator[T]]) {

ContextRDD supports all the usual RDD methods on an RDD[T], via this lifting method:

  def withoutContext[U: ClassTag](f: Iterator[T] => Iterator[U])
      : ContextRDD[C, U] =
    new ContextRDD(rdd.map(_.andThen(f)))

For example, map(f: T => U):

  def map[U: ClassTag](f: T => U): ContextRDD[C, U] =
    withoutContext(_.map(f))

Instances of RDD[RegionValue] will be replaced with ContextRDD[RVDContext, Long]. (At first, RVDContext will have one field: a Region.) Without a region, we cannot allocate any memory, so most users of ContextRDD will need to use the context-enhanced methods, like cmap:

  private[this] def withContext[U: ClassTag](f: (C, Iterator[T]) => Iterator[U])
      : ContextRDD[C, U] =
    new ContextRDD(rdd.map(useCtx => ctx => f(ctx, useCtx(ctx))))

  def cmap[U: ClassTag](f: (C, T) => U): ContextRDD[C, U] =
    withContext((c, it) => it.map(f(c,_)))
}

For example, if the Longs in the RDD point to struct{a: int32}, we could copy the a field:

def copyTheAField(c: RVDContext, s: Long): Long = {
  val r = c.region
  val t = TStruct("a" -> TInt32())
  // load the a field from wherever s points, append the copy to our
  // region, and return the offset of the copy
  r.appendInt(Memory.loadInt(t.loadField(s, t.fieldIdx("a"))))
}

NB: We don’t actually know into which region s points, but we can use Memory.loadInt to load from an arbitrary memory address.
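
Putting the pieces together, a hypothetical pipeline over row addresses would thread the context through cmap (upstream construction elided):

// Hypothetical usage: rows arrive as raw addresses, and the per-partition
// RVDContext supplies the region that copyTheAField allocates into.
val rows: ContextRDD[RVDContext, Long] = ???  // produced upstream
val as: ContextRDD[RVDContext, Long] = rows.cmap(copyTheAField)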

Regions cannot grow by copying the buffer into a bigger buffer, because other users may have pointers into their current memory location. Regions should probably become arrays of blocks, always allocating in the latest block.
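
A sketch of such a block-structured Region, with Memory.malloc and Memory.free assumed as the raw allocation primitives:

import scala.collection.mutable.ArrayBuffer
import is.hail.annotations.Memory

// Sketch, not the real Region: blocks are never moved or resized, so any
// address handed out stays valid until the whole region is cleared.
class BlockedRegion(blockSize: Long = 64L * 1024) {
  private val blocks = ArrayBuffer.empty[Long]
  private var cursor: Long = blockSize  // offset into the latest block

  def allocate(n: Long): Long = {
    require(n <= blockSize, "oversized allocations would need their own block")
    if (cursor + n > blockSize) {
      blocks += Memory.malloc(blockSize)  // assumed allocation primitive
      cursor = 0
    }
    val addr = blocks.last + cursor
    cursor += n
    addr
  }

  def clear(): Unit = {
    blocks.foreach(Memory.free)  // assumed
    blocks.clear()
    cursor = blockSize
  }
}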

Are we recreating the JVM allocator?

This post led me to read about Cap’n Proto last night. It looks highly relevant to what we’re trying to do. It may even be possible to use existing Cap’n Proto implementations, but even if not, it seems worth seeing how they solved similar problems.

See the intro and encoding spec in particular. I can also give a face-to-face summary for anybody interested (already talked to Dan).

Cap’n Proto is in the same ballpark. It doesn’t have any way of handling multiple references to an object; it’s purely for trees. I’m not sure if that’s a problem for HAIL.

Growing/resizing a list/vector is not allowed, but growing is presumably a problem for any region-based approach, where reallocation will leave a hard-to-reuse free block.

And to my taste, CapnP puts too much emphasis on padding and alignment, whereas for an all-x86 project you can almost certainly do better by allowing arbitrary byte alignment for scalar data (and only worrying about alignment for particularly AVX/SIMD-friendly vector data).

Of course any approach which has a non-pointer in-memory representation of references will impose some overhead in both runtime and (probably) code verbosity, so there’s a question about the right design compromise for HAIL. That verbosity can be minimal if the representation allows dereferencing without having to explicitly specify the context/region for the reference - but arguably gets burdensome if you need a (regionId, objRef) pair to get to the referenced object.

Cross-region references get you into tricky questions about managing lifetimes. And for extra credit and a real headache, you could worry about cycles between regions, even if they aren’t cycles of objects - e.g. object A in region 1 refers to object B in region 2, and object C in region 2 refers to object D in region 1.

I suspect there’s some way to handle cross-region references by generating a trampoline object in the referring region, which then contains the necessary information (a keep-alive reference to the destination region, and an offset within the destination region) to jump to the actual destination object.

Though dereferencing would then need some magic to distinguish a local-to-this-region reference from a trampoline-to-other-region reference - e.g. use the least-significant bit of a reference as a tag bit to make that distinction.
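
For instance, with 8-byte-aligned allocations the low bit of a real address is always 0 and can tag trampolines. A sketch, assuming a trampoline is laid out as a keep-alive region handle followed by the destination address:

import is.hail.annotations.Memory

// Sketch: resolve a tagged reference to a machine address. The trampoline
// layout (8 bytes of keep-alive handle, then the destination address) is
// an assumption for illustration.
def resolve(ref: Long): Long =
  if ((ref & 1L) == 0L) ref               // local-to-this-region reference
  else Memory.loadLong((ref & ~1L) + 8)   // follow the trampoline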

Ideally a region would only have a single trampoline object for each distinct other-region-destination object.

When you’re finished with a region, you need to clear the keep-alive references to other regions, otherwise you could get cycles of garbage.

This is probably a decent approach for the case where we expect the vast majority of references to be local, with only a few cross-region references. If you had many cross-region refs, the overhead of trampoline objects would get bad.

However, this does imply that at the point where we construct or assign a cross-region reference, we need to have a way of getting hold of the region to be able to create the trampoline object. So that makes pointer-assignment a little tricky and possibly ugly.

I’ve done stuff in the past which maintained a global map from 64KB super-pages to various information, and if you’re sufficiently careful about the way region-buffers get allocated, something like that can be used to map from an object address to the region object, for both the object containing the reference and the (possibly other-region) destination object.

I think we are willing to make the simplifying assumption that Regions are strictly nested in a matching-parentheses kind of way, i.e. every Region has a unique parent region whose lifetime contains its own. References up the tree of regions are OK; no downward references are permitted. Suppose computation C is allocating in Region R, and it calls computation D as a subcomputation and asks it to allocate in Region S. If the result of sub-computation D is important to its parent, computation C, then C should copy the data from S into R.

I think, in general, C really shouldn’t ask D to use a new Region unless C expects that D will generate a lot of intermediate garbage in the region. If D does generate lots of garbage, the cost of copying the result may be worth the memory savings.

You could probably insulate the caller C from the implementation details of the callee D a bit better with an interface where C passes D both a region R and an offset at which D should create its result. If D needs scratch space, it can create a new region (which it frees before returning), or it can add a bit of garbage at the offset passed from C, returning to C the offset of the actual result. This way, if D creates lots of garbage, it can use a scratch region, but still build its result directly in the parent region, with no copying.
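
In code, that contract might look like the following (all names hypothetical):

import is.hail.annotations.Region

// Hypothetical caller/callee contract: D builds its result in C's region r
// at or after `start`, may leave garbage there, and returns the offset of
// the actual result. Whether D used a private scratch region internally is
// invisible to C.
trait SubComputation {
  def compute(r: Region, start: Long): Long
}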

Also, C isn’t being asked to consider things like whether “C expects that D will generate a lot of intermediate garbage”. D is being asked to make decisions based on D’s implementation, so if that implementation changes, C is insulated from the change.

I like this.

I am apprehensive about providing a location in which to write the result. I think I prefer this level of computation to require copies. If we want to elide the copies, we should think about expanding the scope of the IR, IMO.

Currently, the IR cannot make use of multiple regions, so that needs to change before we can take this approach. I think this is a straightforward but somewhat large change.

The simplest, correct thing for us to do is to simply chase pointers recursively writing the contents out in-line.

This is what the current PackCodec does, and it should just use the standard region value interface to traverse the value being serialized, and thus should be insensitive to the choices we’re talking about here.

There is a second DirectCodec which directly writes the region value to the output. This is possible because (1) values are stored in a single region, and (2) pointers are represented as offsets. It is tested but not currently used.

If we consider the proposal of changing offsets to raw pointers AND supporting multiple blocks, there are two challenges to serializing: (1) you need to turn the pointers back into offsets, and (2) you have to account for the multiple regions by either copying values from other regions into the current region or serialize all regions being referred to and compute the offsets accordingly.

While direct serialization seems attractive (because it will be so fast), I am skeptical it is particularly useful in most cases: in general, a region that arrives at the write step will contain garbage that we don’t want to write out, so the region value needs to be normalized. That can be done with a region value traversal that copies it into a single, fresh region.

def withoutContext

Maybe mapPartitions (or mapPartitionsWithoutContext) is closer to our current concepts? And similarly mapPartitionsWithContext?

We don’t actually know into which region s points

Regions know their start and length, so if we track the set of regions (e.g. a Region contains a set of regions that its values might point to), then we can search to find the region a value points into. This would be one option to implement multi-block serialization.
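
That search might look like the following, with accessors for a Region's base address and allocated length assumed:

import is.hail.annotations.Region

// Sketch: given the set of regions a value might point into, find the one
// containing `addr`. `start` and `length` are hypothetical accessors.
def regionOf(regions: IndexedSeq[Region], addr: Long): Option[Region] =
  regions.find(r => addr >= r.start && addr < r.start + r.length)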

Regions cannot grow by copying the buffer into a bigger buffer, because other users may have pointers into their current memory location. Regions should probably become arrays of blocks, always allocating in the latest block.

Right now, all longer-lived regions we want to be able to point into are in fact read-only: globals and column values. So I think we can invert the RDD and switch from offsets to pointers without allocating blocks.

Are we recreating the JVM allocator?

No. Region value allocation is a totally different strategy. That’s the point.

Field Notes

Both to clarify my own understanding and to loop others in, here are my current thoughts on this effort.

Hail objects are now stored in unmanaged blocks of memory. Their addresses are no longer instances of is.hail.annotations.RegionValue. Instead, the address of a hail object is a machine address, represented in the JVM by a long. Note: we may not use managed blocks because the GC may arbitrarily relocate said blocks, which invalidates our memory addresses.

A Region is an unmanaged block of memory allocated by sun.misc.Unsafe.

A RegionValueBuilder constructs hail objects within a single provided Region.

The lifetime of a Region is a period of time during which the Region will definitely not be freed. I suspect lifetimes will typically form trees, wherein a submodule of Hail might create a Region for its own purposes, and that Region's lifetime will be strictly nested within that of the calling module's Region.

The Problem of Serialization

Memory addresses have no meaning on other machines. Previously, we sent hail objects to foreign machines by copying the region containing the hail object and serializing the bytes of that region along with the object’s offset within that region. We will now, instead, encode the object by chasing pointers. This is already implemented by PackEncoder. The result of encoding is an Array[Byte] that can be sent across machines. The offset to the hail object is, implicitly, 0.

If we go with this approach, then it seems that a Region could contain multiple chunks of memory, and that extending a Region would not need to copy the current objects (and also wouldn’t want to copy the objects in the existing chunks, since that would screw up any inter-object references).

Then a Region just represents an allocate-only/deallocate-everything pool of memory.

This still leaves the danger that a cross-region reference can go screwy if the lifetime of the Region containing the destination object ends before the lifetime of the Region containing the reference.

This problem usually occurs when an aggregator-lifetime region needs a value from a row-lifetime region (e.g. a string). Have to be careful about copying in those situations.

I am running into a previously unforeseen issue.

Previously the producer would reset the region (thus freeing memory) per-element. The producer knows when an element is no longer needed (when next is called on the producer’s iterator). In this setup the consumer resets the region. If the consumer is downstream of a flatMap, then it’s not safe to reset per-element, because the intermediate producer may produce two elements that depend on one upstream element.

Grr.

For now, I’m requiring that you copy things out of the region if you need them for more than one produced element. A better solution would allow the producer to mark reset boundaries, and where to reset to (i.e. just throw away the suffix for this element).

I actually did foresee this issue, and never got around to talking about the design with you. If you’re coming in tomorrow, I’d enjoy talking through things. If not, I can try to get my thoughts in order enough to write something comprehensible.

I don’t see the problem here. The flatMap is itself a consumer that establishes another region that is longer lived than the (upstream) per-element region used by the consumer of the flatMap. The flatMap’s region is still allocated per-partition and owned by the RVDContext.

The issue is with calling Region.clear. At first, I removed all calls to clear, but the tests started OOM’ing. Then I tried adding a call to clear between calls to Iterator.next in the ultimate consumer (writes, collects, etc.). This fails if an intermediate node uses a flatMap à la:

val rvb = new RegionValueBuilder()
val rv2 = new RegionValue()

inputs.flatMap { rv =>
  for (i <- Seq(0, 1, 2)) yield {
    rvb.set(rv.region)
    rvb.start(TTuple(TInt32(), oldRowType))
    rvb.addInt(i)
    rvb.addRegionValue(oldRowType, rv.region, rv.offset)
    rv2.set(rv.region, rvb.end())
    rv2
  }
}

because the ultimate consumer (knowing nothing of this flatMap) clears after i = 0, which throws away rv, but I still need it for i = 1 and i = 2.

Code that previously used this pattern would clear the suffix it used or start a new region entirely.

My working solution (but I seek better ones from y’all) is to copy any necessary information to another region. This is not a long-term viable solution because the intermediate node does not know when to free the region it creates. My best long-term idea is to provide a way for users to insert “clear marks” so that consumers somehow know what suffix is garbage.
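
One way to phrase those clear marks, with the size and clear-to-a-position methods on Region assumed:

import scala.collection.mutable
import is.hail.annotations.Region

// Sketch of "clear marks": the producer marks the region before building
// each element; when the consumer is done with an element, resetting to the
// mark discards only that element's suffix of allocations.
class MarkedRegion(val region: Region) {
  private val marks = mutable.Stack.empty[Long]
  def mark(): Unit = marks.push(region.size)           // assumed: current allocation end
  def resetToMark(): Unit = region.clear(marks.pop())  // assumed: drop allocations past it
}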

NB: Everything is still on heap. The change I’m making now is just to make everyone use the region provided by the ContextRDD.

I prefer to use reference-counts to keep lifetimes correct and near-optimal (i.e. never delete stuff that someone else still wants; but quickly delete stuff that no-one wants any more) without forcing the rest of the design into a straitjacket which may steer it away from other goals (e.g. performance, avoiding unnecessary copying etc).

Hmm, I guess if you want to have multiple RVs allocated from a single Region, but corresponding to different downstream records, then you would need the RVs to be ref-counted, but then also for the Region to be ref-counted. That seems to be heading in a gnarly direction where Regions could get really big and have interleaved allocations corresponding to different records, and I’m not sure you want to go there.

After lunchtime conversation yesterday, I think I understand the proposed use of Regions better, viz. allowing downstream operators to add and remove stuff in an existing Region as long as they treat the upstream contents as immutable. But I’m still not sure I like it.

I have two big concerns:

  1. It seems like a big change from the current Spark-ish GC-based contract between iterators.

  2. I think some kinds of operators might be able to achieve higher throughput (in C++) if they’re allowed some bounded eagerness, i.e. pulling several rows from the upstream iterator to form a batch for processing. But that would require decoupling the Region-lifetime (and frame-on-Region lifetime) from the timing of the iterator operations, i.e. a downstream operator would be interested in a Region until it explicitly says that it isn’t.

Along with that, I have a question. What immutable data is it that we’re hoping to share between upstream and downstream RegionValues on the same Region? My guess is that it’s mostly string contents. When you mutate one element in a TArray (or change its missingness), you’ve got to copy the whole TArray. When you mutate one field in a TStruct, you’ve got to copy the whole TStruct. So what actually does get preserved, other than string contents?

[To elaborate on this, the current RegionValue design puts TStruct’s inline rather than as individually-allocated objects, which means that any mutation of a field of a TStruct has to propagate outwards to cause reallocation/copying of an enclosing TArray (or the outermost TStruct). Which requires more copying and less sharing than with the Scala/Java policy of every-TStruct-is-a-separately-allocated-object.]

But then string contents are often also identical from record to record, as well as between mutated downstream versions of the same record. So maybe it makes sense to have string-dictionary objects which have a lifetime longer than one record (maybe up to 256 records?), and encode string values as 32-bit indexes into a string dictionary, and have multiple Regions referring to a shared refcounted string dictionary (the dictionary is refcounted, but not the individual entries).

First, @rcownie, thanks for challenging the larger design. I look forward to more of the discussions and I think they can only improve the design.

I think @dking and I might have slightly different expectations about where things are going. Let me give my take.

I wanted to keep the IR functional because I think it greatly simplifies most of what we want to do in the query optimizer. In the initial draft of the IR, @dking included struct and array mutation operations and general looping constructs which I pushed back on.

I don’t think the implementation of the IR needs to be immutable, nor should it. I’m totally fine with modifying unshared structures and have described a few examples where I’m explicitly hoping for it: to use your struct example, if you’re going to insert into a struct downstream, you should allocate space for the insertion upstream and modify it downstream.

The existing Scala/Spark model is totally iterators over immutable data. That’s where we inherited it from. In walking from Spark + annotation => C++ + region values, I thought changing it would be too disruptive. (I also think it has simplifying value.) I’m happy to revisit this if you have an alternate proposal.

I think GC and iterators are both wrong for us so I think moving away from them is a feature, not a bug. The work @dking is doing on ContextRDD to invert the iterator model will allow us to manage memory explicitly rather than relying on GC.

Absolutely, we want to vectorize our computations in the database sense. My model for this was to be able to have the caller request multiple rows delivered in a region and return either an array of pointers to the rows, or a pointer to an array of rows in the region. If you do the latter, none of the existing interfaces change.

But that would require decoupling the Region-lifetime (and frame-on-Region lifetime) from the timing of the iterator operations

I’m not sure I understand. The region lifetimes and the iterator operations are completely decoupled in the current design. There is nothing to stop a consumer from calling a producer multiple times to add values to a region without resetting the region, or from returning values in the same region multiple times in a consumer (exactly the flatMap case).

Along with that, I have a question. What immutable data is it that we’re hoping to share between upstream and downstream RegionValues on the same Region? My guess is that it’s mostly string contents. When you mutate one element in a TArray (or change its missingness), you’ve got to copy the whole TArray. When you mutate one field in a TStruct, you’ve got to copy the whole TStruct. So what actually does get preserved, other than string contents?

These are great and important questions. A few comments:

  1. Almost none of our data is strings. The most common entry schema is:

    struct {
      GT: call,
      AD: array<int>,
      DP: int,
      GQ: int,
      PL: array<int>
    }
    

    This accounts for 99% of our data (by volume). If by strings you mean non-scalar values, then yes, you’re right: strings, arrays, sets, and dicts. Remember, the entries are stored in the region value row as a column-indexed array of the above entry structs. When you insert a top-level field into the region value row, that array, 99% of the data, gets shared.

  2. For inserting fields into structs, I see three possible implementations: (1) copy the struct: bad! (2) pre-allocate the space for the inserted field and assign the field (and missing bits), and (3) save the value to be inserted and a pointer to the original struct (a sketch follows below).

    To implement (3), as compared to the “default” layout for a struct, we need a way to (statically) distinguish between multiple physical implementations of the struct type. I call these “physical types”; they are on the near-term roadmap. I wrote up the idea a while back here: Future of types 2: virtual and physical types.

    From this perspective, I think of user-level types as the “interface” to values inside Hail, and physical types to be “implementations” of those interfaces, but these “implementations” should be known and resolved statically.

  3. We don’t make it easy to mutate a single array element at the user level because that just isn’t efficient unless we make mutation part of the user model, which I’d like to avoid for lots of reasons. (Does SQL?)

I think it would be reasonable to make structs pointed to. In fact, we should have both: the choice should be a physical type.
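
As a sketch of how option (3) could look as a physical type (everything here is hypothetical):

import is.hail.annotations.Memory

// Hypothetical physical types: multiple layouts implementing one virtual
// struct type, with the choice resolved statically by the compiler.
sealed trait PStruct {
  def loadField(addr: Long, idx: Int): Long  // address of field idx
}

// Option (3): the original struct plus one appended field, laid out as
// (pointer to parent, inserted field). Reads of old fields delegate to the
// parent's layout; the inserted field sits right after the parent pointer.
final class PAppendedField(parent: PStruct, parentFields: Int) extends PStruct {
  def loadField(addr: Long, idx: Int): Long =
    if (idx < parentFields) parent.loadField(Memory.loadLong(addr), idx)
    else addr + 8
}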

Thanks for the response. I think I see where you’re heading with this, which is more towards a global optimization of the data layout within a stage, making the upstream operators conform with a data layout that allows for roughly the union of all fields you want anywhere, but then with physical types which allow the upstream operators to leave gaps and ignore any stuff in the gaps.

Then if a field gets possibly-modified by an operator, it gets allocated into a different previously-empty struct offset, leaving the unmutated field values in place.

At the high level, we have a set of design decisions about how to manage data layout and the upstream/downstream operator contract, and about making sure that the big data is actively managed rather than falling through holes into GC-and-auto-close(). We probably need to get in front of a whiteboard and make sure we’re heading the same way - in particular, if the desired endpoint is global optimization of data layout with physical types, then we wouldn’t want to detour into a lot of complicated code generation to do eventually-unnecessary copying.

[On the particular goal of allowing an immutable-row-entry-array to survive mutation of the row annotations, an alternative would be to put those two in separate Regions so that you could completely replace either one independent of the other (and sometimes drop one or the other completely).]