00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #ifndef Fennel_ExecStreamBufAccessor_Included
00025 #define Fennel_ExecStreamBufAccessor_Included
00026
00027 #include "fennel/exec/ExecStreamDefs.h"
00028 #include "fennel/tuple/TupleDescriptor.h"
00029 #include "fennel/tuple/TupleFormat.h"
00030 #include "fennel/tuple/TupleAccessor.h"
00031 #include "fennel/tuple/TupleProjectionAccessor.h"
00032 #include "fennel/tuple/TupleOverflowExcn.h"
00033
00034 #include <boost/utility.hpp>
00035
00036 FENNEL_BEGIN_NAMESPACE
00037
00045 class FENNEL_EXEC_EXPORT ExecStreamBufAccessor
00046 : public boost::noncopyable
00047 {
00048 PBuffer pBufStart;
00049
00050 PBuffer pBufEnd;
00051
00052 PBuffer pProducer;
00053
00054 PBuffer pConsumer;
00055
00056 ExecStreamBufProvision provision;
00057
00058 ExecStreamBufState state;
00059
00060 bool pendingEOS;
00061
00062 TupleDescriptor tupleDesc;
00063
00064 TupleFormat tupleFormat;
00065
00066 TupleAccessor tupleProductionAccessor;
00067
00068 TupleAccessor tupleConsumptionAccessor;
00069
00070 TupleProjectionAccessor tupleProjectionAccessor;
00071
00072 uint cbBuffer;
00073
00075 inline void setEOS();
00076
00077 public:
00078 inline explicit ExecStreamBufAccessor();
00079
00080 virtual ~ExecStreamBufAccessor()
00081 {
00082 }
00083
00089 inline void setProvision(ExecStreamBufProvision provision);
00090
00098 inline void setTupleShape(
00099 TupleDescriptor const &tupleDesc,
00100 TupleFormat tupleFormat = TUPLE_FORMAT_STANDARD);
00101
00105 inline void clear();
00106
00117 inline void provideBufferForProduction(
00118 PBuffer pStart,
00119 PBuffer pEnd,
00120 bool reusable);
00121
00130 inline void provideBufferForConsumption(
00131 PConstBuffer pStart,
00132 PConstBuffer pEnd);
00133
00138 inline void requestProduction();
00139
00144 inline void requestConsumption();
00145
00150 inline bool isProductionPossible() const;
00151
00156 inline bool isConsumptionPossible() const;
00157
00164 inline bool demandData();
00165
00171 inline void markEOS();
00172
00188 inline void produceData(PBuffer pEnd);
00189
00205 inline void consumeData(PConstBuffer pEnd);
00206
00212 inline PConstBuffer getConsumptionStart() const;
00213
00219 inline PConstBuffer getConsumptionEnd() const;
00220
00227 inline uint getConsumptionAvailable() const;
00228
00239 uint getConsumptionAvailableBounded(uint cbLimit);
00240
00246 inline uint getConsumptionTuplesAvailable();
00247
00256 inline PConstBuffer spanWholeTuples(PConstBuffer start, uint size);
00257
00265 inline PBuffer getProductionStart() const;
00266
00274 inline PBuffer getProductionEnd() const;
00275
00282 inline uint getProductionAvailable() const;
00283
00289 inline ExecStreamBufState getState() const;
00290
00295 inline bool hasPendingEOS() const;
00296
00302 inline ExecStreamBufProvision getProvision() const;
00303
00309 inline TupleDescriptor const &getTupleDesc() const;
00310
00316 inline TupleFormat getTupleFormat() const;
00317
00324 inline void validateTupleSize(TupleData const &tupleData);
00325
00335 inline bool produceTuple(TupleData const &tupleData);
00336
00344 inline TupleAccessor &accessConsumptionTuple();
00345
00355 inline void unmarshalTuple(TupleData &tupleData, uint iFirstDatum = 0);
00356
00361 inline void consumeTuple();
00362
00367 inline bool isTupleConsumptionPending() const;
00368
00372 inline TupleAccessor &getConsumptionTupleAccessor();
00373
00378 inline TupleAccessor &getScratchTupleAccessor();
00379
00385 inline void bindProjection(TupleProjection const &inputProj);
00386
00392 inline void unmarshalProjectedTuple(TupleData &projTupleData);
00393 };
00394
00395 inline ExecStreamBufAccessor::ExecStreamBufAccessor()
00396 {
00397 clear();
00398 provision = BUFPROV_NONE;
00399 state = EXECBUF_EOS;
00400 tupleFormat = TUPLE_FORMAT_STANDARD;
00401 cbBuffer = 0;
00402 }
00403
00404 inline bool ExecStreamBufAccessor::isProductionPossible() const
00405 {
00406 return !pendingEOS && (state != EXECBUF_EOS) && (state != EXECBUF_OVERFLOW);
00407 }
00408
00409 inline bool ExecStreamBufAccessor::isConsumptionPossible() const
00410 {
00411 return (state == EXECBUF_OVERFLOW) || (state == EXECBUF_NONEMPTY);
00412 }
00413
00414 inline void ExecStreamBufAccessor::setProvision(
00415 ExecStreamBufProvision provisionInit)
00416 {
00417 assert(provision == BUFPROV_NONE);
00418 provision = provisionInit;
00419 }
00420
00421 inline void ExecStreamBufAccessor::setTupleShape(
00422 TupleDescriptor const &tupleDescInit,
00423 TupleFormat tupleFormatInit)
00424 {
00425 tupleDesc = tupleDescInit;
00426 tupleFormat = tupleFormatInit;
00427 tupleProductionAccessor.compute(tupleDesc, tupleFormat);
00428 tupleConsumptionAccessor.compute(tupleDesc, tupleFormat);
00429 }
00430
00431 inline void ExecStreamBufAccessor::clear()
00432 {
00433 pBufStart = NULL;
00434 pBufEnd = NULL;
00435 pProducer = NULL;
00436 pConsumer = NULL;
00437 cbBuffer = 0;
00438 state = EXECBUF_EMPTY;
00439 pendingEOS = false;
00440 tupleProductionAccessor.resetCurrentTupleBuf();
00441 tupleConsumptionAccessor.resetCurrentTupleBuf();
00442 }
00443
00444 inline void ExecStreamBufAccessor::provideBufferForProduction(
00445 PBuffer pStart,
00446 PBuffer pEnd,
00447 bool reusable)
00448 {
00449 assert((state == EXECBUF_UNDERFLOW) || (state == EXECBUF_EMPTY));
00450 assert(provision == BUFPROV_CONSUMER);
00451 pBufStart = pStart;
00452 pBufEnd = pEnd;
00453 pProducer = pStart;
00454 pConsumer = pStart;
00455 cbBuffer = pEnd - pStart;
00456 state = EXECBUF_UNDERFLOW;
00457
00458 if (!reusable) {
00459
00460 pBufStart = NULL;
00461 }
00462 }
00463
00464 inline void ExecStreamBufAccessor::provideBufferForConsumption(
00465 PConstBuffer pStart,
00466 PConstBuffer pEnd)
00467 {
00468 assert((state == EXECBUF_UNDERFLOW) || (state == EXECBUF_EMPTY));
00469 assert(provision == BUFPROV_PRODUCER);
00470 pBufStart = const_cast<PBuffer>(pStart);
00471 pBufEnd = const_cast<PBuffer>(pEnd);
00472 pConsumer = pBufStart;
00473 pProducer = pBufEnd;
00474 state = EXECBUF_OVERFLOW;
00475
00476
00477 pBufStart = NULL;
00478 }
00479
00480 inline void ExecStreamBufAccessor::requestProduction()
00481 {
00482 assert((state == EXECBUF_UNDERFLOW) || (state == EXECBUF_EMPTY));
00483 state = EXECBUF_UNDERFLOW;
00484 pProducer = pBufStart;
00485 pConsumer = pBufStart;
00486 }
00487
00488 inline void ExecStreamBufAccessor::requestConsumption()
00489 {
00490 assert((state == EXECBUF_OVERFLOW) || (state == EXECBUF_NONEMPTY));
00491 state = EXECBUF_OVERFLOW;
00492 }
00493
00494 inline void ExecStreamBufAccessor::markEOS()
00495 {
00496 if (isConsumptionPossible()) {
00497 pendingEOS = true;
00498 return;
00499 }
00500 setEOS();
00501 }
00502
00503 inline void ExecStreamBufAccessor::setEOS()
00504 {
00505 assert(pProducer == pConsumer);
00506 clear();
00507 state = EXECBUF_EOS;
00508 }
00509
00510 inline PConstBuffer ExecStreamBufAccessor::getConsumptionStart() const
00511 {
00512 return pConsumer;
00513 }
00514
00515 inline PConstBuffer ExecStreamBufAccessor::getConsumptionEnd() const
00516 {
00517 return pProducer;
00518 }
00519
00520 inline uint ExecStreamBufAccessor::getConsumptionAvailable() const
00521 {
00522 return getConsumptionEnd() - getConsumptionStart();
00523 }
00524
00525 inline uint ExecStreamBufAccessor::getConsumptionTuplesAvailable()
00526 {
00527 TupleAccessor& acc = getScratchTupleAccessor();
00528 PConstBuffer p = getConsumptionStart(),
00529 end = getConsumptionEnd();
00530 int count = 0;
00531 while (p < end) {
00532 acc.setCurrentTupleBuf(p);
00533 p += acc.getCurrentByteCount();
00534 ++count;
00535 }
00536 return count;
00537 }
00538
00539 inline PConstBuffer ExecStreamBufAccessor::spanWholeTuples(
00540 PConstBuffer start, uint size)
00541 {
00542 TupleAccessor& acc = getScratchTupleAccessor();
00543 assert(size > 0);
00544 PConstBuffer p = start;
00545 PConstBuffer pend = start + size;
00546 for (int ct = 0; ; ct++) {
00547 assert(p < pend);
00548 acc.setCurrentTupleBuf(p);
00549 PConstBuffer q = p;
00550 p += acc.getCurrentByteCount();
00551 if (p >= pend) {
00552 if (p == pend) {
00553
00554 return p;
00555 } else {
00556
00557 return q;
00558 }
00559 }
00560 }
00561 assert(false);
00562 }
00563
00564
00565 inline PBuffer ExecStreamBufAccessor::getProductionStart() const
00566 {
00567 return pProducer;
00568 }
00569
00570 inline PBuffer ExecStreamBufAccessor::getProductionEnd() const
00571 {
00572 return pBufEnd;
00573 }
00574
00575 inline uint ExecStreamBufAccessor::getProductionAvailable() const
00576 {
00577 return pProducer ? (pBufEnd - pProducer) : 0;
00578 }
00579
00580 inline ExecStreamBufState ExecStreamBufAccessor::getState() const
00581 {
00582 return state;
00583 }
00584
00585 inline bool ExecStreamBufAccessor::hasPendingEOS() const
00586 {
00587 return pendingEOS;
00588 }
00589
00590 inline ExecStreamBufProvision ExecStreamBufAccessor::getProvision() const
00591 {
00592 return provision;
00593 }
00594
00595 inline TupleFormat ExecStreamBufAccessor::getTupleFormat() const
00596 {
00597 return tupleFormat;
00598 }
00599
00600 inline TupleDescriptor const &ExecStreamBufAccessor::getTupleDesc() const
00601 {
00602 return tupleDesc;
00603 }
00604
00605 inline void ExecStreamBufAccessor::produceData(PBuffer pEnd)
00606 {
00607 assert(isProductionPossible());
00608 assert(pEnd > getProductionStart());
00609 assert(pEnd <= getProductionEnd());
00610 pProducer = pEnd;
00611 state = EXECBUF_NONEMPTY;
00612 }
00613
00614 inline void ExecStreamBufAccessor::consumeData(PConstBuffer pEnd)
00615 {
00616 assert(isConsumptionPossible());
00617 assert(pEnd > getConsumptionStart());
00618 assert(pEnd <= getConsumptionEnd());
00619 pConsumer = const_cast<PBuffer>(pEnd);
00620 if (pConsumer == getConsumptionEnd()) {
00621 if (pendingEOS) {
00622 setEOS();
00623 } else {
00624 state = EXECBUF_EMPTY;
00625 }
00626 } else {
00627
00628
00629 state = EXECBUF_NONEMPTY;
00630 }
00631 }
00632
00633 inline void ExecStreamBufAccessor::validateTupleSize(
00634 TupleData const &tupleData)
00635 {
00636 if (cbBuffer == 0) {
00637 return;
00638 }
00639 if (!tupleProductionAccessor.isBufferSufficient(tupleData, cbBuffer)) {
00640 uint cbTuple = tupleProductionAccessor.getByteCount(tupleData);
00641 throw TupleOverflowExcn(tupleDesc, tupleData, cbTuple, cbBuffer);
00642 }
00643 }
00644
00645 inline bool ExecStreamBufAccessor::produceTuple(TupleData const &tupleData)
00646 {
00647 assert(getState() != EXECBUF_EOS);
00648 assert(!pendingEOS);
00649
00650 if (tupleProductionAccessor.isBufferSufficient(
00651 tupleData, getProductionAvailable()))
00652 {
00653 tupleProductionAccessor.marshal(tupleData, getProductionStart());
00654 produceData(
00655 getProductionStart()
00656 + tupleProductionAccessor.getCurrentByteCount());
00657 return true;
00658 } else {
00659 validateTupleSize(tupleData);
00660 if (getState() == EXECBUF_NONEMPTY) {
00661 requestConsumption();
00662 }
00663 return false;
00664 }
00665 }
00666
00667 inline TupleAccessor &ExecStreamBufAccessor::accessConsumptionTuple()
00668 {
00669 assert(isConsumptionPossible());
00670 assert(!tupleConsumptionAccessor.getCurrentTupleBuf());
00671
00672 tupleConsumptionAccessor.setCurrentTupleBuf(getConsumptionStart());
00673 return tupleConsumptionAccessor;
00674 }
00675
00676 inline void ExecStreamBufAccessor::unmarshalTuple(
00677 TupleData &tupleData, uint iFirstDatum)
00678 {
00679 accessConsumptionTuple();
00680 tupleConsumptionAccessor.unmarshal(tupleData, iFirstDatum);
00681 }
00682
00683 inline void ExecStreamBufAccessor::consumeTuple()
00684 {
00685 assert(tupleConsumptionAccessor.getCurrentTupleBuf());
00686
00687 consumeData(
00688 getConsumptionStart() + tupleConsumptionAccessor.getCurrentByteCount());
00689 tupleConsumptionAccessor.resetCurrentTupleBuf();
00690 }
00691
00692 inline bool ExecStreamBufAccessor::isTupleConsumptionPending() const
00693 {
00694 if (tupleConsumptionAccessor.getCurrentTupleBuf()) {
00695 return true;
00696 } else {
00697 return false;
00698 }
00699 }
00700
00701 inline TupleAccessor &ExecStreamBufAccessor::getConsumptionTupleAccessor()
00702 {
00703 return tupleConsumptionAccessor;
00704 }
00705
00706 inline TupleAccessor &ExecStreamBufAccessor::getScratchTupleAccessor()
00707 {
00708
00709
00710 return tupleProductionAccessor;
00711 }
00712
00713 inline bool ExecStreamBufAccessor::demandData()
00714 {
00715 if (state == EXECBUF_EOS) {
00716 return false;
00717 } else if (isConsumptionPossible()) {
00718 return true;
00719 } else {
00720 requestProduction();
00721 return false;
00722 }
00723 }
00724
00725 inline void ExecStreamBufAccessor::bindProjection(
00726 TupleProjection const &inputProj)
00727 {
00728 tupleProjectionAccessor.bind(tupleConsumptionAccessor, inputProj);
00729 }
00730
00731 inline void ExecStreamBufAccessor::unmarshalProjectedTuple(
00732 TupleData &projTupleData)
00733 {
00734 accessConsumptionTuple();
00735 tupleProjectionAccessor.unmarshal(projTupleData);
00736 }
00737
00738 FENNEL_END_NAMESPACE
00739
00740 #endif
00741
00742