Fawkes API  Fawkes Development Version
mongodb_log_tf_thread.cpp
1 
2 /***************************************************************************
3  * mongodb_log_tf_thread.cpp - MongoDB transforms logging Thread
4  *
5  * Created: Tue Dec 11 14:58:00 2012 (Freiburg)
6  * Copyright 2010-2017 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  * GNU Library General Public License for more details.
18  *
19  * Read the full text in the LICENSE.GPL file in the doc directory.
20  */
21 
22 #include "mongodb_log_tf_thread.h"
23 
24 #include <core/threading/mutex_locker.h>
25 #include <plugins/mongodb/aspect/mongodb_conncreator.h>
26 #include <tf/time_cache.h>
27 #include <utils/time/wait.h>
28 
29 #include <bsoncxx/builder/basic/document.hpp>
30 #include <cstdlib>
31 #include <mongocxx/client.hpp>
32 #include <mongocxx/exception/operation_exception.hpp>
33 
34 using namespace mongocxx;
35 using namespace fawkes;
36 
37 /** @class MongoLogTransformsThread "mongodb_log_tf_thread.h"
38  * MongoDB transforms logging thread.
39  * This thread periodically queries the transformer for recent transforms
40  * and stores them to the database.
41  *
42  * @author Tim Niemueller
43  */
44 
45 /** Constructor. */
47 : Thread("MongoLogTransformsThread", Thread::OPMODE_CONTINUOUS),
48  MongoDBAspect("default"),
49  TransformAspect(TransformAspect::ONLY_LISTENER)
50 {
52 }
53 
54 /** Destructor. */
56 {
57 }
58 
59 void
61 {
62  database_ = "fflog";
63  collection_ = "tf";
64  try {
65  database_ = config->get_string("/plugins/mongodb-log/database");
66  } catch (Exception &e) {
67  logger->log_info(name(), "No database configured, writing to %s", database_.c_str());
68  }
69 
70  try {
71  collection_ = config->get_string("/plugins/mongodb-log/transforms/collection");
72  } catch (Exception &e) {
73  logger->log_info(name(), "No transforms collection configured, using %s", collection_.c_str());
74  }
75  collection_ = database_ + "." + collection_;
76 
77  cfg_storage_interval_ = config->get_float("/plugins/mongodb-log/transforms/storage-interval");
78 
79  if (cfg_storage_interval_ <= 0.) {
80  // 50% of the cache time, assume 50% is enough to store the data
81  cfg_storage_interval_ = tf_listener->get_cache_time() * 0.5;
82  }
83 
84  wait_ = new TimeWait(clock, cfg_storage_interval_ * 1000000.);
85  mutex_ = new Mutex();
86 }
87 
88 bool
90 {
91  mutex_->lock();
92  return true;
93 }
94 
95 void
97 {
98  delete wait_;
99  delete mutex_;
100 }
101 
102 void
104 {
105  mutex_->lock();
106  fawkes::Time loop_start(clock);
107  wait_->mark_start();
108  std::vector<fawkes::Time> tf_range_start;
109  std::vector<fawkes::Time> tf_range_end;
110  ;
111 
112  tf_listener->lock();
113  std::vector<tf::TimeCacheInterfacePtr> caches = tf_listener->get_frame_caches();
114  std::vector<tf::TimeCacheInterfacePtr> copies(caches.size(), tf::TimeCacheInterfacePtr());
115 
116  const size_t n_caches = caches.size();
117  tf_range_start.resize(n_caches, fawkes::Time(0, 0));
118  tf_range_end.resize(n_caches, fawkes::Time(0, 0));
119  if (last_tf_range_end_.size() != n_caches) {
120  last_tf_range_end_.resize(n_caches, fawkes::Time(0, 0));
121  }
122 
123  unsigned int num_transforms = 0;
124  unsigned int num_upd_caches = 0;
125 
126  for (size_t i = 0; i < n_caches; ++i) {
127  if (caches[i]) {
128  tf_range_end[i] = caches[i]->get_latest_timestamp();
129  if (last_tf_range_end_[i] != tf_range_end[i]) {
130  // we have new data
131  if (!tf_range_end[i].is_zero()) {
132  tf_range_start[i] = tf_range_end[i] - cfg_storage_interval_;
133  if (last_tf_range_end_[i] > tf_range_start[i]) {
134  tf_range_start[i] = last_tf_range_end_[i];
135  }
136  }
137  copies[i] = caches[i]->clone(tf_range_start[i]);
138  last_tf_range_end_[i] = tf_range_end[i];
139  num_upd_caches += 1;
140  num_transforms += copies[i]->get_list_length();
141  }
142  }
143  }
144  tf_listener->unlock();
145 
146  store(copies, tf_range_start, tf_range_end);
147 
148  mutex_->unlock();
149  // -1 to subtract "NO PARENT" pseudo cache
150  fawkes::Time loop_end(clock);
151  logger->log_debug(name(),
152  "%u transforms for %u updated frames stored in %.1f ms",
153  num_transforms,
154  num_upd_caches,
155  (loop_end - &loop_start) * 1000.);
156  wait_->wait();
157 }
158 
159 void
160 MongoLogTransformsThread::store(std::vector<tf::TimeCacheInterfacePtr> &caches,
161  std::vector<fawkes::Time> & from,
162  std::vector<fawkes::Time> & to)
163 {
164  std::vector<std::string> frame_map = tf_listener->get_frame_id_mappings();
165 
166  for (size_t i = 0; i < caches.size(); ++i) {
167  tf::TimeCacheInterfacePtr tc = caches[i];
168  if (!tc)
169  continue;
170 
171  using namespace bsoncxx::builder;
172  basic::document document;
173 
174  document.append(basic::kvp("timestamp", static_cast<int64_t>(from[i].in_msec())));
175  document.append(basic::kvp("timestamp_from", static_cast<int64_t>(from[i].in_msec())));
176  document.append(basic::kvp("timestamp_to", static_cast<int64_t>(to[i].in_msec())));
177  const tf::TimeCache::L_TransformStorage &storage = tc->get_storage();
178 
179  if (storage.empty()) {
180  /*
181  fawkes::Time now(clock);
182  logger->log_warn(name(), "Empty storage for %s (start: %lu end: %lu now: %lu",
183  frame_map[i].c_str(), from[i].in_msec(), to[i].in_msec(),
184  now.in_msec());
185  */
186  continue;
187  }
188 
189  document.append(basic::kvp("frame", frame_map[storage.front().frame_id]));
190  document.append(basic::kvp("child_frame", frame_map[storage.front().child_frame_id]));
191 
192  /*
193  logger->log_debug(name(), "Writing %zu transforms for child frame %s",
194  storage.size(), frame_map[i].c_str());
195  */
196 
197  document.append(basic::kvp("transforms", [storage, frame_map](basic::sub_array array) {
198  for (auto s = storage.begin(); s != storage.end(); ++s) {
199  /*
200  "frame" : "/bl_caster_rotation_link",
201  "child_frame" : "/bl_caster_l_wheel_link",
202  "translation" : [
203  0,
204  0.049,
205  0
206  ],
207  "rotation" : [
208  0,
209  0.607301261865804,
210  0,
211  0.7944716340664417
212  ]
213  */
214  basic::document tf_doc;
215  tf_doc.append(basic::kvp("timestamp", static_cast<int64_t>(s->stamp.in_msec())));
216  tf_doc.append(basic::kvp("frame", frame_map[s->frame_id]));
217  tf_doc.append(basic::kvp("child_frame", frame_map[s->child_frame_id]));
218  tf_doc.append(basic::kvp("rotation", [s](basic::sub_array rot_array) {
219  rot_array.append(s->rotation.x());
220  rot_array.append(s->rotation.y());
221  rot_array.append(s->rotation.z());
222  rot_array.append(s->rotation.w());
223  }));
224  tf_doc.append(basic::kvp("translation", [s](basic::sub_array trans_array) {
225  trans_array.append(s->translation.x());
226  trans_array.append(s->translation.y());
227  trans_array.append(s->translation.z());
228  }));
229  array.append(tf_doc);
230  }
231  }));
232 
233  try {
234  mongodb_client->database(database_)[collection_].insert_one(document.view());
235  } catch (operation_exception &e) {
236  logger->log_warn(name(), "Inserting TF failed: %s", e.what());
237 
238  } catch (std::exception &e) {
239  // for old and broken compilers
240  logger->log_warn(name(), "Inserting TF failed: %s (*)", e.what());
241  }
242  }
243 }
virtual ~MongoLogTransformsThread()
Destructor.
virtual bool prepare_finalize_user()
Prepare finalization user implementation.
virtual void finalize()
Finalize the thread.
virtual void init()
Initialize the thread.
virtual void loop()
Code to execute in the thread.
Clock * clock
By means of this member access to the clock is given.
Definition: clock.h:42
Configuration * config
This is the Configuration member used to access the configuration.
Definition: configurable.h:41
virtual float get_float(const char *path)=0
Get value from configuration which is of type float.
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
Base class for exceptions in Fawkes.
Definition: exception.h:36
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:41
Thread aspect to access MongoDB.
Definition: mongodb.h:39
mongocxx::client * mongodb_client
MongoDB client to use to interact with the database.
Definition: mongodb.h:54
Mutex mutual exclusion lock.
Definition: mutex.h:33
void lock()
Lock this mutex.
Definition: mutex.cpp:87
void unlock()
Unlock the mutex.
Definition: mutex.cpp:131
Thread class encapsulation of pthreads.
Definition: thread.h:46
void set_prepfin_conc_loop(bool concurrent=true)
Set concurrent execution of prepare_finalize() and loop().
Definition: thread.cpp:716
const char * name() const
Get name of thread.
Definition: thread.h:100
Time wait utility.
Definition: wait.h:33
void mark_start()
Mark start of loop.
Definition: wait.cpp:68
void wait()
Wait until minimum loop time has been reached.
Definition: wait.cpp:78
A class for handling time.
Definition: time.h:93
Thread aspect to access the transform system.
Definition: tf.h:39
tf::Transformer * tf_listener
This is the transform listener which saves transforms published by other threads in the system.
Definition: tf.h:67
float get_cache_time() const
Get cache time.
void lock()
Lock transformer.
std::vector< TimeCacheInterfacePtr > get_frame_caches() const
Get all currently existing caches.
std::vector< std::string > get_frame_id_mappings() const
Get mappings from frame ID to names.
Fawkes library namespace.