00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/farrago/JavaSinkExecStream.h"
00026 #include "fennel/farrago/JniUtil.h"
00027 #include "fennel/exec/ExecStreamGraph.h"
00028 #include "fennel/exec/ExecStreamScheduler.h"
00029 #include "fennel/exec/ExecStreamBufAccessor.h"
00030 #include <iostream>
00031
00032 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/farrago/JavaSinkExecStream.cpp#15 $");
00033
00034 JavaSinkExecStream::JavaSinkExecStream()
00035 {
00036 lastResult = EXECRC_QUANTUM_EXPIRED;
00037 pStreamGraphHandle = NULL;
00038 javaFennelPipeTupleIterId = 0;
00039 javaFennelPipeTupleIter = NULL;
00040 }
00041
00042 void JavaSinkExecStream::prepare(JavaSinkExecStreamParams const ¶ms)
00043 {
00044 SingleInputExecStream::prepare(params);
00045 pStreamGraphHandle = params.pStreamGraphHandle;
00046 javaFennelPipeTupleIterId = params.javaFennelPipeTupleIterId;
00047
00048 JniEnvAutoRef pEnv;
00049 jclass classFennelPipeTupleIter = pEnv->FindClass(
00050 "net/sf/farrago/runtime/FennelPipeTupleIter");
00051 assert(classFennelPipeTupleIter);
00052 methFennelPipeTupleIter_write = pEnv->GetMethodID(
00053 classFennelPipeTupleIter, "write", "(Ljava/nio/ByteBuffer;I)V");
00054 assert(methFennelPipeTupleIter_write);
00055 methFennelPipeTupleIter_getByteBuffer = pEnv->GetMethodID(
00056 classFennelPipeTupleIter, "getByteBuffer", "(I)Ljava/nio/ByteBuffer;");
00057 assert(methFennelPipeTupleIter_getByteBuffer);
00058
00059 jclass classByteBuffer = pEnv->FindClass("java/nio/ByteBuffer");
00060 assert(classByteBuffer);
00061 methByteBuffer_array =
00062 pEnv->GetMethodID(classByteBuffer, "array", "()[B");
00063 assert(methByteBuffer_array);
00064 }
00065
00066 void JavaSinkExecStream::open(bool restart)
00067 {
00068 FENNEL_TRACE(TRACE_FINE, "open");
00069 SingleInputExecStream::open(restart);
00070
00071
00072 JniEnvAutoRef pEnv;
00073 jlong hJavaFennelPipeTupleIter = pEnv->CallLongMethod(
00074 pStreamGraphHandle->javaRuntimeContext,
00075 JniUtil::methGetJavaStreamHandle,
00076 javaFennelPipeTupleIterId);
00077 javaFennelPipeTupleIter =
00078 CmdInterpreter::getObjectFromLong(hJavaFennelPipeTupleIter);
00079 assert(javaFennelPipeTupleIter);
00080 }
00081
00082 ExecStreamResult JavaSinkExecStream::execute(ExecStreamQuantum const &)
00083 {
00084 ExecStreamBufAccessor &inAccessor = *pInAccessor;
00085 switch (inAccessor.getState()) {
00086 case EXECBUF_EMPTY:
00087
00088
00089
00090 FENNEL_TRACE(TRACE_FINE, "no input");
00091 return (lastResult = EXECRC_BUF_UNDERFLOW);
00092 case EXECBUF_EOS:
00093
00094
00095
00096 FENNEL_TRACE(TRACE_FINE, "input EOS");
00097 assert(inAccessor.getConsumptionAvailable() == 0);
00098 break;
00099 default:
00100 FENNEL_TRACE(TRACE_FINER, "input rows:");
00101 getGraph().getScheduler()->
00102 traceStreamBufferContents(*this, inAccessor, TRACE_FINER);
00103 break;
00104 }
00105
00106 PConstBuffer pInBufStart = inAccessor.getConsumptionStart();
00107 PConstBuffer pInBufEnd = inAccessor.getConsumptionEnd();
00108 uint nbytes = pInBufEnd - pInBufStart;
00109 sendData(pInBufStart, nbytes);
00110 if (nbytes > 0) {
00111 inAccessor.consumeData(pInBufEnd);
00112 return (lastResult = EXECRC_BUF_UNDERFLOW);
00113 } else {
00114 return (lastResult = EXECRC_EOS);
00115 }
00116 }
00117
00119 void JavaSinkExecStream::sendData(PConstBuffer src, uint size)
00120 {
00121 JniEnvAutoRef pEnv;
00122
00123
00124
00125
00126 jobject javaByteBuf = pEnv->CallObjectMethod(
00127 javaFennelPipeTupleIter, methFennelPipeTupleIter_getByteBuffer, size);
00128 assert(javaByteBuf);
00129
00130
00131 stuffByteBuffer(javaByteBuf, src, size);
00132
00133
00134
00135 FENNEL_TRACE(
00136 TRACE_FINE,
00137 "call FennelPipeTupleIter.write " << size << " bytes");
00138 pEnv->CallVoidMethod(
00139 javaFennelPipeTupleIter, methFennelPipeTupleIter_write,
00140 javaByteBuf, size);
00141 FENNEL_TRACE(TRACE_FINE, "FennelPipeTupleIter.write returned");
00142 }
00143
00144 void JavaSinkExecStream::stuffByteBuffer(
00145 jobject byteBuffer,
00146 PConstBuffer src,
00147 uint size)
00148 {
00149
00150
00151 JniEnvAutoRef pEnv;
00152
00153
00154 jbyteArray bufBacking =
00155 static_cast<jbyteArray>(
00156 pEnv->CallObjectMethod(byteBuffer, methByteBuffer_array));
00157 jboolean copied;
00158 jbyte* dst = pEnv->GetByteArrayElements(bufBacking, &copied);
00159
00160
00161 memcpy(dst, src, size);
00162
00163
00164 if (isTracingLevel(TRACE_FINER)) {
00165
00166 ExecStreamBufAccessor ba;
00167 ba.setProvision(BUFPROV_PRODUCER);
00168 ba.setTupleShape(
00169 pInAccessor->getTupleDesc(), pInAccessor->getTupleFormat());
00170 ba.clear();
00171 PBuffer buf = (PBuffer) dst;
00172 ba.provideBufferForConsumption(buf, buf + size);
00173 FENNEL_TRACE(TRACE_FINER, "output rows:");
00174 getGraph().getScheduler()->
00175 traceStreamBufferContents(*this, ba, TRACE_FINER);
00176 }
00177
00178
00179 pEnv->ReleaseByteArrayElements(bufBacking, dst, 0);
00180 }
00181
00182
00183 void JavaSinkExecStream::closeImpl()
00184 {
00185 FENNEL_TRACE(TRACE_FINE, "closing");
00186
00187
00188 if (javaFennelPipeTupleIter && (lastResult != EXECRC_EOS)) {
00189 FixedBuffer dummy[1];
00190 sendData(dummy, 0);
00191 }
00192
00193 javaFennelPipeTupleIter = NULL;
00194 SingleInputExecStream::closeImpl();
00195 }
00196
00197 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/farrago/JavaSinkExecStream.cpp#15 $");
00198
00199