IoCompletionPortScheduler Class Reference

IoCompletionPortScheduler implements DeviceAccessScheduler via the Win32 IoCompletionPort facility. More...

#include <IoCompletionPortScheduler.h>

Inheritance diagram for IoCompletionPortScheduler:

DeviceAccessScheduler List of all members.

Public Member Functions

 IoCompletionPortScheduler (DeviceAccessSchedulerParams const &)
 Constructor.
virtual ~IoCompletionPortScheduler ()
 Destructor: stop must already have been called.
virtual void registerDevice (SharedRandomAccessDevice pDevice)
 Registers a device for which this scheduler will process requests.
virtual bool schedule (RandomAccessRequest &request)
 Initiates a request, the details of which must already have been defined by the caller.
virtual void stop ()
 Shuts down, waiting for all pending requests to complete.
virtual void unregisterDevice (SharedRandomAccessDevice pDevice)
 Unregisters a device.

Static Public Member Functions

static DeviceAccessSchedulernewScheduler (DeviceAccessSchedulerParams const &params)
 Creates a scheduler.

Private Member Functions

bool isStarted () const

Private Attributes

HANDLE hCompletionPort
std::vector< IoCompletionPortThread * > threads
bool quit

Friends

class IoCompletionPortThread

Detailed Description

IoCompletionPortScheduler implements DeviceAccessScheduler via the Win32 IoCompletionPort facility.

Definition at line 43 of file IoCompletionPortScheduler.h.


Constructor & Destructor Documentation

IoCompletionPortScheduler::IoCompletionPortScheduler ( DeviceAccessSchedulerParams const &   )  [explicit]

Constructor.

Definition at line 47 of file IoCompletionPortScheduler.cpp.

References hCompletionPort, IoCompletionPortThread, DeviceAccessSchedulerParams::nThreads, quit, and threads.

00049 {
00050     quit = false;
00051 
00052     hCompletionPort = CreateIoCompletionPort(
00053         INVALID_HANDLE_VALUE,
00054         NULL,
00055         0,
00056         params.nThreads);
00057     if (!hCompletionPort) {
00058         throw SysCallExcn("CreateIoCompletionPort failed for scheduler");
00059     }
00060 
00061     for (uint i = 0; i < params.nThreads; ++i) {
00062         IoCompletionPortThread *pThread = new IoCompletionPortThread(*this);
00063         pThread->start();
00064         threads.push_back(pThread);
00065     }
00066 }

IoCompletionPortScheduler::~IoCompletionPortScheduler (  )  [virtual]

Destructor: stop must already have been called.

Definition at line 68 of file IoCompletionPortScheduler.cpp.

References hCompletionPort, and isStarted().

00069 {
00070     assert(!isStarted());
00071     if (!CloseHandle(hCompletionPort)) {
00072         throw SysCallExcn("CloseHandle failed for IoCompletionPort");
00073     }
00074 }


Member Function Documentation

bool IoCompletionPortScheduler::isStarted (  )  const [inline, private]

Definition at line 52 of file IoCompletionPortScheduler.h.

Referenced by schedule(), stop(), and ~IoCompletionPortScheduler().

00053     {
00054         return !threads.empty();
00055     }

void IoCompletionPortScheduler::registerDevice ( SharedRandomAccessDevice  pDevice  )  [virtual]

Registers a device for which this scheduler will process requests.

The default implementation does nothing.

Parameters:
pDevice device to be registered

Reimplemented from DeviceAccessScheduler.

Definition at line 163 of file IoCompletionPortScheduler.cpp.

References hCompletionPort, and threads.

00165 {
00166     int hFile = pDevice->getHandle();
00167     if (hFile == -1) {
00168         return;
00169     }
00170     if (!CreateIoCompletionPort(
00171             HANDLE(hFile),
00172             hCompletionPort,
00173             0,
00174             threads.size()))
00175     {
00176         throw SysCallExcn("CreateIoCompletionPort failed for device");
00177     }
00178 
00179     // REVIEW:  is it OK to do nothing for unregister?
00180 }

bool IoCompletionPortScheduler::schedule ( RandomAccessRequest request  )  [virtual]

