00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #ifndef Fennel_LcsClusterNodeWriter_Included
00023 #define Fennel_LcsClusterNodeWriter_Included
00024
00025 #include "fennel/lucidera/colstore/LcsClusterAccessBase.h"
00026 #include "fennel/lucidera/colstore/LcsBitOps.h"
00027 #include "fennel/lucidera/colstore/LcsClusterDump.h"
00028 #include "fennel/btree/BTreeWriter.h"
00029 #include "fennel/tuple/TupleData.h"
00030 #include "fennel/tuple/UnalignedAttributeAccessor.h"
00031 #include <boost/scoped_array.hpp>
00032
00033 FENNEL_BEGIN_NAMESPACE
00034
/**
 * Tuning constants shared by the cluster-writing code.
 *
 * NOTE(review): only LcsMaxLeftOver is referenced in this header (by
 * LcsClusterNodeWriter::isEndOfBlock); the exact roles of the other two
 * are determined by the out-of-line implementation -- confirm there.
 */

/** Presumably the upper bound on values rolled back from a batch. */
int const LcsMaxRollBack = 8;

/**
 * Number of values per column for which space is kept in reserve when
 * deciding whether a block is full (multiplied by the column's record
 * size in isEndOfBlock).
 */
int const LcsMaxLeftOver = 7;

/** Presumably the slack allowed when estimating the space left. */
int const LcsMaxSzLeftError = 4;
00038
/**
 * Per-column override for the batch storage mode.  The enumerators are
 * unscoped, so they are referenced by their bare names.
 */
enum ForceMode {
    /** No override is in effect. */
    none = 0,
    /** Force the "fixed" mode. */
    fixed = 1,
    /** Force the "variable" mode. */
    variable = 2
};
00040
00046 class FENNEL_LCS_EXPORT LcsClusterNodeWriter
00047 : public LcsClusterAccessBase, public TraceSource
00048 {
00049 private:
00053 SharedBTreeWriter bTreeWriter;
00054
00058 SegmentAccessor scratchAccessor;
00059
00063 ClusterPageLock bufferLock;
00064
00068 PLcsClusterNode pHdr;
00069
00073 uint hdrSize;
00074
00078 PBuffer pIndexBlock;
00079
00083 PBuffer *pBlock;
00084
00088 uint szBlock;
00089
00093 int minSzLeft;
00094
00099 boost::scoped_array<LcsBatchDir> batchDirs;
00100
00105 boost::scoped_array<PBuffer> pValBank;
00106
00111 boost::scoped_array<uint16_t> oValBank;
00112
00116 boost::scoped_array<uint16_t> valBankStart;
00117
00122 boost::scoped_array<uint16_t> batchOffset;
00123
00128 boost::scoped_array<uint> batchCount;
00129
00133 int szLeft;
00134
00139 boost::scoped_array<uint> nBits;
00140
00145 boost::scoped_array<uint> nextWidthChange;
00146
00150 bool arraysAllocated;
00151
00155 boost::scoped_array<ForceMode> bForceMode;
00156
00160 boost::scoped_array<uint> forceModeCount;
00161
00165 boost::scoped_array<uint> maxValueSize;
00166
00170 boost::scoped_array<UnalignedAttributeAccessor> attrAccessors;
00171
00175 SharedLcsClusterDump clusterDump;
00176
00180 TupleDescriptor colTupleDesc;
00181
00198 PBuffer valueSource(
00199 uint16_t lastValOffset, PBuffer pValBank, uint16_t oValBank,
00200 PBuffer pBlock, uint16_t f)
00201 {
00202
00203 if (f < lastValOffset) {
00204 return pValBank + f - oValBank;
00205 } else {
00206 return pBlock + f;
00207 }
00208 }
00209
00215 RecordNum moveFromIndexToTemp();
00216
00221 void moveFromTempToIndex();
00222
00226 void allocArrays();
00227
00233 inline uint32_t round8Boundary(uint32_t val)
00234 {
00235 return val & 0xfffffff8;
00236 }
00237
00243 inline uint32_t roundIf8Boundary(uint32_t val)
00244 {
00245 if (val > 8) {
00246 return round8Boundary(val);
00247 }
00248 }
00249
00250 public:
00251 explicit LcsClusterNodeWriter(
00252 BTreeDescriptor const &treeDescriptorInit,
00253 SegmentAccessor const &accessorInit,
00254 TupleDescriptor const &colTupleDescInit,
00255 SharedTraceTarget pTraceTargetInit,
00256 std::string nameInit);
00257
00258 ~LcsClusterNodeWriter();
00259
00269 bool getLastClusterPageForWrite(PLcsClusterNode &pBlock, LcsRid &firstRid);
00270
00278 PLcsClusterNode allocateClusterPage(LcsRid firstRid);
00279
00294 void init(uint nColumns, PBuffer indexBlock, PBuffer *pBlock, uint szBlock);
00295
00296 void close();
00297
00303 void openNew(LcsRid startRID);
00304
00320 bool openAppend(
00321 uint *nValOffsets, uint16_t *lastValOffsets, RecordNum &nrows);
00322
00334 void describeLastBatch(uint column, uint &dRow, uint &recSize);
00335
00343 uint16_t getNextVal(uint column, uint16_t thisVal);
00344
00354 void rollBackLastBatch(uint column, PBuffer pVal);
00355
00361 inline bool noCompressMode(uint column) const
00362 {
00363 return bForceMode[column] == fixed ||
00364 bForceMode[column] == variable;
00365 };
00366
00376 inline PBuffer getOffsetPtr(uint column, uint16_t offset)
00377 {
00378 return pBlock[column] + offset;
00379 };
00380
00392 bool addValue(uint column, bool bFirstTimeInBatch);
00393
00407 bool addValue(uint column, PBuffer pVal, uint16_t *oVal);
00408
00419 void undoValue(uint column, PBuffer pVal, bool bFirstInBatch);
00420
00442 void putCompressedBatch(uint column, PBuffer pRows, PBuffer pBuf);
00443
00465 void putFixedVarBatch(uint column, uint16_t *pRows, PBuffer pBuf);
00466
00481 void pickCompressionMode(
00482 uint column, uint fixedSize, uint nRows,
00483 uint16_t **pValOffset,
00484 LcsBatchMode &compressionMode);
00485
00489 bool isEndOfBlock()
00490 {
00491 uint col;
00492 int valueSizeNeeded;
00493
00494 for (valueSizeNeeded = 0, col = 0; col < nClusterCols; col++) {
00495 valueSizeNeeded += batchDirs[col].recSize * LcsMaxLeftOver;
00496 }
00497
00498 return szLeft <= (minSzLeft + valueSizeNeeded);
00499 }
00500
00505 void endBlock()
00506 {
00507 moveFromTempToIndex();
00508 }
00509 };
00510
00511 FENNEL_END_NAMESPACE
00512
00513 #endif
00514
00515