Fawkes API  Fawkes Development Version
notifier.cpp
1 
2 /***************************************************************************
3  * notifier.cpp - BlackBoard notifier
4  *
5  * Created: Mon Mar 03 23:28:18 2008
6  * Copyright 2006-2008 Tim Niemueller [www.niemueller.de]
7  *
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version. A runtime exception applies to
14  * this software (see LICENSE.GPL_WRE file mentioned below for details).
15  *
16  * This program is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19  * GNU Library General Public License for more details.
20  *
21  * Read the full text in the LICENSE.GPL_WRE file in the doc directory.
22  */
23 
24 #include <blackboard/blackboard.h>
25 #include <blackboard/interface_listener.h>
26 #include <blackboard/interface_observer.h>
27 #include <blackboard/internal/notifier.h>
28 #include <core/threading/mutex.h>
29 #include <core/threading/mutex_locker.h>
30 #include <core/utils/lock_hashmap.h>
31 #include <core/utils/lock_hashset.h>
32 #include <interface/interface.h>
33 #include <logging/liblogger.h>
34 
35 #include <algorithm>
36 #include <cstdlib>
37 #include <cstring>
38 #include <fnmatch.h>
39 #include <functional>
40 
41 namespace fawkes {
42 
43 /** @class BlackBoardNotifier <blackboard/internal/notifier.h>
44  * BlackBoard notifier.
45  * This class is used by the BlackBoard to notify listeners and observers
46  * of changes.
47  *
48  * @author Tim Niemueller
49  */
50 
51 /** Constructor. */
53 {
54  bbil_writer_events_ = 0;
55  bbil_writer_mutex_ = new Mutex();
56 
57  bbil_reader_events_ = 0;
58  bbil_reader_mutex_ = new Mutex();
59 
60  bbil_data_events_ = 0;
61  bbil_data_mutex_ = new Mutex();
62 
63  bbil_messages_events_ = 0;
64  bbil_messages_mutex_ = new Mutex();
65 
66  bbio_events_ = 0;
67  bbio_mutex_ = new Mutex();
68 }
69 
70 /** Destructor */
72 {
73  delete bbil_writer_mutex_;
74  delete bbil_reader_mutex_;
75  delete bbil_data_mutex_;
76  delete bbil_messages_mutex_;
77 
78  delete bbio_mutex_;
79 }
80 
81 /** Register BB event listener.
82  * @param listener BlackBoard event listener to register
83  * @param flag concatenation of flags denoting which queue entries should be
84  * processed
85  */
86 void
89 {
90  update_listener(listener, flag);
91 }
92 
93 /** Update BB event listener.
94  * @param listener BlackBoard event listener to update subscriptions of
95  * @param flag concatenation of flags denoting which queue entries should be
96  * processed
97  */
98 void
101 {
102  const BlackBoardInterfaceListener::InterfaceQueue &queue = listener->bbil_acquire_queue();
103 
104  BlackBoardInterfaceListener::InterfaceQueue::const_iterator i = queue.begin();
105 
106  for (i = queue.begin(); i != queue.end(); ++i) {
107  switch (i->type) {
109  if (flag & BlackBoard::BBIL_FLAG_DATA) {
110  proc_listener_maybe_queue(i->op,
111  i->interface,
112  listener,
113  bbil_data_mutex_,
114  bbil_data_events_,
115  bbil_data_,
116  bbil_data_queue_,
117  "data");
118  }
119  break;
121  if (flag & BlackBoard::BBIL_FLAG_MESSAGES) {
122  proc_listener_maybe_queue(i->op,
123  i->interface,
124  listener,
125  bbil_messages_mutex_,
126  bbil_messages_events_,
127  bbil_messages_,
128  bbil_messages_queue_,
129  "messages");
130  }
131  break;
133  if (flag & BlackBoard::BBIL_FLAG_READER) {
134  proc_listener_maybe_queue(i->op,
135  i->interface,
136  listener,
137  bbil_reader_mutex_,
138  bbil_reader_events_,
139  bbil_reader_,
140  bbil_reader_queue_,
141  "reader");
142  }
143  break;
145  if (flag & BlackBoard::BBIL_FLAG_WRITER) {
146  proc_listener_maybe_queue(i->op,
147  i->interface,
148  listener,
149  bbil_writer_mutex_,
150  bbil_writer_events_,
151  bbil_writer_,
152  bbil_writer_queue_,
153  "writer");
154  }
155  break;
156  default: break;
157  }
158  }
159 
160  listener->bbil_release_queue(flag);
161 }
162 
163 void
164 BlackBoardNotifier::proc_listener_maybe_queue(bool op,
165  Interface * interface,
166  BlackBoardInterfaceListener *listener,
167  Mutex * mutex,
168  unsigned int & events,
169  BBilMap & map,
170  BBilQueue & queue,
171  const char * hint)
172 {
173  MutexLocker lock(mutex);
174  if (events > 0) {
175  LibLogger::log_warn("BlackBoardNotifier",
176  "%s interface "
177  "listener %s for %s events (queued)",
178  op ? "Registering" : "Unregistering",
179  listener->bbil_name(),
180  hint);
181 
182  queue_listener(op, interface, listener, queue);
183  } else {
184  if (op) { // add
185  add_listener(interface, listener, map);
186  } else {
187  remove_listener(interface, listener, map);
188  }
189  }
190 }
191 
192 /** Unregister BB interface listener.
193  * This will remove the given BlackBoard interface listener from any
194  * event that it was previously registered for.
195  * @param listener BlackBoard event listener to remove
196  */
197 void
199 {
200  const BlackBoardInterfaceListener::InterfaceMaps maps = listener->bbil_acquire_maps();
201 
202  BlackBoardInterfaceListener::InterfaceMap::const_iterator i;
203  for (i = maps.data.begin(); i != maps.data.end(); ++i) {
204  proc_listener_maybe_queue(false,
205  i->second,
206  listener,
207  bbil_data_mutex_,
208  bbil_data_events_,
209  bbil_data_,
210  bbil_data_queue_,
211  "data");
212  }
213 
214  for (i = maps.messages.begin(); i != maps.messages.end(); ++i) {
215  proc_listener_maybe_queue(false,
216  i->second,
217  listener,
218  bbil_messages_mutex_,
219  bbil_messages_events_,
220  bbil_messages_,
221  bbil_messages_queue_,
222  "messages");
223  }
224 
225  for (i = maps.reader.begin(); i != maps.reader.end(); ++i) {
226  proc_listener_maybe_queue(false,
227  i->second,
228  listener,
229  bbil_reader_mutex_,
230  bbil_reader_events_,
231  bbil_reader_,
232  bbil_reader_queue_,
233  "reader");
234  }
235 
236  for (i = maps.writer.begin(); i != maps.writer.end(); ++i) {
237  proc_listener_maybe_queue(false,
238  i->second,
239  listener,
240  bbil_writer_mutex_,
241  bbil_writer_events_,
242  bbil_writer_,
243  bbil_writer_queue_,
244  "writer");
245  }
246 
247  listener->bbil_release_maps();
248 }
249 
250 /** Add listener for specified map.
251  * @param listener interface listener for events
252  * @param im map of interfaces to listen for
253  * @param ilmap internal map to add listener to
254  */
255 void
256 BlackBoardNotifier::add_listener(Interface * interface,
257  BlackBoardInterfaceListener *listener,
258  BBilMap & ilmap)
259 {
260  std::pair<BBilMap::iterator, BBilMap::iterator> ret = ilmap.equal_range(interface->uid());
261 
262  BBilMap::value_type v = std::make_pair(interface->uid(), listener);
263  BBilMap::iterator f = std::find(ret.first, ret.second, v);
264 
265  if (f == ret.second) {
266  ilmap.insert(std::make_pair(interface->uid(), listener));
267  }
268 }
269 
270 void
271 BlackBoardNotifier::remove_listener(Interface * interface,
272  BlackBoardInterfaceListener *listener,
273  BBilMap & ilmap)
274 {
275  std::pair<BBilMap::iterator, BBilMap::iterator> ret = ilmap.equal_range(interface->uid());
276  for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
277  if (j->second == listener) {
278  ilmap.erase(j);
279  break;
280  }
281  }
282 }
283 
284 bool
285 BlackBoardNotifier::is_in_queue(bool op,
286  BBilQueue & queue,
287  const char * uid,
288  BlackBoardInterfaceListener *bbil)
289 {
290  BBilQueue::iterator q;
291  for (q = queue.begin(); q != queue.end(); ++q) {
292  if ((q->op == op) && (q->uid == uid) && (q->listener == bbil)) {
293  return true;
294  }
295  }
296  return false;
297 }
298 
299 void
300 BlackBoardNotifier::queue_listener(bool op,
301  Interface * interface,
302  BlackBoardInterfaceListener *listener,
303  BBilQueue & queue)
304 {
305  BBilQueueEntry qe = {op, interface->uid(), interface, listener};
306  queue.push_back(qe);
307 }
308 
309 /** Register BB interface observer.
310  * @param observer BlackBoard interface observer to register
311  */
312 void
314 {
315  bbio_mutex_->lock();
316  if (bbio_events_ > 0) {
317  bbio_queue_.push_back(std::make_pair(1, observer));
318  } else {
319  add_observer(observer, observer->bbio_get_observed_create(), bbio_created_);
320  add_observer(observer, observer->bbio_get_observed_destroy(), bbio_destroyed_);
321  }
322  bbio_mutex_->unlock();
323 }
324 
325 void
326 BlackBoardNotifier::add_observer(BlackBoardInterfaceObserver * observer,
328  BBioMap & bbiomap)
329 {
331  its->lock();
332  for (i = its->begin(); i != its->end(); ++i) {
333  bbiomap[i->first].push_back(make_pair(observer, i->second));
334  }
335  its->unlock();
336 }
337 
338 /** Remove observer from map.
339  * @param iomap interface observer map to remove the observer from
340  * @param observer observer to remove
341  */
342 void
343 BlackBoardNotifier::remove_observer(BBioMap &iomap, BlackBoardInterfaceObserver *observer)
344 {
345  BBioMapIterator i, tmp;
346 
347  i = iomap.begin();
348  while (i != iomap.end()) {
349  BBioListIterator j = i->second.begin();
350  while (j != i->second.end()) {
351  if (j->first == observer) {
352  j = i->second.erase(j);
353  } else {
354  ++j;
355  }
356  }
357  if (i->second.empty()) {
358  tmp = i;
359  ++i;
360  iomap.erase(tmp);
361  } else {
362  ++i;
363  }
364  }
365 }
366 
367 /** Unregister BB interface observer.
368  * This will remove the given BlackBoard event listener from any event that it was
369  * previously registered for.
370  * @param observer BlackBoard event listener to remove
371  */
372 void
374 {
375  MutexLocker lock(bbio_mutex_);
376  if (bbio_events_ > 0) {
377  BBioQueueEntry e = std::make_pair((unsigned int)0, observer);
378  BBioQueue::iterator re;
379  while ((re = find_if(bbio_queue_.begin(),
380  bbio_queue_.end(),
381  bind2nd(std::not_equal_to<BBioQueueEntry>(), e)))
382  != bbio_queue_.end()) {
383  // if there is an entry in the register queue, remove it!
384  if (re->second == observer) {
385  bbio_queue_.erase(re);
386  }
387  }
388  bbio_queue_.push_back(std::make_pair(0, observer));
389 
390  } else {
391  remove_observer(bbio_created_, observer);
392  remove_observer(bbio_destroyed_, observer);
393  }
394 }
395 
396 /** Notify that an interface has been created.
397  * @param type type of the interface
398  * @param id ID of the interface
399  */
400 void
401 BlackBoardNotifier::notify_of_interface_created(const char *type, const char *id) noexcept
402 {
403  bbio_mutex_->lock();
404  bbio_events_ += 1;
405  bbio_mutex_->unlock();
406 
407  BBioMapIterator lhmi;
408  BBioListIterator i, l;
409  for (lhmi = bbio_created_.begin(); lhmi != bbio_created_.end(); ++lhmi) {
410  if (fnmatch(lhmi->first.c_str(), type, 0) != 0)
411  continue;
412 
413  BBioList &list = lhmi->second;
414  for (i = list.begin(); i != list.end(); ++i) {
415  BlackBoardInterfaceObserver *bbio = i->first;
416  for (std::list<std::string>::iterator pi = i->second.begin(); pi != i->second.end(); ++pi) {
417  if (fnmatch(pi->c_str(), id, 0) == 0) {
418  bbio->bb_interface_created(type, id);
419  break;
420  }
421  }
422  }
423  }
424 
425  bbio_mutex_->lock();
426  bbio_events_ -= 1;
427  process_bbio_queue();
428  bbio_mutex_->unlock();
429 }
430 
431 /** Notify that an interface has been destroyed.
432  * @param type type of the interface
433  * @param id ID of the interface
434  */
435 void
436 BlackBoardNotifier::notify_of_interface_destroyed(const char *type, const char *id) noexcept
437 {
438  bbio_mutex_->lock();
439  bbio_events_ += 1;
440  bbio_mutex_->unlock();
441 
442  BBioMapIterator lhmi;
443  BBioListIterator i, l;
444  for (lhmi = bbio_destroyed_.begin(); lhmi != bbio_destroyed_.end(); ++lhmi) {
445  if (fnmatch(lhmi->first.c_str(), type, 0) != 0)
446  continue;
447 
448  BBioList &list = (*lhmi).second;
449  for (i = list.begin(); i != list.end(); ++i) {
450  BlackBoardInterfaceObserver *bbio = i->first;
451  for (std::list<std::string>::iterator pi = i->second.begin(); pi != i->second.end(); ++pi) {
452  if (fnmatch(pi->c_str(), id, 0) == 0) {
453  bbio->bb_interface_destroyed(type, id);
454  break;
455  }
456  }
457  }
458  }
459 
460  bbio_mutex_->lock();
461  bbio_events_ -= 1;
462  process_bbio_queue();
463  bbio_mutex_->unlock();
464 }
465 
466 void
467 BlackBoardNotifier::process_bbio_queue()
468 {
469  if (!bbio_queue_.empty()) {
470  if (bbio_events_ > 0) {
471  return;
472  } else {
473  while (!bbio_queue_.empty()) {
474  BBioQueueEntry &e = bbio_queue_.front();
475  if (e.first) { // register
476  add_observer(e.second, e.second->bbio_get_observed_create(), bbio_created_);
477  add_observer(e.second, e.second->bbio_get_observed_destroy(), bbio_destroyed_);
478  } else { // unregister
479  remove_observer(bbio_created_, e.second);
480  remove_observer(bbio_destroyed_, e.second);
481  }
482  bbio_queue_.pop_front();
483  }
484  }
485  }
486 }
487 
488 /** Notify that writer has been added.
489  * @param interface the interface for which the event happened. It is not necessarily the
490  * instance which caused the event, but it must have the same mem serial.
491  * @param event_instance_serial the instance serial of the interface that caused the event
492  * @see BlackBoardInterfaceListener::bb_interface_writer_added()
493  */
494 void
496  Uuid event_instance_serial) noexcept
497 {
498  bbil_writer_mutex_->lock();
499  bbil_writer_events_ += 1;
500  bbil_writer_mutex_->unlock();
501 
502  const char * uid = interface->uid();
503  std::pair<BBilMap::iterator, BBilMap::iterator> ret = bbil_writer_.equal_range(uid);
504  for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
505  BlackBoardInterfaceListener *bbil = j->second;
506  if (!is_in_queue(/* remove op*/ false, bbil_writer_queue_, uid, bbil)) {
507  Interface *bbil_iface = bbil->bbil_writer_interface(uid);
508  if (bbil_iface != NULL) {
509  bbil->bb_interface_writer_added(bbil_iface, event_instance_serial);
510  } else {
511  LibLogger::log_warn("BlackBoardNotifier",
512  "BBIL[%s] registered for writer events "
513  "(open) for '%s' but has no such interface",
514  bbil->bbil_name(),
515  uid);
516  }
517  }
518  }
519 
520  bbil_writer_mutex_->lock();
521  bbil_writer_events_ -= 1;
522  process_writer_queue();
523  bbil_writer_mutex_->unlock();
524 }
525 
526 /** Notify that writer has been removed.
527  * @param interface interface for which the writer has been removed
528  * @param event_instance_serial instance serial of the interface that caused the event
529  * @see BlackBoardInterfaceListener::bb_interface_writer_removed()
530  */
531 void
533  Uuid event_instance_serial) noexcept
534 {
535  bbil_writer_mutex_->lock();
536  bbil_writer_events_ += 1;
537  bbil_writer_mutex_->unlock();
538 
539  const char * uid = interface->uid();
540  std::pair<BBilMap::iterator, BBilMap::iterator> ret = bbil_writer_.equal_range(uid);
541  for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
542  BlackBoardInterfaceListener *bbil = j->second;
543  if (!is_in_queue(/* remove op*/ false, bbil_data_queue_, uid, bbil)) {
544  Interface *bbil_iface = bbil->bbil_writer_interface(uid);
545  if (bbil_iface != NULL) {
546  bbil->bb_interface_writer_removed(bbil_iface, event_instance_serial);
547  } else {
548  LibLogger::log_warn("BlackBoardNotifier",
549  "BBIL[%s] registered for writer events "
550  "(close) for '%s' but has no such interface",
551  bbil->bbil_name(),
552  uid);
553  }
554  }
555  }
556 
557  bbil_writer_mutex_->lock();
558  bbil_writer_events_ -= 1;
559  process_writer_queue();
560  bbil_writer_mutex_->unlock();
561 }
562 
563 void
564 BlackBoardNotifier::process_writer_queue()
565 {
566  if (!bbil_writer_queue_.empty()) {
567  if (bbil_writer_events_ > 0) {
568  return;
569  } else {
570  while (!bbil_writer_queue_.empty()) {
571  BBilQueueEntry &e = bbil_writer_queue_.front();
572  if (e.op) { // register
573  add_listener(e.interface, e.listener, bbil_writer_);
574  } else { // unregister
575  remove_listener(e.interface, e.listener, bbil_writer_);
576  }
577  bbil_writer_queue_.pop_front();
578  }
579  }
580  }
581 }
582 
583 /** Notify that reader has been added.
584  * @param interface interface for which the reader has been added
585  * @param event_instance_serial instance serial of the interface that caused the event
586  * @see BlackBoardInterfaceListener::bb_interface_reader_added()
587  */
588 void
590  Uuid event_instance_serial) noexcept
591 {
592  bbil_reader_mutex_->lock();
593  bbil_reader_events_ += 1;
594  bbil_reader_mutex_->unlock();
595 
596  const char * uid = interface->uid();
597  std::pair<BBilMap::iterator, BBilMap::iterator> ret = bbil_reader_.equal_range(uid);
598  for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
599  BlackBoardInterfaceListener *bbil = j->second;
600  if (!is_in_queue(/* remove op*/ false, bbil_reader_queue_, uid, bbil)) {
601  Interface *bbil_iface = bbil->bbil_reader_interface(uid);
602  if (bbil_iface != NULL) {
603  bbil->bb_interface_reader_added(bbil_iface, event_instance_serial);
604  } else {
605  LibLogger::log_warn("BlackBoardNotifier",
606  "BBIL[%s] registered for reader events "
607  "(open) for '%s' but has no such interface",
608  bbil->bbil_name(),
609  uid);
610  }
611  }
612  }
613 
614  bbil_reader_mutex_->lock();
615  bbil_reader_events_ -= 1;
616  process_reader_queue();
617  bbil_reader_mutex_->unlock();
618 }
619 
620 /** Notify that reader has been removed.
621  * @param interface interface for which the reader has been removed
622  * @param event_instance_serial instance serial of the interface that caused the event
623  * @see BlackBoardInterfaceListener::bb_interface_reader_removed()
624  */
625 void
627  Uuid event_instance_serial) noexcept
628 {
629  bbil_reader_mutex_->lock();
630  bbil_reader_events_ += 1;
631  bbil_reader_mutex_->unlock();
632 
633  const char * uid = interface->uid();
634  std::pair<BBilMap::iterator, BBilMap::iterator> ret = bbil_reader_.equal_range(uid);
635  for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
636  BlackBoardInterfaceListener *bbil = j->second;
637  if (!is_in_queue(/* remove op*/ false, bbil_data_queue_, uid, bbil)) {
638  Interface *bbil_iface = bbil->bbil_reader_interface(uid);
639  if (bbil_iface != NULL) {
640  bbil->bb_interface_reader_removed(bbil_iface, event_instance_serial);
641  } else {
642  LibLogger::log_warn("BlackBoardNotifier",
643  "BBIL[%s] registered for reader events "
644  "(close) for '%s' but has no such interface",
645  bbil->bbil_name(),
646  uid);
647  }
648  }
649  }
650 
651  bbil_reader_mutex_->lock();
652  bbil_reader_events_ -= 1;
653  process_reader_queue();
654  bbil_reader_mutex_->unlock();
655 }
656 
657 void
658 BlackBoardNotifier::process_reader_queue()
659 {
660  if (!bbil_reader_queue_.empty()) {
661  if (bbil_reader_events_ > 0) {
662  return;
663  } else {
664  while (!bbil_reader_queue_.empty()) {
665  BBilQueueEntry &e = bbil_reader_queue_.front();
666  if (e.op) { // register
667  add_listener(e.interface, e.listener, bbil_reader_);
668  } else { // unregister
669  remove_listener(e.interface, e.listener, bbil_reader_);
670  }
671  bbil_reader_queue_.pop_front();
672  }
673  }
674  }
675 }
676 
677 /** Notify of data change.
678  * Notify all subscribers of the given interface of a data change.
679  * This also influences logging and sending data over the network so it is
680  * mandatory to call this function! The interface base class write method does
681  * that for you.
682  * @param interface interface whose subscribers to notify
683  * @param has_changed whether the current data is different from the last time write() was
684  * called on the interface
685  * @see Interface::write()
686  * @see BlackBoardInterfaceListener::bb_interface_data_changed()
687  */
688 void
689 BlackBoardNotifier::notify_of_data_refresh(const Interface *interface, bool has_changed)
690 {
691  bbil_data_mutex_->lock();
692  bbil_data_events_ += 1;
693  bbil_data_mutex_->unlock();
694 
695  const char * uid = interface->uid();
696  std::pair<BBilMap::iterator, BBilMap::iterator> ret = bbil_data_.equal_range(uid);
697  for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
698  BlackBoardInterfaceListener *bbil = j->second;
699  if (!is_in_queue(/* remove op*/ false, bbil_data_queue_, uid, bbil)) {
700  Interface *bbil_iface = bbil->bbil_data_interface(uid);
701  if (bbil_iface != NULL) {
702  bbil->bb_interface_data_refreshed(bbil_iface);
703  if (has_changed)
704  bbil->bb_interface_data_changed(bbil_iface);
705  } else {
706  LibLogger::log_warn("BlackBoardNotifier",
707  "BBIL[%s] registered for data change events "
708  "for '%s' but has no such interface",
709  bbil->bbil_name(),
710  uid);
711  }
712  }
713  }
714 
715  bbil_data_mutex_->lock();
716  bbil_data_events_ -= 1;
717  if (!bbil_data_queue_.empty()) {
718  if (bbil_data_events_ == 0) {
719  while (!bbil_data_queue_.empty()) {
720  BBilQueueEntry &e = bbil_data_queue_.front();
721  if (e.op) { // register
722  add_listener(e.interface, e.listener, bbil_data_);
723  } else { // unregister
724  remove_listener(e.interface, e.listener, bbil_data_);
725  }
726  bbil_data_queue_.pop_front();
727  }
728  }
729  }
730  bbil_data_mutex_->unlock();
731 }
732 
733 /** Notify of message received
734  * Notify all subscribers of the given interface of an incoming message
735  * This also influences logging and sending data over the network so it is
736  * mandatory to call this function! The interface base class write method does
737  * that for you.
738  * @param interface interface whose subscribers to notify
739  * @param message message which is being received
740  * @return false if any listener returned false, true otherwise
741  * @see BlackBoardInterfaceListener::bb_interface_message_received()
742  */
743 bool
745 {
746  bbil_messages_mutex_->lock();
747  bbil_messages_events_ += 1;
748  bbil_messages_mutex_->unlock();
749 
750  bool enqueue = true;
751 
752  const char * uid = interface->uid();
753  std::pair<BBilMap::iterator, BBilMap::iterator> ret = bbil_messages_.equal_range(uid);
754  for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
755  BlackBoardInterfaceListener *bbil = j->second;
756  if (!is_in_queue(/* remove op*/ false, bbil_messages_queue_, uid, bbil)) {
757  Interface *bbil_iface = bbil->bbil_message_interface(uid);
758  if (bbil_iface != NULL) {
759  bool abort = !bbil->bb_interface_message_received(bbil_iface, message);
760  if (abort) {
761  enqueue = false;
762  break;
763  }
764  } else {
765  LibLogger::log_warn("BlackBoardNotifier",
766  "BBIL[%s] registered for message events "
767  "for '%s' but has no such interface",
768  bbil->bbil_name(),
769  uid);
770  }
771  }
772  }
773 
774  bbil_messages_mutex_->lock();
775  bbil_messages_events_ -= 1;
776  if (!bbil_messages_queue_.empty()) {
777  if (bbil_messages_events_ == 0) {
778  while (!bbil_messages_queue_.empty()) {
779  BBilQueueEntry &e = bbil_messages_queue_.front();
780  if (e.op) { // register
781  add_listener(e.interface, e.listener, bbil_messages_);
782  } else { // unregister
783  remove_listener(e.interface, e.listener, bbil_messages_);
784  }
785  bbil_messages_queue_.pop_front();
786  }
787  }
788  }
789  bbil_messages_mutex_->unlock();
790 
791  return enqueue;
792 }
793 
794 } // end namespace fawkes
BlackBoard interface listener.
Interface * bbil_reader_interface(const char *iuid) noexcept
Get interface instance for given UID.
virtual void bb_interface_writer_added(Interface *interface, Uuid instance_serial) noexcept
A writing instance has been opened for a watched interface.
@ MESSAGES
Message received event entry.
@ DATA
Data changed event entry.
virtual bool bb_interface_message_received(Interface *interface, Message *message) noexcept
BlackBoard message received notification.
virtual void bb_interface_data_refreshed(Interface *interface) noexcept
BlackBoard data refreshed notification.
Interface * bbil_message_interface(const char *iuid) noexcept
Get interface instance for given UID.
virtual void bb_interface_data_changed(Interface *interface) noexcept
BlackBoard data changed notification.
virtual void bb_interface_writer_removed(Interface *interface, Uuid instance_serial) noexcept
A writing instance has been closed for a watched interface.
virtual void bb_interface_reader_added(Interface *interface, Uuid instance_serial) noexcept
A reading instance has been opened for a watched interface.
const char * bbil_name() const
Get BBIL name.
Interface * bbil_writer_interface(const char *iuid) noexcept
Get interface instance for given UID.
std::list< QueueEntry > InterfaceQueue
Queue of additions/removal of interfaces.
virtual void bb_interface_reader_removed(Interface *interface, Uuid instance_serial) noexcept
A reading instance has been closed for a watched interface.
Interface * bbil_data_interface(const char *iuid) noexcept
Get interface instance for given UID.
BlackBoard interface observer.
ObservedInterfaceLockMap::iterator ObservedInterfaceLockMapIterator
Type for iterator of lockable interface type hash sets.
ObservedInterfaceLockMap * bbio_get_observed_destroy() noexcept
Get interface destruction type watch list.
ObservedInterfaceLockMap * bbio_get_observed_create() noexcept
Get interface creation type watch list.
virtual void bb_interface_created(const char *type, const char *id) noexcept
BlackBoard interface created notification.
virtual void bb_interface_destroyed(const char *type, const char *id) noexcept
BlackBoard interface destroyed notification.
BlackBoardNotifier()
Constructor.
Definition: notifier.cpp:52
void notify_of_writer_added(const Interface *interface, Uuid event_instance_serial) noexcept
Notify that writer has been added.
Definition: notifier.cpp:495
void notify_of_writer_removed(const Interface *interface, Uuid event_instance_serial) noexcept
Notify that writer has been removed.
Definition: notifier.cpp:532
void unregister_listener(BlackBoardInterfaceListener *listener)
Unregister BB interface listener.
Definition: notifier.cpp:198
void notify_of_interface_destroyed(const char *type, const char *id) noexcept
Notify that an interface has been destroyed.
Definition: notifier.cpp:436
void notify_of_reader_added(const Interface *interface, Uuid event_instance_serial) noexcept
Notify that reader has been added.
Definition: notifier.cpp:589
virtual ~BlackBoardNotifier()
Destructor.
Definition: notifier.cpp:71
void notify_of_reader_removed(const Interface *interface, Uuid event_instance_serial) noexcept
Notify that reader has been removed.
Definition: notifier.cpp:626
void notify_of_data_refresh(const Interface *interface, bool has_changed)
Notify of data change.
Definition: notifier.cpp:689
void unregister_observer(BlackBoardInterfaceObserver *observer)
Unregister BB interface observer.
Definition: notifier.cpp:373
void register_listener(BlackBoardInterfaceListener *listener, BlackBoard::ListenerRegisterFlag flag)
Register BB event listener.
Definition: notifier.cpp:87
void notify_of_interface_created(const char *type, const char *id) noexcept
Notify that an interface has been created.
Definition: notifier.cpp:401
void register_observer(BlackBoardInterfaceObserver *observer)
Register BB interface observer.
Definition: notifier.cpp:313
void update_listener(BlackBoardInterfaceListener *listener, BlackBoard::ListenerRegisterFlag flag)
Update BB event listener.
Definition: notifier.cpp:99
bool notify_of_message_received(const Interface *interface, Message *message)
Notify of message received Notify all subscribers of the given interface of an incoming message This ...
Definition: notifier.cpp:744
ListenerRegisterFlag
Flags to constrain listener registration/updates.
Definition: blackboard.h:87
@ BBIL_FLAG_READER
consider reader events
Definition: blackboard.h:90
@ BBIL_FLAG_DATA
consider data events
Definition: blackboard.h:88
@ BBIL_FLAG_WRITER
consider writer events
Definition: blackboard.h:91
@ BBIL_FLAG_MESSAGES
consider message received events
Definition: blackboard.h:89
Base class for all Fawkes BlackBoard interfaces.
Definition: interface.h:80
const char * uid() const
Get unique identifier of interface.
Definition: interface.cpp:686
static void log_warn(const char *component, const char *format,...)
Log warning message.
Definition: liblogger.cpp:156
void lock() const
Lock list.
Definition: lock_map.h:91
void unlock() const
Unlock list.
Definition: lock_map.h:109
Base class for all messages passed through interfaces in Fawkes BlackBoard.
Definition: message.h:44
Mutex locking helper.
Definition: mutex_locker.h:34
Mutex mutual exclusion lock.
Definition: mutex.h:33
void lock()
Lock this mutex.
Definition: mutex.cpp:87
void unlock()
Unlock the mutex.
Definition: mutex.cpp:131
A convenience class for universally unique identifiers (UUIDs).
Definition: uuid.h:29
Fawkes library namespace.
Structure to hold maps for active subscriptions.
InterfaceMap writer
Writer event subscriptions.
InterfaceMap messages
Message received event subscriptions.
InterfaceMap data
Data event subscriptions.
InterfaceMap reader
Reader event subscriptions.