CartesianJoinExecStream.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/exec/CartesianJoinExecStream.cpp#14 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2005-2009 SQLstream, Inc.
00006 // Copyright (C) 2005-2009 LucidEra, Inc.
00007 // Portions Copyright (C) 2004-2009 John V. Sichi
00008 //
00009 // This program is free software; you can redistribute it and/or modify it
00010 // under the terms of the GNU General Public License as published by the Free
00011 // Software Foundation; either version 2 of the License, or (at your option)
00012 // any later version approved by The Eigenbase Project.
00013 //
00014 // This program is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 // GNU General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU General Public License
00020 // along with this program; if not, write to the Free Software
00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 */
00023 
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/exec/CartesianJoinExecStream.h"
00026 #include "fennel/exec/ExecStreamBufAccessor.h"
00027 #include "fennel/exec/ExecStreamGraph.h"
00028 
00029 #include <ostream>
00030 
00031 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/CartesianJoinExecStream.cpp#14 $");
00032 
00033 void CartesianJoinExecStream::prepare(
00034     CartesianJoinExecStreamParams const &params)
00035 {
00036     assert(checkNumInputs());
00037     pLeftBufAccessor = inAccessors[0];
00038     assert(pLeftBufAccessor);
00039 
00040     pRightBufAccessor = inAccessors[1];
00041     assert(pRightBufAccessor);
00042 
00043     leftOuter = params.leftOuter;
00044 
00045     SharedExecStream pLeftInput = pGraph->getStreamInput(getStreamId(), 0);
00046     assert(pLeftInput);
00047     pRightInput = pGraph->getStreamInput(getStreamId(), 1);
00048     assert(pRightInput);
00049     FENNEL_TRACE(
00050         TRACE_FINE,
00051         "left input " << pLeftInput->getStreamId() <<
00052         ' ' << pLeftInput->getName() <<
00053         ", right input " << pRightInput->getStreamId() <<
00054         ' ' << pRightInput->getName());
00055 
00056 
00057     TupleDescriptor const &leftDesc = pLeftBufAccessor->getTupleDesc();
00058     TupleDescriptor const &rightDesc = pRightBufAccessor->getTupleDesc();
00059 
00060     TupleDescriptor outputDesc;
00061     outputDesc.insert(outputDesc.end(),leftDesc.begin(),leftDesc.end());
00062     uint iFirstRight = outputDesc.size();
00063     outputDesc.insert(outputDesc.end(),rightDesc.begin(),rightDesc.end());
00064     if (leftOuter) {
00065         // Right side is null-generating; have to adjust tuple descriptor
00066         // accordingly.
00067         for (uint i = iFirstRight; i < outputDesc.size(); ++i) {
00068             outputDesc[i].isNullable = true;
00069         }
00070     }
00071     if (params.outputTupleDesc.size()) {
00072         assert(params.outputTupleDesc == outputDesc);
00073     }
00074     outputData.compute(outputDesc);
00075     pOutAccessor->setTupleShape(outputDesc);
00076 
00077     nLeftAttributes = leftDesc.size();
00078 
00079     ConfluenceExecStream::prepare(params);
00080 }
00081 
00082 bool CartesianJoinExecStream::checkNumInputs()
00083 {
00084     return (inAccessors.size() == 2);
00085 }
00086 
00087 void CartesianJoinExecStream::open(bool restart)
00088 {
00089     ConfluenceExecStream::open(restart);
00090 }
00091 
00092 // trace buffer state
00093 inline std::ostream& operator<< (
00094     std::ostream& os, SharedExecStreamBufAccessor buf)
00095 {
00096     os << ExecStreamBufState_names[buf->getState()];
00097     if (buf->hasPendingEOS()) {
00098         os << "(EOS pending)";
00099     }
00100     return os;
00101 }
00102 
00103 ExecStreamResult CartesianJoinExecStream::execute(
00104     ExecStreamQuantum const &quantum)
00105 {
00106     // TODO:  lots of small optimizations possible here
00107 
00108     // TODO jvs 6-Nov-2004: one big optimization would be to perform
00109     // buffer-to-buffer joins instead of row-to-buffer joins.  This would
00110     // reduce the number of times the right input needs to be iterated by the
00111     // average number of rows in a buffer from the left input.  However,  the
00112     // output ordering would also be affected, so we might want to provide a
00113     // parameter to control this behavior.
00114 
00115     // Also, for outer join, once we know the right input is empty, it's always
00116     // going to be empty for every left row, so we could just stop trying to
00117     // re-execute the right hand side.
00118 
00119     uint nTuplesProduced = 0;
00120 
00121     for (;;) {
00122         if (!pLeftBufAccessor->isTupleConsumptionPending()) {
00123             if (pLeftBufAccessor->getState() == EXECBUF_EOS) {
00124                 pOutAccessor->markEOS();
00125                 return EXECRC_EOS;
00126             }
00127             if (!pLeftBufAccessor->demandData()) {
00128                 FENNEL_TRACE_THREAD(
00129                     TRACE_FINE,
00130                     "left underflow; left input " << pLeftBufAccessor <<
00131                     " right input " << pRightBufAccessor);
00132                 return EXECRC_BUF_UNDERFLOW;
00133             }
00134             pLeftBufAccessor->unmarshalTuple(outputData);
00135             processLeftInput();
00136             rightInputEmpty = true;
00137         }
00138         ExecStreamResult rc = preProcessRightInput();
00139         if (rc != EXECRC_YIELD) {
00140             return rc;
00141         }
00142         for (;;) {
00143             if (!pRightBufAccessor->isTupleConsumptionPending()) {
00144                 if (pRightBufAccessor->getState() == EXECBUF_EOS) {
00145                     if (leftOuter && rightInputEmpty) {
00146                         // put null in outputdata to the right of
00147                         // nLeftAttributes
00148                         for (int i = nLeftAttributes;
00149                              i < outputData.size(); ++i)
00150                         {
00151                             outputData[i].pData = NULL;
00152                         }
00153 
00154                         if (pOutAccessor->produceTuple(outputData)) {
00155                             ++nTuplesProduced;
00156                         } else {
00157                             return EXECRC_BUF_OVERFLOW;
00158                         }
00159 
00160                         if (nTuplesProduced >= quantum.nTuplesMax) {
00161                             return EXECRC_QUANTUM_EXPIRED;
00162                         }
00163                     }
00164 
00165                     pLeftBufAccessor->consumeTuple();
00166                     // restart right input stream
00167                     pRightInput->open(true);
00168                     FENNEL_TRACE_THREAD(
00169                         TRACE_FINE,
00170                         "re-opened right input " << pRightBufAccessor);
00171                     // NOTE: break out of the inner for loop, which will take
00172                     // us back to the top of the outer for loop
00173                     break;
00174                 }
00175                 if (!pRightBufAccessor->demandData()) {
00176                     FENNEL_TRACE_THREAD(
00177                         TRACE_FINE,
00178                         "right underflow; left input " << pLeftBufAccessor <<
00179                         " right input " << pRightBufAccessor);
00180                     return EXECRC_BUF_UNDERFLOW;
00181                 }
00182                 rightInputEmpty = false;
00183                 pRightBufAccessor->unmarshalTuple(
00184                     outputData, nLeftAttributes);
00185                 break;
00186             }
00187 
00188             if (pOutAccessor->produceTuple(outputData)) {
00189                 ++nTuplesProduced;
00190             } else {
00191                 return EXECRC_BUF_OVERFLOW;
00192             }
00193 
00194             pRightBufAccessor->consumeTuple();
00195 
00196             if (nTuplesProduced >= quantum.nTuplesMax) {
00197                 return EXECRC_QUANTUM_EXPIRED;
00198             }
00199         }
00200     }
00201 }
00202 
00203 ExecStreamResult CartesianJoinExecStream::preProcessRightInput()
00204 {
00205     return EXECRC_YIELD;
00206 }
00207 
00208 void CartesianJoinExecStream::processLeftInput()
00209 {
00210 }
00211 
00212 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/CartesianJoinExecStream.cpp#14 $");
00213 
00214 // End CartesianJoinExecStream.cpp

Generated on Mon Jun 22 04:00:18 2009 for Fennel by  doxygen 1.5.1