#include <LhxPartition.h>
Public Member Functions | |
void | open (SharedLhxPartition srcPartition, LhxHashInfo const &hashInfo) |
bool | isTupleConsumptionPending () |
bool | demandData () |
void | unmarshalTuple (TupleData &outputTuple) |
void | consumeTuple () |
void | close () |
ExecStreamBufState | getState () const |
TupleDescriptor const & | getTupleDesc () const |
SharedLhxPartition | getSourcePartition () const |
Private Attributes | |
SharedLhxPartition | srcPartition |
Partition to read from. | |
SharedSegInputStream | pSegInputStream |
Helper used for reading a partition. | |
TupleAccessor | tupleAccessor |
Tuple accessor to unmarshal the disk content to the outputTuple. | |
uint | tupleStorageLength |
Storage Length of last tuple read from the underlying partition. | |
bool | srcIsInputStream |
ExecStreamBufState | bufState |
TupleDescriptor | outputTupleDesc |
SharedExecStreamBufAccessor | streamBufAccessor |
If reader is on a partition which comes from the input exec stream, (this is when this accessor is opened on a partition without valid segStream), use an exec stream buf accessor to read tuples. |
Definition at line 123 of file LhxPartition.h.
void LhxPartitionReader::open | ( | SharedLhxPartition | srcPartition, | |
LhxHashInfo const & | hashInfo | |||
) |
Definition at line 143 of file LhxPartition.cpp.
References bufState, TupleAccessor::compute(), EXECBUF_NONEMPTY, LhxHashInfo::inputDesc, outputTupleDesc, pSegInputStream, TupleAccessor::resetCurrentTupleBuf(), srcIsInputStream, srcPartition, LhxHashInfo::streamBufAccessor, streamBufAccessor, and tupleAccessor.
Referenced by LhxPlan::createChildren(), LhxJoinExecStream::execute(), LhxAggExecStream::execute(), LhxPartitionInfo::open(), LhxJoinExecStream::open(), LhxAggExecStream::open(), and LhxHashTableTest::readPartition().
00146 { 00147 bufState = EXECBUF_NONEMPTY; 00148 srcPartition = srcPartitionInit; 00149 00150 if (!srcPartition->segStream) { 00151 /* 00152 * source has never been written to, which means the source 00153 * is not from the disk but from input stream. 00154 */ 00155 srcIsInputStream = true; 00156 } else { 00157 srcIsInputStream = false; 00158 } 00159 00160 if (srcIsInputStream) { 00161 streamBufAccessor = 00162 hashInfo.streamBufAccessor[srcPartition->inputIndex]; 00163 outputTupleDesc = streamBufAccessor->getTupleDesc(); 00164 } else { 00165 outputTupleDesc = hashInfo.inputDesc[srcPartition->inputIndex]; 00166 tupleAccessor.compute(outputTupleDesc); 00167 tupleAccessor.resetCurrentTupleBuf(); 00168 00169 /* 00170 * Since reader now gets input stream from the partition, 00171 * this inputStream will delete content that is read. 00172 * This also means each partition can only be read once. 00173 */ 00174 pSegInputStream = srcPartition->segStream->getInputStream(); 00175 pSegInputStream->startPrefetch(); 00176 } 00177 }
bool LhxPartitionReader::isTupleConsumptionPending | ( | ) |
Definition at line 212 of file LhxPartition.cpp.
References TupleAccessor::getCurrentTupleBuf(), srcIsInputStream, streamBufAccessor, and tupleAccessor.
Referenced by LhxPlan::createChildren(), LhxJoinExecStream::execute(), LhxAggExecStream::execute(), LhxPlan::generatePartitions(), and LhxHashTableTest::readPartition().
00213 { 00214 if (srcIsInputStream) { 00215 return streamBufAccessor->isTupleConsumptionPending(); 00216 } else { 00217 if (tupleAccessor.getCurrentTupleBuf()) { 00218 return true; 00219 } else { 00220 return false; 00221 } 00222 } 00223 }
bool LhxPartitionReader::demandData | ( | ) |
Definition at line 225 of file LhxPartition.cpp.
References bufState, EXECBUF_EOS, TupleAccessor::getBufferByteCount(), pSegInputStream, TupleAccessor::setCurrentTupleBuf(), srcIsInputStream, srcPartition, streamBufAccessor, tupleAccessor, and tupleStorageLength.
Referenced by LhxPlan::createChildren(), LhxJoinExecStream::execute(), LhxAggExecStream::execute(), LhxPlan::generatePartitions(), and LhxHashTableTest::readPartition().
00226 { 00227 if (srcIsInputStream) { 00228 return streamBufAccessor->demandData(); 00229 } else { 00230 /* 00231 * Read from disk. 00232 */ 00233 uint bytesReadable = 0; 00234 PConstBuffer pSrcBuf = 00235 pSegInputStream->getReadPointer(1, &bytesReadable); 00236 00237 /* 00238 * If readable data does not fill a tuple, it means the segment stream 00239 * has reached EOD. 00240 */ 00241 if (!pSrcBuf) { 00242 bufState = EXECBUF_EOS; 00243 return false; 00244 } else { 00245 tupleStorageLength = tupleAccessor.getBufferByteCount(pSrcBuf); 00246 assert(bytesReadable >= tupleStorageLength); 00247 if (bytesReadable == tupleStorageLength) { 00248 // We're processing the last tuple in a buffer, 00249 // so now is a good time to check for abort. 00250 if (srcPartition->pExecStream) { 00251 srcPartition->pExecStream->checkAbort(); 00252 } 00253 } 00254 tupleAccessor.setCurrentTupleBuf(pSrcBuf); 00255 return true; 00256 } 00257 } 00258 }
void LhxPartitionReader::unmarshalTuple | ( | TupleData & | outputTuple | ) |
Definition at line 190 of file LhxPartition.cpp.
References srcIsInputStream, streamBufAccessor, tupleAccessor, and TupleAccessor::unmarshal().
Referenced by LhxPlan::createChildren(), LhxJoinExecStream::execute(), LhxAggExecStream::execute(), LhxPlan::generatePartitions(), and LhxHashTableTest::readPartition().
00191 { 00192 if (srcIsInputStream) { 00193 /* 00194 * Read from stream. 00195 */ 00196 streamBufAccessor->unmarshalTuple(outputTuple); 00197 } else { 00198 tupleAccessor.unmarshal(outputTuple); 00199 } 00200 }
void LhxPartitionReader::consumeTuple | ( | ) |
Definition at line 202 of file LhxPartition.cpp.
References pSegInputStream, TupleAccessor::resetCurrentTupleBuf(), srcIsInputStream, streamBufAccessor, tupleAccessor, and tupleStorageLength.
Referenced by LhxPlan::createChildren(), LhxJoinExecStream::execute(), LhxAggExecStream::execute(), LhxPlan::generatePartitions(), and LhxHashTableTest::readPartition().
00203 { 00204 if (srcIsInputStream) { 00205 streamBufAccessor->consumeTuple(); 00206 } else { 00207 tupleAccessor.resetCurrentTupleBuf(); 00208 pSegInputStream->consumeReadPointer(tupleStorageLength); 00209 } 00210 }
void LhxPartitionReader::close | ( | ) |
Definition at line 179 of file LhxPartition.cpp.
References pSegInputStream, and srcIsInputStream.
Referenced by LhxPartitionInfo::close(), LhxPlan::createChildren(), LhxJoinExecStream::execute(), LhxAggExecStream::execute(), LhxPlan::generatePartitions(), and LhxHashTableTest::readPartition().
00180 { 00181 if (srcIsInputStream) { 00182 /* 00183 * Do nothing if reading from stream. 00184 */ 00185 } else { 00186 pSegInputStream->close(); 00187 } 00188 }
ExecStreamBufState LhxPartitionReader::getState | ( | ) | const [inline] |
Definition at line 465 of file LhxPartition.h.
References bufState, srcIsInputStream, and streamBufAccessor.
Referenced by LhxPlan::createChildren(), LhxJoinExecStream::execute(), LhxAggExecStream::execute(), LhxPlan::generatePartitions(), and LhxHashTableTest::readPartition().
00466 { 00467 if (srcIsInputStream) { 00468 return streamBufAccessor->getState(); 00469 } else { 00470 return bufState; 00471 } 00472 }
TupleDescriptor const & LhxPartitionReader::getTupleDesc | ( | ) | const [inline] |
Definition at line 479 of file LhxPartition.h.
References outputTupleDesc.
Referenced by LhxAggExecStream::execute(), and LhxPlan::generatePartitions().
00480 { 00481 return outputTupleDesc; 00482 }
SharedLhxPartition LhxPartitionReader::getSourcePartition | ( | ) | const [inline] |
Definition at line 474 of file LhxPartition.h.
References srcPartition.
Referenced by LhxPartitionInfo::open().
00475 { 00476 return srcPartition; 00477 }
Partition to read from.
Definition at line 128 of file LhxPartition.h.
Referenced by demandData(), getSourcePartition(), and open().
Helper used for reading a partition.
Definition at line 133 of file LhxPartition.h.
Referenced by close(), consumeTuple(), demandData(), and open().
Tuple accessor to unmarshal the disk content to the outputTuple.
Definition at line 138 of file LhxPartition.h.
Referenced by consumeTuple(), demandData(), isTupleConsumptionPending(), open(), and unmarshalTuple().
uint LhxPartitionReader::tupleStorageLength [private] |
Storage Length of last tuple read from the underlying partition.
Definition at line 143 of file LhxPartition.h.
Referenced by consumeTuple(), and demandData().
bool LhxPartitionReader::srcIsInputStream [private] |
Definition at line 145 of file LhxPartition.h.
Referenced by close(), consumeTuple(), demandData(), getState(), isTupleConsumptionPending(), open(), and unmarshalTuple().
If reader is on a partition which comes from the input exec stream, (this is when this accessor is opened on a partition without valid segStream), use an exec stream buf accessor to read tuples.
Definition at line 154 of file LhxPartition.h.
Referenced by consumeTuple(), demandData(), getState(), isTupleConsumptionPending(), open(), and unmarshalTuple().