00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/segment/ScratchMemExcn.h"
00026 #include "fennel/exec/SimpleExecStreamGovernor.h"
00027 #include "fennel/exec/ExecStream.h"
00028 #include "fennel/exec/ExecStreamGraphImpl.h"
00029
00030 #include <math.h>
00031
00032 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/SimpleExecStreamGovernor.cpp#9 $");
00033
00034 SimpleExecStreamGovernor::SimpleExecStreamGovernor(
00035 ExecStreamResourceKnobs const &knobSettings,
00036 ExecStreamResourceQuantity const &resourcesAvailable,
00037 SharedTraceTarget pTraceTargetInit,
00038 std::string nameInit)
00039 : TraceSource(pTraceTargetInit, nameInit),
00040 ExecStreamGovernor(
00041 knobSettings, resourcesAvailable, pTraceTargetInit, nameInit)
00042 {
00043 perGraphAllocation = computePerGraphAllocation();
00044 }
00045
00046 SimpleExecStreamGovernor::~SimpleExecStreamGovernor()
00047 {
00048 }
00049
00050 bool SimpleExecStreamGovernor::setResourceKnob(
00051 ExecStreamResourceKnobs const &knob, ExecStreamResourceKnobType knobType)
00052 {
00053 StrictMutexGuard mutexGuard(mutex);
00054
00055 switch (knobType) {
00056 case EXEC_KNOB_EXPECTED_CONCURRENT_STATEMENTS:
00057 knobSettings.expectedConcurrentStatements =
00058 knob.expectedConcurrentStatements;
00059 perGraphAllocation = computePerGraphAllocation();
00060 FENNEL_TRACE(
00061 TRACE_FINE,
00062 "Expected concurrent statements set to " <<
00063 knobSettings.expectedConcurrentStatements <<
00064 ". Per graph allocation is now " << perGraphAllocation <<
00065 " cache pages.");
00066 break;
00067
00068 case EXEC_KNOB_CACHE_RESERVE_PERCENTAGE:
00069
00070
00071 double percent = (100 - knobSettings.cacheReservePercentage) / 100.0;
00072 uint totalPagesAvailable = (uint)
00073 ((resourcesAvailable.nCachePages + resourcesAssigned.nCachePages) /
00074 percent);
00075 uint numReserve =
00076 totalPagesAvailable * knob.cacheReservePercentage / 100;
00077 if (totalPagesAvailable - numReserve < resourcesAssigned.nCachePages) {
00078 return false;
00079 }
00080 knobSettings.cacheReservePercentage = knob.cacheReservePercentage;
00081 resourcesAvailable.nCachePages =
00082 totalPagesAvailable - numReserve - resourcesAssigned.nCachePages;
00083 perGraphAllocation = computePerGraphAllocation();
00084 FENNEL_TRACE(
00085 TRACE_FINE,
00086 "Cache reserve percentage set to " <<
00087 knobSettings.cacheReservePercentage <<
00088 ". Per graph allocation is now " << perGraphAllocation <<
00089 " cache pages.");
00090 break;
00091 }
00092
00093 return true;
00094 }
00095
00096 bool SimpleExecStreamGovernor::setResourceAvailability(
00097 ExecStreamResourceQuantity const &available,
00098 ExecStreamResourceType resourceType)
00099 {
00100 StrictMutexGuard mutexGuard(mutex);
00101
00102 switch (resourceType) {
00103 case EXEC_RESOURCE_CACHE_PAGES:
00104 {
00105 uint pagesAvailable =
00106 available.nCachePages *
00107 (100 - knobSettings.cacheReservePercentage) / 100;
00108 if (pagesAvailable < resourcesAssigned.nCachePages) {
00109 return false;
00110 }
00111 resourcesAvailable.nCachePages =
00112 (pagesAvailable - resourcesAssigned.nCachePages);
00113 perGraphAllocation = computePerGraphAllocation();
00114 FENNEL_TRACE(
00115 TRACE_FINE,
00116 resourcesAvailable.nCachePages <<
00117 " cache pages now available for assignment. " <<
00118 "Per graph allocation is now " << perGraphAllocation <<
00119 " cache pages.");
00120 break;
00121 }
00122
00123 case EXEC_RESOURCE_THREADS:
00124 resourcesAvailable.nThreads = available.nThreads;
00125 break;
00126 }
00127
00128 return true;
00129 }
00130 void SimpleExecStreamGovernor::requestResources(ExecStreamGraph &graph)
00131 {
00132 FENNEL_TRACE(TRACE_FINE, "requestResources");
00133
00134 StrictMutexGuard mutexGuard(mutex);
00135
00136 std::vector<SharedExecStream> sortedStreams = graph.getSortedStreams();
00137 boost::scoped_array<ExecStreamResourceRequirements> resourceReqts;
00138 boost::scoped_array<double> sqrtDiffOptMin;
00139 uint nStreams = sortedStreams.size();
00140
00141 resourceReqts.reset(new ExecStreamResourceRequirements[nStreams]);
00142 sqrtDiffOptMin.reset(new double[nStreams]);
00143
00144
00145
00146 uint allocationAmount =
00147 std::min(resourcesAvailable.nCachePages, perGraphAllocation);
00148 FENNEL_TRACE(
00149 TRACE_FINE,
00150 allocationAmount << " cache pages available for stream graph");
00151
00152
00153
00154 uint totalMin = 0;
00155 uint totalOpt = 0;
00156 double totalSqrtDiffs = 0;
00157 bool allAccurate = true;
00158 for (uint i = 0; i < nStreams; i++) {
00159 ExecStreamResourceQuantity minQuantity, optQuantity;
00160 ExecStreamResourceSettingType optType;
00161 sortedStreams[i]->getResourceRequirements(
00162 minQuantity, optQuantity, optType);
00163 assert(
00164 optType == EXEC_RESOURCE_UNBOUNDED ||
00165 minQuantity.nCachePages <= optQuantity.nCachePages);
00166 assert(minQuantity.nThreads <= optQuantity.nThreads);
00167
00168 ExecStreamResourceRequirements &reqt = resourceReqts[i];
00169 reqt.minReqt = minQuantity.nCachePages;
00170 totalMin += reqt.minReqt;
00171 reqt.optType = optType;
00172
00173 switch (optType) {
00174 case EXEC_RESOURCE_ACCURATE:
00175 reqt.optReqt = optQuantity.nCachePages;
00176 sqrtDiffOptMin[i] = sqrt(double(reqt.optReqt - reqt.minReqt));
00177 break;
00178 case EXEC_RESOURCE_ESTIMATE:
00179 reqt.optReqt = optQuantity.nCachePages;
00180 sqrtDiffOptMin[i] = sqrt(double(reqt.optReqt - reqt.minReqt));
00181 allAccurate = false;
00182 break;
00183 case EXEC_RESOURCE_UNBOUNDED:
00184
00185
00186
00187
00188
00189
00190 sqrtDiffOptMin[i] = sqrt(double(allocationAmount));
00191 allAccurate = false;
00192
00193
00194 reqt.optReqt = reqt.minReqt + allocationAmount;
00195 break;
00196 }
00197 totalOpt += reqt.optReqt;
00198 totalSqrtDiffs += sqrtDiffOptMin[i];
00199 }
00200
00201
00202 if (totalMin > allocationAmount &&
00203 totalMin > resourcesAvailable.nCachePages)
00204 {
00205 FENNEL_TRACE(
00206 TRACE_FINE,
00207 "Minimum request of " << totalMin << " cache pages not met");
00208 throw ScratchMemExcn();
00209 }
00210
00211 uint totalAssigned;
00212
00213
00214 if (totalMin >= allocationAmount) {
00215 assignCachePages(sortedStreams, resourceReqts, true);
00216 totalAssigned = totalMin;
00217 FENNEL_TRACE(
00218 TRACE_FINE,
00219 "Mininum request of " << totalMin << " cache pages assigned");
00220
00221 } else if (totalOpt <= allocationAmount) {
00222
00223
00224 if (allAccurate) {
00225 assignCachePages(sortedStreams, resourceReqts, false);
00226 totalAssigned = totalOpt;
00227 FENNEL_TRACE(
00228 TRACE_FINE,
00229 "Optimum request of " << totalOpt << " cache pages assigned");
00230
00231 } else {
00232
00233
00234
00235
00236
00237
00238 uint assigned =
00239 distributeCachePages(
00240 sortedStreams, resourceReqts, sqrtDiffOptMin,
00241 totalSqrtDiffs, allocationAmount - totalOpt, true);
00242 totalAssigned = assigned;
00243 FENNEL_TRACE(
00244 TRACE_FINE,
00245 assigned <<
00246 " cache pages assigned, based on an optimum request for " <<
00247 totalOpt << " cache pages");
00248 }
00249
00250 } else {
00251
00252
00253 uint assigned =
00254 distributeCachePages(
00255 sortedStreams, resourceReqts, sqrtDiffOptMin, totalSqrtDiffs,
00256 allocationAmount - totalMin, false);
00257 totalAssigned = assigned;
00258 FENNEL_TRACE(
00259 TRACE_FINE,
00260 assigned <<
00261 " cache pages assigned based on an optimum request for " <<
00262 totalOpt << " cache pages");
00263 }
00264
00265
00266 resourcesAssigned.nCachePages += totalAssigned;
00267 resourcesAvailable.nCachePages -= totalAssigned;
00268 SharedExecStreamResourceQuantity
00269 pQuantity(new ExecStreamResourceQuantity());
00270 pQuantity->nCachePages = totalAssigned;
00271 resourceMap.insert(
00272 ExecStreamGraphResourceMap::value_type(&graph, pQuantity));
00273
00274 FENNEL_TRACE(
00275 TRACE_FINE,
00276 resourcesAvailable.nCachePages <<
00277 " cache pages remaining for assignment");
00278 }
00279
00280 void SimpleExecStreamGovernor::assignCachePages(
00281 std::vector<SharedExecStream> &streams,
00282 boost::scoped_array<ExecStreamResourceRequirements> const &reqts,
00283 bool assignMin)
00284 {
00285 for (uint i = 0; i < streams.size(); i++) {
00286 ExecStreamResourceQuantity quantity;
00287 quantity.nCachePages =
00288 (assignMin) ? reqts[i].minReqt : reqts[i].optReqt;
00289 streams[i]->setResourceAllocation(quantity);
00290 if (isTracingLevel(TRACE_FINER)) {
00291 traceCachePageRequest(
00292 quantity.nCachePages, reqts[i], streams[i]->getName());
00293 }
00294 }
00295 }
00296
00297 uint SimpleExecStreamGovernor::distributeCachePages(
00298 std::vector<SharedExecStream> &streams,
00299 boost::scoped_array<ExecStreamResourceRequirements> const &reqts,
00300 boost::scoped_array<double> const &sqrtDiffOptMin,
00301 double totalSqrtDiffs,
00302 uint excessAvailable, bool assignOpt)
00303 {
00304
00305
00306
00307 if (assignOpt) {
00308 totalSqrtDiffs = 0;
00309 for (uint i = 0; i < streams.size(); i++) {
00310 if (reqts[i].optType != EXEC_RESOURCE_ACCURATE) {
00311 totalSqrtDiffs += sqrtDiffOptMin[i];
00312 }
00313 }
00314 }
00315
00316 uint excessAssigned = 0;
00317 uint totalAssigned = 0;
00318 for (uint i = 0; i < streams.size(); i++) {
00319 uint amount;
00320 ExecStreamResourceRequirements &reqt = reqts[i];
00321 if (assignOpt && reqt.optType == EXEC_RESOURCE_ACCURATE) {
00322 amount = 0;
00323 } else {
00324 amount = (uint) floor(excessAvailable * sqrtDiffOptMin[i] /
00325 totalSqrtDiffs);
00326 }
00327 assert(amount <= (excessAvailable - excessAssigned));
00328 excessAssigned += amount;
00329
00330 ExecStreamResourceQuantity quantity;
00331 if (assignOpt) {
00332 assert(reqt.optType != EXEC_RESOURCE_UNBOUNDED);
00333 quantity.nCachePages = reqt.optReqt;
00334 } else {
00335 quantity.nCachePages = reqt.minReqt;
00336 }
00337 quantity.nCachePages += amount;
00338 totalAssigned += quantity.nCachePages;
00339 streams[i]->setResourceAllocation(quantity);
00340 if (isTracingLevel(TRACE_FINER)) {
00341 traceCachePageRequest(
00342 quantity.nCachePages, reqt, streams[i]->getName());
00343 }
00344 }
00345
00346 return totalAssigned;
00347 }
00348
00349 void SimpleExecStreamGovernor::returnResources(ExecStreamGraph &graph)
00350 {
00351 StrictMutexGuard mutexGuard(mutex);
00352
00353 ExecStreamGraphResourceMap::const_iterator iter = resourceMap.find(&graph);
00354 if (iter == resourceMap.end()) {
00355
00356 return;
00357 }
00358 SharedExecStreamResourceQuantity pQuantity = iter->second;
00359 resourcesAssigned.nCachePages -= pQuantity->nCachePages;
00360 resourcesAvailable.nCachePages += pQuantity->nCachePages;
00361 FENNEL_TRACE(
00362 TRACE_FINE,
00363 "Returned " << pQuantity->nCachePages << " cache pages. " <<
00364 resourcesAvailable.nCachePages <<
00365 " cache pages now available for assignment");
00366
00367 resourceMap.erase(&graph);
00368 }
00369
00370 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/SimpleExecStreamGovernor.cpp#9 $");
00371
00372