AioPollingScheduler.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/device/AioPollingScheduler.cpp#12 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2005-2009 SQLstream, Inc.
00006 // Copyright (C) 2005-2009 LucidEra, Inc.
00007 // Portions Copyright (C) 1999-2009 John V. Sichi
00008 //
00009 // This program is free software; you can redistribute it and/or modify it
00010 // under the terms of the GNU General Public License as published by the Free
00011 // Software Foundation; either version 2 of the License, or (at your option)
00012 // any later version approved by The Eigenbase Project.
00013 //
00014 // This program is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 // GNU General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU General Public License
00020 // along with this program; if not, write to the Free Software
00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 */
00023 
00024 #include "fennel/common/CommonPreamble.h"
00025 
00026 #ifdef USE_AIO_H
00027 
00028 #include "fennel/device/AioPollingScheduler.h"
00029 #include "fennel/device/DeviceAccessSchedulerParams.h"
00030 #include "fennel/device/RandomAccessDevice.h"
00031 #include <errno.h>
00032 
00033 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/device/AioPollingScheduler.cpp#12 $");
00034 
00035 AioPollingScheduler::AioPollingScheduler(
00036     DeviceAccessSchedulerParams const &)
00037 {
00038     // TODO:  pass params.maxRequests on to OS, and use
00039     // params.nThreads
00040     quit = false;
00041     Thread::start();
00042 }
00043 
00044 AioPollingScheduler::~AioPollingScheduler()
00045 {
00046 }
00047 
00048 bool AioPollingScheduler::schedule(RandomAccessRequest &request)
00049 {
00050     StrictMutexGuard guard(mutex);
00051     uint iFirst = newRequests.size();
00052     request.pDevice->prepareTransfer(request);
00053     RandomAccessRequest::BindingListMutator bindingMutator(request.bindingList);
00054     while (bindingMutator) {
00055         RandomAccessRequestBinding *pBinding = bindingMutator.detach();
00056         pBinding->aio_sigevent.sigev_notify = SIGEV_NONE;
00057         newRequests.push_back(pBinding);
00058     }
00059     aiocb **pFirst = &(newRequests.front()) + iFirst;
00060     int rc = lio_listio(LIO_NOWAIT,pFirst,newRequests.size() - iFirst,NULL);
00061     // TODO:  handle error cases
00062     assert(rc == 0);
00063     newRequestPending.notify_all();
00064     return true;
00065 }
00066 
00067 void AioPollingScheduler::stop()
00068 {
00069     StrictMutexGuard guard(mutex);
00070     quit = true;
00071     newRequestPending.notify_all();
00072     guard.unlock();
00073     Thread::join();
00074 }
00075 
00076 void AioPollingScheduler::run()
00077 {
00078     int rc;
00079     struct timespec ts;
00080     // poll every tenth of a millisecond
00081     // TODO:  determine a reasonable default, or adjust this
00082     // dynamically?
00083     ts.tv_sec = 0;
00084     ts.tv_nsec = 100000;
00085     for (;;) {
00086         StrictMutexGuard guard(mutex);
00087         while (!newRequests.size()) {
00088             if (quit) {
00089                 return;
00090             }
00091             newRequestPending.wait(guard);
00092         }
00093         currentRequests.resize(newRequests.size());
00094         std::copy(
00095             newRequests.begin(),
00096             newRequests.end(),
00097             currentRequests.begin());
00098         newRequests.clear();
00099         guard.unlock();
00100         do {
00101             // REVIEW jvs 4-Aug-2004:  Using &front like this is not portable.
00102             rc = aio_suspend(
00103                 &(currentRequests.front()),
00104                 currentRequests.size(),
00105                 &ts);
00106             if (rc) {
00107                 switch (errno) {
00108                 case EAGAIN:
00109                 case EINTR:
00110                     continue;
00111                 default:
00112                     std::cerr << rc << std::endl;
00113                     permAssert(false);
00114                 }
00115             }
00116         } while (false);
00117         // currentRequests does not need a lock, since this thread is the only
00118         // one which manipulates it.  And the lock cannot be held when
00119         // notifyTransferCompletion is called, otherwise deadlock is possible.
00120         // However, access to newRequests does require a lock, since other
00121         // threads may be calling schedule, so use a fine-grained lock on it.
00122         for (uint i = 0; i < currentRequests.size(); ++i) {
00123             aiocb *pcb = currentRequests[i];
00124             RandomAccessRequestBinding *pBinding =
00125                 static_cast<RandomAccessRequestBinding *>(pcb);
00126             rc = aio_error(pcb);
00127             if (rc == EINPROGRESS) {
00128                 guard.lock();
00129                 // cout << "EINPROGRESS " << pcb->aio_offset << std::endl;
00130                 newRequests.push_back(pcb);
00131                 guard.unlock();
00132             } else {
00133                 rc = aio_return(pcb);
00134                 // guard.lock();
00135                 // cout << "complete " << pcb->aio_offset << " " << rc
00136                 // << std::endl;
00137                 // guard.unlock();
00138                 pBinding->notifyTransferCompletion(rc >= 0);
00139             }
00140         }
00141         currentRequests.clear();
00142     }
00143 }
00144 
00145 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/device/AioPollingScheduler.cpp#12 $");
00146 
00147 #endif
00148 
00149 // End AioPollingScheduler.cpp

Generated on Mon Jun 22 04:00:18 2009 for Fennel by  doxygen 1.5.1