00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/exec/CorrelationJoinExecStream.h"
00026 #include "fennel/exec/DynamicParam.h"
00027 #include "fennel/exec/ExecStreamBufAccessor.h"
00028 #include "fennel/exec/ExecStreamGraph.h"
00029 #include "fennel/exec/ExecStreamScheduler.h"
00030 #include "fennel/tuple/TuplePrinter.h"
00031
00032 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/CorrelationJoinExecStream.cpp#3 $");
00033
00034 void CorrelationJoinExecStream::prepare(
00035 CorrelationJoinExecStreamParams const ¶ms)
00036 {
00037 assert(inAccessors.size() == 2);
00038
00039 pLeftBufAccessor = inAccessors[0];
00040 assert(pLeftBufAccessor);
00041
00042 pRightBufAccessor = inAccessors[1];
00043 assert(pRightBufAccessor);
00044
00045 TupleDescriptor const &leftDesc = pLeftBufAccessor->getTupleDesc();
00046 TupleDescriptor const &rightDesc = pRightBufAccessor->getTupleDesc();
00047
00048 TupleDescriptor outputDesc;
00049 outputDesc.insert(outputDesc.end(),leftDesc.begin(),leftDesc.end());
00050 outputDesc.insert(outputDesc.end(),rightDesc.begin(),rightDesc.end());
00051 outputData.compute(outputDesc);
00052 pOutAccessor->setTupleShape(outputDesc);
00053
00054 nLeftAttributes = leftDesc.size();
00055 correlations.assign(
00056 params.correlations.begin(),
00057 params.correlations.end());
00058
00059
00060 assert(correlations.size() <= nLeftAttributes);
00061
00062 ConfluenceExecStream::prepare(params);
00063 }
00064
00065 void CorrelationJoinExecStream::open(bool restart)
00066 {
00067 ConfluenceExecStream::open(restart);
00068
00069 if (!restart) {
00070 leftRowCount = 0;
00071 for (std::vector<Correlation>::iterator it = correlations.begin();
00072 it != correlations.end(); ++it)
00073 {
00074 pDynamicParamManager->createParam(
00075 it->dynamicParamId,
00076 pLeftBufAccessor->getTupleDesc()[it->leftAttributeOrdinal]);
00077
00078
00079
00080
00081 const std::vector<ExecStreamId> &readerStreamIds =
00082 pGraph->getDynamicParamReaders(it->dynamicParamId);
00083 for (std::vector<ExecStreamId>::const_iterator it2 =
00084 readerStreamIds.begin();
00085 it2 != readerStreamIds.end(); ++it2)
00086 {
00087 pGraph->getScheduler()->setRunnable(
00088 *pGraph->getStream(*it2), false);
00089 }
00090 }
00091 }
00092 }
00093
00094 void CorrelationJoinExecStream::close()
00095 {
00096 std::vector<Correlation>::iterator it = correlations.begin();
00097 for ( ; it != correlations.end(); ++it) {
00098 pDynamicParamManager->deleteParam(it->dynamicParamId);
00099 }
00100 ConfluenceExecStream::closeImpl();
00101 }
00102
00103 ExecStreamResult CorrelationJoinExecStream::execute(
00104 ExecStreamQuantum const &quantum)
00105 {
00106
00107 uint nTuplesProduced = 0;
00108
00109 for (;;) {
00110 if (!pLeftBufAccessor->isTupleConsumptionPending()) {
00111 if (pLeftBufAccessor->getState() == EXECBUF_EOS) {
00112 pOutAccessor->markEOS();
00113 return EXECRC_EOS;
00114 }
00115 if (!pLeftBufAccessor->demandData()) {
00116 return EXECRC_BUF_UNDERFLOW;
00117 }
00118 pLeftBufAccessor->unmarshalTuple(outputData);
00119
00120 std::vector<Correlation>::iterator it = correlations.begin();
00121 for ( ; it != correlations.end(); ++it) {
00122 pDynamicParamManager->writeParam(
00123 it->dynamicParamId, outputData[it->leftAttributeOrdinal]);
00124 }
00125
00126
00127 pGraph->getStreamInput(getStreamId(),1)->open(true);
00128
00129
00130 if (++leftRowCount == 1) {
00131 for (std::vector<Correlation>::iterator it =
00132 correlations.begin();
00133 it != correlations.end(); ++it)
00134 {
00135
00136
00137
00138
00139 const std::vector<ExecStreamId> &readerStreamIds =
00140 pGraph->getDynamicParamReaders(it->dynamicParamId);
00141 for (std::vector<ExecStreamId>::const_iterator it2 =
00142 readerStreamIds.begin();
00143 it2 != readerStreamIds.end(); ++it2)
00144 {
00145 pGraph->getScheduler()->setRunnable(
00146 *pGraph->getStream(*it2), true);
00147 }
00148 }
00149 }
00150 }
00151 for (;;) {
00152 if (!pRightBufAccessor->isTupleConsumptionPending()) {
00153 if (pRightBufAccessor->getState() == EXECBUF_EOS) {
00154 pLeftBufAccessor->consumeTuple();
00155 break;
00156 }
00157 if (!pRightBufAccessor->demandData()) {
00158 return EXECRC_BUF_UNDERFLOW;
00159 }
00160 pRightBufAccessor->unmarshalTuple(
00161 outputData, nLeftAttributes);
00162 break;
00163 }
00164
00165 if (pOutAccessor->produceTuple(outputData)) {
00166 ++nTuplesProduced;
00167 } else {
00168 return EXECRC_BUF_OVERFLOW;
00169 }
00170
00171 pRightBufAccessor->consumeTuple();
00172
00173 if (nTuplesProduced >= quantum.nTuplesMax) {
00174 return EXECRC_QUANTUM_EXPIRED;
00175 }
00176 }
00177 }
00178 }
00179
00180 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/CorrelationJoinExecStream.cpp#3 $");
00181
00182