00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include "fennel/common/CommonPreamble.h"
00024 #include "fennel/test/ExecStreamUnitTestBase.h"
00025 #include "fennel/exec/MockProducerExecStream.h"
00026 #include "fennel/exec/ExecStreamEmbryo.h"
00027
00028 #include "fennel/exec/ExecStreamGraph.h"
00029 #include "fennel/exec/ExecStreamGraphEmbryo.h"
00030 #include "fennel/exec/ExecStreamScheduler.h"
00031 #include "fennel/exec/ExecStream.h"
00032 #include "fennel/exec/ScratchBufferExecStream.h"
00033 #include "fennel/exec/ExecStreamEmbryo.h"
00034 #include "fennel/exec/ExecStreamBufAccessor.h"
00035 #include "fennel/exec/MockProducerExecStream.h"
00036 #include "fennel/tuple/TuplePrinter.h"
00037 #include "fennel/tuple/StandardTypeDescriptor.h"
00038
00039 #include <boost/test/test_tools.hpp>
00040
00041 #include "fennel/flatfile/FlatFileBuffer.h"
00042 #include "fennel/flatfile/FlatFileParser.h"
00043 #include "fennel/flatfile/FlatFileExecStream.h"
00044
00045 using namespace fennel;
00046
/**
 * Interface for objects that supply the expected textual rendering of each
 * row produced by a stream under test.
 */
class StringExecStreamGenerator
{
public:
    /**
     * Supplies the expected string for one row.
     *
     * @param iRow 0-based row number
     *
     * @return reference to the expected string for that row
     */
    virtual const std::string &generateValue(uint iRow) = 0;

