#include <LhxPartition.h>
Public Member Functions | |
void | open (SharedLhxPartition destPartitionInit, LhxHashInfo const &hashInfo) |
void | open (SharedLhxPartition destPartitionInit, LhxHashInfo &hashInfo, AggComputerList *aggList, uint numWriterCachePages) |
void | allocateResources () |
void | releaseResources () |
void | marshalTuple (TupleData const &inputTuple) |
void | aggAndMarshalTuple (TupleData const &inputTuple) |
void | close () |
Private Attributes | |
SharedLhxPartition | destPartition |
Partition to write to. | |
SharedSegOutputStream | pSegOutputStream |
Stream used for writing to a partition. | |
TupleAccessor | tupleAccessor |
Tuple accessor to marshal the tuple from join inputs. | |
bool | isAggregate |
LhxHashTable | hashTable |
Each writer has a local hash table (which shares the scratch page quota with other writers) to compute partial aggregates before flushing them to disk. | |
LhxHashTableReader | hashTableReader |
TupleData | partialAggTuple |
Definition at line 77 of file LhxPartition.h.
void LhxPartitionWriter::open | ( | SharedLhxPartition | destPartitionInit, | |
LhxHashInfo const & | hashInfo | |||
) |
Definition at line 30 of file LhxPartition.cpp.
References TupleAccessor::compute(), destPartition, LhxHashInfo::externalSegmentAccessor, LhxHashInfo::inputDesc, isAggregate, SegOutputStream::newSegOutputStream(), SegStreamAllocation::newSegStreamAllocation(), pSegOutputStream, and tupleAccessor.
Referenced by LhxPlan::createChildren(), and LhxHashTableTest::writeHashTable().
00033 { 00034 destPartition = destPartitionInit; 00035 00036 tupleAccessor.compute(hashInfo.inputDesc[destPartition->inputIndex]); 00037 00038 pSegOutputStream = SegOutputStream::newSegOutputStream( 00039 hashInfo.externalSegmentAccessor); 00040 destPartition->segStream = 00041 SegStreamAllocation::newSegStreamAllocation(); 00042 destPartition->segStream->beginWrite(pSegOutputStream); 00043 00044 isAggregate = false; 00045 }
void LhxPartitionWriter::open | ( | SharedLhxPartition | destPartitionInit, | |
LhxHashInfo & | hashInfo, | |||
AggComputerList * | aggList, | |||
uint | numWriterCachePages | |||
) |
Definition at line 47 of file LhxPartition.cpp.
References LhxHashTable::calculateNumSlots(), LhxHashInfo::cndKeys, TupleData::compute(), TupleAccessor::compute(), destPartition, LhxHashInfo::externalSegmentAccessor, hashTable, hashTableReader, LhxHashTableReader::init(), LhxHashTable::init(), LhxHashInfo::inputDesc, isAggregate, LhxHashInfo::memSegmentAccessor, SegOutputStream::newSegOutputStream(), SegStreamAllocation::newSegStreamAllocation(), LhxHashInfo::numCachePages, partialAggTuple, SegmentAccessor::pSegment, pSegOutputStream, and tupleAccessor.
00052 { 00053 destPartition = destPartitionInit; 00054 tupleAccessor.compute(hashInfo.inputDesc[destPartition->inputIndex]); 00055 00056 pSegOutputStream = SegOutputStream::newSegOutputStream( 00057 hashInfo.externalSegmentAccessor); 00058 destPartition->segStream = 00059 SegStreamAllocation::newSegStreamAllocation(); 00060 destPartition->segStream->beginWrite(pSegOutputStream); 00061 00062 isAggregate = true; 00063 /* 00064 * Any partition level is fine since the data is hash partitioned already. 00065 */ 00066 uint partitionLevel = 0; 00067 uint savedNumCachePages = hashInfo.numCachePages; 00068 hashInfo.numCachePages = numWriterCachePages; 00069 00070 hashTable.init( 00071 partitionLevel, 00072 hashInfo, 00073 aggList, 00074 destPartition->inputIndex); 00075 hashTableReader.init(&hashTable, hashInfo, destPartition->inputIndex); 00076 00077 hashInfo.numCachePages = savedNumCachePages; 00078 00079 uint cndKeys = hashInfo.cndKeys.back(); 00080 uint usablePageSize = 00081 (hashInfo.memSegmentAccessor.pSegment)->getUsablePageSize(); 00082 00083 hashTable.calculateNumSlots(cndKeys, usablePageSize, numWriterCachePages); 00084 00085 partialAggTuple.compute(hashInfo.inputDesc[destPartition->inputIndex]); 00086 }
void LhxPartitionWriter::allocateResources | ( | ) | [inline] |
Definition at line 484 of file LhxPartition.h.
References LhxHashTable::allocateResources(), and hashTable.
00485 { 00486 bool status = hashTable.allocateResources(); 00487 assert(status); 00488 }
void LhxPartitionWriter::releaseResources | ( | ) | [inline] |
Definition at line 490 of file LhxPartition.h.
References hashTable, and LhxHashTable::releaseResources().
00491 { 00492 hashTable.releaseResources(); 00493 }
void LhxPartitionWriter::marshalTuple | ( | TupleData const & | inputTuple | ) |
Definition at line 107 of file LhxPartition.cpp.
References TupleAccessor::getByteCount(), TupleAccessor::marshal(), pSegOutputStream, and tupleAccessor.
Referenced by LhxPlan::createChildren(), and LhxHashTableTest::writeHashTable().
00108 { 00109 uint tupleStorageLength = tupleAccessor.getByteCount(inputTuple); 00110 PBuffer pDestBuf = pSegOutputStream->getWritePointer(tupleStorageLength); 00111 tupleAccessor.marshal(inputTuple, pDestBuf); 00112 pSegOutputStream->consumeWritePointer(tupleStorageLength); 00113 }
void LhxPartitionWriter::aggAndMarshalTuple | ( | TupleData const & | inputTuple | ) |
Definition at line 115 of file LhxPartition.cpp.
References LhxHashTable::addTuple(), LhxHashTable::allocateResources(), LhxHashTableReader::bindKey(), TupleAccessor::getByteCount(), LhxHashTableReader::getNext(), hashTable, hashTableReader, TupleAccessor::marshal(), partialAggTuple, pSegOutputStream, and tupleAccessor.
00116 { 00117 while (!hashTable.addTuple(inputTuple)) { 00118 /* 00119 * Write everything out to partition. 00120 */ 00121 while (hashTableReader.getNext(partialAggTuple)) { 00122 uint tupleStorageLength = 00123 tupleAccessor.getByteCount(partialAggTuple); 00124 PBuffer pDestBuf = 00125 pSegOutputStream->getWritePointer(tupleStorageLength); 00126 tupleAccessor.marshal(partialAggTuple, pDestBuf); 00127 pSegOutputStream->consumeWritePointer(tupleStorageLength); 00128 } 00129 bool reuse = true; 00130 /* 00131 * hash table size remain unchanged 00132 */ 00133 bool status = hashTable.allocateResources(reuse); 00134 assert(status); 00135 /* 00136 * Reset the reader and bind it to no particular key (will read the 00137 * whole hash table). 00138 */ 00139 hashTableReader.bindKey(NULL); 00140 } 00141 }
void LhxPartitionWriter::close | ( | ) |
Definition at line 88 of file LhxPartition.cpp.
References destPartition, TupleAccessor::getByteCount(), LhxHashTableReader::getNext(), hashTableReader, isAggregate, TupleAccessor::marshal(), partialAggTuple, pSegOutputStream, and tupleAccessor.
Referenced by LhxHashTableTest::writeHashTable().
00089 { 00090 if (isAggregate) { 00091 /* 00092 * Write out the remaining partial aggregates in the local hash table. 00093 */ 00094 while (hashTableReader.getNext(partialAggTuple)) { 00095 uint tupleStorageLength = 00096 tupleAccessor.getByteCount(partialAggTuple); 00097 PBuffer pDestBuf = 00098 pSegOutputStream->getWritePointer(tupleStorageLength); 00099 tupleAccessor.marshal(partialAggTuple, pDestBuf); 00100 pSegOutputStream->consumeWritePointer(tupleStorageLength); 00101 } 00102 } 00103 destPartition->segStream->endWrite(); 00104 pSegOutputStream->close(); 00105 }
Stream used for writing to a partition.
Definition at line 87 of file LhxPartition.h.
Referenced by aggAndMarshalTuple(), close(), marshalTuple(), and open().
Tuple accessor to marshal the tuple from join inputs.
Definition at line 92 of file LhxPartition.h.
Referenced by aggAndMarshalTuple(), close(), marshalTuple(), and open().
bool LhxPartitionWriter::isAggregate [private] |
LhxHashTable LhxPartitionWriter::hashTable [private] |
Each writer has a local hash table (which shares the scratch page quota with other writers) to compute partial aggregates before flushing them to disk.
Definition at line 101 of file LhxPartition.h.
Referenced by aggAndMarshalTuple(), allocateResources(), open(), and releaseResources().
Definition at line 102 of file LhxPartition.h.
Referenced by aggAndMarshalTuple(), close(), and open().
TupleData LhxPartitionWriter::partialAggTuple [private] |
Definition at line 103 of file LhxPartition.h.
Referenced by aggAndMarshalTuple(), close(), and open().