#include <AioPollingScheduler.h>
Inheritance diagram for AioPollingScheduler:
Public Member Functions | |
AioPollingScheduler (DeviceAccessSchedulerParams const &) | |
Constructor. | |
virtual | ~AioPollingScheduler () |
Destructor: stop must already have been called. | |
virtual bool | schedule (RandomAccessRequest &request) |
Initiates a request, the details of which must already have been defined by the caller. | |
virtual void | stop () |
Shuts down, waiting for all pending requests to complete. | |
virtual void | run () |
virtual void | registerDevice (SharedRandomAccessDevice pDevice) |
Registers a device for which this scheduler will process requests. | |
virtual void | unregisterDevice (SharedRandomAccessDevice pDevice) |
Unregisters a device. | |
virtual void | start () |
Spawns the OS thread. | |
void | join () |
Waits for the OS thread to terminate. | |
bool | isStarted () const |
| |
bool | isStopped () const |
| |
boost::thread & | getBoostThread () |
Accesses the underlying boost::thread, e.g. | |
std::string | getName () |
void | setName (std::string const &s) |
Static Public Member Functions | |
static DeviceAccessScheduler * | newScheduler (DeviceAccessSchedulerParams const ¶ms) |
Creates a scheduler. | |
Protected Member Functions | |
void | initAndRun () |
virtual void | beforeRun () |
virtual void | afterRun () |
Protected Attributes | |
boost::thread * | pBoostThread |
bool | bRunning |
std::string | name |
Private Attributes | |
StrictMutex | mutex |
LocalCondition | newRequestPending |
bool | quit |
std::vector< aiocb * > | currentRequests |
std::vector< aiocb * > | newRequests |
Definition at line 43 of file AioPollingScheduler.h.
AioPollingScheduler::AioPollingScheduler | ( | DeviceAccessSchedulerParams const & | ) | [explicit] |
Constructor.
Definition at line 35 of file AioPollingScheduler.cpp.
References quit, and Thread::start().
00037 { 00038 // TODO: pass params.maxRequests on to OS, and use 00039 // params.nThreads 00040 quit = false; 00041 Thread::start(); 00042 }
AioPollingScheduler::~AioPollingScheduler | ( | ) | [virtual] |
Destructor: stop must already have been called.
Definition at line 44 of file AioPollingScheduler.cpp.
bool AioPollingScheduler::schedule | ( | RandomAccessRequest & | request | ) | [virtual] |
Initiates a request, the details of which must already have been defined by the caller.
When the request completes, this scheduler will call notifyTransferCompletion on each binding associated with the request, and also break up the binding list. The bindings must not be altered by the caller until this notification is received. However, the request parameter itself need not live beyond this call.
Care must be taken to ensure that the schedule/notify sequences cannot deadlock. For example, the caller of schedule may hold a lock on a binding, and the implementation of schedule may acquire a scheduler lock internally. The notification callback may also need to take a lock on the binding. Thus, it is important that no scheduler lock be held while notifyTransferCompletion is called.
request | parameters for the request to be scheduled |
Implements DeviceAccessScheduler.
Definition at line 48 of file AioPollingScheduler.cpp.
References RandomAccessRequest::bindingList, IntrusiveListMutator< T, DerivedListNode >::detach(), mutex, newRequestPending, newRequests, RandomAccessRequest::pDevice, and RandomAccessDevice::prepareTransfer().
00049 { 00050 StrictMutexGuard guard(mutex); 00051 uint iFirst = newRequests.size(); 00052 request.pDevice->prepareTransfer(request); 00053 RandomAccessRequest::BindingListMutator bindingMutator(request.bindingList); 00054 while (bindingMutator) { 00055 RandomAccessRequestBinding *pBinding = bindingMutator.detach(); 00056 pBinding->aio_sigevent.sigev_notify = SIGEV_NONE; 00057 newRequests.push_back(pBinding); 00058 } 00059 aiocb **pFirst = &(newRequests.front()) + iFirst; 00060 int rc = lio_listio(LIO_NOWAIT,pFirst,newRequests.size() - iFirst,NULL); 00061 // TODO: handle error cases 00062 assert(rc == 0); 00063 newRequestPending.notify_all(); 00064 return true; 00065 }
void AioPollingScheduler::stop | ( | ) | [virtual] |
Shuts down, waiting for all pending requests to complete.
Implements DeviceAccessScheduler.
Definition at line 67 of file AioPollingScheduler.cpp.
References Thread::join(), mutex, newRequestPending, and quit.
00068 { 00069 StrictMutexGuard guard(mutex); 00070 quit = true; 00071 newRequestPending.notify_all(); 00072 guard.unlock(); 00073 Thread::join(); 00074 }
void AioPollingScheduler::run | ( | ) | [virtual] |
Implements Thread.
Definition at line 76 of file AioPollingScheduler.cpp.
References currentRequests, mutex, newRequestPending, newRequests, RandomAccessRequestBinding::notifyTransferCompletion(), and quit.
00077 { 00078 int rc; 00079 struct timespec ts; 00080 // poll every tenth of a millisecond 00081 // TODO: determine a reasonable default, or adjust this 00082 // dynamically? 00083 ts.tv_sec = 0; 00084 ts.tv_nsec = 100000; 00085 for (;;) { 00086 StrictMutexGuard guard(mutex); 00087 while (!newRequests.size()) { 00088 if (quit) { 00089 return; 00090 } 00091 newRequestPending.wait(guard); 00092 } 00093 currentRequests.resize(newRequests.size()); 00094 std::copy( 00095 newRequests.begin(), 00096 newRequests.end(), 00097 currentRequests.begin()); 00098 newRequests.clear(); 00099 guard.unlock(); 00100 do { 00101 // REVIEW jvs 4-Aug-2004: Using &front like this is not portable. 00102 rc = aio_suspend( 00103 &(currentRequests.front()), 00104 currentRequests.size(), 00105 &ts); 00106 if (rc) { 00107 switch (errno) { 00108 case EAGAIN: 00109 case EINTR: 00110 continue; 00111 default: 00112 std::cerr << rc << std::endl; 00113 permAssert(false); 00114 } 00115 } 00116 } while (false); 00117 // currentRequests does not need a lock, since this thread is the only 00118 // one which manipulates it. And the lock cannot be held when 00119 // notifyTransferCompletion is called, otherwise deadlock is possible. 00120 // However, access to newRequests does require a lock, since other 00121 // threads may be calling schedule, so use a fine-grained lock on it. 00122 for (uint i = 0; i < currentRequests.size(); ++i) { 00123 aiocb *pcb = currentRequests[i]; 00124 RandomAccessRequestBinding *pBinding = 00125 static_cast<RandomAccessRequestBinding *>(pcb); 00126 rc = aio_error(pcb); 00127 if (rc == EINPROGRESS) { 00128 guard.lock(); 00129 // cout << "EINPROGRESS " << pcb->aio_offset << std::endl; 00130 newRequests.push_back(pcb); 00131 guard.unlock(); 00132 } else { 00133 rc = aio_return(pcb); 00134 // guard.lock(); 00135 // cout << "complete " << pcb->aio_offset << " " << rc 00136 // << std::endl; 00137 // guard.unlock(); 00138 pBinding->notifyTransferCompletion(rc >= 0); 00139 } 00140 } 00141 currentRequests.clear(); 00142 } 00143 }
DeviceAccessScheduler * DeviceAccessScheduler::newScheduler | ( | DeviceAccessSchedulerParams const & | params | ) | [static, inherited] |
Creates a scheduler.
params | DeviceAccessSchedulerParams to use |
Definition at line 70 of file DeviceAccessScheduler.cpp.
References DeviceAccessSchedulerParams::AIO_LINUX_SCHEDULER, DeviceAccessSchedulerParams::AIO_POLLING_SCHEDULER, DeviceAccessSchedulerParams::AIO_SIGNAL_SCHEDULER, dlopenAioLinuxScheduler(), DeviceAccessSchedulerParams::IO_COMPLETION_PORT_SCHEDULER, DeviceAccessSchedulerParams::schedulerType, DeviceAccessSchedulerParams::THREAD_POOL_SCHEDULER, and DeviceAccessSchedulerParams::usingDefaultSchedulerType.
Referenced by CacheImpl< PageT, VictimPolicyT >::CacheImpl(), and RandomAccessFileDeviceTest::testAsyncIOImpl().
00072 { 00073 switch (params.schedulerType) { 00074 case DeviceAccessSchedulerParams::THREAD_POOL_SCHEDULER: 00075 return new ThreadPoolScheduler(params); 00076 00077 #ifdef __MSVC__ 00078 case DeviceAccessSchedulerParams::IO_COMPLETION_PORT_SCHEDULER: 00079 return new IoCompletionPortScheduler(params); 00080 #endif 00081 00082 #ifdef USE_LIBAIO_H 00083 case DeviceAccessSchedulerParams::AIO_LINUX_SCHEDULER: 00084 { 00085 DeviceAccessScheduler *pScheduler = dlopenAioLinuxScheduler(params); 00086 if (pScheduler) { 00087 return pScheduler; 00088 } else { 00089 // if the aioLinux scheduler was explicitly selected (vs simply 00090 // using the default type for the OS), then the AIO runtime 00091 // library must be installed; otherwise, fall through to use 00092 // ThreadPoolScheduler as fallback 00093 if (params.usingDefaultSchedulerType) { 00094 break; 00095 } 00096 throw FennelExcn(FennelResource::instance().libaioRequired()); 00097 } 00098 } 00099 #endif 00100 00101 #ifdef USE_AIO_H 00102 case DeviceAccessSchedulerParams::AIO_POLLING_SCHEDULER: 00103 return new AioPollingScheduler(params); 00104 case DeviceAccessSchedulerParams::AIO_SIGNAL_SCHEDULER: 00105 return new AioSignalScheduler(params); 00106 #endif 00107 00108 default: 00109 // fall through to use ThreadPoolScheduler as a fallback 00110 break; 00111 } 00112 return new ThreadPoolScheduler(params); 00113 }
void DeviceAccessScheduler::registerDevice | ( | SharedRandomAccessDevice | pDevice | ) | [virtual, inherited] |
Registers a device for which this scheduler will process requests.
The default implementation does nothing.
pDevice | device to be registered |
Reimplemented in AioLinuxScheduler, and IoCompletionPortScheduler.
Definition at line 142 of file DeviceAccessScheduler.cpp.
Referenced by RandomAccessFileDeviceTest::testAsyncIOImpl().
void DeviceAccessScheduler::unregisterDevice | ( | SharedRandomAccessDevice | pDevice | ) | [virtual, inherited] |
Unregisters a device.
The default implementation does nothing.
pDevice | device to be unregistered |
Definition at line 147 of file DeviceAccessScheduler.cpp.
Referenced by RandomAccessFileDeviceTest::testAsyncIOImpl().
void Thread::initAndRun | ( | ) | [protected, inherited] |
Definition at line 66 of file Thread.cpp.
References Thread::afterRun(), Thread::beforeRun(), and Thread::run().
Referenced by Thread::start().
void Thread::beforeRun | ( | ) | [protected, virtual, inherited] |
Definition at line 73 of file Thread.cpp.
References Thread::bRunning.
Referenced by Thread::initAndRun().
00074 { 00075 bRunning = true; 00076 }
void Thread::afterRun | ( | ) | [protected, virtual, inherited] |
Definition at line 78 of file Thread.cpp.
References Thread::bRunning.
Referenced by Thread::initAndRun().
00079 { 00080 bRunning = false; 00081 }
void Thread::start | ( | ) | [virtual, inherited] |
Spawns the OS thread.
Definition at line 50 of file Thread.cpp.
References Thread::initAndRun(), and Thread::pBoostThread.
Referenced by AioLinuxScheduler::AioLinuxScheduler(), AioPollingScheduler(), StatsTimer::start(), ResourceTest::testConcurrency(), and LocalConditionTest::testNotifyAll().
00051 { 00052 pBoostThread = new boost::thread( 00053 boost::bind(&Thread::initAndRun,this)); 00054 }
void Thread::join | ( | ) | [inherited] |
Waits for the OS thread to terminate.
Definition at line 56 of file Thread.cpp.
References Thread::pBoostThread.
Referenced by CheckpointThread::closeImpl(), TimerThread::stop(), stop(), AioLinuxScheduler::stop(), ResourceTest::testConcurrency(), and LocalConditionTest::testNotifyAll().
00057 { 00058 assert(pBoostThread); 00059 boost::thread t; 00060 assert(*pBoostThread != t); 00061 pBoostThread->join(); 00062 delete pBoostThread; 00063 pBoostThread = NULL; 00064 }
bool Thread::isStarted | ( | ) | const [inline, inherited] |
Reimplemented in AioLinuxScheduler.
Definition at line 71 of file Thread.h.
Referenced by CheckpointThread::closeImpl(), CacheImpl< PageT, VictimPolicyT >::closeImpl(), AioLinuxScheduler::isStarted(), and TimerThread::stop().
00072 { 00073 return pBoostThread ? true : false; 00074 }
bool Thread::isStopped | ( | ) | const [inline, inherited] |
Definition at line 79 of file Thread.h.
00080 { 00081 return !isStarted(); 00082 }
boost::thread& Thread::getBoostThread | ( | ) | [inline, inherited] |
Accesses the underlying boost::thread, e.g.
for use in a boost::thread_group. This thread must already be started.
Definition at line 90 of file Thread.h.
00091 { 00092 assert(isStarted()); 00093 return *pBoostThread; 00094 }
std::string Thread::getName | ( | ) | [inline, inherited] |
void Thread::setName | ( | std::string const & | s | ) | [inline, inherited] |
StrictMutex AioPollingScheduler::mutex [private] |
bool AioPollingScheduler::quit [private] |
Definition at line 48 of file AioPollingScheduler.h.
Referenced by AioPollingScheduler(), run(), and stop().
std::vector<aiocb *> AioPollingScheduler::currentRequests [private] |
std::vector<aiocb *> AioPollingScheduler::newRequests [private] |
boost::thread* Thread::pBoostThread [protected, inherited] |
Definition at line 44 of file Thread.h.
Referenced by Thread::join(), Thread::start(), Thread::Thread(), and Thread::~Thread().
bool Thread::bRunning [protected, inherited] |
Definition at line 45 of file Thread.h.
Referenced by Thread::afterRun(), Thread::beforeRun(), Thread::Thread(), and Thread::~Thread().
std::string Thread::name [protected, inherited] |