00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/test/ExecStreamUnitTestBase.h"
00026 #include "fennel/segment/ScratchMemExcn.h"
00027 #include "fennel/exec/ExecStreamScheduler.h"
00028 #include "fennel/exec/ExecStream.h"
00029 #include "fennel/exec/ExecStreamGraph.h"
00030 #include "fennel/exec/ExecStreamBufAccessor.h"
00031 #include "fennel/exec/ExecStreamGovernor.h"
00032 #include "fennel/exec/MockResourceExecStream.h"
00033 #include "fennel/exec/BarrierExecStream.h"
00034 #include "fennel/exec/ExecStreamEmbryo.h"
00035 #include "fennel/tuple/StandardTypeDescriptor.h"
00036
00037 #include <boost/test/test_tools.hpp>
00038
00039 using namespace fennel;
00040
00044 class ExecStreamGovernorTest : public ExecStreamUnitTestBase
00045 {
00061 void testGovernor(
00062 uint nProducers,
00063 std::vector<ExecStreamResourceQuantity> const &minReqts,
00064 std::vector<ExecStreamResourceQuantity> const &optReqts,
00065 std::vector<ExecStreamResourceSettingType> optTypes,
00066 std::vector<ExecStreamResourceQuantity> expected,
00067 bool exception = false);
00068
00069 public:
00070 explicit ExecStreamGovernorTest()
00071 {
00072 FENNEL_UNIT_TEST_CASE(ExecStreamGovernorTest, testOptLessAccurate);
00073 FENNEL_UNIT_TEST_CASE(ExecStreamGovernorTest, testOptLessEstimate);
00074 FENNEL_UNIT_TEST_CASE(ExecStreamGovernorTest, testOptEqualEstimate);
00075 FENNEL_UNIT_TEST_CASE(ExecStreamGovernorTest, testInBetween);
00076 FENNEL_UNIT_TEST_CASE(
00077 ExecStreamGovernorTest, testMinEqualAllocation);
00078 FENNEL_UNIT_TEST_CASE(
00079 ExecStreamGovernorTest, testMinGreaterAllocation);
00080 FENNEL_UNIT_TEST_CASE(
00081 ExecStreamGovernorTest, testMinGreaterAvailable);
00082 FENNEL_UNIT_TEST_CASE(ExecStreamGovernorTest, testReturnResources);
00083 }
00084
00085 void testOptLessAccurate();
00086 void testOptLessEstimate();
00087 void testOptEqualEstimate();
00088 void testInBetween();
00089 void testMinGreaterAllocation();
00090 void testMinEqualAllocation();
00091 void testMinGreaterAvailable();
00092 void testReturnResources();
00093
00094 virtual void testCaseSetUp();
00095 };
00096
00097 void ExecStreamGovernorTest::testCaseSetUp()
00098 {
00099 ExecStreamUnitTestBase::testCaseSetUp();
00100
00101
00102 ExecStreamResourceQuantity quantity;
00103 quantity.nCachePages = 100;
00104 pResourceGovernor->setResourceAvailability(
00105 quantity, EXEC_RESOURCE_CACHE_PAGES);
00106
00107 ExecStreamResourceKnobs knob;
00108 knob.expectedConcurrentStatements = 1;
00109 pResourceGovernor->setResourceKnob(
00110 knob, EXEC_KNOB_EXPECTED_CONCURRENT_STATEMENTS);
00111 }
00112
00118 void ExecStreamGovernorTest::testOptLessAccurate()
00119 {
00120 uint nProducers = 2;
00121 std::vector<ExecStreamResourceQuantity> minReqts;
00122 std::vector<ExecStreamResourceQuantity> optReqts;
00123 std::vector<ExecStreamResourceQuantity> expected;
00124 std::vector<ExecStreamResourceSettingType> optTypes;
00125
00126 ExecStreamResourceQuantity quantity;
00127 ExecStreamResourceSettingType optType;
00128
00129
00130 quantity.nCachePages = 10;
00131 minReqts.push_back(quantity);
00132 quantity.nCachePages = 15;
00133 optReqts.push_back(quantity);
00134 expected.push_back(quantity);
00135 optType = EXEC_RESOURCE_ACCURATE;
00136 optTypes.push_back(optType);
00137
00138
00139 quantity.nCachePages = 20;
00140 minReqts.push_back(quantity);
00141 quantity.nCachePages = 40;
00142 optReqts.push_back(quantity);
00143 expected.push_back(quantity);
00144 optType = EXEC_RESOURCE_ACCURATE;
00145 optTypes.push_back(optType);
00146
00147 testGovernor(nProducers, minReqts, optReqts, optTypes, expected);
00148 }
00149
00154 void ExecStreamGovernorTest::testOptLessEstimate()
00155 {
00156 uint nProducers = 4;
00157 std::vector<ExecStreamResourceQuantity> minReqts;
00158 std::vector<ExecStreamResourceQuantity> optReqts;
00159 std::vector<ExecStreamResourceQuantity> expected;
00160 std::vector<ExecStreamResourceSettingType> optTypes;
00161
00162 ExecStreamResourceQuantity quantity;
00163 ExecStreamResourceSettingType optType;
00164
00165
00166
00167
00168
00169 quantity.nCachePages = 10;
00170 minReqts.push_back(quantity);
00171 quantity.nCachePages = 11;
00172 optReqts.push_back(quantity);
00173 expected.push_back(quantity);
00174 optType = EXEC_RESOURCE_ACCURATE;
00175 optTypes.push_back(optType);
00176
00177
00178 quantity.nCachePages = 15;
00179 minReqts.push_back(quantity);
00180 quantity.nCachePages = 17;
00181 optReqts.push_back(quantity);
00182 expected.push_back(quantity);
00183 optType = EXEC_RESOURCE_ACCURATE;
00184 optTypes.push_back(optType);
00185
00186
00187 quantity.nCachePages = 20;
00188 minReqts.push_back(quantity);
00189 quantity.nCachePages = 23;
00190 optReqts.push_back(quantity);
00191 quantity.nCachePages = 38;
00192 expected.push_back(quantity);
00193 optType = EXEC_RESOURCE_ESTIMATE;
00194 optTypes.push_back(optType);
00195
00196
00197 quantity.nCachePages = 25;
00198 minReqts.push_back(quantity);
00199 quantity.nCachePages = 29;
00200 optReqts.push_back(quantity);
00201 expected.push_back(quantity);
00202 optType = EXEC_RESOURCE_ACCURATE;
00203 optTypes.push_back(optType);
00204
00205 testGovernor(nProducers, minReqts, optReqts, optTypes, expected);
00206 }
00207
00212 void ExecStreamGovernorTest::testOptEqualEstimate()
00213 {
00214 uint nProducers = 4;
00215 std::vector<ExecStreamResourceQuantity> minReqts;
00216 std::vector<ExecStreamResourceQuantity> optReqts;
00217 std::vector<ExecStreamResourceQuantity> expected;
00218 std::vector<ExecStreamResourceSettingType> optTypes;
00219
00220 ExecStreamResourceQuantity quantity;
00221 ExecStreamResourceSettingType optType;
00222
00223
00224
00225
00226
00227 quantity.nCachePages = 10;
00228 minReqts.push_back(quantity);
00229 quantity.nCachePages = 20;
00230 optReqts.push_back(quantity);
00231 expected.push_back(quantity);
00232 optType = EXEC_RESOURCE_ESTIMATE;
00233 optTypes.push_back(optType);
00234
00235
00236 quantity.nCachePages = 15;
00237 minReqts.push_back(quantity);
00238 quantity.nCachePages = 17;
00239 optReqts.push_back(quantity);
00240 expected.push_back(quantity);
00241 optType = EXEC_RESOURCE_ACCURATE;
00242 optTypes.push_back(optType);
00243
00244
00245 quantity.nCachePages = 20;
00246 minReqts.push_back(quantity);
00247 quantity.nCachePages = 23;
00248 optReqts.push_back(quantity);
00249 expected.push_back(quantity);
00250 optType = EXEC_RESOURCE_ACCURATE;
00251 optTypes.push_back(optType);
00252
00253
00254 quantity.nCachePages = 25;
00255 minReqts.push_back(quantity);
00256 quantity.nCachePages = 35;
00257 optReqts.push_back(quantity);
00258 expected.push_back(quantity);
00259 optType = EXEC_RESOURCE_ESTIMATE;
00260 optTypes.push_back(optType);
00261
00262 testGovernor(nProducers, minReqts, optReqts, optTypes, expected);
00263 }
00264
00269 void ExecStreamGovernorTest::testInBetween()
00270 {
00271 uint nProducers = 4;
00272 std::vector<ExecStreamResourceQuantity> minReqts;
00273 std::vector<ExecStreamResourceQuantity> optReqts;
00274 std::vector<ExecStreamResourceQuantity> expected;
00275 std::vector<ExecStreamResourceSettingType> optTypes;
00276
00277 ExecStreamResourceQuantity quantity;
00278 ExecStreamResourceSettingType optType;
00279
00280
00281
00282
00283
00284 quantity.nCachePages = 10;
00285 minReqts.push_back(quantity);
00286 quantity.nCachePages = 25;
00287 optReqts.push_back(quantity);
00288 quantity.nCachePages = 14;
00289 expected.push_back(quantity);
00290 optType = EXEC_RESOURCE_ACCURATE;
00291 optTypes.push_back(optType);
00292
00293
00294 quantity.nCachePages = 15;
00295 minReqts.push_back(quantity);
00296 quantity.nCachePages = 31;
00297 optReqts.push_back(quantity);
00298 quantity.nCachePages = 19;
00299 expected.push_back(quantity);
00300 optType = EXEC_RESOURCE_ESTIMATE;
00301 optTypes.push_back(optType);
00302
00303
00304 quantity.nCachePages = 20;
00305 minReqts.push_back(quantity);
00306 quantity.nCachePages = 0;
00307 optReqts.push_back(quantity);
00308 quantity.nCachePages = 31;
00309 expected.push_back(quantity);
00310 optType = EXEC_RESOURCE_UNBOUNDED;
00311 optTypes.push_back(optType);
00312
00313
00314 quantity.nCachePages = 25;
00315 minReqts.push_back(quantity);
00316 quantity.nCachePages = 42;
00317 optReqts.push_back(quantity);
00318 quantity.nCachePages = 29;
00319 expected.push_back(quantity);
00320 optType = EXEC_RESOURCE_ESTIMATE;
00321 optTypes.push_back(optType);
00322
00323 testGovernor(nProducers, minReqts, optReqts, optTypes, expected);
00324 }
00325
00330 void ExecStreamGovernorTest::testMinGreaterAllocation()
00331 {
00332
00333
00334 ExecStreamResourceQuantity quantity;
00335 quantity.nCachePages = 200;
00336 pResourceGovernor->setResourceAvailability(
00337 quantity, EXEC_RESOURCE_CACHE_PAGES);
00338 ExecStreamResourceKnobs knob;
00339 knob.expectedConcurrentStatements = 2;
00340 pResourceGovernor->setResourceKnob(
00341 knob, EXEC_KNOB_EXPECTED_CONCURRENT_STATEMENTS);
00342
00343 uint nProducers = 2;
00344 std::vector<ExecStreamResourceQuantity> minReqts;
00345 std::vector<ExecStreamResourceQuantity> optReqts;
00346 std::vector<ExecStreamResourceQuantity> expected;
00347 std::vector<ExecStreamResourceSettingType> optTypes;
00348
00349 ExecStreamResourceSettingType optType;
00350
00351
00352 quantity.nCachePages = 50;
00353 minReqts.push_back(quantity);
00354 optReqts.push_back(quantity);
00355 expected.push_back(quantity);
00356 optType = EXEC_RESOURCE_ACCURATE;
00357 optTypes.push_back(optType);
00358
00359
00360 quantity.nCachePages = 55;
00361 minReqts.push_back(quantity);
00362 optReqts.push_back(quantity);
00363 expected.push_back(quantity);
00364 optType = EXEC_RESOURCE_ACCURATE;
00365 optTypes.push_back(optType);
00366
00367 testGovernor(nProducers, minReqts, optReqts, optTypes, expected);
00368 }
00369
00374 void ExecStreamGovernorTest::testMinEqualAllocation()
00375 {
00376 uint nProducers = 2;
00377 std::vector<ExecStreamResourceQuantity> minReqts;
00378 std::vector<ExecStreamResourceQuantity> optReqts;
00379 std::vector<ExecStreamResourceQuantity> expected;
00380 std::vector<ExecStreamResourceSettingType> optTypes;
00381
00382 ExecStreamResourceQuantity quantity;
00383 ExecStreamResourceSettingType optType;
00384
00385
00386
00387 quantity.nCachePages = 50;
00388 minReqts.push_back(quantity);
00389 quantity.nCachePages = 60;
00390 optReqts.push_back(quantity);
00391 quantity.nCachePages = 50;
00392 expected.push_back(quantity);
00393 optType = EXEC_RESOURCE_ACCURATE;
00394 optTypes.push_back(optType);
00395
00396
00397 quantity.nCachePages = 45;
00398 minReqts.push_back(quantity);
00399 quantity.nCachePages = 50;
00400 optReqts.push_back(quantity);
00401 quantity.nCachePages = 45;
00402 expected.push_back(quantity);
00403 optType = EXEC_RESOURCE_ACCURATE;
00404 optTypes.push_back(optType);
00405
00406 testGovernor(nProducers, minReqts, optReqts, optTypes, expected);
00407 }
00408
00413 void ExecStreamGovernorTest::testMinGreaterAvailable()
00414 {
00415 uint nProducers = 2;
00416 std::vector<ExecStreamResourceQuantity> minReqts;
00417 std::vector<ExecStreamResourceQuantity> optReqts;
00418 std::vector<ExecStreamResourceQuantity> expected;
00419 std::vector<ExecStreamResourceSettingType> optTypes;
00420
00421 ExecStreamResourceQuantity quantity;
00422 ExecStreamResourceSettingType optType;
00423
00424
00425
00426 quantity.nCachePages = 50;
00427 minReqts.push_back(quantity);
00428 optReqts.push_back(quantity);
00429 expected.push_back(quantity);
00430 optType = EXEC_RESOURCE_ACCURATE;
00431 optTypes.push_back(optType);
00432
00433
00434 quantity.nCachePages = 46;
00435 minReqts.push_back(quantity);
00436 optReqts.push_back(quantity);
00437 expected.push_back(quantity);
00438 optType = EXEC_RESOURCE_ACCURATE;
00439 optTypes.push_back(optType);
00440
00441 testGovernor(nProducers, minReqts, optReqts, optTypes, expected, true);
00442 }
00443
00449 void ExecStreamGovernorTest::testReturnResources()
00450 {
00451 uint nProducers = 2;
00452 std::vector<ExecStreamResourceQuantity> minReqts;
00453 std::vector<ExecStreamResourceQuantity> optReqts;
00454 std::vector<ExecStreamResourceQuantity> expected;
00455 std::vector<ExecStreamResourceSettingType> optTypes;
00456
00457 ExecStreamResourceQuantity quantity;
00458 ExecStreamResourceSettingType optType;
00459
00460
00461 quantity.nCachePages = 45;
00462 minReqts.push_back(quantity);
00463 optReqts.push_back(quantity);
00464 expected.push_back(quantity);
00465 optType = EXEC_RESOURCE_ACCURATE;
00466 optTypes.push_back(optType);
00467
00468
00469 quantity.nCachePages = 45;
00470 minReqts.push_back(quantity);
00471 optReqts.push_back(quantity);
00472 expected.push_back(quantity);
00473 optType = EXEC_RESOURCE_ACCURATE;
00474 optTypes.push_back(optType);
00475
00476 testGovernor(nProducers, minReqts, optReqts, optTypes, expected);
00477
00478 resetExecStreamTest();
00479 minReqts.clear();
00480 optReqts.clear();
00481 expected.clear();
00482 optTypes.clear();
00483
00484
00485 quantity.nCachePages = 45;
00486 minReqts.push_back(quantity);
00487 optReqts.push_back(quantity);
00488 expected.push_back(quantity);
00489 optType = EXEC_RESOURCE_ACCURATE;
00490 optTypes.push_back(optType);
00491
00492
00493 quantity.nCachePages = 50;
00494 minReqts.push_back(quantity);
00495 optReqts.push_back(quantity);
00496 expected.push_back(quantity);
00497 optType = EXEC_RESOURCE_ACCURATE;
00498 optTypes.push_back(optType);
00499
00500 testGovernor(nProducers, minReqts, optReqts, optTypes, expected);
00501 }
00502
00503 void ExecStreamGovernorTest::testGovernor(
00504 uint nProducers,
00505 std::vector<ExecStreamResourceQuantity> const &minReqts,
00506 std::vector<ExecStreamResourceQuantity> const &optReqts,
00507 std::vector<ExecStreamResourceSettingType> optTypes,
00508 std::vector<ExecStreamResourceQuantity> expected,
00509 bool exception)
00510 {
00511 StandardTypeDescriptorFactory stdTypeFactory;
00512 TupleAttributeDescriptor int8AttrDesc(
00513 stdTypeFactory.newDataType(STANDARD_TYPE_INT_8));
00514
00515 std::vector<ExecStreamEmbryo> producerStreamEmbryos;
00516 for (uint i = 0; i < nProducers; i++) {
00517 MockResourceExecStreamParams producerParams;
00518 producerParams.minReqt = minReqts[i];
00519 producerParams.optReqt = optReqts[i];
00520 producerParams.optTypeInput = optTypes[i];
00521 producerParams.expected = expected[i];
00522
00523
00524
00525 producerParams.scratchAccessor =
00526 pSegmentFactory->newScratchSegment(pCache, expected[i].nCachePages);
00527 producerParams.pCacheAccessor = pCache;
00528 producerParams.outputTupleDesc.push_back(int8AttrDesc);
00529
00530 ExecStreamEmbryo producerStreamEmbryo;
00531 producerStreamEmbryo.init(
00532 new MockResourceExecStream(), producerParams);
00533 std::ostringstream oss;
00534 oss << "MockResourceExecStream" << "#" << i;
00535 producerStreamEmbryo.getStream()->setName(oss.str());
00536 producerStreamEmbryos.push_back(producerStreamEmbryo);
00537 }
00538
00539 BarrierExecStreamParams barrierParams;
00540 barrierParams.outputTupleDesc.push_back(int8AttrDesc);
00541 barrierParams.returnMode = BARRIER_RET_ANY_INPUT;
00542
00543 ExecStreamEmbryo barrierStreamEmbryo;
00544 barrierStreamEmbryo.init(new BarrierExecStream(), barrierParams);
00545 barrierStreamEmbryo.getStream()->setName("BarrierExecStream");
00546
00547 SharedExecStream pOutputStream = prepareConfluenceGraph(
00548 producerStreamEmbryos, barrierStreamEmbryo);
00549
00550 int8_t expectedOutput = 1;
00551 TupleData expectedTuple;
00552 expectedTuple.compute(barrierParams.outputTupleDesc);
00553 expectedTuple[0].pData = (PConstBuffer) &expectedOutput;
00554
00555
00556 try {
00557 verifyConstantOutput(*pOutputStream, expectedTuple, 1);
00558 if (exception) {
00559 BOOST_FAIL("Cache memory not exhausted");
00560 }
00561 } catch (FennelExcn &ex) {
00562 std::string errMsg = ex.getMessage();
00563 if (errMsg.compare(ScratchMemExcn().getMessage()) != 0) {
00564 BOOST_FAIL("Wrong exception returned");
00565 }
00566 }
00567 }
00568
00569 FENNEL_UNIT_TEST_SUITE(ExecStreamGovernorTest);
00570
00571