23 #include "sync_thread.h"
25 #include <blackboard/remote.h>
26 #include <core/threading/mutex_locker.h>
27 #include <utils/time/wait.h>
46 std::string &peer_cfg_prefix,
50 set_name(
"BBSyncThread[%s]", peer.c_str());
53 bbsync_cfg_prefix_ = bbsync_cfg_prefix;
54 peer_cfg_prefix_ = peer_cfg_prefix;
69 unsigned int check_interval = 0;
74 check_interval =
config->
get_uint((bbsync_cfg_prefix_ +
"check_interval").c_str());
76 e.
append(
"Host or port not specified for peer");
81 check_interval =
config->
get_uint((peer_cfg_prefix_ +
"check_interval").c_str());
87 read_config_combos(peer_cfg_prefix_ +
"reading/",
false);
88 read_config_combos(peer_cfg_prefix_ +
"writing/",
true);
90 for (ComboMap::iterator i = combos_.begin(); i != combos_.end(); ++i) {
92 "Combo: %s, %s (%s, R) -> %s (%s, W)",
93 i->second.type.c_str(),
94 i->second.reader_id.c_str(),
95 i->second.remote_writer ?
"local" :
"remote",
96 i->second.writer_id.c_str(),
97 i->second.remote_writer ?
"remote" :
"local");
103 if (!check_connection()) {
133 BlackBoardSynchronizationThread::check_connection()
135 if (!remote_bb_ || !remote_bb_->
is_alive()) {
138 "Lost connection via remote BB to %s (%s:%u), will try to re-establish",
152 "Successfully connected via remote BB to %s (%s:%u)",
169 BlackBoardSynchronizationThread::read_config_combos(std::string prefix,
bool writing)
173 if (strcmp(i->
type(),
"string") != 0) {
175 "but found value of type %s",
182 std::string varname = std::string(i->
path()).substr(prefix.length());
186 if ((sf = uid.find(
"::")) == std::string::npos) {
188 throw Exception(
"Interface UID '%s' at %s is not valid, missing double colon",
193 std::string type = uid.substr(0, sf);
194 std::string
id = uid.substr(sf + 2);
195 combo_t combo = {type, id, id, writing};
197 if ((sf =
id.find(
"=")) != std::string::npos) {
199 combo.reader_id =
id.substr(0, sf);
200 combo.writer_id =
id.substr(sf + 1);
203 combos_[varname] = combo;
209 BlackBoardSynchronizationThread::open_interfaces()
214 ComboMap::iterator i;
215 for (i = combos_.begin(); i != combos_.end(); ++i) {
216 Interface *iface_reader = NULL, *iface_writer = NULL;
223 "Opening reading %s (%s:%s)",
224 i->second.remote_writer ?
"locally" :
"remotely",
225 i->second.type.c_str(),
226 i->second.reader_id.c_str());
228 reader_bb->
open_for_reading(i->second.type.c_str(), i->second.reader_id.c_str());
230 "Opened interface with serial %s",
235 "Opening writing on %s (%s:%s)",
236 i->second.remote_writer ?
"remotely" :
"locally",
237 i->second.type.c_str(),
238 i->second.writer_id.c_str());
240 writer_bb->
open_for_writing(i->second.type.c_str(), i->second.writer_id.c_str());
243 InterfaceInfo ii(&i->second, iface_writer, reader_bb, writer_bb);
244 interfaces_[iface_reader] = ii;
247 reader_bb->
close(iface_reader);
248 writer_bb->
close(iface_writer);
258 sync_listeners_[iface_reader] = sync_listener;
260 if (i->second.remote_writer) {
269 BlackBoardSynchronizationThread::close_interfaces()
271 SyncListenerMap::iterator s;
272 for (s = sync_listeners_.begin(); s != sync_listeners_.end(); ++s) {
279 InterfaceMap::iterator i;
280 for (i = interfaces_.begin(); i != interfaces_.end(); ++i) {
282 "Closing %s reading interface %s",
283 i->second.combo->remote_writer ?
"local" :
"remote",
285 if (i->second.combo->remote_writer) {
290 remote_bb_->
close(i->first);
292 if (i->second.writer) {
294 "Closing %s writing interface %s",
295 i->second.combo->remote_writer ?
"remote" :
"local",
296 i->second.writer->uid());
297 if (i->second.combo->remote_writer) {
298 remote_bb_->
close(i->second.writer);
305 sync_listeners_.clear();
317 if (interfaces_[interface].writer) {
320 "Writer added for %s, but relay exists already. Bug?",
323 logger->
log_warn(name(),
"Writer added for %s, opening relay writer", interface->uid());
327 InterfaceInfo & ii = interfaces_[interface];
329 iface = ii.writer_bb->open_for_writing(ii.combo->type.c_str(), ii.combo->writer_id.c_str());
332 "Creating sync listener for %s:%s-%s",
333 ii.combo->type.c_str(),
334 ii.combo->reader_id.c_str(),
335 ii.combo->writer_id.c_str());
340 sync_listeners_[interface] = sync_listener;
344 delete sync_listener;
345 ii.writer_bb->close(iface);
347 "Failed to open writer for %s:%s-%s, sync broken",
348 ii.combo->type.c_str(),
349 ii.combo->reader_id.c_str(),
350 ii.combo->writer_id.c_str());
365 if (!interfaces_[interface].writer) {
367 logger->
log_warn(name(),
"Writer removed for %s, but no relay exists. Bug?", interface->uid());
369 logger->
log_warn(name(),
"Writer removed for %s, closing relay writer", interface->uid());
371 InterfaceInfo &ii = interfaces_[interface];
373 delete sync_listeners_[interface];
374 sync_listeners_[interface] = NULL;
376 ii.writer_bb->close(ii.writer);
381 "Failed to close writer for %s:%s-%s, sync broken",
382 ii.combo->type.c_str(),
383 ii.combo->reader_id.c_str(),
384 ii.combo->writer_id.c_str());
void writer_removed(fawkes::Interface *interface) noexcept
A writer has been removed for an interface.
virtual void init()
Initialize the thread.
virtual void finalize()
Finalize the thread.
virtual void loop()
Code to execute in the thread.
void writer_added(fawkes::Interface *interface) noexcept
A writer has been added for an interface.
virtual ~BlackBoardSynchronizationThread()
Destructor.
BlackBoardSynchronizationThread(std::string &bbsync_cfg_prefix, std::string &peer_cfg_prefix, std::string &peer)
Constructor.
Synchronize two interfaces.
Listener for writer events in bbsync plugin.
void add_interface(fawkes::Interface *interface)
Add an interface to listen to.
void remove_interface(fawkes::Interface *interface)
Remove an interface from those being listened to.
BlackBoard * blackboard
This is the BlackBoard instance you can use to interact with the BlackBoard.
The BlackBoard abstract class.
virtual bool is_alive() const noexcept=0
Check if the BlackBoard is still alive.
virtual Interface * open_for_reading(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for reading.
virtual void unregister_listener(BlackBoardInterfaceListener *listener)
Unregister BB interface listener.
virtual Interface * open_for_writing(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for writing.
virtual void register_listener(BlackBoardInterfaceListener *listener, ListenerRegisterFlag flag=BBIL_FLAG_ALL)
Register BB event listener.
virtual void close(Interface *interface)=0
Close interface.
Clock * clock
This member provides access to the clock.
Configuration * config
This is the Configuration member used to access the configuration.
Iterator interface to iterate over config values.
virtual const char * path() const =0
Path of value.
virtual bool next()=0
Check if there is another element and, if so, advance to it.
virtual const char * type() const =0
Type of value.
virtual std::string get_string() const =0
Get string value.
virtual unsigned int get_uint(const char *path)=0
Get value from configuration which is of type unsigned int.
virtual ValueIterator * search(const char *path)=0
Iterator with search results.
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
Base class for exceptions in Fawkes.
void print_trace() noexcept
Prints trace to stderr.
void append(const char *format,...) noexcept
Append messages to the message list.
Base class for all Fawkes BlackBoard interfaces.
Uuid serial() const
Get instance serial of interface.
bool has_writer() const
Check if there is a writer for the interface.
RefPtr< Mutex > mutex() const
Get access to the internal mutex.
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Logger * logger
This is the Logger member used to access the logger.
virtual void log_warn(const char *component, const char *format,...)
Log warning message.
virtual void log_debug(const char *component, const char *format,...)
Log debug message.
virtual void log_error(const char *component, const char *format,...)
Log error message.
Thread class encapsulation of pthreads.
void set_prepfin_conc_loop(bool concurrent=true)
Set concurrent execution of prepare_finalize() and loop().
const char * name() const
Get name of thread.
void set_name(const char *format,...)
Set name of thread.
void mark_start()
Mark start of loop.
void wait_systime()
Wait until minimum loop time has been reached in real time.
std::string get_string() const
Get the string representation of the Uuid.
Fawkes library namespace.