SortedAggExecStream.cpp

Go to the documentation of this file.
00001 /*
00002 // $Id: //open/dev/fennel/exec/SortedAggExecStream.cpp#10 $
00003 // Fennel is a library of data storage and processing components.
00004 // Copyright (C) 2005-2009 The Eigenbase Project
00005 // Copyright (C) 2005-2009 SQLstream, Inc.
00006 // Copyright (C) 2005-2009 LucidEra, Inc.
00007 //
00008 // This program is free software; you can redistribute it and/or modify it
00009 // under the terms of the GNU General Public License as published by the Free
00010 // Software Foundation; either version 2 of the License, or (at your option)
00011 // any later version approved by The Eigenbase Project.
00012 //
00013 // This program is distributed in the hope that it will be useful,
00014 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00015 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00016 // GNU General Public License for more details.
00017 //
00018 // You should have received a copy of the GNU General Public License
00019 // along with this program; if not, write to the Free Software
00020 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00021 */
00022 
00023 #include "fennel/common/CommonPreamble.h"
00024 #include "fennel/exec/SortedAggExecStream.h"
00025 #include "fennel/exec/ExecStreamBufAccessor.h"
00026 #include "fennel/tuple/StandardTypeDescriptor.h"
00027 
00028 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/SortedAggExecStream.cpp#10 $");
00029 
00030 void SortedAggExecStream::prepare(SortedAggExecStreamParams const &params)
00031 {
00032     ConduitExecStream::prepare(params);
00033 
00034     inputTuple.compute(pInAccessor->getTupleDesc());
00035 
00036     /*
00037       prevTuple contains the groupByKey fields as well as the accumulator
00038       result fields. outputTuple has the same format as prevTuple.
00039       The difference is that prevTuple has buffer associated with it while
00040       outputTuple has pointers pointing to the result location.
00041     */
00042     TupleDescriptor prevTupleDesc;
00043     TupleDescriptor const &inputDesc = pInAccessor->getTupleDesc();
00044 
00045     // Attribute descriptor for COUNT output
00046     StandardTypeDescriptorFactory stdTypeFactory;
00047     TupleAttributeDescriptor countDesc(
00048         stdTypeFactory.newDataType(STANDARD_TYPE_INT_64));
00049 
00050     groupByKeyCount = params.groupByKeyCount;
00051 
00052     for (int i = 0; i < groupByKeyCount; i ++) {
00053         prevTupleDesc.push_back(inputDesc[i]);
00054     }
00055 
00056     /*
00057       Compute the accumulator result portion of prevTupleDesc based on
00058       requested aggregate function invocations, and instantiate polymorphic
00059       AggComputers bound to correct inputs.
00060     */
00061     for (AggInvocationConstIter pInvocation(params.aggInvocations.begin());
00062          pInvocation != params.aggInvocations.end();
00063          ++pInvocation)
00064     {
00065         switch (pInvocation->aggFunction) {
00066         case AGG_FUNC_COUNT:
00067             prevTupleDesc.push_back(countDesc);
00068             break;
00069         case AGG_FUNC_SUM:
00070         case AGG_FUNC_MIN:
00071         case AGG_FUNC_MAX:
00072         case AGG_FUNC_SINGLE_VALUE:
00073             // Output type is same as input type, but nullable
00074             prevTupleDesc.push_back(inputDesc[pInvocation->iInputAttr]);
00075             prevTupleDesc.back().isNullable = true;
00076             break;
00077         }
00078         TupleAttributeDescriptor const *pInputAttr = NULL;
00079         if (pInvocation->iInputAttr != -1) {
00080             pInputAttr = &(inputDesc[pInvocation->iInputAttr]);
00081         }
00082         aggComputers.push_back(
00083             newAggComputer(
00084                 pInvocation->aggFunction,
00085                 pInputAttr));
00086         aggComputers.back().setInputAttrIndex(pInvocation->iInputAttr);
00087     }
00088 
00089     // Sanity check:  the output shape we computed should agree with
00090     // the descriptor (if any) in the supplied plan.
00091     if (!params.outputTupleDesc.empty()) {
00092         assert(prevTupleDesc == params.outputTupleDesc);
00093     }
00094     prevTuple.computeAndAllocate(prevTupleDesc);
00095     outputTuple.compute(prevTupleDesc);
00096     pOutAccessor->setTupleShape(prevTupleDesc);
00097 }
00098 
00099 AggComputer *SortedAggExecStream::newAggComputer(
00100     AggFunction aggFunction,
00101     TupleAttributeDescriptor const *pAttrDesc)
00102 {
00103     return AggComputer::newAggComputer(aggFunction, pAttrDesc);
00104 }
00105 
00106 inline void SortedAggExecStream::clearAccumulator()
00107 {
00108     for (int i = 0; i < aggComputers.size(); ++i) {
00109         aggComputers[i].clearAccumulator(prevTuple[i + groupByKeyCount]);
00110     }
00111 }
00112 
00113 inline void SortedAggExecStream::updateAccumulator()
00114 {
00115     for (int i = 0; i < aggComputers.size(); ++i) {
00116         aggComputers[i].updateAccumulator(
00117             prevTuple[i + groupByKeyCount],
00118             inputTuple);
00119     }
00120 }
00121 
00122 inline void SortedAggExecStream::copyPrevGroupByKey()
00123 {
00124     /*
00125       Need to make sure pointers are allocated before memcpy.
00126       resetBuffer restores the pointers to the associated buffer.
00127     */
00128     prevTuple.resetBuffer();
00129 
00130     for (int i = 0; i < groupByKeyCount; i ++) {
00131         prevTuple[i].memCopyFrom(inputTuple[i]);
00132     }
00133 }
00134 
00135 inline int SortedAggExecStream::compareGroupByKeys()
00136 {
00137     /*
00138       prevTuple does not actually have the same Tuple layout
00139       as inputTuple; however, the prefixes(of length groupByKeyCount)
00140       refer to the same fields. Compare only the prefixes.
00141     */
00142     int ret =
00143         (pInAccessor->getTupleDesc()).compareTuplesKey(
00144             prevTuple,
00145             inputTuple,
00146             groupByKeyCount);
00147     return ret;
00148 }
00149 
00150 inline void SortedAggExecStream::computeOutput()
00151 {
00152     int i;
00153 
00154     for (i = 0; i < groupByKeyCount; i ++) {
00155         outputTuple[i] = prevTuple[i];
00156     }
00157 
00158     for (i = 0; i < aggComputers.size(); i ++) {
00159         aggComputers[i].computeOutput(
00160             outputTuple[i + groupByKeyCount],
00161             prevTuple[i + groupByKeyCount]);
00162     }
00163 }
00164 
00165 void SortedAggExecStream::open(bool restart)
00166 {
00167     ConduitExecStream::open(restart);
00168     clearAccumulator();
00169 
00170     /*
00171       When accumulating, the first tuple in a group always updates the
00172       accumulator result field. Compare the group by key fields only for
00173       subsequent tuples. We use prevTupleValid to mark if the first tuple
00174       was seen.
00175       Need to set this in open() so that when the same stream is re-executed,
00176       e.g. when two identical group by statements are issued, the state when
00177       receiving the first input tuple of the first group is correct.
00178       Ignore prevTupleValid field when not doing groupby's.
00179     */
00180     prevTupleValid = (groupByKeyCount > 0) ? false : true;
00181 
00182     state = STATE_ACCUMULATING;
00183 }
00184 
00185 inline ExecStreamResult SortedAggExecStream::produce()
00186 {
00187     assert (state == STATE_PRODUCING);
00188 
00189     // attempt to write output
00190     bool success = pOutAccessor->produceTuple(outputTuple);
00191     if (success) {
00192         clearAccumulator();
00193         state = STATE_ACCUMULATING;
00194         /*
00195           Succeeded in outputting result for the previous group.
00196           Record new group by key and update accumulator result fields.
00197         */
00198         copyPrevGroupByKey();
00199         updateAccumulator();
00200         pInAccessor->consumeTuple();
00201         return EXECRC_YIELD;
00202     } else {
00203         return EXECRC_BUF_OVERFLOW;
00204     }
00205 }
00206 
00207 ExecStreamResult SortedAggExecStream::execute(ExecStreamQuantum const &quantum)
00208 {
00209     int keyComp;
00210     ExecStreamResult rc;
00211 
00212     /*
00213       Perform EOS processing first, since there can be a result tuple which is
00214       not produced yet.
00215     */
00216     if (pInAccessor->getState() == EXECBUF_EOS) {
00217         if (!prevTupleValid) {
00218             state = STATE_DONE;
00219         }
00220 
00221         // no more input is coming
00222         if (state == STATE_DONE) {
00223             // already produced output
00224             pOutAccessor->markEOS();
00225             return EXECRC_EOS;
00226         }
00227 
00228         if (state == STATE_ACCUMULATING) {
00229             // compute final output and get ready to write it
00230             computeOutput();
00231             state = STATE_PRODUCING;
00232         }
00233 
00234         // attempt to write output
00235         bool success = pOutAccessor->produceTuple(outputTuple);
00236         if (success) {
00237             state = STATE_DONE;
00238             // let precheckConduitBuffers below return EOS for us
00239         } else {
00240             return EXECRC_BUF_OVERFLOW;
00241         }
00242     } else if (state == STATE_PRODUCING) {
00243         rc = produce();
00244         if (rc != EXECRC_YIELD) {
00245             return rc;
00246         }
00247     }
00248 
00249     /*
00250       Check buffer state. If it is in a good state(EXECRC_YIELD, i.e. not in
00251       any abnormal state and is not empty),  process the tuples from the buffer.
00252     */
00253     rc = precheckConduitBuffers();
00254     if (rc != EXECRC_YIELD) {
00255         return rc;
00256     }
00257 
00258     /*
00259       Iterate through all the INPUT tuples. In this method, quantum represents
00260       unit of input data.
00261     */
00262     for (uint nTuples = 0; nTuples < quantum.nTuplesMax; ++nTuples) {
00263         if (!pInAccessor->demandData()) {
00264             return EXECRC_BUF_UNDERFLOW;
00265         }
00266 
00267         assert (state == STATE_ACCUMULATING);
00268 
00269         pInAccessor->unmarshalTuple(inputTuple);
00270 
00271         if (prevTupleValid) {
00272             keyComp = compareGroupByKeys();
00273             assert(keyComp <= 0);
00274             if (keyComp == 0) {
00275                 // continue reading rows and computing aggregates
00276                 // for this group
00277                 updateAccumulator();
00278                 pInAccessor->consumeTuple();
00279             } else {
00280                 // ready to produce an output row below
00281                 computeOutput();
00282                 state = STATE_PRODUCING;
00283             }
00284         } else {
00285             /*
00286               first tuple read so nothing to compare it to yet, but still need
00287               to record group by key and compute aggregates for that first row.
00288             */
00289             prevTupleValid = true;
00290             copyPrevGroupByKey();
00291             updateAccumulator();
00292             pInAccessor->consumeTuple();
00293         }
00294 
00295         if (state == STATE_PRODUCING) {
00296             rc = produce();
00297             if (rc != EXECRC_YIELD) {
00298                 return rc;
00299             }
00300         }
00301     }
00302 
00303     return EXECRC_YIELD;
00304 }
00305 
00306 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/SortedAggExecStream.cpp#10 $");
00307 
00308 // End SortedAggExecStream.cpp

Generated on Mon Jun 22 04:00:18 2009 for Fennel by  doxygen 1.5.1