ExecStreamHowTo Struct Reference

ExecStreamHowTo is a guide to writing new implementations of the ExecStream interface and plugging them into Farrago. More...


Detailed Description

ExecStreamHowTo is a guide to writing new implementations of the ExecStream interface and plugging them into Farrago.

This document is intended for developers who need to extend Fennel query execution capabilities. Writing an ExecStream implementation is a moderately difficult undertaking requiring familiarity with a number of underlying Fennel libraries. Before starting, ask yourself whether a new ExecStream is really the best solution to the problem at hand; it may be easier to extend some other existing facility instead (TODO: links).

If you have already ruled out all of those alternatives, then read on. As background, you may also want to read the SchedulerDesign and ExecStreamDesign.


Choosing a Base Class

The first step is to decide where your new ExecStream fits into the class hierarchy. For traditional query processing, streams are wired together into tree structures, with each stream producing a single output flow of tuples and consuming zero or more input streams. The abstract class SingleOutputExecStream is a common base for all such stream implementations. If your stream will be instantiated as an inputless leaf of the tree (e.g. the implementation for a new kind of table access), then you can derive from SingleOutputExecStream directly. Otherwise, choose either ConduitExecStream (for exactly one input, e.g. a filter) or ConfluenceExecStream (for any number of inputs, e.g. a join or union) as the base.

Fennel also supports streams which produce multiple outputs, e.g. for recursive query processing, but we will ignore that possibility in this document.

The base classes provided fill in some of the abstract ExecStream methods, and in many cases those implementations will suffice. However, you are free to override any or all of them; usually the overriding method will need to call the base method.
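The delegation convention can be sketched in isolation with toy classes (not Fennel code): the overriding method invokes the base implementation before adding its own behavior.

```cpp
#include <string>

// Toy classes (not Fennel code) illustrating the override convention:
// the derived method calls the base implementation, then adds its own work.
struct Base
{
    std::string log;
    virtual ~Base() {}
    virtual void open()
    {
        log += "base;";
    }
};

struct Derived : public Base
{
    virtual void open()
    {
        Base::open();          // invoke the base method first
        log += "derived;";     // then do stream-specific work
    }
};
```

Calling open() on a Derived instance runs the base logic first, which is exactly the ordering the Fennel base classes expect.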

As an example, suppose we want to define a new ExecStream which performs the same job as the uniq command in Unix; that is, it requires a pre-sorted input stream, and removes duplicates by comparing successive pairs of tuples. This is clearly a ConduitExecStream, so we now know enough to write:

class UniqExecStream : public ConduitExecStream
{
    // TODO
};

So, given an input data stream like

Canada  Ontario
Canada  Ontario
EU      France
EU      Germany
EU      Germany
USA     Georgia
USSR    Georgia
USSR    Georgia

UniqExecStream should produce an output data stream like

Canada  Ontario
EU      France
EU      Germany
USA     Georgia
USSR    Georgia
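The desired behavior can be modeled against an in-memory copy of the sample data (treating each row as a single string): on a sorted sequence, dropping adjacent duplicates is exactly what std::unique does.

```cpp
#include <algorithm>
#include <string>
#include <vector>

// Model of the uniq behavior: remove adjacent duplicates from a
// pre-sorted sequence of rows.
inline std::vector<std::string> uniqRows(std::vector<std::string> rows)
{
    rows.erase(std::unique(rows.begin(), rows.end()), rows.end());
    return rows;
}
```

Note that, like UniqExecStream, std::unique only compares successive pairs, so correctness depends on the input being sorted.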


Parameters

To get maximum mileage out of the effort required to implement an ExecStream, it is a good idea to parameterize the stream so that it can be used in a variety of contexts. For UniqExecStream, we might want to support an option which causes duplicate detection to result in an error. With this option disabled, UniqExecStream can be used to implement SELECT DISTINCT; with this option enabled, UniqExecStream can be used to prevent duplicate values when loading a unique index.

ExecStream parameters are defined via a parallel class hierarchy descending from ExecStreamParams. Each class derived from ExecStream should have a corresponding parameter class, even if no new parameters are required; the rationale for this rule will be explained later when we cover how streams are instantiated. For now, here is a parameter class for our running example:

struct UniqExecStreamParams : public ConduitExecStreamParams
{
    bool failOnDuplicate;

    explicit UniqExecStreamParams()
    {
        failOnDuplicate = false;
    }
};

It is polite (though not strictly required) to define a default constructor for your parameters class which assigns sensible values to all parameters.
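With such a constructor in place, a caller only needs to touch the parameters it cares about. Here is a self-contained restatement of the struct (omitting the base class so the snippet stands alone):

```cpp
// Self-contained restatement of the parameter struct; the default
// constructor assigns a sensible value to every parameter.
struct UniqExecStreamParams
{
    bool failOnDuplicate;

    explicit UniqExecStreamParams()
    {
        failOnDuplicate = false;
    }
};
```

A caller can then rely on the defaults and override only the parameters it cares about, e.g. setting failOnDuplicate = true for the unique-index load case.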


Algorithm

At this point, it is a good idea to sketch out the algorithm which the stream will use to process its input and produce its output. This will necessarily be quite different from the final code, but should be enough to get an idea of the state variables required. Here's pseudocode for UniqExecStream:

lastTuple = null;
while (!input.EOS()) {
    currentTuple = input.readTuple();
    if (lastTuple != null) {
        if (currentTuple == lastTuple) {
            if (failOnDuplicate) {
                throw DuplicateKeyExcn(lastTuple);
            }
            continue;
        }
    }
    lastTuple = currentTuple;
    output.writeTuple(currentTuple);
}

One difficult aspect of ExecStream implementation is transforming active code (as above) into a passive state machine. In the real implementation, execution must yield whenever input.readTuple() exhausts an input buffer or output.writeTuple() overflows an output buffer; and the input and output streams are never invoked directly.
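To make the distinction concrete, here is a toy (deliberately non-Fennel) recasting of the pseudocode as such a passive state machine: execute() performs bounded work against whatever buffer space it is handed, returns a status code instead of blocking, and resumes from member state on the next invocation.

```cpp
#include <cstddef>
#include <deque>
#include <vector>

// Toy sketch (not Fennel code) of the active loop recast as a passive
// state machine.
enum ToyResult { TOY_UNDERFLOW, TOY_OVERFLOW };

struct ToyUniq
{
    bool lastValid;
    int last;                          // state carried across execute() calls
    ToyUniq() : lastValid(false), last(0) {}

    // in and out play the role of the input and output buffers.
    ToyResult execute(
        std::deque<int> &in, std::vector<int> &out, std::size_t outCap)
    {
        while (!in.empty()) {
            int cur = in.front();
            if (lastValid && cur == last) {
                in.pop_front();        // duplicate: consume and skip
                continue;
            }
            if (out.size() >= outCap) {
                return TOY_OVERFLOW;   // yield; input tuple stays unconsumed
            }
            out.push_back(cur);
            last = cur;
            lastValid = true;
            in.pop_front();
        }
        return TOY_UNDERFLOW;          // need more input (or EOS)
    }
};
```

Note how a yield on output overflow leaves the pending input tuple in place, so the next call picks up exactly where the previous one left off; the real implementation below uses the consumption-pending state of its buffer accessor for the same purpose.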


Class Definition

In order to fill in our skeletal class, let us map the state from the pseudocode into actual Fennel data structures. At a minimum, our class will have the following members:

class UniqExecStream : public ConduitExecStream
{
    bool failOnDuplicate;
    bool previousTupleValid;
    TupleData lastTuple;
    TupleData currentTuple;
    boost::scoped_array<FixedBuffer> pLastTupleSaved;

public:
    // implement ExecStream
    virtual void prepare(UniqExecStreamParams const &params);
    virtual void open(bool restart);
    virtual ExecStreamResult execute(ExecStreamQuantum const &quantum);
    virtual void closeImpl();
};

Note that the open, execute and closeImpl methods override base ExecStream methods, while the prepare method overloads instead since its parameter signature is different. So even though prepare is declared as virtual, it probably will not be invoked via virtual dispatch.

Also note that most ExecStream implementations do not require any constructors to be defined. The reason is that the prepare and open method are responsible for initializing state before execution. There is one exception to this rule: if the implementation provides a closeImpl method, it is necessary for the constructor to initialize any data members which might be accessed by closeImpl, since closeImpl is called even if the stream is discarded before prepare/open due to an exception. Classes which use only proper holder classes such as boost::scoped_array need not worry about this.
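The rule can be illustrated with a toy class (not Fennel code) holding a raw buffer pointer: the constructor must null the pointer so that an early close is harmless.

```cpp
#include <cstddef>

// Toy illustration of the exception to the no-constructor rule: any
// member touched by closeImpl() must be initialized in the constructor,
// because close can run on a stream that was never prepared or opened.
class ToyRawStream
{
    int *pRawBuffer;
public:
    ToyRawStream() : pRawBuffer(NULL) {}    // makes an early close safe
    void open()
    {
        pRawBuffer = new int[16];
    }
    void closeImpl()
    {
        delete [] pRawBuffer;               // deleting NULL is a no-op
        pRawBuffer = NULL;
    }
    bool isAllocated() const
    {
        return pRawBuffer != NULL;
    }
};
```

A holder such as boost::scoped_array performs this nulling automatically, which is why classes using only proper holders need no constructor at all.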

Likewise, a destructor is unnecessary unless the implementation allocates resources without proper holder objects and does not release them inside of closeImpl.


Preparation

The prepare method is called only once during the lifetime of a stream, before it is opened for the first execution. This is our chance to record parameter values and precompute any information needed throughout the lifetime of the stream. Precomputation is essential for high performance, since anything which can be allocated or precomputed at preparation time reduces per-tuple execution cost.

It's important to understand that the parameters reference passed to the prepare method is transient and will not be available during execution. Hence, it is the responsibility of the prepare method to copy out any information it needs to preserve.

Here is the code for UniqExecStream:

void UniqExecStream::prepare(UniqExecStreamParams const &params)
{
    ConduitExecStream::prepare(params);

    assert(pInAccessor->getTupleDesc() == pOutAccessor->getTupleDesc());

    failOnDuplicate = params.failOnDuplicate;

    lastTuple.compute(pInAccessor->getTupleDesc());
    currentTuple.compute(pInAccessor->getTupleDesc());
}

So, where did pInAccessor and pOutAccessor come from? They were inherited from ConduitExecStream, which guarantees that these instances of ExecStreamBufAccessor will be set up correctly by the time prepare is called. These provide the stream with access to the input and output buffers of the conduit. At preparation time, there is of course no data in the buffers yet, but TupleDescriptor instances have already been set up to inform us of the shape of the data which we will be expected to process during execution. As a sanity check, UniqExecStream asserts that its input and output tuple descriptors are identical, since it only filters the stream without changing its shape.

The calls to TupleData::compute are required in order to set up lastTuple and currentTuple for use during execution. Under the covers, these calls allocate and initialize correctly-sized arrays of TupleDatum objects; this is a good example of the kind of allocation which should be avoided during execution.

What about the pLastTupleSaved buffer? We already have enough information to allocate it now. However, it could be very large (depending on the maximum tuple size) and does not require significant computation to allocate. So, we defer its allocation until open is called, thus avoiding memory bloat.


Opening for Business

Once prepared, an ExecStream can be used over and over to process multiple executions of the same prepared query. Each execution is initiated with a call to open and terminated with a call to close. So, it's important for the open call to reset any state which will affect execution results; otherwise, leftover state from an earlier execution could pollute a later execution. For our example, we need to clear the previousTupleValid flag and allocate the pLastTupleSaved buffer:

void UniqExecStream::open(bool restart)
{
    ConduitExecStream::open(restart);
    previousTupleValid = false;
    if (!pLastTupleSaved) {
        uint cbTupleMax =
            pInAccessor->getConsumptionTupleAccessor().getMaxByteCount();
        pLastTupleSaved.reset(new FixedBuffer[cbTupleMax]);
    }
}

The restart flag is only true when a stream is restarted in the middle of an execution (e.g. as part of a nested loop join). Note that in rare cases, open may be called with restart=true even after a close.


Execution

Finally, everything is set up, and we can get busy with writing the execute method, which does the real work. Since each stream implementation is supposed to do something interesting and different, it's difficult to give precise instructions for writing this method. However, various categories of streams tend to follow well-defined patterns. Our UniqExecStream provides a good template for implementing filtering streams, and from there it is straightforward to generalize to streams with any number of inputs. We will start with a complete listing of the method body, and then walk through line by line.

ExecStreamResult UniqExecStream::execute(ExecStreamQuantum const &quantum)
{
    ExecStreamResult rc = precheckConduitBuffers();
    if (rc != EXECRC_YIELD) {
        return rc;
    }

    for (uint nTuples = 0; nTuples < quantum.nTuplesMax; ++nTuples) {
        while (!pInAccessor->isTupleConsumptionPending()) {
            if (!pInAccessor->demandData()) {
                return EXECRC_BUF_UNDERFLOW;
            }
            pInAccessor->unmarshalTuple(currentTuple);

            if (previousTupleValid) {
                int c = pInAccessor->getTupleDesc().compareTuples(
                    lastTuple, currentTuple);
                assert(c <= 0);
                if (c == 0) {
                    pInAccessor->consumeTuple();
                    if (failOnDuplicate) {
                        throw DuplicateKeyExcn(lastTuple);
                    }
                    continue;
                }
            } else {
                previousTupleValid = true;
            }

            TupleAccessor &tupleAccessor =
                pInAccessor->getScratchTupleAccessor();
            memcpy(
                pLastTupleSaved.get(),
                pInAccessor->getConsumptionStart(),
                pInAccessor->getConsumptionTupleAccessor()
                    .getCurrentByteCount());
            tupleAccessor.setCurrentTupleBuf(pLastTupleSaved.get());
            tupleAccessor.unmarshal(lastTuple);
        }

        if (!pOutAccessor->produceTuple(lastTuple)) {
            return EXECRC_BUF_OVERFLOW;
        }
        pInAccessor->consumeTuple();
    }
    return EXECRC_QUANTUM_EXPIRED;
}

In comparison with the pseudocode presented earlier, this is a bit more complicated. Examining the first few lines,

    ExecStreamResult rc = precheckConduitBuffers();
    if (rc != EXECRC_YIELD) {
        return rc;
    }

we find a call to the base class helper method ConduitExecStream::precheckConduitBuffers. This ensures that there is data ready to be processed in the input buffer and that at least some space remains available in the output buffer. If the end of the input stream has been reached, this call will cause execute to return EXECRC_EOS.

Next we come to the beginning of the outer loop:

    for (uint nTuples = 0; nTuples < quantum.nTuplesMax; ++nTuples) {

This loop keeps count of the number of tuples produced so far during the current invocation of execute, yielding once the requested quantum has expired. Within this outer loop is nested an inner loop:

        while (!pInAccessor->isTupleConsumptionPending()) {

This inner loop is executed once per input tuple consumed (approximately). The while test causes the loop body to be skipped altogether if a previous invocation of execute already accessed an input tuple but had to yield before completely processing it (due to output buffer overflow).

            if (!pInAccessor->demandData()) {
                return EXECRC_BUF_UNDERFLOW;
            }
            pInAccessor->unmarshalTuple(currentTuple);

Now that the stream is ready to process a new input tuple, the first thing to do is to make sure that one is available. (precheckConduitBuffers guarantees this the first time through the loop, but any time after that we can exhaust the current input buffer.) The ExecStreamBufAccessor::demandData call takes care of this. The ExecStreamBufAccessor::unmarshalTuple call unmarshals a tuple by reference from the input buffer into currentTuple, putting pInAccessor into the consumption pending state (the buffer space cannot be reused until we are done accessing it).

The next block of code is the heart of the algorithm:

            if (previousTupleValid) {
                int c = pInAccessor->getTupleDesc().compareTuples(
                    lastTuple, currentTuple);
                assert(c <= 0);
                if (c == 0) {
                    pInAccessor->consumeTuple();
                    if (failOnDuplicate) {
                        throw DuplicateKeyExcn(lastTuple);
                    }
                    continue;
                }
            } else {
                previousTupleValid = true;
            }

If a previous tuple has been seen, it is compared to the current one. The assertion verifies that our input stream was actually sorted in ascending order as promised. If the current and previous tuples match, the current one is consumed (and ignored) and control flows to fetch the next tuple. (Or an exception is thrown if the corresponding parameter is in effect. Note that in this case the input tuple is consumed so that if re-execution is requested the stream will skip past the offending tuple.)

Once the distinctness test has passed, it is necessary to save a copy of the new values in currentTuple as lastTuple:

            TupleAccessor &tupleAccessor =
                pInAccessor->getScratchTupleAccessor();
            memcpy(
                pLastTupleSaved.get(),
                pInAccessor->getConsumptionStart(),
                pInAccessor->getConsumptionTupleAccessor()
                    .getCurrentByteCount());
            tupleAccessor.setCurrentTupleBuf(pLastTupleSaved.get());
            tupleAccessor.unmarshal(lastTuple);

ExecStreamBufAccessor::getScratchTupleAccessor provides a spare instance of TupleAccessor for just such purposes. The code uses a straight memcpy for speed instead of marshalling individual values. However, after the memcpy, it is still necessary to call TupleAccessor::unmarshal so that lastTuple references the new data just copied.

At this point, it would be valid to insert a break statement to drop out of the inner loop, since the while test is guaranteed to be false. Either way, it is now time to write out the new tuple:

        if (!pOutAccessor->produceTuple(lastTuple)) {
            return EXECRC_BUF_OVERFLOW;
        }
        pInAccessor->consumeTuple();

Unfortunately, there may be insufficient space in the output buffer. When that happens, the stream must yield, going into suspended animation until the contents of the output buffer have been consumed and space is freed up. This is the reason that we don't actually consume the input tuple until the output tuple has been written successfully--the consumption pending state serves to guide us down the right path on the next execution. If this does not make sense to you now, it will after a few hours with a debugger.

ASIDE: In this case, we could have used a separate flag instead of the consumption state, but in general, the TupleData used for output may be referencing memory from the input tuple, so the pattern shown here is safest.

Finally, execution continues around the outer for loop unless the quantum has expired:

    }
    return EXECRC_QUANTUM_EXPIRED;

A number of optimizations of this basic pattern are possible.


Closing Up Shop

A stream can be closed at any time. There is no guarantee that it has been prepared or opened; if it has been opened, there is no guarantee that EOS has been reached, or that execute has been called even once. In response to a close request, a stream should release any resources which were not allocated by prepare. It does not need to reset other state, since the class contract guarantees that open will be called again before any re-execution.

For UniqExecStream, we should deallocate pLastTupleSaved so that excess memory is not tied down while the stream is inactive:

void UniqExecStream::closeImpl()
{
    pLastTupleSaved.reset();
    ConduitExecStream::closeImpl();
}


Unit Testing

Fennel provides a test harness for ExecStream implementations in ExecStreamTestBase. Class ExecStreamTest contains a number of stream test cases which you can use as examples. One of the challenges of stream testing is generating test data. For UniqExecStream, we want to generate data containing duplicates; for this, we can adapt RampExecStreamGenerator. So, here is the procedure for one possible test case:

  1. Set up the duplicate generator to drive a MockProducerExecStream, producing a sequence like { 0, 0, 1, 1, 2, 2, ... }

  2. Transform this mock output via UniqExecStream.

  3. Use a normal RampExecStreamGenerator to verify that UniqExecStream produces the sequence { 0, 1, 2, ... }, with the number of output tuples expected to be half the number of input tuples.

The code for this is shown below.

class RampDuplicateExecStreamGenerator : public MockProducerExecStreamGenerator
{
public:
    virtual int64_t generateValue(uint iRow)
    {
        return iRow/2;
    }
};

void UniqExecStreamTest::testWithDuplicates()
{
    StandardTypeDescriptorFactory stdTypeFactory;
    TupleAttributeDescriptor attrDesc(
        stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));

    MockProducerExecStreamParams mockParams;
    mockParams.outputTupleDesc.push_back(attrDesc);
    mockParams.nRows = 5000;     // at least two buffers
    mockParams.pGenerator.reset(new RampDuplicateExecStreamGenerator());

    ExecStreamEmbryo mockStreamEmbryo;
    mockStreamEmbryo.init(new MockProducerExecStream(),mockParams);
    mockStreamEmbryo.getStream()->setName("MockProducerExecStream");

    UniqExecStreamParams uniqParams;
    uniqParams.failOnDuplicate = false;

    ExecStreamEmbryo uniqStreamEmbryo;
    uniqStreamEmbryo.init(new UniqExecStream(),uniqParams);
    uniqStreamEmbryo.getStream()->setName("UniqExecStream");

    SharedExecStream pOutputStream = prepareTransformGraph(
        mockStreamEmbryo, uniqStreamEmbryo);

    RampExecStreamGenerator expectedGen;

    verifyOutput(
        *pOutputStream,
        mockParams.nRows/2,
        expectedGen);
}

Other test cases for UniqExecStream would cover additional scenarios, such as setting failOnDuplicate to true and verifying that the expected DuplicateKeyExcn is raised, or feeding an input containing no duplicates at all.


Models and Factories

In order for your brand new ExecStream implementation to be usable from Farrago, there are a few more steps you need to take once it has been successfully unit tested:

  1. Determine which UML model you should edit to register the stream class in the catalog. (TODO: link to Farrago JNI docs.) Normally, this is the file farrago/catalog/xmi/FarragoExtMetamodelUML.zuml, which can be edited with Poseidon. If you are working on a separate project which extends Farrago, it may have its own model.

  2. Edit the model, defining a UML class which inherits from ExecutionStreamDef. The UML class should have attributes corresponding to the fields of the parameter class you defined in C++ (e.g. UniqExecStreamParams).

  3. Regenerate the catalog (ant createCatalog for Farrago).

  4. Regenerate the C++ code which exposes the model to Fennel (ant generateFemCpp for Farrago).

  5. Determine the correct C++ factory class which will be responsible for creating new instances of your stream class. For Farrago, this is ExecStreamFactory for some streams, or a class derived from ExecStreamSubFactory for others.

  6. Add a visit method to the factory. For our running example, the signature would be virtual void ExecStreamFactory::visit(ProxyUniqStreamDef &). Follow the example of other visit methods nearby.


Optimizer Rules

Of course, even though Farrago now knows how to instantiate your stream, you are not done yet, because Farrago does not yet know anything about the semantics of your stream. So the next step is to write an optimizer rule. Such a big topic is out of scope for this document (TODO: link to a yet-to-be-written HOWTO), but the next paragraph provides an outline.

For our example, the new optimizer rule would be responsible for matching the pattern wherein a GROUP BY relational operator with no aggregates and all fields grouped (the translation of DISTINCT) has a child with all fields pre-sorted. The rule would replace the GROUP BY with a physical relational expression corresponding to UniqExecStream, say FennelUniqRel (a Java class). In response to a call to FennelUniqRel.toStreamDef, FennelUniqRel would create a new instance of model-generated Java class UniqStreamDef and fill in its attributes. Once Fennel received the complete plan, this would be translated automatically into an instance of C++ class ProxyUniqStreamDef, which is the input to the factory method discussed in the previous section. The final step is to get down on your knees and beseech the optimizer to do what you mean, not what you say.


Advanced Resource Allocation

Some ExecStream implementations require resources which are costly enough that their allocation and deallocation needs to be centrally managed. The current heavyweight resources known to Fennel are threads and cache pages (as defined by ExecStreamResourceQuantity). Streams which require these resources (including any stream which performs disk access) must override method ExecStream::getResourceRequirements (the default implementation requests zero resources). In response, the resource governor, ExecStreamGovernor, calls ExecStream::setResourceAllocation. The default implementation for this method records the granted resource allocation in member variable ExecStream::resourceAllocation, which can be examined by the stream to decide how much to allocate. See ResourceGovernorDesign for a detailed description of the resource governor.

For the sake of example, suppose that instead of using boost::scoped_array to allocate pLastTupleSaved from the heap, we instead wanted to pin a cache page for this purpose. In that case, we would change UniqExecStream as follows:

class UniqExecStream : public ConduitExecStream
{
    bool failOnDuplicate;
    bool previousTupleValid;
    TupleData lastTuple;
    TupleData currentTuple;
    SegPageLock bufferLock;     // NEW
    SegmentAccessor scratchAccessor; // NEW
    PBuffer pLastTupleSaved;    // CHANGED

public:
    // implement ExecStream
    virtual void prepare(UniqExecStreamParams const &params);
    virtual void open(bool restart);
    virtual ExecStreamResult execute(ExecStreamQuantum const &quantum);
    virtual void closeImpl();

    // NEW
    virtual void getResourceRequirements(
        ExecStreamResourceQuantity &minQuantity,
        ExecStreamResourceQuantity &optQuantity);
};

void UniqExecStream::prepare(UniqExecStreamParams const &params)
{
    ConduitExecStream::prepare(params);

    assert(pInAccessor->getTupleDesc() == pOutAccessor->getTupleDesc());

    failOnDuplicate = params.failOnDuplicate;

    lastTuple.compute(pInAccessor->getTupleDesc());
    currentTuple.compute(pInAccessor->getTupleDesc());

    // NEW
    scratchAccessor = params.scratchAccessor;
    bufferLock.accessSegment(scratchAccessor);
}

// NEW
void UniqExecStream::getResourceRequirements(
    ExecStreamResourceQuantity &minQuantity,
    ExecStreamResourceQuantity &optQuantity)
{
    ConduitExecStream::getResourceRequirements(minQuantity,optQuantity);
    minQuantity.nCachePages += 1;
    optQuantity = minQuantity;
}

void UniqExecStream::open(bool restart)
{
    ConduitExecStream::open(restart);
    previousTupleValid = false;
    if (!restart) {
        uint cbTupleMax =
            pInAccessor->getConsumptionTupleAccessor().getMaxByteCount();

        // CHANGED
        bufferLock.allocatePage();
        assert(bufferLock.getPage().getCache().getPageSize() >= cbTupleMax);
        pLastTupleSaved = bufferLock.getPage().getWritableData();
    }
}

void UniqExecStream::closeImpl()
{
    // CHANGED
    pLastTupleSaved = NULL;
    bufferLock.unlock();

    ConduitExecStream::closeImpl();
}

TODO: scheduler interface for centralized thread pooling


Exception Handling

ExecStream::execute may throw an exception at any time. It is up to scheduler implementations to decide how to handle this. Some schedulers may terminate the query; others may enqueue the exception and attempt to continue. For this reason, streams which throw exceptions may want to update state before throwing (as in the UniqExecStream example) to allow for meaningful resumption of execution (either skipping an offending tuple, or re-attempting a failed request to an underlying service).
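A toy sketch (not Fennel code) of the update-state-before-throwing pattern: the offending input is consumed first, so a scheduler that enqueues the exception and re-invokes execute() resumes past the bad tuple rather than hitting it again.

```cpp
#include <cstddef>
#include <stdexcept>
#include <vector>

// Toy sketch of consume-before-throw: pos is advanced past the bad
// tuple before the exception leaves execute(), enabling resumption.
struct ToyFailingStream
{
    std::size_t pos;                  // persists across execute() calls
    ToyFailingStream() : pos(0) {}

    void execute(std::vector<int> const &input, std::vector<int> &output)
    {
        while (pos < input.size()) {
            int v = input[pos];
            ++pos;                    // consume even if the tuple is bad
            if (v < 0) {
                throw std::runtime_error("bad tuple");
            }
            output.push_back(v);
        }
    }
};
```

This mirrors the UniqExecStream code above, which calls consumeTuple() before throwing DuplicateKeyExcn.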


More To Explore

The best way to learn more about the techniques involved in constructing advanced ExecStream implementations is to study the interfaces involved (particularly ExecStreamBufAccessor) and existing ExecStream implementations. For buffer-handling tricks, see SegBufferStream, ScratchBufferStream, and CopyExecStream. For an example of a stream which consumes two inputs at once, see CartesianJoinExecStream. For an example of a stream with no inputs, see BTreeScanExecStream.

Definition at line 887 of file ExecStreamHowTo.cpp.


The documentation for this struct was generated from the following file: ExecStreamHowTo.cpp
Generated on Mon Jun 22 04:00:30 2009 for Fennel by  doxygen 1.5.1