SimpleExecStreamGovernor.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/exec/SimpleExecStreamGovernor.cpp#9 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2005-2009 SQLstream, Inc.
00006 // Copyright (C) 2005-2009 LucidEra, Inc.
00007 // Portions Copyright (C) 2004-2009 John V. Sichi
00008 //
00009 // This program is free software; you can redistribute it and/or modify it
00010 // under the terms of the GNU General Public License as published by the Free
00011 // Software Foundation; either version 2 of the License, or (at your option)
00012 // any later version approved by The Eigenbase Project.
00013 //
00014 // This program is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 // GNU General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU General Public License
00020 // along with this program; if not, write to the Free Software
00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 */
00023 
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/segment/ScratchMemExcn.h"
00026 #include "fennel/exec/SimpleExecStreamGovernor.h"
00027 #include "fennel/exec/ExecStream.h"
00028 #include "fennel/exec/ExecStreamGraphImpl.h"
00029 
00030 #include <math.h>
00031 
00032 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/SimpleExecStreamGovernor.cpp#9 $");
00033 
00034 SimpleExecStreamGovernor::SimpleExecStreamGovernor(
00035     ExecStreamResourceKnobs const &knobSettings,
00036     ExecStreamResourceQuantity const &resourcesAvailable,
00037     SharedTraceTarget pTraceTargetInit,
00038     std::string nameInit)
00039     : TraceSource(pTraceTargetInit, nameInit),
00040       ExecStreamGovernor(
00041             knobSettings, resourcesAvailable, pTraceTargetInit, nameInit)
00042 {
00043     perGraphAllocation = computePerGraphAllocation();
00044 }
00045 
00046 SimpleExecStreamGovernor::~SimpleExecStreamGovernor()
00047 {
00048 }
00049 
00050 bool SimpleExecStreamGovernor::setResourceKnob(
00051     ExecStreamResourceKnobs const &knob, ExecStreamResourceKnobType knobType)
00052 {
00053     StrictMutexGuard mutexGuard(mutex);
00054 
00055     switch (knobType) {
00056     case EXEC_KNOB_EXPECTED_CONCURRENT_STATEMENTS:
00057         knobSettings.expectedConcurrentStatements =
00058             knob.expectedConcurrentStatements;
00059         perGraphAllocation = computePerGraphAllocation();
00060         FENNEL_TRACE(
00061             TRACE_FINE,
00062             "Expected concurrent statements set to " <<
00063             knobSettings.expectedConcurrentStatements <<
00064             ". Per graph allocation is now " << perGraphAllocation <<
00065             " cache pages.");
00066         break;
00067 
00068     case EXEC_KNOB_CACHE_RESERVE_PERCENTAGE:
00069         // make sure we have enough unassigned pages to set aside the new
00070         // reserve amount
00071         double percent = (100 - knobSettings.cacheReservePercentage) / 100.0;
00072         uint totalPagesAvailable = (uint)
00073             ((resourcesAvailable.nCachePages + resourcesAssigned.nCachePages) /
00074             percent);
00075         uint numReserve =
00076             totalPagesAvailable * knob.cacheReservePercentage / 100;
00077         if (totalPagesAvailable - numReserve < resourcesAssigned.nCachePages) {
00078             return false;
00079         }
00080         knobSettings.cacheReservePercentage = knob.cacheReservePercentage;
00081         resourcesAvailable.nCachePages =
00082             totalPagesAvailable - numReserve - resourcesAssigned.nCachePages;
00083         perGraphAllocation = computePerGraphAllocation();
00084         FENNEL_TRACE(
00085             TRACE_FINE,
00086             "Cache reserve percentage set to " <<
00087             knobSettings.cacheReservePercentage <<
00088             ". Per graph allocation is now " << perGraphAllocation <<
00089             " cache pages.");
00090         break;
00091     }
00092 
00093     return true;
00094 }
00095 
00096 bool SimpleExecStreamGovernor::setResourceAvailability(
00097     ExecStreamResourceQuantity const &available,
00098     ExecStreamResourceType resourceType)
00099 {
00100     StrictMutexGuard mutexGuard(mutex);
00101 
00102     switch (resourceType) {
00103     case EXEC_RESOURCE_CACHE_PAGES:
00104         {
00105         uint pagesAvailable =
00106             available.nCachePages *
00107             (100 - knobSettings.cacheReservePercentage) / 100;
00108         if (pagesAvailable < resourcesAssigned.nCachePages) {
00109             return false;
00110         }
00111         resourcesAvailable.nCachePages =
00112             (pagesAvailable - resourcesAssigned.nCachePages);
00113         perGraphAllocation = computePerGraphAllocation();
00114         FENNEL_TRACE(
00115             TRACE_FINE,
00116             resourcesAvailable.nCachePages <<
00117             " cache pages now available for assignment.  " <<
00118             "Per graph allocation is now " << perGraphAllocation <<
00119             " cache pages.");
00120         break;
00121         }
00122 
00123     case EXEC_RESOURCE_THREADS:
00124         resourcesAvailable.nThreads = available.nThreads;
00125         break;
00126     }
00127 
00128     return true;
00129 }
00130 void SimpleExecStreamGovernor::requestResources(ExecStreamGraph &graph)
00131 {
00132     FENNEL_TRACE(TRACE_FINE, "requestResources");
00133 
00134     StrictMutexGuard mutexGuard(mutex);
00135 
00136     std::vector<SharedExecStream> sortedStreams = graph.getSortedStreams();
00137     boost::scoped_array<ExecStreamResourceRequirements> resourceReqts;
00138     boost::scoped_array<double> sqrtDiffOptMin;
00139     uint nStreams = sortedStreams.size();
00140 
00141     resourceReqts.reset(new ExecStreamResourceRequirements[nStreams]);
00142     sqrtDiffOptMin.reset(new double[nStreams]);
00143 
00144     // scale down the number of pages that can be allocated based on how
00145     // much still remains
00146     uint allocationAmount =
00147         std::min(resourcesAvailable.nCachePages, perGraphAllocation);
00148     FENNEL_TRACE(
00149         TRACE_FINE,
00150         allocationAmount << " cache pages available for stream graph");
00151 
00152     // Total the minimum and optimum resource requirements and determine
00153     // if we have any estimate/unbounded optimum settings
00154     uint totalMin = 0;
00155     uint totalOpt = 0;
00156     double totalSqrtDiffs = 0;
00157     bool allAccurate = true;
00158     for (uint i = 0; i < nStreams; i++) {
00159         ExecStreamResourceQuantity minQuantity, optQuantity;
00160         ExecStreamResourceSettingType optType;
00161         sortedStreams[i]->getResourceRequirements(
00162             minQuantity, optQuantity, optType);
00163         assert(
00164             optType == EXEC_RESOURCE_UNBOUNDED ||
00165             minQuantity.nCachePages <= optQuantity.nCachePages);
00166         assert(minQuantity.nThreads <= optQuantity.nThreads);
00167 
00168         ExecStreamResourceRequirements &reqt = resourceReqts[i];
00169         reqt.minReqt = minQuantity.nCachePages;
00170         totalMin += reqt.minReqt;
00171         reqt.optType = optType;
00172 
00173         switch (optType) {
00174         case EXEC_RESOURCE_ACCURATE:
00175             reqt.optReqt = optQuantity.nCachePages;
00176             sqrtDiffOptMin[i] = sqrt(double(reqt.optReqt - reqt.minReqt));
00177             break;
00178         case EXEC_RESOURCE_ESTIMATE:
00179             reqt.optReqt = optQuantity.nCachePages;
00180             sqrtDiffOptMin[i] = sqrt(double(reqt.optReqt - reqt.minReqt));
00181             allAccurate = false;
00182             break;
00183         case EXEC_RESOURCE_UNBOUNDED:
00184             // in the unbounded case, since we're trying to use as much
00185             // memory as available, set the difference to how much is
00186             // available; this way, we set it to something large relative
00187             // to availability, but still set it to a finite value to
00188             // allow some allocation to go towards those streams that
00189             // have estimated optimums
00190             sqrtDiffOptMin[i] = sqrt(double(allocationAmount));
00191             allAccurate = false;
00192             // in the unbounded case, we don't have an optimum setting, so
00193             // set it to assume the full allocation amount plus the min
00194             reqt.optReqt = reqt.minReqt + allocationAmount;
00195             break;
00196         }
00197         totalOpt += reqt.optReqt;
00198         totalSqrtDiffs += sqrtDiffOptMin[i];
00199     }
00200 
00201     // not enough pages even to assign the minimum requirements
00202     if (totalMin > allocationAmount &&
00203         totalMin > resourcesAvailable.nCachePages)
00204     {
00205         FENNEL_TRACE(
00206             TRACE_FINE,
00207             "Minimum request of " << totalMin << " cache pages not met");
00208         throw ScratchMemExcn();
00209     }
00210 
00211     uint totalAssigned;
00212 
00213     // only enough to assign the minimum
00214     if (totalMin >= allocationAmount) {
00215         assignCachePages(sortedStreams, resourceReqts, true);
00216         totalAssigned = totalMin;
00217         FENNEL_TRACE(
00218             TRACE_FINE,
00219             "Mininum request of " << totalMin << " cache pages assigned");
00220 
00221     } else if (totalOpt <= allocationAmount) {
00222         // if all streams have accurate optimum settings, and we have enough
00223         // to assign the optimum amount, then do so
00224         if (allAccurate) {
00225             assignCachePages(sortedStreams, resourceReqts, false);
00226             totalAssigned = totalOpt;
00227             FENNEL_TRACE(
00228                 TRACE_FINE,
00229                 "Optimum request of " << totalOpt << " cache pages assigned");
00230 
00231         } else {
00232             // even though total optimum is less than the allocation amount,
00233             // since some streams have estimate settings, we want to try and
00234             // give a little extra to those streams; the streams that have
00235             // accurate settings will receive their full optimum amount;
00236             // note that in this case, there should not be any streams with
00237             // optimum settings
00238             uint assigned =
00239                 distributeCachePages(
00240                     sortedStreams, resourceReqts, sqrtDiffOptMin,
00241                     totalSqrtDiffs, allocationAmount - totalOpt, true);
00242             totalAssigned = assigned;
00243             FENNEL_TRACE(
00244                 TRACE_FINE,
00245                 assigned <<
00246                 " cache pages assigned, based on an optimum request for " <<
00247                 totalOpt << " cache pages");
00248         }
00249 
00250     } else {
00251         // allocate the minimum to each stream and then distribute what
00252         // remains
00253         uint assigned =
00254             distributeCachePages(
00255                 sortedStreams, resourceReqts, sqrtDiffOptMin, totalSqrtDiffs,
00256                 allocationAmount - totalMin, false);
00257         totalAssigned = assigned;
00258         FENNEL_TRACE(
00259             TRACE_FINE,
00260             assigned <<
00261             " cache pages assigned based on an optimum request for " <<
00262             totalOpt << " cache pages");
00263     }
00264 
00265     // update structures to reflect what's been assigned
00266     resourcesAssigned.nCachePages += totalAssigned;
00267     resourcesAvailable.nCachePages -= totalAssigned;
00268     SharedExecStreamResourceQuantity
00269         pQuantity(new ExecStreamResourceQuantity());
00270     pQuantity->nCachePages = totalAssigned;
00271     resourceMap.insert(
00272         ExecStreamGraphResourceMap::value_type(&graph, pQuantity));
00273 
00274     FENNEL_TRACE(
00275         TRACE_FINE,
00276         resourcesAvailable.nCachePages <<
00277         " cache pages remaining for assignment");
00278 }
00279 
00280 void SimpleExecStreamGovernor::assignCachePages(
00281     std::vector<SharedExecStream> &streams,
00282     boost::scoped_array<ExecStreamResourceRequirements> const &reqts,
00283     bool assignMin)
00284 {
00285     for (uint i = 0; i < streams.size(); i++) {
00286         ExecStreamResourceQuantity quantity;
00287         quantity.nCachePages =
00288             (assignMin) ? reqts[i].minReqt : reqts[i].optReqt;
00289         streams[i]->setResourceAllocation(quantity);
00290         if (isTracingLevel(TRACE_FINER)) {
00291             traceCachePageRequest(
00292                 quantity.nCachePages, reqts[i], streams[i]->getName());
00293         }
00294     }
00295 }
00296 
00297 uint SimpleExecStreamGovernor::distributeCachePages(
00298     std::vector<SharedExecStream> &streams,
00299     boost::scoped_array<ExecStreamResourceRequirements> const &reqts,
00300     boost::scoped_array<double> const &sqrtDiffOptMin,
00301     double totalSqrtDiffs,
00302     uint excessAvailable, bool assignOpt)
00303 {
00304     // if there's enough to assign the optimum amount to each stream, then
00305     // adjust totalSqrtDiffs so we don't allocate any extra to the
00306     // streams with accurate settings
00307     if (assignOpt) {
00308         totalSqrtDiffs = 0;
00309         for (uint i = 0; i < streams.size(); i++) {
00310             if (reqts[i].optType != EXEC_RESOURCE_ACCURATE) {
00311                 totalSqrtDiffs += sqrtDiffOptMin[i];
00312             }
00313         }
00314     }
00315 
00316     uint excessAssigned = 0;
00317     uint totalAssigned = 0;
00318     for (uint i = 0; i < streams.size(); i++) {
00319         uint amount;
00320         ExecStreamResourceRequirements &reqt = reqts[i];
00321         if (assignOpt && reqt.optType == EXEC_RESOURCE_ACCURATE) {
00322             amount = 0;
00323         } else {
00324             amount = (uint) floor(excessAvailable * sqrtDiffOptMin[i] /
00325                 totalSqrtDiffs);
00326         }
00327         assert(amount <= (excessAvailable - excessAssigned));
00328         excessAssigned += amount;
00329 
00330         ExecStreamResourceQuantity quantity;
00331         if (assignOpt) {
00332             assert(reqt.optType != EXEC_RESOURCE_UNBOUNDED);
00333             quantity.nCachePages = reqt.optReqt;
00334         } else {
00335             quantity.nCachePages = reqt.minReqt;
00336         }
00337         quantity.nCachePages += amount;
00338         totalAssigned += quantity.nCachePages;
00339         streams[i]->setResourceAllocation(quantity);
00340         if (isTracingLevel(TRACE_FINER)) {
00341             traceCachePageRequest(
00342                 quantity.nCachePages, reqt, streams[i]->getName());
00343         }
00344     }
00345 
00346     return totalAssigned;
00347 }
00348 
00349 void SimpleExecStreamGovernor::returnResources(ExecStreamGraph &graph)
00350 {
00351     StrictMutexGuard mutexGuard(mutex);
00352 
00353     ExecStreamGraphResourceMap::const_iterator iter = resourceMap.find(&graph);
00354     if (iter == resourceMap.end()) {
00355         // no allocation may have been done
00356         return;
00357     }
00358     SharedExecStreamResourceQuantity pQuantity = iter->second;
00359     resourcesAssigned.nCachePages -= pQuantity->nCachePages;
00360     resourcesAvailable.nCachePages += pQuantity->nCachePages;
00361     FENNEL_TRACE(
00362         TRACE_FINE,
00363         "Returned " << pQuantity->nCachePages << " cache pages. " <<
00364         resourcesAvailable.nCachePages <<
00365         " cache pages now available for assignment");
00366 
00367     resourceMap.erase(&graph);
00368 }
00369 
00370 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/SimpleExecStreamGovernor.cpp#9 $");
00371 
00372 // End SimpleExecStreamGovernor.cpp

Generated on Mon Jun 22 04:00:18 2009 for Fennel by  doxygen 1.5.1