00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/sorter/ExternalSortRunLoader.h"
00026 #include "fennel/sorter/ExternalSortInfo.h"
00027 #include "fennel/exec/ExecStreamBufAccessor.h"
00028
00029 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/sorter/ExternalSortRunLoader.cpp#1 $");
00030
00031 ExternalSortRunLoader::ExternalSortRunLoader(ExternalSortInfo &sortInfoIn)
00032 : sortInfo(sortInfoIn)
00033 {
00034 nMemPagesMax = 0;
00035 pDataBuffer = NULL;
00036 pIndexBuffer = NULL;
00037
00038 nTuplesLoaded = nTuplesFetched = 0;
00039
00040 runningParallelTask = false;
00041
00042 bufferLock.accessSegment(sortInfo.memSegmentAccessor);
00043
00044 tupleAccessor.compute(sortInfo.tupleDesc);
00045 tupleAccessor2.compute(sortInfo.tupleDesc);
00046
00047 keyData.compute(sortInfo.keyDesc);
00048 keyData2 = keyData;
00049
00050 keyAccessor.bind(tupleAccessor,sortInfo.keyProj);
00051 keyAccessor2.bind(tupleAccessor2,sortInfo.keyProj);
00052
00053
00054
00055 uint nKeysPerPage = sortInfo.cbPage / sizeof(PBuffer);
00056 assert(nKeysPerPage > 1);
00057 nKeysPerPage >>= 1;
00058 indexToPageShift = 1;
00059 indexPageMask = 1;
00060 while ((nKeysPerPage & indexPageMask) == 0) {
00061 ++indexToPageShift;
00062 indexPageMask <<= 1;
00063 ++indexPageMask;
00064 }
00065 }
00066
00067 ExternalSortRunLoader::~ExternalSortRunLoader()
00068 {
00069 releaseResources();
00070 }
00071
00072 inline PBuffer &ExternalSortRunLoader::getPointerArrayEntry(uint iTuple)
00073 {
00074
00075
00076 uint iPage = iTuple >> indexToPageShift;
00077 uint iSubKey = iTuple & indexPageMask;
00078 PBuffer *pPage = reinterpret_cast<PBuffer *>(indexBuffers[iPage]);
00079 return pPage[iSubKey];
00080 }
00081
00082 void ExternalSortRunLoader::startRun()
00083 {
00084 if (!nMemPagesMax) {
00085 nMemPagesMax = sortInfo.nSortMemPagesPerRun;
00086 }
00087
00088 freeBuffers.insert(
00089 freeBuffers.end(),indexBuffers.begin(),indexBuffers.end());
00090 freeBuffers.insert(
00091 freeBuffers.end(),dataBuffers.begin(),dataBuffers.end());
00092 indexBuffers.clear();
00093 dataBuffers.clear();
00094 pDataBuffer = NULL;
00095 pIndexBuffer = NULL;
00096
00097 if (!allocateDataBuffer()) {
00098 permAssert(false);
00099 }
00100
00101 if (!allocateIndexBuffer()) {
00102 permAssert(false);
00103 }
00104
00105 nTuplesLoaded = nTuplesFetched = 0;
00106
00107 fetchArray.nTuples = 0;
00108 }
00109
00110 PBuffer ExternalSortRunLoader::allocateBuffer()
00111 {
00112 PBuffer pBuffer;
00113
00114 if (!freeBuffers.empty()) {
00115 pBuffer = freeBuffers.back();
00116 freeBuffers.pop_back();
00117 return pBuffer;
00118 }
00119
00120 if ((indexBuffers.size() + dataBuffers.size()) >= nMemPagesMax) {
00121 return NULL;
00122 }
00123
00124 bufferLock.allocatePage();
00125 pBuffer = bufferLock.getPage().getWritableData();
00126
00127
00128
00129
00130 bufferLock.unlock();
00131
00132 return pBuffer;
00133 }
00134
00135 bool ExternalSortRunLoader::allocateDataBuffer()
00136 {
00137 pDataBuffer = allocateBuffer();
00138 if (pDataBuffer) {
00139 dataBuffers.push_back(pDataBuffer);
00140 pDataBufferEnd = pDataBuffer + sortInfo.cbPage;
00141 return true;
00142 } else {
00143 return false;
00144 }
00145 }
00146
00147 bool ExternalSortRunLoader::allocateIndexBuffer()
00148 {
00149 pIndexBuffer = allocateBuffer();
00150 if (pIndexBuffer) {
00151 indexBuffers.push_back(pIndexBuffer);
00152 pIndexBufferEnd = pIndexBuffer + sortInfo.cbPage;
00153 return true;
00154 } else {
00155 return false;
00156 }
00157 }
00158
00159 void ExternalSortRunLoader::releaseResources()
00160 {
00161 freeBuffers.clear();
00162 indexBuffers.clear();
00163 dataBuffers.clear();
00164 nMemPagesMax = 0;
00165
00166
00167
00168
00169 sortInfo.memSegmentAccessor.pSegment->deallocatePageRange(
00170 NULL_PAGE_ID,NULL_PAGE_ID);
00171 }
00172
00173 ExternalSortRC ExternalSortRunLoader::loadRun(
00174 ExecStreamBufAccessor &bufAccessor)
00175 {
00176 for (;;) {
00177 if (!bufAccessor.demandData()) {
00178 break;
00179 }
00180 uint cbAvailable = bufAccessor.getConsumptionAvailable();
00181 assert(cbAvailable);
00182 PConstBuffer pSrc = bufAccessor.getConsumptionStart();
00183 bool overflow = false;
00184 uint cbCopy = 0;
00185 while (cbCopy < cbAvailable) {
00186 PConstBuffer pSrcTuple = pSrc + cbCopy;
00187 uint cbTuple = tupleAccessor.getBufferByteCount(pSrcTuple);
00188 assert(cbTuple);
00189 assert(cbTuple <= tupleAccessor.getMaxByteCount());
00190
00191
00192 if (pIndexBuffer >= pIndexBufferEnd) {
00193 if (!allocateIndexBuffer()) {
00194 overflow = true;
00195 break;
00196 }
00197 }
00198
00199
00200 if (pDataBuffer + cbCopy + cbTuple > pDataBufferEnd) {
00201 if (!cbCopy) {
00202
00203 if (!allocateDataBuffer()) {
00204
00205 return EXTSORT_OVERFLOW;
00206 }
00207 }
00208
00209
00210 break;
00211 }
00212
00213 *((PBuffer *) pIndexBuffer) = pDataBuffer + cbCopy;
00214 pIndexBuffer += sizeof(PBuffer);
00215 nTuplesLoaded++;
00216 cbCopy += cbTuple;
00217 }
00218 if (cbCopy) {
00219 memcpy(pDataBuffer,pSrc,cbCopy);
00220 pDataBuffer += cbCopy;
00221 bufAccessor.consumeData(pSrc + cbCopy);
00222 }
00223 if (overflow) {
00224 return EXTSORT_OVERFLOW;
00225 }
00226 }
00227
00228 assert(nTuplesLoaded);
00229 return EXTSORT_SUCCESS;
00230 }
00231
00232 void ExternalSortRunLoader::sort()
00233 {
00234 assert (nTuplesLoaded);
00235
00236 quickSort(0,nTuplesLoaded - 1);
00237 }
00238
00239 ExternalSortFetchArray &ExternalSortRunLoader::bindFetchArray()
00240 {
00241 return fetchArray;
00242 }
00243
00244 ExternalSortRC ExternalSortRunLoader::fetch(uint nTuplesRequested)
00245 {
00246 if (nTuplesFetched >= nTuplesLoaded) {
00247 return EXTSORT_ENDOFDATA;
00248 }
00249
00250 fetchArray.ppTupleBuffers = (PBuffer *)
00251 &(getPointerArrayEntry(nTuplesFetched));
00252 uint pageEnd = (nTuplesFetched | indexPageMask) + 1;
00253 pageEnd = std::min(pageEnd,nTuplesLoaded);
00254 fetchArray.nTuples = std::min(nTuplesRequested,pageEnd - nTuplesFetched);
00255 nTuplesFetched += fetchArray.nTuples;
00256
00257 return EXTSORT_SUCCESS;
00258 }
00259
00260 inline void ExternalSortRunLoader::quickSortSwap(uint l,uint r)
00261 {
00262 std::swap(getPointerArrayEntry(l),getPointerArrayEntry(r));
00263 }
00264
00265
00266 const uint step_factor = 7;
00267
00268 PBuffer ExternalSortRunLoader::quickSortFindPivot(uint l, uint r)
00269 {
00270 uint i, j, cnt, step;
00271 PBuffer vals[step_factor];
00272
00273 if (r <= l) {
00274 return NULL;
00275 }
00276
00277 cnt = 0;
00278 step = ((r - l) / step_factor) + 1;
00279 for (i = l; i <= r; i += step) {
00280 vals[cnt] = getPointerArrayEntry(i);
00281 j = cnt++;
00282 while (j > 0) {
00283 tupleAccessor.setCurrentTupleBuf(vals[j]);
00284 tupleAccessor2.setCurrentTupleBuf(vals[j - 1]);
00285 keyAccessor.unmarshal(keyData);
00286 keyAccessor2.unmarshal(keyData2);
00287 if (sortInfo.compareKeys(keyData,keyData2) >= 0) {
00288 break;
00289 }
00290 std::swap(vals[j],vals[j - 1]);
00291 j--;
00292 }
00293 }
00294 if (step == 1) {
00295 for (i = 0; i < cnt; ++i) {
00296 getPointerArrayEntry(l + i) = vals[i];
00297 }
00298 return NULL;
00299 }
00300 return vals[(cnt >> 1)];
00301 }
00302
00303 uint ExternalSortRunLoader::quickSortPartition(uint l,uint r,PBuffer pivot)
00304 {
00305 l--;
00306 r++;
00307
00308 tupleAccessor.setCurrentTupleBuf(pivot);
00309 keyAccessor.unmarshal(keyData);
00310 for (;;) {
00311 for (;;) {
00312 ++l;
00313 tupleAccessor2.setCurrentTupleBuf(getPointerArrayEntry(l));
00314 keyAccessor2.unmarshal(keyData2);
00315 if (sortInfo.compareKeys(keyData2,keyData) >= 0) {
00316 break;
00317 }
00318 }
00319 for (;;) {
00320 --r;
00321 tupleAccessor2.setCurrentTupleBuf(getPointerArrayEntry(r));
00322 keyAccessor2.unmarshal(keyData2);
00323 if (sortInfo.compareKeys(keyData2,keyData) <= 0) {
00324 break;
00325 }
00326 }
00327 if (l < r) {
00328 quickSortSwap(l, r);
00329 } else {
00330 return l;
00331 }
00332 }
00333 }
00334
00335 void ExternalSortRunLoader::quickSort(uint l,uint r)
00336 {
00337 PBuffer pPivotTuple;
00338 uint x;
00339
00340 pPivotTuple = quickSortFindPivot(l,r);
00341 if (pPivotTuple) {
00342 x = quickSortPartition(l,r,pPivotTuple);
00343 if (x == l) {
00344
00345
00346 x++;
00347 }
00348 quickSort(l,x - 1);
00349 quickSort(x,r);
00350 }
00351 }
00352
00353 uint ExternalSortRunLoader::getLoadedTupleCount()
00354 {
00355 return nTuplesLoaded;
00356 }
00357
00358 bool ExternalSortRunLoader::isStarted()
00359 {
00360 return nTuplesLoaded > nTuplesFetched;
00361 }
00362
00363 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/sorter/ExternalSortRunLoader.cpp#1 $");
00364
00365