00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/test/CorrelationJoinExecStreamTestSuite.h"
00026 #include "fennel/exec/CorrelationJoinExecStream.h"
00027 #include "fennel/tuple/StandardTypeDescriptor.h"
00028 #include "fennel/tuple/TupleOverflowExcn.h"
00029 #include "fennel/exec/MockProducerExecStream.h"
00030 #include "fennel/exec/ExecStreamEmbryo.h"
00031 #include "fennel/exec/ExecStreamGraph.h"
00032
00033 using namespace fennel;
00034
00035 CorrelationJoinExecStreamTestSuite::CorrelationJoinExecStreamTestSuite(
00036 bool addAllTests)
00037 {
00038 if (addAllTests) {
00039 FENNEL_UNIT_TEST_CASE(
00040 CorrelationJoinExecStreamTestSuite, testCorrelationJoin);
00041 }
00042
00043 StandardTypeDescriptorFactory stdTypeFactory;
00044
00045 descAttrInt64 = TupleAttributeDescriptor(
00046 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));
00047 descInt64.push_back(descAttrInt64);
00048 }
00049
00050 void CorrelationJoinExecStreamTestSuite::testCorrelationJoin()
00051 {
00052 MockProducerExecStreamParams paramsMockLeft;
00053 paramsMockLeft.outputTupleDesc.push_back(descAttrInt64);
00054 paramsMockLeft.pGenerator.reset(new RampExecStreamGenerator);
00055 paramsMockLeft.nRows = 5000;
00056
00057 ExecStreamEmbryo leftStreamEmbryo;
00058 leftStreamEmbryo.init(new MockProducerExecStream(),paramsMockLeft);
00059 leftStreamEmbryo.getStream()->setName("LeftProducerExecStream");
00060
00061 DynamicParamId dynamicParamId(1);
00062 MockProducerExecStreamParams paramsMockRight(paramsMockLeft);
00063 paramsMockRight.pGenerator.reset(new DynamicParamExecStreamGenerator(
00064 dynamicParamId,
00065 pGraph->getDynamicParamManager()));
00066 paramsMockRight.nRows = 10;
00067
00068 ExecStreamEmbryo rightStreamEmbryo;
00069 rightStreamEmbryo.init(new MockProducerExecStream(),paramsMockRight);
00070 rightStreamEmbryo.getStream()->setName("RightProducerExecStream");
00071
00072 CorrelationJoinExecStreamParams paramsJoin;
00073
00074 Correlation correlation(dynamicParamId, 0);
00075 paramsJoin.correlations.push_back(correlation);
00076
00077 ExecStreamEmbryo joinStreamEmbryo;
00078 joinStreamEmbryo.init(new CorrelationJoinExecStream(),paramsJoin);
00079 joinStreamEmbryo.getStream()->setName("CorrelationJoinExecStream");
00080
00081 SharedExecStream pOutputStream = prepareConfluenceGraph(
00082 leftStreamEmbryo,
00083 rightStreamEmbryo,
00084 joinStreamEmbryo);
00085
00086 StairCaseExecStreamGenerator rampExpectedGenerator(
00087 1, paramsMockRight.nRows);
00088 verifyOutput(
00089 *pOutputStream,
00090 paramsMockLeft.nRows * paramsMockRight.nRows,
00091 rampExpectedGenerator);
00092 }
00093
00094