00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include "fennel/common/CommonPreamble.h"
00024 #include "fennel/exec/ExecStreamBufAccessor.h"
00025 #include "fennel/exec/ExecStreamGraph.h"
00026 #include "fennel/farrago/JavaTransformExecStream.h"
00027 #include "fennel/farrago/JniUtil.h"
00028
00029 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/farrago/JavaTransformExecStream.cpp#10 $");
00030
00031 JavaTransformExecStreamParams::JavaTransformExecStreamParams()
00032 {
00033 outputTupleFormat = TUPLE_FORMAT_STANDARD;
00034 javaClassName = "";
00035 }
00036
00037 JavaTransformExecStream::JavaTransformExecStream()
00038 {
00039 pStreamGraphHandle = NULL;
00040 outputByteBuffer1 = NULL;
00041 outputByteBuffer2 = NULL;
00042 pBuffer1 = NULL;
00043 pBuffer2 = NULL;
00044 farragoTransform = NULL;
00045 }
00046
00047 JavaTransformExecStream::~JavaTransformExecStream()
00048 {
00049 }
00050
00051 void JavaTransformExecStream::setInputBufAccessors(
00052 std::vector<SharedExecStreamBufAccessor> const &inAccessorsInit)
00053 {
00054 inAccessors = inAccessorsInit;
00055 }
00056
00057 void JavaTransformExecStream::setOutputBufAccessors(
00058 std::vector<SharedExecStreamBufAccessor> const &outAccessors)
00059 {
00060 assert(outAccessors.size() <= 1);
00061
00062 if (outAccessors.size() > 0) {
00063 pOutAccessor = outAccessors[0];
00064 }
00065 }
00066
00067 void JavaTransformExecStream::prepare(
00068 JavaTransformExecStreamParams const ¶ms)
00069 {
00070 ExecStream::prepare(params);
00071
00072 if (pOutAccessor) {
00073 assert(pOutAccessor->getProvision() == getOutputBufProvision());
00074 if (pOutAccessor->getTupleDesc().empty()) {
00075 assert(!params.outputTupleDesc.empty());
00076 pOutAccessor->setTupleShape(
00077 params.outputTupleDesc,
00078 params.outputTupleFormat);
00079 }
00080 }
00081
00082 for (uint i = 0; i < inAccessors.size(); ++i) {
00083 assert(inAccessors[i]->getProvision() == getInputBufProvision());
00084 }
00085
00086 javaClassName = params.javaClassName;
00087 pStreamGraphHandle = params.pStreamGraphHandle;
00088 }
00089
00090 void JavaTransformExecStream::open(bool restart)
00091 {
00092 FENNEL_TRACE(TRACE_FINER, "open" << (restart ? " (restart)" : ""));
00093 ExecStream::open(restart);
00094
00095 JniEnvAutoRef pEnv;
00096 if (restart) {
00097 if (pOutAccessor) {
00098 pOutAccessor->clear();
00099 }
00100
00101
00102 for (uint i = 0; i < inAccessors.size(); ++i) {
00103 inAccessors[i]->clear();
00104 pGraph->getStreamInput(getStreamId(),i)->open(true);
00105 }
00106
00107 assert(farragoTransform);
00108 pEnv->CallVoidMethod(
00109 farragoTransform,
00110 JniUtil::methFarragoTransformRestart,
00111 NULL);
00112 return;
00113 }
00114
00115
00116 FENNEL_TRACE(TRACE_FINER, "finding java peer, class " << javaClassName);
00117 jobject o =
00118 pEnv->CallObjectMethod(
00119 pStreamGraphHandle->javaRuntimeContext,
00120 JniUtil::methFarragoRuntimeContextFindFarragoTransform,
00121 pEnv->NewStringUTF(javaClassName.c_str()));
00122 assert(o);
00123 farragoTransform = pEnv->NewGlobalRef(o);
00124 }
00125
00126
00127 ExecStreamResult JavaTransformExecStream::execute(
00128 ExecStreamQuantum const &quantum)
00129 {
00130 FENNEL_TRACE(TRACE_FINEST, "execute");
00131
00132 if (pOutAccessor) {
00133 switch (pOutAccessor->getState()) {
00134 case EXECBUF_NONEMPTY:
00135 case EXECBUF_OVERFLOW:
00136 FENNEL_TRACE(TRACE_FINER, "overflow");
00137 return EXECRC_BUF_OVERFLOW;
00138 case EXECBUF_EOS:
00139 FENNEL_TRACE(TRACE_FINER, "eos");
00140 return EXECRC_EOS;
00141 default:
00142 break;
00143 }
00144 }
00145
00146 checkEmptyInputs();
00147
00148 jlong jquantum = static_cast<jlong>(quantum.nTuplesMax);
00149 JniEnvAutoRef pEnv;
00150 assert(farragoTransform);
00151 PBuffer pBuffer;
00152 if (pOutAccessor) {
00153 pBuffer = pOutAccessor->getProductionStart();
00154 if (!outputByteBuffer1) {
00155 outputByteBuffer1 = pEnv->NewDirectByteBuffer(
00156 pBuffer,
00157 pOutAccessor->getProductionAvailable());
00158 outputByteBuffer1 = pEnv->NewGlobalRef(outputByteBuffer1);
00159 pBuffer1 = pBuffer;
00160 } else if (!outputByteBuffer2) {
00161 if (pBuffer1 != pBuffer) {
00162 outputByteBuffer2 = pEnv->NewDirectByteBuffer(
00163 pBuffer,
00164 pOutAccessor->getProductionAvailable());
00165 outputByteBuffer2 = pEnv->NewGlobalRef(outputByteBuffer2);
00166 pBuffer2 = pBuffer;
00167 }
00168 }
00169 } else {
00170 pBuffer = NULL;
00171 }
00172
00173
00174
00175
00176 assert((pBuffer == pBuffer1) || (pBuffer == pBuffer2));
00177
00178
00179
00180
00181
00182
00183 int cb = pEnv->CallIntMethod(
00184 farragoTransform,
00185 JniUtil::methFarragoTransformExecute,
00186 (pBuffer == pBuffer1) ? outputByteBuffer1 : outputByteBuffer2,
00187 jquantum);
00188
00189 if (cb > 0) {
00190 assert(pOutAccessor);
00191 pOutAccessor->produceData(pBuffer + cb);
00192 FENNEL_TRACE(TRACE_FINER, "wrote " << cb << " bytes");
00193 return EXECRC_BUF_OVERFLOW;
00194 } else if (cb < 0) {
00195 FENNEL_TRACE(TRACE_FINER, "underflow");
00196 checkEmptyInputs();
00197
00198
00199 return EXECRC_BUF_UNDERFLOW;
00200 } else {
00201 FENNEL_TRACE(TRACE_FINER, "marking EOS");
00202 if (pOutAccessor) {
00203 pOutAccessor->markEOS();
00204 }
00205 return EXECRC_EOS;
00206 }
00207 }
00208
00209 void JavaTransformExecStream::checkEmptyInputs()
00210 {
00211 for (uint i = 0; i < inAccessors.size(); ++i) {
00212 SharedExecStreamBufAccessor inAccessor = inAccessors[i];
00213 if (inAccessor->getState() == EXECBUF_EMPTY) {
00214 inAccessor->requestProduction();
00215 }
00216 }
00217 }
00218
00219 void JavaTransformExecStream::closeImpl()
00220 {
00221 JniEnvAutoRef pEnv;
00222 if (farragoTransform) {
00223 pEnv->DeleteGlobalRef(farragoTransform);
00224 farragoTransform = NULL;
00225 }
00226 if (outputByteBuffer1) {
00227 pEnv->DeleteGlobalRef(outputByteBuffer1);
00228 outputByteBuffer1 = NULL;
00229 }
00230 if (outputByteBuffer2) {
00231 pEnv->DeleteGlobalRef(outputByteBuffer2);
00232 outputByteBuffer2 = NULL;
00233 }
00234 pBuffer1 = NULL;
00235 pBuffer2 = NULL;
00236 ExecStream::closeImpl();
00237 }
00238
00239 ExecStreamBufProvision JavaTransformExecStream::getInputBufProvision() const
00240 {
00241 return BUFPROV_PRODUCER;
00242 }
00243
00244 ExecStreamBufProvision JavaTransformExecStream::getOutputBufProvision() const
00245 {
00246 return BUFPROV_CONSUMER;
00247 }
00248
00249 ExecStreamBufProvision JavaTransformExecStream::getOutputBufConversion() const
00250 {
00251
00252
00253
00254
00255
00256 return BUFPROV_PRODUCER;
00257 }
00258
00259 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/farrago/JavaTransformExecStream.cpp#10 $");
00260
00261