bes  Updated for version 3.20.10
ArrayJoinExistingAggregation.cc
1 // This file is part of the "NcML Module" project, a BES module designed
3 // to allow NcML files to be used as a wrapper to add
4 // AIS to existing datasets of any format.
5 //
6 // Copyright (c) 2010 OPeNDAP, Inc.
7 // Author: Michael Johnson <m.johnson@opendap.org>
8 //
9 // For more information, please also see the main website: http://opendap.org/
10 //
11 // This library is free software; you can redistribute it and/or
12 // modify it under the terms of the GNU Lesser General Public
13 // License as published by the Free Software Foundation; either
14 // version 2.1 of the License, or (at your option) any later version.
15 //
16 // This library is distributed in the hope that it will be useful,
17 // but WITHOUT ANY WARRANTY; without even the implied warranty of
18 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19 // Lesser General Public License for more details.
20 //
21 // You should have received a copy of the GNU Lesser General Public
22 // License along with this library; if not, write to the Free Software
23 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
24 //
25 // Please see the files COPYING and COPYRIGHT for more information on the LGPL.
26 //
27 // You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
29 
30 #include "config.h"
31 
32 #include <sstream>
33 
34 #include <libdap/Marshaller.h>
35 
36 #include "BESDebug.h"
37 #include "BESStopWatch.h"
38 
39 #include "ArrayJoinExistingAggregation.h"
40 
41 #include "AggregationException.h" // agg_util
42 #include "AggregationUtil.h" // agg_util
43 #include "NCMLDebug.h"
44 
45 static const string DEBUG_CHANNEL(NCML_MODULE_DBG_CHANNEL_2);
46 static const bool PRINT_CONSTRAINTS = false;
47 
48 // Timeouts are now handled in/by the BES framework in BESInterface.
49 // jhrg 12/29/15
50 #undef USE_LOCAL_TIMEOUT_SCHEME
51 
52 namespace agg_util {
53 
55  const AMDList& memberDatasets, std::auto_ptr<ArrayGetterInterface>& arrayGetter, const Dimension& joinDim) :
56  ArrayAggregationBase(granuleTemplate, memberDatasets, arrayGetter), _joinDim(joinDim)
57 {
58  BESDEBUG_FUNC(DEBUG_CHANNEL, "Making the aggregated outer dimension be: " + joinDim.toString() + "\n");
59 
60  // We created the array with the given granule prototype, but the resulting
61  // outer dimension size must be calculated according to the
62  // value in the passed in dimension object.
63  libdap::Array::dimension& rOuterDim = *(dim_begin());
64  NCML_ASSERT_MSG(rOuterDim.name == joinDim.name, "The outer dimension name of this is not the expected "
65  "outer dimension name! Broken precondition: This ctor cannot be called "
66  "without this being true!");
67  rOuterDim.size = joinDim.size;
68  // Force it to recompute constraints since we changed size.
69  reset_constraint();
70 
71  ostringstream oss;
73  if (PRINT_CONSTRAINTS) {
74  // constraints as well to ensure reset worked.
76  }
77  BESDEBUG_FUNC(DEBUG_CHANNEL, "Constrained Dims after set are: " + oss.str());
78 }
79 
81  ArrayAggregationBase(rhs), _joinDim(rhs._joinDim)
82 {
83  duplicate(rhs);
84 }
85 
86 /* virtual */
87 ArrayJoinExistingAggregation::~ArrayJoinExistingAggregation()
88 {
89  cleanup();
90 }
91 
92 ArrayJoinExistingAggregation&
93 ArrayJoinExistingAggregation::operator=(const ArrayJoinExistingAggregation& rhs)
94 {
95  if (this != &rhs) {
96  cleanup();
97  ArrayAggregationBase::operator=(rhs);
98  duplicate(rhs);
99  }
100  return *this;
101 }
102 
103 /* virtual */
104 ArrayJoinExistingAggregation*
106 {
107  return new ArrayJoinExistingAggregation(*this);
108 }
109 
// Build-time switch selecting how serialize() produces its response:
//   1 - stream each granule's constrained slice to the Marshaller as soon as
//       it has been read (put_vector_start / put_vector_part / put_vector_end).
//   0 - old behavior: the entire response for this variable is built in
//       memory first and then sent via libdap::Array::serialize().
#define PIPELINING 1
113 
114 /* virtual */
115 // begin modifying here for the double buffering
116 // see notes about how this was written marked with '***'
117 // Following this method is an older version of serialize that
118 // provides no new functionality but does get run instead of the
119 // more general implementation in libdap::Array.
120 bool ArrayJoinExistingAggregation::serialize(libdap::ConstraintEvaluator &eval, libdap::DDS &dds, libdap::Marshaller &m,
121  bool ce_eval)
122 {
123  BESStopWatch sw;
124  if (BESDebug::IsSet(TIMING_LOG_KEY)) sw.start("ArrayJoinExistingAggregation::serialize", "");
125 
126  // *** This serialize() implementation was made by starting with a simple version that
127  // *** tested read_p(), calling read() if needed and tsting send_p() and is_in_selection(),
128  // *** returning true if the data did not need to be sent. I moved that test here.
129 
130  // Only continue if we are supposed to serialize this object at all.
131  if (!(send_p() || is_in_selection())) {
132  BESDEBUG_FUNC(DEBUG_CHANNEL, "Object not in output, skipping... name=" << name() << endl);
133  return true;
134  }
135 
136  // *** Add status so that we can do our magic _or_ pass off the call to libdap
137  // *** and collect the result either way.
138  bool status = false;
139 
140  if (!read_p()) {
141  // *** copy lines from AggregationBase::read() into here in place
142  // *** of the call to read()
143 
144  if (PRINT_CONSTRAINTS) {
145  BESDEBUG_FUNC(DEBUG_CHANNEL, "Constraints on this Array are:" << endl);
146  printConstraints(*this);
147  }
148 
149  // call subclass impl
151 
152  if (PRINT_CONSTRAINTS) {
153  BESDEBUG_FUNC(DEBUG_CHANNEL, "After transfer, constraints on the member template Array are: " << endl);
155  }
156 
157  // *** Inserted code from readConstrainedGranuleArraysAndAggregateDataHook here
158 
159  // outer one is the first in iteration
160  const Array::dimension& outerDim = *(dim_begin());
161  BESDEBUG("ncml",
162  "Aggregating datasets array with outer dimension constraints: " << " start=" << outerDim.start << " stride=" << outerDim.stride << " stop=" << outerDim.stop << endl);
163 
164  try {
165 #if PIPELINING
166  // assumes the constraints are already set properly on this
167  m.put_vector_start(length());
168 #else
169  reserve_value_capacity();
170 #endif
171 
172  // Start the iteration state for the granule.
173  const AMDList& datasets = getDatasetList(); // the list
174  NCML_ASSERT(!datasets.empty());
175  int currDatasetIndex = 0; // index into datasets
176  const AggMemberDataset* pCurrDataset = (datasets[currDatasetIndex]).get();
177 
178  int outerDimIndexOfCurrDatasetHead = 0;
179  int currDatasetSize = int(pCurrDataset->getCachedDimensionSize(_joinDim.name));
180  bool currDatasetWasRead = false;
181 
182  // where in this output array we are writing next
183  unsigned int nextOutputBufferElementIndex = 0;
184 
185  // Traverse the outer dimension constraints,
186  // Keeping track of which dataset we need to
187  // be inside for the given values of the constraint.
188  for (int outerDimIndex = outerDim.start; outerDimIndex <= outerDim.stop && outerDimIndex < outerDim.size;
189  outerDimIndex += outerDim.stride) {
190  // Figure out where the given outer index maps into in local granule space
191  int localGranuleIndex = outerDimIndex - outerDimIndexOfCurrDatasetHead;
192 
193  // if this is beyond the dataset end, move state to the next dataset
194  // and try again until we're in the proper interval, with proper dataset.
195  while (localGranuleIndex >= currDatasetSize) {
196  localGranuleIndex -= currDatasetSize;
197  outerDimIndexOfCurrDatasetHead += currDatasetSize;
198  ++currDatasetIndex;
199  NCML_ASSERT(currDatasetIndex < int(datasets.size()));
200  pCurrDataset = datasets[currDatasetIndex].get();
201  currDatasetSize = pCurrDataset->getCachedDimensionSize(_joinDim.name);
202  currDatasetWasRead = false;
203 
204  BESDEBUG_FUNC(DEBUG_CHANNEL,
205  "The constraint traversal passed a granule boundary " << "on the outer dimension and is stepping forward into " << "granule index=" << currDatasetIndex << endl);
206  }
207 
208  // If we haven't read in this granule yet (we passed a boundary)
209  // then do it now. Map constraints into the local granule space.
210  if (!currDatasetWasRead) {
211  BESDEBUG_FUNC(DEBUG_CHANNEL,
212  " Current granule dataset was traversed but not yet " "read and copied into output. Mapping constraints " "and calling read()..." << endl);
213 
214  // Set up a constraint object for the actual granule read
215  // so that it only loads the data values in which we are
216  // interested.
217  Array& granuleConstraintTemplate = getGranuleTemplateArray();
218 
219  // The inner dim constraints were set up in the containing read() call.
220  // The outer dim was left open for us to fix now...
221  Array::Dim_iter outerDimIt = granuleConstraintTemplate.dim_begin();
222 
223  // modify the outerdim size to match the dataset we need to
224  // load. The inners MUST match so we can let those get
225  //checked later...
226  outerDimIt->size = currDatasetSize;
227  outerDimIt->c_size = currDatasetSize; // this will get recalc below?
228 
229  // find the mapped endpoint
230  // Basically, the fullspace endpoint mapped to local offset,
231  // clamped into the local granule size.
232  int granuleStopIndex = std::min(outerDim.stop - outerDimIndexOfCurrDatasetHead,
233  currDatasetSize - 1);
234 
235  // we must clamp the stride to the interval of the
236  // dataset in order to avoid an exception in
237  // add_constraint on stride being larger than dataset.
238  int clampedStride = std::min(outerDim.stride, currDatasetSize);
239  // mapped endpoint clamped within this granule
240  granuleConstraintTemplate.add_constraint(outerDimIt, localGranuleIndex, clampedStride,
241  granuleStopIndex);
242 #if USE_LOCAL_TIMEOUT_SCHEME
243  dds.timeout_on();
244 #endif
245 #if 0
246  // Do the constrained read and copy it into this output buffer
247  agg_util::AggregationUtil::addDatasetArrayDataToAggregationOutputArray(*this,// into the output buffer of this object
248  nextOutputBufferElementIndex,// into the next open slice
249  getGranuleTemplateArray(),// constraints we just setup
250  name(),// aggvar name
251  const_cast<AggMemberDataset&>(*pCurrDataset),// Dataset who's DDS should be searched
252  getArrayGetterInterface(), DEBUG_CHANNEL);
253 #endif
254 
256  getGranuleTemplateArray(), name(), const_cast<AggMemberDataset&>(*pCurrDataset),
257  getArrayGetterInterface(), DEBUG_CHANNEL);
258 #if USE_LOCAL_TIMEOUT_SCHEME
259  dds.timeout_off();
260 #endif
261 
262 #if PIPELINING
263  m.put_vector_part(pDatasetArray->get_buf(), getGranuleTemplateArray().length(), var()->width(),
264  var()->type());
265 #else
266  this->set_value_slice_from_row_major_vector(*pDatasetArray, nextOutputBufferElementIndex);
267 #endif
268 
269  pDatasetArray->clear_local_data();
270 
271  // Jump output buffer index forward by the amount we added.
272  nextOutputBufferElementIndex += getGranuleTemplateArray().length();
273  currDatasetWasRead = true;
274 
275  BESDEBUG_FUNC(DEBUG_CHANNEL,
276  " The granule index " << currDatasetIndex << " was read with constraints and copied into the aggregation output." << endl);
277  } // !currDatasetWasRead
278  } // for loop over outerDim
279  } // end of try
280  catch (AggregationException& ex) {
281  THROW_NCML_PARSE_ERROR(-1, ex.what());
282  }
283 
284  // *** end of code inserted from readConstrainedGranuleArraysAndAggregateDataHook
285 
286 #if PIPELINING
287  m.put_vector_end();
288  status = true;
289 #else
290  set_read_p(true);
291  status = libdap::Array::serialize(eval, dds, m, ce_eval);
292 #endif
293  }
294  else {
295  status = libdap::Array::serialize(eval, dds, m, ce_eval);
296  }
297 
298  return status;
299 }
300 
302 // Private Impl Below
303 
304 void ArrayJoinExistingAggregation::duplicate(const ArrayJoinExistingAggregation& rhs)
305 {
306  _joinDim = rhs._joinDim;
307 }
308 
309 void ArrayJoinExistingAggregation::cleanup() throw ()
310 {
311 }
312 
313 /* virtual */
315 {
316  // transfer the constraints from this object into the subArray template
317  // skipping our first dim which is the join dim we need to do specially every
318  // granule in the read hook.
320  *this, // from this
321  true, // skip first dim in the copy since we handle it special
322  true, // also skip it in the toArray for the same reason.
323  true, // print debug
324  DEBUG_CHANNEL); // on this channel
325 }
326 
328 {
329  BESStopWatch sw;
330  if (BESDebug::IsSet(TIMING_LOG_KEY))
331  sw.start("ArrayJoinExistingAggregation::readConstrainedGranuleArraysAndAggregateDataHook", "");
332 
333  // outer one is the first in iteration
334  const Array::dimension& outerDim = *(dim_begin());
335  BESDEBUG("ncml",
336  "Aggregating datasets array with outer dimension constraints: " << " start=" << outerDim.start << " stride=" << outerDim.stride << " stop=" << outerDim.stop << endl);
337 
338  try {
339  // assumes the constraints are already set properly on this
340  reserve_value_capacity();
341 
342  // Start the iteration state for the granule.
343  const AMDList& datasets = getDatasetList(); // the list
344  NCML_ASSERT(!datasets.empty());
345  int currDatasetIndex = 0; // index into datasets
346  const AggMemberDataset* pCurrDataset = (datasets[currDatasetIndex]).get();
347 
348  int outerDimIndexOfCurrDatasetHead = 0;
349  int currDatasetSize = int(pCurrDataset->getCachedDimensionSize(_joinDim.name));
350  bool currDatasetWasRead = false;
351 
352  // where in this output array we are writing next
353  unsigned int nextOutputBufferElementIndex = 0;
354 
355  // Traverse the outer dimension constraints,
356  // Keeping track of which dataset we need to
357  // be inside for the given values of the constraint.
358  for (int outerDimIndex = outerDim.start; outerDimIndex <= outerDim.stop && outerDimIndex < outerDim.size;
359  outerDimIndex += outerDim.stride) {
360  // Figure out where the given outer index maps into in local granule space
361  int localGranuleIndex = outerDimIndex - outerDimIndexOfCurrDatasetHead;
362 
363  // if this is beyond the dataset end, move state to the next dataset
364  // and try again until we're in the proper interval, with proper dataset.
365  while (localGranuleIndex >= currDatasetSize) {
366  localGranuleIndex -= currDatasetSize;
367  outerDimIndexOfCurrDatasetHead += currDatasetSize;
368  ++currDatasetIndex;
369  NCML_ASSERT(currDatasetIndex < int(datasets.size()));
370  pCurrDataset = datasets[currDatasetIndex].get();
371  currDatasetSize = pCurrDataset->getCachedDimensionSize(_joinDim.name);
372  currDatasetWasRead = false;
373 
374  BESDEBUG_FUNC(DEBUG_CHANNEL,
375  "The constraint traversal passed a granule boundary " << "on the outer dimension and is stepping forward into " << "granule index=" << currDatasetIndex << endl);
376  }
377 
378  // If we haven't read in this granule yet (we passed a boundary)
379  // then do it now. Map constraints into the local granule space.
380  if (!currDatasetWasRead) {
381  BESDEBUG_FUNC(DEBUG_CHANNEL,
382  " Current granule dataset was traversed but not yet " "read and copied into output. Mapping constraints " "and calling read()..." << endl);
383 
384  // Set up a constraint object for the actual granule read
385  // so that it only loads the data values in which we are
386  // interested.
387  Array& granuleConstraintTemplate = getGranuleTemplateArray();
388 
389  // The inner dim constraints were set up in the containing read() call.
390  // The outer dim was left open for us to fix now...
391  Array::Dim_iter outerDimIt = granuleConstraintTemplate.dim_begin();
392 
393  // modify the outerdim size to match the dataset we need to
394  // load. The inners MUST match so we can let those get
395  //checked later...
396  outerDimIt->size = currDatasetSize;
397  outerDimIt->c_size = currDatasetSize; // this will get recalc below?
398 
399  // find the mapped endpoint
400  // Basically, the fullspace endpoint mapped to local offset,
401  // clamped into the local granule size.
402  int granuleStopIndex = std::min(outerDim.stop - outerDimIndexOfCurrDatasetHead, currDatasetSize - 1);
403 
404  // we must clamp the stride to the interval of the
405  // dataset in order to avoid an exception in
406  // add_constraint on stride being larger than dataset.
407  int clampedStride = std::min(outerDim.stride, currDatasetSize);
408  // mapped endpoint clamped within this granule
409  granuleConstraintTemplate.add_constraint(outerDimIt, localGranuleIndex, clampedStride, granuleStopIndex);
410 
411  // Do the constrained read and copy it into this output buffer
412  agg_util::AggregationUtil::addDatasetArrayDataToAggregationOutputArray(*this, // into the output buffer of this object
413  nextOutputBufferElementIndex, // into the next open slice
414  getGranuleTemplateArray(), // constraints we just setup
415  name(), // aggvar name
416  const_cast<AggMemberDataset&>(*pCurrDataset), // Dataset who's DDS should be searched
417  getArrayGetterInterface(), DEBUG_CHANNEL);
418 
419  // Jump output buffer index forward by the amount we added.
420  nextOutputBufferElementIndex += getGranuleTemplateArray().length();
421  currDatasetWasRead = true;
422 
423  BESDEBUG_FUNC(DEBUG_CHANNEL,
424  " The granule index " << currDatasetIndex << " was read with constraints and copied into the aggregation output." << endl);
425  } // !currDatasetWasRead
426  } // for loop over outerDim
427  } // try
428 
429  catch (AggregationException& ex) {
430  THROW_NCML_PARSE_ERROR(-1, ex.what());
431  }
432 
433 }
434 
435 } // namespace agg_util
static bool IsSet(const std::string &flagName)
see if the debug context flagName is set to true
Definition: BESDebug.h:168
virtual bool start(std::string name)
Definition: BESStopWatch.cc:67
virtual unsigned int getCachedDimensionSize(const std::string &dimName) const =0
static void addDatasetArrayDataToAggregationOutputArray(libdap::Array &oOutputArray, unsigned int atIndex, const libdap::Array &constrainedTemplateArray, const string &varName, AggMemberDataset &dataset, const ArrayGetterInterface &arrayGetter, const string &debugChannel)
static void transferArrayConstraints(libdap::Array *pToArray, const libdap::Array &fromArray, bool skipFirstFromDim, bool skipFirstToDim, bool printDebug=false, const std::string &debugChannel="agg_util")
static void printDimensions(std::ostream &os, const libdap::Array &fromArray)
static libdap::Array * readDatasetArrayDataForAggregation(const libdap::Array &constrainedTemplateArray, const std::string &varName, AggMemberDataset &dataset, const ArrayGetterInterface &arrayGetter, const std::string &debugChannel)
static void printConstraints(std::ostream &os, const libdap::Array &fromArray)
const AMDList & getDatasetList() const
void printConstraints(const Array &fromArray)
const ArrayGetterInterface & getArrayGetterInterface() const
ArrayJoinExistingAggregation(const libdap::Array &granuleTemplate, const AMDList &memberDatasets, std::auto_ptr< ArrayGetterInterface > &arrayGetter, const Dimension &joinDim)
virtual ArrayJoinExistingAggregation * ptr_duplicate()
Helper class for temporarily hijacking an existing dhi to load a DDX response for one particular file...
std::string toString() const
Definition: Dimension.cc:55