00001 /* 00002 // $Id: //open/dev/fennel/device/AioPollingScheduler.cpp#12 $ 00003 // Fennel is a library of data storage and processing components. 00004 // Copyright (C) 2005-2009 The Eigenbase Project 00005 // Copyright (C) 2005-2009 SQLstream, Inc. 00006 // Copyright (C) 2005-2009 LucidEra, Inc. 00007 // Portions Copyright (C) 1999-2009 John V. Sichi 00008 // 00009 // This program is free software; you can redistribute it and/or modify it 00010 // under the terms of the GNU General Public License as published by the Free 00011 // Software Foundation; either version 2 of the License, or (at your option) 00012 // any later version approved by The Eigenbase Project. 00013 // 00014 // This program is distributed in the hope that it will be useful, 00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of 00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00017 // GNU General Public License for more details. 00018 // 00019 // You should have received a copy of the GNU General Public License 00020 // along with this program; if not, write to the Free Software 00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 00022 */ 00023 00024 #include "fennel/common/CommonPreamble.h" 00025 00026 #ifdef USE_AIO_H 00027 00028 #include "fennel/device/AioPollingScheduler.h" 00029 #include "fennel/device/DeviceAccessSchedulerParams.h" 00030 #include "fennel/device/RandomAccessDevice.h" 00031 #include <errno.h> 00032 00033 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/device/AioPollingScheduler.cpp#12 $"); 00034 00035 AioPollingScheduler::AioPollingScheduler( 00036 DeviceAccessSchedulerParams const &) 00037 { 00038 // TODO: pass params.maxRequests on to OS, and use 00039 // params.nThreads 00040 quit = false; 00041 Thread::start(); 00042 } 00043 00044 AioPollingScheduler::~AioPollingScheduler() 00045 { 00046 } 00047 00048 bool AioPollingScheduler::schedule(RandomAccessRequest &request) 00049 { 00050 StrictMutexGuard guard(mutex); 00051 uint iFirst = newRequests.size(); 00052 request.pDevice->prepareTransfer(request); 00053 RandomAccessRequest::BindingListMutator bindingMutator(request.bindingList); 00054 while (bindingMutator) { 00055 RandomAccessRequestBinding *pBinding = bindingMutator.detach(); 00056 pBinding->aio_sigevent.sigev_notify = SIGEV_NONE; 00057 newRequests.push_back(pBinding); 00058 } 00059 aiocb **pFirst = &(newRequests.front()) + iFirst; 00060 int rc = lio_listio(LIO_NOWAIT,pFirst,newRequests.size() - iFirst,NULL); 00061 // TODO: handle error cases 00062 assert(rc == 0); 00063 newRequestPending.notify_all(); 00064 return true; 00065 } 00066 00067 void AioPollingScheduler::stop() 00068 { 00069 StrictMutexGuard guard(mutex); 00070 quit = true; 00071 newRequestPending.notify_all(); 00072 guard.unlock(); 00073 Thread::join(); 00074 } 00075 00076 void AioPollingScheduler::run() 00077 { 00078 int rc; 00079 struct timespec ts; 00080 // poll every tenth of a millisecond 00081 // TODO: determine a reasonable default, or adjust this 00082 // dynamically? 00083 ts.tv_sec = 0; 00084 ts.tv_nsec = 100000; 00085 for (;;) { 00086 StrictMutexGuard guard(mutex); 00087 while (!newRequests.size()) { 00088 if (quit) { 00089 return; 00090 } 00091 newRequestPending.wait(guard); 00092 } 00093 currentRequests.resize(newRequests.size()); 00094 std::copy( 00095 newRequests.begin(), 00096 newRequests.end(), 00097 currentRequests.begin()); 00098 newRequests.clear(); 00099 guard.unlock(); 00100 do { 00101 // REVIEW jvs 4-Aug-2004: Using &front like this is not portable. 00102 rc = aio_suspend( 00103 &(currentRequests.front()), 00104 currentRequests.size(), 00105 &ts); 00106 if (rc) { 00107 switch (errno) { 00108 case EAGAIN: 00109 case EINTR: 00110 continue; 00111 default: 00112 std::cerr << rc << std::endl; 00113 permAssert(false); 00114 } 00115 } 00116 } while (false); 00117 // currentRequests does not need a lock, since this thread is the only 00118 // one which manipulates it. And the lock cannot be held when 00119 // notifyTransferCompletion is called, otherwise deadlock is possible. 00120 // However, access to newRequests does require a lock, since other 00121 // threads may be calling schedule, so use a fine-grained lock on it. 00122 for (uint i = 0; i < currentRequests.size(); ++i) { 00123 aiocb *pcb = currentRequests[i]; 00124 RandomAccessRequestBinding *pBinding = 00125 static_cast<RandomAccessRequestBinding *>(pcb); 00126 rc = aio_error(pcb); 00127 if (rc == EINPROGRESS) { 00128 guard.lock(); 00129 // cout << "EINPROGRESS " << pcb->aio_offset << std::endl; 00130 newRequests.push_back(pcb); 00131 guard.unlock(); 00132 } else { 00133 rc = aio_return(pcb); 00134 // guard.lock(); 00135 // cout << "complete " << pcb->aio_offset << " " << rc 00136 // << std::endl; 00137 // guard.unlock(); 00138 pBinding->notifyTransferCompletion(rc >= 0); 00139 } 00140 } 00141 currentRequests.clear(); 00142 } 00143 } 00144 00145 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/device/AioPollingScheduler.cpp#12 $"); 00146 00147 #endif 00148 00149 // End AioPollingScheduler.cpp