00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 #include "fennel/common/CommonPreamble.h"
00024 
00025 #ifdef USE_LIBAIO_H
00026 
00027 #include "fennel/device/AioLinuxScheduler.h"
00028 #include "fennel/device/RandomAccessDevice.h"
00029 #include "fennel/device/DeviceAccessSchedulerParams.h"
00030 #include "fennel/common/SysCallExcn.h"
00031 #include <errno.h>
00032 
00033 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/device/AioLinuxScheduler.cpp#9 $");
00034 
00035 extern "C"
00036 DeviceAccessScheduler *newAioLinuxScheduler(
00037     DeviceAccessSchedulerParams const ¶ms)
00038 {
00039     return new AioLinuxScheduler(params);
00040 }
00041 
00042 AioLinuxScheduler::AioLinuxScheduler(
00043     DeviceAccessSchedulerParams const ¶ms)
00044 {
00045     quit = false;
00046     nRequestsOutstanding.clear();
00047     context = NULL;
00048     int rc = io_queue_init(params.maxRequests, &context);
00049     if (rc) {
00050         throw SysCallExcn("io_queue_init failed");
00051     }
00052 
00053     
00054     
00055     Thread::start();
00056 }
00057 
00058 inline bool AioLinuxScheduler::isStarted() const
00059 {
00060     return Thread::isStarted();
00061 }
00062 
00063 AioLinuxScheduler::~AioLinuxScheduler()
00064 {
00065     assert(!isStarted());
00066     assert(!nRequestsOutstanding);
00067     int rc = io_queue_release(context);
00068     if (rc) {
00069         throw SysCallExcn("io_queue_release failed");
00070     }
00071 }
00072 
00073 void AioLinuxScheduler::registerDevice(
00074     SharedRandomAccessDevice pDevice)
00075 {
00076     int hFile = pDevice->getHandle();
00077 
00078     
00079     
00080     int flags = fcntl(hFile, F_GETFL);
00081     fcntl(hFile, F_SETFL, flags | O_DIRECT);
00082 }
00083 
00084 bool AioLinuxScheduler::schedule(RandomAccessRequest &request)
00085 {
00086     assert(isStarted());
00087     request.pDevice->prepareTransfer(request);
00088     return submitRequests(request.bindingList);
00089 }
00090 
00091 bool AioLinuxScheduler::submitRequests(
00092     RandomAccessRequest::BindingList &bindingList)
00093 {
00094     iocb *requestsArray[bindingList.size()];
00095     iocb **requests = requestsArray;
00096 
00097     
00098     int n = 0;
00099     RandomAccessRequest::BindingListMutator bindingMutator(bindingList);
00100     for (; bindingMutator; ++n) {
00101         RandomAccessRequestBinding *pBinding = bindingMutator.detach();
00102         requests[n] = pBinding;
00103     }
00104 
00105     if (n == 0) {
00106         
00107         return true;
00108     }
00109 
00110     
00111     int rc = io_submit(context, n, requests);
00112     if (rc == -EAGAIN) {
00113         rc = 0;
00114     }
00115 
00116     if (rc < 0) {
00117         
00118         throw SysCallExcn("io_submit failed");
00119     }
00120 
00121     
00122     
00123     
00124     for (int i = 0; i < rc; ++i) {
00125         ++nRequestsOutstanding;
00126     }
00127 
00128     if (rc == n) {
00129         
00130         return true;
00131     } else {
00132         
00133         
00134         requests += rc;
00135         n -= rc;
00136         deferLeftoverRequests(requests, n);
00137         return false;
00138     }
00139 }
00140 
00141 void AioLinuxScheduler::deferLeftoverRequests(
00142     iocb **ppLeftovers,
00143     uint nLeftovers)
00144 {
00145     assert(nLeftovers > 0);
00146 
00147     
00148     RandomAccessRequest::BindingList bindingList;
00149 
00150     for (uint i = 0; i < nLeftovers; ++i) {
00151         RandomAccessRequestBinding *pBinding =
00152             static_cast<RandomAccessRequestBinding *>(ppLeftovers[i]);
00153         bindingList.push_back(*pBinding);
00154     }
00155 
00156     StrictMutexGuard deferredQueueGuard(deferredQueueMutex);
00157     deferredQueue.push_back(bindingList);
00158 }
00159 
00160 bool AioLinuxScheduler::retryDeferredRequests()
00161 {
00162     for (;;) {
00163         StrictMutexGuard deferredQueueGuard(deferredQueueMutex);
00164         if (deferredQueue.empty()) {
00165             
00166             return true;
00167         }
00168         RandomAccessRequest::BindingList bindingList = deferredQueue.front();
00169         deferredQueue.pop_front();
00170         
00171         deferredQueueGuard.unlock();
00172 
00173         bool success = submitRequests(bindingList);
00174         if (!success) {
00175             
00176             return false;
00177         }
00178     }
00179 }
00180 
00181 void AioLinuxScheduler::stop()
00182 {
00183     assert(isStarted());
00184     quit = true;
00185 
00186     Thread::join();
00187 }
00188 
00189 void AioLinuxScheduler::run()
00190 {
00191     while (nRequestsOutstanding || !quit) {
00192         io_event event;
00193         timespec ts;
00194 
00195         
00196         if (retryDeferredRequests()) {
00197             
00198             
00199             ts.tv_sec = 1;
00200             ts.tv_nsec = 0;
00201         } else {
00202             
00203             
00204             ts.tv_sec = 0;
00205             ts.tv_nsec = 1000000;
00206         }
00207 
00208         long rc = io_getevents(context, 1, 1, &event, &ts);
00209 
00210         
00211         
00212         
00213         if ((rc == 0) || (rc == -EINTR)) {
00214             
00215             continue;
00216         }
00217 
00218         if (rc != 1) {
00219             throw SysCallExcn("io_getevents failed");
00220         }
00221         RandomAccessRequestBinding *pBinding =
00222             static_cast<RandomAccessRequestBinding *>(event.obj);
00223         bool success = (pBinding->getBufferSize() == event.res)
00224             && !event.res2;
00225         pBinding->notifyTransferCompletion(success);
00226         --nRequestsOutstanding;
00227     }
00228 }
00229 
00230 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/device/AioLinuxScheduler.cpp#9 $");
00231 
00232 #endif
00233 
00234