AioLinuxScheduler Class Reference

AioLinuxScheduler implements DeviceAccessScheduler via Linux-specific kernel-mode libaio calls.

#include <AioLinuxScheduler.h>

Inherits DeviceAccessScheduler and Thread.

Public Member Functions

 AioLinuxScheduler (DeviceAccessSchedulerParams const &)
 Constructor.
virtual ~AioLinuxScheduler ()
 Destructor: stop must already have been called.
virtual void registerDevice (SharedRandomAccessDevice pDevice)
 Registers a device for which this scheduler will process requests.
virtual bool schedule (RandomAccessRequest &request)
 Initiates a request, the details of which must already have been defined by the caller.
virtual void stop ()
 Shuts down, waiting for all pending requests to complete.
virtual void run ()
virtual void unregisterDevice (SharedRandomAccessDevice pDevice)
 Unregisters a device.
virtual void start ()
 Spawns the OS thread.
void join ()
 Waits for the OS thread to terminate.
bool isStopped () const
 Returns the opposite of isStarted().

boost::thread & getBoostThread ()
 Accesses the underlying boost::thread, e.g. for use in a boost::thread_group.
std::string getName ()
void setName (std::string const &s)

Static Public Member Functions

static DeviceAccessScheduler * newScheduler (DeviceAccessSchedulerParams const &params)
 Creates a scheduler.

Protected Member Functions

void initAndRun ()
virtual void beforeRun ()
virtual void afterRun ()

Protected Attributes

boost::thread * pBoostThread
bool bRunning
std::string name

Private Member Functions

bool isStarted () const
 Returns true if start has been called (and subsequent join has not completed).

bool submitRequests (RandomAccessRequest::BindingList &bindingList)
 Submits a list of requests which have already been fully prepared.
void deferLeftoverRequests (iocb **ppLeftovers, uint nLeftovers)
 Saves requests that io_submit did not accept to the deferred queue for retry.
bool retryDeferredRequests ()
 Retries submission of requests from the deferred queue; continues until either a submission attempt fails or the queue is exhausted.

Private Attributes

io_context_t context
 Context for calling libaio.
AtomicCounter nRequestsOutstanding
 Number of requests for which io_submit has been called and a corresponding io_getevents notification has not yet been processed.
bool quit
 Flag for passively asking the scheduler to shut down.
std::deque< RandomAccessRequest::BindingList > deferredQueue
 FIFO queue of requests deferred because an io_submit call did not accept all of them.
StrictMutex deferredQueueMutex
 Mutex used to protect deferredQueue.

Detailed Description

AioLinuxScheduler implements DeviceAccessScheduler via Linux-specific kernel-mode libaio calls.

Author:
John V. Sichi
Version:
Id: //open/dev/fennel/device/AioLinuxScheduler.h#7

Definition at line 46 of file AioLinuxScheduler.h.


Constructor & Destructor Documentation

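AioLinuxScheduler::AioLinuxScheduler ( DeviceAccessSchedulerParams const &  params )  [explicit]

Constructor. Initializes a libaio context sized for params.maxRequests concurrent requests and starts the event-processing thread. params.nThreads is ignored, since a single thread suffices for io_getevents.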

Definition at line 42 of file AioLinuxScheduler.cpp.

References AtomicCounter::clear(), context, DeviceAccessSchedulerParams::maxRequests, nRequestsOutstanding, quit, and Thread::start().

{
    quit = false;
    nRequestsOutstanding.clear();
    context = NULL;
    int rc = io_queue_init(params.maxRequests, &context);
    if (rc) {
        throw SysCallExcn("io_queue_init failed");
    }

    // NOTE jvs 29-Oct-2005:  we ignore params.nThreads because
    // no more than one thread is needed for io_getevents
    Thread::start();
}

AioLinuxScheduler::~AioLinuxScheduler (  )  [virtual]

Destructor: stop must already have been called.

Definition at line 63 of file AioLinuxScheduler.cpp.

References context, isStarted(), and nRequestsOutstanding.

{
    assert(!isStarted());
    assert(!nRequestsOutstanding);
    int rc = io_queue_release(context);
    if (rc) {
        throw SysCallExcn("io_queue_release failed");
    }
}


Member Function Documentation

bool AioLinuxScheduler::isStarted (  )  const [inline, private]

Returns:
true if start has been called (and subsequent join has not completed)

Reimplemented from Thread.

Definition at line 58 of file AioLinuxScheduler.cpp.

References Thread::isStarted().

Referenced by schedule(), stop(), and ~AioLinuxScheduler().

{
    return Thread::isStarted();
}

bool AioLinuxScheduler::submitRequests ( RandomAccessRequest::BindingList &  bindingList  )  [private]

Submits a list of requests which have already been fully prepared.

Parameters:
bindingList list of requests
Returns:
whether all requests were successfully submitted without requiring retry

Definition at line 91 of file AioLinuxScheduler.cpp.

References context, deferLeftoverRequests(), IntrusiveListMutator< T, DerivedListNode >::detach(), nRequestsOutstanding, and RawIntrusiveList::size().

Referenced by retryDeferredRequests(), and schedule().

{
    iocb *requestsArray[bindingList.size()];
    iocb **requests = requestsArray;

    // convert list to array
    int n = 0;
    RandomAccessRequest::BindingListMutator bindingMutator(bindingList);
    for (; bindingMutator; ++n) {
        RandomAccessRequestBinding *pBinding = bindingMutator.detach();
        requests[n] = pBinding;
    }

    if (n == 0) {
        // just in case someone asks for a nop
        return true;
    }

    // submit array
    int rc = io_submit(context, n, requests);
    if (rc == -EAGAIN) {
        rc = 0;
    }

    if (rc < 0) {
        // hard error
        throw SysCallExcn("io_submit failed");
    }

    // keep track of the number successfully submitted
    // (can't use += because nRequestsOutstanding is
    // an AtomicCounter)
    for (int i = 0; i < rc; ++i) {
        ++nRequestsOutstanding;
    }

    if (rc == n) {
        // we're done
        return true;
    } else {
        // io_submit is allowed to do less than we asked for, so
        // we need to resubmit some leftovers
        requests += rc;
        n -= rc;
        deferLeftoverRequests(requests, n);
        return false;
    }
}
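The partial-submission handling in submitRequests can be modeled without libaio. The following is a minimal sketch under stated assumptions: Request, SubmitFn and submitBatch are hypothetical names (not Fennel or libaio APIs), and the SubmitFn callback stands in for io_submit, returning the number of requests accepted or a negative errno value. As in the listing, -EAGAIN is treated as zero accepted, only accepted requests are counted as outstanding, and the unaccepted tail is queued as one deferred batch.

```cpp
#include <cassert>
#include <cerrno>
#include <deque>
#include <functional>
#include <stdexcept>
#include <vector>

// Hypothetical request type standing in for RandomAccessRequestBinding.
struct Request {
    int id;
};

// Hypothetical stand-in for io_submit: returns how many of the n requests
// were accepted, or a negative errno value on failure.
using SubmitFn = std::function<long(Request **requests, long n)>;

// Returns true if every request was accepted; otherwise appends the
// unaccepted tail to 'deferred' (as a single batch) and returns false.
bool submitBatch(std::vector<Request *> batch,
                 const SubmitFn &submit,
                 std::deque<std::vector<Request *>> &deferred,
                 long &nOutstanding)
{
    long n = static_cast<long>(batch.size());
    if (n == 0) {
        return true;  // nothing to submit
    }
    long rc = submit(batch.data(), n);
    if (rc == -EAGAIN) {
        rc = 0;  // transient condition: nothing accepted, retry later
    }
    if (rc < 0) {
        throw std::runtime_error("submit failed");  // hard error
    }
    assert(rc <= n);     // the submitter never accepts more than requested
    nOutstanding += rc;  // only accepted requests are outstanding
    if (rc == n) {
        return true;
    }
    // Defer the requests that were not accepted, preserving their order.
    deferred.emplace_back(batch.begin() + rc, batch.end());
    return false;
}
```

Keeping the leftovers together as one batch preserves their relative order, which matches how deferLeftoverRequests pushes a single BindingList onto the back of deferredQueue.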

void AioLinuxScheduler::deferLeftoverRequests ( iocb **  ppLeftovers, uint  nLeftovers )  [private]

Saves requests that io_submit did not accept to the deferred queue for retry.

Parameters:
ppLeftovers requests that were not accepted
nLeftovers number of failed requests

Definition at line 141 of file AioLinuxScheduler.cpp.

References deferredQueue, deferredQueueMutex, and IntrusiveList< T, DerivedListNode >::push_back().

Referenced by submitRequests().

{
    assert(nLeftovers > 0);

    // convert array back to list
    RandomAccessRequest::BindingList bindingList;

    for (uint i = 0; i < nLeftovers; ++i) {
        RandomAccessRequestBinding *pBinding =
            static_cast<RandomAccessRequestBinding *>(ppLeftovers[i]);
        bindingList.push_back(*pBinding);
    }

    StrictMutexGuard deferredQueueGuard(deferredQueueMutex);
    deferredQueue.push_back(bindingList);
}

bool AioLinuxScheduler::retryDeferredRequests (  )  [private]

Retries submission of requests from the deferred queue; continues until either a submission attempt fails or the queue is exhausted.

Returns:
whether all deferred requests were successfully submitted (returns true if there were no deferred requests to begin with)

Definition at line 160 of file AioLinuxScheduler.cpp.

References deferredQueue, deferredQueueMutex, and submitRequests().

Referenced by run().

{
    for (;;) {
        StrictMutexGuard deferredQueueGuard(deferredQueueMutex);
        if (deferredQueue.empty()) {
            // all resubmitted successfully (or none to begin with)
            return true;
        }
        RandomAccessRequest::BindingList bindingList = deferredQueue.front();
        deferredQueue.pop_front();
        // release mutex now to avoid potential deadlocks
        deferredQueueGuard.unlock();

        bool success = submitRequests(bindingList);
        if (!success) {
            // at least one failed
            return false;
        }
    }
}
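The locking pattern in retryDeferredRequests (pop the front batch under the mutex, release the mutex, then submit) can be sketched with standard C++ primitives. This is an illustrative analogue, not Fennel code: DeferredQueue and Batch are hypothetical, std::mutex and std::unique_lock replace StrictMutex and StrictMutexGuard, and the submit callback is assumed to re-defer its own leftovers (as submitRequests does via deferLeftoverRequests) before reporting failure.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical batch type; stands in for RandomAccessRequest::BindingList.
using Batch = std::vector<int>;

class DeferredQueue {
public:
    void push(Batch batch) {
        std::lock_guard<std::mutex> guard(mutex_);
        queue_.push_back(std::move(batch));
    }

    // Resubmits batches front-first until one fails or the queue is empty.
    // The mutex is released before each submission, so a submit function
    // that defers work again via push() cannot self-deadlock.
    bool retryAll(const std::function<bool(Batch &)> &submit) {
        for (;;) {
            std::unique_lock<std::mutex> guard(mutex_);
            if (queue_.empty()) {
                return true;  // everything resubmitted (or nothing queued)
            }
            Batch batch = std::move(queue_.front());
            queue_.pop_front();
            guard.unlock();  // never hold the lock while submitting
            if (!submit(batch)) {
                return false;  // submit is expected to re-defer leftovers
            }
        }
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> guard(mutex_);
        return queue_.size();
    }

private:
    mutable std::mutex mutex_;
    std::deque<Batch> queue_;
};
```

Because the lock is dropped before submit runs, a submit function that calls push() on the same queue does not block on itself; a re-deferred batch goes to the back of the queue, as in the original.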

void AioLinuxScheduler::registerDevice ( SharedRandomAccessDevice  pDevice  )  [virtual]

Registers a device for which this scheduler will process requests.

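The default DeviceAccessScheduler implementation does nothing. AioLinuxScheduler overrides it to add O_DIRECT to the device's file status flags, because Linux requires files accessed via libaio to be opened with O_DIRECT.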

Parameters:
pDevice device to be registered

Reimplemented from DeviceAccessScheduler.

Definition at line 73 of file AioLinuxScheduler.cpp.

{
    int hFile = pDevice->getHandle();

    // Linux requires files accessed via libaio to be opened with O_DIRECT,
    // so force that now.
    int flags = fcntl(hFile, F_GETFL);
    fcntl(hFile, F_SETFL, flags | O_DIRECT);
}
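The listing above ignores the return values of both fcntl calls. A variant that reports failures might look like the following sketch. forceDirectIo is a hypothetical helper, and the sketch assumes Linux with glibc, where O_DIRECT is only declared when _GNU_SOURCE is defined (g++ defines it by default). Whether direct I/O is actually accepted depends on the underlying filesystem.

```cpp
#ifndef _GNU_SOURCE
#define _GNU_SOURCE  // exposes O_DIRECT in <fcntl.h> on glibc
#endif
#include <cassert>
#include <cerrno>
#include <fcntl.h>

// Hypothetical helper: adds O_DIRECT to an already-open descriptor.
// Returns 0 on success, or the errno value from the failing fcntl call
// (for example EBADF for an invalid descriptor, or EINVAL if the
// filesystem does not support direct I/O).
int forceDirectIo(int fd)
{
    int flags = fcntl(fd, F_GETFL);
    if (flags == -1) {
        return errno;
    }
    if (flags & O_DIRECT) {
        return 0;  // already set, nothing to do
    }
    if (fcntl(fd, F_SETFL, flags | O_DIRECT) == -1) {
        return errno;
    }
    return 0;
}
```

A caller such as registerDevice could then turn a nonzero result into an exception instead of discovering the problem later as a failed io_submit.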

bool AioLinuxScheduler::schedule ( RandomAccessRequest &  request  )  [virtual]

Initiates a request, the details of which must already have been defined by the caller.

When the request completes, this scheduler will call notifyTransferCompletion on each binding associated with the request, and also break up the binding list. The bindings must not be altered by the caller until this notification is received. However, the request parameter itself need not live beyond this call.

Care must be taken to ensure that the schedule/notify sequences cannot deadlock. For example, the caller of schedule may hold a lock on a binding, and the implementation of schedule may acquire a scheduler lock internally. The notification callback may also need to take a lock on the binding. Thus, it is important that no scheduler lock be held while notifyTransferCompletion is called.

Parameters:
request parameters for the request to be scheduled
Returns:
true if the request was successfully scheduled without any retries

Implements DeviceAccessScheduler.

Definition at line 84 of file AioLinuxScheduler.cpp.

References RandomAccessRequest::bindingList, isStarted(), RandomAccessRequest::pDevice, RandomAccessDevice::prepareTransfer(), and submitRequests().

{
    assert(isStarted());
    request.pDevice->prepareTransfer(request);
    return submitRequests(request.bindingList);
}
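The deadlock rule in the schedule() documentation (no scheduler lock may be held while notifyTransferCompletion runs) is commonly satisfied by moving completed work out under the lock and invoking callbacks only after releasing it. The sketch below illustrates that general pattern with hypothetical names (CompletionQueue, Completion). It is not how AioLinuxScheduler is structured: its run() loop receives one event at a time from io_getevents and calls notifyTransferCompletion without holding deferredQueueMutex.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical stand-in for a binding: a callback invoked on completion.
using Completion = std::function<void(bool success)>;

class CompletionQueue {
public:
    // Called by the I/O side to record a finished transfer.
    void post(Completion callback, bool success) {
        std::lock_guard<std::mutex> guard(mutex_);
        pending_.emplace_back(std::move(callback), success);
    }

    // Drains finished transfers. Entries are moved out while holding the
    // scheduler lock, but callbacks run only after it is released, so a
    // callback that takes a binding lock (or calls post) cannot deadlock
    // against a thread that holds that binding lock and calls into us.
    std::size_t dispatch() {
        std::vector<std::pair<Completion, bool>> ready;
        {
            std::lock_guard<std::mutex> guard(mutex_);
            ready.swap(pending_);
        }
        for (auto &entry : ready) {
            entry.first(entry.second);
        }
        return ready.size();
    }

private:
    std::mutex mutex_;
    std::vector<std::pair<Completion, bool>> pending_;
};
```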

void AioLinuxScheduler::stop (  )  [virtual]

Shuts down, waiting for all pending requests to complete.

Implements DeviceAccessScheduler.

Definition at line 181 of file AioLinuxScheduler.cpp.

References isStarted(), Thread::join(), and quit.

{
    assert(isStarted());
    quit = true;

    Thread::join();
}
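stop() only sets quit and joins; it is run() that drains outstanding requests, because its loop condition (nRequestsOutstanding || !quit) keeps it waiting until every submitted request has completed. The sketch below models that handshake with a hypothetical DrainingWorker class and simulated work in place of io_getevents. One deliberate difference: it uses std::atomic<bool> for the quit flag, whereas the listing uses a plain bool written by one thread and read by another, which standard C++ treats as a data race.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Hypothetical model of the quit/drain protocol: the worker keeps looping
// while work is outstanding or quit has not been requested, so stop() only
// returns after every in-flight item has been completed.
class DrainingWorker {
public:
    explicit DrainingWorker(int initialOutstanding)
        : outstanding_(initialOutstanding),
          thread_([this] { run(); }) {}

    ~DrainingWorker() {
        // Mirrors the destructor's precondition: stop() must come first.
        assert(!thread_.joinable());
    }

    void stop() {
        assert(thread_.joinable());
        quit_ = true;    // passive request, noticed on the next loop pass
        thread_.join();  // wait for outstanding work to drain
    }

    int completed() const { return completed_; }
    int outstanding() const { return outstanding_; }

private:
    void run() {
        while (outstanding_ > 0 || !quit_) {
            if (outstanding_ > 0) {
                // Stand-in for processing one io_getevents completion.
                --outstanding_;
                ++completed_;
            } else {
                // Stand-in for a timed wait, so quit is rechecked periodically.
                std::this_thread::sleep_for(std::chrono::milliseconds(1));
            }
        }
    }

    std::atomic<int> outstanding_;
    std::atomic<int> completed_{0};
    std::atomic<bool> quit_{false};
    std::thread thread_;  // declared last so it starts after the counters exist
};
```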

void AioLinuxScheduler::run (  )  [virtual]

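Event loop for the scheduler thread. Each pass first retries any deferred submissions, then waits in io_getevents for a single completion (up to one second normally, or one millisecond if a retry just failed) and reports the result through the binding's notifyTransferCompletion. A timeout or EINTR simply starts the next pass. The loop exits once quit has been set and no requests remain outstanding.

Implements Thread.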

Definition at line 189 of file AioLinuxScheduler.cpp.

References context, RandomAccessRequestBinding::getBufferSize(), RandomAccessRequestBinding::notifyTransferCompletion(), nRequestsOutstanding, quit, and retryDeferredRequests().

{
    while (nRequestsOutstanding || !quit) {
        io_event event;
        timespec ts;

        // Check the deferred request queue before entering wait state.
        if (retryDeferredRequests()) {
            // If we retried any requests, they all succeeded, so we're in our
            // normal wait state: timeout every second to check the quit flag.
            ts.tv_sec = 1;
            ts.tv_nsec = 0;
        } else {
            // At least one retry just failed, so during wait, timeout in a
            // millisecond so we can retry the failed requests.
            ts.tv_sec = 0;
            ts.tv_nsec = 1000000;
        }

        long rc = io_getevents(context, 1, 1, &event, &ts);

        // NOTE jvs 20-Jan-2008:  Docs don't mention the possibility of
        // spurious interrupts, but they can occur, at least while
        // debugging with gdb, so treat them as timeout.
        if ((rc == 0) || (rc == -EINTR)) {
            // timed out
            continue;
        }

        if (rc != 1) {
            throw SysCallExcn("io_getevents failed");
        }
        RandomAccessRequestBinding *pBinding =
            static_cast<RandomAccessRequestBinding *>(event.obj);
        bool success = (pBinding->getBufferSize() == event.res)
            && !event.res2;
        pBinding->notifyTransferCompletion(success);
        --nRequestsOutstanding;
    }
}
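Two decisions in run() are easy to isolate: how long to wait in io_getevents, and whether a completed event counts as a successful transfer. The helpers below are hypothetical (waitTimeout and transferSucceeded are not part of Fennel) and take res and res2 as plain signed integers rather than reading a libaio io_event; they restate the listing's logic so that each case can be checked on its own.

```cpp
#include <cassert>
#include <cstddef>
#include <time.h>

// Hypothetical helper mirroring run(): wait up to one second when every
// deferred request was resubmitted (just to recheck the quit flag), but only
// one millisecond after a failed resubmission, so it is retried promptly.
timespec waitTimeout(bool allDeferredResubmitted)
{
    timespec ts{};
    if (allDeferredResubmitted) {
        ts.tv_sec = 1;
        ts.tv_nsec = 0;
    } else {
        ts.tv_sec = 0;
        ts.tv_nsec = 1000000;  // 1 ms
    }
    return ts;
}

// Hypothetical helper mirroring run(): a completion is successful only if the
// whole buffer was transferred (res equals the buffer size) and res2 is zero.
// A short transfer or a negative error code in res both count as failure.
bool transferSucceeded(std::size_t bufferSize, long res, long res2)
{
    return res >= 0 && static_cast<std::size_t>(res) == bufferSize && res2 == 0;
}
```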

DeviceAccessScheduler * DeviceAccessScheduler::newScheduler ( DeviceAccessSchedulerParams const &  params  )  [static, inherited]

Creates a scheduler.

Parameters:
params DeviceAccessSchedulerParams to use
Returns:
new scheduler; caller is responsible for deleting it

Definition at line 70 of file DeviceAccessScheduler.cpp.

References DeviceAccessSchedulerParams::AIO_LINUX_SCHEDULER, DeviceAccessSchedulerParams::AIO_POLLING_SCHEDULER, DeviceAccessSchedulerParams::AIO_SIGNAL_SCHEDULER, dlopenAioLinuxScheduler(), DeviceAccessSchedulerParams::IO_COMPLETION_PORT_SCHEDULER, DeviceAccessSchedulerParams::schedulerType, DeviceAccessSchedulerParams::THREAD_POOL_SCHEDULER, and DeviceAccessSchedulerParams::usingDefaultSchedulerType.

Referenced by CacheImpl< PageT, VictimPolicyT >::CacheImpl(), and RandomAccessFileDeviceTest::testAsyncIOImpl().

{
    switch (params.schedulerType) {
    case DeviceAccessSchedulerParams::THREAD_POOL_SCHEDULER:
        return new ThreadPoolScheduler(params);

#ifdef __MSVC__
    case DeviceAccessSchedulerParams::IO_COMPLETION_PORT_SCHEDULER:
        return new IoCompletionPortScheduler(params);
#endif

#ifdef USE_LIBAIO_H
    case DeviceAccessSchedulerParams::AIO_LINUX_SCHEDULER:
        {
            DeviceAccessScheduler *pScheduler = dlopenAioLinuxScheduler(params);
            if (pScheduler) {
                return pScheduler;
            } else {
                // if the aioLinux scheduler was explicitly selected (vs simply
                // using the default type for the OS), then the AIO runtime
                // library must be installed; otherwise, fall through to use
                // ThreadPoolScheduler as fallback
                if (params.usingDefaultSchedulerType) {
                    break;
                }
                throw FennelExcn(FennelResource::instance().libaioRequired());
            }
        }
#endif

#ifdef USE_AIO_H
    case DeviceAccessSchedulerParams::AIO_POLLING_SCHEDULER:
        return new AioPollingScheduler(params);
    case DeviceAccessSchedulerParams::AIO_SIGNAL_SCHEDULER:
        return new AioSignalScheduler(params);
#endif

    default:
        // fall through to use ThreadPoolScheduler as a fallback
        break;
    }
    return new ThreadPoolScheduler(params);
}
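The selection policy in newScheduler can be summarized as: an explicitly requested AIO_LINUX_SCHEDULER whose runtime library cannot be loaded is an error, a default selection falls back quietly, and any type that is not compiled in falls through to ThreadPoolScheduler. The sketch below models only that decision. SchedulerType, Choice, Platform and chooseScheduler are hypothetical, the compile-time #ifdef and the dlopen result are passed in as flags, and std::runtime_error stands in for FennelExcn.

```cpp
#include <cassert>
#include <stdexcept>

// Hypothetical, simplified model of the selection policy in newScheduler().
enum class SchedulerType { ThreadPool, AioLinux, IoCompletionPort };
enum class Choice { ThreadPool, AioLinux };

struct Platform {
    bool libaioCompiledIn;  // models #ifdef USE_LIBAIO_H
    bool libaioLoadable;    // models dlopenAioLinuxScheduler() succeeding
};

Choice chooseScheduler(SchedulerType requested, bool usingDefaultType,
                       const Platform &platform)
{
    switch (requested) {
    case SchedulerType::AioLinux:
        if (!platform.libaioCompiledIn) {
            break;  // case not compiled in: same as the default fallback
        }
        if (platform.libaioLoadable) {
            return Choice::AioLinux;
        }
        if (!usingDefaultType) {
            // Explicitly requested, but the runtime library is missing.
            throw std::runtime_error("libaio required");
        }
        break;  // default type for the OS: fall back quietly
    case SchedulerType::ThreadPool:
    case SchedulerType::IoCompletionPort:  // not available on this platform
    default:
        break;
    }
    return Choice::ThreadPool;
}
```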

void DeviceAccessScheduler::unregisterDevice ( SharedRandomAccessDevice  pDevice  )  [virtual, inherited]

Unregisters a device.

The default implementation does nothing.

Parameters:
pDevice device to be unregistered

Definition at line 147 of file DeviceAccessScheduler.cpp.

Referenced by RandomAccessFileDeviceTest::testAsyncIOImpl().

{
}

void Thread::initAndRun (  )  [protected, inherited]

Definition at line 66 of file Thread.cpp.

References Thread::afterRun(), Thread::beforeRun(), and Thread::run().

Referenced by Thread::start().

{
    beforeRun();
    run();
    afterRun();
}

void Thread::beforeRun (  )  [protected, virtual, inherited]

Definition at line 73 of file Thread.cpp.

References Thread::bRunning.

Referenced by Thread::initAndRun().

{
    bRunning = true;
}

void Thread::afterRun (  )  [protected, virtual, inherited]

Definition at line 78 of file Thread.cpp.

References Thread::bRunning.

Referenced by Thread::initAndRun().

{
    bRunning = false;
}

void Thread::start (  )  [virtual, inherited]

Spawns the OS thread.

Definition at line 50 of file Thread.cpp.

References Thread::initAndRun(), and Thread::pBoostThread.

Referenced by AioLinuxScheduler(), AioPollingScheduler::AioPollingScheduler(), StatsTimer::start(), ResourceTest::testConcurrency(), and LocalConditionTest::testNotifyAll().

{
    pBoostThread = new boost::thread(
        boost::bind(&Thread::initAndRun,this));
}

void Thread::join (  )  [inherited]

Waits for the OS thread to terminate.

Definition at line 56 of file Thread.cpp.

References Thread::pBoostThread.

Referenced by CheckpointThread::closeImpl(), TimerThread::stop(), AioPollingScheduler::stop(), stop(), ResourceTest::testConcurrency(), and LocalConditionTest::testNotifyAll().

{
    assert(pBoostThread);
    boost::thread t;
    assert(*pBoostThread != t);
    pBoostThread->join();
    delete pBoostThread;
    pBoostThread = NULL;
}
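Thread::start(), initAndRun() and join() together form a template-method wrapper: start() spawns an OS thread that brackets the virtual run() with beforeRun() and afterRun(), and join() waits for it and frees the thread object. The sketch below reproduces that shape with std::thread swapped in for boost::thread. SimpleThread and CountingThread are hypothetical classes for illustration, and bRunning is made atomic here because it is written on the worker thread.

```cpp
#include <atomic>
#include <cassert>
#include <memory>
#include <thread>

// Hypothetical analogue of Fennel's Thread using std::thread. Subclasses
// override run(); start() spawns an OS thread that brackets run() with
// beforeRun()/afterRun(), and join() waits for it and releases it.
class SimpleThread {
public:
    virtual ~SimpleThread() { assert(!pThread); }  // join() must come first

    void start() {
        assert(!pThread);
        pThread.reset(new std::thread([this] { initAndRun(); }));
    }

    void join() {
        assert(pThread && pThread->joinable());
        pThread->join();
        pThread.reset();  // analogous to delete + NULL in Thread::join()
    }

    bool isStarted() const { return pThread != nullptr; }
    bool isRunning() const { return bRunning; }

protected:
    virtual void run() = 0;
    virtual void beforeRun() { bRunning = true; }
    virtual void afterRun() { bRunning = false; }

private:
    void initAndRun() {
        beforeRun();
        run();
        afterRun();
    }

    std::unique_ptr<std::thread> pThread;
    std::atomic<bool> bRunning{false};
};

// Example subclass: performs a fixed amount of work in run().
class CountingThread : public SimpleThread {
public:
    int count = 0;

protected:
    void run() override {
        for (int i = 0; i < 1000; ++i) {
            ++count;
        }
    }
};
```

As with the original, start() should only be called once the object is fully constructed, since the new thread immediately dispatches through the virtual run().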

bool Thread::isStopped (  )  const [inline, inherited]

Returns:
opposite of isStarted()

Definition at line 79 of file Thread.h.

    {
        return !isStarted();
    }

boost::thread& Thread::getBoostThread (  )  [inline, inherited]

Accesses the underlying boost::thread, e.g. for use in a boost::thread_group. This thread must already be started.

Returns:
the underlying boost::thread

Definition at line 90 of file Thread.h.

    {
        assert(isStarted());
        return *pBoostThread;
    }

std::string Thread::getName (  )  [inline, inherited]

Definition at line 96 of file Thread.h.

    {
        return name;
    }

void Thread::setName ( std::string const &  s  )  [inline, inherited]

Definition at line 101 of file Thread.h.

    {
        name = s;
    }


Member Data Documentation

io_context_t AioLinuxScheduler::context [private]

Context for calling libaio.

Definition at line 52 of file AioLinuxScheduler.h.

Referenced by AioLinuxScheduler(), run(), submitRequests(), and ~AioLinuxScheduler().

AtomicCounter AioLinuxScheduler::nRequestsOutstanding [private]

Number of requests for which io_submit has been called and a corresponding io_getevents notification has not yet been processed.

We track these since it is illegal to attempt to close the context while requests are still outstanding.

Definition at line 60 of file AioLinuxScheduler.h.

Referenced by AioLinuxScheduler(), run(), submitRequests(), and ~AioLinuxScheduler().

bool AioLinuxScheduler::quit [private]

Flag for passively asking the scheduler to shut down.

Definition at line 65 of file AioLinuxScheduler.h.

Referenced by AioLinuxScheduler(), run(), and stop().

std::deque<RandomAccessRequest::BindingList> AioLinuxScheduler::deferredQueue [private]

FIFO queue of requests deferred because an io_submit call did not accept all of them.

Note that these do not yet count as outstanding. Front of deque is first-in; back of deque is last-in.

Definition at line 73 of file AioLinuxScheduler.h.

Referenced by deferLeftoverRequests(), and retryDeferredRequests().

StrictMutex AioLinuxScheduler::deferredQueueMutex [private]

Mutex used to protect deferredQueue.

Definition at line 78 of file AioLinuxScheduler.h.

Referenced by deferLeftoverRequests(), and retryDeferredRequests().

boost::thread* Thread::pBoostThread [protected, inherited]

Definition at line 44 of file Thread.h.

Referenced by Thread::join(), Thread::start(), Thread::Thread(), and Thread::~Thread().

bool Thread::bRunning [protected, inherited]

Definition at line 45 of file Thread.h.

Referenced by Thread::afterRun(), Thread::beforeRun(), Thread::Thread(), and Thread::~Thread().

std::string Thread::name [protected, inherited]

Definition at line 46 of file Thread.h.

Referenced by Thread::Thread().


The documentation for this class was generated from the following files: AioLinuxScheduler.h, AioLinuxScheduler.cpp
Generated on Mon Jun 22 04:00:24 2009 for Fennel by doxygen 1.5.1