00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 #include "fennel/common/CommonPreamble.h"
00024 #include "fennel/exec/ExecStreamBufAccessor.h"
00025 #include "fennel/exec/ExecStreamGraph.h"
00026 #include "fennel/farrago/JavaTransformExecStream.h"
00027 #include "fennel/farrago/JniUtil.h"
00028 
00029 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/farrago/JavaTransformExecStream.cpp#10 $");
00030 
00031 JavaTransformExecStreamParams::JavaTransformExecStreamParams()
00032 {
00033     outputTupleFormat = TUPLE_FORMAT_STANDARD;
00034     javaClassName = "";
00035 }
00036 
00037 JavaTransformExecStream::JavaTransformExecStream()
00038 {
00039     pStreamGraphHandle = NULL;
00040     outputByteBuffer1 = NULL;
00041     outputByteBuffer2 = NULL;
00042     pBuffer1 = NULL;
00043     pBuffer2 = NULL;
00044     farragoTransform = NULL;
00045 }
00046 
00047 JavaTransformExecStream::~JavaTransformExecStream()
00048 {
00049 }
00050 
00051 void JavaTransformExecStream::setInputBufAccessors(
00052     std::vector<SharedExecStreamBufAccessor> const &inAccessorsInit)
00053 {
00054     inAccessors = inAccessorsInit;
00055 }
00056 
00057 void JavaTransformExecStream::setOutputBufAccessors(
00058     std::vector<SharedExecStreamBufAccessor> const &outAccessors)
00059 {
00060     assert(outAccessors.size() <= 1);
00061 
00062     if (outAccessors.size() > 0) {
00063         pOutAccessor = outAccessors[0];
00064     }
00065 }
00066 
00067 void JavaTransformExecStream::prepare(
00068     JavaTransformExecStreamParams const ¶ms)
00069 {
00070     ExecStream::prepare(params);
00071 
00072     if (pOutAccessor) {
00073         assert(pOutAccessor->getProvision() == getOutputBufProvision());
00074         if (pOutAccessor->getTupleDesc().empty()) {
00075             assert(!params.outputTupleDesc.empty());
00076             pOutAccessor->setTupleShape(
00077                 params.outputTupleDesc,
00078                 params.outputTupleFormat);
00079         }
00080     }
00081 
00082     for (uint i = 0; i < inAccessors.size(); ++i) {
00083         assert(inAccessors[i]->getProvision() == getInputBufProvision());
00084     }
00085 
00086     javaClassName = params.javaClassName;
00087     pStreamGraphHandle = params.pStreamGraphHandle;
00088 }
00089 
00090 void JavaTransformExecStream::open(bool restart)
00091 {
00092     FENNEL_TRACE(TRACE_FINER, "open" << (restart ? " (restart)" : ""));
00093     ExecStream::open(restart);
00094 
00095     JniEnvAutoRef pEnv;
00096     if (restart) {
00097         if (pOutAccessor) {
00098             pOutAccessor->clear();
00099         }
00100 
00101         
00102         for (uint i = 0; i < inAccessors.size(); ++i) {
00103             inAccessors[i]->clear();
00104             pGraph->getStreamInput(getStreamId(),i)->open(true);
00105         }
00106 
00107         assert(farragoTransform);
00108         pEnv->CallVoidMethod(
00109             farragoTransform,
00110             JniUtil::methFarragoTransformRestart,
00111             NULL);
00112         return;
00113     }
00114 
00115     
00116     FENNEL_TRACE(TRACE_FINER, "finding java peer, class " << javaClassName);
00117     jobject o =
00118         pEnv->CallObjectMethod(
00119             pStreamGraphHandle->javaRuntimeContext,
00120             JniUtil::methFarragoRuntimeContextFindFarragoTransform,
00121             pEnv->NewStringUTF(javaClassName.c_str()));
00122     assert(o);
00123     farragoTransform = pEnv->NewGlobalRef(o);
00124 }
00125 
00126 
00127 ExecStreamResult JavaTransformExecStream::execute(
00128     ExecStreamQuantum const &quantum)
00129 {
00130     FENNEL_TRACE(TRACE_FINEST, "execute");
00131 
00132     if (pOutAccessor) {
00133         switch (pOutAccessor->getState()) {
00134         case EXECBUF_NONEMPTY:
00135         case EXECBUF_OVERFLOW:
00136             FENNEL_TRACE(TRACE_FINER, "overflow");
00137             return EXECRC_BUF_OVERFLOW;
00138         case EXECBUF_EOS:
00139             FENNEL_TRACE(TRACE_FINER, "eos");
00140             return EXECRC_EOS;
00141         default:
00142             break;
00143         }
00144     }
00145 
00146     checkEmptyInputs();
00147 
00148     jlong jquantum = static_cast<jlong>(quantum.nTuplesMax);
00149     JniEnvAutoRef pEnv;
00150     assert(farragoTransform);
00151     PBuffer pBuffer;
00152     if (pOutAccessor) {
00153         pBuffer = pOutAccessor->getProductionStart();
00154         if (!outputByteBuffer1) {
00155             outputByteBuffer1 = pEnv->NewDirectByteBuffer(
00156                 pBuffer,
00157                 pOutAccessor->getProductionAvailable());
00158             outputByteBuffer1 = pEnv->NewGlobalRef(outputByteBuffer1);
00159             pBuffer1 = pBuffer;
00160         } else if (!outputByteBuffer2) {
00161             if (pBuffer1 != pBuffer) {
00162                 outputByteBuffer2 = pEnv->NewDirectByteBuffer(
00163                     pBuffer,
00164                     pOutAccessor->getProductionAvailable());
00165                 outputByteBuffer2 = pEnv->NewGlobalRef(outputByteBuffer2);
00166                 pBuffer2 = pBuffer;
00167             }
00168         }
00169     } else {
00170         pBuffer = NULL;
00171     }
00172 
00173     
00174     
00175     
00176     assert((pBuffer == pBuffer1) || (pBuffer == pBuffer2));
00177 
00178     
00179     
00180     
00181     
00182 
00183     int cb = pEnv->CallIntMethod(
00184         farragoTransform,
00185         JniUtil::methFarragoTransformExecute,
00186         (pBuffer == pBuffer1) ? outputByteBuffer1 : outputByteBuffer2,
00187         jquantum);
00188 
00189     if (cb > 0) {
00190         assert(pOutAccessor);
00191         pOutAccessor->produceData(pBuffer + cb);
00192         FENNEL_TRACE(TRACE_FINER, "wrote " << cb << " bytes");
00193         return EXECRC_BUF_OVERFLOW;
00194     } else if (cb < 0) {
00195         FENNEL_TRACE(TRACE_FINER, "underflow");
00196         checkEmptyInputs();
00197         
00198         
00199         return EXECRC_BUF_UNDERFLOW;
00200     } else {
00201         FENNEL_TRACE(TRACE_FINER, "marking EOS");
00202         if (pOutAccessor) {
00203             pOutAccessor->markEOS();
00204         }
00205         return EXECRC_EOS;
00206     }
00207 }
00208 
00209 void JavaTransformExecStream::checkEmptyInputs()
00210 {
00211     for (uint i = 0; i < inAccessors.size(); ++i) {
00212         SharedExecStreamBufAccessor inAccessor = inAccessors[i];
00213         if (inAccessor->getState() == EXECBUF_EMPTY) {
00214             inAccessor->requestProduction();
00215         }
00216     }
00217 }
00218 
00219 void JavaTransformExecStream::closeImpl()
00220 {
00221     JniEnvAutoRef pEnv;
00222     if (farragoTransform) {
00223         pEnv->DeleteGlobalRef(farragoTransform);
00224         farragoTransform = NULL;
00225     }
00226     if (outputByteBuffer1) {
00227         pEnv->DeleteGlobalRef(outputByteBuffer1);
00228         outputByteBuffer1 = NULL;
00229     }
00230     if (outputByteBuffer2) {
00231         pEnv->DeleteGlobalRef(outputByteBuffer2);
00232         outputByteBuffer2 = NULL;
00233     }
00234     pBuffer1 = NULL;
00235     pBuffer2 = NULL;
00236     ExecStream::closeImpl();
00237 }
00238 
00239 ExecStreamBufProvision JavaTransformExecStream::getInputBufProvision() const
00240 {
00241     return BUFPROV_PRODUCER;
00242 }
00243 
00244 ExecStreamBufProvision JavaTransformExecStream::getOutputBufProvision() const
00245 {
00246     return BUFPROV_CONSUMER;
00247 }
00248 
00249 ExecStreamBufProvision JavaTransformExecStream::getOutputBufConversion() const
00250 {
00251     
00252     
00253     
00254     
00255     
00256     return BUFPROV_PRODUCER;
00257 }
00258 
00259 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/farrago/JavaTransformExecStream.cpp#10 $");
00260 
00261