JavaTransformExecStream.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/farrago/JavaTransformExecStream.cpp#10 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2006-2009 The Eigenbase Project
00005 // Copyright (C) 2006-2009 SQLstream, Inc.
00006 // Copyright (C) 2006-2009 LucidEra, Inc.
00007 //
00008 // This program is free software; you can redistribute it and/or modify it
00009 // under the terms of the GNU General Public License as published by the Free
00010 // Software Foundation; either version 2 of the License, or (at your option)
00011 // any later version approved by The Eigenbase Project.
00012 //
00013 // This program is distributed in the hope that it will be useful,
00014 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00015 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00016 // GNU General Public License for more details.
00017 //
00018 // You should have received a copy of the GNU General Public License
00019 // along with this program; if not, write to the Free Software
00020 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00021 */
00022 
00023 #include "fennel/common/CommonPreamble.h"
00024 #include "fennel/exec/ExecStreamBufAccessor.h"
00025 #include "fennel/exec/ExecStreamGraph.h"
00026 #include "fennel/farrago/JavaTransformExecStream.h"
00027 #include "fennel/farrago/JniUtil.h"
00028 
00029 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/farrago/JavaTransformExecStream.cpp#10 $");
00030 
00031 JavaTransformExecStreamParams::JavaTransformExecStreamParams()
00032 {
00033     outputTupleFormat = TUPLE_FORMAT_STANDARD;
00034     javaClassName = "";
00035 }
00036 
00037 JavaTransformExecStream::JavaTransformExecStream()
00038 {
00039     pStreamGraphHandle = NULL;
00040     outputByteBuffer1 = NULL;
00041     outputByteBuffer2 = NULL;
00042     pBuffer1 = NULL;
00043     pBuffer2 = NULL;
00044     farragoTransform = NULL;
00045 }
00046 
00047 JavaTransformExecStream::~JavaTransformExecStream()
00048 {
00049 }
00050 
00051 void JavaTransformExecStream::setInputBufAccessors(
00052     std::vector<SharedExecStreamBufAccessor> const &inAccessorsInit)
00053 {
00054     inAccessors = inAccessorsInit;
00055 }
00056 
00057 void JavaTransformExecStream::setOutputBufAccessors(
00058     std::vector<SharedExecStreamBufAccessor> const &outAccessors)
00059 {
00060     assert(outAccessors.size() <= 1);
00061 
00062     if (outAccessors.size() > 0) {
00063         pOutAccessor = outAccessors[0];
00064     }
00065 }
00066 
00067 void JavaTransformExecStream::prepare(
00068     JavaTransformExecStreamParams const &params)
00069 {
00070     ExecStream::prepare(params);
00071 
00072     if (pOutAccessor) {
00073         assert(pOutAccessor->getProvision() == getOutputBufProvision());
00074         if (pOutAccessor->getTupleDesc().empty()) {
00075             assert(!params.outputTupleDesc.empty());
00076             pOutAccessor->setTupleShape(
00077                 params.outputTupleDesc,
00078                 params.outputTupleFormat);
00079         }
00080     }
00081 
00082     for (uint i = 0; i < inAccessors.size(); ++i) {
00083         assert(inAccessors[i]->getProvision() == getInputBufProvision());
00084     }
00085 
00086     javaClassName = params.javaClassName;
00087     pStreamGraphHandle = params.pStreamGraphHandle;
00088 }
00089 
00090 void JavaTransformExecStream::open(bool restart)
00091 {
00092     FENNEL_TRACE(TRACE_FINER, "open" << (restart ? " (restart)" : ""));
00093     ExecStream::open(restart);
00094 
00095     JniEnvAutoRef pEnv;
00096     if (restart) {
00097         if (pOutAccessor) {
00098             pOutAccessor->clear();
00099         }
00100 
00101         // restart inputs
00102         for (uint i = 0; i < inAccessors.size(); ++i) {
00103             inAccessors[i]->clear();
00104             pGraph->getStreamInput(getStreamId(),i)->open(true);
00105         }
00106 
00107         assert(farragoTransform);
00108         pEnv->CallVoidMethod(
00109             farragoTransform,
00110             JniUtil::methFarragoTransformRestart,
00111             NULL);
00112         return;
00113     }
00114 
00115     // find java peer (a FarragoTransform)
00116     FENNEL_TRACE(TRACE_FINER, "finding java peer, class " << javaClassName);
00117     jobject o =
00118         pEnv->CallObjectMethod(
00119             pStreamGraphHandle->javaRuntimeContext,
00120             JniUtil::methFarragoRuntimeContextFindFarragoTransform,
00121             pEnv->NewStringUTF(javaClassName.c_str()));
00122     assert(o);
00123     farragoTransform = pEnv->NewGlobalRef(o);
00124 }
00125 
00126 
00127 ExecStreamResult JavaTransformExecStream::execute(
00128     ExecStreamQuantum const &quantum)
00129 {
00130     FENNEL_TRACE(TRACE_FINEST, "execute");
00131 
00132     if (pOutAccessor) {
00133         switch (pOutAccessor->getState()) {
00134         case EXECBUF_NONEMPTY:
00135         case EXECBUF_OVERFLOW:
00136             FENNEL_TRACE(TRACE_FINER, "overflow");
00137             return EXECRC_BUF_OVERFLOW;
00138         case EXECBUF_EOS:
00139             FENNEL_TRACE(TRACE_FINER, "eos");
00140             return EXECRC_EOS;
00141         default:
00142             break;
00143         }
00144     }
00145 
00146     checkEmptyInputs();
00147 
00148     jlong jquantum = static_cast<jlong>(quantum.nTuplesMax);
00149     JniEnvAutoRef pEnv;
00150     assert(farragoTransform);
00151     PBuffer pBuffer;
00152     if (pOutAccessor) {
00153         pBuffer = pOutAccessor->getProductionStart();
00154         if (!outputByteBuffer1) {
00155             outputByteBuffer1 = pEnv->NewDirectByteBuffer(
00156                 pBuffer,
00157                 pOutAccessor->getProductionAvailable());
00158             outputByteBuffer1 = pEnv->NewGlobalRef(outputByteBuffer1);
00159             pBuffer1 = pBuffer;
00160         } else if (!outputByteBuffer2) {
00161             if (pBuffer1 != pBuffer) {
00162                 outputByteBuffer2 = pEnv->NewDirectByteBuffer(
00163                     pBuffer,
00164                     pOutAccessor->getProductionAvailable());
00165                 outputByteBuffer2 = pEnv->NewGlobalRef(outputByteBuffer2);
00166                 pBuffer2 = pBuffer;
00167             }
00168         }
00169     } else {
00170         pBuffer = NULL;
00171     }
00172 
00173     // If this assertion fails, it means we're dealing with something
00174     // other than a ScratchBufferExecStream or DoubleBufferExecStream
00175     // immediately downstream.
00176     assert((pBuffer == pBuffer1) || (pBuffer == pBuffer2));
00177 
00178     // REVIEW jvs 18-Dec-2008:  Is it OK to pass NULL for the buffer
00179     // in the case where no outputs are defined?  There are no
00180     // unit tests demonstrating this pattern in Fennel, but
00181     // there should be.
00182 
00183     int cb = pEnv->CallIntMethod(
00184         farragoTransform,
00185         JniUtil::methFarragoTransformExecute,
00186         (pBuffer == pBuffer1) ? outputByteBuffer1 : outputByteBuffer2,
00187         jquantum);
00188 
00189     if (cb > 0) {
00190         assert(pOutAccessor);
00191         pOutAccessor->produceData(pBuffer + cb);
00192         FENNEL_TRACE(TRACE_FINER, "wrote " << cb << " bytes");
00193         return EXECRC_BUF_OVERFLOW;
00194     } else if (cb < 0) {
00195         FENNEL_TRACE(TRACE_FINER, "underflow");
00196         checkEmptyInputs();
00197         // TODO mb 10/28/08: return EXECRC_YIELD when executing in data-push
00198         // mode.
00199         return EXECRC_BUF_UNDERFLOW;
00200     } else {
00201         FENNEL_TRACE(TRACE_FINER, "marking EOS");
00202         if (pOutAccessor) {
00203             pOutAccessor->markEOS();
00204         }
00205         return EXECRC_EOS;
00206     }
00207 }
00208 
00209 void JavaTransformExecStream::checkEmptyInputs()
00210 {
00211     for (uint i = 0; i < inAccessors.size(); ++i) {
00212         SharedExecStreamBufAccessor inAccessor = inAccessors[i];
00213         if (inAccessor->getState() == EXECBUF_EMPTY) {
00214             inAccessor->requestProduction();
00215         }
00216     }
00217 }
00218 
00219 void JavaTransformExecStream::closeImpl()
00220 {
00221     JniEnvAutoRef pEnv;
00222     if (farragoTransform) {
00223         pEnv->DeleteGlobalRef(farragoTransform);
00224         farragoTransform = NULL;
00225     }
00226     if (outputByteBuffer1) {
00227         pEnv->DeleteGlobalRef(outputByteBuffer1);
00228         outputByteBuffer1 = NULL;
00229     }
00230     if (outputByteBuffer2) {
00231         pEnv->DeleteGlobalRef(outputByteBuffer2);
00232         outputByteBuffer2 = NULL;
00233     }
00234     pBuffer1 = NULL;
00235     pBuffer2 = NULL;
00236     ExecStream::closeImpl();
00237 }
00238 
00239 ExecStreamBufProvision JavaTransformExecStream::getInputBufProvision() const
00240 {
00241     return BUFPROV_PRODUCER;
00242 }
00243 
00244 ExecStreamBufProvision JavaTransformExecStream::getOutputBufProvision() const
00245 {
00246     return BUFPROV_CONSUMER;
00247 }
00248 
00249 ExecStreamBufProvision JavaTransformExecStream::getOutputBufConversion() const
00250 {
00251     // Although JavaTransformExecStream itself does not provide buffers,
00252     // it relies on having either ScratchBufferExecStream or
00253     // DoubleBufferExecStream available immediately downstream; it cannot
00254     // handle any other kind of buffers such as those from
00255     // SegBufferExecStream.
00256     return BUFPROV_PRODUCER;
00257 }
00258 
00259 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/farrago/JavaTransformExecStream.cpp#10 $");
00260 
00261 // End JavaTransformExecStream.cpp

Generated on Mon Jun 22 04:00:18 2009 for Fennel by  doxygen 1.5.1