00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/test/ExecStreamUnitTestBase.h"
00026 #include "fennel/exec/ExecStreamGraph.h"
00027 #include "fennel/exec/ExecStreamGraphEmbryo.h"
00028 #include "fennel/exec/ExecStreamScheduler.h"
00029 #include "fennel/exec/ExecStream.h"
00030 #include "fennel/exec/ScratchBufferExecStream.h"
00031 #include "fennel/exec/ExecStreamEmbryo.h"
00032 #include "fennel/exec/ExecStreamBufAccessor.h"
00033 #include "fennel/exec/MockProducerExecStream.h"
00034 #include "fennel/tuple/TuplePrinter.h"
00035 #include "fennel/cache/QuotaCacheAccessor.h"
00036
00037 #include <boost/test/test_tools.hpp>
00038
00039 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/test/ExecStreamUnitTestBase.cpp#23 $");
00040
00041 SharedExecStream ExecStreamUnitTestBase::prepareSourceGraph(
00042 ExecStreamEmbryo &sourceStreamEmbryo)
00043 {
00044 std::vector<ExecStreamEmbryo> transforms;
00045 return prepareTransformGraph(sourceStreamEmbryo, transforms);
00046 }
00047
00048 SharedExecStream ExecStreamUnitTestBase::prepareTransformGraph(
00049 ExecStreamEmbryo &sourceStreamEmbryo,
00050 ExecStreamEmbryo &transformStreamEmbryo)
00051 {
00052 std::vector<ExecStreamEmbryo> transforms;
00053 transforms.push_back(transformStreamEmbryo);
00054 return prepareTransformGraph(sourceStreamEmbryo, transforms);
00055 }
00056
00057 SharedExecStream ExecStreamUnitTestBase::prepareTransformGraph(
00058 ExecStreamEmbryo &sourceStreamEmbryo,
00059 std::vector<ExecStreamEmbryo> &transforms)
00060 {
00061 pGraphEmbryo->saveStreamEmbryo(sourceStreamEmbryo);
00062 std::vector<ExecStreamEmbryo>::iterator it;
00063
00064
00065 for (it = transforms.begin(); it != transforms.end(); ++it) {
00066 pGraphEmbryo->saveStreamEmbryo(*it);
00067 }
00068
00069
00070 ExecStreamEmbryo& previousStream = sourceStreamEmbryo;
00071 for (it = transforms.begin(); it != transforms.end(); ++it) {
00072 pGraphEmbryo->addDataflow(
00073 previousStream.getStream()->getName(),
00074 (*it).getStream()->getName());
00075 previousStream = *it;
00076 }
00077
00078 SharedExecStream pAdaptedStream =
00079 pGraphEmbryo->addAdapterFor(
00080 previousStream.getStream()->getName(),
00081 0,
00082 BUFPROV_PRODUCER);
00083 pGraph->addOutputDataflow(pAdaptedStream->getStreamId());
00084
00085 pGraphEmbryo->prepareGraph(shared_from_this(), "");
00086 return pAdaptedStream;
00087 }
00088
00089 SharedExecStream ExecStreamUnitTestBase::prepareConfluenceGraph(
00090 ExecStreamEmbryo &sourceStreamEmbryo1,
00091 ExecStreamEmbryo &sourceStreamEmbryo2,
00092 ExecStreamEmbryo &confluenceStreamEmbryo)
00093 {
00094 std::vector<ExecStreamEmbryo> sourceStreamEmbryos;
00095 sourceStreamEmbryos.push_back(sourceStreamEmbryo1);
00096 sourceStreamEmbryos.push_back(sourceStreamEmbryo2);
00097 return prepareConfluenceGraph(sourceStreamEmbryos, confluenceStreamEmbryo);
00098 }
00099
00100 SharedExecStream ExecStreamUnitTestBase::prepareConfluenceTransformGraph(
00101 ExecStreamEmbryo &sourceStreamEmbryo1,
00102 ExecStreamEmbryo &sourceStreamEmbryo2,
00103 ExecStreamEmbryo &confluenceStreamEmbryo,
00104 ExecStreamEmbryo &transformStreamEmbryo)
00105 {
00106 std::vector<ExecStreamEmbryo> sourceStreamEmbryos;
00107 sourceStreamEmbryos.push_back(sourceStreamEmbryo1);
00108 sourceStreamEmbryos.push_back(sourceStreamEmbryo2);
00109
00110 std::vector<ExecStreamEmbryo>::iterator it;
00111
00112 for (it = sourceStreamEmbryos.begin(); it != sourceStreamEmbryos.end();
00113 ++it)
00114 {
00115 pGraphEmbryo->saveStreamEmbryo(*it);
00116 }
00117 pGraphEmbryo->saveStreamEmbryo(confluenceStreamEmbryo);
00118
00119 for (it = sourceStreamEmbryos.begin(); it != sourceStreamEmbryos.end();
00120 ++it)
00121 {
00122 pGraphEmbryo->addDataflow(
00123 (*it).getStream()->getName(),
00124 confluenceStreamEmbryo.getStream()->getName());
00125 }
00126
00127 std::vector<ExecStreamEmbryo> transforms;
00128 transforms.push_back(transformStreamEmbryo);
00129 ExecStreamEmbryo& previousStream = confluenceStreamEmbryo;
00130
00131
00132 for (it = transforms.begin(); it != transforms.end(); ++it) {
00133 pGraphEmbryo->saveStreamEmbryo(*it);
00134 }
00135
00136 for (it = transforms.begin(); it != transforms.end(); ++it) {
00137 pGraphEmbryo->addDataflow(
00138 previousStream.getStream()->getName(),
00139 (*it).getStream()->getName());
00140 previousStream = *it;
00141 }
00142
00143
00144 SharedExecStream pAdaptedStream =
00145 pGraphEmbryo->addAdapterFor(
00146 previousStream.getStream()->getName(),
00147 0,
00148 BUFPROV_PRODUCER);
00149 pGraph->addOutputDataflow(pAdaptedStream->getStreamId());
00150
00151 pGraphEmbryo->prepareGraph(shared_from_this(), "");
00152 return pAdaptedStream;
00153 }
00154
00155 SharedExecStream ExecStreamUnitTestBase::prepareConfluenceGraph(
00156 std::vector<ExecStreamEmbryo> &sourceStreamEmbryos,
00157 ExecStreamEmbryo &confluenceStreamEmbryo)
00158 {
00159 std::vector<std::vector<ExecStreamEmbryo> > sourceStreamEmbryosList;
00160 std::vector<ExecStreamEmbryo>::iterator it;
00161 std::vector<ExecStreamEmbryo> sourceStreamList;
00162 for (it = sourceStreamEmbryos.begin(); it != sourceStreamEmbryos.end();
00163 it++)
00164 {
00165 sourceStreamList.clear();
00166 sourceStreamList.push_back(*it);
00167 sourceStreamEmbryosList.push_back(sourceStreamList);
00168 }
00169
00170 return
00171 prepareConfluenceGraph(sourceStreamEmbryosList, confluenceStreamEmbryo);
00172 }
00173
00174 SharedExecStream ExecStreamUnitTestBase::prepareConfluenceGraph(
00175 std::vector<std::vector<ExecStreamEmbryo> > &sourceStreamEmbryosList,
00176 ExecStreamEmbryo &confluenceStreamEmbryo)
00177 {
00178 pGraphEmbryo->saveStreamEmbryo(confluenceStreamEmbryo);
00179
00180 for (int i = 0; i < sourceStreamEmbryosList.size(); i++) {
00181 for (int j = 0; j < sourceStreamEmbryosList[i].size(); j++) {
00182 pGraphEmbryo->saveStreamEmbryo(sourceStreamEmbryosList[i][j]);
00183 }
00184
00185
00186 for (int j = 1; j < sourceStreamEmbryosList[i].size(); j++) {
00187 pGraphEmbryo->addDataflow(
00188 sourceStreamEmbryosList[i][j - 1].getStream()->getName(),
00189 sourceStreamEmbryosList[i][j].getStream()->getName());
00190 }
00191 pGraphEmbryo->addDataflow(
00192 sourceStreamEmbryosList[i].back().getStream()->getName(),
00193 confluenceStreamEmbryo.getStream()->getName());
00194 }
00195
00196 SharedExecStream pAdaptedStream =
00197 pGraphEmbryo->addAdapterFor(
00198 confluenceStreamEmbryo.getStream()->getName(), 0,
00199 BUFPROV_PRODUCER);
00200 pGraph->addOutputDataflow(
00201 pAdaptedStream->getStreamId());
00202
00203 pGraphEmbryo->prepareGraph(shared_from_this(), "");
00204
00205 return pAdaptedStream;
00206 }
00207
00208 SharedExecStream ExecStreamUnitTestBase::prepareDAG(
00209 ExecStreamEmbryo &srcStreamEmbryo,
00210 ExecStreamEmbryo &splitterStreamEmbryo,
00211 std::vector<ExecStreamEmbryo> &interStreamEmbryos,
00212 ExecStreamEmbryo &destStreamEmbryo,
00213 bool createSink,
00214 bool saveSrc)
00215 {
00216 std::vector<std::vector<ExecStreamEmbryo> > listOfList;
00217
00218
00219
00220
00221 for (uint i = 0; i < interStreamEmbryos.size(); i++) {
00222 std::vector<ExecStreamEmbryo> interStreamEmbryoList;
00223
00224 interStreamEmbryoList.push_back(interStreamEmbryos[i]);
00225 listOfList.push_back(interStreamEmbryoList);
00226 }
00227 return prepareDAG(
00228 srcStreamEmbryo, splitterStreamEmbryo, listOfList, destStreamEmbryo,
00229 createSink, saveSrc);
00230 }
00231
00232 SharedExecStream ExecStreamUnitTestBase::prepareDAG(
00233 ExecStreamEmbryo &srcStreamEmbryo,
00234 ExecStreamEmbryo &splitterStreamEmbryo,
00235 std::vector<std::vector<ExecStreamEmbryo> > &interStreamEmbryos,
00236 ExecStreamEmbryo &destStreamEmbryo,
00237 bool createSink,
00238 bool saveSrc)
00239 {
00240 if (saveSrc) {
00241 pGraphEmbryo->saveStreamEmbryo(srcStreamEmbryo);
00242 }
00243 pGraphEmbryo->saveStreamEmbryo(splitterStreamEmbryo);
00244
00245
00246 for (int i = 0; i < interStreamEmbryos.size(); i++) {
00247 for (int j = 0; j < interStreamEmbryos[i].size(); j++) {
00248 pGraphEmbryo->saveStreamEmbryo(interStreamEmbryos[i][j]);
00249 }
00250
00251
00252 for (int j = 1; j < interStreamEmbryos[i].size(); j++) {
00253 pGraphEmbryo->addDataflow(
00254 interStreamEmbryos[i][j - 1].getStream()->getName(),
00255 interStreamEmbryos[i][j].getStream()->getName());
00256 }
00257 }
00258
00259 pGraphEmbryo->saveStreamEmbryo(destStreamEmbryo);
00260
00261 pGraphEmbryo->addDataflow(
00262 srcStreamEmbryo.getStream()->getName(),
00263 splitterStreamEmbryo.getStream()->getName());
00264
00265
00266 for (int i = 0; i < interStreamEmbryos.size(); i++) {
00267 pGraphEmbryo->addDataflow(
00268 splitterStreamEmbryo.getStream()->getName(),
00269 interStreamEmbryos[i][0].getStream()->getName());
00270 pGraphEmbryo->addDataflow(
00271 interStreamEmbryos[i].back().getStream()->getName(),
00272 destStreamEmbryo.getStream()->getName());
00273 }
00274
00275 SharedExecStream pAdaptedStream;
00276
00277 if (createSink) {
00278 pAdaptedStream = pGraphEmbryo->addAdapterFor(
00279 destStreamEmbryo.getStream()->getName(), 0,
00280 BUFPROV_PRODUCER);
00281 pGraph->addOutputDataflow(pAdaptedStream->getStreamId());
00282
00283 pGraphEmbryo->prepareGraph(shared_from_this(), "");
00284 }
00285
00286 return pAdaptedStream;
00287 }
00288
00289 void ExecStreamUnitTestBase::testCaseSetUp()
00290 {
00291 ExecStreamTestBase::testCaseSetUp();
00292 openRandomSegment();
00293 pGraph = newStreamGraph();
00294 pGraphEmbryo = newStreamGraphEmbryo(pGraph);
00295 pGraph->setResourceGovernor(pResourceGovernor);
00296
00297
00298
00299
00300
00301 pCacheAccessor.reset(
00302 new TransactionalCacheAccessor(pCache));
00303 }
00304
00305 void ExecStreamUnitTestBase::resetExecStreamTest()
00306 {
00307 if (pScheduler) {
00308 pScheduler->stop();
00309 }
00310 tearDownExecStreamTest();
00311
00312 pScheduler.reset(newScheduler());
00313 pGraph = newStreamGraph();
00314 pGraphEmbryo = newStreamGraphEmbryo(pGraph);
00315 pGraph->setResourceGovernor(pResourceGovernor);
00316 }
00317
00318
00319 void ExecStreamUnitTestBase::tearDownExecStreamTest()
00320 {
00321 pGraph.reset();
00322 pGraphEmbryo.reset();
00323 }
00324
00325 void ExecStreamUnitTestBase::verifyOutput(
00326 ExecStream &stream,
00327 uint nRowsExpected,
00328 MockProducerExecStreamGenerator &generator,
00329 bool stopEarly)
00330 {
00331
00332
00333 pResourceGovernor->requestResources(*pGraph);
00334 pGraph->open();
00335 pScheduler->start();
00336 uint nRows = 0;
00337 for (;;) {
00338 ExecStreamBufAccessor &bufAccessor =
00339 pScheduler->readStream(stream);
00340 if (bufAccessor.getState() == EXECBUF_EOS) {
00341 break;
00342 }
00343 BOOST_REQUIRE(bufAccessor.isConsumptionPossible());
00344 const uint nCol =
00345 bufAccessor.getConsumptionTupleAccessor().size();
00346 BOOST_REQUIRE(nCol == bufAccessor.getTupleDesc().size());
00347 BOOST_REQUIRE(nCol >= 1);
00348 TupleData inputTuple;
00349 inputTuple.compute(bufAccessor.getTupleDesc());
00350 for (;;) {
00351 if (!bufAccessor.demandData()) {
00352 break;
00353 }
00354 BOOST_REQUIRE(nRows < nRowsExpected);
00355 bufAccessor.unmarshalTuple(inputTuple);
00356 for (int col = 0; col < nCol; ++col) {
00357 int64_t actualValue =
00358 *reinterpret_cast<int64_t const *>(inputTuple[col].pData);
00359 int64_t expectedValue = generator.generateValue(nRows, col);
00360 if (actualValue != expectedValue) {
00361 std::cout << "(Row, Col) = (" << nRows << ", " << col <<")"
00362 << std::endl;
00363 BOOST_CHECK_EQUAL(expectedValue,actualValue);
00364 return;
00365 }
00366 }
00367 bufAccessor.consumeTuple();
00368 ++nRows;
00369 if (stopEarly && nRows == nRowsExpected) {
00370 return;
00371 }
00372 }
00373 }
00374 BOOST_CHECK_EQUAL(nRowsExpected,nRows);
00375 }
00376
00377 void ExecStreamUnitTestBase::verifyConstantOutput(
00378 ExecStream &stream,
00379 const TupleData &expectedTuple,
00380 uint nRowsExpected)
00381 {
00382
00383
00384 pResourceGovernor->requestResources(*pGraph);
00385 pGraph->open();
00386 pScheduler->start();
00387 uint nRows = 0;
00388 for (;;) {
00389 ExecStreamBufAccessor &bufAccessor =
00390 pScheduler->readStream(stream);
00391 if (bufAccessor.getState() == EXECBUF_EOS) {
00392 break;
00393 }
00394 BOOST_REQUIRE(bufAccessor.isConsumptionPossible());
00395
00396 if (!bufAccessor.demandData()) {
00397 break;
00398 }
00399 BOOST_REQUIRE(nRows < nRowsExpected);
00400
00401 TupleData actualTuple;
00402 actualTuple.compute(bufAccessor.getTupleDesc());
00403 bufAccessor.unmarshalTuple(actualTuple);
00404
00405 int c = bufAccessor.getTupleDesc().compareTuples(
00406 expectedTuple, actualTuple);
00407 bufAccessor.consumeTuple();
00408 ++nRows;
00409 if (c) {
00410 #if 1
00411 TupleDescriptor statusDesc = bufAccessor.getTupleDesc();
00412 TuplePrinter tuplePrinter;
00413 tuplePrinter.print(std::cout, statusDesc, actualTuple);
00414 tuplePrinter.print(std::cout, statusDesc, expectedTuple);
00415 std::cout << std::endl;
00416 #endif
00417 BOOST_CHECK_EQUAL(0,c);
00418 break;
00419 }
00420 }
00421 BOOST_CHECK_EQUAL(nRowsExpected, nRows);
00422 }
00423
00424 void ExecStreamUnitTestBase::verifyBufferedOutput(
00425 ExecStream &stream,
00426 TupleDescriptor outputTupleDesc,
00427 uint nRowsExpected,
00428 PBuffer expectedBuffer)
00429 {
00430
00431
00432 TupleAccessor expectedOutputAccessor;
00433 expectedOutputAccessor.compute(outputTupleDesc);
00434 TupleData expectedTuple(outputTupleDesc);
00435 uint bufOffset = 0;
00436 pResourceGovernor->requestResources(*pGraph);
00437 pGraph->open();
00438 pScheduler->start();
00439 uint nRows = 0;
00440 for (;;) {
00441 ExecStreamBufAccessor &bufAccessor =
00442 pScheduler->readStream(stream);
00443 if (bufAccessor.getState() == EXECBUF_EOS) {
00444 break;
00445 }
00446 BOOST_REQUIRE(bufAccessor.getTupleDesc() == outputTupleDesc);
00447 BOOST_REQUIRE(bufAccessor.isConsumptionPossible());
00448 const uint nCol =
00449 bufAccessor.getConsumptionTupleAccessor().size();
00450 BOOST_REQUIRE(nCol == bufAccessor.getTupleDesc().size());
00451 BOOST_REQUIRE(nCol >= 1);
00452 TupleData inputTuple;
00453 inputTuple.compute(bufAccessor.getTupleDesc());
00454 for (;;) {
00455 if (!bufAccessor.demandData()) {
00456 break;
00457 }
00458 BOOST_REQUIRE(nRows < nRowsExpected);
00459 bufAccessor.unmarshalTuple(inputTuple);
00460 expectedOutputAccessor.setCurrentTupleBuf(
00461 expectedBuffer + bufOffset);
00462 expectedOutputAccessor.unmarshal(expectedTuple);
00463 int c = outputTupleDesc.compareTuples(inputTuple, expectedTuple);
00464 if (c) {
00465 std::cout << "(Row) = (" << nRows << ")"
00466 << " -- Tuples don't match"<< std::endl;
00467 BOOST_CHECK_EQUAL(0,c);
00468 return;
00469 }
00470 bufAccessor.consumeTuple();
00471 bufOffset += expectedOutputAccessor.getCurrentByteCount();
00472 ++nRows;
00473 }
00474 }
00475 BOOST_CHECK_EQUAL(nRowsExpected,nRows);
00476 }
00477
00478 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/test/ExecStreamUnitTestBase.cpp#23 $");
00479
00480