AioPollingScheduler Class Reference

AioPollingScheduler implements DeviceAccessScheduler via Unix aio calls and threads which poll for completion. More...

#include <AioPollingScheduler.h>

Inheritance diagram for AioPollingScheduler:

DeviceAccessScheduler Thread List of all members.

Public Member Functions

 AioPollingScheduler (DeviceAccessSchedulerParams const &)
 Constructor.
virtual ~AioPollingScheduler ()
 Destructor: stop must already have been called.
virtual bool schedule (RandomAccessRequest &request)
 Initiates a request, the details of which must already have been defined by the caller.
virtual void stop ()
 Shuts down, waiting for all pending requests to complete.
virtual void run ()
virtual void registerDevice (SharedRandomAccessDevice pDevice)
 Registers a device for which this scheduler will process requests.
virtual void unregisterDevice (SharedRandomAccessDevice pDevice)
 Unregisters a device.
virtual void start ()
 Spawns the OS thread.
void join ()
 Waits for the OS thread to terminate.
bool isStarted () const
 
Returns:
true if start has been called (and subsequent join has not completed)

bool isStopped () const
 
Returns:
opposite of isStarted()

boost::thread & getBoostThread ()
 Accesses the underlying boost::thread, e.g.
std::string getName ()
void setName (std::string const &s)

Static Public Member Functions

static DeviceAccessSchedulernewScheduler (DeviceAccessSchedulerParams const &params)
 Creates a scheduler.

Protected Member Functions

void initAndRun ()
virtual void beforeRun ()
virtual void afterRun ()

Protected Attributes

boost::thread * pBoostThread
bool bRunning
std::string name

Private Attributes

StrictMutex mutex
LocalCondition newRequestPending
bool quit
std::vector< aiocb * > currentRequests
std::vector< aiocb * > newRequests

Detailed Description

AioPollingScheduler implements DeviceAccessScheduler via Unix aio calls and threads which poll for completion.

Definition at line 43 of file AioPollingScheduler.h.


Constructor & Destructor Documentation

AioPollingScheduler::AioPollingScheduler ( DeviceAccessSchedulerParams const &   )  [explicit]

Constructor.

Definition at line 35 of file AioPollingScheduler.cpp.

References quit, and Thread::start().

00037 {
00038     // TODO:  pass params.maxRequests on to OS, and use
00039     // params.nThreads
00040     quit = false;
00041     Thread::start();
00042 }

AioPollingScheduler::~AioPollingScheduler (  )  [virtual]

Destructor: stop must already have been called.

Definition at line 44 of file AioPollingScheduler.cpp.

00045 {
00046 }


Member Function Documentation

bool AioPollingScheduler::schedule ( RandomAccessRequest request  )  [virtual]

Initiates a request, the details of which must already have been defined by the caller.

When the request completes, this scheduler will call notifyTransferCompletion on each binding associated with the request, and also break up the binding list. The bindings must not be altered by the caller until this notification is received. However, the request parameter itself need not live beyond this call.

Care must be taken to ensure that the schedule/notify sequences cannot deadlock. For example, the caller of schedule may hold a lock on a binding, and the implementation of schedule may acquire a scheduler lock internally. The notification callback may also need to take a lock on the binding. Thus, it is important that no scheduler lock be held while notifyTransferCompletion is called.

Parameters:
request parameters for the request to be scheduled
Returns:
true if the request was successfully scheduled without any retries

Implements DeviceAccessScheduler.

Definition at line 48 of file AioPollingScheduler.cpp.

References RandomAccessRequest::bindingList, IntrusiveListMutator< T, DerivedListNode >::detach(), mutex, newRequestPending, newRequests, RandomAccessRequest::pDevice, and RandomAccessDevice::prepareTransfer().

