00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "fennel/common/CommonPreamble.h"
00023 #include "fennel/lucidera/colstore/LcsClusterReplaceExecStream.h"
00024 #include "fennel/lucidera/colstore/LcsClusterReader.h"
00025 #include "fennel/segment/SegmentFactory.h"
00026 #include "fennel/btree/BTreeBuilder.h"
00027 #include "fennel/exec/ExecStreamBufAccessor.h"
00028
00029 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/lucidera/colstore/LcsClusterReplaceExecStream.cpp#6 $");
00030
00031 void LcsClusterReplaceExecStream::prepare(
00032 LcsClusterReplaceExecStreamParams const ¶ms)
00033 {
00034 LcsClusterAppendExecStream::prepare(params);
00035 newClusterRootParamId = params.rootPageIdParamId;
00036
00037
00038
00039 origRootPageId = treeDescriptor.rootPageId;
00040 }
00041
00042 void LcsClusterReplaceExecStream::initTupleLoadParams(
00043 const TupleProjection &inputProj)
00044 {
00045 numColumns = inputProj.size() - 1;
00046
00047 projInputTupleDesc.projectFrom(tableColsTupleDesc, inputProj);
00048 projInputTupleData.compute(projInputTupleDesc);
00049
00050
00051
00052
00053
00054
00055
00056
00057 pOrigClusterReader =
00058 SharedLcsClusterReader(new LcsClusterReader(treeDescriptor));
00059 TupleProjection proj;
00060 proj.resize(numColumns);
00061 for (uint i = 0; i < numColumns; i++) {
00062 proj[i] = i;
00063 }
00064 pOrigClusterReader->initColumnReaders(numColumns, proj);
00065
00066
00067
00068 std::copy(inputProj.begin() + 1, inputProj.end(), proj.begin());
00069 TupleAccessor &inputAccessor = pInAccessor->getConsumptionTupleAccessor();
00070 clusterColsTupleAccessor.bind(inputAccessor, proj);
00071 clusterColsTupleDesc.projectFrom(pInAccessor->getTupleDesc(), proj);
00072 clusterColsTupleData.compute(clusterColsTupleDesc);
00073
00074 attrAccessors.resize(clusterColsTupleDesc.size());
00075 for (uint i = 0; i < clusterColsTupleDesc.size(); i++) {
00076 attrAccessors[i].compute(clusterColsTupleDesc[i]);
00077 }
00078
00079 origClusterTupleData.computeAndAllocate(clusterColsTupleDesc);
00080
00081
00082 colTupleDesc.reset(new TupleDescriptor[numColumns]);
00083 for (int i = 0; i < numColumns; i++) {
00084
00085 colTupleDesc[i].push_back(tableColsTupleDesc[inputProj[i + 1]]);
00086 }
00087 }
00088
00089 void LcsClusterReplaceExecStream::getResourceRequirements(
00090 ExecStreamResourceQuantity &minQuantity,
00091 ExecStreamResourceQuantity &optQuantity)
00092 {
00093 LcsClusterAppendExecStream::getResourceRequirements(
00094 minQuantity,
00095 optQuantity);
00096
00097
00098
00099
00100 minQuantity.nCachePages += 2;
00101
00102 optQuantity = minQuantity;
00103 }
00104
00105 void LcsClusterReplaceExecStream::open(bool restart)
00106 {
00107 newData = false;
00108
00109
00110
00111 LcsClusterAppendExecStream::open(restart);
00112
00113
00114 origNumRows = pOrigClusterReader->getNumRows();
00115
00116 if (!restart) {
00117
00118
00119 if (opaqueToInt(newClusterRootParamId) > 0) {
00120 pDynamicParamManager->createParam(
00121 newClusterRootParamId,
00122 pInAccessor->getTupleDesc()[0]);
00123 }
00124
00125
00126
00127 pSnapshotSegment =
00128 SegmentFactory::getSnapshotSegment(
00129 treeDescriptor.segmentAccessor.pSegment);
00130 assert(pSnapshotSegment != NULL);
00131 }
00132
00133 if (opaqueToInt(newClusterRootParamId) > 0) {
00134 TupleDatum rootPageIdDatum;
00135 rootPageIdDatum.pData = (PConstBuffer) &(treeDescriptor.rootPageId);
00136 rootPageIdDatum.cbData = sizeof(treeDescriptor.rootPageId);
00137 pDynamicParamManager->writeParam(
00138 newClusterRootParamId,
00139 rootPageIdDatum);
00140 }
00141
00142 pOrigClusterReader->open();
00143 currLoadRid = LcsRid(0);
00144 currInputRid = LcsRid(MAXU);
00145 needTuple = true;
00146 }
00147
00148 ExecStreamResult LcsClusterReplaceExecStream::getTupleForLoad()
00149 {
00150
00151
00152 if (!needTuple) {
00153 return EXECRC_YIELD;
00154 }
00155
00156 if (pInAccessor->getState() == EXECBUF_EOS) {
00157
00158
00159
00160
00161
00162
00163
00164
00165 if (!newData) {
00166 return EXECRC_EOS;
00167 }
00168 if (opaqueToInt(currLoadRid) < origNumRows) {
00169 readOrigClusterRow();
00170 needTuple = false;
00171
00172 initLoad();
00173 return EXECRC_YIELD;
00174 } else {
00175 pSnapshotSegment->versionPage(
00176 origRootPageId,
00177 treeDescriptor.rootPageId);
00178 return EXECRC_EOS;
00179 }
00180 }
00181
00182 if (!pInAccessor->demandData()) {
00183 return EXECRC_BUF_UNDERFLOW;
00184 }
00185
00186
00187
00188 if (!newData) {
00189 treeDescriptor.rootPageId = NULL_PAGE_ID;
00190 BTreeBuilder builder(
00191 treeDescriptor,
00192 treeDescriptor.segmentAccessor.pSegment);
00193 builder.createEmptyRoot();
00194 treeDescriptor.rootPageId = builder.getRootPageId();
00195 newData = true;
00196 }
00197
00198 initLoad();
00199
00200 if (currLoadRid == LcsRid(0) || currLoadRid > currInputRid) {
00201 assert(!pInAccessor->isTupleConsumptionPending());
00202 pInAccessor->unmarshalProjectedTuple(projInputTupleData);
00203 currInputRid =
00204 *reinterpret_cast<LcsRid const *> (projInputTupleData[0].pData);
00205 }
00206
00207
00208
00209
00210 if (currInputRid > currLoadRid) {
00211 readOrigClusterRow();
00212 } else {
00213 assert(currInputRid == currLoadRid);
00214 clusterColsTupleAccessor.unmarshal(clusterColsTupleData);
00215 }
00216
00217 needTuple = false;
00218 return EXECRC_YIELD;
00219 }
00220
00221 void LcsClusterReplaceExecStream::readOrigClusterRow()
00222 {
00223 origClusterTupleData.resetBuffer();
00224
00225
00226
00227
00228
00229 bool needSync = true;
00230 if (pOrigClusterReader->isPositioned() &&
00231 currLoadRid < pOrigClusterReader->getRangeEndRid())
00232 {
00233 needSync = false;
00234 }
00235 bool rc = pOrigClusterReader->position(currLoadRid);
00236 assert(rc);
00237 for (uint i = 0; i < pOrigClusterReader->nColsToRead; i++) {
00238 if (needSync) {
00239 pOrigClusterReader->clusterCols[i].sync();
00240 }
00241 PBuffer colValue = pOrigClusterReader->clusterCols[i].getCurrentValue();
00242 attrAccessors[i].loadValue(origClusterTupleData[i], colValue);
00243 clusterColsTupleData[i] = origClusterTupleData[i];
00244 }
00245 }
00246
00247 void LcsClusterReplaceExecStream::postProcessTuple()
00248 {
00249
00250
00251 if (currInputRid == currLoadRid) {
00252 LcsClusterAppendExecStream::postProcessTuple();
00253 }
00254 currLoadRid++;
00255 needTuple = true;
00256 }
00257
00258 void LcsClusterReplaceExecStream::close()
00259 {
00260 LcsClusterAppendExecStream::close();
00261 pOrigClusterReader->close();
00262 }
00263
00264 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/lucidera/colstore/LcsClusterReplaceExecStream.cpp#6 $");
00265
00266