22 #include "mongodb_log_tf_thread.h"
24 #include <core/threading/mutex_locker.h>
25 #include <plugins/mongodb/aspect/mongodb_conncreator.h>
26 #include <tf/time_cache.h>
27 #include <utils/time/wait.h>
29 #include <bsoncxx/builder/basic/document.hpp>
31 #include <mongocxx/client.hpp>
32 #include <mongocxx/exception/operation_exception.hpp>
34 using namespace mongocxx;
47 :
Thread(
"MongoLogTransformsThread",
Thread::OPMODE_CONTINUOUS),
71 collection_ =
config->
get_string(
"/plugins/mongodb-log/transforms/collection");
73 logger->
log_info(
name(),
"No transforms collection configured, using %s", collection_.c_str());
75 collection_ = database_ +
"." + collection_;
77 cfg_storage_interval_ =
config->
get_float(
"/plugins/mongodb-log/transforms/storage-interval");
79 if (cfg_storage_interval_ <= 0.) {
84 wait_ =
new TimeWait(
clock, cfg_storage_interval_ * 1000000.);
108 std::vector<fawkes::Time> tf_range_start;
109 std::vector<fawkes::Time> tf_range_end;
114 std::vector<tf::TimeCacheInterfacePtr> copies(caches.size(), tf::TimeCacheInterfacePtr());
116 const size_t n_caches = caches.size();
119 if (last_tf_range_end_.size() != n_caches) {
120 last_tf_range_end_.resize(n_caches,
fawkes::Time(0, 0));
123 unsigned int num_transforms = 0;
124 unsigned int num_upd_caches = 0;
126 for (
size_t i = 0; i < n_caches; ++i) {
128 tf_range_end[i] = caches[i]->get_latest_timestamp();
129 if (last_tf_range_end_[i] != tf_range_end[i]) {
131 if (!tf_range_end[i].is_zero()) {
132 tf_range_start[i] = tf_range_end[i] - cfg_storage_interval_;
133 if (last_tf_range_end_[i] > tf_range_start[i]) {
134 tf_range_start[i] = last_tf_range_end_[i];
137 copies[i] = caches[i]->clone(tf_range_start[i]);
138 last_tf_range_end_[i] = tf_range_end[i];
140 num_transforms += copies[i]->get_list_length();
146 store(copies, tf_range_start, tf_range_end);
152 "%u transforms for %u updated frames stored in %.1f ms",
155 (loop_end - &loop_start) * 1000.);
160 MongoLogTransformsThread::store(std::vector<tf::TimeCacheInterfacePtr> &caches,
161 std::vector<fawkes::Time> & from,
162 std::vector<fawkes::Time> & to)
166 for (
size_t i = 0; i < caches.size(); ++i) {
167 tf::TimeCacheInterfacePtr tc = caches[i];
171 using namespace bsoncxx::builder;
172 basic::document document;
174 document.append(basic::kvp(
"timestamp",
static_cast<int64_t
>(from[i].in_msec())));
175 document.append(basic::kvp(
"timestamp_from",
static_cast<int64_t
>(from[i].in_msec())));
176 document.append(basic::kvp(
"timestamp_to",
static_cast<int64_t
>(to[i].in_msec())));
177 const tf::TimeCache::L_TransformStorage &storage = tc->get_storage();
179 if (storage.empty()) {
189 document.append(basic::kvp(
"frame", frame_map[storage.front().frame_id]));
190 document.append(basic::kvp(
"child_frame", frame_map[storage.front().child_frame_id]));
197 document.append(basic::kvp(
"transforms", [storage, frame_map](basic::sub_array array) {
198 for (
auto s = storage.begin(); s != storage.end(); ++s) {
214 basic::document tf_doc;
215 tf_doc.append(basic::kvp(
"timestamp", static_cast<int64_t>(s->stamp.in_msec())));
216 tf_doc.append(basic::kvp(
"frame", frame_map[s->frame_id]));
217 tf_doc.append(basic::kvp(
"child_frame", frame_map[s->child_frame_id]));
218 tf_doc.append(basic::kvp(
"rotation", [s](basic::sub_array rot_array) {
219 rot_array.append(s->rotation.x());
220 rot_array.append(s->rotation.y());
221 rot_array.append(s->rotation.z());
222 rot_array.append(s->rotation.w());
224 tf_doc.append(basic::kvp(
"translation", [s](basic::sub_array trans_array) {
225 trans_array.append(s->translation.x());
226 trans_array.append(s->translation.y());
227 trans_array.append(s->translation.z());
229 array.append(tf_doc);
234 mongodb_client->database(database_)[collection_].insert_one(document.view());
235 }
catch (operation_exception &e) {
238 }
catch (std::exception &e) {
Clock * clock
This member provides access to the clock.
Configuration * config
This is the Configuration member used to access the configuration.
virtual float get_float(const char *path)=0
Get value from configuration which is of type float.
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
Base class for exceptions in Fawkes.
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Logger * logger
This is the Logger member used to access the logger.
Thread aspect to access MongoDB.
mongocxx::client * mongodb_client
MongoDB client to use to interact with the database.
Mutex mutual exclusion lock.
void lock()
Lock this mutex.
void unlock()
Unlock the mutex.
Thread class encapsulation of pthreads.
void set_prepfin_conc_loop(bool concurrent=true)
Set concurrent execution of prepare_finalize() and loop().
const char * name() const
Get name of thread.
void mark_start()
Mark start of loop.
void wait()
Wait until minimum loop time has been reached.
A class for handling time.
Fawkes library namespace.