#include <AioSignalScheduler.h>
Public Member Functions
AioSignalScheduler (DeviceAccessSchedulerParams const &)
    Constructor.
virtual ~AioSignalScheduler ()
    Destructor: stop must already have been called.
virtual bool schedule (RandomAccessRequest &request)
    Initiates a request, the details of which must already have been defined by the caller.
virtual void stop ()
    Shuts down, waiting for all pending requests to complete.
virtual void registerDevice (SharedRandomAccessDevice pDevice)
    Registers a device for which this scheduler will process requests.
virtual void unregisterDevice (SharedRandomAccessDevice pDevice)
    Unregisters a device.
Static Public Member Functions
static DeviceAccessScheduler * newScheduler (DeviceAccessSchedulerParams const &params)
    Creates a scheduler.
Private Member Functions
bool isStarted () const
Private Attributes
StrictMutex mutex
LocalCondition quitCondition
struct sigaction saOld
bool quit
std::vector< AioSignalHandlerThread * > threads
Friends
class AioSignalHandlerThread
Definition at line 46 of file AioSignalScheduler.h.
AioSignalScheduler::AioSignalScheduler (DeviceAccessSchedulerParams const &) [explicit]
Constructor.
Definition at line 68 of file AioSignalScheduler.cpp.
References aio_handler(), AioSignalHandlerThread, DeviceAccessSchedulerParams::nThreads, quit, saOld, and threads.
{
    // TODO: pass params.maxSimultaneousRequests on to OS

    // block signal in this thread so that child threads will also have it
    // blocked
    int rc;
    sigset_t mask;
    sigemptyset(&mask);
    sigaddset(&mask,SIGRTMIN);
    rc = pthread_sigmask(SIG_BLOCK,&mask,NULL);
    assert(!rc);

    // TODO: come up with a way to ensure signal is blocked in all threads
    // except the one spawned below

    struct sigaction sa;
    sa.sa_flags = SA_SIGINFO;
    sa.sa_sigaction = aio_handler;
    sigemptyset(&(sa.sa_mask));
    rc = sigaction(SIGRTMIN,&sa,&saOld);
    assert(!rc);

    quit = false;
    for (uint i = 0; i < params.nThreads; ++i) {
        AioSignalHandlerThread *pThread = new AioSignalHandlerThread(*this);
        pThread->start();
        threads.push_back(pThread);
    }
}
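The constructor relies on a POSIX rule: a thread created after a signal has been blocked with pthread_sigmask starts with that signal blocked too. The following is a minimal sketch of that rule only, not part of AioSignalScheduler; the helper names blockSignalInThisThread, isSignalBlockedHere, and childInheritsBlockedSignal are hypothetical, and std::thread stands in for the library's own thread class.

```cpp
#include <signal.h>
#include <pthread.h>
#include <thread>

// Hypothetical helper: block `signo` in the calling thread.
// Threads created afterwards start with the same signal mask.
inline bool blockSignalInThisThread(int signo)
{
    sigset_t mask;
    sigemptyset(&mask);
    sigaddset(&mask, signo);
    return pthread_sigmask(SIG_BLOCK, &mask, nullptr) == 0;
}

// Hypothetical helper: report whether `signo` is blocked in the calling thread.
inline bool isSignalBlockedHere(int signo)
{
    sigset_t current;
    sigemptyset(&current);
    // With a null `set` argument, pthread_sigmask only reads the current mask.
    if (pthread_sigmask(SIG_BLOCK, nullptr, &current) != 0) {
        return false;
    }
    return sigismember(&current, signo) == 1;
}

// Blocks `signo` in the caller, spawns a child thread, and returns
// whether the child observed the signal as blocked.
inline bool childInheritsBlockedSignal(int signo)
{
    if (!blockSignalInThisThread(signo)) {
        return false;
    }
    bool blockedInChild = false;
    std::thread child([&blockedInChild, signo] {
        blockedInChild = isSignalBlockedHere(signo);
    });
    child.join();
    return blockedInChild;
}
```

In the listing above the blocked signal is SIGRTMIN; the sketch takes the signal number as a parameter so it can be tried with any signal, for example childInheritsBlockedSignal(SIGUSR1).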
AioSignalScheduler::~AioSignalScheduler () [virtual]
Destructor: stop must already have been called.
Definition at line 100 of file AioSignalScheduler.cpp.
References isStarted().
{
    assert(!isStarted());
}
bool AioSignalScheduler::isStarted () const [inline, private]
Definition at line 60 of file AioSignalScheduler.h.
Referenced by schedule(), stop(), and ~AioSignalScheduler().
{
    return !threads.empty();
}
bool AioSignalScheduler::schedule (RandomAccessRequest & request) [virtual]
Initiates a request, the details of which must already have been defined by the caller.
When the request completes, this scheduler will call notifyTransferCompletion on each binding associated with the request, and also break up the binding list. The bindings must not be altered by the caller until this notification is received. However, the request parameter itself need not live beyond this call.
Care must be taken to ensure that the schedule/notify sequences cannot deadlock. For example, the caller of schedule may hold a lock on a binding, and the implementation of schedule may acquire a scheduler lock internally. The notification callback may also need to take a lock on the binding. Thus, it is important that no scheduler lock be held while notifyTransferCompletion is called.
Parameters:
    request    parameters for the request to be scheduled
Implements DeviceAccessScheduler.
Definition at line 105 of file AioSignalScheduler.cpp.
References RandomAccessRequest::bindingList, IntrusiveListMutator< T, DerivedListNode >::detach(), isStarted(), RandomAccessRequest::pDevice, RandomAccessDevice::prepareTransfer(), RandomAccessRequest::READ, and RandomAccessRequest::type.
{
    assert(isStarted());

    int rc;

    // TODO: use lio_listio instead?  but then is it possible to get
    // individual notifications?  Or keep chain and notify all at once?
    request.pDevice->prepareTransfer(request);
    RandomAccessRequest::BindingListMutator bindingMutator(request.bindingList);
    while (bindingMutator) {
        RandomAccessRequestBinding *pBinding = bindingMutator.detach();
        pBinding->aio_sigevent.sigev_notify = SIGEV_SIGNAL;
        pBinding->aio_sigevent.sigev_signo = SIGRTMIN;
        pBinding->aio_sigevent.sigev_value.sival_ptr = pBinding;

        // static_cast assigned to lpBinding is a workaround
        // for a gcc bug that shows up on Ubuntu 8.04 when
        // passing pBinding to aio_* methods
        aiocb *lpBinding = static_cast<aiocb *>(pBinding);

        if (request.type == RandomAccessRequest::READ) {
            rc = aio_read(lpBinding);
        } else {
            rc = aio_write(lpBinding);
        }
        assert(!rc);
    }

    return true;
}
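The deadlock caveat in the description of schedule() comes down to one rule: release the scheduler's internal lock before invoking any completion callback. The sketch below illustrates that rule with standard C++ primitives. It is not the library's implementation; CallbackQueueSketch and its members are hypothetical names, and a std::function stands in for notifyTransferCompletion.

```cpp
#include <cstddef>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical stand-in for a scheduler that queues completion callbacks.
class CallbackQueueSketch
{
public:
    typedef std::function<void()> Callback;

    // Takes the internal lock briefly to enqueue a callback.
    void schedule(Callback onComplete)
    {
        std::lock_guard<std::mutex> guard(mutex_);
        pending_.push_back(std::move(onComplete));
    }

    // Invokes every queued callback and returns how many were run.
    std::size_t completeAll()
    {
        std::vector<Callback> ready;
        {
            // Hold the lock only long enough to take ownership of the queue.
            std::lock_guard<std::mutex> guard(mutex_);
            ready.swap(pending_);
        }
        // No scheduler lock is held here, so a callback may take its own
        // locks, or call schedule() again, without deadlocking.
        for (std::size_t i = 0; i < ready.size(); ++i) {
            ready[i]();
        }
        return ready.size();
    }

private:
    std::mutex mutex_;
    std::vector<Callback> pending_;
};
```

If completeAll() ran the callbacks while still holding mutex_, a callback that called schedule() would block forever on the same non-recursive mutex, which is the kind of cycle the description warns about.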
void AioSignalScheduler::stop () [virtual]
Shuts down, waiting for all pending requests to complete.
Implements DeviceAccessScheduler.
Definition at line 137 of file AioSignalScheduler.cpp.
References deleteAndNullify(), isStarted(), mutex, quit, quitCondition, saOld, and threads.
{
    assert(isStarted());

    StrictMutexGuard guard(mutex);
    quit = true;
    quitCondition.notify_all();
    guard.unlock();

    for (uint i = 0; i < threads.size(); ++i) {
        threads[i]->join();
        deleteAndNullify(threads[i]);
    }
    threads.clear();

    int rc = sigaction(SIGRTMIN,&saOld,NULL);
    assert(!rc);
}
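The shutdown sequence in stop() (set a quit flag under the mutex, wake all waiters, release the mutex, then join each thread) can be tried in isolation. The following is a minimal sketch using std::mutex, std::condition_variable, and std::thread in place of StrictMutex, LocalCondition, and AioSignalHandlerThread; WorkerPoolSketch is a hypothetical name, and its worker body only waits for the quit flag rather than handling signals.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical pool whose workers do nothing but wait to be told to quit.
class WorkerPoolSketch
{
public:
    explicit WorkerPoolSketch(unsigned nThreads)
        : quit_(false)
    {
        for (unsigned i = 0; i < nThreads; ++i) {
            threads_.emplace_back([this] { run(); });
        }
    }

    // As in the documented class, stop() must have been called first.
    ~WorkerPoolSketch()
    {
        assert(!isStarted());
    }

    bool isStarted() const
    {
        return !threads_.empty();
    }

    void stop()
    {
        assert(isStarted());
        {
            // Publish the flag and wake the waiters while holding the mutex,
            // then release it so the workers can re-acquire it and exit.
            std::lock_guard<std::mutex> guard(mutex_);
            quit_ = true;
            quitCondition_.notify_all();
        }
        for (std::size_t i = 0; i < threads_.size(); ++i) {
            threads_[i].join();
        }
        threads_.clear();
    }

private:
    void run()
    {
        std::unique_lock<std::mutex> guard(mutex_);
        // The predicate form re-checks quit_ after every wakeup, so a
        // notification sent before the wait begins is not lost.
        quitCondition_.wait(guard, [this] { return quit_; });
    }

    std::mutex mutex_;
    std::condition_variable quitCondition_;
    bool quit_;
    std::vector<std::thread> threads_;
};
```

Clearing the thread vector at the end of stop() is what makes isStarted() return false, which in turn lets the destructor's assertion pass.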
DeviceAccessScheduler * DeviceAccessScheduler::newScheduler (DeviceAccessSchedulerParams const & params) [static, inherited]
Creates a scheduler.
Parameters:
    params    DeviceAccessSchedulerParams to use
Definition at line 70 of file DeviceAccessScheduler.cpp.
References DeviceAccessSchedulerParams::AIO_LINUX_SCHEDULER, DeviceAccessSchedulerParams::AIO_POLLING_SCHEDULER, DeviceAccessSchedulerParams::AIO_SIGNAL_SCHEDULER, dlopenAioLinuxScheduler(), DeviceAccessSchedulerParams::IO_COMPLETION_PORT_SCHEDULER, DeviceAccessSchedulerParams::schedulerType, DeviceAccessSchedulerParams::THREAD_POOL_SCHEDULER, and DeviceAccessSchedulerParams::usingDefaultSchedulerType.
Referenced by CacheImpl< PageT, VictimPolicyT >::CacheImpl(), and RandomAccessFileDeviceTest::testAsyncIOImpl().
{
    switch (params.schedulerType) {
    case DeviceAccessSchedulerParams::THREAD_POOL_SCHEDULER:
        return new ThreadPoolScheduler(params);

#ifdef __MSVC__
    case DeviceAccessSchedulerParams::IO_COMPLETION_PORT_SCHEDULER:
        return new IoCompletionPortScheduler(params);
#endif

#ifdef USE_LIBAIO_H
    case DeviceAccessSchedulerParams::AIO_LINUX_SCHEDULER:
        {
            DeviceAccessScheduler *pScheduler = dlopenAioLinuxScheduler(params);
            if (pScheduler) {
                return pScheduler;
            } else {
                // if the aioLinux scheduler was explicitly selected (vs simply
                // using the default type for the OS), then the AIO runtime
                // library must be installed; otherwise, fall through to use
                // ThreadPoolScheduler as fallback
                if (params.usingDefaultSchedulerType) {
                    break;
                }
                throw FennelExcn(FennelResource::instance().libaioRequired());
            }
        }
#endif

#ifdef USE_AIO_H
    case DeviceAccessSchedulerParams::AIO_POLLING_SCHEDULER:
        return new AioPollingScheduler(params);
    case DeviceAccessSchedulerParams::AIO_SIGNAL_SCHEDULER:
        return new AioSignalScheduler(params);
#endif

    default:
        // fall through to use ThreadPoolScheduler as a fallback
        break;
    }
    return new ThreadPoolScheduler(params);
}
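The fallback rule in newScheduler() is easy to miss in the listing: when the Linux AIO library cannot be loaded, the factory silently falls back to the thread pool only if the scheduler type was a default, and throws if the caller asked for it explicitly. The sketch below isolates that decision. All type and function names ending in Sketch are hypothetical, the aioLibraryAvailable flag is an assumed stand-in for the result of dlopenAioLinuxScheduler(), and std::runtime_error stands in for FennelExcn.

```cpp
#include <memory>
#include <stdexcept>
#include <string>

enum SchedulerTypeSketch { THREAD_POOL_SKETCH, AIO_LINUX_SKETCH };

struct ParamsSketch
{
    SchedulerTypeSketch schedulerType;
    bool usingDefaultSchedulerType;  // true if the type was not chosen explicitly
    bool aioLibraryAvailable;        // assumption: models whether loading libaio succeeded
};

struct SchedulerSketch
{
    virtual ~SchedulerSketch() {}
    virtual std::string name() const = 0;
};

struct ThreadPoolSketch : SchedulerSketch
{
    std::string name() const { return "thread-pool"; }
};

struct AioLinuxSketch : SchedulerSketch
{
    std::string name() const { return "aio-linux"; }
};

// Mirrors the shape of the factory: try the requested type, and fall back
// to the thread pool only when the request was a default.
inline std::unique_ptr<SchedulerSketch> newSchedulerSketch(ParamsSketch const &params)
{
    switch (params.schedulerType) {
    case AIO_LINUX_SKETCH:
        if (params.aioLibraryAvailable) {
            return std::unique_ptr<SchedulerSketch>(new AioLinuxSketch());
        }
        if (!params.usingDefaultSchedulerType) {
            // An explicit request cannot be satisfied: report it.
            throw std::runtime_error("libaio required");
        }
        break;  // default selection: fall back below
    default:
        break;
    }
    return std::unique_ptr<SchedulerSketch>(new ThreadPoolSketch());
}
```

Returning a std::unique_ptr is a choice made for the sketch; the documented factory returns a raw DeviceAccessScheduler pointer that the caller owns.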
void DeviceAccessScheduler::registerDevice (SharedRandomAccessDevice pDevice) [virtual, inherited]
Registers a device for which this scheduler will process requests.
The default implementation does nothing.
Parameters:
    pDevice    device to be registered
Reimplemented in AioLinuxScheduler and IoCompletionPortScheduler.
Definition at line 142 of file DeviceAccessScheduler.cpp.
Referenced by RandomAccessFileDeviceTest::testAsyncIOImpl().
void DeviceAccessScheduler::unregisterDevice (SharedRandomAccessDevice pDevice) [virtual, inherited]
Unregisters a device.
The default implementation does nothing.
Parameters:
    pDevice    device to be unregistered
Definition at line 147 of file DeviceAccessScheduler.cpp.
Referenced by RandomAccessFileDeviceTest::testAsyncIOImpl().
friend class AioSignalHandlerThread [friend]
StrictMutex AioSignalScheduler::mutex [private]
struct sigaction AioSignalScheduler::saOld [private]
bool AioSignalScheduler::quit [private]
Definition at line 54 of file AioSignalScheduler.h.
Referenced by AioSignalScheduler(), AioSignalHandlerThread::run(), and stop().
std::vector<AioSignalHandlerThread *> AioSignalScheduler::threads [private]