IoCompletionPortScheduler.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/device/IoCompletionPortScheduler.cpp#10 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2005-2009 SQLstream, Inc.
00006 // Copyright (C) 2005-2009 LucidEra, Inc.
00007 // Portions Copyright (C) 1999-2009 John V. Sichi
00008 //
00009 // This program is free software; you can redistribute it and/or modify it
00010 // under the terms of the GNU General Public License as published by the Free
00011 // Software Foundation; either version 2 of the License, or (at your option)
00012 // any later version approved by The Eigenbase Project.
00013 //
00014 // This program is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 // GNU General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU General Public License
00020 // along with this program; if not, write to the Free Software
00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 */
00023 
00024 #include "fennel/common/CommonPreamble.h"
00025 
00026 #ifdef __MSVC__
00027 
00028 #include "fennel/device/RandomAccessDevice.h"
00029 #include "fennel/device/IoCompletionPortScheduler.h"
00030 #include "fennel/device/DeviceAccessSchedulerParams.h"
00031 #include "fennel/common/SysCallExcn.h"
00032 #include "fennel/synch/Thread.h"
00033 
00034 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/device/IoCompletionPortScheduler.cpp#10 $");
00035 
00036 class IoCompletionPortThread : public Thread
00037 {
00038     IoCompletionPortScheduler &scheduler;
00039 public:
00040     IoCompletionPortThread(IoCompletionPortScheduler &schedulerInit)
00041         : scheduler(schedulerInit)
00042     {
00043     }
00044     virtual void run();
00045 };
00046 
00047 IoCompletionPortScheduler::IoCompletionPortScheduler(
00048     DeviceAccessSchedulerParams const &params)
00049 {
00050     quit = false;
00051 
00052     hCompletionPort = CreateIoCompletionPort(
00053         INVALID_HANDLE_VALUE,
00054         NULL,
00055         0,
00056         params.nThreads);
00057     if (!hCompletionPort) {
00058         throw SysCallExcn("CreateIoCompletionPort failed for scheduler");
00059     }
00060 
00061     for (uint i = 0; i < params.nThreads; ++i) {
00062         IoCompletionPortThread *pThread = new IoCompletionPortThread(*this);
00063         pThread->start();
00064         threads.push_back(pThread);
00065     }
00066 }
00067 
00068 IoCompletionPortScheduler::~IoCompletionPortScheduler()
00069 {
00070     assert(!isStarted());
00071     if (!CloseHandle(hCompletionPort)) {
00072         throw SysCallExcn("CloseHandle failed for IoCompletionPort");
00073     }
00074 }
00075 
00076 bool IoCompletionPortScheduler::schedule(RandomAccessRequest &request)
00077 {
00078     assert(isStarted());
00079 
00080     // TODO:  use ReadFileScatter/WriteFileGather
00081 
00082     FileSize cbOffset = request.cbOffset;
00083     RandomAccessRequest::BindingListMutator bindingMutator(request.bindingList);
00084     while (bindingMutator) {
00085         RandomAccessRequestBinding *pBinding = bindingMutator.detach();
00086         LARGE_INTEGER largeInt;
00087         largeInt.QuadPart = cbOffset;
00088         pBinding->Offset = largeInt.LowPart;
00089         pBinding->OffsetHigh = largeInt.HighPart;
00090         BOOL bCompleted;
00091         if (request.type == RandomAccessRequest::READ) {
00092             bCompleted = ReadFile(
00093                 HANDLE(request.pDevice->getHandle()),
00094                 pBinding->getBuffer(),
00095                 pBinding->getBufferSize(),
00096                 NULL,
00097                 pBinding);
00098         } else {
00099             bCompleted = WriteFile(
00100                 HANDLE(request.pDevice->getHandle()),
00101                 pBinding->getBuffer(),
00102                 pBinding->getBufferSize(),
00103                 NULL,
00104                 pBinding);
00105         }
00106         if (!bCompleted) {
00107             if (GetLastError() != ERROR_IO_PENDING) {
00108                 pBinding->notifyTransferCompletion(false);
00109             }
00110         }
00111         cbOffset += pBinding->getBufferSize();
00112     }
00113     assert(cbOffset == request.cbOffset + request.cbTransfer);
00114 
00115     return true;
00116 }
00117 
00118 void IoCompletionPortScheduler::stop()
00119 {
00120     assert(isStarted());
00121 
00122     quit = true;
00123 
00124     // post dummy wakeup notifications; threads will see these and
00125     // exit
00126     for (uint i = 0; i < threads.size(); ++i) {
00127         if (!PostQueuedCompletionStatus(hCompletionPort,0,0,NULL)) {
00128             throw SysCallExcn("PostQueuedCompletionStatus failed");
00129         }
00130     }
00131 
00132     for (uint i = 0; i < threads.size(); ++i) {
00133         threads[i]->join();
00134         deleteAndNullify(threads[i]);
00135     }
00136     threads.clear();
00137 }
00138 
00139 void IoCompletionPortThread::run()
00140 {
00141     DWORD cbTransfer;
00142     ULONG_PTR pUnused;
00143     OVERLAPPED *pOverlapped;
00144     for (;;) {
00145         BOOL rc = GetQueuedCompletionStatus(
00146             scheduler.hCompletionPort,
00147             &cbTransfer,
00148             &pUnused,
00149             &pOverlapped,
00150             INFINITE);
00151         if (scheduler.quit) {
00152             return;
00153         }
00154         RandomAccessRequestBinding *pBinding =
00155             static_cast<RandomAccessRequestBinding *>(pOverlapped);
00156         if (rc) {
00157             assert(cbTransfer == pBinding->getBufferSize());
00158         }
00159         pBinding->notifyTransferCompletion(rc);
00160     }
00161 }
00162 
00163 void IoCompletionPortScheduler::registerDevice(
00164     SharedRandomAccessDevice pDevice)
00165 {
00166     int hFile = pDevice->getHandle();
00167     if (hFile == -1) {
00168         return;
00169     }
00170     if (!CreateIoCompletionPort(
00171             HANDLE(hFile),
00172             hCompletionPort,
00173             0,
00174             threads.size()))
00175     {
00176         throw SysCallExcn("CreateIoCompletionPort failed for device");
00177     }
00178 
00179     // REVIEW:  is it OK to do nothing for unregister?
00180 }
00181 
00182 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/device/IoCompletionPortScheduler.cpp#10 $");
00183 
00184 #endif
00185 
00186 // End IoCompletionPortScheduler.cpp

Generated on Mon Jun 22 04:00:18 2009 for Fennel by  doxygen 1.5.1