    /** Virtual so that implementations can be destroyed via this interface. */
    virtual ~StringExecStreamGenerator() {}
};
00063
00064 class StringExecStreamGeneratorImpl : public StringExecStreamGenerator
00065 {
00066 std::vector<std::string> values;
00067
00068 public:
00069 void insert(const std::string &value)
00070 {
00071 values.push_back(value);
00072 }
00073
00074
00075 const std::string &generateValue(uint iRow)
00076 {
00077 BOOST_CHECK(iRow < values.size());
00078 return values[iRow];
00079 }
00080 };
00081
00082 class FlatFileExecStreamTest : public ExecStreamUnitTestBase
00083 {
00084 void checkRead(
00085 FlatFileBuffer &buffer,
00086 const char *string);
00087
00088 void checkTrim(
00089 FlatFileParser &parser,
00090 const char *string,
00091 const char *result);
00092
00093 void checkStrip(
00094 FlatFileParser &parser,
00095 const char *string,
00096 const char *result);
00097
00098 void checkColumnScan(
00099 FlatFileParser &parser,
00100 const char *string,
00101 FlatFileColumnParseResult::DelimiterType type,
00102 uint size,
00103 uint offset);
00104
00105 void verifyOutput(
00106 ExecStream &stream,
00107 uint nRowsExpected,
00108 StringExecStreamGenerator &generator);
00109
00110 public:
00111 explicit FlatFileExecStreamTest()
00112 {
00113 FENNEL_UNIT_TEST_CASE(FlatFileExecStreamTest, testBuffer);
00114 FENNEL_UNIT_TEST_CASE(FlatFileExecStreamTest, testParser);
00115 FENNEL_UNIT_TEST_CASE(FlatFileExecStreamTest, testStream);
00116 }
00117
00118 void testBuffer();
00119 void testParser();
00120 void testStream();
00121 };
00122
00123 void FlatFileExecStreamTest::testBuffer()
00124 {
00125 FixedBuffer fixedBuffer[8];
00126 std::string path = "flatfile/buffer";
00127
00128 SharedFlatFileBuffer pFileBuffer;
00129 pFileBuffer.reset(new FlatFileBuffer(path), ClosableObjectDestructor());
00130 pFileBuffer->open();
00131 pFileBuffer->setStorage((char *) fixedBuffer, (uint)8);
00132
00133 checkRead(*pFileBuffer, "12345671");
00134 BOOST_CHECK_EQUAL(pFileBuffer->getReadPtr(), (char *)fixedBuffer);
00135
00136 pFileBuffer->setReadPtr(pFileBuffer->getReadPtr() + 7);
00137 checkRead(*pFileBuffer, "12345676");
00138
00139 pFileBuffer->setReadPtr(pFileBuffer->getReadPtr() + 6);
00140 checkRead(*pFileBuffer, "7654\n");
00141 BOOST_CHECK(pFileBuffer->isComplete());
00142 }
00143
00144 void FlatFileExecStreamTest::testParser()
00145 {
00146 FlatFileParser parser(',', '\n', '"', '"');
00147
00148 checkTrim(parser, "", "");
00149 checkTrim(parser, "aRobin", "aRobin");
00150 checkTrim(parser, " red breast in cage ", "red breast in cage");
00151
00152 checkStrip(parser, "", "");
00153 checkStrip(parser, "puts all", "puts all");
00154 checkStrip(parser, "\"heaven\"", "heaven");
00155 checkStrip(parser, " \"in a\" ", "in a");
00156 checkStrip(parser, " \"\"\"rage\"\"\" ", "\"rage\"");
00157
00158
00159 checkColumnScan(
00160 parser, "\"all that\n is \"gold, ",
00161 FlatFileColumnParseResult::FIELD_DELIM, 19, 20);
00162
00163
00164 checkColumnScan(
00165 parser, "\"does not, glitter\"\n ",
00166 FlatFileColumnParseResult::ROW_DELIM, 19, 20);
00167
00168
00169 checkColumnScan(
00170 parser, "\"not all those who \"\"wander\"\"\", ",
00171 FlatFileColumnParseResult::FIELD_DELIM, 30, 31);
00172
00173
00174 checkColumnScan(
00175 parser, " are lost \"",
00176 FlatFileColumnParseResult::NO_DELIM, 12, 12);
00177
00178
00179 checkColumnScan(
00180 parser, "\"JRR, ",
00181 FlatFileColumnParseResult::NO_DELIM, 6, 6);
00182
00183
00184 checkColumnScan(
00185 parser, "\"Tolkien\" , ",
00186 FlatFileColumnParseResult::FIELD_DELIM, 11, 12);
00187
00188
00189 checkColumnScan(
00190 parser, "some poems",
00191 FlatFileColumnParseResult::NO_DELIM, 10, 10);
00192 }
00193
00194 void FlatFileExecStreamTest::checkRead(
00195 FlatFileBuffer &buffer,
00196 const char *string)
00197 {
00198 uint size = strlen(string);
00199 buffer.read();
00200 BOOST_CHECK_EQUAL(buffer.getEndPtr() - buffer.getReadPtr(), size);
00201 BOOST_CHECK_EQUAL(strncmp(buffer.getReadPtr(), string, size), 0);
00202 }
00203
00204 void FlatFileExecStreamTest::checkTrim(
00205 FlatFileParser &parser,
00206 const char *string,
00207 const char *result)
00208 {
00209 char buffer[128];
00210 assert (strlen(string) < sizeof(buffer));
00211 strcpy(buffer, string);
00212
00213 uint size = strlen(result);
00214 BOOST_CHECK_EQUAL(parser.trim(buffer, strlen(buffer)), size);
00215 BOOST_CHECK_EQUAL(strncmp(buffer, result, size), 0);
00216 }
00217
00218 void FlatFileExecStreamTest::checkStrip(
00219 FlatFileParser &parser,
00220 const char *string,
00221 const char *result)
00222 {
00223 char buffer[128];
00224 assert (strlen(string) < sizeof(buffer));
00225 strcpy(buffer, string);
00226
00227 uint size = strlen(result);
00228 BOOST_CHECK_EQUAL(parser.stripQuoting(buffer, strlen(buffer), true), size);
00229 BOOST_CHECK_EQUAL(strncmp(buffer, result, size), 0);
00230 }
00231
00232 void FlatFileExecStreamTest::checkColumnScan(
00233 FlatFileParser &parser,
00234 const char *string,
00235 FlatFileColumnParseResult::DelimiterType type,
00236 uint size,
00237 uint offset)
00238 {
00239 char buffer[128];
00240 assert(strlen(string) < sizeof(buffer));
00241 strcpy(buffer, string);
00242
00243 FlatFileColumnParseResult result;
00244 parser.scanColumn(buffer, strlen(buffer), sizeof(buffer), result);
00245
00246 BOOST_CHECK_EQUAL(result.type, type);
00247 BOOST_CHECK_EQUAL(result.size, size);
00248 BOOST_CHECK_EQUAL(result.next, buffer + offset);
00249 }
00250
00251 void FlatFileExecStreamTest::testStream()
00252 {
00253 StandardTypeDescriptorFactory stdTypeFactory;
00254 TupleAttributeDescriptor attrDesc(
00255 stdTypeFactory.newDataType(STANDARD_TYPE_VARCHAR),
00256 false,
00257 32);
00258
00259 FlatFileExecStreamParams flatfileParams;
00260 flatfileParams.scratchAccessor =
00261 pSegmentFactory->newScratchSegment(pCache,1);
00262 flatfileParams.outputTupleDesc.push_back(attrDesc);
00263 flatfileParams.outputTupleDesc.push_back(attrDesc);
00264 flatfileParams.dataFilePath = "flatfile/stream";
00265 flatfileParams.fieldDelim = ',';
00266 flatfileParams.rowDelim = '\n';
00267 flatfileParams.quoteChar = '"';
00268 flatfileParams.escapeChar = '\\';
00269 flatfileParams.header = false;
00270
00271 ExecStreamEmbryo flatfileStreamEmbryo;
00272 flatfileStreamEmbryo.init(
00273 FlatFileExecStream::newFlatFileExecStream(), flatfileParams);
00274 flatfileStreamEmbryo.getStream()->setName("FlatFileExecStream");
00275
00276 SharedExecStream pOutputStream = prepareSourceGraph(flatfileStreamEmbryo);
00277 StringExecStreamGeneratorImpl verifier;
00278 verifier.insert("[ 'No one', 'travels' ]");
00279 verifier.insert("[ 'Along this way', 'but I,' ]");
00280 verifier.insert("[ 'This', 'autumn evening.' ]");
00281
00282 verifyOutput(
00283 *pOutputStream,
00284 3,
00285 verifier);
00286 }
00287
00288 void FlatFileExecStreamTest::verifyOutput(
00289 ExecStream &stream,
00290 uint nRowsExpected,
00291 StringExecStreamGenerator &generator)
00292 {
00293
00294
00295
00296 pResourceGovernor->requestResources(*pGraph);
00297 pGraph->open();
00298 pScheduler->start();
00299 uint nRows = 0;
00300 for (;;) {
00301 ExecStreamBufAccessor &bufAccessor =
00302 pScheduler->readStream(stream);
00303 if (bufAccessor.getState() == EXECBUF_EOS) {
00304 break;
00305 }
00306 BOOST_REQUIRE(bufAccessor.isConsumptionPossible());
00307 const uint nCol =
00308 bufAccessor.getConsumptionTupleAccessor().size();
00309 BOOST_REQUIRE(nCol == bufAccessor.getTupleDesc().size());
00310 BOOST_REQUIRE(nCol >= 1);
00311 TupleData inputTuple;
00312 inputTuple.compute(bufAccessor.getTupleDesc());
00313 std::ostringstream oss;
00314 TuplePrinter tuplePrinter;
00315 for (;;) {
00316 if (!bufAccessor.demandData()) {
00317 break;
00318 }
00319 BOOST_REQUIRE(nRows < nRowsExpected);
00320 bufAccessor.unmarshalTuple(inputTuple);
00321 tuplePrinter.print(oss,bufAccessor.getTupleDesc(),inputTuple);
00322 std::string actualValue = oss.str();
00323 oss.str("");
00324 const std::string &expectedValue = generator.generateValue(nRows);
00325 if (actualValue.compare(expectedValue)) {
00326 std::cout << "(Row) = (" << nRows << ")" << std::endl;
00327 BOOST_CHECK_EQUAL(expectedValue,actualValue);
00328 return;
00329 }
00330 bufAccessor.consumeTuple();
00331 ++nRows;
00332 }
00333 }
00334 BOOST_CHECK_EQUAL(nRowsExpected,nRows);
00335 }
00336
// Hands FlatFileExecStreamTest to the Fennel unit-test suite macro.
// NOTE(review): the macro is defined outside this file; presumably it
// generates the test program's entry point for this class -- confirm.
00337 FENNEL_UNIT_TEST_SUITE(FlatFileExecStreamTest);
00338
00339