00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "fennel/common/CommonPreamble.h"
00023 #include "fennel/lucidera/colstore/LcsRowScanBaseExecStream.h"
00024 #include "fennel/exec/ExecStreamBufAccessor.h"
00025
00026 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/lucidera/colstore/LcsRowScanBaseExecStream.cpp#12 $");
00027
00028 LcsRowScanBaseExecStream::LcsRowScanBaseExecStream()
00029 {
00030 nClusters = 0;
00031 }
00032
00033 void LcsRowScanBaseExecStream::prepare(
00034 LcsRowScanBaseExecStreamParams const ¶ms)
00035 {
00036 ConfluenceExecStream::prepare(params);
00037
00038
00039
00040
00041
00042 nClusters = params.lcsClusterScanDefs.size();
00043 pClusters.reset(new SharedLcsClusterReader[nClusters]);
00044
00045 uint clusterStart = 0;
00046 uint projCount = 0;
00047 TupleDescriptor allClusterTupleDesc;
00048 TupleProjection newProj, outputProj;
00049
00050 buildOutputProj(outputProj, params);
00051
00052 newProj.resize(outputProj.size());
00053
00054
00055 TupleDescriptor outputTupleDesc = pOutAccessor->getTupleDesc();
00056 for (uint i = 0; i < outputProj.size(); i++) {
00057 if (outputProj[i] == LCS_RID_COLUMN_ID) {
00058 newProj[i] = projCount++;
00059 allClusterTupleDesc.push_back(outputTupleDesc[i]);
00060 nonClusterCols.push_back(params.outputProj[i]);
00061 }
00062 }
00063
00064 allSpecial = (nonClusterCols.size() == newProj.size());
00065
00066 for (uint i = 0; i < nClusters; i++) {
00067 SharedLcsClusterReader &pClu = pClusters[i];
00068
00069 BTreeExecStreamParams const &bTreeParams = params.lcsClusterScanDefs[i];
00070
00071 BTreeDescriptor treeDescriptor;
00072 treeDescriptor.segmentAccessor.pSegment = bTreeParams.pSegment;
00073 treeDescriptor.segmentAccessor.pCacheAccessor =
00074 bTreeParams.pCacheAccessor;
00075 treeDescriptor.tupleDescriptor = bTreeParams.tupleDesc;
00076 treeDescriptor.keyProjection = bTreeParams.keyProj;
00077 treeDescriptor.rootPageId = bTreeParams.rootPageId;
00078 treeDescriptor.segmentId = bTreeParams.segmentId;
00079 treeDescriptor.pageOwnerId = bTreeParams.pageOwnerId;
00080
00081 pClu =
00082 SharedLcsClusterReader(
00083 new LcsClusterReader(treeDescriptor, &ridRuns));
00084
00085
00086
00087 uint clusterEnd = clusterStart +
00088 params.lcsClusterScanDefs[i].clusterTupleDesc.size() - 1;
00089
00090
00091
00092
00093 TupleProjection clusterProj;
00094 for (uint j = 0; j < newProj.size(); j++) {
00095 if (outputProj[j] >= clusterStart &&
00096 outputProj[j] <= clusterEnd)
00097 {
00098 clusterProj.push_back(outputProj[j] - clusterStart);
00099 newProj[j] = projCount++;
00100 }
00101 }
00102 clusterStart = clusterEnd + 1;
00103
00104
00105
00106
00107
00108 if (allSpecial) {
00109 clusterProj.push_back(0);
00110 }
00111 pClu->initColumnReaders(
00112 params.lcsClusterScanDefs[i].clusterTupleDesc.size(),
00113 clusterProj);
00114 if (!allSpecial) {
00115 for (uint j = 0; j < pClu->nColsToRead; j++) {
00116 allClusterTupleDesc.push_back(
00117 params.lcsClusterScanDefs[i].
00118 clusterTupleDesc[clusterProj[j]]);
00119 }
00120 }
00121 }
00122
00123
00124
00125
00126 for (uint i = 0; i < newProj.size(); i++) {
00127 projDescriptor.push_back(allClusterTupleDesc[newProj[i]]);
00128 }
00129
00130
00131
00132 projMap.resize(newProj.size());
00133 for (uint i = 0; i < projMap.size(); i++) {
00134 for (uint j = 0; j < newProj.size(); j++) {
00135 if (newProj[j] == i) {
00136 projMap[i] = j;
00137 }
00138 }
00139 }
00140 }
00141
00142 void LcsRowScanBaseExecStream::open(bool restart)
00143 {
00144 ConfluenceExecStream::open(restart);
00145 for (uint i = 0; i < nClusters; i++) {
00146 pClusters[i]->open();
00147 }
00148 }
00149
00150 void LcsRowScanBaseExecStream::getResourceRequirements(
00151 ExecStreamResourceQuantity &minQuantity,
00152 ExecStreamResourceQuantity &optQuantity)
00153 {
00154 ConfluenceExecStream::getResourceRequirements(minQuantity, optQuantity);
00155
00156
00157
00158
00159 minQuantity.nCachePages += (nClusters * 2);
00160
00161 optQuantity = minQuantity;
00162 }
00163
00164 void LcsRowScanBaseExecStream::closeImpl()
00165 {
00166 ConfluenceExecStream::closeImpl();
00167 for (uint i = 0; i < nClusters; i++) {
00168 pClusters[i]->close();
00169 }
00170 }
00171
00172 void LcsRowScanBaseExecStream::syncColumns(SharedLcsClusterReader &pScan)
00173 {
00174 for (uint iCluCol = 0; iCluCol < pScan->nColsToRead; iCluCol++) {
00175 pScan->clusterCols[iCluCol].sync();
00176 }
00177 }
00178
00179 bool LcsRowScanBaseExecStream::readColVals(
00180 SharedLcsClusterReader &pScan,
00181 TupleDataWithBuffer &tupleData,
00182 uint colStart)
00183 {
00184 if (!allSpecial) {
00185 for (uint iCluCol = 0; iCluCol < pScan->nColsToRead; iCluCol++) {
00186
00187
00188 PBuffer curValue = pScan->clusterCols[iCluCol].getCurrentValue();
00189 uint idx = projMap[colStart + iCluCol];
00190
00191 attrAccessors[idx].loadValue(tupleData[idx], curValue);
00192 if (pScan->clusterCols[iCluCol].getFilters().hasResidualFilters) {
00193 if (!pScan->clusterCols[iCluCol].applyFilters(
00194 projDescriptor,
00195 tupleData))
00196 {
00197 return false;
00198 }
00199 }
00200 }
00201 }
00202 return true;
00203 }
00204
00205 void LcsRowScanBaseExecStream::buildOutputProj(
00206 TupleProjection &outputProj,
00207 LcsRowScanBaseExecStreamParams const ¶ms)
00208 {
00209
00210
00211
00212 for (uint i = 0; i < params.outputProj.size(); i++) {
00213 outputProj.push_back(params.outputProj[i]);
00214 }
00215 }
00216
00217 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/lucidera/colstore/LcsRowScanBaseExecStream.cpp#12 $");
00218
00219