00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/test/ExecStreamUnitTestBase.h"
00026 #include "fennel/segment/ScratchMemExcn.h"
00027 #include "fennel/exec/ExecStreamScheduler.h"
00028 #include "fennel/exec/ExecStream.h"
00029 #include "fennel/exec/ExecStreamGraph.h"
00030 #include "fennel/exec/ExecStreamBufAccessor.h"
00031 #include "fennel/exec/ExecStreamGovernor.h"
00032 #include "fennel/exec/MockResourceExecStream.h"
00033 #include "fennel/exec/BarrierExecStream.h"
00034 #include "fennel/exec/ExecStreamEmbryo.h"
00035 #include "fennel/tuple/StandardTypeDescriptor.h"
00036 
00037 #include <boost/test/test_tools.hpp>
00038 
00039 using namespace fennel;
00040 
00044 class ExecStreamGovernorTest : public ExecStreamUnitTestBase
00045 {
00061     void testGovernor(
00062         uint nProducers,
00063         std::vector<ExecStreamResourceQuantity> const &minReqts,
00064         std::vector<ExecStreamResourceQuantity> const &optReqts,
00065         std::vector<ExecStreamResourceSettingType> optTypes,
00066         std::vector<ExecStreamResourceQuantity> expected,
00067         bool exception = false);
00068 
00069 public:
00070     explicit ExecStreamGovernorTest()
00071     {
00072         FENNEL_UNIT_TEST_CASE(ExecStreamGovernorTest, testOptLessAccurate);
00073         FENNEL_UNIT_TEST_CASE(ExecStreamGovernorTest, testOptLessEstimate);
00074         FENNEL_UNIT_TEST_CASE(ExecStreamGovernorTest, testOptEqualEstimate);
00075         FENNEL_UNIT_TEST_CASE(ExecStreamGovernorTest, testInBetween);
00076         FENNEL_UNIT_TEST_CASE(
00077             ExecStreamGovernorTest, testMinEqualAllocation);
00078         FENNEL_UNIT_TEST_CASE(
00079             ExecStreamGovernorTest, testMinGreaterAllocation);
00080         FENNEL_UNIT_TEST_CASE(
00081             ExecStreamGovernorTest, testMinGreaterAvailable);
00082         FENNEL_UNIT_TEST_CASE(ExecStreamGovernorTest, testReturnResources);
00083     }
00084 
00085     void testOptLessAccurate();
00086     void testOptLessEstimate();
00087     void testOptEqualEstimate();
00088     void testInBetween();
00089     void testMinGreaterAllocation();
00090     void testMinEqualAllocation();
00091     void testMinGreaterAvailable();
00092     void testReturnResources();
00093 
00094     virtual void testCaseSetUp();
00095 };
00096 
00097 void ExecStreamGovernorTest::testCaseSetUp()
00098 {
00099     ExecStreamUnitTestBase::testCaseSetUp();
00100 
00101     
00102     ExecStreamResourceQuantity quantity;
00103     quantity.nCachePages = 100;
00104     pResourceGovernor->setResourceAvailability(
00105         quantity, EXEC_RESOURCE_CACHE_PAGES);
00106 
00107     ExecStreamResourceKnobs knob;
00108     knob.expectedConcurrentStatements = 1;
00109     pResourceGovernor->setResourceKnob(
00110         knob, EXEC_KNOB_EXPECTED_CONCURRENT_STATEMENTS);
00111 }
00112 
00118 void ExecStreamGovernorTest::testOptLessAccurate()
00119 {
00120     uint nProducers = 2;
00121     std::vector<ExecStreamResourceQuantity> minReqts;
00122     std::vector<ExecStreamResourceQuantity> optReqts;
00123     std::vector<ExecStreamResourceQuantity> expected;
00124     std::vector<ExecStreamResourceSettingType> optTypes;
00125 
00126     ExecStreamResourceQuantity quantity;
00127     ExecStreamResourceSettingType optType;
00128 
00129     
00130     quantity.nCachePages = 10;
00131     minReqts.push_back(quantity);
00132     quantity.nCachePages = 15;
00133     optReqts.push_back(quantity);
00134     expected.push_back(quantity);
00135     optType = EXEC_RESOURCE_ACCURATE;
00136     optTypes.push_back(optType);
00137 
00138     
00139     quantity.nCachePages = 20;
00140     minReqts.push_back(quantity);
00141     quantity.nCachePages = 40;
00142     optReqts.push_back(quantity);
00143     expected.push_back(quantity);
00144     optType = EXEC_RESOURCE_ACCURATE;
00145     optTypes.push_back(optType);
00146 
00147     testGovernor(nProducers, minReqts, optReqts, optTypes, expected);
00148 }
00149 
00154 void ExecStreamGovernorTest::testOptLessEstimate()
00155 {
00156     uint nProducers = 4;
00157     std::vector<ExecStreamResourceQuantity> minReqts;
00158     std::vector<ExecStreamResourceQuantity> optReqts;
00159     std::vector<ExecStreamResourceQuantity> expected;
00160     std::vector<ExecStreamResourceSettingType> optTypes;
00161 
00162     ExecStreamResourceQuantity quantity;
00163     ExecStreamResourceSettingType optType;
00164 
00165     
00166     
00167 
00168     
00169     quantity.nCachePages = 10;
00170     minReqts.push_back(quantity);
00171     quantity.nCachePages = 11;
00172     optReqts.push_back(quantity);
00173     expected.push_back(quantity);
00174     optType = EXEC_RESOURCE_ACCURATE;
00175     optTypes.push_back(optType);
00176 
00177     
00178     quantity.nCachePages = 15;
00179     minReqts.push_back(quantity);
00180     quantity.nCachePages = 17;
00181     optReqts.push_back(quantity);
00182     expected.push_back(quantity);
00183     optType = EXEC_RESOURCE_ACCURATE;
00184     optTypes.push_back(optType);
00185 
00186     
00187     quantity.nCachePages = 20;
00188     minReqts.push_back(quantity);
00189     quantity.nCachePages = 23;
00190     optReqts.push_back(quantity);
00191     quantity.nCachePages = 38;
00192     expected.push_back(quantity);
00193     optType = EXEC_RESOURCE_ESTIMATE;
00194     optTypes.push_back(optType);
00195 
00196     
00197     quantity.nCachePages = 25;
00198     minReqts.push_back(quantity);
00199     quantity.nCachePages = 29;
00200     optReqts.push_back(quantity);
00201     expected.push_back(quantity);
00202     optType = EXEC_RESOURCE_ACCURATE;
00203     optTypes.push_back(optType);
00204 
00205     testGovernor(nProducers, minReqts, optReqts, optTypes, expected);
00206 }
00207 
00212 void ExecStreamGovernorTest::testOptEqualEstimate()
00213 {
00214     uint nProducers = 4;
00215     std::vector<ExecStreamResourceQuantity> minReqts;
00216     std::vector<ExecStreamResourceQuantity> optReqts;
00217     std::vector<ExecStreamResourceQuantity> expected;
00218     std::vector<ExecStreamResourceSettingType> optTypes;
00219 
00220     ExecStreamResourceQuantity quantity;
00221     ExecStreamResourceSettingType optType;
00222 
00223     
00224     
00225 
00226     
00227     quantity.nCachePages = 10;
00228     minReqts.push_back(quantity);
00229     quantity.nCachePages = 20;
00230     optReqts.push_back(quantity);
00231     expected.push_back(quantity);
00232     optType = EXEC_RESOURCE_ESTIMATE;
00233     optTypes.push_back(optType);
00234 
00235     
00236     quantity.nCachePages = 15;
00237     minReqts.push_back(quantity);
00238     quantity.nCachePages = 17;
00239     optReqts.push_back(quantity);
00240     expected.push_back(quantity);
00241     optType = EXEC_RESOURCE_ACCURATE;
00242     optTypes.push_back(optType);
00243 
00244     
00245     quantity.nCachePages = 20;
00246     minReqts.push_back(quantity);
00247     quantity.nCachePages = 23;
00248     optReqts.push_back(quantity);
00249     expected.push_back(quantity);
00250     optType = EXEC_RESOURCE_ACCURATE;
00251     optTypes.push_back(optType);
00252 
00253     
00254     quantity.nCachePages = 25;
00255     minReqts.push_back(quantity);
00256     quantity.nCachePages = 35;
00257     optReqts.push_back(quantity);
00258     expected.push_back(quantity);
00259     optType = EXEC_RESOURCE_ESTIMATE;
00260     optTypes.push_back(optType);
00261 
00262     testGovernor(nProducers, minReqts, optReqts, optTypes, expected);
00263 }
00264 
00269 void ExecStreamGovernorTest::testInBetween()
00270 {
00271     uint nProducers = 4;
00272     std::vector<ExecStreamResourceQuantity> minReqts;
00273     std::vector<ExecStreamResourceQuantity> optReqts;
00274     std::vector<ExecStreamResourceQuantity> expected;
00275     std::vector<ExecStreamResourceSettingType> optTypes;
00276 
00277     ExecStreamResourceQuantity quantity;
00278     ExecStreamResourceSettingType optType;
00279 
00280     
00281     
00282 
00283     
00284     quantity.nCachePages = 10;
00285     minReqts.push_back(quantity);
00286     quantity.nCachePages = 25;
00287     optReqts.push_back(quantity);
00288     quantity.nCachePages = 14;
00289     expected.push_back(quantity);
00290     optType = EXEC_RESOURCE_ACCURATE;
00291     optTypes.push_back(optType);
00292 
00293     
00294     quantity.nCachePages = 15;
00295     minReqts.push_back(quantity);
00296     quantity.nCachePages = 31;
00297     optReqts.push_back(quantity);
00298     quantity.nCachePages = 19;
00299     expected.push_back(quantity);
00300     optType = EXEC_RESOURCE_ESTIMATE;
00301     optTypes.push_back(optType);
00302 
00303     
00304     quantity.nCachePages = 20;
00305     minReqts.push_back(quantity);
00306     quantity.nCachePages = 0;
00307     optReqts.push_back(quantity);
00308     quantity.nCachePages = 31;
00309     expected.push_back(quantity);
00310     optType = EXEC_RESOURCE_UNBOUNDED;
00311     optTypes.push_back(optType);
00312 
00313     
00314     quantity.nCachePages = 25;
00315     minReqts.push_back(quantity);
00316     quantity.nCachePages = 42;
00317     optReqts.push_back(quantity);
00318     quantity.nCachePages = 29;
00319     expected.push_back(quantity);
00320     optType = EXEC_RESOURCE_ESTIMATE;
00321     optTypes.push_back(optType);
00322 
00323     testGovernor(nProducers, minReqts, optReqts, optTypes, expected);
00324 }
00325 
00330 void ExecStreamGovernorTest::testMinGreaterAllocation()
00331 {
00332     
00333     
00334     ExecStreamResourceQuantity quantity;
00335     quantity.nCachePages = 200;
00336     pResourceGovernor->setResourceAvailability(
00337         quantity, EXEC_RESOURCE_CACHE_PAGES);
00338     ExecStreamResourceKnobs knob;
00339     knob.expectedConcurrentStatements = 2;
00340     pResourceGovernor->setResourceKnob(
00341         knob, EXEC_KNOB_EXPECTED_CONCURRENT_STATEMENTS);
00342 
00343     uint nProducers = 2;
00344     std::vector<ExecStreamResourceQuantity> minReqts;
00345     std::vector<ExecStreamResourceQuantity> optReqts;
00346     std::vector<ExecStreamResourceQuantity> expected;
00347     std::vector<ExecStreamResourceSettingType> optTypes;
00348 
00349     ExecStreamResourceSettingType optType;
00350 
00351     
00352     quantity.nCachePages = 50;
00353     minReqts.push_back(quantity);
00354     optReqts.push_back(quantity);
00355     expected.push_back(quantity);
00356     optType = EXEC_RESOURCE_ACCURATE;
00357     optTypes.push_back(optType);
00358 
00359     
00360     quantity.nCachePages = 55;
00361     minReqts.push_back(quantity);
00362     optReqts.push_back(quantity);
00363     expected.push_back(quantity);
00364     optType = EXEC_RESOURCE_ACCURATE;
00365     optTypes.push_back(optType);
00366 
00367     testGovernor(nProducers, minReqts, optReqts, optTypes, expected);
00368 }
00369 
00374 void ExecStreamGovernorTest::testMinEqualAllocation()
00375 {
00376     uint nProducers = 2;
00377     std::vector<ExecStreamResourceQuantity> minReqts;
00378     std::vector<ExecStreamResourceQuantity> optReqts;
00379     std::vector<ExecStreamResourceQuantity> expected;
00380     std::vector<ExecStreamResourceSettingType> optTypes;
00381 
00382     ExecStreamResourceQuantity quantity;
00383     ExecStreamResourceSettingType optType;
00384 
00385     
00386     
00387     quantity.nCachePages = 50;
00388     minReqts.push_back(quantity);
00389     quantity.nCachePages = 60;
00390     optReqts.push_back(quantity);
00391     quantity.nCachePages = 50;
00392     expected.push_back(quantity);
00393     optType = EXEC_RESOURCE_ACCURATE;
00394     optTypes.push_back(optType);
00395 
00396     
00397     quantity.nCachePages = 45;
00398     minReqts.push_back(quantity);
00399     quantity.nCachePages = 50;
00400     optReqts.push_back(quantity);
00401     quantity.nCachePages = 45;
00402     expected.push_back(quantity);
00403     optType = EXEC_RESOURCE_ACCURATE;
00404     optTypes.push_back(optType);
00405 
00406     testGovernor(nProducers, minReqts, optReqts, optTypes, expected);
00407 }
00408 
00413 void ExecStreamGovernorTest::testMinGreaterAvailable()
00414 {
00415     uint nProducers = 2;
00416     std::vector<ExecStreamResourceQuantity> minReqts;
00417     std::vector<ExecStreamResourceQuantity> optReqts;
00418     std::vector<ExecStreamResourceQuantity> expected;
00419     std::vector<ExecStreamResourceSettingType> optTypes;
00420 
00421     ExecStreamResourceQuantity quantity;
00422     ExecStreamResourceSettingType optType;
00423 
00424     
00425     
00426     quantity.nCachePages = 50;
00427     minReqts.push_back(quantity);
00428     optReqts.push_back(quantity);
00429     expected.push_back(quantity);
00430     optType = EXEC_RESOURCE_ACCURATE;
00431     optTypes.push_back(optType);
00432 
00433     
00434     quantity.nCachePages = 46;
00435     minReqts.push_back(quantity);
00436     optReqts.push_back(quantity);
00437     expected.push_back(quantity);
00438     optType = EXEC_RESOURCE_ACCURATE;
00439     optTypes.push_back(optType);
00440 
00441     testGovernor(nProducers, minReqts, optReqts, optTypes, expected, true);
00442 }
00443 
00449 void ExecStreamGovernorTest::testReturnResources()
00450 {
00451     uint nProducers = 2;
00452     std::vector<ExecStreamResourceQuantity> minReqts;
00453     std::vector<ExecStreamResourceQuantity> optReqts;
00454     std::vector<ExecStreamResourceQuantity> expected;
00455     std::vector<ExecStreamResourceSettingType> optTypes;
00456 
00457     ExecStreamResourceQuantity quantity;
00458     ExecStreamResourceSettingType optType;
00459 
00460     
00461     quantity.nCachePages = 45;
00462     minReqts.push_back(quantity);
00463     optReqts.push_back(quantity);
00464     expected.push_back(quantity);
00465     optType = EXEC_RESOURCE_ACCURATE;
00466     optTypes.push_back(optType);
00467 
00468     
00469     quantity.nCachePages = 45;
00470     minReqts.push_back(quantity);
00471     optReqts.push_back(quantity);
00472     expected.push_back(quantity);
00473     optType = EXEC_RESOURCE_ACCURATE;
00474     optTypes.push_back(optType);
00475 
00476     testGovernor(nProducers, minReqts, optReqts, optTypes, expected);
00477 
00478     resetExecStreamTest();
00479     minReqts.clear();
00480     optReqts.clear();
00481     expected.clear();
00482     optTypes.clear();
00483 
00484     
00485     quantity.nCachePages = 45;
00486     minReqts.push_back(quantity);
00487     optReqts.push_back(quantity);
00488     expected.push_back(quantity);
00489     optType = EXEC_RESOURCE_ACCURATE;
00490     optTypes.push_back(optType);
00491 
00492     
00493     quantity.nCachePages = 50;
00494     minReqts.push_back(quantity);
00495     optReqts.push_back(quantity);
00496     expected.push_back(quantity);
00497     optType = EXEC_RESOURCE_ACCURATE;
00498     optTypes.push_back(optType);
00499 
00500     testGovernor(nProducers, minReqts, optReqts, optTypes, expected);
00501 }
00502 
00503 void ExecStreamGovernorTest::testGovernor(
00504     uint nProducers,
00505     std::vector<ExecStreamResourceQuantity> const &minReqts,
00506     std::vector<ExecStreamResourceQuantity> const &optReqts,
00507     std::vector<ExecStreamResourceSettingType> optTypes,
00508     std::vector<ExecStreamResourceQuantity> expected,
00509     bool exception)
00510 {
00511     StandardTypeDescriptorFactory stdTypeFactory;
00512     TupleAttributeDescriptor int8AttrDesc(
00513         stdTypeFactory.newDataType(STANDARD_TYPE_INT_8));
00514 
00515     std::vector<ExecStreamEmbryo> producerStreamEmbryos;
00516     for (uint i = 0; i < nProducers; i++) {
00517         MockResourceExecStreamParams producerParams;
00518         producerParams.minReqt = minReqts[i];
00519         producerParams.optReqt = optReqts[i];
00520         producerParams.optTypeInput = optTypes[i];
00521         producerParams.expected = expected[i];
00522 
00523         
00524         
00525         producerParams.scratchAccessor =
00526             pSegmentFactory->newScratchSegment(pCache, expected[i].nCachePages);
00527         producerParams.pCacheAccessor = pCache;
00528         producerParams.outputTupleDesc.push_back(int8AttrDesc);
00529 
00530         ExecStreamEmbryo producerStreamEmbryo;
00531         producerStreamEmbryo.init(
00532             new MockResourceExecStream(), producerParams);
00533         std::ostringstream oss;
00534         oss << "MockResourceExecStream" << "#" << i;
00535         producerStreamEmbryo.getStream()->setName(oss.str());
00536         producerStreamEmbryos.push_back(producerStreamEmbryo);
00537     }
00538 
00539     BarrierExecStreamParams barrierParams;
00540     barrierParams.outputTupleDesc.push_back(int8AttrDesc);
00541     barrierParams.returnMode = BARRIER_RET_ANY_INPUT;
00542 
00543     ExecStreamEmbryo barrierStreamEmbryo;
00544     barrierStreamEmbryo.init(new BarrierExecStream(), barrierParams);
00545     barrierStreamEmbryo.getStream()->setName("BarrierExecStream");
00546 
00547     SharedExecStream pOutputStream = prepareConfluenceGraph(
00548         producerStreamEmbryos, barrierStreamEmbryo);
00549 
00550     int8_t expectedOutput = 1;
00551     TupleData expectedTuple;
00552     expectedTuple.compute(barrierParams.outputTupleDesc);
00553     expectedTuple[0].pData = (PConstBuffer) &expectedOutput;
00554 
00555     
00556     try {
00557         verifyConstantOutput(*pOutputStream, expectedTuple, 1);
00558         if (exception) {
00559             BOOST_FAIL("Cache memory not exhausted");
00560         }
00561     } catch (FennelExcn &ex) {
00562         std::string errMsg = ex.getMessage();
00563         if (errMsg.compare(ScratchMemExcn().getMessage()) != 0) {
00564             BOOST_FAIL("Wrong exception returned");
00565         }
00566     }
00567 }
00568 
00569 FENNEL_UNIT_TEST_SUITE(ExecStreamGovernorTest);
00570 
00571