00049 {
00050     StrictMutexGuard guard(mutex);
00051     uint iFirst = newRequests.size();
00052     request.pDevice->prepareTransfer(request);
00053     RandomAccessRequest::BindingListMutator bindingMutator(request.bindingList);
00054     while (bindingMutator) {
00055         RandomAccessRequestBinding *pBinding = bindingMutator.detach();
00056         pBinding->aio_sigevent.sigev_notify = SIGEV_NONE;
00057         newRequests.push_back(pBinding);
00058     }
00059     aiocb **pFirst = &(newRequests.front()) + iFirst;
00060     int rc = lio_listio(LIO_NOWAIT,pFirst,newRequests.size() - iFirst,NULL);
00061     // TODO:  handle error cases
00062     assert(rc == 0);
00063     newRequestPending.notify_all();
00064     return true;
00065 }

void AioPollingScheduler::stop (  )  [virtual]

Shuts down, waiting for all pending requests to complete.

Implements DeviceAccessScheduler.

Definition at line 67 of file AioPollingScheduler.cpp.

References Thread::join(), mutex, newRequestPending, and quit.

00068 {
00069     StrictMutexGuard guard(mutex);
00070     quit = true;
00071     newRequestPending.notify_all();
00072     guard.unlock();
00073     Thread::join();
00074 }

void AioPollingScheduler::run (  )  [virtual]

Implements Thread.

Definition at line 76 of file AioPollingScheduler.cpp.

References currentRequests, mutex, newRequestPending, newRequests, RandomAccessRequestBinding::notifyTransferCompletion(), and quit.

00077 {
00078     int rc;
00079     struct timespec ts;
00080     // poll every tenth of a millisecond
00081     // TODO:  determine a reasonable default, or adjust this
00082     // dynamically?
00083     ts.tv_sec = 0;
00084     ts.tv_nsec = 100000;
00085     for (;;) {
00086         StrictMutexGuard guard(mutex);
00087         while (!newRequests.size()) {
00088             if (quit) {
00089                 return;
00090             }
00091             newRequestPending.wait(guard);
00092         }
00093         currentRequests.resize(newRequests.size());
00094         std::copy(
00095             newRequests.begin(),
00096             newRequests.end(),
00097             currentRequests.begin());
00098         newRequests.clear();
00099         guard.unlock();
00100         do {
00101             // REVIEW jvs 4-Aug-2004:  Using &front like this is not portable.
00102             rc = aio_suspend(
00103                 &(currentRequests.front()),
00104                 currentRequests.size(),
00105                 &ts);
00106             if (rc) {
00107                 switch (errno) {
00108                 case EAGAIN:
00109                 case EINTR:
00110                     continue;
00111                 default:
00112                     std::cerr << rc << std::endl;
00113                     permAssert(false);
00114                 }
00115             }
00116         } while (false);
00117         // currentRequests does not need a lock, since this thread is the only
00118         // one which manipulates it.  And the lock cannot be held when
00119         // notifyTransferCompletion is called, otherwise deadlock is possible.
00120         // However, access to newRequests does require a lock, since other
00121         // threads may be calling schedule, so use a fine-grained lock on it.
00122         for (uint i = 0; i < currentRequests.size(); ++i) {
00123             aiocb *pcb = currentRequests[i];
00124             RandomAccessRequestBinding *pBinding =
00125                 static_cast<RandomAccessRequestBinding *>(pcb);
00126             rc = aio_error(pcb);
00127             if (rc == EINPROGRESS) {
00128                 guard.lock();
00129                 // cout << "EINPROGRESS " << pcb->aio_offset << std::endl;
00130                 newRequests.push_back(pcb);
00131                 guard.unlock();
00132             } else {
00133                 rc = aio_return(pcb);
00134                 // guard.lock();
00135                 // cout << "complete " << pcb->aio_offset << " " << rc
00136                 // << std::endl;
00137                 // guard.unlock();
00138                 pBinding->notifyTransferCompletion(rc >= 0);
00139             }
00140         }
00141         currentRequests.clear();
00142     }
00143 }

DeviceAccessScheduler * DeviceAccessScheduler::newScheduler ( DeviceAccessSchedulerParams const &  params  )  [static, inherited]

Creates a scheduler.

Parameters:
params DeviceAccessSchedulerParams to use
Returns:
new scheduler; caller is responsible for deleting it

Definition at line 70 of file DeviceAccessScheduler.cpp.

