SegBufferWriterExecStream.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/exec/SegBufferWriterExecStream.cpp#2 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2005-2009 SQLstream, Inc.
00006 // Copyright (C) 2005-2009 LucidEra, Inc.
00007 // Portions Copyright (C) 2004-2009 John V. Sichi
00008 //
00009 // This program is free software; you can redistribute it and/or modify it
00010 // under the terms of the GNU General Public License as published by the Free
00011 // Software Foundation; either version 2 of the License, or (at your option)
00012 // any later version approved by The Eigenbase Project.
00013 //
00014 // This program is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 // GNU General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU General Public License
00020 // along with this program; if not, write to the Free Software
00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 */
00023 
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/exec/SegBufferWriterExecStream.h"
00026 #include "fennel/exec/ExecStreamBufAccessor.h"
00027 #include "fennel/exec/ExecStreamGraphImpl.h"
00028 #include "fennel/exec/DynamicParam.h"
00029 #include "fennel/exec/SegBufferWriter.h"
00030 #include "fennel/tuple/StandardTypeDescriptor.h"
00031 
00032 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/SegBufferWriterExecStream.cpp#2 $");
00033 
00034 void SegBufferWriterExecStream::prepare(
00035     SegBufferWriterExecStreamParams const &params)
00036 {
00037     DiffluenceExecStream::prepare(params);
00038     bufferSegmentAccessor = params.scratchAccessor;
00039     readerRefCountParamId = params.readerRefCountParamId;
00040     paramCreated = false;
00041 
00042     for (uint i = 0; i < outAccessors.size(); i++) {
00043         assert(outAccessors[i]->getTupleDesc().size() == 1);
00044     }
00045     outputTuple.compute(outAccessors[0]->getTupleDesc());
00046     outputTuple[0].pData = (PConstBuffer) &firstBufferPageId;
00047     outputBufSize =
00048         outAccessors[0]->getScratchTupleAccessor().getMaxByteCount();
00049     outputWritten.resize(outAccessors.size());
00050 }
00051 
00052 void SegBufferWriterExecStream::getResourceRequirements(
00053     ExecStreamResourceQuantity &minQuantity,
00054     ExecStreamResourceQuantity &optQuantity)
00055 {
00056     DiffluenceExecStream::getResourceRequirements(minQuantity, optQuantity);
00057 
00058     // set aside 1 page for I/O
00059     minQuantity.nCachePages += 1;
00060     optQuantity = minQuantity;
00061 }
00062 
00063 void SegBufferWriterExecStream::open(bool restart)
00064 {
00065     assert(pInAccessor);
00066     assert(pInAccessor->getProvision() == BUFPROV_CONSUMER);
00067 
00068     std::fill(outputWritten.begin(), outputWritten.end(), false);
00069     nOutputsWritten = 0;
00070 
00071     if (!restart) {
00072         StandardTypeDescriptorFactory stdTypeFactory;
00073         TupleAttributeDescriptor attrDesc =
00074             TupleAttributeDescriptor(
00075                 stdTypeFactory.newDataType(STANDARD_TYPE_UINT_64));
00076         pDynamicParamManager->createCounterParam(readerRefCountParamId);
00077         paramCreated = true;
00078         outputTupleBuffer.reset(new FixedBuffer[outputBufSize]);
00079         firstBufferPageId = NULL_PAGE_ID;
00080     }
00081     DiffluenceExecStream::open(restart);
00082 }
00083 
00084 void SegBufferWriterExecStream::closeImpl()
00085 {
00086     assert(readReaderRefCount() == 0);
00087     paramCreated = false;
00088     pSegBufferWriter.reset();
00089     outputTupleBuffer.reset();
00090     DiffluenceExecStream::closeImpl();
00091 }
00092 
00093 bool SegBufferWriterExecStream::canEarlyClose()
00094 {
00095     return (readReaderRefCount() == 0);
00096 }
00097 
00098 int64_t SegBufferWriterExecStream::readReaderRefCount()
00099 {
00100     if (!paramCreated) {
00101         // If the stream was never opened, then the parameter will not have
00102         // been created.
00103         return 0;
00104     }
00105 
00106     int64_t refCount;
00107     TupleDatum refCountDatum;
00108     refCountDatum.pData = (PConstBuffer) &refCount;
00109     refCountDatum.cbData = 8;
00110     pDynamicParamManager->readParam(readerRefCountParamId, refCountDatum);
00111     return refCount;
00112 }
00113 
00114 ExecStreamResult SegBufferWriterExecStream::execute(ExecStreamQuantum const &)
00115 {
00116     if (nOutputsWritten == outAccessors.size()) {
00117         for (uint i = 0; i < outAccessors.size(); i++) {
00118             outAccessors[i]->markEOS();
00119         }
00120         return EXECRC_EOS;
00121     }
00122 
00123     // Buffer the input
00124     if (firstBufferPageId == NULL_PAGE_ID) {
00125         if (!pSegBufferWriter) {
00126             pSegBufferWriter =
00127                 SegBufferWriter::newSegBufferWriter(
00128                     pInAccessor,
00129                     bufferSegmentAccessor,
00130                     true);
00131         }
00132         ExecStreamResult rc = pSegBufferWriter->write();
00133         if (rc != EXECRC_EOS) {
00134             return rc;
00135         }
00136         // Close the upstream producers
00137         ExecStreamGraphImpl &graphImpl =
00138             dynamic_cast<ExecStreamGraphImpl&>(getGraph());
00139         graphImpl.closeProducers(getStreamId());
00140         firstBufferPageId = pSegBufferWriter->getFirstPageId();
00141     }
00142 
00143     // Once the input has been buffered, then pass along the first buffer
00144     // pageId to only those consumers that have explicitly requested data
00145     bool newOutput = false;
00146     for (uint i = 0; i < outAccessors.size(); i++) {
00147         switch (outAccessors[i]->getState()) {
00148         case EXECBUF_NONEMPTY:
00149         case EXECBUF_OVERFLOW:
00150         case EXECBUF_EMPTY:
00151         case EXECBUF_EOS:
00152             break;
00153 
00154         case EXECBUF_UNDERFLOW:
00155             // Underflow means the consumer has explicitly requested data
00156             {
00157                 assert(!outputWritten[i]);
00158                 TupleAccessor *outputTupleAccessor =
00159                     &outAccessors[i]->getScratchTupleAccessor();
00160                 outputTupleAccessor->marshal(
00161                     outputTuple,
00162                     outputTupleBuffer.get());
00163                 outAccessors[i]->provideBufferForConsumption(
00164                     outputTupleBuffer.get(),
00165                     outputTupleBuffer.get() + outputBufSize);
00166                 outputWritten[i] = true;
00167                 nOutputsWritten++;
00168                 newOutput = true;
00169                 break;
00170             }
00171 
00172         default:
00173             permAssert(false);
00174         }
00175     }
00176 
00177     // Verify that at least one output stream was written
00178     assert(newOutput);
00179     return EXECRC_BUF_OVERFLOW;
00180 }
00181 
00182 ExecStreamBufProvision SegBufferWriterExecStream::getInputBufProvision() const
00183 {
00184     return BUFPROV_CONSUMER;
00185 }
00186 
00187 ExecStreamBufProvision SegBufferWriterExecStream::getOutputBufProvision() const
00188 {
00189     return BUFPROV_PRODUCER;
00190 }
00191 
00192 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/SegBufferWriterExecStream.cpp#2 $");
00193 
00194 // End SegBufferWriterExecStream.cpp

Generated on Mon Jun 22 04:00:18 2009 for Fennel by  doxygen 1.5.1