LhxHashTable.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/hashexe/LhxHashTable.cpp#3 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2006-2009 The Eigenbase Project
00005 // Copyright (C) 2009-2009 SQLstream, Inc.
00006 // Copyright (C) 2006-2009 LucidEra, Inc.
00007 //
00008 // This program is free software; you can redistribute it and/or modify it
00009 // under the terms of the GNU General Public License as published by the Free
00010 // Software Foundation; either version 2 of the License, or (at your option)
00011 // any later version approved by The Eigenbase Project.
00012 //
00013 // This program is distributed in the hope that it will be useful,
00014 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00015 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00016 // GNU General Public License for more details.
00017 //
00018 // You should have received a copy of the GNU General Public License
00019 // along with this program; if not, write to the Free Software
00020 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00021 */
00022 
00023 #include "fennel/common/CommonPreamble.h"
00024 #include "fennel/hashexe/LhxHashTable.h"
00025 #include "fennel/hashexe/LhxHashTableDump.h"
00026 #include "fennel/tuple/TuplePrinter.h"
00027 #include <sstream>
00028 
00029 using namespace std;
00030 
00031 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/hashexe/LhxHashTable.cpp#3 $");
00032 
00033 void LhxHashDataAccessor::init(TupleDescriptor const &inputDataDesc)
00034 {
00035     dataDescriptor = inputDataDesc;
00036     dataTuple.compute(dataDescriptor);
00037     dataAccessor.compute(dataDescriptor);
00038 }
00039 
00040 void LhxHashDataAccessor::unpack(
00041     TupleData &outputTuple,
00042     TupleProjection &destProj)
00043 {
00044     PBuffer buf = getBuffer();
00045 
00046     assert (buf != NULL);
00047 
00048     if (destProj.size() > 0) {
00049         // REVIEW jvs 25-Aug-2006:  It looks like there's a potential
00050         // for unmarshalling unneeded fields here.  If there could
00051         // be a lot of those, set up a TupleProjectionAccessor
00052         // and use that (over and over) instead.
00053 
00054         /*
00055          * Destination positions in the outputTuple should be enough to hold
00056          * fields returned by dataAccessor.
00057          */
00058         uint tupleSize = min(destProj.size(), dataTuple.size());
00059 
00060         /*
00061          * Set pointers in the tmp tuple, and then pass them on to fields in
00062          * output tuple.
00063          */
00064         dataAccessor.unmarshal(dataTuple);
00065 
00066         for (int i = 0; i < tupleSize; i ++) {
00067             outputTuple[destProj[i]].copyFrom(dataTuple[i]);
00068         }
00069     } else {
00070         /*
00071          * Set pointers in the outputTuple.
00072          */
00073         dataAccessor.unmarshal(outputTuple);
00074     }
00075 }
00076 
00077 string LhxHashDataAccessor::toString()
00078 {
00079     TuplePrinter tuplePrinter;
00080     ostringstream dataTrace;
00081     TupleProjection allFields;
00082     allFields.clear();
00083 
00084     unpack(dataTuple, allFields);
00085     dataTrace << "[Data Node] ";
00086     tuplePrinter.print(dataTrace, dataDescriptor, dataTuple);
00087     return dataTrace.str();
00088 }
00089 
00090 LhxHashKeyAccessor::LhxHashKeyAccessor()
00091     : LhxHashNodeAccessor(
00092         sizeof(PBuffer) + sizeof(uint8_t) + sizeof(PBuffer *))
00093 {
00094     firstDataOffset = 0;
00095     /*
00096      * firstData pointer is of type PBuffer
00097      */
00098     isMatchedOffset = firstDataOffset + sizeof(PBuffer);
00099     /*
00100      * isMatched indicator is of type uint8_t
00101      */
00102     nextSlotOffset = isMatchedOffset + sizeof(uint8_t);
00103 }
00104 
00105 void LhxHashKeyAccessor::init(
00106     TupleDescriptor const &keyDescInit,
00107     TupleProjection const &keyColsProjInit,
00108     TupleProjection const &aggsProjInit)
00109 {
00110     keyDescriptor = keyDescInit;
00111     keyTuple.compute(keyDescriptor);
00112     keyAccessor.compute(keyDescriptor);
00113 
00114     keyColsProj = keyColsProjInit;
00115     aggsProj = aggsProjInit;
00116 
00117     keyColsDesc.projectFrom(keyDescriptor, keyColsProj);
00118 }
00119 
00120 void LhxHashKeyAccessor::addData(PBuffer inputData)
00121 {
00122     PBuffer firstDataNode = getFirstData();
00123     /*
00124      * The original first data node becomes the next.
00125      */
00126     firstData.setNext(inputData, firstDataNode);
00127     setFirstData(inputData);
00128 }
00129 
00130 void LhxHashKeyAccessor::unpack(
00131     TupleData &outputTuple,
00132     TupleProjection &destProj)
00133 {
00134     PBuffer buf = getBuffer();
00135 
00136     assert (buf != NULL);
00137 
00138     if (destProj.size() > 0) {
00139         /*
00140          * Destination positions in the outputTuple should be enough to hold
00141          * fields returned by dataAccessor.
00142          */
00143         uint tupleSize = min(destProj.size(), keyTuple.size());
00144 
00145         /*
00146          * Set pointers in the tmp tuple, and then pass them on to fields in
00147          * output tuple.
00148          */
00149         keyAccessor.unmarshal(keyTuple);
00150 
00151         for (int i = 0; i < tupleSize; i ++) {
00152             outputTuple[destProj[i]].copyFrom(keyTuple[i]);
00153         }
00154     } else {
00155         /*
00156          * Set pointers in the outputtuple.
00157          */
00158         keyAccessor.unmarshal(outputTuple);
00159     }
00160 }
00161 
00162 bool LhxHashKeyAccessor::matches(
00163     TupleData const &inputTuple,
00164     TupleProjection const &inputKeyProj)
00165 {
00166     assert(inputKeyProj.size() == keyColsProj.size());
00167 
00168     inputKey.projectFrom(inputTuple, inputKeyProj);
00169 
00170     keyAccessor.unmarshal(keyTuple);
00171 
00172     currentKey.projectFrom(keyTuple, keyColsProj);
00173 
00174     return keyColsDesc.compareTuples(
00175         keyTuple, keyColsProj,
00176         inputTuple, inputKeyProj) == 0;
00177 }
00178 
00179 string LhxHashKeyAccessor::toString()
00180 {
00181     TuplePrinter tuplePrinter;
00182     ostringstream keyTrace;
00183     TupleProjection allFields;
00184     allFields.clear();
00185 
00186     keyTuple.compute(keyDescriptor);
00187     unpack(keyTuple, allFields);
00188     keyTrace << "[Key Node] ["
00189              << (isMatched() ? "matched" : "unmatched")
00190              << " next " << getNextSlot() << "] ";
00191     tuplePrinter.print(keyTrace, keyDescriptor, keyTuple);
00192     return keyTrace.str();
00193 }
00194 
00195 void LhxHashBlockAccessor::init(uint usablePageSize)
00196 {
00197     blockUsableSize = usablePageSize - getBufferOffset();
00198     numSlotsPerBlock = blockUsableSize / sizeof(PBuffer);
00199 }
00200 
00201 void LhxHashBlockAccessor::setCurrent(
00202     PBuffer blockPtrInit,
00203     bool valid,
00204     bool clearContent)
00205 {
00206     LhxHashNodeAccessor::setCurrent(blockPtrInit);
00207     freePtr = getBuffer();
00208     assert(freePtr);
00209     endPtr = freePtr + blockUsableSize;
00210 
00211     if (valid) {
00212         freePtr = endPtr;
00213     } else if (clearContent) {
00214         /*
00215          * Clear the memory if it is not valid.
00216          * The next link is not reset however since this block can be part of a
00217          * reusable block list.
00218          */
00219         memset(freePtr, 0, blockUsableSize);
00220     }
00221 
00222 }
00223 
00224 PBuffer LhxHashBlockAccessor::allocBuffer(uint bufSize)
00225 {
00226     PBuffer resultPtr = freePtr;
00227 
00228     if (freePtr + bufSize > endPtr) {
00229         resultPtr = NULL;
00230     } else {
00231         freePtr += bufSize;
00232     }
00233     return resultPtr;
00234 }
00235 
00236 PBuffer *LhxHashBlockAccessor::getSlot(uint slotNum)
00237 {
00238     assert (getCurrent() != NULL);
00239     if (slotNum >= numSlotsPerBlock) {
00240         /*
00241          * slotNum starts from 0.
00242          */
00243         return NULL;
00244     } else {
00245         return (PBuffer *)(getBuffer() + slotNum * sizeof(PBuffer));
00246     }
00247 }
00248 
00249 void LhxHashTable::init(
00250     uint partitionLevelInit,
00251     LhxHashInfo const &hashInfo,
00252     uint buildInputIndex)
00253 {
00254     maxBlockCount = hashInfo.numCachePages;
00255     assert (maxBlockCount > 1);
00256     scratchAccessor = hashInfo.memSegmentAccessor;
00257     partitionLevel = partitionLevelInit;
00258     bufferLock.accessSegment(scratchAccessor);
00259     currentBlockCount = 0;
00260 
00261     /*
00262      * Recompute num slots based on hashInfo.numCachePages
00263      */
00264     RecordNum cndKeys = hashInfo.cndKeys[buildInputIndex];
00265     uint usablePageSize = scratchAccessor.pSegment->getUsablePageSize();
00266 
00267     calculateNumSlots(cndKeys, usablePageSize, maxBlockCount);
00268 
00269     /*
00270      * special hash table properties.
00271      */
00272     filterNull = hashInfo.filterNull[buildInputIndex];
00273 
00274     filterNullKeyProj = hashInfo.filterNullKeyProj[buildInputIndex];
00275     removeDuplicate = hashInfo.removeDuplicate[buildInputIndex];
00276 
00277     blockAccessor.init(usablePageSize);
00278     nodeBlockAccessor.init(usablePageSize);
00279     maxBufferSize = nodeBlockAccessor.getUsableSize();
00280 
00281     hashGen.init(partitionLevel);
00282     hashGenSub.init(partitionLevel + 1);
00283 
00284     uint i;
00285 
00286     /*
00287      * The last input is the build side.
00288      */
00289     TupleDescriptor const &buildTupleDesc = hashInfo.inputDesc[buildInputIndex];
00290     keyColsProj = hashInfo.keyProj[buildInputIndex];
00291 
00292     /*
00293      * Initialize varchar type indicator for the build side. (Assumed to be the
00294      * last input.)
00295      */
00296     isKeyColVarChar = hashInfo.isKeyColVarChar[buildInputIndex];
00297     aggsProj = hashInfo.aggsProj;
00298     dataProj = hashInfo.dataProj[buildInputIndex];
00299 
00300     isGroupBy = false;
00301 
00302     /*
00303      * These steps initialize the keyColsProjInKey and aggsProjInKey which are
00304      * based on the new keyColsAndAggs tuple.
00305      */
00306     TupleDescriptor keyDesc;
00307     TupleDescriptor dataDesc;
00308     TupleProjection keyColsProjInKey;
00309     TupleProjection aggsProjInKey;
00310 
00311     uint keyCount = keyColsProj.size();
00312     for (i = 0; i < keyCount; i++) {
00313         keyDesc.push_back(buildTupleDesc[keyColsProj[i]]);
00314         keyColsProjInKey.push_back(i);
00315     }
00316 
00317     keyColsAndAggsProj = keyColsProj;
00318     for (i = 0; i < aggsProj.size(); i++) {
00319         keyColsAndAggsProj.push_back(aggsProj[i]);
00320         keyDesc.push_back(buildTupleDesc[aggsProj[i]]);
00321         aggsProjInKey.push_back(i + keyCount);
00322     }
00323 
00324     hashKeyAccessor.init(keyDesc, keyColsProjInKey, aggsProjInKey);
00325 
00326     for (i = 0; i < dataProj.size(); i++) {
00327         dataDesc.push_back(buildTupleDesc[dataProj[i]]);
00328     }
00329 
00330     hashDataAccessor.init(dataDesc);
00331 }
00332 
00333 void LhxHashTable::init(
00334     uint partitionLevelInit,
00335     LhxHashInfo const &hashInfo,
00336     AggComputerList *aggList,
00337     uint buildInputIndex)
00338 {
00339     init(partitionLevelInit, hashInfo, buildInputIndex);
00340 
00341     aggComputers = aggList;
00342     /*
00343      * The last input is the build side. In the group by case, there is only
00344      * one input.
00345      */
00346     aggWorkingTuple.compute(hashInfo.inputDesc[buildInputIndex]);
00347     aggResultTuple.computeAndAllocate(hashInfo.inputDesc[buildInputIndex]);
00348 
00349     isGroupBy = true;
00350 
00351     if (aggList->size() > 0) {
00352         hasAggregates = true;
00353     } else {
00354         hasAggregates = false;
00355     }
00356 }
00357 
00358 PBuffer LhxHashTable::allocBlock()
00359 {
00360     PBuffer resultBlock;
00361 
00362     if (currentBlockCount < maxBlockCount) {
00363         currentBlockCount ++;
00364         /*
00365          * Allocate a new block.
00366          */
00367         bufferLock.allocatePage();
00368         resultBlock = bufferLock.getPage().getWritableData();
00369         bufferLock.unlock();
00370 
00371         /*
00372          * The new block is not linked in yet.
00373          */
00374         blockAccessor.setCurrent(resultBlock, false, false);
00375         blockAccessor.setNext(NULL);
00376     } else {
00377         /*
00378          * Hash Table reached its maximum size.
00379          */
00380         resultBlock = NULL;
00381     }
00382     return resultBlock;
00383 }
00384 
00385 PBuffer LhxHashTable::allocBuffer(uint bufSize)
00386 {
00387     PBuffer resultBuf = nodeBlockAccessor.allocBuffer(bufSize);
00388 
00389     if (!resultBuf) {
00390         /*
00391          * Current block out of memory
00392          */
00393         PBuffer nextBlock = nodeBlockAccessor.getNext();
00394         if (nextBlock) {
00395             currentBlock = nextBlock;
00396         } else {
00397             PBuffer newBlock = allocBlock();
00398             nodeBlockAccessor.setNext(newBlock);
00399             currentBlock = newBlock;
00400         }
00401 
00402         if (currentBlock) {
00403             nodeBlockAccessor.setCurrent(currentBlock, false, false);
00404             resultBuf = nodeBlockAccessor.allocBuffer(bufSize);
00405 
00406             assert (resultBuf);
00407         }
00408     }
00409 
00410     return resultBuf;
00411 }
00412 
00413 bool LhxHashTable::allocateResources(bool reuse)
00414 {
00415     assert (numSlots != 0);
00416 
00417     PBuffer newBlock;
00418 
00419     slotBlocks.clear();
00420     firstSlot = NULL;
00421     lastSlot = NULL;
00422 
00423     if (!reuse) {
00424         firstBlock = allocBlock();
00425     }
00426 
00427     currentBlock = firstBlock;
00428 
00429     /*
00430      * Should be able to allocate at least one block.
00431      */
00432     assert (currentBlock != NULL);
00433 
00434     uint numSlotsPerBlock = blockAccessor.getSlotsPerBlock();
00435 
00436     /*
00437      * Initialize the block (clear all bytes etc).
00438      */
00439     nodeBlockAccessor.setCurrent(currentBlock, false, true);
00440     slotBlocks.push_back(currentBlock);
00441 
00442     if (numSlots <= numSlotsPerBlock) {
00443         /*
00444          * This will be the first "node block", i.e. it contains key or
00445          * data nodes.
00446          * The allocate call sets the freePtr of the currentBlock
00447          * correctly.
00448          */
00449         nodeBlockAccessor.allocSlots(numSlots);
00450         return true;
00451     }
00452 
00453     /*
00454      * Need to allocate more than one block.
00455      */
00456     int numSlotsToAlloc = numSlots - numSlotsPerBlock;
00457 
00458     while (numSlotsToAlloc > 0) {
00459         newBlock = NULL;
00460         if (reuse) {
00461             newBlock = nodeBlockAccessor.getNext();
00462         }
00463 
00464         if (!newBlock) {
00465             newBlock = allocBlock();
00466             if (!newBlock) {
00467                 return false;
00468             }
00469         }
00470 
00471         /*
00472          * New block is linked to the end of the allocated block list.
00473          */
00474         nodeBlockAccessor.setNext(newBlock);
00475         currentBlock = newBlock;
00476         nodeBlockAccessor.setCurrent(currentBlock, false, true);
00477         slotBlocks.push_back(currentBlock);
00478 
00479         if (numSlotsToAlloc <= numSlotsPerBlock) {
00480             /*
00481              * This will be the first "node block", i.e. it contains key or
00482              * data nodes.
00483              * The allocate call sets the freePtr of the currentBlock
00484              * correctly.
00485              */
00486             nodeBlockAccessor.allocSlots(numSlotsToAlloc);
00487         }
00488 
00489         numSlotsToAlloc -= numSlotsPerBlock;
00490     }
00491     return true;
00492 }
00493 
00494 void LhxHashTable::releaseResources(bool reuse)
00495 {
00496     /*
00497      * Note: User of hash table needs to supply it with a private
00498      * scratchAccessor; otherwise, this call here can deallocate pages from
00499      * other clients of the shared scratchAccessor.
00500      */
00501     if (!reuse && scratchAccessor.pSegment) {
00502         scratchAccessor.pSegment->deallocatePageRange(
00503             NULL_PAGE_ID,
00504             NULL_PAGE_ID);
00505         firstBlock = NULL;
00506         currentBlockCount = 0;
00507     }
00508 
00509     hashKeyAccessor.reset();
00510     hashDataAccessor.reset();
00511     blockAccessor.reset();
00512     nodeBlockAccessor.reset();
00513     currentBlock = NULL;
00514 }
00515 
00516 void LhxHashTable::calculateNumSlots(
00517     RecordNum cndKeys,
00518     uint usablePageSize,
00519     BlockNum numBlocks)
00520 {
00521     // if we don't have stats for the number of distinct keys, just
00522     // use a default value
00523     if (isMAXU(cndKeys)) {
00524         cndKeys = RecordNum(10000);
00525     }
00526 
00527     /*
00528      * Use at least 1%, but no more than 10% of hash table cache pages to store
00529      * slots.
00530      */
00531     uint slotsLow = numBlocks * usablePageSize / sizeof(PBuffer) / 100;
00532     uint slotsHigh = numBlocks * usablePageSize / sizeof(PBuffer) / 10;
00533 
00534     numSlots =
00535         max(slotsNeeded(cndKeys), slotsLow);
00536 
00537     numSlots = min(numSlots, slotsHigh);
00538 }
00539 
00540 void LhxHashTable::calculateSize(
00541     LhxHashInfo const &hashInfo,
00542     uint inputIndex,
00543     BlockNum &numBlocks)
00544 {
00545     uint usablePageSize =
00546         (hashInfo.memSegmentAccessor.pSegment)->getUsablePageSize()
00547         - sizeof(PBuffer);
00548 
00549     TupleDescriptor const &inputDesc  = hashInfo.inputDesc[inputIndex];
00550 
00551     TupleProjection const &keyProj  = hashInfo.keyProj[inputIndex];
00552 
00553     TupleProjection const &dataProj  = hashInfo.dataProj[inputIndex];
00554 
00555     RecordNum cndKeys = hashInfo.cndKeys[inputIndex];
00556     RecordNum numRows = hashInfo.numRows[inputIndex];
00557     // if we don't have stats, don't bother trying to compute the hash table
00558     // size
00559     if (isMAXU(cndKeys) || isMAXU(numRows)) {
00560         numBlocks = MAXU;
00561         return;
00562     }
00563 
00564     TupleDescriptor keyDesc;
00565     keyDesc.projectFrom(inputDesc, keyProj);
00566 
00567     TupleDescriptor dataDesc;
00568     dataDesc.projectFrom(inputDesc, dataProj);
00569 
00570     LhxHashKeyAccessor tmpKey;
00571     LhxHashDataAccessor tmpData;
00572 
00573     TupleProjection tmpKeyProj;
00574     TupleProjection tmpAggsProj;
00575 
00576     /*
00577      * When estimating hash table size, ignore aggregate fields.
00578      */
00579     for (int i = 0; i < keyDesc.size(); i ++) {
00580         tmpKeyProj.push_back(i);
00581     }
00582 
00583     tmpKey.init(keyDesc, tmpKeyProj, tmpAggsProj);
00584     tmpData.init(dataDesc);
00585 
00586     double totalBytes =
00587         slotsNeeded(cndKeys) * sizeof(PBuffer)
00588         + cndKeys * tmpKey.getAvgStorageSize()
00589         + numRows * tmpData.getAvgStorageSize();
00590     double nBlocks = ceil(totalBytes / usablePageSize);
00591     if (nBlocks >= BlockNum(MAXU)) {
00592         numBlocks = BlockNum(MAXU) - 1;
00593     } else {
00594         numBlocks = BlockNum(nBlocks);
00595     }
00596 }
00597 
00598 
00599 PBuffer *LhxHashTable::getSlot(uint slotNum)
00600 {
00601     PBuffer *slot;
00602     uint slotsPerBlock = blockAccessor.getSlotsPerBlock();
00603 
00604     blockAccessor.setCurrent(slotBlocks[slotNum / slotsPerBlock], true, false);
00605 
00606     slot = blockAccessor.getSlot(slotNum % slotsPerBlock);
00607 
00608     assert (slot);
00609 
00610     return slot;
00611 }
00612 
00613 PBuffer LhxHashTable::findKeyLocation(
00614     TupleData const &inputTuple,
00615     TupleProjection const &inputKeyProj,
00616     bool isProbing,
00617     bool removeDuplicateProbe)
00618 {
00619     uint slotNum =
00620         (hashGen.hash(inputTuple, inputKeyProj, isKeyColVarChar)) % numSlots;
00621 
00622     PBuffer *slot = getSlot(slotNum);
00623     PBuffer keyLocation = (PBuffer)slot;
00624     PBuffer firstKey = *slot;
00625     PBuffer nextKey;
00626 
00627     if (firstKey) {
00628         /*
00629          * Keep searching if the key has already been linked to keys in the
00630          * same slot.
00631          */
00632         hashKeyAccessor.setCurrent(firstKey, true);
00633         while (!hashKeyAccessor.matches(inputTuple, inputKeyProj)) {
00634             nextKey = hashKeyAccessor.getNext();
00635             if (!nextKey) {
00636                 return NULL;
00637             }
00638 
00639             keyLocation = hashKeyAccessor.getNextLocation();
00640             hashKeyAccessor.setCurrent(nextKey, true);
00641         }
00642     } else {
00643         return NULL;
00644     }
00645 
00646     /*
00647      * Found a matching key
00648      */
00649     if (removeDuplicateProbe && hashKeyAccessor.isMatched()) {
00650         return NULL;
00651     }
00652 
00653     if (isProbing) {
00654         hashKeyAccessor.setMatched(true);
00655     }
00656 
00657     return keyLocation;
00658 }
00659 
00660 bool LhxHashTable::addKeyData(TupleData const &inputTuple)
00661 {
00662     // REVIEW jvs 25-Aug-2006:  If we're not using a power of two to allow
00663     // for fast modulo, then it should probably be a prime number to
00664     // reduce collisions.  Broadbase had a table "BBPrime" which
00665     // allowed it to quickly find the closest prime number after doing
00666     // other calculations like resource estimation.
00667     uint slotNum =
00668         (hashGen.hash(inputTuple, keyColsProj, isKeyColVarChar)) % numSlots;
00669 
00670     PBuffer *slot = getSlot(slotNum);
00671     PBuffer *newLastSlot = NULL;
00672 
00673     if (!firstSlot) {
00674         firstSlot = slot;
00675         lastSlot  = slot;
00676     } else {
00677         if (!(*slot)) {
00678             // first time inserting into this slot
00679             // need to chain the slot in if insertion successful
00680             newLastSlot = slot;
00681         }
00682     }
00683 
00684     PBuffer newNextKey = *slot;
00685 
00686     PBuffer newKey = NULL;
00687 
00688     if (!isGroupBy) {
00689         tmpKeyTuple.projectFrom(inputTuple, keyColsProj);
00690         hashKeyAccessor.checkStorageSize(tmpKeyTuple, maxBufferSize);
00691         uint newKeyLen =
00692             hashKeyAccessor.getStorageSize(tmpKeyTuple);
00693         newKey = allocBuffer(newKeyLen);
00694     } else {
00695         aggResultTuple.resetBuffer();
00696         for (int i = 0; i < keyColsProj.size() ; i ++) {
00697             aggResultTuple[i].copyFrom(inputTuple[keyColsProj[i]]);
00698         }
00699 
00700         for (int i = 0; i < aggComputers->size(); i ++) {
00701             (*aggComputers)[i].initAccumulator(
00702                 aggResultTuple[aggsProj[i]], inputTuple);
00703         }
00704         hashKeyAccessor.checkStorageSize(aggResultTuple, maxBufferSize);
00705         newKey =
00706             allocBuffer(hashKeyAccessor.getStorageSize(aggResultTuple));
00707     }
00708 
00709     PBuffer newData = NULL;
00710 
00711     if (!isGroupBy) {
00712         /*
00713          * Tuple contains data portion. i.e. this is not a group by case.
00714          */
00715         tmpDataTuple.projectFrom(inputTuple, dataProj);
00716         hashDataAccessor.checkStorageSize(tmpDataTuple, maxBufferSize);
00717         uint newDataLen = hashDataAccessor.getStorageSize(tmpDataTuple);
00718         newData = allocBuffer(newDataLen);
00719     }
00720 
00721     if (!newKey || (!isGroupBy && !newData)) {
00722         /*
00723          * Ran out of memory.
00724          */
00725         return false;
00726     }
00727 
00728     PBuffer *nextSlot = NULL;
00729 
00730     if (newNextKey) {
00731         // if slot not empty
00732         // copy the nextSlot field from newNextKey to newKey
00733         hashKeyAccessor.setCurrent(newNextKey, true);
00734         nextSlot = hashKeyAccessor.getNextSlot();
00735         hashKeyAccessor.setNextSlot(NULL);
00736     }
00737 
00738     *slot = newKey;
00739     hashKeyAccessor.setCurrent(newKey, false);
00740     hashKeyAccessor.setMatched(false);
00741     hashKeyAccessor.setNext(newNextKey);
00742     hashKeyAccessor.setNextSlot(nextSlot);
00743     hashKeyAccessor.setFirstData(NULL);
00744 
00745     if (!isGroupBy) {
00746         /*
00747          * Store the key.
00748          */
00749         hashKeyAccessor.pack(tmpKeyTuple);
00750 
00751         /*
00752          * Add data portion to this key.
00753          */
00754         hashKeyAccessor.setCurrent(newKey, true);
00755         hashDataAccessor.setCurrent(newData, false);
00756         hashDataAccessor.pack(tmpDataTuple);
00757         hashKeyAccessor.addData(newData);
00758     } else {
00759         /*
00760          * Store the key and the aggs.
00761          */
00762         hashKeyAccessor.pack(aggResultTuple);
00763     }
00764 
00765 
00766     /*
00767      * Link this slot (if inserted to for the first time) into the linked list.
00768      */
00769     if (newLastSlot) {
00770         hashKeyAccessor.setCurrent((*lastSlot), true);
00771         hashKeyAccessor.setNextSlot(newLastSlot);
00772         lastSlot = newLastSlot;
00773     }
00774 
00775     return true;
00776 }
00777 
00778 bool LhxHashTable::addData(PBuffer keyNode, TupleData const &inputTuple)
00779 {
00780     /*
00781      * REVIEW: optimization possible here if dataProj is empty; i.e. key
00782      * contains all cols. We can keep a count in the key, instead of storing
00783      * empty data nodes following the key. See test case
00784      * LhxHashTableTest::testInsert1Ka().
00785      * Another case is to support setop ALL in future.
00786      */
00787     hashKeyAccessor.setCurrent(keyNode, true);
00788 
00789     tmpDataTuple.projectFrom(inputTuple, dataProj);
00790 
00791     hashDataAccessor.checkStorageSize(tmpDataTuple, maxBufferSize);
00792 
00793     uint newDataLen =
00794         hashDataAccessor.getStorageSize(tmpDataTuple);
00795     PBuffer newData = allocBuffer(newDataLen);
00796 
00797     if (!newData) {
00798         /*
00799          * Hash table out of memory.
00800          */
00801         return false;
00802     }
00803 
00804     hashDataAccessor.setCurrent(newData, false);
00805     hashDataAccessor.pack(tmpDataTuple);
00806     hashKeyAccessor.addData(newData);
00807     return true;
00808 }
00809 
00810 bool LhxHashTable::aggData(PBuffer destKeyLoc, TupleData const &inputTuple)
00811 {
00812     PBuffer destKey;
00813     /*
00814      * Need to copy destKey out as destKeyLoc might not be aligned.
00815      */
00816     memcpy((PBuffer)&destKey, destKeyLoc, sizeof(PBuffer));
00817 
00818     hashKeyAccessor.setCurrent(destKey, true);
00819 
00820     aggResultTuple.resetBuffer();
00821 
00822     hashKeyAccessor.unpack(aggWorkingTuple, keyColsAndAggsProj);
00823 
00824     for (int i = 0; i < keyColsProj.size() ; i ++) {
00825         aggResultTuple[i].copyFrom(inputTuple[keyColsProj[i]]);
00826     }
00827 
00828     for (int i = 0; i < aggComputers->size(); i ++) {
00829         (*aggComputers)[i].updateAccumulator(
00830             aggWorkingTuple[aggsProj[i]],
00831             aggResultTuple[aggsProj[i]],
00832             inputTuple);
00833     }
00834 
00835     hashKeyAccessor.checkStorageSize(aggResultTuple, maxBufferSize);
00836 
00837     uint newResultSize =
00838         hashKeyAccessor.getStorageSize(aggResultTuple);
00839 
00840     uint oldResultSize =
00841         hashKeyAccessor.getStorageSize(aggWorkingTuple);
00842 
00843     if (newResultSize > oldResultSize) {
00844         PBuffer newKey = NULL;
00845         PBuffer newNextKey = hashKeyAccessor.getNext();
00846 
00847         /*
00848          * The key buffer will not hold the new result. Need to allocate buffer
00849          * again.
00850          */
00851         newKey = allocBuffer(newResultSize);
00852 
00853         if (newKey) {
00854             /*
00855              * Save the current key's next slot so we can set it in the new
00856              * key
00857              */
00858             PBuffer *nextSlot = hashKeyAccessor.getNextSlot();
00859 
00860             /*
00861              * The old key buffer is not used any more. Write in the key
00862              * location the new key buffer.
00863              */
00864             memcpy(destKeyLoc, (PBuffer)&newKey, sizeof(PBuffer));
00865 
00866             hashKeyAccessor.setCurrent(newKey, false);
00867             hashKeyAccessor.setMatched(false);
00868             hashKeyAccessor.setNext(newNextKey);
00869             hashKeyAccessor.pack(aggResultTuple);
00870             hashKeyAccessor.setNextSlot(nextSlot);
00871             return true;
00872         } else {
00873             return false;
00874         }
00875     } else {
00876         /*
00877          * The key buffer can hold aggResultTuple.
00878          */
00879         hashKeyAccessor.pack(aggResultTuple);
00880         return true;
00881     }
00882 }
00883 
00884 bool LhxHashTable::addTuple(TupleData const &inputTuple)
00885 {
00886     if (filterNull && inputTuple.containsNull(filterNullKeyProj)) {
00887         /*
00888          * When null values are filtered, and this tuple does
00889          * contain null in its key columns, do not add to hash
00890          * table.
00891          */
00892         return true;
00893     }
00894 
00895     /*
00896      * We are building the hash table.
00897      */
00898     bool isProbing = false;
00899     bool removeDuplicateProbe = false;
00900     PBuffer destKeyLoc =
00901         findKeyLocation(
00902             inputTuple, keyColsProj, isProbing,
00903             removeDuplicateProbe);
00904 
00905     if (!destKeyLoc) {
00906         /*
00907          * Key is not present in the hash table. Add both the key and the data.
00908          */
00909         return addKeyData(inputTuple);
00910     } else if (removeDuplicate) {
00911         /*
00912          * Do not add duplicate keys.
00913          */
00914         return true;
00915     } else {
00916         /*
00917          * Key is present in the hash table.
00918          * If hash join, add to the data list corresponding this key.
00919          * If hash aggregate, aggregate the new data.
00920          */
00921         if (!isGroupBy) {
00922             PBuffer destKey;
00923             /*
00924              * Need to copy destKey out as destKeyLoc might not be aligned.
00925              */
00926             memcpy((PBuffer*)&destKey, destKeyLoc, sizeof(PBuffer));
00927 
00928             assert (destKey);
00929 
00930             return addData(destKey, inputTuple);
00931         } else {
00932             if (!hasAggregates) {
00933                 return true;
00934             }
00935             return aggData(destKeyLoc, inputTuple);
00936         }
00937     }
00938 }
00939 
00940 PBuffer LhxHashTable::findKey(
00941     TupleData const &inputTuple,
00942     TupleProjection const &inputKeyProj,
00943     bool removeDuplicateProbe)
00944 {
00945     PBuffer destKey;
00946     PBuffer destKeyLoc;
00947     bool isProbing = true;
00948     destKeyLoc =
00949         findKeyLocation(
00950             inputTuple, inputKeyProj, isProbing,
00951             removeDuplicateProbe);
00952 
00953     if (destKeyLoc) {
00954         /*
00955          * Need to copy destKey out as destKeyLoc might not be aligned.
00956          */
00957         memcpy((PBuffer)&destKey, destKeyLoc, sizeof(PBuffer));
00958         return destKey;
00959     } else {
00960         return NULL;
00961     }
00962 }
00963 
00964 string LhxHashTable::printSlot(uint slotNum)
00965 {
00966     ostringstream slotTrace;
00967     PBuffer *slot = getSlot(slotNum);
00968 
00969     slotTrace << "[Slot] [" << slotNum << "] [" << slot <<"]\n";
00970 
00971     /*
00972      * Print all keys in this slot.
00973      */
00974     PBuffer currentHashKey = *slot;
00975     while (currentHashKey) {
00976         hashKeyAccessor.setCurrent(currentHashKey, true);
00977         slotTrace << "     " << hashKeyAccessor.toString() << "\n";
00978 
00979         /*
00980          * Print all data with the same key.
00981          */
00982         PBuffer currentHashData = hashKeyAccessor.getFirstData();
00983         while (currentHashData) {
00984             hashDataAccessor.setCurrent(currentHashData, true);
00985             slotTrace << "          " << hashDataAccessor.toString() << "\n";
00986             /*
00987              * next data.
00988              */
00989             currentHashData = hashDataAccessor.getNext();
00990         }
00991 
00992         /*
00993          * next key.
00994          */
00995         currentHashKey = hashKeyAccessor.getNext();
00996     }
00997     return slotTrace.str();
00998 }
00999 
01000 string LhxHashTable::toString()
01001 {
01002     ostringstream hashTableTrace;
01003 
01004     hashTableTrace << "\n"
01005         << "[Hash Table : maximum # blocks = " << maxBlockCount     << "]\n"
01006         << "[             current # blocks = " << currentBlockCount << "]\n"
01007         << "[             # slots          = " << numSlots          << "]\n"
01008         << "[             partition level  = " << partitionLevel    << "]\n"
01009         << "[             first slot       = " << firstSlot         << "]\n"
01010         << "[             last  slot       = " << lastSlot          << "]\n";
01011 
01012     for (int i = 0; i < numSlots; i ++) {
01013         hashTableTrace << printSlot(i);
01014     }
01015 
01016     return hashTableTrace.str();
01017 }
01018 
01019 bool LhxHashTableReader::advanceSlot()
01020 {
01021     if (!boundKey) {
01022         curKey = NULL;
01023 
01024         if (!isPositioned) {
01025             curSlot = hashTable->getFirstSlot();
01026         } else {
01027             curSlot = hashTable->getNextSlot(curSlot);
01028         }
01029 
01030         if (curSlot && *curSlot) {
01031             curKey = *curSlot;
01032             if (returnUnMatched) {
01033                 /*
01034                  * Look at all the keys in this slot.
01035                  */
01036                 hashKeyAccessor.setCurrent(*curSlot, true);
01037 
01038                 /*
01039                  * Only return unmatched keys
01040                  */
01041                 while (hashKeyAccessor.isMatched()) {
01042                     curKey = hashKeyAccessor.getNext();
01043                     if (!curKey) {
01044                         curSlot = hashTable->getNextSlot(curSlot);
01045                         if (curSlot) {
01046                             curKey = *curSlot;
01047                         } else {
01048                             curKey = NULL;
01049                         }
01050                     }
01051 
01052                     if (curKey) {
01053                         hashKeyAccessor.setCurrent(curKey, true);
01054                     } else {
01055                         /*
01056                          * Reached the end of the slot list.
01057                          */
01058                         break;
01059                     }
01060                 }
01061             }
01062         }
01063 
01064         if (!curKey) {
01065             /*
01066              * Cound not find any slot with fitting key.
01067              */
01068             return false;
01069         }
01070     }
01071 
01072     hashKeyAccessor.setCurrent(curKey, true);
01073 
01074     if (!isGroupBy) {
01075         curData = hashKeyAccessor.getFirstData();
01076         assert(curData);
01077         hashDataAccessor.setCurrent(curData, true);
01078     }
01079 
01080     return true;
01081 }
01082 
01083 bool LhxHashTableReader::advanceKey()
01084 {
01085     while ((curKey = hashKeyAccessor.getNext())) {
01086         if (!returnUnMatched) {
01087             break;
01088         } else {
01089             hashKeyAccessor.setCurrent(curKey, true);
01090             if (!hashKeyAccessor.isMatched()) {
01091                 break;
01092             }
01093         }
01094     }
01095 
01096     if (curKey) {
01097         hashKeyAccessor.setCurrent(curKey, true);
01098         if (!isGroupBy) {
01099             curData = hashKeyAccessor.getFirstData();
01100             assert(curData);
01101             hashDataAccessor.setCurrent(curData, true);
01102         }
01103         return true;
01104     } else {
01105         return false;
01106     }
01107 }
01108 
01109 bool LhxHashTableReader::advanceData()
01110 {
01111     if (isGroupBy) {
01112         return false;
01113     }
01114 
01115     curData = hashDataAccessor.getNext();
01116     if (curData) {
01117         hashDataAccessor.setCurrent(curData, true);
01118         return true;
01119     } else {
01120         return false;
01121     }
01122 }
01123 
01124 void LhxHashTableReader::produceTuple(TupleData &outputTuple)
01125 {
01126     hashKeyAccessor.unpack(outputTuple, keyColsAndAggsProj);
01127     if (!isGroupBy) {
01128         hashDataAccessor.unpack(outputTuple, dataProj);
01129     }
01130 }
01131 
01132 void LhxHashTableReader::init(
01133     LhxHashTable *hashTableInit,
01134     LhxHashInfo const &hashInfo,
01135     uint buildInputIndex)
01136 {
01137     /*
01138      * The last input is the build side.
01139      */
01140     TupleDescriptor const &outputTupleDesc =
01141         hashInfo.inputDesc[buildInputIndex];
01142     TupleProjection const &keyColsProj = hashInfo.keyProj[buildInputIndex];
01143     TupleProjection const &aggsProj = hashInfo.aggsProj;
01144 
01145     dataProj = hashInfo.dataProj[buildInputIndex];
01146 
01147     /*
01148      * These steps initialize the keyColsProjInKey and aggsProjInKey which are
01149      * based on the new keyColsAndAggs tuple.
01150      */
01151     TupleDescriptor keyDesc;
01152     TupleDescriptor dataDesc;
01153     TupleProjection keyColsProjInKey;
01154     TupleProjection aggsProjInKey;
01155     uint keyCount = keyColsProj.size();
01156     uint i;
01157 
01158     for (i = 0; i < keyCount; i++) {
01159         keyDesc.push_back(outputTupleDesc[keyColsProj[i]]);
01160         keyColsProjInKey.push_back(i);
01161     }
01162 
01163     keyColsAndAggsProj = keyColsProj;
01164     uint aggsProjSize = aggsProj.size();
01165 
01166     for (i = 0; i < aggsProjSize; i ++) {
01167         keyColsAndAggsProj.push_back(aggsProj[i]);
01168         keyDesc.push_back(outputTupleDesc[aggsProj[i]]);
01169         aggsProjInKey.push_back(i + keyCount);
01170     }
01171 
01172     hashKeyAccessor.init(keyDesc, keyColsProjInKey, aggsProjInKey);
01173 
01174     for (i = 0; i < dataProj.size(); i++) {
01175         dataDesc.push_back(outputTupleDesc[dataProj[i]]);
01176     }
01177 
01178     hashDataAccessor.init(dataDesc);
01179 
01180     hashTable = hashTableInit;
01181     isGroupBy = hashTable->isHashGroupBy();
01182 
01183     /*
01184      * By default, this reader is not associated with any key, i.e. it covers
01185      * the whole hash table.
01186      */
01187     bindKey(NULL);
01188 }
01189 
01190 bool LhxHashTableReader::getNext(TupleData &outputTuple)
01191 {
01192     if (!isPositioned) {
01193         assert (!(boundKey && returnUnMatched));
01194 
01195         /*
01196          * Position at the first qualifying key of the first slot.
01197          */
01198         if (!advanceSlot()) {
01199             /*
01200              * Nothing to output.
01201              */
01202             return false;
01203         }
01204         produceTuple(outputTuple);
01205         isPositioned = true;
01206         return true;
01207     }
01208 
01209     if (advanceData()) {
01210         produceTuple(outputTuple);
01211         return true;
01212     } else {
01213         if (boundKey) {
01214             /*
01215              * End of data for this key.
01216              */
01217             return false;
01218         } else {
01219             /*
01220              * Get the next key in the same slot.
01221              */
01222             if (advanceKey()) {
01223                 produceTuple(outputTuple);
01224                 return true;
01225             } else {
01226                 /*
01227                  * End of list for keys in the same slot..
01228                  * Start the next slot.
01229                  */
01230                 if (advanceSlot()) {
01231                     produceTuple(outputTuple);
01232                     return true;
01233                 } else {
01234                     return false;
01235                 }
01236             }
01237         }
01238     }
01239 }
01240 
01241 // force references to some classes which aren't referenced elsewhere
01242 #ifdef __MSVC__
01243 class UnreferencedHashexeStructs
01244 {
01245     LhxHashTableDump dump;
01246 };
01247 #endif
01248 
01249 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/hashexe/LhxHashTable.cpp#3 $");
01250 
01251 // End LhxHashTable.cpp

Generated on Mon Jun 22 04:00:19 2009 for Fennel by  doxygen 1.5.1