CopyExecStream.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/exec/CopyExecStream.cpp#11 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2005-2009 SQLstream, Inc.
00006 // Copyright (C) 2005-2009 LucidEra, Inc.
00007 // Portions Copyright (C) 2004-2009 John V. Sichi
00008 //
00009 // This program is free software; you can redistribute it and/or modify it
00010 // under the terms of the GNU General Public License as published by the Free
00011 // Software Foundation; either version 2 of the License, or (at your option)
00012 // any later version approved by The Eigenbase Project.
00013 //
00014 // This program is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 // GNU General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU General Public License
00020 // along with this program; if not, write to the Free Software
00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 */
00023 
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/exec/CopyExecStream.h"
00026 #include "fennel/exec/ExecStreamBufAccessor.h"
00027 
00028 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/CopyExecStream.cpp#11 $");
00029 
00030 void CopyExecStream::prepare(CopyExecStreamParams const &params)
00031 {
00032     ConduitExecStream::prepare(params);
00033 }
00034 
00035 ExecStreamResult CopyExecStream::execute(ExecStreamQuantum const &quantum)
00036 {
00037     ExecStreamResult rc = precheckConduitBuffers();
00038     if (rc != EXECRC_YIELD) {
00039         return rc;
00040     }
00041 
00042     uint cbAvailableIn = pInAccessor->getConsumptionAvailable();
00043     uint cbAvailableOut = pOutAccessor->getProductionAvailable();
00044 
00045     PConstBuffer pSrc = pInAccessor->getConsumptionStart();
00046     PBuffer pDst = pOutAccessor->getProductionStart();
00047 
00048     if (cbAvailableOut < cbAvailableIn) {
00049         // In a non-DFS scheduler, the first call to this XO may be made before
00050         // the first call to its consumer. If so, the consumer may not have
00051         // allocated its buffer; punt until it has.
00052         if (cbAvailableOut == 0 && pOutAccessor->getState() == EXECBUF_EMPTY) {
00053             return EXECRC_BUF_OVERFLOW;
00054         }
00055         // oops, impedance mismatch:  have to figure out how many
00056         // complete tuples we can safely copy without overflow
00057         cbAvailableIn =
00058             pInAccessor->getConsumptionAvailableBounded(cbAvailableOut);
00059         permAssert(cbAvailableIn > 0);
00060     } else {
00061         rc = EXECRC_BUF_UNDERFLOW;
00062     }
00063 
00064     memcpy(pDst,pSrc,cbAvailableIn);
00065     pInAccessor->consumeData(pSrc + cbAvailableIn);
00066     pOutAccessor->produceData(pDst + cbAvailableIn);
00067 
00068     // we can't use whatever's left in output buffer, so tell consumer
00069     // to give us a fresh one next time
00070     pOutAccessor->requestConsumption();
00071 
00072     if ((rc == EXECRC_BUF_UNDERFLOW)
00073         && (pInAccessor->getState() != EXECBUF_EOS))
00074     {
00075         pInAccessor->requestProduction();
00076     }
00077 
00078     return EXECRC_BUF_OVERFLOW;
00079 }
00080 
00081 // TODO jvs 20-Nov-2006:  move this to ExecStreamBufAccessor.cpp
00082 // once it exists
00083 uint ExecStreamBufAccessor::getConsumptionAvailableBounded(uint cbLimit)
00084 {
00085     uint cbAvailable = getConsumptionAvailable();
00086     if (cbAvailable <= cbLimit) {
00087         return cbAvailable;
00088     }
00089 
00090     TupleAccessor const &tupleAccessor = getConsumptionTupleAccessor();
00091     PConstBuffer pSrc = getConsumptionStart();
00092 
00093     PConstBuffer pTuple = pSrc;
00094     PConstBuffer pTupleSafe = pTuple;
00095     PConstBuffer pEnd = pSrc + cbLimit;
00096     for (;;) {
00097         uint cbTuple = tupleAccessor.getBufferByteCount(pTuple);
00098         pTuple += cbTuple;
00099         if (pTuple > pEnd) {
00100             // this tuple would put us over the limit
00101             break;
00102         }
00103         // this tuple will fit
00104         pTupleSafe = pTuple;
00105     }
00106     return pTupleSafe - pSrc;
00107 }
00108 
00109 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/CopyExecStream.cpp#11 $");
00110 
00111 // End CopyExecStream.cpp

Generated on Mon Jun 22 04:00:18 2009 for Fennel by  doxygen 1.5.1