CollectExecStream.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/exec/CollectExecStream.cpp#2 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2004-2009 SQLstream, Inc.
00006 // Copyright (C) 2009-2009 LucidEra, Inc.
00007 // Portions Copyright (C) 1999-2009 John V. Sichi
00008 //
00009 // This program is free software; you can redistribute it and/or modify it
00010 // under the terms of the GNU General Public License as published by the Free
00011 // Software Foundation; either version 2 of the License, or (at your option)
00012 // any later version approved by The Eigenbase Project.
00013 //
00014 // This program is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 // GNU General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU General Public License
00020 // along with this program; if not, write to the Free Software
00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 */
00023 
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/exec/CollectExecStream.h"
00026 #include "fennel/exec/ExecStreamBufAccessor.h"
00027 #include "fennel/tuple/TuplePrinter.h"
00028 #include "fennel/tuple/StandardTypeDescriptor.h"
00029 
00030 
00031 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/CollectExecStream.cpp#2 $");
00032 
00033 void CollectExecStream::prepare(CollectExecStreamParams const &params)
00034 {
00035     ConduitExecStream::prepare(params);
00036     FENNEL_TRACE(
00037         TRACE_FINER,
00038         "collect xo input TupleDescriptor = "
00039         << pInAccessor->getTupleDesc());
00040 
00041     FENNEL_TRACE(
00042         TRACE_FINER,
00043         "collect xo output TupleDescriptor = "
00044         << pOutAccessor->getTupleDesc());
00045 
00046     StandardTypeDescriptorOrdinal ordinal =
00047         StandardTypeDescriptorOrdinal(
00048         pOutAccessor->getTupleDesc()[0].pTypeDescriptor->getOrdinal());
00049     assert(ordinal == STANDARD_TYPE_VARBINARY);
00050     assert(1 == pOutAccessor->getTupleDesc().size());
00051 }
00052 
00053 void CollectExecStream::open(bool restart)
00054 {
00055     ConduitExecStream::open(restart);
00056     outputTupleData.compute(pOutAccessor->getTupleDesc());
00057     inputTupleData.compute(pInAccessor->getTupleDesc());
00058 
00059     uint cbOutMaxsize =
00060         pOutAccessor->getConsumptionTupleAccessor().getMaxByteCount();
00061     pOutputBuffer.reset(new FixedBuffer[cbOutMaxsize]);
00062     bytesWritten = 0;
00063     alreadyWrittenToOutput = false;
00064 }
00065 
00066 void CollectExecStream::close()
00067 {
00068     pOutputBuffer.reset();
00069     ConduitExecStream::closeImpl();
00070 }
00071 
00072 ExecStreamResult CollectExecStream::execute(ExecStreamQuantum const &quantum)
00073 {
00074     if (!alreadyWrittenToOutput && (EXECBUF_EOS == pInAccessor->getState())) {
00075         outputTupleData[0].pData = pOutputBuffer.get();
00076         outputTupleData[0].cbData = bytesWritten;
00077         if (!pOutAccessor->produceTuple(outputTupleData)) {
00078             return EXECRC_BUF_OVERFLOW;
00079         }
00080         alreadyWrittenToOutput = true;
00081     }
00082 
00083     ExecStreamResult rc = precheckConduitBuffers();
00084     if (EXECRC_YIELD != rc) {
00085         return rc;
00086     }
00087 
00088     for (uint nTuples = 0; nTuples < quantum.nTuplesMax; ++nTuples) {
00089         assert(!pInAccessor->isTupleConsumptionPending());
00090         if (!pInAccessor->demandData()) {
00091             return EXECRC_BUF_UNDERFLOW;
00092         }
00093 
00094         pInAccessor->unmarshalTuple(inputTupleData);
00095 
00096 #if 0
00097     TupleDescriptor statusDesc = pInAccessor->getTupleDesc();
00098     TuplePrinter tuplePrinter;
00099     tuplePrinter.print(std::cout, statusDesc, inputTupleData);
00100     std::cout << std::endl;
00101 #endif
00102 
00103         // write one input tuple to the staging output buffer
00104         memcpy(
00105             pOutputBuffer.get() + bytesWritten,
00106             pInAccessor->getConsumptionStart(),
00107             pInAccessor->getConsumptionTupleAccessor().getCurrentByteCount());
00108         bytesWritten +=
00109             pInAccessor->getConsumptionTupleAccessor().getCurrentByteCount();
00110         pInAccessor->consumeTuple();
00111     }
00112     return EXECRC_QUANTUM_EXPIRED;
00113 }
00114 
00115 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/CollectExecStream.cpp#2 $");
00116 
00117 // End CollectExecStream.cpp

Generated on Mon Jun 22 04:00:18 2009 for Fennel by  doxygen 1.5.1