AioSignalScheduler.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/device/AioSignalScheduler.cpp#11 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2005-2009 SQLstream, Inc.
00006 // Copyright (C) 2005-2009 LucidEra, Inc.
00007 // Portions Copyright (C) 1999-2009 John V. Sichi
00008 //
00009 // This program is free software; you can redistribute it and/or modify it
00010 // under the terms of the GNU General Public License as published by the Free
00011 // Software Foundation; either version 2 of the License, or (at your option)
00012 // any later version approved by The Eigenbase Project.
00013 //
00014 // This program is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 // GNU General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU General Public License
00020 // along with this program; if not, write to the Free Software
00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 */
00023 
00024 #include "fennel/common/CommonPreamble.h"
00025 
00026 #ifdef USE_AIO_H
00027 
00028 #include "fennel/device/RandomAccessDevice.h"
00029 #include "fennel/device/AioSignalScheduler.h"
00030 #include "fennel/device/DeviceAccessSchedulerParams.h"
00031 #include "fennel/synch/Thread.h"
00032 
00033 #include <errno.h>
00034 #include <sys/signal.h>
00035 
00036 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/device/AioSignalScheduler.cpp#11 $");
00037 
00038 class AioSignalHandlerThread : public Thread
00039 {
00040     AioSignalScheduler &scheduler;
00041 public:
00042     AioSignalHandlerThread(AioSignalScheduler &schedulerInit)
00043         : scheduler(schedulerInit)
00044     {
00045     }
00046     virtual void run();
00047 };
00048 
00049 static void aio_handler(int,siginfo_t *pSiginfo,void *)
00050 {
00051     assert(pSiginfo->si_code == SI_ASYNCIO);
00052     RandomAccessRequestBinding *pBinding =
00053         static_cast<RandomAccessRequestBinding *>(pSiginfo->si_value.sival_ptr);
00054 
00055     // static_cast assigned to lpBinding is a workaround
00056     // for a gcc bug that shows up on Ubuntu 8.04 when
00057     // passing pBinding to aio_* methods
00058     aiocb *lpBinding = static_cast<aiocb *>(pBinding);
00059 
00060     int rc = aio_error(lpBinding);
00061     if (rc != EINPROGRESS) {
00062         rc = aio_return(lpBinding);
00063         pBinding->notifyTransferCompletion(rc >= 0);
00064     }
00065     // TODO:  chain?
00066 }
00067 
00068 AioSignalScheduler::AioSignalScheduler(
00069     DeviceAccessSchedulerParams const &params)
00070 {
00071     // TODO:  pass params.maxSimultaneousRequests on to OS
00072 
00073     // block signal in this thread so that child threads will also have it
00074     // blocked
00075     int rc;
00076     sigset_t mask;
00077     sigemptyset(&mask);
00078     sigaddset(&mask,SIGRTMIN);
00079     rc = pthread_sigmask(SIG_BLOCK,&mask,NULL);
00080     assert(!rc);
00081 
00082     // TODO:  come up with a way to ensure signal is blocked in all threads
00083     // except the one spawned below
00084 
00085     struct sigaction sa;
00086     sa.sa_flags = SA_SIGINFO;
00087     sa.sa_sigaction = aio_handler;
00088     sigemptyset(&(sa.sa_mask));
00089     rc = sigaction(SIGRTMIN,&sa,&saOld);
00090     assert(!rc);
00091 
00092     quit = false;
00093     for (uint i = 0; i < params.nThreads; ++i) {
00094         AioSignalHandlerThread *pThread = new AioSignalHandlerThread(*this);
00095         pThread->start();
00096         threads.push_back(pThread);
00097     }
00098 }
00099 
00100 AioSignalScheduler::~AioSignalScheduler()
00101 {
00102     assert(!isStarted());
00103 }
00104 
00105 bool AioSignalScheduler::schedule(RandomAccessRequest &request)
00106 {
00107     assert(isStarted());
00108 
00109     int rc;
00110 
00111     // TODO: use lio_listio instead?  but then is it possible to get
00112     // individual notifications?  Or keep chain and notify all at once?
00113     request.pDevice->prepareTransfer(request);
00114     RandomAccessRequest::BindingListMutator bindingMutator(request.bindingList);
00115     while (bindingMutator) {
00116         RandomAccessRequestBinding *pBinding = bindingMutator.detach();
00117         pBinding->aio_sigevent.sigev_notify = SIGEV_SIGNAL;
00118         pBinding->aio_sigevent.sigev_signo = SIGRTMIN;
00119         pBinding->aio_sigevent.sigev_value.sival_ptr = pBinding;
00120 
00121         // static_cast assigned to lpBinding is a workaround
00122         // for a gcc bug that shows up on Ubuntu 8.04 when
00123         // passing pBinding to aio_* methods
00124         aiocb *lpBinding = static_cast<aiocb *>(pBinding);
00125 
00126         if (request.type == RandomAccessRequest::READ) {
00127             rc = aio_read(lpBinding);
00128         } else {
00129             rc = aio_write(lpBinding);
00130         }
00131         assert(!rc);
00132     }
00133 
00134     return true;
00135 }
00136 
00137 void AioSignalScheduler::stop()
00138 {
00139     assert(isStarted());
00140 
00141     StrictMutexGuard guard(mutex);
00142     quit = true;
00143     quitCondition.notify_all();
00144     guard.unlock();
00145 
00146     for (uint i = 0; i < threads.size(); ++i) {
00147         threads[i]->join();
00148         deleteAndNullify(threads[i]);
00149     }
00150     threads.clear();
00151 
00152     int rc = sigaction(SIGRTMIN,&saOld,NULL);
00153     assert(!rc);
00154 }
00155 
00156 void AioSignalHandlerThread::run()
00157 {
00158     int rc;
00159 
00160     // unblock signal in this thread only
00161     sigset_t mask;
00162     sigemptyset(&mask);
00163     sigaddset(&mask,SIGRTMIN);
00164     rc = pthread_sigmask(SIG_UNBLOCK,&mask,NULL);
00165     assert(!rc);
00166 
00167     // NOTE: had to boost priority of this thread to get signal handler
00168     // to run frequently.
00169 #ifdef sun
00170     int policy;
00171     sched_param param;
00172     rc = pthread_getschedparam(pthread_self(),&policy,&param);
00173     assert(!rc);
00174     param.sched_priority++;
00175     rc = pthread_setschedparam(pthread_self(),SCHED_RR,&param);
00176     assert(!rc);
00177 #endif
00178 
00179     // NOTE: using a condition variable causes the signal handler
00180     // to run too infrequently.  Try again after switching threading models.
00181     while (!scheduler.quit) {
00182         sleep(1);
00183     }
00184 }
00185 
00186 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/device/AioSignalScheduler.cpp#11 $");
00187 
00188 #endif
00189 
00190 // End AioSignalScheduler.cpp

Generated on Mon Jun 22 04:00:18 2009 for Fennel by  doxygen 1.5.1