00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/sorter/ExternalSortMerger.h"
00026 #include "fennel/sorter/ExternalSortInfo.h"
00027 #include "fennel/sorter/ExternalSortRunAccessor.h"
00028 #include "fennel/exec/ExecStream.h"
00029
00030 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/sorter/ExternalSortMerger.cpp#1 $");
00031
00032 ExternalSortMerger::ExternalSortMerger(ExternalSortInfo &sortInfoIn)
00033 : sortInfo(sortInfoIn)
00034 {
00035 nRuns = 0;
00036
00037 tupleAccessor.compute(sortInfo.tupleDesc);
00038 tupleAccessor2.compute(sortInfo.tupleDesc);
00039
00040 keyData.compute(sortInfo.keyDesc);
00041 keyData2 = keyData;
00042
00043 keyAccessor.bind(tupleAccessor,sortInfo.keyProj);
00044 keyAccessor2.bind(tupleAccessor2,sortInfo.keyProj);
00045
00046 nMergeMemPages = sortInfo.nSortMemPages + 1;
00047
00048 ppRunAccessors.reset(new SharedExternalSortRunAccessor[nMergeMemPages]);
00049 ppFetchArrays.reset(new ExternalSortFetchArray *[nMergeMemPages]);
00050
00051 pOrds.reset(new uint[nMergeMemPages]);
00052 memset(pOrds.get(),0,sizeof(uint) * nMergeMemPages);
00053
00054 mergeInfo.reset(new ExternalSortMergeInfo[nMergeMemPages]);
00055
00056 fetchArray.ppTupleBuffers = &(ppTupleBuffers[0]);
00057 }
00058
00059 ExternalSortMerger::~ExternalSortMerger()
00060 {
00061 releaseResources();
00062 }
00063
00064 void ExternalSortMerger::initRunAccess()
00065 {
00066 for (uint i = 1; i < nMergeMemPages; i++) {
00067 ppRunAccessors[i].reset(new ExternalSortRunAccessor(sortInfo));
00068 ppRunAccessors[i]->initRead();
00069 ppFetchArrays[i] = &(ppRunAccessors[i]->bindFetchArray());
00070 }
00071 }
00072
00073 void ExternalSortMerger::releaseResources()
00074 {
00075 ppRunAccessors.reset();
00076 ppFetchArrays.reset();
00077 pOrds.reset();
00078 mergeInfo.reset();
00079 }
00080
00081 void ExternalSortMerger::startMerge(
00082 std::vector<SharedSegStreamAllocation>::iterator pStoredRun,
00083 uint nRunsToMerge)
00084 {
00085 assert (nRunsToMerge < nMergeMemPages);
00086
00087 nRuns = nRunsToMerge;
00088
00089 for (uint i = 1; i < nMergeMemPages; i++) {
00090 ppRunAccessors[i]->resetRead();
00091 pOrds[i] = 0;
00092 }
00093
00094 for (uint i = 1; i <= nRuns; i++) {
00095 ppRunAccessors[i]->startRead(pStoredRun[i - 1]);
00096 ppRunAccessors[i]->fetch(EXTSORT_FETCH_ARRAY_SIZE);
00097 mergeInfo[i].val = ppFetchArrays[i]->ppTupleBuffers[0];
00098 mergeInfo[i].runOrd = i;
00099 }
00100 fetchArray.nTuples = 0;
00101
00102 heapBuild();
00103 }
00104
00105 ExternalSortFetchArray &ExternalSortMerger::bindFetchArray()
00106 {
00107 return fetchArray;
00108 }
00109
00110 inline uint ExternalSortMerger::heapParent(uint i)
00111 {
00112 return (i >> 1);
00113 }
00114
00115 inline uint ExternalSortMerger::heapLeft(uint i)
00116 {
00117 return (i << 1);
00118 }
00119
00120 inline uint ExternalSortMerger::heapRight(uint i)
00121 {
00122 return (i << 1) + 1;
00123 }
00124
00125 inline void ExternalSortMerger::heapExchange(uint i,uint j)
00126 {
00127 std::swap(mergeInfo[i],mergeInfo[j]);
00128 }
00129
00130 inline ExternalSortMergeInfo &ExternalSortMerger::getMergeHigh()
00131 {
00132 return mergeInfo[1];
00133 }
00134
00135 void ExternalSortMerger::heapify(uint i)
00136 {
00137 uint l, r, highest = i;
00138
00139 l = heapLeft(i);
00140 r = heapRight(i);
00141
00142 if (l <= nRuns) {
00143 tupleAccessor.setCurrentTupleBuf(mergeInfo[l].val);
00144 tupleAccessor2.setCurrentTupleBuf(mergeInfo[i].val);
00145 keyAccessor.unmarshal(keyData);
00146 keyAccessor2.unmarshal(keyData2);
00147 if (sortInfo.compareKeys(keyData,keyData2) < 0) {
00148 highest = l;
00149 }
00150 }
00151
00152 if (r <= nRuns) {
00153 tupleAccessor.setCurrentTupleBuf(mergeInfo[r].val);
00154 tupleAccessor2.setCurrentTupleBuf(mergeInfo[highest].val);
00155 keyAccessor.unmarshal(keyData);
00156 keyAccessor2.unmarshal(keyData2);
00157 if (sortInfo.compareKeys(keyData,keyData2) < 0) {
00158 highest = r;
00159 }
00160 }
00161
00162 if (highest != i) {
00163 heapExchange(highest,i);
00164 heapify(highest);
00165 }
00166 }
00167
00168 void ExternalSortMerger::heapBuild()
00169 {
00170 for (uint i = (nRuns / 2); i > 0; i--) {
00171 heapify(i);
00172 }
00173 }
00174
00175 ExternalSortRC ExternalSortMerger::checkFetch()
00176 {
00177 if (nRuns == 0) {
00178 return EXTSORT_ENDOFDATA;
00179 }
00180 if (pOrds[getMergeHigh().runOrd]
00181 >= ppFetchArrays[getMergeHigh().runOrd]->nTuples)
00182 {
00183 ExternalSortRC rc = ppRunAccessors[getMergeHigh().runOrd]->fetch(
00184 EXTSORT_FETCH_ARRAY_SIZE);
00185 if (rc != EXTSORT_SUCCESS) {
00186 assert(rc == EXTSORT_ENDOFDATA);
00187 heapExchange(1, nRuns);
00188 if (--nRuns == 0) {
00189 return EXTSORT_ENDOFDATA;
00190 }
00191 } else {
00192 pOrds[getMergeHigh().runOrd] = 0;
00193 }
00194 }
00195
00196 return EXTSORT_SUCCESS;
00197 }
00198
00199 ExternalSortRC ExternalSortMerger::fetch(uint nTuplesRequested)
00200 {
00201 sortInfo.stream.checkAbort();
00202
00203 if (nTuplesRequested > EXTSORT_FETCH_ARRAY_SIZE) {
00204 nTuplesRequested = EXTSORT_FETCH_ARRAY_SIZE;
00205 }
00206
00207 ExternalSortRC rc = checkFetch();
00208 if (rc != EXTSORT_SUCCESS) {
00209
00210 return rc;
00211 }
00212
00213 fetchArray.nTuples = 0;
00214 do {
00215 getMergeHigh().val =
00216 ppFetchArrays[getMergeHigh().runOrd]->ppTupleBuffers[
00217 pOrds[getMergeHigh().runOrd]];
00218
00219 heapify(1);
00220
00221 fetchArray.ppTupleBuffers[fetchArray.nTuples] = getMergeHigh().val;
00222 fetchArray.nTuples++;
00223 nTuplesRequested--;
00224
00225 pOrds[getMergeHigh().runOrd]++;
00226 } while (nTuplesRequested
00227 && (pOrds[getMergeHigh().runOrd]
00228 < ppFetchArrays[getMergeHigh().runOrd]->nTuples));
00229
00230 return EXTSORT_SUCCESS;
00231 }
00232
00233 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/sorter/ExternalSortMerger.cpp#1 $");
00234
00235