00001 /* 00002 // $Id: //open/dev/fennel/exec/SortedAggExecStream.cpp#10 $ 00003 // Fennel is a library of data storage and processing components. 00004 // Copyright (C) 2005-2009 The Eigenbase Project 00005 // Copyright (C) 2005-2009 SQLstream, Inc. 00006 // Copyright (C) 2005-2009 LucidEra, Inc. 00007 // 00008 // This program is free software; you can redistribute it and/or modify it 00009 // under the terms of the GNU General Public License as published by the Free 00010 // Software Foundation; either version 2 of the License, or (at your option) 00011 // any later version approved by The Eigenbase Project. 00012 // 00013 // This program is distributed in the hope that it will be useful, 00014 // but WITHOUT ANY WARRANTY; without even the implied warranty of 00015 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00016 // GNU General Public License for more details. 00017 // 00018 // You should have received a copy of the GNU General Public License 00019 // along with this program; if not, write to the Free Software 00020 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 00021 */ 00022 00023 #include "fennel/common/CommonPreamble.h" 00024 #include "fennel/exec/SortedAggExecStream.h" 00025 #include "fennel/exec/ExecStreamBufAccessor.h" 00026 #include "fennel/tuple/StandardTypeDescriptor.h" 00027 00028 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/SortedAggExecStream.cpp#10 $"); 00029 00030 void SortedAggExecStream::prepare(SortedAggExecStreamParams const ¶ms) 00031 { 00032 ConduitExecStream::prepare(params); 00033 00034 inputTuple.compute(pInAccessor->getTupleDesc()); 00035 00036 /* 00037 prevTuple contains the groupByKey fields as well as the accumulator 00038 result fields. outputTuple has the same format as prevTuple. 00039 The difference is that prevTuple has buffer associated with it while 00040 outputTuple has pointers pointing to the result location. 00041 */ 00042 TupleDescriptor prevTupleDesc; 00043 TupleDescriptor const &inputDesc = pInAccessor->getTupleDesc(); 00044 00045 // Attribute descriptor for COUNT output 00046 StandardTypeDescriptorFactory stdTypeFactory; 00047 TupleAttributeDescriptor countDesc( 00048 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64)); 00049 00050 groupByKeyCount = params.groupByKeyCount; 00051 00052 for (int i = 0; i < groupByKeyCount; i ++) { 00053 prevTupleDesc.push_back(inputDesc[i]); 00054 } 00055 00056 /* 00057 Compute the accumulator result portion of prevTupleDesc based on 00058 requested aggregate function invocations, and instantiate polymorphic 00059 AggComputers bound to correct inputs. 00060 */ 00061 for (AggInvocationConstIter pInvocation(params.aggInvocations.begin()); 00062 pInvocation != params.aggInvocations.end(); 00063 ++pInvocation) 00064 { 00065 switch (pInvocation->aggFunction) { 00066 case AGG_FUNC_COUNT: 00067 prevTupleDesc.push_back(countDesc); 00068 break; 00069 case AGG_FUNC_SUM: 00070 case AGG_FUNC_MIN: 00071 case AGG_FUNC_MAX: 00072 case AGG_FUNC_SINGLE_VALUE: 00073 // Output type is same as input type, but nullable 00074 prevTupleDesc.push_back(inputDesc[pInvocation->iInputAttr]); 00075 prevTupleDesc.back().isNullable = true; 00076 break; 00077 } 00078 TupleAttributeDescriptor const *pInputAttr = NULL; 00079 if (pInvocation->iInputAttr != -1) { 00080 pInputAttr = &(inputDesc[pInvocation->iInputAttr]); 00081 } 00082 aggComputers.push_back( 00083 newAggComputer( 00084 pInvocation->aggFunction, 00085 pInputAttr)); 00086 aggComputers.back().setInputAttrIndex(pInvocation->iInputAttr); 00087 } 00088 00089 // Sanity check: the output shape we computed should agree with 00090 // the descriptor (if any) in the supplied plan. 00091 if (!params.outputTupleDesc.empty()) { 00092 assert(prevTupleDesc == params.outputTupleDesc); 00093 } 00094 prevTuple.computeAndAllocate(prevTupleDesc); 00095 outputTuple.compute(prevTupleDesc); 00096 pOutAccessor->setTupleShape(prevTupleDesc); 00097 } 00098 00099 AggComputer *SortedAggExecStream::newAggComputer( 00100 AggFunction aggFunction, 00101 TupleAttributeDescriptor const *pAttrDesc) 00102 { 00103 return AggComputer::newAggComputer(aggFunction, pAttrDesc); 00104 } 00105 00106 inline void SortedAggExecStream::clearAccumulator() 00107 { 00108 for (int i = 0; i < aggComputers.size(); ++i) { 00109 aggComputers[i].clearAccumulator(prevTuple[i + groupByKeyCount]); 00110 } 00111 } 00112 00113 inline void SortedAggExecStream::updateAccumulator() 00114 { 00115 for (int i = 0; i < aggComputers.size(); ++i) { 00116 aggComputers[i].updateAccumulator( 00117 prevTuple[i + groupByKeyCount], 00118 inputTuple); 00119 } 00120 } 00121 00122 inline void SortedAggExecStream::copyPrevGroupByKey() 00123 { 00124 /* 00125 Need to make sure pointers are allocated before memcpy. 00126 resetBuffer restores the pointers to the associated buffer. 00127 */ 00128 prevTuple.resetBuffer(); 00129 00130 for (int i = 0; i < groupByKeyCount; i ++) { 00131 prevTuple[i].memCopyFrom(inputTuple[i]); 00132 } 00133 } 00134 00135 inline int SortedAggExecStream::compareGroupByKeys() 00136 { 00137 /* 00138 prevTuple does not actually have the same Tuple layout 00139 as inputTuple; however, the prefixes(of length groupByKeyCount) 00140 refer to the same fields. Compare only the prefixes. 00141 */ 00142 int ret = 00143 (pInAccessor->getTupleDesc()).compareTuplesKey( 00144 prevTuple, 00145 inputTuple, 00146 groupByKeyCount); 00147 return ret; 00148 } 00149 00150 inline void SortedAggExecStream::computeOutput() 00151 { 00152 int i; 00153 00154 for (i = 0; i < groupByKeyCount; i ++) { 00155 outputTuple[i] = prevTuple[i]; 00156 } 00157 00158 for (i = 0; i < aggComputers.size(); i ++) { 00159 aggComputers[i].computeOutput( 00160 outputTuple[i + groupByKeyCount], 00161 prevTuple[i + groupByKeyCount]); 00162 } 00163 } 00164 00165 void SortedAggExecStream::open(bool restart) 00166 { 00167 ConduitExecStream::open(restart); 00168 clearAccumulator(); 00169 00170 /* 00171 When accumulating, the first tuple in a group always updates the 00172 accumulator result field. Compare the group by key fields only for 00173 subsequent tuples. We use prevTupleValid to mark if the first tuple 00174 was seen. 00175 Need to set this in open() so that when the same stream is re-executed, 00176 e.g. when two identical group by statements are issued, the state when 00177 receiving the first input tuple of the first group is correct. 00178 Ignore prevTupleValid field when not doing groupby's. 00179 */ 00180 prevTupleValid = (groupByKeyCount > 0) ? false : true; 00181 00182 state = STATE_ACCUMULATING; 00183 } 00184 00185 inline ExecStreamResult SortedAggExecStream::produce() 00186 { 00187 assert (state == STATE_PRODUCING); 00188 00189 // attempt to write output 00190 bool success = pOutAccessor->produceTuple(outputTuple); 00191 if (success) { 00192 clearAccumulator(); 00193 state = STATE_ACCUMULATING; 00194 /* 00195 Succeeded in outputting result for the previous group. 00196 Record new group by key and update accumulator result fields. 00197 */ 00198 copyPrevGroupByKey(); 00199 updateAccumulator(); 00200 pInAccessor->consumeTuple(); 00201 return EXECRC_YIELD; 00202 } else { 00203 return EXECRC_BUF_OVERFLOW; 00204 } 00205 } 00206 00207 ExecStreamResult SortedAggExecStream::execute(ExecStreamQuantum const &quantum) 00208 { 00209 int keyComp; 00210 ExecStreamResult rc; 00211 00212 /* 00213 Perform EOS processing first, since there can be a result tuple which is 00214 not produced yet. 00215 */ 00216 if (pInAccessor->getState() == EXECBUF_EOS) { 00217 if (!prevTupleValid) { 00218 state = STATE_DONE; 00219 } 00220 00221 // no more input is coming 00222 if (state == STATE_DONE) { 00223 // already produced output 00224 pOutAccessor->markEOS(); 00225 return EXECRC_EOS; 00226 } 00227 00228 if (state == STATE_ACCUMULATING) { 00229 // compute final output and get ready to write it 00230 computeOutput(); 00231 state = STATE_PRODUCING; 00232 } 00233 00234 // attempt to write output 00235 bool success = pOutAccessor->produceTuple(outputTuple); 00236 if (success) { 00237 state = STATE_DONE; 00238 // let precheckConduitBuffers below return EOS for us 00239 } else { 00240 return EXECRC_BUF_OVERFLOW; 00241 } 00242 } else if (state == STATE_PRODUCING) { 00243 rc = produce(); 00244 if (rc != EXECRC_YIELD) { 00245 return rc; 00246 } 00247 } 00248 00249 /* 00250 Check buffer state. If it is in a good state(EXECRC_YIELD, i.e. not in 00251 any abnormal state and is not empty), process the tuples from the buffer. 00252 */ 00253 rc = precheckConduitBuffers(); 00254 if (rc != EXECRC_YIELD) { 00255 return rc; 00256 } 00257 00258 /* 00259 Iterate through all the INPUT tuples. In this method, quantum represents 00260 unit of input data. 00261 */ 00262 for (uint nTuples = 0; nTuples < quantum.nTuplesMax; ++nTuples) { 00263 if (!pInAccessor->demandData()) { 00264 return EXECRC_BUF_UNDERFLOW; 00265 } 00266 00267 assert (state == STATE_ACCUMULATING); 00268 00269 pInAccessor->unmarshalTuple(inputTuple); 00270 00271 if (prevTupleValid) { 00272 keyComp = compareGroupByKeys(); 00273 assert(keyComp <= 0); 00274 if (keyComp == 0) { 00275 // continue reading rows and computing aggregates 00276 // for this group 00277 updateAccumulator(); 00278 pInAccessor->consumeTuple(); 00279 } else { 00280 // ready to produce an output row below 00281 computeOutput(); 00282 state = STATE_PRODUCING; 00283 } 00284 } else { 00285 /* 00286 first tuple read so nothing to compare it to yet, but still need 00287 to record group by key and compute aggregates for that first row. 00288 */ 00289 prevTupleValid = true; 00290 copyPrevGroupByKey(); 00291 updateAccumulator(); 00292 pInAccessor->consumeTuple(); 00293 } 00294 00295 if (state == STATE_PRODUCING) { 00296 rc = produce(); 00297 if (rc != EXECRC_YIELD) { 00298 return rc; 00299 } 00300 } 00301 } 00302 00303 return EXECRC_YIELD; 00304 } 00305 00306 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/SortedAggExecStream.cpp#10 $"); 00307 00308 // End SortedAggExecStream.cpp