#include <IoCompletionPortScheduler.h>
Inheritance diagram for IoCompletionPortScheduler:
Public Member Functions | |
IoCompletionPortScheduler (DeviceAccessSchedulerParams const &) | |
Constructor. | |
virtual | ~IoCompletionPortScheduler () |
Destructor: stop must already have been called. | |
virtual void | registerDevice (SharedRandomAccessDevice pDevice) |
Registers a device for which this scheduler will process requests. | |
virtual bool | schedule (RandomAccessRequest &request) |
Initiates a request, the details of which must already have been defined by the caller. | |
virtual void | stop () |
Shuts down, waiting for all pending requests to complete. | |
virtual void | unregisterDevice (SharedRandomAccessDevice pDevice) |
Unregisters a device. | |
Static Public Member Functions | |
static DeviceAccessScheduler * | newScheduler (DeviceAccessSchedulerParams const ¶ms) |
Creates a scheduler. | |
Private Member Functions | |
bool | isStarted () const |
Private Attributes | |
HANDLE | hCompletionPort |
std::vector< IoCompletionPortThread * > | threads |
bool | quit |
Friends | |
class | IoCompletionPortThread |
Definition at line 43 of file IoCompletionPortScheduler.h.
IoCompletionPortScheduler::IoCompletionPortScheduler | ( | DeviceAccessSchedulerParams const & | ) | [explicit] |
Constructor.
Definition at line 47 of file IoCompletionPortScheduler.cpp.
References hCompletionPort, IoCompletionPortThread, DeviceAccessSchedulerParams::nThreads, quit, and threads.
00049 { 00050 quit = false; 00051 00052 hCompletionPort = CreateIoCompletionPort( 00053 INVALID_HANDLE_VALUE, 00054 NULL, 00055 0, 00056 params.nThreads); 00057 if (!hCompletionPort) { 00058 throw SysCallExcn("CreateIoCompletionPort failed for scheduler"); 00059 } 00060 00061 for (uint i = 0; i < params.nThreads; ++i) { 00062 IoCompletionPortThread *pThread = new IoCompletionPortThread(*this); 00063 pThread->start(); 00064 threads.push_back(pThread); 00065 } 00066 }
IoCompletionPortScheduler::~IoCompletionPortScheduler | ( | ) | [virtual] |
Destructor: stop must already have been called.
Definition at line 68 of file IoCompletionPortScheduler.cpp.
References hCompletionPort, and isStarted().
00069 { 00070 assert(!isStarted()); 00071 if (!CloseHandle(hCompletionPort)) { 00072 throw SysCallExcn("CloseHandle failed for IoCompletionPort"); 00073 } 00074 }
bool IoCompletionPortScheduler::isStarted | ( | ) | const [inline, private] |
Definition at line 52 of file IoCompletionPortScheduler.h.
Referenced by schedule(), stop(), and ~IoCompletionPortScheduler().
00053 { 00054 return !threads.empty(); 00055 }
void IoCompletionPortScheduler::registerDevice | ( | SharedRandomAccessDevice | pDevice | ) | [virtual] |
Registers a device for which this scheduler will process requests.
The default implementation does nothing.
pDevice | device to be registered |
Reimplemented from DeviceAccessScheduler.
Definition at line 163 of file IoCompletionPortScheduler.cpp.
References hCompletionPort, and threads.
00165 { 00166 int hFile = pDevice->getHandle(); 00167 if (hFile == -1) { 00168 return; 00169 } 00170 if (!CreateIoCompletionPort( 00171 HANDLE(hFile), 00172 hCompletionPort, 00173 0, 00174 threads.size())) 00175 { 00176 throw SysCallExcn("CreateIoCompletionPort failed for device"); 00177 } 00178 00179 // REVIEW: is it OK to do nothing for unregister? 00180 }
bool IoCompletionPortScheduler::schedule | ( | RandomAccessRequest & | request | ) | [virtual] |
Initiates a request, the details of which must already have been defined by the caller.
When the request completes, this scheduler will call notifyTransferCompletion on each binding associated with the request, and also break up the binding list. The bindings must not be altered by the caller until this notification is received. However, the request parameter itself need not live beyond this call.
Care must be taken to ensure that the schedule/notify sequences cannot deadlock. For example, the caller of schedule may hold a lock on a binding, and the implementation of schedule may acquire a scheduler lock internally. The notification callback may also need to take a lock on the binding. Thus, it is important that no scheduler lock be held while notifyTransferCompletion is called.
request | parameters for the request to be scheduled |
Implements DeviceAccessScheduler.
Definition at line 76 of file IoCompletionPortScheduler.cpp.
References RandomAccessRequest::bindingList, RandomAccessRequest::cbOffset, RandomAccessRequest::cbTransfer, IntrusiveListMutator< T, DerivedListNode >::detach(), RandomAccessRequestBinding::getBuffer(), RandomAccessRequestBinding::getBufferSize(), RandomAccessDevice::getHandle(), isStarted(), RandomAccessRequestBinding::notifyTransferCompletion(), RandomAccessRequest::pDevice, RandomAccessRequest::READ, and RandomAccessRequest::type.
00077 { 00078 assert(isStarted()); 00079 00080 // TODO: use ReadFileScatter/WriteFileGather 00081 00082 FileSize cbOffset = request.cbOffset; 00083 RandomAccessRequest::BindingListMutator bindingMutator(request.bindingList); 00084 while (bindingMutator) { 00085 RandomAccessRequestBinding *pBinding = bindingMutator.detach(); 00086 LARGE_INTEGER largeInt; 00087 largeInt.QuadPart = cbOffset; 00088 pBinding->Offset = largeInt.LowPart; 00089 pBinding->OffsetHigh = largeInt.HighPart; 00090 BOOL bCompleted; 00091 if (request.type == RandomAccessRequest::READ) { 00092 bCompleted = ReadFile( 00093 HANDLE(request.pDevice->getHandle()), 00094 pBinding->getBuffer(), 00095 pBinding->getBufferSize(), 00096 NULL, 00097 pBinding); 00098 } else { 00099 bCompleted = WriteFile( 00100 HANDLE(request.pDevice->getHandle()), 00101 pBinding->getBuffer(), 00102 pBinding->getBufferSize(), 00103 NULL, 00104 pBinding); 00105 } 00106 if (!bCompleted) { 00107 if (GetLastError() != ERROR_IO_PENDING) { 00108 pBinding->notifyTransferCompletion(false); 00109 } 00110 } 00111 cbOffset += pBinding->getBufferSize(); 00112 } 00113 assert(cbOffset == request.cbOffset + request.cbTransfer); 00114 00115 return true; 00116 }
void IoCompletionPortScheduler::stop | ( | ) | [virtual] |
Shuts down, waiting for all pending requests to complete.
Implements DeviceAccessScheduler.
Definition at line 118 of file IoCompletionPortScheduler.cpp.
References deleteAndNullify(), hCompletionPort, isStarted(), quit, and threads.
00119 { 00120 assert(isStarted()); 00121 00122 quit = true; 00123 00124 // post dummy wakeup notifications; threads will see these and 00125 // exit 00126 for (uint i = 0; i < threads.size(); ++i) { 00127 if (!PostQueuedCompletionStatus(hCompletionPort,0,0,NULL)) { 00128 throw SysCallExcn("PostQueuedCompletionStatus failed"); 00129 } 00130 } 00131 00132 for (uint i = 0; i < threads.size(); ++i) { 00133 threads[i]->join(); 00134 deleteAndNullify(threads[i]); 00135 } 00136 threads.clear(); 00137 }
DeviceAccessScheduler * DeviceAccessScheduler::newScheduler | ( | DeviceAccessSchedulerParams const & | params | ) | [static, inherited] |
Creates a scheduler.
params | DeviceAccessSchedulerParams to use |
Definition at line 70 of file DeviceAccessScheduler.cpp.
References DeviceAccessSchedulerParams::AIO_LINUX_SCHEDULER, DeviceAccessSchedulerParams::AIO_POLLING_SCHEDULER, DeviceAccessSchedulerParams::AIO_SIGNAL_SCHEDULER, dlopenAioLinuxScheduler(), DeviceAccessSchedulerParams::IO_COMPLETION_PORT_SCHEDULER, DeviceAccessSchedulerParams::schedulerType, DeviceAccessSchedulerParams::THREAD_POOL_SCHEDULER, and DeviceAccessSchedulerParams::usingDefaultSchedulerType.
Referenced by CacheImpl< PageT, VictimPolicyT >::CacheImpl(), and RandomAccessFileDeviceTest::testAsyncIOImpl().
00072 { 00073 switch (params.schedulerType) { 00074 case DeviceAccessSchedulerParams::THREAD_POOL_SCHEDULER: 00075 return new ThreadPoolScheduler(params); 00076 00077 #ifdef __MSVC__ 00078 case DeviceAccessSchedulerParams::IO_COMPLETION_PORT_SCHEDULER: 00079 return new IoCompletionPortScheduler(params); 00080 #endif 00081 00082 #ifdef USE_LIBAIO_H 00083 case DeviceAccessSchedulerParams::AIO_LINUX_SCHEDULER: 00084 { 00085 DeviceAccessScheduler *pScheduler = dlopenAioLinuxScheduler(params); 00086 if (pScheduler) { 00087 return pScheduler; 00088 } else { 00089 // if the aioLinux scheduler was explicitly selected (vs simply 00090 // using the default type for the OS), then the AIO runtime 00091 // library must be installed; otherwise, fall through to use 00092 // ThreadPoolScheduler as fallback 00093 if (params.usingDefaultSchedulerType) { 00094 break; 00095 } 00096 throw FennelExcn(FennelResource::instance().libaioRequired()); 00097 } 00098 } 00099 #endif 00100 00101 #ifdef USE_AIO_H 00102 case DeviceAccessSchedulerParams::AIO_POLLING_SCHEDULER: 00103 return new AioPollingScheduler(params); 00104 case DeviceAccessSchedulerParams::AIO_SIGNAL_SCHEDULER: 00105 return new AioSignalScheduler(params); 00106 #endif 00107 00108 default: 00109 // fall through to use ThreadPoolScheduler as a fallback 00110 break; 00111 } 00112 return new ThreadPoolScheduler(params); 00113 }
void DeviceAccessScheduler::unregisterDevice | ( | SharedRandomAccessDevice | pDevice | ) | [virtual, inherited] |
Unregisters a device.
The default implementation does nothing.
pDevice | device to be unregistered |
Definition at line 147 of file DeviceAccessScheduler.cpp.
Referenced by RandomAccessFileDeviceTest::testAsyncIOImpl().
friend class IoCompletionPortThread [friend] |
Definition at line 46 of file IoCompletionPortScheduler.h.
Referenced by IoCompletionPortScheduler().
Definition at line 48 of file IoCompletionPortScheduler.h.
Referenced by IoCompletionPortScheduler(), registerDevice(), IoCompletionPortThread::run(), stop(), and ~IoCompletionPortScheduler().
std::vector<IoCompletionPortThread *> IoCompletionPortScheduler::threads [private] |
Definition at line 49 of file IoCompletionPortScheduler.h.
Referenced by IoCompletionPortScheduler(), registerDevice(), and stop().
bool IoCompletionPortScheduler::quit [private] |
Definition at line 50 of file IoCompletionPortScheduler.h.
Referenced by IoCompletionPortScheduler(), IoCompletionPortThread::run(), and stop().