00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "fennel/common/CommonPreamble.h"
00023 #include "fennel/lucidera/colstore/LcsClusterAppendExecStream.h"
00024 #include "fennel/lucidera/colstore/LcsClusterNode.h"
00025 #include "fennel/exec/ExecStreamBufAccessor.h"
00026 #include "fennel/btree/BTreeWriter.h"
00027 #include <boost/scoped_array.hpp>
00028
00029 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/lucidera/colstore/LcsClusterAppendExecStream.cpp#23 $");
00030
00031 void LcsClusterAppendExecStream::prepare(
00032 LcsClusterAppendExecStreamParams const ¶ms)
00033 {
00034 BTreeExecStream::prepare(params);
00035 ConduitExecStream::prepare(params);
00036
00037 tableColsTupleDesc = pInAccessor->getTupleDesc();
00038 initTupleLoadParams(params.inputProj);
00039
00040
00041
00042
00043 pInAccessor->bindProjection(params.inputProj);
00044
00045
00046
00047 scratchAccessor = params.scratchAccessor;
00048 bufferLock.accessSegment(scratchAccessor);
00049
00050
00051
00052
00053
00054
00055 TupleDescriptor outputTupleDesc;
00056
00057 outputTupleDesc = pOutAccessor->getTupleDesc();
00058 outputTuple.compute(outputTupleDesc);
00059 outputTuple[0].pData = (PConstBuffer) &numRowCompressed;
00060 if (outputTupleDesc.size() > 1) {
00061 outputTuple[1].pData = (PConstBuffer) &startRow;
00062 }
00063
00064 outputTupleAccessor = & pOutAccessor->getScratchTupleAccessor();
00065
00066 blockSize = treeDescriptor.segmentAccessor.pSegment->getUsablePageSize();
00067
00068 }
00069
// Derives the tuple descriptors used during the load from the input
// projection: a projected descriptor covering all cluster columns plus
// one single-column descriptor per cluster column.
void LcsClusterAppendExecStream::initTupleLoadParams(
    const TupleProjection &inputProj)
{
    // number of columns in this cluster
    numColumns = inputProj.size();
    clusterColsTupleDesc.projectFrom(tableColsTupleDesc, inputProj);
    clusterColsTupleData.compute(clusterColsTupleDesc);

    // One single-column descriptor per cluster column, used later by the
    // per-column hashes (see initLoad/startNewBlock).
    colTupleDesc.reset(new TupleDescriptor[numColumns]);
    for (int i = 0; i < numColumns; i++) {
        colTupleDesc[i].push_back(tableColsTupleDesc[inputProj[i]]);
    }
}
00083
void LcsClusterAppendExecStream::getResourceRequirements(
    ExecStreamResourceQuantity &minQuantity,
    ExecStreamResourceQuantity &optQuantity)
{
    ConduitExecStream::getResourceRequirements(minQuantity,optQuantity);

    // Four pages per cluster column plus six extra.  initLoad allocates
    // three scratch pages per column (row, hash, and builder blocks);
    // the remaining pages presumably cover cluster-page and btree
    // access by the writer -- TODO confirm against LcsClusterNodeWriter.
    minQuantity.nCachePages += (numColumns * 4) + 6;

    // no benefit from additional pages
    optQuantity = minQuantity;
}
00102
void LcsClusterAppendExecStream::open(bool restart)
{
    BTreeExecStream::open(restart);
    ConduitExecStream::open(restart);

    if (!restart) {
        // marshalling buffer for the single output tuple; sized once
        // since the accessor's max byte count does not change
        outputTupleBuffer.reset(
            new FixedBuffer[outputTupleAccessor->getMaxByteCount()]);
    }

    // reset per-load state on both first open and restart
    init();
    isDone = false;
}
00116
00117 ExecStreamResult LcsClusterAppendExecStream::execute(
00118 ExecStreamQuantum const &quantum)
00119 {
00120 return compress(quantum);
00121 }
00122
void LcsClusterAppendExecStream::closeImpl()
{
    BTreeExecStream::closeImpl();
    ConduitExecStream::closeImpl();
    outputTupleBuffer.reset();
    // release scratch pages and per-column load arrays
    close();
}
00130
00131 void LcsClusterAppendExecStream::init()
00132 {
00133 pIndexBlock = 0;
00134 firstRow = LcsRid(0);
00135 lastRow = LcsRid(0);
00136 startRow = LcsRid(0);
00137 rowCnt = 0;
00138 indexBlockDirty = false;
00139 arraysAlloced = false;
00140 compressCalled = false;
00141 numRowCompressed = 0;
00142 }
00143
// Main load loop: reads up to quantum.nTuplesMax input tuples, inserts
// each column value into its per-column hash, and flushes batches and
// cluster pages as they fill.  When input is exhausted it writes the
// final batch/page and produces the row-count output tuple, returning
// EXECRC_BUF_OVERFLOW once, then EXECRC_EOS on subsequent calls.
ExecStreamResult LcsClusterAppendExecStream::compress(
    ExecStreamQuantum const &quantum)
{
    uint i, j, k;
    bool canFit = false;
    bool undoInsert = false;

    if (isDone) {
        // output tuple already produced on a previous call
        pOutAccessor->markEOS();
        return EXECRC_EOS;
    }

    for (i = 0; i < quantum.nTuplesMax; i++) {
        // Fetch the next cluster tuple into clusterColsTupleData; also
        // performs one-time load initialization (see initLoad).
        ExecStreamResult rc = getTupleForLoad();

        // end of input: flush buffered rows and emit the row count
        if (rc == EXECRC_EOS) {
            // write out the last batch, if any rows are buffered
            if (rowCnt) {
                // Batches are handled in 8-row granules: if fewer than
                // 8 rows or an exact multiple of 8, this is the final
                // batch; otherwise writeBatch(false) first peels off
                // the leftover rows.
                if (rowCnt < 8 || (rowCnt % 8) == 0) {
                    writeBatch(true);
                } else {
                    writeBatch(false);
                }
            }

            // flush the final cluster page and release load resources
            writeBlock();
            if (lcsBlockBuilder) {
                lcsBlockBuilder->close();
            }
            close();

            // Marshal the output tuple (row count and, optionally,
            // starting rid -- see prepare()) into our own buffer and
            // hand it downstream.
            outputTupleAccessor->marshal(outputTuple, outputTupleBuffer.get());
            pOutAccessor->provideBufferForConsumption(
                outputTupleBuffer.get(),
                outputTupleBuffer.get() +
                outputTupleAccessor->getCurrentByteCount());

            isDone = true;
            return EXECRC_BUF_OVERFLOW;
        } else if (rc != EXECRC_YIELD) {
            return rc;
        }

        // Insert this row's values into the per-column hashes; if any
        // hash reports it must be undone (full), back out the partial
        // inserts for this row.
        undoInsert = false;

        for (j = 0; j < numColumns; j++) {
            hash[j].insert(
                clusterColsTupleData[j], &hashValOrd[j], &undoInsert);

            if (undoInsert) {
                // back out inserts already done for this row,
                // including the one that just failed
                for (k = 0; k <= j; k++) {
                    hash[k].undoInsert(clusterColsTupleData[k]);
                }
                break;
            }
        }

        // the row fits only if no insert had to be undone
        if (!undoInsert) {
            canFit = true;
        } else {
            canFit = false;
        }

        if (canFit) {
            // record this row's value ordinals in the row arrays
            for (j = 0; j < numColumns; j++) {
                addValueOrdinal(j, hashValOrd[j].getValOrd());
            }

            rowCnt++;

            // flush the batch once the row ordinal array is full
            if (isRowArrayFull()) {
                writeBatch(false);
            }
        } else {
            // Row did not fit: flush the current batch to make room,
            // then retry the SAME input tuple on the next iteration
            // (it has not been consumed, so getTupleForLoad will see
            // consumption still pending and skip re-unmarshalling).
            writeBatch(false);
            continue;
        }

        // consume the input tuple only after it has been loaded
        postProcessTuple();
    }

    return EXECRC_QUANTUM_EXPIRED;
}
00264
// Fetches the next input tuple to load.  Returns EXECRC_EOS at end of
// input, EXECRC_BUF_UNDERFLOW when no input is available yet, or
// EXECRC_YIELD with clusterColsTupleData populated.
ExecStreamResult LcsClusterAppendExecStream::getTupleForLoad()
{
    if (pInAccessor->getState() == EXECBUF_EOS) {
        return EXECRC_EOS;
    }

    if (!pInAccessor->demandData()) {
        return EXECRC_BUF_UNDERFLOW;
    }

    // one-time load initialization; no-op after the first call
    initLoad();

    // Only unmarshal if the previous tuple was consumed; a tuple being
    // retried after a batch flush is still pending and already
    // unmarshalled (see the canFit==false path in compress).
    if (!pInAccessor->isTupleConsumptionPending()) {
        pInAccessor->unmarshalProjectedTuple(clusterColsTupleData);
    }

    return EXECRC_YIELD;
}
00284
// One-time initialization for a load: creates the cluster node writer,
// allocates per-column scratch pages and hashes, and positions the load
// at either the last existing cluster page or a brand-new page.
void LcsClusterAppendExecStream::initLoad()
{
    // Deferred until the first tuple actually arrives, so that no page
    // allocation happens for executions that never receive input.
    if (!compressCalled) {
        compressCalled = true;

        // writer used to build and store cluster pages in the btree
        lcsBlockBuilder = SharedLcsClusterNodeWriter(
            new LcsClusterNodeWriter(
                treeDescriptor,
                scratchAccessor,
                clusterColsTupleDesc,
                getSharedTraceTarget(),
                getTraceSourceName()));

        allocArrays();

        // Three scratch pages per column: one for the row ordinal
        // array, one for the hash table, one for the block builder.
        for (uint i = 0; i < numColumns; i++) {
            bufferLock.allocatePage();
            rowBlock[i] = bufferLock.getPage().getWritableData();
            bufferLock.unlock();

            bufferLock.allocatePage();
            hashBlock[i] = bufferLock.getPage().getWritableData();
            bufferLock.unlock();

            bufferLock.allocatePage();
            builderBlock[i] = bufferLock.getPage().getWritableData();
            bufferLock.unlock();

            hash[i].init(
                hashBlock[i], lcsBlockBuilder, colTupleDesc[i], i, blockSize);
        }

        // capacity of the row array: 16-bit ordinals per page
        nRowsMax = blockSize / sizeof(uint16_t);

        // Append to the cluster's last page if one exists; otherwise
        // start a new cluster beginning at rid 0.
        PLcsClusterNode pExistingIndexBlock;

        bool found = getLastBlock(pExistingIndexBlock);
        if (found) {
            // attach to the last page and restore its load state
            pIndexBlock = pExistingIndexBlock;
            loadExistingBlock();
        } else {
            // brand-new cluster
            startNewBlock();
            startRow = LcsRid(0);
        }
    }
}
00353
00354 void LcsClusterAppendExecStream::postProcessTuple()
00355 {
00356 pInAccessor->consumeTuple();
00357 numRowCompressed++;
00358 }
00359
// Releases all scratch pages and per-column arrays acquired for a load.
void LcsClusterAppendExecStream::close()
{
    if (scratchAccessor.pSegment) {
        // return every scratch page allocated via bufferLock
        scratchAccessor.pSegment->deallocatePageRange(
            NULL_PAGE_ID, NULL_PAGE_ID);
    }
    rowBlock.reset();
    hashBlock.reset();
    builderBlock.reset();

    hash.reset();
    hashValOrd.reset();
    tempBuf.reset();
    maxValueSize.reset();

    lcsBlockBuilder.reset();
}
00377
// Allocates a new cluster page and re-initializes the block builder and
// per-column hashes to write into it.
void LcsClusterAppendExecStream::startNewBlock()
{
    firstRow = lastRow;

    // allocate a fresh cluster page starting at firstRow
    pIndexBlock = lcsBlockBuilder->allocateClusterPage(firstRow);

    // point the block builder at the new page
    lcsBlockBuilder->init(
        numColumns, reinterpret_cast<uint8_t *> (pIndexBlock),
        builderBlock.get(), blockSize);

    // reset the per-column hashes
    for (uint i = 0; i < numColumns; i++) {
        hash[i].init(
            hashBlock[i], lcsBlockBuilder, colTupleDesc[i], i, blockSize);
    }

    // Only reset rowCnt when a complete batch (>= 8 rows) preceded
    // this call; leftover rows (rowCnt < 8) carry over into the new
    // block and must keep their count.
    if (rowCnt >= 8) {
        rowCnt = 0;
    }
    indexBlockDirty = false;

    // open the new page for writing
    lcsBlockBuilder->openNew(firstRow);
}
00408
00409 bool LcsClusterAppendExecStream::getLastBlock(PLcsClusterNode &pBlock)
00410 {
00411 if (!lcsBlockBuilder->getLastClusterPageForWrite(pBlock, firstRow)) {
00412 return false;
00413 } else {
00414 return true;
00415 }
00416 }
00417
// Restores load state from the last existing cluster page: opens it for
// append, rolls back any incomplete (non-multiple-of-8) last batch, and
// re-inserts the rolled-back values into fresh hashes so the load can
// continue as if those rows were newly arriving.
void LcsClusterAppendExecStream::loadExistingBlock()
{
    boost::scoped_array<uint> numVals;        // per-column value counts on the page
    boost::scoped_array<uint16_t> lastValOff; // per-column offset of last value
    boost::scoped_array<boost::scoped_array<FixedBuffer> > aLeftOverBufs;
                                              // per-column rolled-back values

    // Number of leftover rows in the rolled-back last batch; written
    // once per column, but presumably identical across columns since
    // every column holds the same number of rows -- TODO confirm.
    uint anLeftOvers;

    // fixed width of each rolled-back value, per column
    boost::scoped_array<uint> aiFixedSize;

    LcsHashValOrd vOrd;

    uint i, j;
    RecordNum startRowCnt;
    RecordNum nrows;

    lcsBlockBuilder->init(
        numColumns, reinterpret_cast<uint8_t *> (pIndexBlock),
        builderBlock.get(), blockSize);

    lastValOff.reset(new uint16_t[numColumns]);
    numVals.reset(new uint[numColumns]);

    // Open the existing page for append.  openAppend fills in the
    // per-column value counts and last-value offsets plus the page's
    // row count; a true return means the page is full and a new block
    // must be started after the rollback below.
    bool bStartNewBlock =
        lcsBlockBuilder->openAppend(numVals.get(), lastValOff.get(), nrows);
    lastRow = firstRow + nrows;
    startRow = lastRow;

    aiFixedSize.reset(new uint[numColumns]);
    aLeftOverBufs.reset(new boost::scoped_array<FixedBuffer>[numColumns]);

    startRowCnt = rowCnt;

    // Roll back each column's last batch if it has leftover rows; the
    // rolled-back values are saved in aLeftOverBufs for re-insertion.
    for (i = 0; i < numColumns; i++) {
        // describeLastBatch/rollBackLastBatch operate per column, so
        // restore rowCnt before each one
        rowCnt = startRowCnt;
        lcsBlockBuilder->describeLastBatch(i, anLeftOvers, aiFixedSize[i]);

        if (anLeftOvers > 0) {
            aLeftOverBufs[i].reset(
                new FixedBuffer[anLeftOvers * aiFixedSize[i]]);
            lcsBlockBuilder->rollBackLastBatch(i, aLeftOverBufs[i].get());
            indexBlockDirty = true;
        }
    }

    // lastRow no longer includes the rolled-back rows
    lastRow -= anLeftOvers;

    // page was full: flush it and open a brand-new one
    if (bStartNewBlock) {
        writeBlock();
        startNewBlock();
    }

    // Rebuild hash state and re-insert the rolled-back values.
    for (i = 0; i < numColumns; i++) {
        // restore rowCnt before processing each column
        rowCnt = startRowCnt;

        if (!bStartNewBlock) {
            // Continuing on the same page: restore the hash with the
            // values already stored there so duplicates are detected.
            hash[i].restore(numVals[i], lastValOff[i]);
        }

        // re-insert the values rolled back above
        if (anLeftOvers > 0) {
            uint8_t *val;
            bool undoInsert = false;

            // If the hash cannot take anLeftOvers more values, start a
            // new batch so the re-inserts below are guaranteed to fit.
            if (hash[i].isHashFull(anLeftOvers)) {
                hash[i].startNewBatch(anLeftOvers);
            }

            for (j = 0, val = aLeftOverBufs[i].get();
                 j < anLeftOvers;
                 j++, val += aiFixedSize[i])
            {
                hash[i].insert(val, &vOrd, &undoInsert);

                // re-inserted values must fit (see isHashFull above)
                assert(!undoInsert);
                addValueOrdinal(i, vOrd.getValOrd());
                rowCnt++;
            }
        }
    }
}
00547
00548 void LcsClusterAppendExecStream::addValueOrdinal(uint column, uint16_t vOrd)
00549 {
00550 uint16_t *rowWordArray = (uint16_t *) rowBlock[column];
00551 rowWordArray[rowCnt] = vOrd;
00552
00553
00554 indexBlockDirty = true;
00555 }
00556
00557 bool LcsClusterAppendExecStream::isRowArrayFull()
00558 {
00559 if (rowCnt >= nRowsMax) {
00560 return true;
00561 } else {
00562 return false;
00563 }
00564 }
00565
// Writes out the current batch for every column, letting the block
// builder pick fixed, variable, or compressed mode per column.  When
// lastBatch is false, leftover rows (beyond a multiple of 8, or all
// rows when fewer than 8 accumulated) are staged in tempBuf and
// re-inserted into the hashes so they start the next batch.
void LcsClusterAppendExecStream::writeBatch(bool lastBatch)
{
    uint16_t *oVals;
    uint leftOvers;
    PBuffer val;
    LcsBatchMode mode;
    uint i, j;
    uint origRowCnt, count = 0;

    lastRow += rowCnt;

    for (origRowCnt = rowCnt, i = 0; i < numColumns; i++) {
        // rowCnt may be modified per column below; restore it
        rowCnt = origRowCnt;

        // widest value currently in this column's hash
        maxValueSize[i] = hash[i].getMaxValueSize();

        // pick the storage mode for this column's batch; rowCnt may be
        // adjusted by the builder
        lcsBlockBuilder->pickCompressionMode(
            i, maxValueSize[i], rowCnt, &oVals, mode);
        leftOvers = rowCnt > 8 ? rowCnt % 8 : 0;

        // Stage values that will not be part of this batch: either the
        // leftover rows past a multiple of 8, or all rows when fewer
        // than 8 accumulated (batches appear to be handled in 8-row
        // granules throughout this file).
        if (leftOvers) {
            tempBuf[i].reset(new FixedBuffer[leftOvers * maxValueSize[i]]);
            count = leftOvers;

        } else if (origRowCnt < 8) {
            tempBuf[i].reset(new FixedBuffer[origRowCnt * maxValueSize[i]]);
            count = origRowCnt;
        } else {
            // no leftovers; drop any stale staging buffer
            tempBuf[i].reset();
        }

        // write the batch in the chosen mode
        if (LCS_FIXED == mode || LCS_VARIABLE == mode) {
            hash[i].prepareFixedOrVariableBatch(
                (PBuffer) rowBlock[i], rowCnt);
            lcsBlockBuilder->putFixedVarBatch(
                i, (uint16_t *) rowBlock[i], tempBuf[i].get());
            if (mode == LCS_FIXED) {
                hash[i].clearFixedEntries();
            }

        } else {
            uint16_t numVals;

            // compressed mode: the row array holds indexes into the
            // distinct-value list produced by the hash
            hash[i].prepareCompressedBatch(
                (PBuffer) rowBlock[i], rowCnt, (uint16_t *) &numVals, oVals);
            lcsBlockBuilder->putCompressedBatch(
                i, (PBuffer) rowBlock[i], tempBuf[i].get());
        }

        // batch written; open a new one sized for the staged leftovers
        rowCnt = 0;
        hash[i].startNewBatch(!lastBatch ? count : 0);
    }

    // staged leftovers have not really been written yet
    if (!lastBatch) {
        lastRow -= count;
    }
    bool bStartNewBlock;
    bStartNewBlock = false;

    // With fewer than 8 rows nothing could be emitted for this batch;
    // roll it back and force a new block to make room.
    if (!lastBatch && origRowCnt < 8) {
        for (i = 0; i < numColumns; i++) {
            lcsBlockBuilder->rollBackLastBatch(i, tempBuf[i].get());
        }
        bStartNewBlock = true;
    }

    // Start a new cluster page if forced above, or if the builder
    // reports the current page is out of space.
    if (bStartNewBlock || (!lastBatch && lcsBlockBuilder->isEndOfBlock())) {
        writeBlock();
        startNewBlock();
    }

    // Re-insert the staged leftover values so they begin the next batch.
    if (!lastBatch) {
        for (i = 0; i < numColumns; i++) {
            rowCnt = 0;
            for (j = 0, val = tempBuf[i].get(); j < count; j++) {
                LcsHashValOrd vOrd;
                bool undoInsert = false;

                hash[i].insert(val, &vOrd, &undoInsert);

                // staged values must fit in the freshly started batch
                assert(!undoInsert);
                addValueOrdinal(i, vOrd.getValOrd());
                rowCnt++;
                val += maxValueSize[i];
            }
        }
    }

    // release the staging buffers
    for (i = 0; i < numColumns; i++) {
        if (tempBuf[i].get()) {
            tempBuf[i].reset();
        }
    }
}
00685
// Finalizes the current cluster page, flushing any buffered rows as a
// final batch first.  No-op if nothing was written to the page.
void LcsClusterAppendExecStream::writeBlock()
{
    if (indexBlockDirty) {
        // flush rows still buffered in the row arrays
        if (rowCnt) {
            writeBatch(true);

            // writeBatch may itself have finalized this page (via its
            // writeBlock/startNewBlock path), clearing indexBlockDirty;
            // in that case there is nothing left to do here.
            if (!indexBlockDirty) {
                return;
            }
        }

        // finalize the page contents in the builder
        lcsBlockBuilder->endBlock();

        // the page is now clean
        indexBlockDirty = false;
    }
}
00713
// Allocates the per-column arrays used during a load; idempotent within
// a single load (guarded by arraysAlloced, reset in init()).
void LcsClusterAppendExecStream::allocArrays()
{
    // only allocate once per load
    if (arraysAlloced) {
        return;
    }
    arraysAlloced = true;

    // per-column value hashes
    hash.reset(new LcsHash[numColumns]);

    // per-column scratch page pointers; the pages themselves are
    // allocated and assigned in initLoad
    rowBlock.reset(new PBuffer[numColumns]);
    hashBlock.reset(new PBuffer[numColumns]);

    builderBlock.reset(new PBuffer[numColumns]);

    hashValOrd.reset(new LcsHashValOrd[numColumns]);
    tempBuf.reset(new boost::scoped_array<FixedBuffer>[numColumns]);
    maxValueSize.reset(new uint[numColumns]);
}
00735
// This stream supplies its own output buffer (outputTupleBuffer) rather
// than writing into a consumer-provided one.
ExecStreamBufProvision
LcsClusterAppendExecStream::getOutputBufProvision() const
{
    return BUFPROV_PRODUCER;
}
00741
00742 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/lucidera/colstore/LcsClusterAppendExecStream.cpp#23 $");
00743
00744