00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include "fennel/common/CommonPreamble.h"
00024 #include "fennel/farrago/ExecStreamFactory.h"
00025 #include "fennel/lucidera/colstore/LcsClusterAppendExecStream.h"
00026 #include "fennel/lucidera/colstore/LcsClusterReplaceExecStream.h"
00027 #include "fennel/lucidera/colstore/LcsRowScanExecStream.h"
00028 #include "fennel/lucidera/bitmap/LbmGeneratorExecStream.h"
00029 #include "fennel/lucidera/bitmap/LbmSplicerExecStream.h"
00030 #include "fennel/lucidera/bitmap/LbmSearchExecStream.h"
00031 #include "fennel/lucidera/bitmap/LbmChopperExecStream.h"
00032 #include "fennel/lucidera/bitmap/LbmUnionExecStream.h"
00033 #include "fennel/lucidera/bitmap/LbmIntersectExecStream.h"
00034 #include "fennel/lucidera/bitmap/LbmMinusExecStream.h"
00035 #include "fennel/lucidera/bitmap/LbmBitOpExecStream.h"
00036 #include "fennel/lucidera/bitmap/LbmNormalizerExecStream.h"
00037 #include "fennel/lucidera/bitmap/LbmSortedAggExecStream.h"
00038 #include "fennel/db/Database.h"
00039 #include "fennel/segment/SegmentFactory.h"
00040 #include "fennel/exec/ExecStreamEmbryo.h"
00041 #include "fennel/cache/QuotaCacheAccessor.h"
00042
00043 #ifdef __MSVC__
00044 #include <windows.h>
00045 #endif
00046
00047 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/lucidera/farrago/NativeMethods_lu.cpp#48 $");
00048
00049 class ExecStreamSubFactory_lu
00050 : public ExecStreamSubFactory,
00051 public FemVisitor
00052 {
00053 ExecStreamFactory *pExecStreamFactory;
00054 ExecStreamEmbryo *pEmbryo;
00055
00056 bool created;
00057
00058 void readClusterScan(
00059 ProxyLcsRowScanStreamDef &streamDef,
00060 LcsRowScanBaseExecStreamParams ¶ms)
00061 {
00062 SharedProxyLcsClusterScanDef pClusterScan = streamDef.getClusterScan();
00063 for (; pClusterScan; ++pClusterScan) {
00064 LcsClusterScanDef clusterScanParam;
00065 clusterScanParam.pCacheAccessor = params.pCacheAccessor;
00066 pExecStreamFactory->readBTreeStreamParams(
00067 clusterScanParam,
00068 *pClusterScan);
00069 pExecStreamFactory->readTupleDescriptor(
00070 clusterScanParam.clusterTupleDesc,
00071 pClusterScan->getClusterTupleDesc());
00072 params.lcsClusterScanDefs.push_back(clusterScanParam);
00073 }
00074 }
00075
00076
00077 virtual void visit(ProxyLcsClusterAppendStreamDef &streamDef)
00078 {
00079 LcsClusterAppendExecStreamParams params;
00080 readClusterAppendParams(streamDef, params);
00081
00082 pEmbryo->init(
00083 new LcsClusterAppendExecStream(),
00084 params);
00085 }
00086
00087
00088 virtual void visit(ProxyLcsClusterReplaceStreamDef &streamDef)
00089 {
00090 LcsClusterReplaceExecStreamParams params;
00091 readClusterAppendParams(streamDef, params);
00092
00093 pEmbryo->init(
00094 new LcsClusterReplaceExecStream(),
00095 params);
00096 }
00097
00098 void readClusterAppendParams(
00099 ProxyLcsClusterAppendStreamDef &streamDef,
00100 LcsClusterAppendExecStreamParams ¶ms)
00101 {
00102 pExecStreamFactory->readTupleStreamParams(params, streamDef);
00103 pExecStreamFactory->readBTreeStreamParams(params, streamDef);
00104
00105
00106 pExecStreamFactory->createPrivateScratchSegment(params);
00107
00108 CmdInterpreter::readTupleProjection(
00109 params.inputProj,
00110 streamDef.getClusterColProj());
00111 }
00112
00113
00114 virtual void visit(ProxyLcsRowScanStreamDef &streamDef)
00115 {
00116 LcsRowScanExecStreamParams params;
00117
00118 pExecStreamFactory->readTupleStreamParams(params, streamDef);
00119 readClusterScan(streamDef, params);
00120 CmdInterpreter::readTupleProjection(
00121 params.outputProj,
00122 streamDef.getOutputProj());
00123 params.isFullScan = streamDef.isFullScan();
00124 params.hasExtraFilter = streamDef.isHasExtraFilter();
00125
00126 params.samplingMode = streamDef.getSamplingMode();
00127 params.samplingRate = streamDef.getSamplingRate();
00128 params.samplingIsRepeatable = streamDef.isSamplingRepeatable();
00129 params.samplingRepeatableSeed = streamDef.getSamplingRepeatableSeed();
00130 params.samplingClumps =
00131 LcsRowScanExecStreamParams::defaultSystemSamplingClumps;
00132 params.samplingRowCount = streamDef.getSamplingRowCount();
00133
00134 CmdInterpreter::readTupleProjection(
00135 params.residualFilterCols,
00136 streamDef.getResidualFilterColumns());
00137 pEmbryo->init(new LcsRowScanExecStream(), params);
00138 }
00139
00140
00141 virtual void visit(ProxyLbmGeneratorStreamDef &streamDef)
00142 {
00143 LbmGeneratorExecStreamParams params;
00144
00145 pExecStreamFactory->readTupleStreamParams(params, streamDef);
00146 pExecStreamFactory->readBTreeStreamParams(params, streamDef);
00147
00148
00149 pExecStreamFactory->createPrivateScratchSegment(params);
00150
00151 readClusterScan(streamDef, params);
00152 CmdInterpreter::readTupleProjection(
00153 params.outputProj, streamDef.getOutputProj());
00154 params.insertRowCountParamId =
00155 pExecStreamFactory->readDynamicParamId(
00156 streamDef.getInsertRowCountParamId());
00157 params.createIndex = streamDef.isCreateIndex();
00158
00159 pEmbryo->init(new LbmGeneratorExecStream(), params);
00160 }
00161
00162
00163 virtual void visit(ProxyLbmSplicerStreamDef &streamDef)
00164 {
00165 LbmSplicerExecStreamParams params;
00166 pExecStreamFactory->readExecStreamParams(params, streamDef);
00167 pExecStreamFactory->readTupleDescriptor(
00168 params.outputTupleDesc,
00169 streamDef.getOutputDesc());
00170 SharedProxySplicerIndexAccessorDef pIndexAccessorDef =
00171 streamDef.getIndexAccessor();
00172 for (; pIndexAccessorDef; ++pIndexAccessorDef) {
00173 BTreeExecStreamParams bTreeParams;
00174 pExecStreamFactory->readBTreeParams(
00175 bTreeParams,
00176 *pIndexAccessorDef);
00177 params.bTreeParams.push_back(bTreeParams);
00178 }
00179 params.insertRowCountParamId =
00180 pExecStreamFactory->readDynamicParamId(
00181 streamDef.getInsertRowCountParamId());
00182 params.writeRowCountParamId =
00183 pExecStreamFactory->readDynamicParamId(
00184 streamDef.getWriteRowCountParamId());
00185 params.createNewIndex = streamDef.isCreateNewIndex();
00186 pEmbryo->init(new LbmSplicerExecStream(), params);
00187 }
00188
00189
00190 virtual void visit(ProxyLbmSearchStreamDef &streamDef)
00191 {
00192 LbmSearchExecStreamParams params;
00193 pExecStreamFactory->initBTreePrefetchSearchParams(params, streamDef);
00194
00195 params.rowLimitParamId =
00196 pExecStreamFactory->readDynamicParamId(
00197 streamDef.getRowLimitParamId());
00198
00199 params.startRidParamId =
00200 pExecStreamFactory->readDynamicParamId(
00201 streamDef.getStartRidParamId());
00202
00203 pEmbryo->init(new LbmSearchExecStream(), params);
00204 }
00205
00206
00207 virtual void visit(ProxyLbmChopperStreamDef &streamDef)
00208 {
00209 LbmChopperExecStreamParams params;
00210 pExecStreamFactory->readTupleStreamParams(params, streamDef);
00211
00212 params.ridLimitParamId =
00213 pExecStreamFactory->readDynamicParamId(
00214 streamDef.getRidLimitParamId());
00215 pEmbryo->init(new LbmChopperExecStream(), params);
00216 }
00217
00218
00219 virtual void visit(ProxyLbmUnionStreamDef &streamDef)
00220 {
00221 LbmUnionExecStreamParams params;
00222 pExecStreamFactory->readTupleStreamParams(params, streamDef);
00223
00224
00225 pExecStreamFactory->createPrivateScratchSegment(params);
00226
00227 params.startRidParamId =
00228 pExecStreamFactory->readDynamicParamId(
00229 streamDef.getConsumerSridParamId());
00230
00231 params.segmentLimitParamId =
00232 pExecStreamFactory->readDynamicParamId(
00233 streamDef.getSegmentLimitParamId());
00234
00235 params.ridLimitParamId =
00236 pExecStreamFactory->readDynamicParamId(
00237 streamDef.getRidLimitParamId());
00238
00239 params.maxRid = (LcsRid) 0;
00240
00241 pEmbryo->init(new LbmUnionExecStream(), params);
00242 }
00243
00244
00245 virtual void visit(ProxyLbmIntersectStreamDef &streamDef)
00246 {
00247 LbmIntersectExecStreamParams params;
00248 pExecStreamFactory->readTupleStreamParams(params, streamDef);
00249 readBitOpDynamicParams(streamDef, params);
00250
00251 pEmbryo->init(new LbmIntersectExecStream(), params);
00252 }
00253
00254 virtual void visit(ProxyLbmMinusStreamDef &streamDef)
00255 {
00256 LbmMinusExecStreamParams params;
00257 pExecStreamFactory->readTupleStreamParams(params, streamDef);
00258 readBitOpDynamicParams(streamDef, params);
00259
00260 pEmbryo->init(new LbmMinusExecStream(), params);
00261 }
00262
00263 void readBitOpDynamicParams(
00264 ProxyLbmBitOpStreamDef &streamDef, LbmBitOpExecStreamParams ¶ms)
00265 {
00266 params.rowLimitParamId =
00267 pExecStreamFactory->readDynamicParamId(
00268 streamDef.getRowLimitParamId());
00269 params.startRidParamId =
00270 pExecStreamFactory->readDynamicParamId(
00271 streamDef.getStartRidParamId());
00272 }
00273
00274 virtual void visit(ProxyLbmNormalizerStreamDef &streamDef)
00275 {
00276 LbmNormalizerExecStreamParams params;
00277 pExecStreamFactory->readTupleStreamParams(params, streamDef);
00278 TupleProjection keyProj;
00279 for (int i = 0; i < params.outputTupleDesc.size(); i++) {
00280 keyProj.push_back(i);
00281 }
00282 params.keyProj = keyProj;
00283
00284 pEmbryo->init(new LbmNormalizerExecStream(), params);
00285 }
00286
00287 virtual void visit(ProxyLbmSortedAggStreamDef &streamDef)
00288 {
00289 LbmSortedAggExecStreamParams params;
00290 pExecStreamFactory->readAggStreamParams(params, streamDef);
00291 pEmbryo->init(new LbmSortedAggExecStream(), params);
00292 }
00293
00294
00295 virtual void unhandledVisit()
00296 {
00297
00298 created = false;
00299 }
00300
00301
00302 virtual bool createStream(
00303 ExecStreamFactory &factory,
00304 ProxyExecutionStreamDef &streamDef,
00305 ExecStreamEmbryo &embryo)
00306 {
00307 pExecStreamFactory = &factory;
00308 pEmbryo = &embryo;
00309 created = true;
00310
00311
00312 FemVisitor::visitTbl.accept(*this, streamDef);
00313
00314 return created;
00315 }
00316 };
00317
#ifdef __MSVC__
/**
 * DLL entry point, compiled only when __MSVC__ is defined.  None of the
 * arguments are examined; the function reports success unconditionally.
 *
 * @return always TRUE
 */
