00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/ftrs/FtrsTableWriter.h"
00026 #include "fennel/ftrs/FtrsTableWriterFactory.h"
00027 #include "fennel/btree/BTreeWriter.h"
00028 #include "fennel/tuple/TupleDescriptor.h"
00029 #include "fennel/txn/LogicalTxn.h"
00030 #include "fennel/common/ByteOutputStream.h"
00031 #include "fennel/common/ByteInputStream.h"
00032 #include "fennel/synch/SXMutex.h"
00033 #include "fennel/exec/ExecStreamBufAccessor.h"
00034
00035 #include <boost/bind.hpp>
00036 #include <numeric>
00037
00038 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/ftrs/FtrsTableWriter.cpp#11 $");
00039
00040 const LogicalActionType FtrsTableWriter::ACTION_INSERT = 1;
00041
00042 const LogicalActionType FtrsTableWriter::ACTION_DELETE = 2;
00043
00044 const LogicalActionType FtrsTableWriter::ACTION_UPDATE = 3;
00045
00046 const LogicalActionType FtrsTableWriter::ACTION_REVERSE_UPDATE = 4;
00047
00048 FtrsTableWriter::FtrsTableWriter(FtrsTableWriterParams const ¶ms)
00049 {
00050 updateProj = params.updateProj;
00051 pClusteredIndexWriter = NULL;
00052 indexWriters.resize(params.indexParams.size());
00053 std::transform(
00054 indexWriters.begin(),
00055 indexWriters.end(),
00056 params.indexParams.begin(),
00057 indexWriters.begin(),
00058 boost::bind(&FtrsTableWriter::createIndexWriter,this,_1,_2));
00059 assert(pClusteredIndexWriter);
00060
00061 pTupleData = &(pClusteredIndexWriter->tupleData);
00062
00063 if (!updateProj.empty()) {
00064 TupleDescriptor tupleDesc =
00065 pClusteredIndexWriter->pWriter->getTupleDescriptor();
00066 for (uint i = 0; i < updateProj.size(); ++i) {
00067 tupleDesc.push_back(tupleDesc[updateProj[i]]);
00068 }
00069 tupleAccessor.compute(tupleDesc);
00070 updateTupleData.compute(tupleDesc);
00071 pTupleData = &updateTupleData;
00072 }
00073
00074 nAttrs = pClusteredIndexWriter->tupleData.size();
00075
00076 logBuf.reset(new FixedBuffer[tupleAccessor.getMaxByteCount()]);
00077 }
00078
00079 FtrsTableIndexWriter &FtrsTableWriter::createIndexWriter(
00080 FtrsTableIndexWriter &indexWriter,
00081 FtrsTableIndexWriterParams const &indexParams)
00082 {
00083 indexWriter.pWriter = BTreeExecStream::newWriter(indexParams);
00084 indexWriter.distinctness = indexParams.distinctness;
00085 indexWriter.updateInPlace = indexParams.updateInPlace;
00086 indexWriter.inputProj = indexParams.inputProj;
00087 indexWriter.pRootMap = indexParams.pRootMap;
00088 if (indexWriter.inputProj.size()) {
00089 indexWriter.tupleData.compute(
00090 indexWriter.pWriter->getTupleDescriptor());
00091
00092 TupleProjection const &keyProj =
00093 indexWriter.pWriter->getKeyProjection();
00094 for (uint i = 0; i < keyProj.size(); ++i) {
00095 indexWriter.inputKeyProj.push_back(
00096 indexWriter.inputProj[keyProj[i]]);
00097 }
00098 } else {
00099
00100
00101
00102 TupleDescriptor const &clusteredTupleDesc =
00103 indexWriter.pWriter->getTupleDescriptor();
00104 tupleAccessor.compute(clusteredTupleDesc);
00105 indexWriter.tupleData.compute(clusteredTupleDesc);
00106 assert(!pClusteredIndexWriter);
00107 pClusteredIndexWriter = &indexWriter;
00108
00109 indexWriter.inputKeyProj =
00110 indexWriter.pWriter->getKeyProjection();
00111 }
00112 return indexWriter;
00113 }
00114
00115 inline bool FtrsTableWriter::searchForIndexKey(
00116 FtrsTableIndexWriter &indexWriter)
00117 {
00118 TupleData &keyData = indexWriter.pWriter->getSearchKeyForWrite();
00119 for (uint i = 0; i < indexWriter.inputKeyProj.size(); ++i) {
00120 keyData[i] =
00121 pClusteredIndexWriter->tupleData[indexWriter.inputKeyProj[i]];
00122 }
00123 return indexWriter.pWriter->searchForKey(keyData,DUP_SEEK_ANY);
00124 }
00125
00126 inline void FtrsTableWriter::insertIntoIndex(
00127 FtrsTableIndexWriter &indexWriter)
00128 {
00129 for (uint i = 0; i < indexWriter.inputProj.size(); ++i) {
00130 indexWriter.tupleData[i] =
00131 pClusteredIndexWriter->tupleData[indexWriter.inputProj[i]];
00132 }
00133 if (indexWriter.updateInPlace) {
00134 if (searchForIndexKey(indexWriter)) {
00135 if (indexWriter.pWriter->updateCurrent(indexWriter.tupleData)) {
00136 indexWriter.pWriter->endSearch();
00137 return;
00138 }
00139
00140 indexWriter.pWriter->deleteCurrent();
00141
00142
00143
00144
00145 indexWriter.pWriter->endSearch();
00146 } else {
00147
00148 permAssert(false);
00149 }
00150 }
00151 indexWriter.pWriter->insertTupleData(
00152 indexWriter.tupleData,
00153 indexWriter.distinctness);
00154 }
00155
00156 inline void FtrsTableWriter::deleteFromIndex(
00157 FtrsTableIndexWriter &indexWriter)
00158 {
00159 if (searchForIndexKey(indexWriter)) {
00160
00161
00162 indexWriter.pWriter->deleteCurrent();
00163 }
00164 indexWriter.pWriter->endSearch();
00165 }
00166
00167 inline void FtrsTableWriter::modifySomeIndexes(
00168 LogicalActionType actionType,
00169 IndexWriterVector::iterator &first,
00170 IndexWriterVector::iterator last)
00171 {
00172 switch (actionType) {
00173 case ACTION_INSERT:
00174 for (; first != last; ++first) {
00175 insertIntoIndex(*first);
00176 }
00177 break;
00178 case ACTION_DELETE:
00179 for (; first != last; ++first) {
00180 if (!first->updateInPlace) {
00181 deleteFromIndex(*first);
00182 }
00183 }
00184 break;
00185 default:
00186 permAssert(false);
00187 }
00188 }
00189
00190 inline void FtrsTableWriter::modifyAllIndexes(LogicalActionType actionType)
00191 {
00192 IndexWriterVector::iterator first = indexWriters.begin();
00193 IndexWriterVector::iterator current = first;
00194 try {
00195 modifySomeIndexes(actionType,current,indexWriters.end());
00196 } catch (...) {
00197
00198
00199 try {
00200 LogicalActionType compensatingActionType =
00201 (actionType == ACTION_INSERT) ? ACTION_DELETE : ACTION_INSERT;
00202 modifySomeIndexes(compensatingActionType,first,current);
00203 } catch (...) {
00204
00205
00206 permAssert(false);
00207 }
00208 throw;
00209 }
00210 }
00211
00212 inline void FtrsTableWriter::copyNewValues()
00213 {
00214 for (uint i = 0; i < updateProj.size(); ++i) {
00215 pClusteredIndexWriter->tupleData[updateProj[i]] =
00216 (*pTupleData)[nAttrs + i];
00217 }
00218 }
00219
00220 inline void FtrsTableWriter::copyOldValues()
00221 {
00222 for (uint i = 0; i < updateProj.size(); ++i) {
00223 pClusteredIndexWriter->tupleData[updateProj[i]] =
00224 (*pTupleData)[updateProj[i]];
00225 }
00226 }
00227
00228 void FtrsTableWriter::executeUpdate(bool reverse)
00229 {
00230
00231 std::copy(
00232 pTupleData->begin(),
00233 pTupleData->begin() + nAttrs,
00234 pClusteredIndexWriter->tupleData.begin());
00235
00236 if (reverse) {
00237
00238 copyNewValues();
00239 }
00240
00241 modifyAllIndexes(ACTION_DELETE);
00242
00243 if (reverse) {
00244
00245 copyOldValues();
00246 } else {
00247
00248 copyNewValues();
00249 }
00250
00251 try {
00252 modifyAllIndexes(ACTION_INSERT);
00253 } catch (...) {
00254
00255 try {
00256 if (reverse) {
00257 copyNewValues();
00258 } else {
00259 copyOldValues();
00260 }
00261 modifyAllIndexes(ACTION_INSERT);
00262 } catch (...) {
00263
00264
00265 permAssert(false);
00266 }
00267 throw;
00268 }
00269 }
00270
00271 inline void FtrsTableWriter::executeTuple(LogicalActionType actionType)
00272 {
00273 switch (actionType) {
00274 case ACTION_INSERT:
00275 case ACTION_DELETE:
00276 modifyAllIndexes(actionType);
00277 break;
00278 case ACTION_UPDATE:
00279 executeUpdate(false);
00280 break;
00281 case ACTION_REVERSE_UPDATE:
00282 executeUpdate(true);
00283 break;
00284 default:
00285 permAssert(false);
00286 }
00287 }
00288
00289 RecordNum FtrsTableWriter::execute(
00290 ExecStreamQuantum const &quantum,
00291 ExecStreamBufAccessor &bufAccessor,
00292 LogicalActionType actionType,
00293 SXMutex &actionMutex)
00294 {
00295
00296
00297
00298 assert(isLoggingEnabled());
00299
00300 RecordNum nTuples = 0;
00301
00302
00303 TupleAccessor &tupleAccessor = bufAccessor.getConsumptionTupleAccessor();
00304
00305 do {
00306 if (!bufAccessor.demandData()) {
00307 break;
00308 }
00309 bufAccessor.unmarshalTuple(*pTupleData);
00310
00311
00312
00313
00314 SXMutexSharedGuard actionMutexGuard(actionMutex);
00315 executeTuple(actionType);
00316 uint cb = tupleAccessor.getCurrentByteCount();
00317 LogicalTxn *pTxn = getLogicalTxn();
00318 ByteOutputStream &logStream =
00319 pTxn->beginLogicalAction(*this,actionType);
00320 logStream.writeBytes(tupleAccessor.getCurrentTupleBuf(),cb);
00321 pTxn->endLogicalAction();
00322 bufAccessor.consumeTuple();
00323 ++nTuples;
00324 } while (nTuples < quantum.nTuplesMax);
00325
00326 return nTuples;
00327 }
00328
00329 LogicalTxnClassId FtrsTableWriter::getParticipantClassId() const
00330 {
00331 return FtrsTableWriterFactory::getParticipantClassId();
00332 }
00333
00334 void FtrsTableWriter::describeParticipant(
00335 ByteOutputStream &logStream)
00336 {
00337 TupleDescriptor const &clusteredTupleDesc =
00338 pClusteredIndexWriter->pWriter->getTupleDescriptor();
00339 clusteredTupleDesc.writePersistent(logStream);
00340 uint nIndexes = indexWriters.size();
00341 logStream.writeValue(nIndexes);
00342 std::for_each(
00343 indexWriters.begin(),
00344 indexWriters.end(),
00345 boost::bind(&FtrsTableWriter::describeIndex,this,_1,&logStream));
00346 updateProj.writePersistent(logStream);
00347 }
00348
00349 void FtrsTableWriter::describeIndex(
00350 FtrsTableIndexWriter &indexWriter,
00351 ByteOutputStream *pLogStream)
00352 {
00353 pLogStream->writeValue(indexWriter.pWriter->getSegmentId());
00354 pLogStream->writeValue(indexWriter.pWriter->getPageOwnerId());
00355 pLogStream->writeValue(indexWriter.pWriter->getRootPageId());
00356 pLogStream->writeValue(indexWriter.distinctness);
00357 pLogStream->writeValue(indexWriter.updateInPlace);
00358 indexWriter.inputProj.writePersistent(*pLogStream);
00359 indexWriter.pWriter->getKeyProjection().writePersistent(
00360 *pLogStream);
00361 }
00362
00363 void FtrsTableWriter::undoLogicalAction(
00364 LogicalActionType actionType,
00365 ByteInputStream &logStream)
00366 {
00367 switch (actionType) {
00368 case ACTION_INSERT:
00369 redoLogicalAction(ACTION_DELETE,logStream);
00370 break;
00371 case ACTION_DELETE:
00372 redoLogicalAction(ACTION_INSERT,logStream);
00373 break;
00374 case ACTION_UPDATE:
00375 redoLogicalAction(ACTION_REVERSE_UPDATE,logStream);
00376 break;
00377 default:
00378 permAssert(false);
00379 }
00380 }
00381
00382 void FtrsTableWriter::redoLogicalAction(
00383 LogicalActionType actionType,
00384 ByteInputStream &logStream)
00385 {
00386 uint cbMin = tupleAccessor.getMinByteCount();
00387 uint cbActual = logStream.readBytes(logBuf.get(), cbMin);
00388 assert(cbMin == cbActual);
00389 tupleAccessor.setCurrentTupleBuf(logBuf.get());
00390 uint cb = tupleAccessor.getCurrentByteCount();
00391 uint cbRemainder = cb - cbMin;
00392 if (cbRemainder) {
00393 cbActual = logStream.readBytes(logBuf.get() + cbMin, cbRemainder);
00394 assert(cbActual == cbRemainder);
00395 }
00396
00397 tupleAccessor.unmarshal(*pTupleData);
00398 executeTuple(actionType);
00399 }
00400
00401 PageOwnerId FtrsTableWriter::getTableId()
00402 {
00403 return pClusteredIndexWriter->pWriter->getPageOwnerId();
00404 }
00405
00406 uint FtrsTableWriter::getIndexCount() const
00407 {
00408 return indexWriters.size();
00409 }
00410
00411 void FtrsTableWriter::openIndexWriters()
00412 {
00413 for (uint i = 0; i < indexWriters.size(); ++i) {
00414 FtrsTableIndexWriter &indexWriter = indexWriters[i];
00415 if (!indexWriter.pRootMap) {
00416 continue;
00417 }
00418 PageId rootPageId = indexWriter.pRootMap->getRoot(
00419 indexWriter.pWriter->getPageOwnerId());
00420 indexWriter.pWriter->setRootPageId(rootPageId);
00421 }
00422 }
00423
00424 void FtrsTableWriter::closeIndexWriters()
00425 {
00426 for (uint i = 0; i < indexWriters.size(); ++i) {
00427 FtrsTableIndexWriter &indexWriter = indexWriters[i];
00428 indexWriter.pWriter->releaseScratchBuffers();
00429
00430
00431
00432 #if 0
00433 if (indexWriter.pRootMap) {
00434 indexWriter.pWriter->setRootPageId(NULL_PAGE_ID);
00435 }
00436 #endif
00437 }
00438 }
00439
00440 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/ftrs/FtrsTableWriter.cpp#11 $");
00441
00442