ExecStreamBufAccessor.h

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/exec/ExecStreamBufAccessor.h#20 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2005-2009 SQLstream, Inc.
00006 // Copyright (C) 2005-2009 LucidEra, Inc.
00007 // Portions Copyright (C) 2004-2009 John V. Sichi
00008 //
00009 // This program is free software; you can redistribute it and/or modify it
00010 // under the terms of the GNU General Public License as published by the Free
00011 // Software Foundation; either version 2 of the License, or (at your option)
00012 // any later version approved by The Eigenbase Project.
00013 //
00014 // This program is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 // GNU General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU General Public License
00020 // along with this program; if not, write to the Free Software
00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 */
00023 
00024 #ifndef Fennel_ExecStreamBufAccessor_Included
00025 #define Fennel_ExecStreamBufAccessor_Included
00026 
00027 #include "fennel/exec/ExecStreamDefs.h"
00028 #include "fennel/tuple/TupleDescriptor.h"
00029 #include "fennel/tuple/TupleFormat.h"
00030 #include "fennel/tuple/TupleAccessor.h"
00031 #include "fennel/tuple/TupleProjectionAccessor.h"
00032 #include "fennel/tuple/TupleOverflowExcn.h"
00033 
00034 #include <boost/utility.hpp>
00035 
00036 FENNEL_BEGIN_NAMESPACE
00037 
00045 class FENNEL_EXEC_EXPORT ExecStreamBufAccessor
00046     : public boost::noncopyable
00047 {
00048     PBuffer pBufStart;
00049 
00050     PBuffer pBufEnd;
00051 
00052     PBuffer pProducer;
00053 
00054     PBuffer pConsumer;
00055 
00056     ExecStreamBufProvision provision;
00057 
00058     ExecStreamBufState state;
00059 
00060     bool pendingEOS;
00061 
00062     TupleDescriptor tupleDesc;
00063 
00064     TupleFormat tupleFormat;
00065 
00066     TupleAccessor tupleProductionAccessor;
00067 
00068     TupleAccessor tupleConsumptionAccessor;
00069 
00070     TupleProjectionAccessor tupleProjectionAccessor;
00071 
00072     uint cbBuffer;
00073 
00075     inline void setEOS();
00076 
00077 public:
00078     inline explicit ExecStreamBufAccessor();
00079 
00080     virtual ~ExecStreamBufAccessor()
00081     {
00082     }
00083 
00089     inline void setProvision(ExecStreamBufProvision provision);
00090 
00098     inline void setTupleShape(
00099         TupleDescriptor const &tupleDesc,
00100         TupleFormat tupleFormat = TUPLE_FORMAT_STANDARD);
00101 
00105     inline void clear();
00106 
00117     inline void provideBufferForProduction(
00118         PBuffer pStart,
00119         PBuffer pEnd,
00120         bool reusable);
00121 
00130     inline void provideBufferForConsumption(
00131         PConstBuffer pStart,
00132         PConstBuffer pEnd);
00133 
00138     inline void requestProduction();
00139 
00144     inline void requestConsumption();
00145 
00150     inline bool isProductionPossible() const;
00151 
00156     inline bool isConsumptionPossible() const;
00157 
00164     inline bool demandData();
00165 
00171     inline void markEOS();
00172 
00188     inline void produceData(PBuffer pEnd);
00189 
00205     inline void consumeData(PConstBuffer pEnd);
00206 
00212     inline PConstBuffer getConsumptionStart() const;
00213 
00219     inline PConstBuffer getConsumptionEnd() const;
00220 
00227     inline uint getConsumptionAvailable() const;
00228 
00239     uint getConsumptionAvailableBounded(uint cbLimit);
00240 
00246     inline uint getConsumptionTuplesAvailable();
00247 
00256     inline PConstBuffer spanWholeTuples(PConstBuffer start, uint size);
00257 
00265     inline PBuffer getProductionStart() const;
00266 
00274     inline PBuffer getProductionEnd() const;
00275 
00282     inline uint getProductionAvailable() const;
00283 
00289     inline ExecStreamBufState getState() const;
00290 
00295     inline bool hasPendingEOS() const;
00296 
00302     inline ExecStreamBufProvision getProvision() const;
00303 
00309     inline TupleDescriptor const &getTupleDesc() const;
00310 
00316     inline TupleFormat getTupleFormat() const;
00317 
00324     inline void validateTupleSize(TupleData const &tupleData);
00325 
00335     inline bool produceTuple(TupleData const &tupleData);
00336 
00344     inline TupleAccessor &accessConsumptionTuple();
00345 
00355     inline void unmarshalTuple(TupleData &tupleData, uint iFirstDatum = 0);
00356 
00361     inline void consumeTuple();
00362 
00367     inline bool isTupleConsumptionPending() const;
00368 
00372     inline TupleAccessor &getConsumptionTupleAccessor();
00373 
00378     inline TupleAccessor &getScratchTupleAccessor();
00379 
00385     inline void bindProjection(TupleProjection const &inputProj);
00386 
00392     inline void unmarshalProjectedTuple(TupleData &projTupleData);
00393 };
00394 
00395 inline ExecStreamBufAccessor::ExecStreamBufAccessor()
00396 {
00397     clear();
00398     provision = BUFPROV_NONE;
00399     state = EXECBUF_EOS;
00400     tupleFormat = TUPLE_FORMAT_STANDARD;
00401     cbBuffer = 0;
00402 }
00403 
00404 inline bool ExecStreamBufAccessor::isProductionPossible() const
00405 {
00406     return !pendingEOS && (state != EXECBUF_EOS) && (state != EXECBUF_OVERFLOW);
00407 }
00408 
00409 inline bool ExecStreamBufAccessor::isConsumptionPossible() const
00410 {
00411     return (state == EXECBUF_OVERFLOW) || (state == EXECBUF_NONEMPTY);
00412 }
00413 
00414 inline void ExecStreamBufAccessor::setProvision(
00415     ExecStreamBufProvision provisionInit)
00416 {
00417     assert(provision == BUFPROV_NONE);
00418     provision = provisionInit;
00419 }
00420 
00421 inline void ExecStreamBufAccessor::setTupleShape(
00422     TupleDescriptor const &tupleDescInit,
00423     TupleFormat tupleFormatInit)
00424 {
00425     tupleDesc = tupleDescInit;
00426     tupleFormat = tupleFormatInit;
00427     tupleProductionAccessor.compute(tupleDesc, tupleFormat);
00428     tupleConsumptionAccessor.compute(tupleDesc, tupleFormat);
00429 }
00430 
00431 inline void ExecStreamBufAccessor::clear()
00432 {
00433     pBufStart = NULL;
00434     pBufEnd = NULL;
00435     pProducer = NULL;
00436     pConsumer = NULL;
00437     cbBuffer = 0;
00438     state = EXECBUF_EMPTY;
00439     pendingEOS = false;
00440     tupleProductionAccessor.resetCurrentTupleBuf();
00441     tupleConsumptionAccessor.resetCurrentTupleBuf();
00442 }
00443 
00444 inline void ExecStreamBufAccessor::provideBufferForProduction(
00445     PBuffer pStart,
00446     PBuffer pEnd,
00447     bool reusable)
00448 {
00449     assert((state == EXECBUF_UNDERFLOW) || (state == EXECBUF_EMPTY));
00450     assert(provision == BUFPROV_CONSUMER);
00451     pBufStart = pStart;
00452     pBufEnd = pEnd;
00453     pProducer = pStart;
00454     pConsumer = pStart;
00455     cbBuffer = pEnd - pStart;
00456     state = EXECBUF_UNDERFLOW;
00457 
00458     if (!reusable) {
00459         // indicate that this buffer is not reusable
00460         pBufStart = NULL;
00461     }
00462 }
00463 
00464 inline void ExecStreamBufAccessor::provideBufferForConsumption(
00465     PConstBuffer pStart,
00466     PConstBuffer pEnd)
00467 {
00468     assert((state == EXECBUF_UNDERFLOW) || (state == EXECBUF_EMPTY));
00469     assert(provision == BUFPROV_PRODUCER);
00470     pBufStart = const_cast<PBuffer>(pStart);
00471     pBufEnd = const_cast<PBuffer>(pEnd);
00472     pConsumer = pBufStart;
00473     pProducer = pBufEnd;
00474     state = EXECBUF_OVERFLOW;
00475 
00476     // indicate that this buffer is not reusable
00477     pBufStart = NULL;
00478 }
00479 
00480 inline void ExecStreamBufAccessor::requestProduction()
00481 {
00482     assert((state == EXECBUF_UNDERFLOW) || (state == EXECBUF_EMPTY));
00483     state = EXECBUF_UNDERFLOW;
00484     pProducer = pBufStart;
00485     pConsumer = pBufStart;
00486 }
00487 
00488 inline void ExecStreamBufAccessor::requestConsumption()
00489 {
00490     assert((state == EXECBUF_OVERFLOW) || (state == EXECBUF_NONEMPTY));
00491     state = EXECBUF_OVERFLOW;
00492 }
00493 
00494 inline void ExecStreamBufAccessor::markEOS()
00495 {
00496     if (isConsumptionPossible()) {
00497         pendingEOS = true;
00498         return;
00499     }
00500     setEOS();
00501 }
00502 
00503 inline void ExecStreamBufAccessor::setEOS()
00504 {
00505     assert(pProducer == pConsumer);
00506     clear();
00507     state = EXECBUF_EOS;
00508 }
00509 
00510 inline PConstBuffer ExecStreamBufAccessor::getConsumptionStart() const
00511 {
00512     return pConsumer;
00513 }
00514 
00515 inline PConstBuffer ExecStreamBufAccessor::getConsumptionEnd() const
00516 {
00517     return pProducer;
00518 }
00519 
00520 inline uint ExecStreamBufAccessor::getConsumptionAvailable() const
00521 {
00522     return getConsumptionEnd() - getConsumptionStart();
00523 }
00524 
00525 inline uint ExecStreamBufAccessor::getConsumptionTuplesAvailable()
00526 {
00527     TupleAccessor& acc = getScratchTupleAccessor();
00528     PConstBuffer p = getConsumptionStart(),
00529         end = getConsumptionEnd();
00530     int count = 0;
00531     while (p < end) {
00532         acc.setCurrentTupleBuf(p);
00533         p += acc.getCurrentByteCount();
00534         ++count;
00535     }
00536     return count;
00537 }
00538 
00539 inline PConstBuffer ExecStreamBufAccessor::spanWholeTuples(
00540     PConstBuffer start, uint size)
00541 {
00542     TupleAccessor& acc = getScratchTupleAccessor();
00543     assert(size > 0);
00544     PConstBuffer p = start;
00545     PConstBuffer pend = start + size;
00546     for (int ct = 0; ; ct++) {
00547         assert(p < pend);
00548         acc.setCurrentTupleBuf(p);
00549         PConstBuffer q = p;
00550         p += acc.getCurrentByteCount(); // forward 1 tuple
00551         if (p >= pend) {
00552             if (p == pend) {
00553                 // fit the current tuple, but no more
00554                 return p;
00555             } else {
00556                 // here when p is too far, and the tuple [q, p] did not fit.
00557                 return q;
00558             }
00559         }
00560     }
00561     assert(false); // not reached
00562 }
00563 
00564 
00565 inline PBuffer ExecStreamBufAccessor::getProductionStart() const
00566 {
00567     return pProducer;
00568 }
00569 
00570 inline PBuffer ExecStreamBufAccessor::getProductionEnd() const
00571 {
00572     return pBufEnd;
00573 }
00574 
00575 inline uint ExecStreamBufAccessor::getProductionAvailable() const
00576 {
00577     return pProducer ? (pBufEnd - pProducer) : 0;
00578 }
00579 
00580 inline ExecStreamBufState ExecStreamBufAccessor::getState() const
00581 {
00582     return state;
00583 }
00584 
00585 inline bool  ExecStreamBufAccessor::hasPendingEOS() const
00586 {
00587     return pendingEOS;
00588 }
00589 
00590 inline ExecStreamBufProvision ExecStreamBufAccessor::getProvision() const
00591 {
00592     return provision;
00593 }
00594 
00595 inline TupleFormat ExecStreamBufAccessor::getTupleFormat() const
00596 {
00597     return tupleFormat;
00598 }
00599 
00600 inline TupleDescriptor const &ExecStreamBufAccessor::getTupleDesc() const
00601 {
00602     return tupleDesc;
00603 }
00604 
00605 inline void ExecStreamBufAccessor::produceData(PBuffer pEnd)
00606 {
00607     assert(isProductionPossible());
00608     assert(pEnd > getProductionStart());
00609     assert(pEnd <= getProductionEnd());
00610     pProducer = pEnd;
00611     state = EXECBUF_NONEMPTY;
00612 }
00613 
00614 inline void ExecStreamBufAccessor::consumeData(PConstBuffer pEnd)
00615 {
00616     assert(isConsumptionPossible());
00617     assert(pEnd > getConsumptionStart());
00618     assert(pEnd <= getConsumptionEnd());
00619     pConsumer = const_cast<PBuffer>(pEnd);
00620     if (pConsumer == getConsumptionEnd()) {
00621         if (pendingEOS) {
00622             setEOS();
00623         } else {
00624             state = EXECBUF_EMPTY;
00625         }
00626     } else {
00627         // NOTE jvs 9-Nov-2004:  this is misleading until circular buffering
00628         // gets implemented, but it isn't incorrect either
00629         state = EXECBUF_NONEMPTY;
00630     }
00631 }
00632 
00633 inline void ExecStreamBufAccessor::validateTupleSize(
00634     TupleData const &tupleData)
00635 {
00636     if (cbBuffer == 0) {
00637         return;                         // no buffer yet
00638     }
00639     if (!tupleProductionAccessor.isBufferSufficient(tupleData,  cbBuffer)) {
00640         uint cbTuple = tupleProductionAccessor.getByteCount(tupleData);
00641         throw TupleOverflowExcn(tupleDesc, tupleData, cbTuple, cbBuffer);
00642     }
00643 }
00644 
00645 inline bool ExecStreamBufAccessor::produceTuple(TupleData const &tupleData)
00646 {
00647     assert(getState() != EXECBUF_EOS);
00648     assert(!pendingEOS);
00649 
00650     if (tupleProductionAccessor.isBufferSufficient(
00651             tupleData, getProductionAvailable()))
00652     {
00653         tupleProductionAccessor.marshal(tupleData, getProductionStart());
00654         produceData(
00655             getProductionStart()
00656             + tupleProductionAccessor.getCurrentByteCount());
00657         return true;
00658     } else {
00659         validateTupleSize(tupleData);
00660         if (getState() == EXECBUF_NONEMPTY) {
00661             requestConsumption();
00662         }
00663         return false;
00664     }
00665 }
00666 
00667 inline TupleAccessor &ExecStreamBufAccessor::accessConsumptionTuple()
00668 {
00669     assert(isConsumptionPossible());
00670     assert(!tupleConsumptionAccessor.getCurrentTupleBuf());
00671 
00672     tupleConsumptionAccessor.setCurrentTupleBuf(getConsumptionStart());
00673     return tupleConsumptionAccessor;
00674 }
00675 
00676 inline void ExecStreamBufAccessor::unmarshalTuple(
00677     TupleData &tupleData, uint iFirstDatum)
00678 {
00679     accessConsumptionTuple();
00680     tupleConsumptionAccessor.unmarshal(tupleData, iFirstDatum);
00681 }
00682 
00683 inline void ExecStreamBufAccessor::consumeTuple()
00684 {
00685     assert(tupleConsumptionAccessor.getCurrentTupleBuf());
00686 
00687     consumeData(
00688         getConsumptionStart() + tupleConsumptionAccessor.getCurrentByteCount());
00689     tupleConsumptionAccessor.resetCurrentTupleBuf();
00690 }
00691 
00692 inline bool ExecStreamBufAccessor::isTupleConsumptionPending() const
00693 {
00694     if (tupleConsumptionAccessor.getCurrentTupleBuf()) {
00695         return true;
00696     } else {
00697         return false;
00698     }
00699 }
00700 
00701 inline TupleAccessor &ExecStreamBufAccessor::getConsumptionTupleAccessor()
00702 {
00703     return tupleConsumptionAccessor;
00704 }
00705 
00706 inline TupleAccessor &ExecStreamBufAccessor::getScratchTupleAccessor()
00707 {
00708     // this can be used for scratch purposes since we don't need its state
00709     // across calls to produceTuple
00710     return tupleProductionAccessor;
00711 }
00712 
00713 inline bool ExecStreamBufAccessor::demandData()
00714 {
00715     if (state == EXECBUF_EOS) {
00716         return false;
00717     } else if (isConsumptionPossible()) {
00718         return true;
00719     } else {
00720         requestProduction();
00721         return false;
00722     }
00723 }
00724 
00725 inline void ExecStreamBufAccessor::bindProjection(
00726     TupleProjection const &inputProj)
00727 {
00728     tupleProjectionAccessor.bind(tupleConsumptionAccessor, inputProj);
00729 }
00730 
00731 inline void ExecStreamBufAccessor::unmarshalProjectedTuple(
00732     TupleData &projTupleData)
00733 {
00734     accessConsumptionTuple();
00735     tupleProjectionAccessor.unmarshal(projTupleData);
00736 }
00737 
00738 FENNEL_END_NAMESPACE
00739 
00740 #endif
00741 
00742 // End ExecStreamBufAccessor.h

Generated on Mon Jun 22 04:00:18 2009 for Fennel by  doxygen 1.5.1