00001 /* 00002 // $Id: //open/dev/fennel/exec/CopyExecStream.cpp#11 $ 00003 // Fennel is a library of data storage and processing components. 00004 // Copyright (C) 2005-2009 The Eigenbase Project 00005 // Copyright (C) 2005-2009 SQLstream, Inc. 00006 // Copyright (C) 2005-2009 LucidEra, Inc. 00007 // Portions Copyright (C) 2004-2009 John V. Sichi 00008 // 00009 // This program is free software; you can redistribute it and/or modify it 00010 // under the terms of the GNU General Public License as published by the Free 00011 // Software Foundation; either version 2 of the License, or (at your option) 00012 // any later version approved by The Eigenbase Project. 00013 // 00014 // This program is distributed in the hope that it will be useful, 00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of 00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00017 // GNU General Public License for more details. 00018 // 00019 // You should have received a copy of the GNU General Public License 00020 // along with this program; if not, write to the Free Software 00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 00022 */ 00023 00024 #include "fennel/common/CommonPreamble.h" 00025 #include "fennel/exec/CopyExecStream.h" 00026 #include "fennel/exec/ExecStreamBufAccessor.h" 00027 00028 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/CopyExecStream.cpp#11 $"); 00029 00030 void CopyExecStream::prepare(CopyExecStreamParams const ¶ms) 00031 { 00032 ConduitExecStream::prepare(params); 00033 } 00034 00035 ExecStreamResult CopyExecStream::execute(ExecStreamQuantum const &quantum) 00036 { 00037 ExecStreamResult rc = precheckConduitBuffers(); 00038 if (rc != EXECRC_YIELD) { 00039 return rc; 00040 } 00041 00042 uint cbAvailableIn = pInAccessor->getConsumptionAvailable(); 00043 uint cbAvailableOut = pOutAccessor->getProductionAvailable(); 00044 00045 PConstBuffer pSrc = pInAccessor->getConsumptionStart(); 00046 PBuffer pDst = pOutAccessor->getProductionStart(); 00047 00048 if (cbAvailableOut < cbAvailableIn) { 00049 // In a non-DFS scheduler, the first call to this XO may be made before 00050 // the first call to its consumer. If so, the consumer may not have 00051 // allocated its buffer; punt until it has. 00052 if (cbAvailableOut == 0 && pOutAccessor->getState() == EXECBUF_EMPTY) { 00053 return EXECRC_BUF_OVERFLOW; 00054 } 00055 // oops, impedance mismatch: have to figure out how many 00056 // complete tuples we can safely copy without overflow 00057 cbAvailableIn = 00058 pInAccessor->getConsumptionAvailableBounded(cbAvailableOut); 00059 permAssert(cbAvailableIn > 0); 00060 } else { 00061 rc = EXECRC_BUF_UNDERFLOW; 00062 } 00063 00064 memcpy(pDst,pSrc,cbAvailableIn); 00065 pInAccessor->consumeData(pSrc + cbAvailableIn); 00066 pOutAccessor->produceData(pDst + cbAvailableIn); 00067 00068 // we can't use whatever's left in output buffer, so tell consumer 00069 // to give us a fresh one next time 00070 pOutAccessor->requestConsumption(); 00071 00072 if ((rc == EXECRC_BUF_UNDERFLOW) 00073 && (pInAccessor->getState() != EXECBUF_EOS)) 00074 { 00075 pInAccessor->requestProduction(); 00076 } 00077 00078 return EXECRC_BUF_OVERFLOW; 00079 } 00080 00081 // TODO jvs 20-Nov-2006: move this to ExecStreamBufAccessor.cpp 00082 // once it exists 00083 uint ExecStreamBufAccessor::getConsumptionAvailableBounded(uint cbLimit) 00084 { 00085 uint cbAvailable = getConsumptionAvailable(); 00086 if (cbAvailable <= cbLimit) { 00087 return cbAvailable; 00088 } 00089 00090 TupleAccessor const &tupleAccessor = getConsumptionTupleAccessor(); 00091 PConstBuffer pSrc = getConsumptionStart(); 00092 00093 PConstBuffer pTuple = pSrc; 00094 PConstBuffer pTupleSafe = pTuple; 00095 PConstBuffer pEnd = pSrc + cbLimit; 00096 for (;;) { 00097 uint cbTuple = tupleAccessor.getBufferByteCount(pTuple); 00098 pTuple += cbTuple; 00099 if (pTuple > pEnd) { 00100 // this tuple would put us over the limit 00101 break; 00102 } 00103 // this tuple will fit 00104 pTupleSafe = pTuple; 00105 } 00106 return pTupleSafe - pSrc; 00107 } 00108 00109 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/CopyExecStream.cpp#11 $"); 00110 00111 // End CopyExecStream.cpp