extern "C" JNIEXPORT BOOL APIENTRY DllMain(
    HANDLE hModule,
    DWORD ul_reason_for_call,
    LPVOID lpReserved)
{
    // nothing to set up or tear down here
    return TRUE;
}
#endif
00327
00328 extern "C" JNIEXPORT jint JNICALL
00329 JNI_OnLoad(JavaVM *vm,void *)
00330 {
00331 JniUtil::initDebug("FENNEL_RS_JNI_DEBUG");
00332 FENNEL_JNI_ONLOAD_COMMON();
00333 return JniUtil::jniVersion;
00334 }
00335
00336 extern "C" JNIEXPORT void JNICALL
00337 Java_com_lucidera_farrago_fennel_LucidEraJni_registerStreamFactory(
00338 JNIEnv *pEnvInit, jclass, jlong hStreamGraph)
00339 {
00340 JniEnvRef pEnv(pEnvInit);
00341 try {
00342 CmdInterpreter::StreamGraphHandle &streamGraphHandle =
00343 CmdInterpreter::getStreamGraphHandleFromLong(hStreamGraph);
00344 if (streamGraphHandle.pExecStreamFactory) {
00345 streamGraphHandle.pExecStreamFactory->addSubFactory(
00346 SharedExecStreamSubFactory(
00347 new ExecStreamSubFactory_lu()));
00348 }
00349 } catch (std::exception &ex) {
00350 pEnv.handleExcn(ex);
00351 }
00352 }
00353
00354 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/lucidera/farrago/NativeMethods_lu.cpp#48 $");
00355
00356