LhxPartitionReader Class Reference

#include <LhxPartition.h>

List of all members.

Public Member Functions

void open (SharedLhxPartition srcPartition, LhxHashInfo const &hashInfo)
bool isTupleConsumptionPending ()
bool demandData ()
void unmarshalTuple (TupleData &outputTuple)
void consumeTuple ()
void close ()
ExecStreamBufState getState () const
TupleDescriptor const & getTupleDesc () const
SharedLhxPartition getSourcePartition () const

Private Attributes

SharedLhxPartition srcPartition
 Partition to read from.
SharedSegInputStream pSegInputStream
 Helper used for reading a partition.
TupleAccessor tupleAccessor
 Tuple accessor to unmarshal the disk content to the outputTuple.
uint tupleStorageLength
 Storage Length of last tuple read from the underlying partition.
bool srcIsInputStream
ExecStreamBufState bufState
TupleDescriptor outputTupleDesc
SharedExecStreamBufAccessor streamBufAccessor
 If reader is on a partition which comes from the input exec stream, (this is when this accessor is opened on a partition without valid segStream), use an exec stream buf accessor to read tuples.


Detailed Description

Definition at line 123 of file LhxPartition.h.


Member Function Documentation

void LhxPartitionReader::open ( SharedLhxPartition  srcPartition,
LhxHashInfo const &  hashInfo 
)

Definition at line 143 of file LhxPartition.cpp.

References bufState, TupleAccessor::compute(), EXECBUF_NONEMPTY, LhxHashInfo::inputDesc, outputTupleDesc, pSegInputStream, TupleAccessor::resetCurrentTupleBuf(), srcIsInputStream, srcPartition, LhxHashInfo::streamBufAccessor, streamBufAccessor, and tupleAccessor.

Referenced by LhxPlan::createChildren(), LhxJoinExecStream::execute(), LhxAggExecStream::execute(), LhxPartitionInfo::open(), LhxJoinExecStream::open(), LhxAggExecStream::open(), and LhxHashTableTest::readPartition().

00146 {
00147     bufState = EXECBUF_NONEMPTY;
00148     srcPartition = srcPartitionInit;
00149 
00150     if (!srcPartition->segStream) {
00151         /*
00152          * source has never been written to, which means the source
00153          * is not from the disk but from input stream.
00154          */
00155         srcIsInputStream = true;
00156     } else {
00157         srcIsInputStream = false;
00158     }
00159 
00160     if (srcIsInputStream) {
00161         streamBufAccessor =
00162             hashInfo.streamBufAccessor[srcPartition->inputIndex];
00163         outputTupleDesc = streamBufAccessor->getTupleDesc();
00164     } else {
00165         outputTupleDesc = hashInfo.inputDesc[srcPartition->inputIndex];
00166         tupleAccessor.compute(outputTupleDesc);
00167         tupleAccessor.resetCurrentTupleBuf();
00168 
00169         /*
00170          * Since reader now gets input stream from the partition,
00171          * this inputStream will delete content that is read.
00172          * This also means each partition can only be read once.
00173          */
00174         pSegInputStream = srcPartition->segStream->getInputStream();
00175         pSegInputStream->startPrefetch();
00176     }
00177 }

bool LhxPartitionReader::isTupleConsumptionPending (  ) 

Definition at line 212 of file LhxPartition.cpp.

References TupleAccessor::getCurrentTupleBuf(), srcIsInputStream, streamBufAccessor, and tupleAccessor.

Referenced by LhxPlan::createChildren(), LhxJoinExecStream::execute(), LhxAggExecStream::execute(), LhxPlan::generatePartitions(), and LhxHashTableTest::readPartition().

00213 {
00214     if (srcIsInputStream) {
00215         return streamBufAccessor->isTupleConsumptionPending();
00216     } else {
00217         if (tupleAccessor.getCurrentTupleBuf()) {
00218             return true;
00219         } else {
00220             return false;
00221         }
00222     }
00223 }

bool LhxPartitionReader::demandData (  ) 

Definition at line 225 of file LhxPartition.cpp.

References bufState, EXECBUF_EOS, TupleAccessor::getBufferByteCount(), pSegInputStream, TupleAccessor::setCurrentTupleBuf(), srcIsInputStream, srcPartition, streamBufAccessor, tupleAccessor, and tupleStorageLength.

Referenced by LhxPlan::createChildren(), LhxJoinExecStream::execute(), LhxAggExecStream::execute(), LhxPlan::generatePartitions(), and LhxHashTableTest::readPartition().

00226 {
00227     if (srcIsInputStream) {
00228         return streamBufAccessor->demandData();
00229     } else {
00230         /*
00231          * Read from disk.
00232          */
00233         uint bytesReadable = 0;
00234         PConstBuffer pSrcBuf =
00235             pSegInputStream->getReadPointer(1, &bytesReadable);
00236 
00237         /*
00238          * If readable data does not fill a tuple, it means the segment stream
00239          * has reached EOD.
00240          */
00241         if (!pSrcBuf) {
00242             bufState = EXECBUF_EOS;
00243             return false;
00244         } else {
00245             tupleStorageLength = tupleAccessor.getBufferByteCount(pSrcBuf);
00246             assert(bytesReadable >= tupleStorageLength);
00247             if (bytesReadable == tupleStorageLength) {
00248                 // We're processing the last tuple in a buffer,
00249                 // so now is a good time to check for abort.
00250                 if (srcPartition->pExecStream) {
00251                     srcPartition->pExecStream->checkAbort();
00252                 }
00253             }
00254             tupleAccessor.setCurrentTupleBuf(pSrcBuf);
00255             return true;
00256         }
00257     }
00258 }

