24 #include <core/exceptions/system.h>
25 #include <core/threading/mutex.h>
26 #include <core/threading/mutex_locker.h>
27 #include <core/threading/thread.h>
28 #include <core/threading/wait_condition.h>
29 #include <netcomm/fawkes/client.h>
30 #include <netcomm/fawkes/client_handler.h>
31 #include <netcomm/fawkes/message_queue.h>
32 #include <netcomm/fawkes/transceiver.h>
33 #include <netcomm/socket/stream.h>
34 #include <netcomm/utils/exceptions.h>
53 :
Exception(
"A handler for this component has already been registered")
75 outbound_mutex_ =
new Mutex();
79 outbound_msgq_ = outbound_msgqs_[0];
80 outbound_havemore_ =
false;
86 for (
unsigned int i = 0; i < 2; ++i) {
87 while (!outbound_msgqs_[i]->empty()) {
90 outbound_msgqs_[i]->pop();
93 delete outbound_msgqs_[0];
94 delete outbound_msgqs_[1];
95 delete outbound_mutex_;
101 parent_->set_send_slave_alive();
110 while (outbound_havemore_) {
111 outbound_mutex_->
lock();
112 outbound_havemore_ =
false;
114 outbound_active_ = 1 - outbound_active_;
115 outbound_msgq_ = outbound_msgqs_[outbound_active_];
116 outbound_mutex_->
unlock();
122 parent_->connection_died();
155 outbound_mutex_->
lock();
156 outbound_msgq_->push(message);
157 outbound_havemore_ =
true;
158 outbound_mutex_->
unlock();
173 Mutex * outbound_mutex_;
174 unsigned int outbound_active_;
175 bool outbound_havemore_;
195 :
Thread(
"FawkesNetworkClientRecvThread")
200 recv_mutex_ = recv_mutex;
206 while (!inbound_msgq_->empty()) {
209 inbound_msgq_->pop();
211 delete inbound_msgq_;
218 std::list<unsigned int> wakeup_list;
225 inbound_msgq_->
lock();
226 while (!inbound_msgq_->empty()) {
228 wakeup_list.push_back(m->
cid());
229 parent_->dispatch_message(m);
231 inbound_msgq_->pop();
238 wakeup_list.unique();
239 for (std::list<unsigned int>::iterator i = wakeup_list.begin(); i != wakeup_list.end(); ++i) {
240 parent_->wake_handlers(*i);
250 parent_->set_recv_slave_alive();
268 parent_->connection_died();
275 parent_->connection_died();
310 host_ = strdup(host);
319 connection_died_recently =
false;
320 send_slave_alive_ =
false;
321 recv_slave_alive_ =
false;
323 slave_status_mutex =
new Mutex();
328 recv_mutex_ =
new Mutex();
330 connest_mutex_ =
new Mutex();
333 connest_interrupted_ =
false;
351 connection_died_recently =
false;
352 send_slave_alive_ =
false;
353 recv_slave_alive_ =
false;
355 slave_status_mutex =
new Mutex();
360 recv_mutex_ =
new Mutex();
362 connest_mutex_ =
new Mutex();
365 connest_interrupted_ =
false;
375 host_ = strdup(host);
384 connection_died_recently =
false;
385 send_slave_alive_ =
false;
386 recv_slave_alive_ =
false;
388 slave_status_mutex =
new Mutex();
393 recv_mutex_ =
new Mutex();
395 connest_mutex_ =
new Mutex();
398 connest_interrupted_ =
false;
411 delete slave_status_mutex;
413 delete connest_waitcond_;
414 delete connest_mutex_;
415 delete recv_waitcond_;
426 if (host_ == NULL && addr_ == NULL) {
434 connection_died_recently =
false;
446 send_slave_->
start();
448 recv_slave_->
start();
450 connection_died_recently =
true;
463 send_slave_alive_ =
false;
464 recv_slave_alive_ =
false;
470 connest_mutex_->
lock();
471 while (!connest_ && !connest_interrupted_) {
472 connest_waitcond_->
wait();
474 bool interrupted = connest_interrupted_;
475 connest_interrupted_ =
false;
481 notify_of_connection_established();
495 host_ = strdup(host);
512 addr_ = (
struct sockaddr *)malloc(addr_len);
513 addr_len_ = addr_len;
514 memcpy(addr_, addr, addr_len);
515 host_ = strdup(hostname);
530 addr_ = (
struct sockaddr *)malloc(
sizeof(sockaddr_storage));
531 addr_len_ =
sizeof(sockaddr_storage);
532 memcpy(addr_, &addr, addr_len_);
533 host_ = strdup(hostname);
544 if (send_slave_alive_) {
545 if (!connection_died_recently) {
555 if (recv_slave_alive_) {
561 send_slave_alive_ =
false;
562 recv_slave_alive_ =
false;
566 if (!connection_died_recently) {
578 connest_mutex_->
lock();
579 connest_interrupted_ =
true;
616 if (send_slave_ && recv_slave_) {
618 if (recv_received_.find(message->
cid()) != recv_received_.end()) {
620 unsigned int cid = message->
cid();
621 throw Exception(
"There is already a thread waiting for messages of "
626 unsigned int cid = message->
cid();
627 recv_received_[cid] =
false;
628 while (!recv_received_[cid] && !connection_died_recently) {
630 recv_received_.erase(cid);
632 throw TimeoutException(
"Timeout reached while waiting for incoming message "
633 "(outgoing was %u:%u)",
638 recv_received_.erase(cid);
641 unsigned int cid = message->
cid();
642 unsigned int msgid = message->
msgid();
643 throw Exception(
"Cannot enqueue given message %u:%u, sender or "
659 unsigned int component_id)
662 if (handlers.find(component_id) != handlers.end()) {
666 handlers[component_id] = handler;
679 if (handlers.find(component_id) != handlers.end()) {
680 handlers[component_id]->deregistered(_id);
681 handlers.erase(component_id);
685 if (recv_received_.find(component_id) != recv_received_.end()) {
686 recv_received_[component_id] =
true;
695 unsigned int cid = m->
cid();
697 if (handlers.find(cid) != handlers.end()) {
698 handlers[cid]->inbound_received(m, _id);
704 FawkesNetworkClient::wake_handlers(
unsigned int cid)
707 if (recv_received_.find(cid) != recv_received_.end()) {
708 recv_received_[cid] =
true;
715 FawkesNetworkClient::notify_of_connection_dead()
717 connest_mutex_->
lock();
722 for (HandlerMap::iterator i = handlers.begin(); i != handlers.end(); ++i) {
723 i->second->connection_died(_id);
733 FawkesNetworkClient::notify_of_connection_established()
736 for (HandlerMap::iterator i = handlers.begin(); i != handlers.end(); ++i) {
737 i->second->connection_established(_id);
743 FawkesNetworkClient::connection_died()
745 connection_died_recently =
true;
746 notify_of_connection_dead();
750 FawkesNetworkClient::set_send_slave_alive()
752 slave_status_mutex->
lock();
753 send_slave_alive_ =
true;
754 if (send_slave_alive_ && recv_slave_alive_) {
755 connest_mutex_->
lock();
760 slave_status_mutex->
unlock();
764 FawkesNetworkClient::set_recv_slave_alive()
766 slave_status_mutex->
lock();
767 recv_slave_alive_ =
true;
768 if (send_slave_alive_ && recv_slave_alive_) {
769 connest_mutex_->
lock();
774 slave_status_mutex->
unlock();
788 if (recv_received_.find(component_id) != recv_received_.end()) {
790 throw Exception(
"There is already a thread waiting for messages of "
794 recv_received_[component_id] =
false;
795 while (!recv_received_[component_id] && !connection_died_recently) {
797 recv_received_.erase(component_id);
799 throw TimeoutException(
"Timeout reached while waiting for incoming message "
804 recv_received_.erase(component_id);
817 if (recv_received_.find(component_id) != recv_received_.end()) {
818 recv_received_[component_id] =
true;
830 return (!connection_died_recently && (s != NULL));
849 throw Exception(
"Trying to get the ID of a client that has no ID");
Thrown if the connection died during an operation.
Base class for exceptions in Fawkes.
Message handler for FawkesNetworkClient.
Fawkes network client receive thread.
virtual void run()
Stub to see name in backtrace for easier debugging.
virtual void once()
Execute an action exactly once.
~FawkesNetworkClientRecvThread()
Destructor.
virtual void loop()
Code to execute in the thread.
void recv()
Receive and process messages.
FawkesNetworkClientRecvThread(StreamSocket *s, FawkesNetworkClient *parent, Mutex *recv_mutex)
Constructor.
Fawkes network client send thread.
void enqueue(FawkesNetworkMessage *message)
Enqueue message to send and take ownership.
FawkesNetworkClientSendThread(StreamSocket *s, FawkesNetworkClient *parent)
Constructor.
void force_send()
Force sending of messages.
virtual void loop()
Code to execute in the thread.
virtual void once()
Execute an action exactly once.
virtual void run()
Stub to see name in backtrace for easier debugging.
~FawkesNetworkClientSendThread()
Destructor.
Simple Fawkes network client.
void wake(unsigned int component_id)
Wake a waiting thread.
void register_handler(FawkesNetworkClientHandler *handler, unsigned int component_id)
Register handler.
~FawkesNetworkClient()
Destructor.
const char * get_hostname() const
Get the client's hostname.
void wait(unsigned int component_id, unsigned int timeout_sec=15)
Wait for messages for component ID.
bool has_id() const
Check whether the client has an id.
void enqueue_and_wait(FawkesNetworkMessage *message, unsigned int timeout_sec=15)
Enqueue message to send and wait for answer.
void connect()
Connect to remote.
FawkesNetworkClient()
Constructor.
void disconnect()
Disconnect socket.
void deregister_handler(unsigned int component_id)
Deregister handler.
unsigned int id() const
Get the client's ID.
bool connected() const noexcept
Check if connection is alive.
void enqueue(FawkesNetworkMessage *message)
Enqueue message to send.
void interrupt_connect()
Interrupt connect().
A LockQueue of FawkesNetworkMessage to hold messages in inbound and outbound queues.
Representation of a message that is sent over the network.
unsigned short int msgid() const
Get message type ID.
unsigned short int cid() const
Get component ID.
static void send(StreamSocket *s, FawkesNetworkMessageQueue *msgq)
Send messages.
static void recv(StreamSocket *s, FawkesNetworkMessageQueue *msgq, unsigned int max_num_msgs=8)
Receive data.
Client handler has already been registered.
HandlerAlreadyRegisteredException()
Constructor.
The current system call has been interrupted (for instance by a signal).
void lock() const
Lock list.
void unlock() const
Unlock list.
void lock() const
Lock queue.
void unlock() const
Unlock queue.
void unlock()
Unlock the mutex.
Mutex mutual exclusion lock.
bool try_lock()
Tries to lock the mutex.
void lock()
Lock this mutex.
void unlock()
Unlock the mutex.
A NULL pointer was supplied where not allowed.
void unref()
Decrement reference count and conditionally delete this instance.
static const short POLL_RDHUP
Stream socket peer closed connection, or shut down writing half of connection.
virtual void connect(const char *hostname, const unsigned short int port)
Connect socket.
static const short POLL_HUP
Hang up.
static const short POLL_IN
Data can be read.
virtual short poll(int timeout=-1, short what=POLL_IN|POLL_HUP|POLL_PRI|POLL_RDHUP)
Wait for some event on socket.
static const short POLL_ERR
Error condition.
TCP stream socket over IP.
Thread class encapsulation of pthreads.
Mutex * loop_mutex
Mutex that is used to protect a call to loop().
void start(bool wait=true)
Call this method to start the thread.
void join()
Join the thread.
void exit()
Exit the thread.
void wakeup()
Wake up thread.
void cancel()
Cancel a thread.
virtual void run()
Code to execute in the thread.
@ OPMODE_WAITFORWAKEUP
operate in wait-for-wakeup mode
The current system call has timed out before completion.
Wait until a given condition holds.
void wait()
Wait for the condition forever.
void wake_all()
Wake up all waiting threads.
bool reltimed_wait(unsigned int sec, unsigned int nanosec)
Wait with relative timeout.
Fawkes library namespace.