References DeviceAccessSchedulerParams::AIO_LINUX_SCHEDULER, DeviceAccessSchedulerParams::AIO_POLLING_SCHEDULER, DeviceAccessSchedulerParams::AIO_SIGNAL_SCHEDULER, dlopenAioLinuxScheduler(), DeviceAccessSchedulerParams::IO_COMPLETION_PORT_SCHEDULER, DeviceAccessSchedulerParams::schedulerType, DeviceAccessSchedulerParams::THREAD_POOL_SCHEDULER, and DeviceAccessSchedulerParams::usingDefaultSchedulerType.

Referenced by CacheImpl< PageT, VictimPolicyT >::CacheImpl(), and RandomAccessFileDeviceTest::testAsyncIOImpl().

00072 {
00073     switch (params.schedulerType) {
00074     case DeviceAccessSchedulerParams::THREAD_POOL_SCHEDULER:
00075         return new ThreadPoolScheduler(params);
00076 
00077 #ifdef __MSVC__
00078     case DeviceAccessSchedulerParams::IO_COMPLETION_PORT_SCHEDULER:
00079         return new IoCompletionPortScheduler(params);
00080 #endif
00081 
00082 #ifdef USE_LIBAIO_H
00083     case DeviceAccessSchedulerParams::AIO_LINUX_SCHEDULER:
00084         {
00085             DeviceAccessScheduler *pScheduler = dlopenAioLinuxScheduler(params);
00086             if (pScheduler) {
00087                 return pScheduler;
00088             } else {
00089                 // if the aioLinux scheduler was explicitly selected (vs simply
00090                 // using the default type for the OS), then the AIO runtime
00091                 // library must be installed; otherwise, fall through to use
00092                 // ThreadPoolScheduler as fallback
00093                 if (params.usingDefaultSchedulerType) {
00094                     break;
00095                 }
00096                 throw FennelExcn(FennelResource::instance().libaioRequired());
00097             }
00098         }
00099 #endif
00100 
00101 #ifdef USE_AIO_H
00102     case DeviceAccessSchedulerParams::AIO_POLLING_SCHEDULER:
00103         return new AioPollingScheduler(params);
00104     case DeviceAccessSchedulerParams::AIO_SIGNAL_SCHEDULER:
00105         return new AioSignalScheduler(params);
00106 #endif
00107 
00108     default:
00109         // fall through to use ThreadPoolScheduler as a fallback
00110         break;
00111     }
00112     return new ThreadPoolScheduler(params);
00113 }

void DeviceAccessScheduler::registerDevice ( SharedRandomAccessDevice  pDevice  )  [virtual, inherited]

Registers a device for which this scheduler will process requests.

The default implementation does nothing.

Parameters:
pDevice device to be registered

Reimplemented in AioLinuxScheduler, and IoCompletionPortScheduler.

Definition at line 142 of file DeviceAccessScheduler.cpp.

Referenced by RandomAccessFileDeviceTest::testAsyncIOImpl().

00144 {
00145 }

void DeviceAccessScheduler::unregisterDevice ( SharedRandomAccessDevice  pDevice  )  [virtual, inherited]

Unregisters a device.

The default implementation does nothing.

Parameters:
pDevice device to be unregistered

Definition at line 147 of file DeviceAccessScheduler.cpp.

Referenced by RandomAccessFileDeviceTest::testAsyncIOImpl().

00149 {
00150 }

void Thread::initAndRun (  )  [protected, inherited]

Definition at line 66 of file Thread.cpp.

References Thread::afterRun(), Thread::beforeRun(), and Thread::run().

Referenced by Thread::start().

00067 {
00068     beforeRun();
00069     run();
00070     afterRun();
00071 }

void Thread::beforeRun (  )  [protected, virtual, inherited]

Definition at line 73 of file Thread.cpp.

References Thread::bRunning.

Referenced by Thread::initAndRun().

00074 {
00075     bRunning = true;
00076 }

void Thread::afterRun (  )  [protected, virtual, inherited]

Definition at line 78 of file Thread.cpp.