Initiates a request, the details of which must already have been defined by the caller.

When the request completes, this scheduler will call notifyTransferCompletion on each binding associated with the request, and also break up the binding list. The bindings must not be altered by the caller until this notification is received. However, the request parameter itself need not live beyond this call.

Care must be taken to ensure that the schedule/notify sequences cannot deadlock. For example, the caller of schedule may hold a lock on a binding, and the implementation of schedule may acquire a scheduler lock internally. The notification callback may also need to take a lock on the binding. Thus, it is important that no scheduler lock be held while notifyTransferCompletion is called.

Parameters:
request parameters for the request to be scheduled
Returns:
true if the request was successfully scheduled without any retries

Implements DeviceAccessScheduler.

Definition at line 76 of file IoCompletionPortScheduler.cpp.

References RandomAccessRequest::bindingList, RandomAccessRequest::cbOffset, RandomAccessRequest::cbTransfer, IntrusiveListMutator< T, DerivedListNode >::detach(), RandomAccessRequestBinding::getBuffer(), RandomAccessRequestBinding::getBufferSize(), RandomAccessDevice::getHandle(), isStarted(), RandomAccessRequestBinding::notifyTransferCompletion(), RandomAccessRequest::pDevice, RandomAccessRequest::READ, and RandomAccessRequest::type.

00077 {
00078     assert(isStarted());
00079 
00080     // TODO:  use ReadFileScatter/WriteFileGather
00081 
00082     FileSize cbOffset = request.cbOffset;
00083     RandomAccessRequest::BindingListMutator bindingMutator(request.bindingList);
00084     while (bindingMutator) {
00085         RandomAccessRequestBinding *pBinding = bindingMutator.detach();
00086         LARGE_INTEGER largeInt;
00087         largeInt.QuadPart = cbOffset;
00088         pBinding->Offset = largeInt.LowPart;
00089         pBinding->OffsetHigh = largeInt.HighPart;
00090         BOOL bCompleted;
00091         if (request.type == RandomAccessRequest::READ) {
00092             bCompleted = ReadFile(
00093                 HANDLE(request.pDevice->getHandle()),
00094                 pBinding->getBuffer(),
00095                 pBinding->getBufferSize(),
00096                 NULL,
00097                 pBinding);
00098         } else {
00099             bCompleted = WriteFile(
00100                 HANDLE(request.pDevice->getHandle()),
00101                 pBinding->getBuffer(),
00102                 pBinding->getBufferSize(),
00103                 NULL,
00104                 pBinding);
00105         }
00106         if (!bCompleted) {
00107             if (GetLastError() != ERROR_IO_PENDING) {
00108                 pBinding->notifyTransferCompletion(false);
00109             }
00110         }
00111         cbOffset += pBinding->getBufferSize();
00112     }
00113     assert(cbOffset == request.cbOffset + request.cbTransfer);
00114 
00115     return true;
00116 }

void IoCompletionPortScheduler::stop (  )  [virtual]

Shuts down, waiting for all pending requests to complete.

Implements DeviceAccessScheduler.

Definition at line 118 of file IoCompletionPortScheduler.cpp.

References deleteAndNullify(), hCompletionPort, isStarted(), quit, and threads.

00119 {
00120     assert(isStarted());
00121 
00122     quit = true;
00123 
00124     // post dummy wakeup notifications; threads will see these and
00125     // exit
00126     for (uint i = 0; i < threads.size(); ++i) {
00127         if (!PostQueuedCompletionStatus(hCompletionPort,0,0,NULL)) {
00128             throw SysCallExcn("PostQueuedCompletionStatus failed");
00129         }
00130     }
00131 
00132     for (uint i = 0; i < threads.size(); ++i) {
00133         threads[i]->join();
00134         deleteAndNullify(threads[i]);
00135     }
00136     threads.clear();
00137 }

DeviceAccessScheduler * DeviceAccessScheduler::newScheduler ( DeviceAccessSchedulerParams const &  params  )  [static, inherited]

Creates a scheduler.

Parameters:
params DeviceAccessSchedulerParams to use
Returns:
new scheduler; caller is responsible for deleting it

