00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/exec/CartesianJoinExecStream.h"
00026 #include "fennel/exec/ExecStreamBufAccessor.h"
00027 #include "fennel/exec/ExecStreamGraph.h"
00028
00029 #include <ostream>
00030
00031 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/CartesianJoinExecStream.cpp#14 $");
00032
00033 void CartesianJoinExecStream::prepare(
00034 CartesianJoinExecStreamParams const ¶ms)
00035 {
00036 assert(checkNumInputs());
00037 pLeftBufAccessor = inAccessors[0];
00038 assert(pLeftBufAccessor);
00039
00040 pRightBufAccessor = inAccessors[1];
00041 assert(pRightBufAccessor);
00042
00043 leftOuter = params.leftOuter;
00044
00045 SharedExecStream pLeftInput = pGraph->getStreamInput(getStreamId(), 0);
00046 assert(pLeftInput);
00047 pRightInput = pGraph->getStreamInput(getStreamId(), 1);
00048 assert(pRightInput);
00049 FENNEL_TRACE(
00050 TRACE_FINE,
00051 "left input " << pLeftInput->getStreamId() <<
00052 ' ' << pLeftInput->getName() <<
00053 ", right input " << pRightInput->getStreamId() <<
00054 ' ' << pRightInput->getName());
00055
00056
00057 TupleDescriptor const &leftDesc = pLeftBufAccessor->getTupleDesc();
00058 TupleDescriptor const &rightDesc = pRightBufAccessor->getTupleDesc();
00059
00060 TupleDescriptor outputDesc;
00061 outputDesc.insert(outputDesc.end(),leftDesc.begin(),leftDesc.end());
00062 uint iFirstRight = outputDesc.size();
00063 outputDesc.insert(outputDesc.end(),rightDesc.begin(),rightDesc.end());
00064 if (leftOuter) {
00065
00066
00067 for (uint i = iFirstRight; i < outputDesc.size(); ++i) {
00068 outputDesc[i].isNullable = true;
00069 }
00070 }
00071 if (params.outputTupleDesc.size()) {
00072 assert(params.outputTupleDesc == outputDesc);
00073 }
00074 outputData.compute(outputDesc);
00075 pOutAccessor->setTupleShape(outputDesc);
00076
00077 nLeftAttributes = leftDesc.size();
00078
00079 ConfluenceExecStream::prepare(params);
00080 }
00081
00082 bool CartesianJoinExecStream::checkNumInputs()
00083 {
00084 return (inAccessors.size() == 2);
00085 }
00086
00087 void CartesianJoinExecStream::open(bool restart)
00088 {
00089 ConfluenceExecStream::open(restart);
00090 }
00091
00092
00093 inline std::ostream& operator<< (
00094 std::ostream& os, SharedExecStreamBufAccessor buf)
00095 {
00096 os << ExecStreamBufState_names[buf->getState()];
00097 if (buf->hasPendingEOS()) {
00098 os << "(EOS pending)";
00099 }
00100 return os;
00101 }
00102
00103 ExecStreamResult CartesianJoinExecStream::execute(
00104 ExecStreamQuantum const &quantum)
00105 {
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119 uint nTuplesProduced = 0;
00120
00121 for (;;) {
00122 if (!pLeftBufAccessor->isTupleConsumptionPending()) {
00123 if (pLeftBufAccessor->getState() == EXECBUF_EOS) {
00124 pOutAccessor->markEOS();
00125 return EXECRC_EOS;
00126 }
00127 if (!pLeftBufAccessor->demandData()) {
00128 FENNEL_TRACE_THREAD(
00129 TRACE_FINE,
00130 "left underflow; left input " << pLeftBufAccessor <<
00131 " right input " << pRightBufAccessor);
00132 return EXECRC_BUF_UNDERFLOW;
00133 }
00134 pLeftBufAccessor->unmarshalTuple(outputData);
00135 processLeftInput();
00136 rightInputEmpty = true;
00137 }
00138 ExecStreamResult rc = preProcessRightInput();
00139 if (rc != EXECRC_YIELD) {
00140 return rc;
00141 }
00142 for (;;) {
00143 if (!pRightBufAccessor->isTupleConsumptionPending()) {
00144 if (pRightBufAccessor->getState() == EXECBUF_EOS) {
00145 if (leftOuter && rightInputEmpty) {
00146
00147
00148 for (int i = nLeftAttributes;
00149 i < outputData.size(); ++i)
00150 {
00151 outputData[i].pData = NULL;
00152 }
00153
00154 if (pOutAccessor->produceTuple(outputData)) {
00155 ++nTuplesProduced;
00156 } else {
00157 return EXECRC_BUF_OVERFLOW;
00158 }
00159
00160 if (nTuplesProduced >= quantum.nTuplesMax) {
00161 return EXECRC_QUANTUM_EXPIRED;
00162 }
00163 }
00164
00165 pLeftBufAccessor->consumeTuple();
00166
00167 pRightInput->open(true);
00168 FENNEL_TRACE_THREAD(
00169 TRACE_FINE,
00170 "re-opened right input " << pRightBufAccessor);
00171
00172
00173 break;
00174 }
00175 if (!pRightBufAccessor->demandData()) {
00176 FENNEL_TRACE_THREAD(
00177 TRACE_FINE,
00178 "right underflow; left input " << pLeftBufAccessor <<
00179 " right input " << pRightBufAccessor);
00180 return EXECRC_BUF_UNDERFLOW;
00181 }
00182 rightInputEmpty = false;
00183 pRightBufAccessor->unmarshalTuple(
00184 outputData, nLeftAttributes);
00185 break;
00186 }
00187
00188 if (pOutAccessor->produceTuple(outputData)) {
00189 ++nTuplesProduced;
00190 } else {
00191 return EXECRC_BUF_OVERFLOW;
00192 }
00193
00194 pRightBufAccessor->consumeTuple();
00195
00196 if (nTuplesProduced >= quantum.nTuplesMax) {
00197 return EXECRC_QUANTUM_EXPIRED;
00198 }
00199 }
00200 }
00201 }
00202
00203 ExecStreamResult CartesianJoinExecStream::preProcessRightInput()
00204 {
00205 return EXECRC_YIELD;
00206 }
00207
00208 void CartesianJoinExecStream::processLeftInput()
00209 {
00210 }
00211
00212 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/CartesianJoinExecStream.cpp#14 $");
00213
00214