00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "fennel/common/CommonPreamble.h"
00025
00026 #ifdef __MSVC__
00027
00028 #include "fennel/device/RandomAccessDevice.h"
00029 #include "fennel/device/IoCompletionPortScheduler.h"
00030 #include "fennel/device/DeviceAccessSchedulerParams.h"
00031 #include "fennel/common/SysCallExcn.h"
00032 #include "fennel/synch/Thread.h"
00033
00034 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/device/IoCompletionPortScheduler.cpp#10 $");
00035
00036 class IoCompletionPortThread : public Thread
00037 {
00038 IoCompletionPortScheduler &scheduler;
00039 public:
00040 IoCompletionPortThread(IoCompletionPortScheduler &schedulerInit)
00041 : scheduler(schedulerInit)
00042 {
00043 }
00044 virtual void run();
00045 };
00046
00047 IoCompletionPortScheduler::IoCompletionPortScheduler(
00048 DeviceAccessSchedulerParams const ¶ms)
00049 {
00050 quit = false;
00051
00052 hCompletionPort = CreateIoCompletionPort(
00053 INVALID_HANDLE_VALUE,
00054 NULL,
00055 0,
00056 params.nThreads);
00057 if (!hCompletionPort) {
00058 throw SysCallExcn("CreateIoCompletionPort failed for scheduler");
00059 }
00060
00061 for (uint i = 0; i < params.nThreads; ++i) {
00062 IoCompletionPortThread *pThread = new IoCompletionPortThread(*this);
00063 pThread->start();
00064 threads.push_back(pThread);
00065 }
00066 }
00067
00068 IoCompletionPortScheduler::~IoCompletionPortScheduler()
00069 {
00070 assert(!isStarted());
00071 if (!CloseHandle(hCompletionPort)) {
00072 throw SysCallExcn("CloseHandle failed for IoCompletionPort");
00073 }
00074 }
00075
00076 bool IoCompletionPortScheduler::schedule(RandomAccessRequest &request)
00077 {
00078 assert(isStarted());
00079
00080
00081
00082 FileSize cbOffset = request.cbOffset;
00083 RandomAccessRequest::BindingListMutator bindingMutator(request.bindingList);
00084 while (bindingMutator) {
00085 RandomAccessRequestBinding *pBinding = bindingMutator.detach();
00086 LARGE_INTEGER largeInt;
00087 largeInt.QuadPart = cbOffset;
00088 pBinding->Offset = largeInt.LowPart;
00089 pBinding->OffsetHigh = largeInt.HighPart;
00090 BOOL bCompleted;
00091 if (request.type == RandomAccessRequest::READ) {
00092 bCompleted = ReadFile(
00093 HANDLE(request.pDevice->getHandle()),
00094 pBinding->getBuffer(),
00095 pBinding->getBufferSize(),
00096 NULL,
00097 pBinding);
00098 } else {
00099 bCompleted = WriteFile(
00100 HANDLE(request.pDevice->getHandle()),
00101 pBinding->getBuffer(),
00102 pBinding->getBufferSize(),
00103 NULL,
00104 pBinding);
00105 }
00106 if (!bCompleted) {
00107 if (GetLastError() != ERROR_IO_PENDING) {
00108 pBinding->notifyTransferCompletion(false);
00109 }
00110 }
00111 cbOffset += pBinding->getBufferSize();
00112 }
00113 assert(cbOffset == request.cbOffset + request.cbTransfer);
00114
00115 return true;
00116 }
00117
00118 void IoCompletionPortScheduler::stop()
00119 {
00120 assert(isStarted());
00121
00122 quit = true;
00123
00124
00125
00126 for (uint i = 0; i < threads.size(); ++i) {
00127 if (!PostQueuedCompletionStatus(hCompletionPort,0,0,NULL)) {
00128 throw SysCallExcn("PostQueuedCompletionStatus failed");
00129 }
00130 }
00131
00132 for (uint i = 0; i < threads.size(); ++i) {
00133 threads[i]->join();
00134 deleteAndNullify(threads[i]);
00135 }
00136 threads.clear();
00137 }
00138
00139 void IoCompletionPortThread::run()
00140 {
00141 DWORD cbTransfer;
00142 ULONG_PTR pUnused;
00143 OVERLAPPED *pOverlapped;
00144 for (;;) {
00145 BOOL rc = GetQueuedCompletionStatus(
00146 scheduler.hCompletionPort,
00147 &cbTransfer,
00148 &pUnused,
00149 &pOverlapped,
00150 INFINITE);
00151 if (scheduler.quit) {
00152 return;
00153 }
00154 RandomAccessRequestBinding *pBinding =
00155 static_cast<RandomAccessRequestBinding *>(pOverlapped);
00156 if (rc) {
00157 assert(cbTransfer == pBinding->getBufferSize());
00158 }
00159 pBinding->notifyTransferCompletion(rc);
00160 }
00161 }
00162
00163 void IoCompletionPortScheduler::registerDevice(
00164 SharedRandomAccessDevice pDevice)
00165 {
00166 int hFile = pDevice->getHandle();
00167 if (hFile == -1) {
00168 return;
00169 }
00170 if (!CreateIoCompletionPort(
00171 HANDLE(hFile),
00172 hCompletionPort,
00173 0,
00174 threads.size()))
00175 {
00176 throw SysCallExcn("CreateIoCompletionPort failed for device");
00177 }
00178
00179
00180 }
00181
00182 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/device/IoCompletionPortScheduler.cpp#10 $");
00183
00184 #endif
00185
00186