void LhxPartitionReader::unmarshalTuple ( TupleData outputTuple  ) 

Definition at line 190 of file LhxPartition.cpp.

References srcIsInputStream, streamBufAccessor, tupleAccessor, and TupleAccessor::unmarshal().

Referenced by LhxPlan::createChildren(), LhxJoinExecStream::execute(), LhxAggExecStream::execute(), LhxPlan::generatePartitions(), and LhxHashTableTest::readPartition().

00191 {
00192     if (srcIsInputStream) {
00193         /*
00194          * Read from stream.
00195          */
00196         streamBufAccessor->unmarshalTuple(outputTuple);
00197     } else {
00198         tupleAccessor.unmarshal(outputTuple);
00199     }
00200 }

void LhxPartitionReader::consumeTuple (  ) 

Definition at line 202 of file LhxPartition.cpp.

References pSegInputStream, TupleAccessor::resetCurrentTupleBuf(), srcIsInputStream, streamBufAccessor, tupleAccessor, and tupleStorageLength.

Referenced by LhxPlan::createChildren(), LhxJoinExecStream::execute(), LhxAggExecStream::execute(), LhxPlan::generatePartitions(), and LhxHashTableTest::readPartition().

00203 {
00204     if (srcIsInputStream) {
00205         streamBufAccessor->consumeTuple();
00206     } else {
00207         tupleAccessor.resetCurrentTupleBuf();
00208         pSegInputStream->consumeReadPointer(tupleStorageLength);
00209     }
00210 }

void LhxPartitionReader::close (  ) 

Definition at line 179 of file LhxPartition.cpp.

References pSegInputStream, and srcIsInputStream.

Referenced by LhxPartitionInfo::close(), LhxPlan::createChildren(), LhxJoinExecStream::execute(), LhxAggExecStream::execute(), LhxPlan::generatePartitions(), and LhxHashTableTest::readPartition().

00180 {
00181     if (srcIsInputStream) {
00182         /*
00183          * Do nothing if reading from stream.
00184          */
00185     } else {
00186         pSegInputStream->close();
00187     }
00188 }

ExecStreamBufState LhxPartitionReader::getState (  )  const [inline]

Definition at line 465 of file LhxPartition.h.

References bufState, srcIsInputStream, and streamBufAccessor.

Referenced by LhxPlan::createChildren(), LhxJoinExecStream::execute(), LhxAggExecStream::execute(), LhxPlan::generatePartitions(), and LhxHashTableTest::readPartition().

00466 {
00467     if (srcIsInputStream) {
00468         return streamBufAccessor->getState();
00469     } else {
00470         return bufState;
00471     }
00472 }

TupleDescriptor const & LhxPartitionReader::getTupleDesc (  )  const [inline]

Definition at line 479 of file LhxPartition.h.

References outputTupleDesc.

Referenced by LhxAggExecStream::execute(), and LhxPlan::generatePartitions().

00480 {
00481     return outputTupleDesc;
00482 }

SharedLhxPartition LhxPartitionReader::getSourcePartition (  )  const [inline]

Definition at line 474 of file LhxPartition.h.

References srcPartition.

Referenced by LhxPartitionInfo::open().

00475 {
00476     return srcPartition;
00477 }


Member Data Documentation

SharedLhxPartition LhxPartitionReader::srcPartition [private]

Partition to read from.

Definition at line 128 of file LhxPartition.h.

Referenced by demandData(), getSourcePartition(), and open().

SharedSegInputStream LhxPartitionReader::pSegInputStream [private]

Helper used for reading a partition.

Definition at line 133 of file LhxPartition.h.

Referenced by close(), consumeTuple(), demandData(), and open().

TupleAccessor LhxPartitionReader::tupleAccessor [private]

Tuple accessor to unmarshal the disk content to the outputTuple.

Definition at line 138 of file LhxPartition.h.

Referenced by consumeTuple(), demandData(), isTupleConsumptionPending(), open(), and unmarshalTuple().

uint LhxPartitionReader::tupleStorageLength [private]

Storage Length of last tuple read from the underlying partition.

Definition at line 143 of file LhxPartition.h.

Referenced by consumeTuple(), and demandData().

bool LhxPartitionReader::srcIsInputStream [private]

Definition at line 145 of file LhxPartition.h.

Referenced by close(), consumeTuple(), demandData(), getState(), isTupleConsumptionPending(), open(), and unmarshalTuple().

ExecStreamBufState LhxPartitionReader::bufState [private]

Definition at line 146 of file LhxPartition.h.

Referenced by demandData(), getState(), and open().

TupleDescriptor LhxPartitionReader::outputTupleDesc [private]

Definition at line 147 of file LhxPartition.h.

Referenced by getTupleDesc(), and open().

SharedExecStreamBufAccessor LhxPartitionReader::streamBufAccessor [private]

If reader is on a partition which comes from the input exec stream, (this is when this accessor is opened on a partition without valid segStream), use an exec stream buf accessor to read tuples.

Definition at line 154 of file LhxPartition.h.

Referenced by consumeTuple(), demandData(), getState(), isTupleConsumptionPending(), open(), and unmarshalTuple().


The documentation for this class was generated from the following files:
Generated on Mon Jun 22 04:00:38 2009 for Fennel by  doxygen 1.5.1