ThreadPoolScheduler Class Reference

ThreadPoolScheduler implements DeviceAccessScheduler by combining a thread pool with synchronous I/O calls.

#include <ThreadPoolScheduler.h>

Inherits DeviceAccessScheduler.

Public Member Functions

 ThreadPoolScheduler (DeviceAccessSchedulerParams const &)
 Constructor.
virtual ~ThreadPoolScheduler ()
 Destructor: stop() must already have been called.
virtual bool schedule (RandomAccessRequest &request)
 Initiates a request, the details of which must already have been defined by the caller.
virtual void stop ()
 Shuts down, waiting for all pending requests to complete.
virtual void registerDevice (SharedRandomAccessDevice pDevice)
 Registers a device for which this scheduler will process requests.
virtual void unregisterDevice (SharedRandomAccessDevice pDevice)
 Unregisters a device.

Static Public Member Functions

static DeviceAccessScheduler * newScheduler (DeviceAccessSchedulerParams const &params)
 Creates a scheduler.

Private Attributes

ThreadPool< RandomAccessRequest > pool

Detailed Description

ThreadPoolScheduler implements DeviceAccessScheduler by combining a thread pool with synchronous I/O calls.

Definition at line 37 of file ThreadPoolScheduler.h.


Constructor & Destructor Documentation

ThreadPoolScheduler::ThreadPoolScheduler ( DeviceAccessSchedulerParams const &   )  [explicit]

Constructor.

Definition at line 34 of file ThreadPoolScheduler.cpp.

References DeviceAccessSchedulerParams::maxRequests, pool, and ThreadPoolBase::start().

{
    // threads and requests are 1-to-1, but threads are expensive,
    // so arbitrarily cap at 10
    uint nThreads = std::min<uint>(10, params.maxRequests);
    pool.start(nThreads);
}
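
The thread-count cap in the constructor can be illustrated in isolation. The following is a minimal sketch rather than Fennel code: `cappedThreadCount` and `kMaxPoolThreads` are hypothetical names introduced here, and only the `std::min` clamp mirrors the constructor body.

```cpp
#include <algorithm>

// Hypothetical constant mirroring the hard-coded cap of 10 in the constructor.
constexpr unsigned kMaxPoolThreads = 10;

// Returns the number of worker threads for a given request limit:
// one thread per request, but never more than kMaxPoolThreads.
inline unsigned cappedThreadCount(unsigned maxRequests)
{
    return std::min<unsigned>(kMaxPoolThreads, maxRequests);
}
```

Under this sketch, a limit of 4 requests yields 4 threads, while a limit of 1024 requests yields 10, so requests beyond the cap would have to wait for a free thread.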

ThreadPoolScheduler::~ThreadPoolScheduler (  )  [virtual]

Destructor: stop() must already have been called.

Definition at line 43 of file ThreadPoolScheduler.cpp.

{
}


Member Function Documentation

bool ThreadPoolScheduler::schedule ( RandomAccessRequest &  request  )  [virtual]

Initiates a request, the details of which must already have been defined by the caller.

When the request completes, this scheduler will call notifyTransferCompletion on each binding associated with the request, and also break up the binding list. The bindings must not be altered by the caller until this notification is received. However, the request parameter itself need not live beyond this call.

Care must be taken to ensure that the schedule/notify sequences cannot deadlock. For example, the caller of schedule may hold a lock on a binding, and the implementation of schedule may acquire a scheduler lock internally. The notification callback may also need to take a lock on the binding. Thus, it is important that no scheduler lock be held while notifyTransferCompletion is called.

Parameters:
request parameters for the request to be scheduled
Returns:
true if the request was successfully scheduled without any retries
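
The locking rule described above (no scheduler lock held while the completion notification runs) might be sketched as follows. This is an illustration under stated assumptions, not the Fennel implementation: `SketchScheduler`, `enqueueCompletion`, and `deliverOne` are hypothetical names, and a `std::function` stands in for the call to notifyTransferCompletion.

```cpp
#include <functional>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical stand-in for a scheduler with an internal lock.
class SketchScheduler
{
    std::mutex mutex;                             // the "scheduler lock"
    std::queue<std::function<void()>> completed;  // pending notifications

public:
    // Records a completion callback while holding the scheduler lock.
    void enqueueCompletion(std::function<void()> notify)
    {
        std::lock_guard<std::mutex> guard(mutex);
        completed.push(std::move(notify));
    }

    // Delivers one notification, returning false if none is pending.
    // The callback is moved out of the queue under the lock but invoked
    // only after the lock is released, so it may safely take a binding
    // lock or call back into the scheduler.
    bool deliverOne()
    {
        std::function<void()> notify;
        {
            std::lock_guard<std::mutex> guard(mutex);
            if (completed.empty()) {
                return false;
            }
            notify = std::move(completed.front());
            completed.pop();
        }
        notify();  // no scheduler lock held here
        return true;
    }
};
```

Had `notify()` been called inside the locked scope, a callback that re-entered `enqueueCompletion` would block forever on the non-recursive mutex, which is the kind of deadlock the text warns about.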

Implements DeviceAccessScheduler.

Definition at line 47 of file ThreadPoolScheduler.cpp.

References RandomAccessRequest::bindingList, RandomAccessRequest::cbOffset, RandomAccessRequest::cbTransfer, IntrusiveListMutator< T, DerivedListNode >::detach(), RandomAccessRequestBinding::getBufferSize(), RandomAccessRequest::pDevice, pool, ThreadPool< Task >::submitTask(), and RandomAccessRequest::type.

{
    RandomAccessRequest::BindingListMutator bindingMutator(request.bindingList);
    FileSize cbOffset = request.cbOffset;
    // break up the request into one per binding
    // TODO:  don't do this if device supports scatter/gather; and skip
    // breakup if only one binding in the first place
    while (bindingMutator) {
        RandomAccessRequestBinding *pBinding = bindingMutator.detach();
        if (!pBinding) {
            break;
        }
        RandomAccessRequest subRequest;
        subRequest.pDevice = request.pDevice;
        subRequest.cbOffset = cbOffset;
        subRequest.cbTransfer = pBinding->getBufferSize();
        cbOffset += subRequest.cbTransfer;
        subRequest.type = request.type;
        subRequest.bindingList.push_back(*pBinding);
        pool.submitTask(subRequest);
    }
    assert(cbOffset == request.cbOffset + request.cbTransfer);
    return true;
}
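
The per-binding breakup performed by schedule can be tried out with simplified types. The sketch below assumes hypothetical stand-ins (`SketchBinding`, `SketchRequest`, `splitPerBinding`) in place of the Fennel classes, and returns the sub-requests instead of submitting them to a thread pool. Only the running-offset arithmetic and the final invariant follow the listing.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical, simplified stand-ins for the Fennel request types.
struct SketchBinding
{
    std::uint64_t bufferSize;  // bytes covered by this binding
};

struct SketchRequest
{
    std::uint64_t cbOffset;    // starting byte offset on the device
    std::uint64_t cbTransfer;  // total bytes to transfer
    std::vector<SketchBinding> bindings;
};

// Splits a request into one sub-request per binding. Each sub-request
// starts at the offset where the previous one ended.
inline std::vector<SketchRequest> splitPerBinding(SketchRequest const &request)
{
    std::vector<SketchRequest> subRequests;
    std::uint64_t cbOffset = request.cbOffset;
    for (SketchBinding const &binding : request.bindings) {
        SketchRequest sub;
        sub.cbOffset = cbOffset;
        sub.cbTransfer = binding.bufferSize;
        sub.bindings.push_back(binding);
        cbOffset += sub.cbTransfer;
        subRequests.push_back(sub);
    }
    // Same invariant as the assert in schedule(): the buffer sizes must
    // add up to the size of the original transfer.
    assert(cbOffset == request.cbOffset + request.cbTransfer);
    return subRequests;
}
```

For a request at offset 4096 with bindings of 1024 and 2048 bytes (3072 bytes in total), this produces two sub-requests starting at offsets 4096 and 5120.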

void ThreadPoolScheduler::stop (  )  [virtual]

Shuts down, waiting for all pending requests to complete.

Implements DeviceAccessScheduler.

Definition at line 72 of file ThreadPoolScheduler.cpp.

References pool, and ThreadPoolBase::stop().

{
    pool.stop();
}

DeviceAccessScheduler * DeviceAccessScheduler::newScheduler ( DeviceAccessSchedulerParams const &  params  )  [static, inherited]

Creates a scheduler.

Parameters:
params DeviceAccessSchedulerParams to use
Returns:
new scheduler; caller is responsible for deleting it
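
Because the caller owns the returned pointer and the destructor expects stop to have been called first, one possible way to hold the result is a smart pointer whose deleter enforces that order. This is a sketch under assumptions: `SketchSchedulerBase`, `newSketchScheduler`, `StopThenDelete`, and `makeSchedulerHandle` are hypothetical names, not part of Fennel, and whether the real stop tolerates repeated calls is not stated in this page.

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-in for the scheduler interface.
struct SketchSchedulerBase
{
    bool stopped = false;

    // Mirrors the documented contract: stop() must precede destruction.
    virtual ~SketchSchedulerBase() { assert(stopped); }

    virtual void stop() { stopped = true; }
};

// Hypothetical factory returning a raw owning pointer, like newScheduler.
inline SketchSchedulerBase *newSketchScheduler()
{
    return new SketchSchedulerBase();
}

// Deleter that stops the scheduler before deleting it.
struct StopThenDelete
{
    void operator()(SketchSchedulerBase *pScheduler) const
    {
        if (pScheduler) {
            pScheduler->stop();
            delete pScheduler;
        }
    }
};

using SchedulerHandle = std::unique_ptr<SketchSchedulerBase, StopThenDelete>;

// Wraps the raw pointer immediately so that it cannot leak.
inline SchedulerHandle makeSchedulerHandle()
{
    return SchedulerHandle(newSketchScheduler());
}
```

With a handle like this, leaving the enclosing scope stops and deletes the scheduler in the required order even when an exception propagates.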

Definition at line 70 of file DeviceAccessScheduler.cpp.

References DeviceAccessSchedulerParams::AIO_LINUX_SCHEDULER, DeviceAccessSchedulerParams::AIO_POLLING_SCHEDULER, DeviceAccessSchedulerParams::AIO_SIGNAL_SCHEDULER, dlopenAioLinuxScheduler(), DeviceAccessSchedulerParams::IO_COMPLETION_PORT_SCHEDULER, DeviceAccessSchedulerParams::schedulerType, DeviceAccessSchedulerParams::THREAD_POOL_SCHEDULER, and DeviceAccessSchedulerParams::usingDefaultSchedulerType.

Referenced by CacheImpl< PageT, VictimPolicyT >::CacheImpl(), and RandomAccessFileDeviceTest::testAsyncIOImpl().

{
    switch (params.schedulerType) {
    case DeviceAccessSchedulerParams::THREAD_POOL_SCHEDULER:
        return new ThreadPoolScheduler(params);

#ifdef __MSVC__
    case DeviceAccessSchedulerParams::IO_COMPLETION_PORT_SCHEDULER:
        return new IoCompletionPortScheduler(params);
#endif

#ifdef USE_LIBAIO_H
    case DeviceAccessSchedulerParams::AIO_LINUX_SCHEDULER:
        {
            DeviceAccessScheduler *pScheduler = dlopenAioLinuxScheduler(params);
            if (pScheduler) {
                return pScheduler;
            } else {
                // if the aioLinux scheduler was explicitly selected (vs simply
                // using the default type for the OS), then the AIO runtime
                // library must be installed; otherwise, fall through to use
                // ThreadPoolScheduler as fallback
                if (params.usingDefaultSchedulerType) {
                    break;
                }
                throw FennelExcn(FennelResource::instance().libaioRequired());
            }
        }
#endif

#ifdef USE_AIO_H
    case DeviceAccessSchedulerParams::AIO_POLLING_SCHEDULER:
        return new AioPollingScheduler(params);
    case DeviceAccessSchedulerParams::AIO_SIGNAL_SCHEDULER:
        return new AioSignalScheduler(params);
#endif

    default:
        // fall through to use ThreadPoolScheduler as a fallback
        break;
    }
    return new ThreadPoolScheduler(params);
}
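
The fallback rule in the factory (use the thread pool when a default-selected asynchronous scheduler is unavailable, but fail when it was explicitly requested) can be expressed as a small decision function. The sketch below uses hypothetical names (`SketchType`, `SketchParams`, `chooseScheduler`) and a boolean in place of the dlopenAioLinuxScheduler result. It returns the chosen type rather than constructing a scheduler.

```cpp
#include <stdexcept>

// Hypothetical scheduler kinds; the real enum has more members.
enum class SketchType { ThreadPool, AioLinux };

// Hypothetical subset of the scheduler parameters.
struct SketchParams
{
    SketchType type;         // requested scheduler type
    bool usingDefaultType;   // true if the type was not chosen explicitly
};

// Decides which scheduler to use. aioLibraryAvailable stands in for a
// successful dynamic load of the AIO runtime library.
inline SketchType chooseScheduler(
    SketchParams const &params, bool aioLibraryAvailable)
{
    switch (params.type) {
    case SketchType::ThreadPool:
        return SketchType::ThreadPool;
    case SketchType::AioLinux:
        if (aioLibraryAvailable) {
            return SketchType::AioLinux;
        }
        if (params.usingDefaultType) {
            break;  // silent fallback to the thread pool
        }
        // An explicit request cannot be honored: report it.
        throw std::runtime_error("AIO runtime library required");
    }
    return SketchType::ThreadPool;
}
```

The design choice this illustrates is that a default selection degrades quietly, while an explicit selection that cannot be satisfied is treated as a configuration error.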

void DeviceAccessScheduler::registerDevice ( SharedRandomAccessDevice  pDevice  )  [virtual, inherited]

Registers a device for which this scheduler will process requests.

The default implementation does nothing.

Parameters:
pDevice device to be registered

Reimplemented in AioLinuxScheduler, and IoCompletionPortScheduler.

Definition at line 142 of file DeviceAccessScheduler.cpp.

Referenced by RandomAccessFileDeviceTest::testAsyncIOImpl().

{
}

void DeviceAccessScheduler::unregisterDevice ( SharedRandomAccessDevice  pDevice  )  [virtual, inherited]

Unregisters a device.

The default implementation does nothing.

Parameters:
pDevice device to be unregistered

Definition at line 147 of file DeviceAccessScheduler.cpp.

Referenced by RandomAccessFileDeviceTest::testAsyncIOImpl().

{
}


Member Data Documentation

ThreadPool<RandomAccessRequest> ThreadPoolScheduler::pool [private]

Definition at line 40 of file ThreadPoolScheduler.h.

Referenced by schedule(), stop(), and ThreadPoolScheduler().


The documentation for this class was generated from the following files:
ThreadPoolScheduler.h
ThreadPoolScheduler.cpp
Generated on Mon Jun 22 04:00:48 2009 for Fennel by  doxygen 1.5.1