JavaSinkExecStream.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/farrago/JavaSinkExecStream.cpp#15 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2005-2009 SQLstream, Inc.
00006 // Copyright (C) 2005-2009 LucidEra, Inc.
00007 // Portions Copyright (C) 2004-2009 John V. Sichi
00008 //
00009 // This program is free software; you can redistribute it and/or modify it
00010 // under the terms of the GNU General Public License as published by the Free
00011 // Software Foundation; either version 2 of the License, or (at your option)
00012 // any later version approved by The Eigenbase Project.
00013 //
00014 // This program is distributed in the hope that it will be useful,
00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 // GNU General Public License for more details.
00018 //
00019 // You should have received a copy of the GNU General Public License
00020 // along with this program; if not, write to the Free Software
00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 */
00023 
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/farrago/JavaSinkExecStream.h"
00026 #include "fennel/farrago/JniUtil.h"
00027 #include "fennel/exec/ExecStreamGraph.h"
00028 #include "fennel/exec/ExecStreamScheduler.h"
00029 #include "fennel/exec/ExecStreamBufAccessor.h"
00030 #include <iostream>
00031 
00032 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/farrago/JavaSinkExecStream.cpp#15 $");
00033 
00034 JavaSinkExecStream::JavaSinkExecStream()
00035 {
00036     lastResult = EXECRC_QUANTUM_EXPIRED; // neutral
00037     pStreamGraphHandle = NULL;
00038     javaFennelPipeTupleIterId = 0;
00039     javaFennelPipeTupleIter = NULL;
00040 }
00041 
00042 void JavaSinkExecStream::prepare(JavaSinkExecStreamParams const &params)
00043 {
00044     SingleInputExecStream::prepare(params);
00045     pStreamGraphHandle = params.pStreamGraphHandle;
00046     javaFennelPipeTupleIterId = params.javaFennelPipeTupleIterId;
00047 
00048     JniEnvAutoRef pEnv;
00049     jclass classFennelPipeTupleIter = pEnv->FindClass(
00050         "net/sf/farrago/runtime/FennelPipeTupleIter");
00051     assert(classFennelPipeTupleIter);
00052     methFennelPipeTupleIter_write = pEnv->GetMethodID(
00053         classFennelPipeTupleIter, "write", "(Ljava/nio/ByteBuffer;I)V");
00054     assert(methFennelPipeTupleIter_write);
00055     methFennelPipeTupleIter_getByteBuffer = pEnv->GetMethodID(
00056         classFennelPipeTupleIter, "getByteBuffer", "(I)Ljava/nio/ByteBuffer;");
00057     assert(methFennelPipeTupleIter_getByteBuffer);
00058 
00059     jclass classByteBuffer = pEnv->FindClass("java/nio/ByteBuffer");
00060     assert(classByteBuffer);
00061     methByteBuffer_array =
00062         pEnv->GetMethodID(classByteBuffer, "array", "()[B");
00063     assert(methByteBuffer_array);
00064 }
00065 
00066 void JavaSinkExecStream::open(bool restart)
00067 {
00068     FENNEL_TRACE(TRACE_FINE, "open");
00069     SingleInputExecStream::open(restart);
00070 
00071     // Find our FennelPipeTupleIter peer
00072     JniEnvAutoRef pEnv;
00073     jlong hJavaFennelPipeTupleIter = pEnv->CallLongMethod(
00074         pStreamGraphHandle->javaRuntimeContext,
00075         JniUtil::methGetJavaStreamHandle,
00076         javaFennelPipeTupleIterId);
00077     javaFennelPipeTupleIter =
00078         CmdInterpreter::getObjectFromLong(hJavaFennelPipeTupleIter);
00079     assert(javaFennelPipeTupleIter);
00080 }
00081 
00082 ExecStreamResult JavaSinkExecStream::execute(ExecStreamQuantum const &)
00083 {
00084     ExecStreamBufAccessor &inAccessor = *pInAccessor;
00085     switch (inAccessor.getState()) {
00086     case EXECBUF_EMPTY:
00087         // Nothing to read, so don't send anything to Java. FennelPipeTupleIter
00088         // would interpret a 0-length buffer as end-of-stream, which is not the
00089         // case.
00090         FENNEL_TRACE(TRACE_FINE, "no input");
00091         return (lastResult = EXECRC_BUF_UNDERFLOW);
00092     case EXECBUF_EOS:
00093         // Need to signal end-of-stream to Java. Do this by sending a buffer of
00094         // length 0. There should be 0 bytes available, so the code below
00095         // should do this naturally.
00096         FENNEL_TRACE(TRACE_FINE, "input EOS");
00097         assert(inAccessor.getConsumptionAvailable() == 0);
00098         break;
00099     default:
00100         FENNEL_TRACE(TRACE_FINER, "input rows:");
00101         getGraph().getScheduler()->
00102             traceStreamBufferContents(*this, inAccessor, TRACE_FINER);
00103         break;
00104     }
00105 
00106     PConstBuffer pInBufStart = inAccessor.getConsumptionStart();
00107     PConstBuffer pInBufEnd = inAccessor.getConsumptionEnd();
00108     uint nbytes = pInBufEnd - pInBufStart;
00109     sendData(pInBufStart, nbytes);
00110     if (nbytes > 0) {
00111         inAccessor.consumeData(pInBufEnd);
00112         return (lastResult = EXECRC_BUF_UNDERFLOW);
00113     } else {
00114         return (lastResult = EXECRC_EOS);
00115     }
00116 }
00117 
00119 void JavaSinkExecStream::sendData(PConstBuffer src, uint size)
00120 {
00121     JniEnvAutoRef pEnv;
00122 
00123     // Get an output ByteBuffer. Since this is a local ref, it will be
00124     // automatically deleted when the next method call returns.
00125     // REVIEW: Could give the ByteBuffer a longer lifecycle.
00126     jobject javaByteBuf = pEnv->CallObjectMethod(
00127         javaFennelPipeTupleIter, methFennelPipeTupleIter_getByteBuffer, size);
00128     assert(javaByteBuf);
00129 
00130     // copy the data, allowing upstream XO to produce more output
00131     stuffByteBuffer(javaByteBuf, src, size);
00132 
00133     // Send to the iterator, calling the method
00134     //   void FennelIterPipe.write(ByteBuffer, int byteCount)
00135     FENNEL_TRACE(
00136         TRACE_FINE,
00137         "call FennelPipeTupleIter.write " << size << " bytes");
00138     pEnv->CallVoidMethod(
00139         javaFennelPipeTupleIter, methFennelPipeTupleIter_write,
00140         javaByteBuf, size);
00141     FENNEL_TRACE(TRACE_FINE, "FennelPipeTupleIter.write returned");
00142 }
00143 
00144 void JavaSinkExecStream::stuffByteBuffer(
00145     jobject byteBuffer,
00146     PConstBuffer src,
00147     uint size)
00148 {
00149     // TODO: lookup methods in constructor.
00150     // TODO: ByteBuffer with a longer life, permanently pinned.
00151     JniEnvAutoRef pEnv;
00152 
00153     // pin the byte array
00154     jbyteArray bufBacking =
00155         static_cast<jbyteArray>(
00156             pEnv->CallObjectMethod(byteBuffer, methByteBuffer_array));
00157     jboolean copied;
00158     jbyte* dst = pEnv->GetByteArrayElements(bufBacking, &copied);
00159 
00160     // copy the data
00161     memcpy(dst, src, size);
00162 
00163     // trace the copy
00164     if (isTracingLevel(TRACE_FINER)) {
00165         // wrap the output buffer with a buf accessor
00166         ExecStreamBufAccessor ba;
00167         ba.setProvision(BUFPROV_PRODUCER);
00168         ba.setTupleShape(
00169             pInAccessor->getTupleDesc(), pInAccessor->getTupleFormat());
00170         ba.clear();
00171         PBuffer buf = (PBuffer) dst;
00172         ba.provideBufferForConsumption(buf, buf + size);
00173         FENNEL_TRACE(TRACE_FINER, "output rows:");
00174         getGraph().getScheduler()->
00175             traceStreamBufferContents(*this, ba, TRACE_FINER);
00176     }
00177 
00178     // unpin
00179     pEnv->ReleaseByteArrayElements(bufBacking, dst, 0);
00180 }
00181 
00182 
00183 void JavaSinkExecStream::closeImpl()
00184 {
00185     FENNEL_TRACE(TRACE_FINE, "closing");
00186 
00187     // If java peer is waiting for more data, send it a final EOS
00188     if (javaFennelPipeTupleIter && (lastResult != EXECRC_EOS)) {
00189         FixedBuffer dummy[1];
00190         sendData(dummy, 0);
00191     }
00192 
00193     javaFennelPipeTupleIter = NULL;
00194     SingleInputExecStream::closeImpl();
00195 }
00196 
00197 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/farrago/JavaSinkExecStream.cpp#15 $");
00198 
00199 // End JavaSinkExecStream.cpp

Generated on Mon Jun 22 04:00:18 2009 for Fennel by  doxygen 1.5.1