00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "fennel/common/CommonPreamble.h"
00025 #include "fennel/test/CollectExecStreamTestSuite.h"
00026 #include "fennel/exec/CollectExecStream.h"
00027 #include "fennel/exec/UncollectExecStream.h"
00028 #include "fennel/tuple/StandardTypeDescriptor.h"
00029 #include "fennel/tuple/TupleOverflowExcn.h"
00030 #include "fennel/exec/MockProducerExecStream.h"
00031 #include "fennel/exec/ExecStreamEmbryo.h"
00032
00033 using namespace fennel;
00034
00035 CollectExecStreamTestSuite::CollectExecStreamTestSuite(bool addAllTests)
00036 {
00037 if (addAllTests) {
00038 FENNEL_UNIT_TEST_CASE(CollectExecStreamTestSuite,testCollectInts);
00039 FENNEL_UNIT_TEST_CASE(CollectExecStreamTestSuite,testCollectUncollect);
00040 FENNEL_UNIT_TEST_CASE(
00041 CollectExecStreamTestSuite,testCollectCollectUncollectUncollect);
00042 }
00043
00044 StandardTypeDescriptorFactory stdTypeFactory;
00045
00046 descAttrInt64 =
00047 TupleAttributeDescriptor(
00048 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));
00049 descInt64.push_back(descAttrInt64);
00050
00051 descAttrVarbinary32 =
00052 TupleAttributeDescriptor(
00053 stdTypeFactory.newDataType(STANDARD_TYPE_VARBINARY), true, 32);
00054 descVarbinary32.push_back(descAttrVarbinary32);
00055 }
00056
00057 void CollectExecStreamTestSuite::testCollectInts()
00058 {
00059 uint rows = 2;
00060 MockProducerExecStreamParams mockParams;
00061 mockParams.outputTupleDesc.push_back(descAttrInt64);
00062 mockParams.nRows = rows;
00063 mockParams.pGenerator.reset(new RampExecStreamGenerator(1));
00064
00065 CollectExecStreamParams collectParams;
00066 collectParams.outputTupleDesc = descVarbinary32;
00067
00068 ExecStreamEmbryo mockStreamEmbryo;
00069 mockStreamEmbryo.init(new MockProducerExecStream(), mockParams);
00070 mockStreamEmbryo.getStream()->setName("MockProducerExecStream");
00071
00072 ExecStreamEmbryo collectStreamEmbryo;
00073 collectStreamEmbryo.init(new CollectExecStream(), collectParams);
00074 collectStreamEmbryo.getStream()->setName("CollectExecStream");
00075
00076
00077
00078 uint8_t intArrayBuff[32];
00079 uint64_t one = 1;
00080 TupleData oneData(descInt64);
00081 oneData[0].pData = (PConstBuffer) &one;
00082 TupleAccessor oneAccessor;
00083 oneAccessor.compute(descInt64);
00084 assert(oneAccessor.getMaxByteCount() <= sizeof(intArrayBuff));
00085 oneAccessor.marshal(oneData, (PBuffer) intArrayBuff);
00086
00087 uint64_t two = 2;
00088 TupleData twoData(descInt64);
00089 twoData[0].pData = (PConstBuffer) &two;
00090 TupleAccessor twoAccessor;
00091 twoAccessor.compute(descInt64);
00092 assert((oneAccessor.getMaxByteCount() + twoAccessor.getMaxByteCount()) <=
00093 sizeof(intArrayBuff));
00094 twoAccessor.marshal(
00095 twoData,
00096 ((PBuffer)intArrayBuff) + oneAccessor.getMaxByteCount());
00097
00098 uint8_t varbinaryBuff[1000];
00099 TupleData binData(descVarbinary32);
00100 binData[0].pData = (PConstBuffer) intArrayBuff;
00101 binData[0].cbData =
00102 oneAccessor.getMaxByteCount() + twoAccessor.getMaxByteCount();
00103 TupleAccessor binAccessor;
00104 binAccessor.compute(descVarbinary32);
00105 binAccessor.marshal(binData, (PBuffer) varbinaryBuff);
00106
00107
00108 SharedExecStream pOutputStream = prepareTransformGraph(
00109 mockStreamEmbryo, collectStreamEmbryo);
00110
00111 verifyConstantOutput(*pOutputStream, binData, 1);
00112 }
00113
00114 void CollectExecStreamTestSuite::testCollectUncollect()
00115 {
00116 StandardTypeDescriptorFactory stdTypeFactory;
00117 uint rows = 127;
00118
00119 TupleAttributeDescriptor tupleDescAttr(
00120 stdTypeFactory.newDataType(STANDARD_TYPE_VARBINARY),
00121 true,
00122 2 * rows * sizeof(uint64_t));
00123 TupleDescriptor tupleDesc;
00124 tupleDesc.push_back(tupleDescAttr);
00125
00126 MockProducerExecStreamParams mockParams;
00127 mockParams.outputTupleDesc.push_back(descAttrInt64);
00128 mockParams.nRows = rows;
00129 mockParams.pGenerator.reset(new RampExecStreamGenerator());
00130
00131 CollectExecStreamParams collectParams;
00132 collectParams.outputTupleDesc = tupleDesc;
00133
00134 UncollectExecStreamParams uncollectParams;
00135 uncollectParams.outputTupleDesc = descInt64;
00136
00137 ExecStreamEmbryo mockStreamEmbryo;
00138 mockStreamEmbryo.init(new MockProducerExecStream(), mockParams);
00139 mockStreamEmbryo.getStream()->setName("MockProducerExecStream");
00140
00141 ExecStreamEmbryo collectStreamEmbryo;
00142 collectStreamEmbryo.init(new CollectExecStream(), collectParams);
00143 collectStreamEmbryo.getStream()->setName("CollectExecStream");
00144
00145 ExecStreamEmbryo uncollectStreamEmbryo;
00146 uncollectStreamEmbryo.init(new UncollectExecStream(), uncollectParams);
00147 uncollectStreamEmbryo.getStream()->setName("UncollectExecStream");
00148
00149
00150 std::vector<ExecStreamEmbryo> transforms;
00151 transforms.push_back(collectStreamEmbryo);
00152 transforms.push_back(uncollectStreamEmbryo);
00153 SharedExecStream pOutputStream = prepareTransformGraph(
00154 mockStreamEmbryo, transforms);
00155
00156 RampExecStreamGenerator rampExpectedGenerator;
00157
00158 verifyOutput(*pOutputStream, rows, rampExpectedGenerator);
00159 }
00160
00161 void CollectExecStreamTestSuite::testCollectCollectUncollectUncollect() {
00162 StandardTypeDescriptorFactory stdTypeFactory;
00163 uint rows = 3;
00164
00165 TupleAttributeDescriptor tupleDescAttr1(
00166 stdTypeFactory.newDataType(STANDARD_TYPE_VARBINARY),
00167 true,
00168 2 * rows * sizeof(uint64_t));
00169 TupleDescriptor vbDesc1;
00170 vbDesc1.push_back(tupleDescAttr1);
00171
00172 TupleAttributeDescriptor tupleDescAttr2(
00173 stdTypeFactory.newDataType(STANDARD_TYPE_VARBINARY),
00174 true,
00175 2 * rows * rows * sizeof(uint64_t));
00176 TupleDescriptor vbDesc2;
00177 vbDesc2.push_back(tupleDescAttr2);
00178
00179 MockProducerExecStreamParams mockParams;
00180 mockParams.outputTupleDesc.push_back(descAttrInt64);
00181 mockParams.nRows = rows;
00182 mockParams.pGenerator.reset(new RampExecStreamGenerator());
00183
00184 CollectExecStreamParams collectParams1;
00185 collectParams1.outputTupleDesc = vbDesc1;
00186
00187 CollectExecStreamParams collectParams2;
00188 collectParams2.outputTupleDesc = vbDesc2;
00189
00190 UncollectExecStreamParams uncollectParams1;
00191 uncollectParams1.outputTupleDesc = vbDesc1;
00192
00193 UncollectExecStreamParams uncollectParams2;
00194 uncollectParams2.outputTupleDesc = descInt64;
00195
00196 ExecStreamEmbryo mockStreamEmbryo;
00197 mockStreamEmbryo.init(new MockProducerExecStream(), mockParams);
00198 mockStreamEmbryo.getStream()->setName("MockProducerExecStream");
00199
00200 ExecStreamEmbryo collectStreamEmbryo1;
00201 collectStreamEmbryo1.init(new CollectExecStream(), collectParams1);
00202 collectStreamEmbryo1.getStream()->setName("CollectExecStream1");
00203
00204 ExecStreamEmbryo collectStreamEmbryo2;
00205 collectStreamEmbryo2.init(new CollectExecStream(), collectParams2);
00206 collectStreamEmbryo2.getStream()->setName("CollectExecStream2");
00207
00208 ExecStreamEmbryo uncollectStreamEmbryo1;
00209 uncollectStreamEmbryo1.init(new UncollectExecStream(), uncollectParams1);
00210 uncollectStreamEmbryo1.getStream()->setName("UncollectExecStream1");
00211
00212 ExecStreamEmbryo uncollectStreamEmbryo2;
00213 uncollectStreamEmbryo2.init(new UncollectExecStream(), uncollectParams2);
00214 uncollectStreamEmbryo2.getStream()->setName("UncollectExecStream2");
00215
00216 std::vector<ExecStreamEmbryo> transforms;
00217 transforms.push_back(collectStreamEmbryo1);
00218 transforms.push_back(collectStreamEmbryo2);
00219 transforms.push_back(uncollectStreamEmbryo1);
00220 transforms.push_back(uncollectStreamEmbryo2);
00221 SharedExecStream pOutputStream = prepareTransformGraph(
00222 mockStreamEmbryo, transforms);
00223
00224 RampExecStreamGenerator rampExpectedGenerator;
00225
00226 verifyOutput(*pOutputStream, rows, rampExpectedGenerator);
00227 }
00228
00229