#include <ThreadPoolScheduler.h>
Inherits DeviceAccessScheduler.
Public Member Functions
    ThreadPoolScheduler (DeviceAccessSchedulerParams const &)
        Constructor.
    virtual ~ThreadPoolScheduler ()
        Destructor: stop must already have been called.
    virtual bool schedule (RandomAccessRequest &request)
        Initiates a request, the details of which must already have been defined by the caller.
    virtual void stop ()
        Shuts down, waiting for all pending requests to complete.
    virtual void registerDevice (SharedRandomAccessDevice pDevice)
        Registers a device for which this scheduler will process requests.
    virtual void unregisterDevice (SharedRandomAccessDevice pDevice)
        Unregisters a device.
Static Public Member Functions
    static DeviceAccessScheduler * newScheduler (DeviceAccessSchedulerParams const &params)
        Creates a scheduler.
Private Attributes
    ThreadPool< RandomAccessRequest > pool
Definition at line 37 of file ThreadPoolScheduler.h.
ThreadPoolScheduler::ThreadPoolScheduler (DeviceAccessSchedulerParams const &) [explicit]
Constructor.
Definition at line 34 of file ThreadPoolScheduler.cpp.
References DeviceAccessSchedulerParams::maxRequests, pool, and ThreadPoolBase::start().
{
    // threads and requests are 1-to-1, but threads are expensive,
    // so arbitrarily cap at 10
    uint nThreads = std::min<uint>(10, params.maxRequests);
    pool.start(nThreads);
}
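The thread-count rule in the constructor can be tried in isolation. The following is a minimal sketch, assuming a hypothetical free function `cappedThreadCount` (not part of the source) that mirrors the `std::min` expression; `unsigned` stands in for the `uint` typedef used above.

```cpp
#include <algorithm>
#include <cassert>

// Hypothetical helper, not part of ThreadPoolScheduler: mirrors the
// constructor's rule of one thread per request, capped at 10.
unsigned cappedThreadCount(unsigned maxRequests)
{
    const unsigned kMaxThreads = 10; // the arbitrary cap from the constructor
    return std::min<unsigned>(kMaxThreads, maxRequests);
}
```

Under this expression, a `maxRequests` of 4 yields 4 threads, anything above 10 yields 10, and a `maxRequests` of 0 yields a pool with no threads.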
ThreadPoolScheduler::~ThreadPoolScheduler () [virtual]
Destructor: stop must already have been called.
Definition at line 43 of file ThreadPoolScheduler.cpp.
bool ThreadPoolScheduler::schedule (RandomAccessRequest & request) [virtual]
Initiates a request, the details of which must already have been defined by the caller.
When the request completes, this scheduler will call notifyTransferCompletion on each binding associated with the request, and also break up the binding list. The bindings must not be altered by the caller until this notification is received. However, the request parameter itself need not live beyond this call.
Care must be taken to ensure that the schedule/notify sequences cannot deadlock. For example, the caller of schedule may hold a lock on a binding, and the implementation of schedule may acquire a scheduler lock internally. The notification callback may also need to take a lock on the binding. Thus, it is important that no scheduler lock be held while notifyTransferCompletion is called.
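The locking rule described above can be sketched as follows. This is an illustration only, not the scheduler's actual code: `SketchBinding`, `SketchScheduler`, `markCompleted`, and `deliverNotifications` are hypothetical names, and the `bool` parameter on `notifyTransferCompletion` is an assumption. The point is that the scheduler lock is released before any callback runs, so a callback that takes a binding lock can never wait behind the scheduler lock.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <vector>

// Hypothetical stand-in for a request binding. Only the method name
// notifyTransferCompletion comes from the documentation; its parameter
// is an assumption.
struct SketchBinding {
    std::mutex bindingMutex;
    bool notified = false;

    void notifyTransferCompletion(bool /*success*/)
    {
        // The callback takes the binding's own lock.
        std::lock_guard<std::mutex> guard(bindingMutex);
        notified = true;
    }
};

// Hypothetical scheduler with an internal lock.
class SketchScheduler {
    std::mutex schedulerMutex;
    std::vector<SketchBinding *> completed;

public:
    // Record a finished transfer under the scheduler lock.
    void markCompleted(SketchBinding &binding)
    {
        std::lock_guard<std::mutex> guard(schedulerMutex);
        completed.push_back(&binding);
    }

    // Move the completed bindings out while holding the scheduler lock,
    // then release that lock BEFORE invoking any callback.
    std::size_t deliverNotifications()
    {
        std::vector<SketchBinding *> ready;
        {
            std::lock_guard<std::mutex> guard(schedulerMutex);
            ready.swap(completed);
        } // scheduler lock released here
        for (SketchBinding *pBinding : ready) {
            pBinding->notifyTransferCompletion(true);
        }
        return ready.size();
    }
};
```

Had `deliverNotifications` called the callback inside the guarded block, a caller holding `bindingMutex` while calling into the scheduler could deadlock against it.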
Parameters:
    request    parameters for the request to be scheduled
Implements DeviceAccessScheduler.
Definition at line 47 of file ThreadPoolScheduler.cpp.
References RandomAccessRequest::bindingList, RandomAccessRequest::cbOffset, RandomAccessRequest::cbTransfer, IntrusiveListMutator< T, DerivedListNode >::detach(), RandomAccessRequestBinding::getBufferSize(), RandomAccessRequest::pDevice, pool, ThreadPool< Task >::submitTask(), and RandomAccessRequest::type.
{
    RandomAccessRequest::BindingListMutator bindingMutator(request.bindingList);
    FileSize cbOffset = request.cbOffset;
    // break up the request into one per binding
    // TODO: don't do this if device supports scatter/gather; and skip
    // breakup if only one binding in the first place
    while (bindingMutator) {
        RandomAccessRequestBinding *pBinding = bindingMutator.detach();
        if (!pBinding) {
            break;
        }
        RandomAccessRequest subRequest;
        subRequest.pDevice = request.pDevice;
        subRequest.cbOffset = cbOffset;
        subRequest.cbTransfer = pBinding->getBufferSize();
        cbOffset += subRequest.cbTransfer;
        subRequest.type = request.type;
        subRequest.bindingList.push_back(*pBinding);
        pool.submitTask(subRequest);
    }
    assert(cbOffset == request.cbOffset + request.cbTransfer);
    return true;
}
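The offset arithmetic in the loop above may be easier to follow without the intrusive binding list. The sketch below is a simplified model, not the real types: `SketchSubRequest` and `splitByBinding` are hypothetical, and each binding is reduced to its buffer size. Each sub-request starts where the previous one ended, and the sizes must add up to the original transfer length.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical, simplified sub-request: just an offset and a length.
struct SketchSubRequest {
    std::uint64_t cbOffset;
    std::uint64_t cbTransfer;
};

// Split one request into one sub-request per binding buffer, advancing
// the offset by each buffer's size, as the loop in schedule() does.
std::vector<SketchSubRequest> splitByBinding(
    std::uint64_t cbOffset,
    std::uint64_t cbTransfer,
    const std::vector<std::uint64_t> &bufferSizes)
{
    const std::uint64_t cbStart = cbOffset;
    std::vector<SketchSubRequest> subRequests;
    for (std::uint64_t cbBuffer : bufferSizes) {
        subRequests.push_back({cbOffset, cbBuffer});
        cbOffset += cbBuffer;
    }
    // Same invariant as the assert at the end of schedule().
    assert(cbOffset == cbStart + cbTransfer);
    return subRequests;
}
```

For a request at offset 4096 with a transfer length of 3072 and buffers of 1024 and 2048 bytes, this produces sub-requests at offsets 4096 and 5120.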
void ThreadPoolScheduler::stop () [virtual]
Shuts down, waiting for all pending requests to complete.
Implements DeviceAccessScheduler.
Definition at line 72 of file ThreadPoolScheduler.cpp.
References pool and ThreadPoolBase::stop().
DeviceAccessScheduler * DeviceAccessScheduler::newScheduler (DeviceAccessSchedulerParams const & params) [static, inherited]
Creates a scheduler.
Parameters:
    params    DeviceAccessSchedulerParams to use
Definition at line 70 of file DeviceAccessScheduler.cpp.
References DeviceAccessSchedulerParams::AIO_LINUX_SCHEDULER, DeviceAccessSchedulerParams::AIO_POLLING_SCHEDULER, DeviceAccessSchedulerParams::AIO_SIGNAL_SCHEDULER, dlopenAioLinuxScheduler(), DeviceAccessSchedulerParams::IO_COMPLETION_PORT_SCHEDULER, DeviceAccessSchedulerParams::schedulerType, DeviceAccessSchedulerParams::THREAD_POOL_SCHEDULER, and DeviceAccessSchedulerParams::usingDefaultSchedulerType.
Referenced by CacheImpl< PageT, VictimPolicyT >::CacheImpl(), and RandomAccessFileDeviceTest::testAsyncIOImpl().
{
    switch (params.schedulerType) {
    case DeviceAccessSchedulerParams::THREAD_POOL_SCHEDULER:
        return new ThreadPoolScheduler(params);

#ifdef __MSVC__
    case DeviceAccessSchedulerParams::IO_COMPLETION_PORT_SCHEDULER:
        return new IoCompletionPortScheduler(params);
#endif

#ifdef USE_LIBAIO_H
    case DeviceAccessSchedulerParams::AIO_LINUX_SCHEDULER:
        {
            DeviceAccessScheduler *pScheduler = dlopenAioLinuxScheduler(params);
            if (pScheduler) {
                return pScheduler;
            } else {
                // if the aioLinux scheduler was explicitly selected (vs simply
                // using the default type for the OS), then the AIO runtime
                // library must be installed; otherwise, fall through to use
                // ThreadPoolScheduler as fallback
                if (params.usingDefaultSchedulerType) {
                    break;
                }
                throw FennelExcn(FennelResource::instance().libaioRequired());
            }
        }
#endif

#ifdef USE_AIO_H
    case DeviceAccessSchedulerParams::AIO_POLLING_SCHEDULER:
        return new AioPollingScheduler(params);
    case DeviceAccessSchedulerParams::AIO_SIGNAL_SCHEDULER:
        return new AioSignalScheduler(params);
#endif

    default:
        // fall through to use ThreadPoolScheduler as a fallback
        break;
    }
    return new ThreadPoolScheduler(params);
}
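The fallback rule for the Linux AIO case can be summarized with a small decision function. This is a sketch under stated assumptions: `SketchSchedulerType` and `chooseScheduler` are hypothetical, the `aioLibraryAvailable` flag stands in for whether `dlopenAioLinuxScheduler` returned a scheduler, and `std::runtime_error` stands in for `FennelExcn`. The other platform-specific cases are omitted.

```cpp
#include <cassert>
#include <stdexcept>

// Hypothetical reduction of the scheduler types to the two that matter
// for the fallback decision.
enum class SketchSchedulerType { ThreadPool, AioLinux };

// Returns the scheduler type that would actually be instantiated.
SketchSchedulerType chooseScheduler(
    SketchSchedulerType requested,
    bool usingDefaultSchedulerType,
    bool aioLibraryAvailable)
{
    switch (requested) {
    case SketchSchedulerType::AioLinux:
        if (aioLibraryAvailable) {
            return SketchSchedulerType::AioLinux;
        }
        if (!usingDefaultSchedulerType) {
            // Explicitly requested, so the runtime library is mandatory.
            throw std::runtime_error("libaio required");
        }
        break; // default selection: quietly fall back
    case SketchSchedulerType::ThreadPool:
        break;
    }
    return SketchSchedulerType::ThreadPool;
}
```

In this model a missing AIO library is an error only when the caller asked for the AIO scheduler by name; a default selection silently falls back to the thread pool.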
void DeviceAccessScheduler::registerDevice (SharedRandomAccessDevice pDevice) [virtual, inherited]
Registers a device for which this scheduler will process requests.
The default implementation does nothing.
Parameters:
    pDevice    device to be registered
Reimplemented in AioLinuxScheduler and IoCompletionPortScheduler.
Definition at line 142 of file DeviceAccessScheduler.cpp.
Referenced by RandomAccessFileDeviceTest::testAsyncIOImpl().
void DeviceAccessScheduler::unregisterDevice (SharedRandomAccessDevice pDevice) [virtual, inherited]
Unregisters a device.
The default implementation does nothing.
Parameters:
    pDevice    device to be unregistered
Definition at line 147 of file DeviceAccessScheduler.cpp.
Referenced by RandomAccessFileDeviceTest::testAsyncIOImpl().
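The "default implementation does nothing" pattern used by registerDevice and unregisterDevice can be sketched as below. All names here are hypothetical (`SketchDevice`, `SketchSchedulerBase`, `SketchTrackingScheduler`), and `std::shared_ptr` is an assumed stand-in for `SharedRandomAccessDevice`. A scheduler that needs no per-device state inherits the no-op defaults, while one that does need it overrides both methods.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <set>

// Hypothetical device type and shared handle.
struct SketchDevice {};
typedef std::shared_ptr<SketchDevice> SharedSketchDevice;

// Base class with no-op defaults, as described for DeviceAccessScheduler.
class SketchSchedulerBase {
public:
    virtual ~SketchSchedulerBase() {}
    virtual void registerDevice(SharedSketchDevice) {}
    virtual void unregisterDevice(SharedSketchDevice) {}
};

// A scheduler that needs per-device bookkeeping overrides both methods.
class SketchTrackingScheduler : public SketchSchedulerBase {
    std::set<SharedSketchDevice> devices;

public:
    void registerDevice(SharedSketchDevice pDevice) override
    {
        devices.insert(pDevice);
    }
    void unregisterDevice(SharedSketchDevice pDevice) override
    {
        devices.erase(pDevice);
    }
    std::size_t deviceCount() const
    {
        return devices.size();
    }
};
```

Callers can register and unregister devices uniformly against the base interface, whether or not the concrete scheduler does anything with them.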
ThreadPool< RandomAccessRequest > ThreadPoolScheduler::pool [private]
Definition at line 40 of file ThreadPoolScheduler.h.
Referenced by schedule(), stop(), and ThreadPoolScheduler().