UncollectExecStream.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/exec/UncollectExecStream.cpp#2 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2004-2009 SQLstream, Inc.
00006 // Copyright (C) 2009-2009 LucidEra, Inc.
00007 // Portions Copyright (C) 1999-2009 John V. Sichi
00008 //
00009 // This program is free software; you can redistribute it and/or modify it
00010 // under the terms of the GNU General Public License as published by the Free
00011 // Software Foundation; either version 2 of the License, or (at your option)
00012 // any later version approved by The Eigenbase Project.
00013 //
00014 // This program is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 // GNU General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU General Public License
00020 // along with this program; if not, write to the Free Software
00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 */
00023 
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/exec/UncollectExecStream.h"
00026 #include "fennel/exec/ExecStreamBufAccessor.h"
00027 #include "fennel/tuple/StandardTypeDescriptor.h"
00028 #include "fennel/tuple/TuplePrinter.h"
00029 
00030 
00031 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/UncollectExecStream.cpp#2 $");
00032 
00033 void UncollectExecStream::prepare(UncollectExecStreamParams const &params)
00034 {
00035     ConduitExecStream::prepare(params);
00036 
00037     FENNEL_TRACE(
00038         TRACE_FINER,
00039         "uncollect xo input TupleDescriptor = "
00040         << pInAccessor->getTupleDesc());
00041 
00042     FENNEL_TRACE(
00043         TRACE_FINER,
00044         "uncollect xo output TupleDescriptor = "
00045         << pOutAccessor->getTupleDesc());
00046 
00047     StandardTypeDescriptorOrdinal ordinal =
00048         StandardTypeDescriptorOrdinal(
00049         pInAccessor->getTupleDesc()[0].pTypeDescriptor->getOrdinal());
00050     assert(ordinal == STANDARD_TYPE_VARBINARY);
00051     assert(1 == pInAccessor->getTupleDesc().size());
00052 
00053     inputTupleData.compute(pInAccessor->getTupleDesc());
00054     outputTupleData.compute(pOutAccessor->getTupleDesc());
00055 }
00056 
00057 
00058 void UncollectExecStream::open(bool restart)
00059 {
00060     ConduitExecStream::open(restart);
00061     bytesWritten = 0;
00062 }
00063 
00064 ExecStreamResult UncollectExecStream::execute(ExecStreamQuantum const &quantum)
00065 {
00066     ExecStreamResult rc = precheckConduitBuffers();
00067     if (EXECRC_YIELD != rc) {
00068         return rc;
00069     }
00070 
00071     if (!pInAccessor->demandData()) {
00072         return EXECRC_BUF_UNDERFLOW;
00073     }
00074 
00075     pInAccessor->unmarshalTuple(inputTupleData);
00076 
00077 #if 0
00078     std::cout<<"input tuple descriptor" << pInAccessor->getTupleDesc()<<std::endl;
00079     std::cout << "input tuple = ";
00080     TupleDescriptor statusDesc = pInAccessor->getTupleDesc();
00081     TuplePrinter tuplePrinter;
00082     tuplePrinter.print(std::cout, statusDesc, inputTupleData);
00083     std::cout << std::endl;
00084 #endif
00085 
00086     TupleAccessor& outTa = pOutAccessor->getScratchTupleAccessor();
00087     while (bytesWritten < inputTupleData[0].cbData) {
00088         // write one item in the input array to the output buffer
00089         outTa.setCurrentTupleBuf(inputTupleData[0].pData + bytesWritten);
00090         outTa.unmarshal(outputTupleData);
00091 #if 0
00092     std::cout << "unmarshalling ouput tuple= ";
00093     TupleDescriptor statusDesc = pOutAccessor->getTupleDesc();
00094     TuplePrinter tuplePrinter;
00095     tuplePrinter.print(std::cout, statusDesc, outputTupleData);
00096     std::cout << std::endl;
00097 #endif
00098 
00099         if (!pOutAccessor->produceTuple(outputTupleData)) {
00100             return EXECRC_BUF_OVERFLOW;
00101         }
00102         bytesWritten += outTa.getCurrentByteCount();
00103     }
00104 
00105     assert(pInAccessor->isTupleConsumptionPending());
00106     assert(bytesWritten == inputTupleData[0].cbData);
00107     pInAccessor->consumeTuple();
00108 
00109     return EXECRC_QUANTUM_EXPIRED;
00110 }
00111 
00112 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/UncollectExecStream.cpp#2 $");
00113 
00114 // End UncollectExecStream.cpp

Generated on Mon Jun 22 04:00:18 2009 for Fennel by  doxygen 1.5.1