References Thread::bRunning.

Referenced by Thread::initAndRun().

00079 {
00080     bRunning = false;
00081 }

void Thread::start (  )  [virtual, inherited]

Spawns the OS thread.

Definition at line 50 of file Thread.cpp.

References Thread::initAndRun(), and Thread::pBoostThread.

Referenced by AioLinuxScheduler::AioLinuxScheduler(), AioPollingScheduler(), StatsTimer::start(), ResourceTest::testConcurrency(), and LocalConditionTest::testNotifyAll().

00051 {
00052     pBoostThread = new boost::thread(
00053         boost::bind(&Thread::initAndRun,this));
00054 }

void Thread::join (  )  [inherited]

Waits for the OS thread to terminate.

Definition at line 56 of file Thread.cpp.

References Thread::pBoostThread.

Referenced by CheckpointThread::closeImpl(), TimerThread::stop(), stop(), AioLinuxScheduler::stop(), ResourceTest::testConcurrency(), and LocalConditionTest::testNotifyAll().

00057 {
00058     assert(pBoostThread);
00059     boost::thread t;
00060     assert(*pBoostThread != t);
00061     pBoostThread->join();
00062     delete pBoostThread;
00063     pBoostThread = NULL;
00064 }

bool Thread::isStarted (  )  const [inline, inherited]

Returns:
true if start has been called (and subsequent join has not completed)

Reimplemented in AioLinuxScheduler.

Definition at line 71 of file Thread.h.

Referenced by CheckpointThread::closeImpl(), CacheImpl< PageT, VictimPolicyT >::closeImpl(), AioLinuxScheduler::isStarted(), and TimerThread::stop().

00072     {
00073         return pBoostThread ? true : false;
00074     }

bool Thread::isStopped (  )  const [inline, inherited]

Returns:
opposite of isStarted()

Definition at line 79 of file Thread.h.

00080     {
00081         return !isStarted();
00082     }

boost::thread& Thread::getBoostThread (  )  [inline, inherited]

Accesses the underlying boost::thread, e.g.

for use in a boost::thread_group. This thread must already be started.

Returns:
the underlying boost::thread

Definition at line 90 of file Thread.h.

00091     {
00092         assert(isStarted());
00093         return *pBoostThread;
00094     }

std::string Thread::getName (  )  [inline, inherited]

Definition at line 96 of file Thread.h.

00097     {
00098         return name;
00099     }

void Thread::setName ( std::string const &  s  )  [inline, inherited]

Definition at line 101 of file Thread.h.

00102     {
00103         name = s;
00104     }


Member Data Documentation

StrictMutex AioPollingScheduler::mutex [private]

Definition at line 46 of file AioPollingScheduler.h.

Referenced by run(), schedule(), and stop().

LocalCondition AioPollingScheduler::newRequestPending [private]

Definition at line 47 of file AioPollingScheduler.h.

Referenced by run(), schedule(), and stop().

bool AioPollingScheduler::quit [private]

Definition at line 48 of file AioPollingScheduler.h.

Referenced by AioPollingScheduler(), run(), and stop().

std::vector<aiocb *> AioPollingScheduler::currentRequests [private]

Definition at line 50 of file AioPollingScheduler.h.

Referenced by run().

std::vector<aiocb *> AioPollingScheduler::newRequests [private]

Definition at line 51 of file AioPollingScheduler.h.

Referenced by run(), and schedule().

boost::thread* Thread::pBoostThread [protected, inherited]

Definition at line 44 of file Thread.h.

Referenced by Thread::join(), Thread::start(), Thread::Thread(), and Thread::~Thread().

bool Thread::bRunning [protected, inherited]

Definition at line 45 of file Thread.h.

Referenced by Thread::afterRun(), Thread::beforeRun(), Thread::Thread(), and Thread::~Thread().

std::string Thread::name [protected, inherited]

Definition at line 46 of file Thread.h.

Referenced by Thread::Thread().


The documentation for this class was generated from the following files:
Generated on Mon Jun 22 04:00:24 2009 for Fennel by  doxygen 1.5.1