CorrelationJoinExecStream.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/exec/CorrelationJoinExecStream.cpp#3 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2004-2009 SQLstream, Inc.
00006 // Copyright (C) 2009-2009 LucidEra, Inc.
00007 // Portions Copyright (C) 1999-2009 John V. Sichi
00008 //
00009 // This program is free software; you can redistribute it and/or modify it
00010 // under the terms of the GNU General Public License as published by the Free
00011 // Software Foundation; either version 2 of the License, or (at your option)
00012 // any later version approved by The Eigenbase Project.
00013 //
00014 // This program is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 // GNU General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU General Public License
00020 // along with this program; if not, write to the Free Software
00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 */
00023 
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/exec/CorrelationJoinExecStream.h"
00026 #include "fennel/exec/DynamicParam.h"
00027 #include "fennel/exec/ExecStreamBufAccessor.h"
00028 #include "fennel/exec/ExecStreamGraph.h"
00029 #include "fennel/exec/ExecStreamScheduler.h"
00030 #include "fennel/tuple/TuplePrinter.h"
00031 
00032 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/CorrelationJoinExecStream.cpp#3 $");
00033 
00034 void CorrelationJoinExecStream::prepare(
00035     CorrelationJoinExecStreamParams const &params)
00036 {
00037     assert(inAccessors.size() == 2);
00038 
00039     pLeftBufAccessor = inAccessors[0];
00040     assert(pLeftBufAccessor);
00041 
00042     pRightBufAccessor = inAccessors[1];
00043     assert(pRightBufAccessor);
00044 
00045     TupleDescriptor const &leftDesc = pLeftBufAccessor->getTupleDesc();
00046     TupleDescriptor const &rightDesc = pRightBufAccessor->getTupleDesc();
00047 
00048     TupleDescriptor outputDesc;
00049     outputDesc.insert(outputDesc.end(),leftDesc.begin(),leftDesc.end());
00050     outputDesc.insert(outputDesc.end(),rightDesc.begin(),rightDesc.end());
00051     outputData.compute(outputDesc);
00052     pOutAccessor->setTupleShape(outputDesc);
00053 
00054     nLeftAttributes = leftDesc.size();
00055     correlations.assign(
00056         params.correlations.begin(),
00057         params.correlations.end());
00058     //correlations.resize(correlations.size());
00059     //assert(correlations.size() > 0);
00060     assert(correlations.size() <= nLeftAttributes);
00061 
00062     ConfluenceExecStream::prepare(params);
00063 }
00064 
00065 void CorrelationJoinExecStream::open(bool restart)
00066 {
00067     ConfluenceExecStream::open(restart);
00068 
00069     if (!restart) {
00070         leftRowCount = 0;
00071         for (std::vector<Correlation>::iterator it = correlations.begin();
00072              it != correlations.end(); ++it)
00073         {
00074             pDynamicParamManager->createParam(
00075                 it->dynamicParamId,
00076                 pLeftBufAccessor->getTupleDesc()[it->leftAttributeOrdinal]);
00077 
00078             // Make right-hand child and its descendants (upstream XOs)
00079             // non-runnable. We don't want them to execute until we have
00080             // read a row from the left and called open(restart=true).
00081             const std::vector<ExecStreamId> &readerStreamIds =
00082                 pGraph->getDynamicParamReaders(it->dynamicParamId);
00083             for (std::vector<ExecStreamId>::const_iterator it2 =
00084                      readerStreamIds.begin();
00085                  it2 != readerStreamIds.end(); ++it2)
00086             {
00087                 pGraph->getScheduler()->setRunnable(
00088                     *pGraph->getStream(*it2), false);
00089             }
00090         }
00091     }
00092 }
00093 
00094 void CorrelationJoinExecStream::close()
00095 {
00096     std::vector<Correlation>::iterator it = correlations.begin();
00097     for (/* empty */ ; it != correlations.end(); ++it) {
00098         pDynamicParamManager->deleteParam(it->dynamicParamId);
00099     }
00100     ConfluenceExecStream::closeImpl();
00101 }
00102 
00103 ExecStreamResult CorrelationJoinExecStream::execute(
00104     ExecStreamQuantum const &quantum)
00105 {
00106     // Note: implementation similar to CartesianJoinExecStream.
00107     uint nTuplesProduced = 0;
00108 
00109     for (;;) {
00110         if (!pLeftBufAccessor->isTupleConsumptionPending()) {
00111             if (pLeftBufAccessor->getState() == EXECBUF_EOS) {
00112                 pOutAccessor->markEOS();
00113                 return EXECRC_EOS;
00114             }
00115             if (!pLeftBufAccessor->demandData()) {
00116                 return EXECRC_BUF_UNDERFLOW;
00117             }
00118             pLeftBufAccessor->unmarshalTuple(outputData);
00119             // updating the dynamic param(s) with the new left value(s)
00120             std::vector<Correlation>::iterator it = correlations.begin();
00121             for (/* empty */ ; it != correlations.end(); ++it) {
00122                 pDynamicParamManager->writeParam(
00123                     it->dynamicParamId, outputData[it->leftAttributeOrdinal]);
00124             }
00125 
00126             // restart right input stream
00127             pGraph->getStreamInput(getStreamId(),1)->open(true);
00128 
00129             // make runnable
00130             if (++leftRowCount == 1) {
00131                 for (std::vector<Correlation>::iterator it =
00132                      correlations.begin();
00133                      it != correlations.end(); ++it)
00134                 {
00135                     // Make the right-hand descendant that uses the
00136                     // variable runnable. Note that we made it
00137                     // non-runnable in open so that it didn't read an
00138                     // uninitialized variable.
00139                     const std::vector<ExecStreamId> &readerStreamIds =
00140                         pGraph->getDynamicParamReaders(it->dynamicParamId);
00141                     for (std::vector<ExecStreamId>::const_iterator it2 =
00142                              readerStreamIds.begin();
00143                          it2 != readerStreamIds.end(); ++it2)
00144                     {
00145                         pGraph->getScheduler()->setRunnable(
00146                              *pGraph->getStream(*it2), true);
00147                     }
00148                 }
00149             }
00150         }
00151         for (;;) {
00152             if (!pRightBufAccessor->isTupleConsumptionPending()) {
00153                 if (pRightBufAccessor->getState() == EXECBUF_EOS) {
00154                     pLeftBufAccessor->consumeTuple();
00155                     break;
00156                 }
00157                 if (!pRightBufAccessor->demandData()) {
00158                     return EXECRC_BUF_UNDERFLOW;
00159                 }
00160                 pRightBufAccessor->unmarshalTuple(
00161                     outputData, nLeftAttributes);
00162                 break;
00163             }
00164 
00165             if (pOutAccessor->produceTuple(outputData)) {
00166                 ++nTuplesProduced;
00167             } else {
00168                 return EXECRC_BUF_OVERFLOW;
00169             }
00170 
00171             pRightBufAccessor->consumeTuple();
00172 
00173             if (nTuplesProduced >= quantum.nTuplesMax) {
00174                 return EXECRC_QUANTUM_EXPIRED;
00175             }
00176         }
00177     }
00178 }
00179 
00180 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/CorrelationJoinExecStream.cpp#3 $");
00181 
00182 // End CorrelationJoinExecStream.cpp

Generated on Mon Jun 22 04:00:18 2009 for Fennel by  doxygen 1.5.1