Definition at line 70 of file DeviceAccessScheduler.cpp.

References DeviceAccessSchedulerParams::AIO_LINUX_SCHEDULER, DeviceAccessSchedulerParams::AIO_POLLING_SCHEDULER, DeviceAccessSchedulerParams::AIO_SIGNAL_SCHEDULER, dlopenAioLinuxScheduler(), DeviceAccessSchedulerParams::IO_COMPLETION_PORT_SCHEDULER, DeviceAccessSchedulerParams::schedulerType, DeviceAccessSchedulerParams::THREAD_POOL_SCHEDULER, and DeviceAccessSchedulerParams::usingDefaultSchedulerType.

Referenced by CacheImpl< PageT, VictimPolicyT >::CacheImpl(), and RandomAccessFileDeviceTest::testAsyncIOImpl().

00072 {
00073     switch (params.schedulerType) {
00074     case DeviceAccessSchedulerParams::THREAD_POOL_SCHEDULER:
00075         return new ThreadPoolScheduler(params);
00076 
00077 #ifdef __MSVC__
00078     case DeviceAccessSchedulerParams::IO_COMPLETION_PORT_SCHEDULER:
00079         return new IoCompletionPortScheduler(params);
00080 #endif
00081 
00082 #ifdef USE_LIBAIO_H
00083     case DeviceAccessSchedulerParams::AIO_LINUX_SCHEDULER:
00084         {
00085             DeviceAccessScheduler *pScheduler = dlopenAioLinuxScheduler(params);
00086             if (pScheduler) {
00087                 return pScheduler;
00088             } else {
00089                 // if the aioLinux scheduler was explicitly selected (vs simply
00090                 // using the default type for the OS), then the AIO runtime
00091                 // library must be installed; otherwise, fall through to use
00092                 // ThreadPoolScheduler as fallback
00093                 if (params.usingDefaultSchedulerType) {
00094                     break;
00095                 }
00096                 throw FennelExcn(FennelResource::instance().libaioRequired());
00097             }
00098         }
00099 #endif
00100 
00101 #ifdef USE_AIO_H
00102     case DeviceAccessSchedulerParams::AIO_POLLING_SCHEDULER:
00103         return new AioPollingScheduler(params);
00104     case DeviceAccessSchedulerParams::AIO_SIGNAL_SCHEDULER:
00105         return new AioSignalScheduler(params);
00106 #endif
00107 
00108     default:
00109         // fall through to use ThreadPoolScheduler as a fallback
00110         break;
00111     }
00112     return new ThreadPoolScheduler(params);
00113 }

void DeviceAccessScheduler::unregisterDevice ( SharedRandomAccessDevice  pDevice  )  [virtual, inherited]

Unregisters a device.

The default implementation does nothing.

Parameters:
pDevice device to be unregistered

Definition at line 147 of file DeviceAccessScheduler.cpp.

Referenced by RandomAccessFileDeviceTest::testAsyncIOImpl().

00149 {
00150 }


Friends And Related Function Documentation

friend class IoCompletionPortThread [friend]

Definition at line 46 of file IoCompletionPortScheduler.h.

Referenced by IoCompletionPortScheduler().


Member Data Documentation

HANDLE IoCompletionPortScheduler::hCompletionPort [private]

Definition at line 48 of file IoCompletionPortScheduler.h.

Referenced by IoCompletionPortScheduler(), registerDevice(), IoCompletionPortThread::run(), stop(), and ~IoCompletionPortScheduler().

std::vector<IoCompletionPortThread *> IoCompletionPortScheduler::threads [private]

Definition at line 49 of file IoCompletionPortScheduler.h.

Referenced by IoCompletionPortScheduler(), registerDevice(), and stop().

bool IoCompletionPortScheduler::quit [private]

Definition at line 50 of file IoCompletionPortScheduler.h.

Referenced by IoCompletionPortScheduler(), IoCompletionPortThread::run(), and stop().


The documentation for this class was generated from the following files:
Generated on Mon Jun 22 04:00:33 2009 for Fennel by  doxygen 1.5.1