#include <AioLinuxScheduler.h>
Inherits DeviceAccessScheduler and Thread.
Public Member Functions
AioLinuxScheduler (DeviceAccessSchedulerParams const &)
    Constructor.
virtual ~AioLinuxScheduler ()
    Destructor: stop must already have been called.
virtual void registerDevice (SharedRandomAccessDevice pDevice)
    Registers a device for which this scheduler will process requests.
virtual bool schedule (RandomAccessRequest &request)
    Initiates a request, the details of which must already have been defined by the caller.
virtual void stop ()
    Shuts down, waiting for all pending requests to complete.
virtual void run ()
virtual void unregisterDevice (SharedRandomAccessDevice pDevice)
    Unregisters a device.
virtual void start ()
    Spawns the OS thread.
void join ()
    Waits for the OS thread to terminate.
bool isStopped () const
boost::thread & getBoostThread ()
    Accesses the underlying boost::thread, e.g. for use in a boost::thread_group.
std::string getName ()
void setName (std::string const &s)
Static Public Member Functions
static DeviceAccessScheduler * newScheduler (DeviceAccessSchedulerParams const &params)
    Creates a scheduler.
Protected Member Functions
void initAndRun ()
virtual void beforeRun ()
virtual void afterRun ()
Protected Attributes
boost::thread * pBoostThread
bool bRunning
std::string name
Private Member Functions
bool isStarted () const
bool submitRequests (RandomAccessRequest::BindingList &bindingList)
    Submits a list of requests which have already been fully prepared.
void deferLeftoverRequests (iocb **ppLeftovers, uint nLeftovers)
    Saves failed requests to the deferred queue for retry.
bool retryDeferredRequests ()
    Retries submission of requests from the deferred queue; continues until either a submission attempt fails or the queue is exhausted.
Private Attributes
io_context_t context
    Context for calling libaio.
AtomicCounter nRequestsOutstanding
    Number of requests for which io_submit has been called and a corresponding io_getevents notification has not yet been processed.
bool quit
    Flag for passively asking the scheduler to shut down.
std::deque< RandomAccessRequest::BindingList > deferredQueue
    FIFO queue of requests which have been deferred due to not-fully-successful io_submit calls.
StrictMutex deferredQueueMutex
    Mutex used to protect deferredQueue.
Definition at line 46 of file AioLinuxScheduler.h.
AioLinuxScheduler::AioLinuxScheduler (DeviceAccessSchedulerParams const &) [explicit]
Constructor.
Definition at line 42 of file AioLinuxScheduler.cpp.
References AtomicCounter::clear(), context, DeviceAccessSchedulerParams::maxRequests, nRequestsOutstanding, quit, and Thread::start().
    {
        quit = false;
        nRequestsOutstanding.clear();
        context = NULL;
        int rc = io_queue_init(params.maxRequests, &context);
        if (rc) {
            throw SysCallExcn("io_queue_init failed");
        }

        // NOTE jvs 29-Oct-2005: we ignore params.nThreads because
        // no more than one thread is needed for io_getevents
        Thread::start();
    }
AioLinuxScheduler::~AioLinuxScheduler () [virtual]
Destructor: stop must already have been called.
Definition at line 63 of file AioLinuxScheduler.cpp.
References context, isStarted(), and nRequestsOutstanding.
    {
        assert(!isStarted());
        assert(!nRequestsOutstanding);
        int rc = io_queue_release(context);
        if (rc) {
            throw SysCallExcn("io_queue_release failed");
        }
    }
bool AioLinuxScheduler::isStarted () const [inline, private]
Reimplemented from Thread.
Definition at line 58 of file AioLinuxScheduler.cpp.
References Thread::isStarted().
Referenced by schedule(), stop(), and ~AioLinuxScheduler().
    {
        return Thread::isStarted();
    }
bool AioLinuxScheduler::submitRequests (RandomAccessRequest::BindingList &bindingList) [private]
Submits a list of requests which have already been fully prepared.
bindingList: list of requests
Definition at line 91 of file AioLinuxScheduler.cpp.
References context, deferLeftoverRequests(), IntrusiveListMutator< T, DerivedListNode >::detach(), nRequestsOutstanding, and RawIntrusiveList::size().
Referenced by retryDeferredRequests(), and schedule().
    {
        iocb *requestsArray[bindingList.size()];
        iocb **requests = requestsArray;

        // convert list to array
        int n = 0;
        RandomAccessRequest::BindingListMutator bindingMutator(bindingList);
        for (; bindingMutator; ++n) {
            RandomAccessRequestBinding *pBinding = bindingMutator.detach();
            requests[n] = pBinding;
        }

        if (n == 0) {
            // just in case someone asks for a nop
            return true;
        }

        // submit array
        int rc = io_submit(context, n, requests);
        if (rc == -EAGAIN) {
            rc = 0;
        }

        if (rc < 0) {
            // hard error
            throw SysCallExcn("io_submit failed");
        }

        // keep track of the number successfully submitted
        // (can't use += because nRequestsOutstanding is
        // an AtomicCounter)
        for (int i = 0; i < rc; ++i) {
            ++nRequestsOutstanding;
        }

        if (rc == n) {
            // we're done
            return true;
        } else {
            // io_submit is allowed to do less than we asked for, so
            // we need to resubmit some leftovers
            requests += rc;
            n -= rc;
            deferLeftoverRequests(requests, n);
            return false;
        }
    }
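The partial-submission handling in the listing above can be tried out without libaio. The following is only a sketch under stated assumptions: `fakeSubmit`, `SubmitOutcome`, and `submitOrDefer` are hypothetical names that do not exist in Fennel, requests are modeled as plain integers, and `fakeSubmit` merely imitates the return convention that the listing expects from io_submit (a count of accepted requests, or a negative errno value).

```cpp
#include <algorithm>
#include <cassert>
#include <cerrno>
#include <stdexcept>
#include <vector>

// Hypothetical stand-in for io_submit: accepts at most `capacity` requests
// and returns how many it took, or -EAGAIN if it could take none.
inline int fakeSubmit(int capacity, int nRequests)
{
    if (capacity <= 0) {
        return -EAGAIN;
    }
    return std::min(capacity, nRequests);
}

// Result of one submission attempt (illustrative type, not part of Fennel).
struct SubmitOutcome
{
    int nSubmitted;              // requests that are now outstanding
    std::vector<int> leftovers;  // requests to defer for a later retry
};

// Follows the control flow of submitRequests: treat -EAGAIN as "zero
// accepted", throw on any other negative code, and defer the unsubmitted tail.
inline SubmitOutcome submitOrDefer(std::vector<int> const &requests, int capacity)
{
    SubmitOutcome outcome{0, {}};
    if (requests.empty()) {
        return outcome;  // nothing to submit
    }
    int n = static_cast<int>(requests.size());
    int rc = fakeSubmit(capacity, n);
    if (rc == -EAGAIN) {
        rc = 0;
    }
    if (rc < 0) {
        throw std::runtime_error("submit failed");
    }
    assert(rc <= n);
    outcome.nSubmitted = rc;
    outcome.leftovers.assign(requests.begin() + rc, requests.end());
    return outcome;
}
```

Calling `submitOrDefer` with four requests and a capacity of three reports three submissions and one leftover; the leftover is what the real scheduler hands to deferLeftoverRequests.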
void AioLinuxScheduler::deferLeftoverRequests (iocb **ppLeftovers, uint nLeftovers) [private]
Saves failed requests to the deferred queue for retry.
ppLeftovers: failed requests
nLeftovers: number of failed requests
Definition at line 141 of file AioLinuxScheduler.cpp.
References deferredQueue, deferredQueueMutex, and IntrusiveList< T, DerivedListNode >::push_back().
Referenced by submitRequests().
    {
        assert(nLeftovers > 0);

        // convert array back to list
        RandomAccessRequest::BindingList bindingList;

        for (uint i = 0; i < nLeftovers; ++i) {
            RandomAccessRequestBinding *pBinding =
                static_cast<RandomAccessRequestBinding *>(ppLeftovers[i]);
            bindingList.push_back(*pBinding);
        }

        StrictMutexGuard deferredQueueGuard(deferredQueueMutex);
        deferredQueue.push_back(bindingList);
    }
bool AioLinuxScheduler::retryDeferredRequests () [private]
Retries submission of requests from the deferred queue; continues until either a submission attempt fails or the queue is exhausted.
Definition at line 160 of file AioLinuxScheduler.cpp.
References deferredQueue, deferredQueueMutex, and submitRequests().
Referenced by run().
    {
        for (;;) {
            StrictMutexGuard deferredQueueGuard(deferredQueueMutex);
            if (deferredQueue.empty()) {
                // all resubmitted successfully (or none to begin with)
                return true;
            }
            RandomAccessRequest::BindingList bindingList = deferredQueue.front();
            deferredQueue.pop_front();
            // release mutex now to avoid potential deadlocks
            deferredQueueGuard.unlock();

            bool success = submitRequests(bindingList);
            if (!success) {
                // at least one failed
                return false;
            }
        }
    }
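The retry loop above pops one batch at a time and releases the mutex before resubmitting, because a failed resubmission pushes its leftovers back onto the same queue. A minimal sketch of that pattern with standard-library types follows. `DeferredQueue`, `Batch`, `defer`, and `retryAll` are hypothetical names chosen for illustration, and `std::mutex` stands in for Fennel's StrictMutex.

```cpp
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

using Batch = std::vector<int>;  // illustrative stand-in for a binding list

class DeferredQueue
{
public:
    // Appends a batch at the back (last-in).
    void defer(Batch batch)
    {
        std::lock_guard<std::mutex> guard(mutex_);
        queue_.push_back(std::move(batch));
    }

    // Pops batches from the front (first-in) and passes each to `submit`
    // with the mutex released. Stops at the first batch that `submit`
    // reports as not fully accepted. Returns true if the queue was drained.
    bool retryAll(std::function<bool(Batch &)> const &submit)
    {
        for (;;) {
            std::unique_lock<std::mutex> guard(mutex_);
            if (queue_.empty()) {
                return true;
            }
            Batch batch = std::move(queue_.front());
            queue_.pop_front();
            guard.unlock();  // submit may call defer(), which locks again

            if (!submit(batch)) {
                return false;
            }
        }
    }

    std::size_t size()
    {
        std::lock_guard<std::mutex> guard(mutex_);
        return queue_.size();
    }

private:
    std::mutex mutex_;
    std::deque<Batch> queue_;
};
```

If `retryAll` held the lock while calling `submit`, a callback that re-defers its leftovers would block forever on the non-recursive mutex, which is the deadlock the comment in the listing guards against.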
void AioLinuxScheduler::registerDevice (SharedRandomAccessDevice pDevice) [virtual]
Registers a device for which this scheduler will process requests.
The default implementation in DeviceAccessScheduler does nothing; this override sets O_DIRECT on the device's file handle.
pDevice: device to be registered
Reimplemented from DeviceAccessScheduler.
Definition at line 73 of file AioLinuxScheduler.cpp.
    {
        int hFile = pDevice->getHandle();

        // Linux requires files accessed via libaio to be opened with O_DIRECT,
        // so force that now.
        int flags = fcntl(hFile, F_GETFL);
        fcntl(hFile, F_SETFL, flags | O_DIRECT);
    }
bool AioLinuxScheduler::schedule (RandomAccessRequest &request) [virtual]
Initiates a request, the details of which must already have been defined by the caller.
When the request completes, this scheduler will call notifyTransferCompletion on each binding associated with the request, and also break up the binding list. The bindings must not be altered by the caller until this notification is received. However, the request parameter itself need not live beyond this call.
Care must be taken to ensure that the schedule/notify sequences cannot deadlock. For example, the caller of schedule may hold a lock on a binding, and the implementation of schedule may acquire a scheduler lock internally. The notification callback may also need to take a lock on the binding. Thus, it is important that no scheduler lock be held while notifyTransferCompletion is called.
request: parameters for the request to be scheduled
Implements DeviceAccessScheduler.
Definition at line 84 of file AioLinuxScheduler.cpp.
References RandomAccessRequest::bindingList, isStarted(), RandomAccessRequest::pDevice, RandomAccessDevice::prepareTransfer(), and submitRequests().
    {
        assert(isStarted());
        request.pDevice->prepareTransfer(request);
        return submitRequests(request.bindingList);
    }
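The deadlock rule stated for schedule (no scheduler lock may be held while notifyTransferCompletion runs) can be illustrated with a small sketch. It assumes a hypothetical `CompletionDispatcher` class that is not part of Fennel: pending callbacks are moved out of the shared container under the lock and invoked only after the lock has been released.

```cpp
#include <cstddef>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

class CompletionDispatcher
{
public:
    // Registers a callback to be notified on the next dispatch.
    void add(std::function<void(bool)> callback)
    {
        std::lock_guard<std::mutex> guard(mutex_);
        pending_.push_back(std::move(callback));
    }

    // Moves the pending callbacks out while holding the lock, then invokes
    // them with the lock released, so a callback may call add() or take
    // its own locks without deadlocking. Returns the number invoked.
    std::size_t dispatchAll(bool success)
    {
        std::vector<std::function<void(bool)>> ready;
        {
            std::lock_guard<std::mutex> guard(mutex_);
            ready.swap(pending_);
        }
        for (auto &callback : ready) {
            callback(success);
        }
        return ready.size();
    }

private:
    std::mutex mutex_;
    std::vector<std::function<void(bool)>> pending_;
};
```

A callback that re-registers itself through `add()` works here; with the lock held across the loop, the same callback would block on the non-recursive mutex.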
void AioLinuxScheduler::stop () [virtual]
Shuts down, waiting for all pending requests to complete.
Implements DeviceAccessScheduler.
Definition at line 181 of file AioLinuxScheduler.cpp.
References isStarted(), Thread::join(), and quit.
    {
        assert(isStarted());
        quit = true;

        Thread::join();
    }
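The shutdown handshake above (set a flag, then join) relies on the worker loop running until the flag is set and no work remains. A rough sketch with standard-library threads follows. `PollingWorker` is a hypothetical class; it uses `std::atomic` and `std::thread` in place of the plain bool, AtomicCounter, and boost::thread used in the listings, and its destructor stops the thread if needed, whereas AioLinuxScheduler asserts that stop was already called.

```cpp
#include <atomic>
#include <chrono>
#include <thread>

class PollingWorker
{
public:
    // The thread member is declared last so the flags exist before run() starts.
    PollingWorker()
        : quit_(false), outstanding_(0), processed_(0),
          thread_([this] { run(); })
    {
    }

    ~PollingWorker()
    {
        if (thread_.joinable()) {
            stop();
        }
    }

    // Records one unit of pending work.
    void submit() { ++outstanding_; }

    // Passively asks the loop to exit, then waits for it. The loop only
    // exits once all outstanding work has been processed.
    void stop()
    {
        quit_ = true;
        thread_.join();
    }

    int processed() const { return processed_.load(); }

private:
    void run()
    {
        while (outstanding_ > 0 || !quit_) {
            if (outstanding_ > 0) {
                --outstanding_;
                ++processed_;
            } else {
                // idle: wake up periodically to re-check the quit flag
                std::this_thread::sleep_for(std::chrono::milliseconds(1));
            }
        }
    }

    std::atomic<bool> quit_;
    std::atomic<int> outstanding_;
    std::atomic<int> processed_;
    std::thread thread_;
};
```

Because the loop condition tests the outstanding count before the flag, work submitted before `stop()` is always processed before the thread exits.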
void AioLinuxScheduler::run () [virtual]
Implements Thread.
Definition at line 189 of file AioLinuxScheduler.cpp.
References context, RandomAccessRequestBinding::getBufferSize(), RandomAccessRequestBinding::notifyTransferCompletion(), nRequestsOutstanding, quit, and retryDeferredRequests().
    {
        while (nRequestsOutstanding || !quit) {
            io_event event;
            timespec ts;

            // Check the deferred request queue before entering wait state.
            if (retryDeferredRequests()) {
                // If we retried any requests, they all succeeded, so we're in our
                // normal wait state: timeout every second to check the quit flag.
                ts.tv_sec = 1;
                ts.tv_nsec = 0;
            } else {
                // At least one retry just failed, so during wait, timeout in a
                // millisecond so we can retry the failed requests.
                ts.tv_sec = 0;
                ts.tv_nsec = 1000000;
            }

            long rc = io_getevents(context, 1, 1, &event, &ts);

            // NOTE jvs 20-Jan-2008: Docs don't mention the possibility of
            // spurious interrupts, but they can occur, at least while
            // debugging with gdb, so treat them as timeout.
            if ((rc == 0) || (rc == -EINTR)) {
                // timed out
                continue;
            }

            if (rc != 1) {
                throw SysCallExcn("io_getevents failed");
            }
            RandomAccessRequestBinding *pBinding =
                static_cast<RandomAccessRequestBinding *>(event.obj);
            bool success = (pBinding->getBufferSize() == event.res)
                && !event.res2;
            pBinding->notifyTransferCompletion(success);
            --nRequestsOutstanding;
        }
    }
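Two small decisions inside the loop above can be isolated as pure functions: which timeout to wait with, and whether a completion event counts as a success. The sketch below uses hypothetical helper names (`chooseWaitTimeout`, `transferSucceeded`) and plain integers in place of the io_event fields.

```cpp
#include <time.h>

// Picks the wait timeout for the next poll: one second when no retry is
// pending (just to re-check the quit flag), one millisecond when a retry
// has just failed and should be attempted again soon.
inline timespec chooseWaitTimeout(bool allRetriesSucceeded)
{
    timespec ts;
    if (allRetriesSucceeded) {
        ts.tv_sec = 1;
        ts.tv_nsec = 0;
    } else {
        ts.tv_sec = 0;
        ts.tv_nsec = 1000000;  // 1 ms expressed in nanoseconds
    }
    return ts;
}

// A transfer succeeded only if the full buffer was transferred and the
// secondary result code is zero, matching the check in the listing.
inline bool transferSucceeded(unsigned long bufferSize,
                              unsigned long res,
                              unsigned long res2)
{
    return bufferSize == res && res2 == 0;
}
```

For example, `transferSucceeded(4096, 512, 0)` is false because a short transfer is reported as a failure rather than a partial success.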
DeviceAccessScheduler * DeviceAccessScheduler::newScheduler (DeviceAccessSchedulerParams const &params) [static, inherited]
Creates a scheduler.
params: DeviceAccessSchedulerParams to use
Definition at line 70 of file DeviceAccessScheduler.cpp.
References DeviceAccessSchedulerParams::AIO_LINUX_SCHEDULER, DeviceAccessSchedulerParams::AIO_POLLING_SCHEDULER, DeviceAccessSchedulerParams::AIO_SIGNAL_SCHEDULER, dlopenAioLinuxScheduler(), DeviceAccessSchedulerParams::IO_COMPLETION_PORT_SCHEDULER, DeviceAccessSchedulerParams::schedulerType, DeviceAccessSchedulerParams::THREAD_POOL_SCHEDULER, and DeviceAccessSchedulerParams::usingDefaultSchedulerType.
Referenced by CacheImpl< PageT, VictimPolicyT >::CacheImpl(), and RandomAccessFileDeviceTest::testAsyncIOImpl().
    {
        switch (params.schedulerType) {
        case DeviceAccessSchedulerParams::THREAD_POOL_SCHEDULER:
            return new ThreadPoolScheduler(params);

    #ifdef __MSVC__
        case DeviceAccessSchedulerParams::IO_COMPLETION_PORT_SCHEDULER:
            return new IoCompletionPortScheduler(params);
    #endif

    #ifdef USE_LIBAIO_H
        case DeviceAccessSchedulerParams::AIO_LINUX_SCHEDULER:
            {
                DeviceAccessScheduler *pScheduler = dlopenAioLinuxScheduler(params);
                if (pScheduler) {
                    return pScheduler;
                } else {
                    // if the aioLinux scheduler was explicitly selected (vs simply
                    // using the default type for the OS), then the AIO runtime
                    // library must be installed; otherwise, fall through to use
                    // ThreadPoolScheduler as fallback
                    if (params.usingDefaultSchedulerType) {
                        break;
                    }
                    throw FennelExcn(FennelResource::instance().libaioRequired());
                }
            }
    #endif

    #ifdef USE_AIO_H
        case DeviceAccessSchedulerParams::AIO_POLLING_SCHEDULER:
            return new AioPollingScheduler(params);
        case DeviceAccessSchedulerParams::AIO_SIGNAL_SCHEDULER:
            return new AioSignalScheduler(params);
    #endif

        default:
            // fall through to use ThreadPoolScheduler as a fallback
            break;
        }
        return new ThreadPoolScheduler(params);
    }
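The fallback rule in the factory above (use the thread pool when the AIO scheduler was only the default choice, fail when it was requested explicitly) can be reduced to a small decision function. This is a sketch only: `SchedulerType`, `Params`, and `chooseScheduler` are hypothetical names, the function returns a class name instead of constructing an object, and the `aioAvailable` flag models whether dlopenAioLinuxScheduler would have returned a scheduler.

```cpp
#include <stdexcept>
#include <string>

enum class SchedulerType { ThreadPool, AioLinux };

// Illustrative subset of the scheduler parameters.
struct Params
{
    SchedulerType type;
    bool usingDefaultType;  // true if the type was not explicitly selected
};

// Returns the name of the scheduler class that would be created.
inline std::string chooseScheduler(Params const &params, bool aioAvailable)
{
    switch (params.type) {
    case SchedulerType::AioLinux:
        if (aioAvailable) {
            return "AioLinuxScheduler";
        }
        if (!params.usingDefaultType) {
            // explicitly requested, so the runtime library is required
            throw std::runtime_error("libaio required");
        }
        break;  // default choice: fall back quietly
    default:
        break;
    }
    return "ThreadPoolScheduler";
}
```

With this shape, a missing runtime library is an error only for callers who asked for the AIO scheduler by name; everyone else gets the thread-pool scheduler.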
void DeviceAccessScheduler::unregisterDevice (SharedRandomAccessDevice pDevice) [virtual, inherited]
Unregisters a device.
The default implementation does nothing.
pDevice: device to be unregistered
Definition at line 147 of file DeviceAccessScheduler.cpp.
Referenced by RandomAccessFileDeviceTest::testAsyncIOImpl().
void Thread::initAndRun () [protected, inherited]
Definition at line 66 of file Thread.cpp.
References Thread::afterRun(), Thread::beforeRun(), and Thread::run().
Referenced by Thread::start().
void Thread::beforeRun () [protected, virtual, inherited]
Definition at line 73 of file Thread.cpp.
References Thread::bRunning.
Referenced by Thread::initAndRun().
    {
        bRunning = true;
    }
void Thread::afterRun () [protected, virtual, inherited]
Definition at line 78 of file Thread.cpp.
References Thread::bRunning.
Referenced by Thread::initAndRun().
    {
        bRunning = false;
    }
void Thread::start () [virtual, inherited]
Spawns the OS thread.
Definition at line 50 of file Thread.cpp.
References Thread::initAndRun(), and Thread::pBoostThread.
Referenced by AioLinuxScheduler(), AioPollingScheduler::AioPollingScheduler(), StatsTimer::start(), ResourceTest::testConcurrency(), and LocalConditionTest::testNotifyAll().
    {
        pBoostThread = new boost::thread(
            boost::bind(&Thread::initAndRun,this));
    }
void Thread::join () [inherited]
Waits for the OS thread to terminate.
Definition at line 56 of file Thread.cpp.
References Thread::pBoostThread.
Referenced by CheckpointThread::closeImpl(), TimerThread::stop(), AioPollingScheduler::stop(), stop(), ResourceTest::testConcurrency(), and LocalConditionTest::testNotifyAll().
    {
        assert(pBoostThread);
        boost::thread t;
        assert(*pBoostThread != t);
        pBoostThread->join();
        delete pBoostThread;
        pBoostThread = NULL;
    }
bool Thread::isStopped () const [inline, inherited]
Definition at line 79 of file Thread.h.
    {
        return !isStarted();
    }
boost::thread& Thread::getBoostThread () [inline, inherited]
Accesses the underlying boost::thread, e.g. for use in a boost::thread_group. This thread must already be started.
Definition at line 90 of file Thread.h.
    {
        assert(isStarted());
        return *pBoostThread;
    }
std::string Thread::getName () [inline, inherited]
void Thread::setName (std::string const &s) [inline, inherited]
io_context_t AioLinuxScheduler::context [private]
Context for calling libaio.
Definition at line 52 of file AioLinuxScheduler.h.
Referenced by AioLinuxScheduler(), run(), submitRequests(), and ~AioLinuxScheduler().
AtomicCounter AioLinuxScheduler::nRequestsOutstanding [private]
Number of requests for which io_submit has been called and a corresponding io_getevents notification has not yet been processed.
We track these since it is illegal to attempt to close the context while requests are still outstanding.
Definition at line 60 of file AioLinuxScheduler.h.
Referenced by AioLinuxScheduler(), run(), submitRequests(), and ~AioLinuxScheduler().
bool AioLinuxScheduler::quit [private]
Flag for passively asking the scheduler to shut down.
Definition at line 65 of file AioLinuxScheduler.h.
Referenced by AioLinuxScheduler(), run(), and stop().
std::deque<RandomAccessRequest::BindingList> AioLinuxScheduler::deferredQueue [private]
FIFO queue of requests which have been deferred due to not-fully-successful io_submit calls.
Note that these do not yet count as outstanding. Front of deque is first-in; back of deque is last-in.
Definition at line 73 of file AioLinuxScheduler.h.
Referenced by deferLeftoverRequests(), and retryDeferredRequests().
StrictMutex AioLinuxScheduler::deferredQueueMutex [private]
Mutex used to protect deferredQueue.
Definition at line 78 of file AioLinuxScheduler.h.
Referenced by deferLeftoverRequests(), and retryDeferredRequests().
boost::thread* Thread::pBoostThread [protected, inherited]
Definition at line 44 of file Thread.h.
Referenced by Thread::join(), Thread::start(), Thread::Thread(), and Thread::~Thread().
bool Thread::bRunning [protected, inherited]
Definition at line 45 of file Thread.h.
Referenced by Thread::afterRun(), Thread::beforeRun(), Thread::Thread(), and Thread::~Thread().
std::string Thread::name [protected, inherited]