#include "fennel/common/CommonPreamble.h"
#include "fennel/hashexe/LhxAggExecStream.h"
#include "fennel/segment/Segment.h"
#include "fennel/exec/ExecStreamBufAccessor.h"
#include "fennel/tuple/StandardTypeDescriptor.h"

using namespace std;

FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/hashexe/LhxAggExecStream.cpp#2 $");

void LhxAggExecStream::prepare(
    LhxAggExecStreamParams const &params)
{
    ConduitExecStream::prepare(params);

    setHashInfo(params);
    setAggComputers(hashInfo, params.aggInvocations);

    forcePartitionLevel = params.forcePartitionLevel;
    enableSubPartStat = params.enableSubPartStat;

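    // Aggregation has a single input, which doubles as the build input.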
    buildInputIndex = hashInfo.inputDesc.size() - 1;

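    // Estimate how many cache pages would be needed to hold the entire
    // build input in the hash table; this drives the optimum resource
    // request below.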
    hashTable.calculateSize(hashInfo, buildInputIndex, numBlocksHashTable);

    TupleDescriptor outputDesc;
    outputDesc = hashInfo.inputDesc[buildInputIndex];

    if (!params.outputTupleDesc.empty()) {
        assert(outputDesc == params.outputTupleDesc);
    }

    outputTuple.compute(outputDesc);
    pOutAccessor->setTupleShape(outputDesc);

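    // Cache blocks reserved outside the hash table proper: one per child
    // partition for the single input of this stream.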
    uint numInputs = 1;
    numMiscCacheBlocks = LhxPlan::LhxChildPartCount * numInputs;
}

void LhxAggExecStream::getResourceRequirements(
    ExecStreamResourceQuantity &minQuantity,
    ExecStreamResourceQuantity &optQuantity,
    ExecStreamResourceSettingType &optType)
{
    ConduitExecStream::getResourceRequirements(minQuantity, optQuantity);

    uint minPages =
        LhxHashTable::LhxHashTableMinPages * LhxPlan::LhxChildPartCount
        + numMiscCacheBlocks;
    minQuantity.nCachePages += minPages;

    if (isMAXU(numBlocksHashTable)) {
        optType = EXEC_RESOURCE_UNBOUNDED;
    } else {
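        // Request enough pages for the whole estimated hash table, but
        // always at least one page beyond the minimum.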
        optQuantity.nCachePages += std::max(minPages + 1, numBlocksHashTable);
        optType = EXEC_RESOURCE_ESTIMATE;
    }
}

void LhxAggExecStream::setResourceAllocation(
    ExecStreamResourceQuantity &quantity)
{
    ConduitExecStream::setResourceAllocation(quantity);
    hashInfo.numCachePages = quantity.nCachePages - numMiscCacheBlocks;
}

void LhxAggExecStream::open(bool restart)
{
    ConduitExecStream::open(restart);

    if (restart) {
        hashTable.releaseResources();
    }

    uint partitionLevel = 0;
    hashTable.init(partitionLevel, hashInfo, &aggComputers, buildInputIndex);
    hashTableReader.init(&hashTable, hashInfo, buildInputIndex);

    bool status = hashTable.allocateResources();
    assert(status);
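
    // The root plan starts with a single partition that stands for the
    // build input; it is read directly from the upstream producer rather
    // than from a materialized temp segment.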
    vector<SharedLhxPartition> partitionList;

    buildPart = SharedLhxPartition(new LhxPartition(this));
    buildPart->segStream.reset();
    buildPart->inputIndex = 0;
    partitionList.push_back(buildPart);

    rootPlan = SharedLhxPlan(new LhxPlan());
    rootPlan->init(
        WeakLhxPlan(),
        partitionLevel,
        partitionList,
        enableSubPartStat);

    partInfo.init(&hashInfo);

    curPlan = rootPlan.get();
    isTopPlan = true;

    buildReader.open(curPlan->getPartition(buildInputIndex), hashInfo);

    aggState = (forcePartitionLevel > 0) ? ForcePartitionBuild : Build;
}

ExecStreamResult LhxAggExecStream::execute(ExecStreamQuantum const &quantum)
{
    while (true) {
        switch (aggState) {
        case ForcePartitionBuild:
            {
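                // Same as Build, except that partitioning is forced until
                // the plan reaches forcePartitionLevel, even when tuples
                // still fit in the hash table.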
                inputTuple.compute(buildReader.getTupleDesc());
                for (;;) {
                    if (!buildReader.isTupleConsumptionPending()) {
                        if (buildReader.getState() == EXECBUF_EOS) {
                            numTuplesProduced = 0;
                            aggState = Produce;
                            break;
                        }

                        if (!buildReader.demandData()) {
                            if (isTopPlan) {
                                return EXECRC_BUF_UNDERFLOW;
                            } else {
                                break;
                            }
                        }
                        buildReader.unmarshalTuple(inputTuple);
                    }

                    if (curPlan->getPartitionLevel() < forcePartitionLevel ||
                        !hashTable.addTuple(inputTuple)) {
                        if (isTopPlan) {
                            partInfo.open(
                                &hashTableReader,
                                &buildReader,
                                inputTuple,
                                &aggComputers);
                        } else {
                            partInfo.open(
                                &hashTableReader,
                                &buildReader,
                                inputTuple,
                                &partialAggComputers);
                        }
                        aggState = Partition;
                        break;
                    }
                    buildReader.consumeTuple();
                }
                break;
            }
        case Build:
            {
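                // Aggregate the build input into the in-memory hash table;
                // when a tuple no longer fits, switch to partitioning.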
                inputTuple.compute(buildReader.getTupleDesc());
                for (;;) {
                    if (!buildReader.isTupleConsumptionPending()) {
                        if (buildReader.getState() == EXECBUF_EOS) {
                            buildReader.close();
                            numTuplesProduced = 0;
                            aggState = Produce;
                            break;
                        }

                        if (!buildReader.demandData()) {
                            if (isTopPlan) {
                                return EXECRC_BUF_UNDERFLOW;
                            } else {
                                break;
                            }
                        }
                        buildReader.unmarshalTuple(inputTuple);
                    }

                    if (!hashTable.addTuple(inputTuple)) {
                        if (isTopPlan) {
                            partInfo.open(
                                &hashTableReader,
                                &buildReader,
                                inputTuple,
                                &aggComputers);
                        } else {
                            partInfo.open(
                                &hashTableReader,
                                &buildReader,
                                inputTuple,
                                &partialAggComputers);
                        }
                        aggState = Partition;
                        break;
                    }
                    buildReader.consumeTuple();
                }
                break;
            }
        case Partition:
            {
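                // Spill the tuples already in the hash table, followed by
                // the rest of the build input, into child partitions; on
                // input underflow this state returns and is resumed on the
                // next call.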
                for (;;) {
                    if (curPlan->generatePartitions(hashInfo, partInfo)
                        == PartitionUnderflow) {
                        return EXECRC_BUF_UNDERFLOW;
                    } else {
                        break;
                    }
                }
                partInfo.close();
                aggState = CreateChildPlan;
                break;
            }
        case CreateChildPlan:
            {
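                // Turn the newly written partitions into child plans, then
                // descend into the first child and rebuild its hash table
                // from the partially aggregated data.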
                curPlan->createChildren(partInfo, false, false);

                FENNEL_TRACE(TRACE_FINE, curPlan->toString());

                curPlan = curPlan->getFirstChild().get();
                isTopPlan = false;

                hashTable.releaseResources();

                hashTable.init(
                    curPlan->getPartitionLevel(),
                    hashInfo,
                    &partialAggComputers,
                    buildInputIndex);
                hashTableReader.init(
                    &hashTable,
                    hashInfo,
                    buildInputIndex);

                bool status = hashTable.allocateResources();
                assert(status);

                buildReader.open(
                    curPlan->getPartition(buildInputIndex),
                    hashInfo);

                aggState =
                    (forcePartitionLevel > 0) ? ForcePartitionBuild : Build;
                break;
            }
        case GetNextPlan:
            {
                hashTable.releaseResources();

                checkAbort();

                curPlan = curPlan->getNextLeaf();

                if (curPlan) {
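                    // Child plans consume tuples that are already partially
                    // aggregated, so they use partialAggComputers.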
                    hashTable.init(
                        curPlan->getPartitionLevel(),
                        hashInfo,
                        &partialAggComputers,
                        buildInputIndex);
                    hashTableReader.init(&hashTable, hashInfo, buildInputIndex);
                    bool status = hashTable.allocateResources();
                    assert(status);

                    buildReader.open(
                        curPlan->getPartition(buildInputIndex),
                        hashInfo);

                    aggState =
                        (forcePartitionLevel > 0) ? ForcePartitionBuild : Build;
                } else {
                    aggState = Done;
                }
                break;
            }
        case Produce:
            {
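                // Read the next aggregated result row out of the in-memory
                // hash table.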
                if (hashTableReader.getNext(outputTuple)) {
                    aggState = ProducePending;
                    nextState = Produce;
                } else {
                    aggState = GetNextPlan;
                }
                break;
            }
        case ProducePending:
            {
                if (pOutAccessor->produceTuple(outputTuple)) {
                    numTuplesProduced++;
                    aggState = nextState;
                } else {
                    numTuplesProduced = 0;
                    return EXECRC_BUF_OVERFLOW;
                }

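                // Honor the quantum: stop after producing nTuplesMax rows.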
                if (numTuplesProduced >= quantum.nTuplesMax) {
                    numTuplesProduced = 0;
                    return EXECRC_QUANTUM_EXPIRED;
                }
                break;
            }
        case Done:
            {
                pOutAccessor->markEOS();
                return EXECRC_EOS;
            }
        }
    }

    // Unreachable: every state either returns or stays inside the loop.
    assert(false);
}

void LhxAggExecStream::closeImpl()
{
    hashTable.releaseResources();
    if (rootPlan) {
        rootPlan->close();
        rootPlan.reset();
    }

    ConduitExecStream::closeImpl();
}

void LhxAggExecStream::setAggComputers(
    LhxHashInfo &hashInfo,
    AggInvocationList const &aggInvocations)
{
    TupleDescriptor inputDesc = pInAccessor->getTupleDesc();

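    // The last (and only) input descriptor describes the tuple stored in
    // the hash table: group-by keys followed by one slot per aggregate.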
    TupleDescriptor &hashDesc = hashInfo.inputDesc.back();

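    // aggsProj gives the positions of the aggregate slots within the hash
    // table tuple.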
    TupleProjection &aggsProj = hashInfo.aggsProj;

    AggFunction partialAggFunction;

    uint i = 0;

    assert(aggInvocations.size() == aggsProj.size());

    for (AggInvocationConstIter pInvocation(aggInvocations.begin());
        pInvocation != aggInvocations.end();
        ++pInvocation)
    {
        // Partial results from child plans are combined with a matching
        // aggregate: COUNT of raw rows becomes SUM of partial counts,
        // while the other supported functions combine with themselves.
        switch (pInvocation->aggFunction) {
        case AGG_FUNC_COUNT:
            partialAggFunction = AGG_FUNC_SUM;
            break;
        case AGG_FUNC_SUM:
        case AGG_FUNC_MIN:
        case AGG_FUNC_MAX:
        case AGG_FUNC_SINGLE_VALUE:
            partialAggFunction = pInvocation->aggFunction;
            break;
        default:
            permFail("unknown aggregation function: "
                << pInvocation->aggFunction);
            break;
        }

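        // COUNT(*) has no input operand (iInputAttr == -1), in which case
        // pInputAttr stays NULL.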
        TupleAttributeDescriptor const *pInputAttr = NULL;
        if (pInvocation->iInputAttr != -1) {
            pInputAttr = &(inputDesc[pInvocation->iInputAttr]);
        }
        aggComputers.push_back(
            AggComputer::newAggComputer(
                pInvocation->aggFunction, pInputAttr));
        aggComputers.back().setInputAttrIndex(pInvocation->iInputAttr);

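        // The partial aggregate computers used by child plans read their
        // input from the aggregate slot of the hash table tuple rather
        // than from the original input column.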
        TupleAttributeDescriptor const *pInputAttrPartialAgg =
            &(hashDesc[aggsProj[i]]);
        partialAggComputers.push_back(
            AggComputer::newAggComputer(
                partialAggFunction, pInputAttrPartialAgg));
        partialAggComputers.back().setInputAttrIndex(aggsProj[i]);
        i++;
    }
}

void LhxAggExecStream::setHashInfo(
    LhxAggExecStreamParams const &params)
{
    TupleDescriptor inputDesc = pInAccessor->getTupleDesc();

    hashInfo.streamBufAccessor.push_back(pInAccessor);

    hashInfo.cndKeys.push_back(params.cndGroupByKeys);

    hashInfo.numRows.push_back(params.numRows);

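    // Aggregation does not filter NULL keys, remove duplicates, or use
    // join filters.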
    hashInfo.filterNull.push_back(false);

    TupleProjection filterNullKeyProj;
    hashInfo.filterNullKeyProj.push_back(filterNullKeyProj);

    hashInfo.removeDuplicate.push_back(false);
    hashInfo.useJoinFilter.push_back(false);

    hashInfo.memSegmentAccessor = params.scratchAccessor;
    hashInfo.externalSegmentAccessor.pCacheAccessor = params.pCacheAccessor;
    hashInfo.externalSegmentAccessor.pSegment = params.pTempSegment;

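    // The group-by keys are the leading input columns; record which of
    // them are VARCHAR or UNICODE VARCHAR so the hasher can handle
    // trimming for those columns.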
    TupleProjection keyProj;
    vector<LhxHashTrim> isKeyColVarChar;

    for (int i = 0; i < params.groupByKeyCount; i++) {
        keyProj.push_back(i);

        StoredTypeDescriptor::Ordinal ordinal =
            inputDesc[i].pTypeDescriptor->getOrdinal();
        if (ordinal == STANDARD_TYPE_VARCHAR) {
            isKeyColVarChar.push_back(HASH_TRIM_VARCHAR);
        } else if (ordinal == STANDARD_TYPE_UNICODE_VARCHAR) {
            isKeyColVarChar.push_back(HASH_TRIM_UNICODE_VARCHAR);
        } else {
            isKeyColVarChar.push_back(HASH_TRIM_NONE);
        }
    }
    hashInfo.keyProj.push_back(keyProj);
    hashInfo.isKeyColVarChar.push_back(isKeyColVarChar);

    // Aggregation stores no non-key data fields in the hash table, so the
    // data projection stays empty.
    TupleProjection dataProj;
    hashInfo.dataProj.push_back(dataProj);

    // Start the hash table tuple layout with the group-by key columns.
    TupleDescriptor keyDesc;
    keyDesc.projectFrom(inputDesc, keyProj);

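    // COUNT results are stored as 64-bit integers.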
    StandardTypeDescriptorFactory stdTypeFactory;
    TupleAttributeDescriptor countDesc(
        stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));

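    // Append one attribute per aggregate invocation to the hash table
    // tuple, right after the group-by keys, and record its position in
    // aggsProj. SUM/MIN/MAX/SINGLE_VALUE slots are nullable even when the
    // underlying input column is not, since the aggregate result itself
    // can be NULL.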
    int i = params.groupByKeyCount;
    for (AggInvocationConstIter pInvocation(params.aggInvocations.begin());
        pInvocation != params.aggInvocations.end();
        ++pInvocation)
    {
        switch (pInvocation->aggFunction) {
        case AGG_FUNC_COUNT:
            keyDesc.push_back(countDesc);
            break;
        case AGG_FUNC_SUM:
        case AGG_FUNC_MIN:
        case AGG_FUNC_MAX:
        case AGG_FUNC_SINGLE_VALUE:
            keyDesc.push_back(inputDesc[pInvocation->iInputAttr]);
            keyDesc.back().isNullable = true;
            break;
        }
        hashInfo.aggsProj.push_back(i++);
    }

    hashInfo.inputDesc.push_back(keyDesc);
}

FENNEL_END_CPPFILE("$Id: //open/dev/fennel/hashexe/LhxAggExecStream.cpp#2 $");