00001 /* 00002 // $Id: //open/dev/fennel/exec/SegBufferWriterExecStream.cpp#2 $ 00003 // Fennel is a library of data storage and processing components. 00004 // Copyright (C) 2005-2009 The Eigenbase Project 00005 // Copyright (C) 2005-2009 SQLstream, Inc. 00006 // Copyright (C) 2005-2009 LucidEra, Inc. 00007 // Portions Copyright (C) 2004-2009 John V. Sichi 00008 // 00009 // This program is free software; you can redistribute it and/or modify it 00010 // under the terms of the GNU General Public License as published by the Free 00011 // Software Foundation; either version 2 of the License, or (at your option) 00012 // any later version approved by The Eigenbase Project. 00013 // 00014 // This program is distributed in the hope that it will be useful, 00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of 00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00017 // GNU General Public License for more details. 00018 // 00019 // You should have received a copy of the GNU General Public License 00020 // along with this program; if not, write to the Free Software 00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 00022 */ 00023 00024 #include "fennel/common/CommonPreamble.h" 00025 #include "fennel/exec/SegBufferWriterExecStream.h" 00026 #include "fennel/exec/ExecStreamBufAccessor.h" 00027 #include "fennel/exec/ExecStreamGraphImpl.h" 00028 #include "fennel/exec/DynamicParam.h" 00029 #include "fennel/exec/SegBufferWriter.h" 00030 #include "fennel/tuple/StandardTypeDescriptor.h" 00031 00032 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/SegBufferWriterExecStream.cpp#2 $"); 00033 00034 void SegBufferWriterExecStream::prepare( 00035 SegBufferWriterExecStreamParams const ¶ms) 00036 { 00037 DiffluenceExecStream::prepare(params); 00038 bufferSegmentAccessor = params.scratchAccessor; 00039 readerRefCountParamId = params.readerRefCountParamId; 00040 paramCreated = false; 00041 00042 for (uint i = 0; i < outAccessors.size(); i++) { 00043 assert(outAccessors[i]->getTupleDesc().size() == 1); 00044 } 00045 outputTuple.compute(outAccessors[0]->getTupleDesc()); 00046 outputTuple[0].pData = (PConstBuffer) &firstBufferPageId; 00047 outputBufSize = 00048 outAccessors[0]->getScratchTupleAccessor().getMaxByteCount(); 00049 outputWritten.resize(outAccessors.size()); 00050 } 00051 00052 void SegBufferWriterExecStream::getResourceRequirements( 00053 ExecStreamResourceQuantity &minQuantity, 00054 ExecStreamResourceQuantity &optQuantity) 00055 { 00056 DiffluenceExecStream::getResourceRequirements(minQuantity, optQuantity); 00057 00058 // set aside 1 page for I/O 00059 minQuantity.nCachePages += 1; 00060 optQuantity = minQuantity; 00061 } 00062 00063 void SegBufferWriterExecStream::open(bool restart) 00064 { 00065 assert(pInAccessor); 00066 assert(pInAccessor->getProvision() == BUFPROV_CONSUMER); 00067 00068 std::fill(outputWritten.begin(), outputWritten.end(), false); 00069 nOutputsWritten = 0; 00070 00071 if (!restart) { 00072 StandardTypeDescriptorFactory stdTypeFactory; 00073 TupleAttributeDescriptor attrDesc = 00074 TupleAttributeDescriptor( 00075 stdTypeFactory.newDataType(STANDARD_TYPE_UINT_64)); 00076 pDynamicParamManager->createCounterParam(readerRefCountParamId); 00077 paramCreated = true; 00078 outputTupleBuffer.reset(new FixedBuffer[outputBufSize]); 00079 firstBufferPageId = NULL_PAGE_ID; 00080 } 00081 DiffluenceExecStream::open(restart); 00082 } 00083 00084 void SegBufferWriterExecStream::closeImpl() 00085 { 00086 assert(readReaderRefCount() == 0); 00087 paramCreated = false; 00088 pSegBufferWriter.reset(); 00089 outputTupleBuffer.reset(); 00090 DiffluenceExecStream::closeImpl(); 00091 } 00092 00093 bool SegBufferWriterExecStream::canEarlyClose() 00094 { 00095 return (readReaderRefCount() == 0); 00096 } 00097 00098 int64_t SegBufferWriterExecStream::readReaderRefCount() 00099 { 00100 if (!paramCreated) { 00101 // If the stream was never opened, then the parameter will not have 00102 // been created. 00103 return 0; 00104 } 00105 00106 int64_t refCount; 00107 TupleDatum refCountDatum; 00108 refCountDatum.pData = (PConstBuffer) &refCount; 00109 refCountDatum.cbData = 8; 00110 pDynamicParamManager->readParam(readerRefCountParamId, refCountDatum); 00111 return refCount; 00112 } 00113 00114 ExecStreamResult SegBufferWriterExecStream::execute(ExecStreamQuantum const &) 00115 { 00116 if (nOutputsWritten == outAccessors.size()) { 00117 for (uint i = 0; i < outAccessors.size(); i++) { 00118 outAccessors[i]->markEOS(); 00119 } 00120 return EXECRC_EOS; 00121 } 00122 00123 // Buffer the input 00124 if (firstBufferPageId == NULL_PAGE_ID) { 00125 if (!pSegBufferWriter) { 00126 pSegBufferWriter = 00127 SegBufferWriter::newSegBufferWriter( 00128 pInAccessor, 00129 bufferSegmentAccessor, 00130 true); 00131 } 00132 ExecStreamResult rc = pSegBufferWriter->write(); 00133 if (rc != EXECRC_EOS) { 00134 return rc; 00135 } 00136 // Close the upstream producers 00137 ExecStreamGraphImpl &graphImpl = 00138 dynamic_cast<ExecStreamGraphImpl&>(getGraph()); 00139 graphImpl.closeProducers(getStreamId()); 00140 firstBufferPageId = pSegBufferWriter->getFirstPageId(); 00141 } 00142 00143 // Once the input has been buffered, then pass along the first buffer 00144 // pageId to only those consumers that have explicitly requested data 00145 bool newOutput = false; 00146 for (uint i = 0; i < outAccessors.size(); i++) { 00147 switch (outAccessors[i]->getState()) { 00148 case EXECBUF_NONEMPTY: 00149 case EXECBUF_OVERFLOW: 00150 case EXECBUF_EMPTY: 00151 case EXECBUF_EOS: 00152 break; 00153 00154 case EXECBUF_UNDERFLOW: 00155 // Underflow means the consumer has explicitly requested data 00156 { 00157 assert(!outputWritten[i]); 00158 TupleAccessor *outputTupleAccessor = 00159 &outAccessors[i]->getScratchTupleAccessor(); 00160 outputTupleAccessor->marshal( 00161 outputTuple, 00162 outputTupleBuffer.get()); 00163 outAccessors[i]->provideBufferForConsumption( 00164 outputTupleBuffer.get(), 00165 outputTupleBuffer.get() + outputBufSize); 00166 outputWritten[i] = true; 00167 nOutputsWritten++; 00168 newOutput = true; 00169 break; 00170 } 00171 00172 default: 00173 permAssert(false); 00174 } 00175 } 00176 00177 // Verify that at least one output stream was written 00178 assert(newOutput); 00179 return EXECRC_BUF_OVERFLOW; 00180 } 00181 00182 ExecStreamBufProvision SegBufferWriterExecStream::getInputBufProvision() const 00183 { 00184 return BUFPROV_CONSUMER; 00185 } 00186 00187 ExecStreamBufProvision SegBufferWriterExecStream::getOutputBufProvision() const 00188 { 00189 return BUFPROV_PRODUCER; 00190 } 00191 00192 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/SegBufferWriterExecStream.cpp#2 $"); 00193 00194 // End SegBufferWriterExecStream.cpp