23 #include "openprs_comm.h"
25 #include "openprs_server_proxy.h"
27 #include <core/exception.h>
28 #include <core/exceptions/system.h>
29 #include <utils/sub_process/proc.h>
33 #include <opaque-pub.h>
40 void send_message_string_socket(
int socket, Symbol rec, PString message);
41 void broadcast_message_string_socket(
int socket, PString message);
43 multicast_message_string_socket(
int socket,
unsigned int nb_recs, Symbol *recs, PString message);
65 const char * hostname,
70 server_proxy_(server_proxy),
72 io_service_work_(io_service_),
73 sd_mp_socket_(io_service_)
77 mp_socket_ = external_register_to_the_mp_host_prot(local_name, hostname, port, STRINGS_PT);
78 if (mp_socket_ == -1) {
79 throw Exception(
"Failed to connect to OpenPRS as '%s'", local_name);
81 io_service_thread_ = std::thread([
this]() { this->io_service_.run(); });
82 sd_mp_socket_.assign(dup(mp_socket_));
90 io_service_thread_.join();
91 if (mp_socket_ >= 0) {
103 send_message_string_socket(mp_socket_, recipient, (
char *)message);
112 broadcast_message_string_socket(mp_socket_, (
char *)message);
122 multicast_message_string_socket(mp_socket_, recipients.size(), &(recipients[0]), (
char *)message);
132 send_message_string_socket(mp_socket_, recipient.c_str(), (
char *)message.c_str());
141 broadcast_message_string_socket(mp_socket_, (
char *)message.c_str());
150 const std::string & message)
152 std::vector<const char *> recs;
153 recs.resize(recipients.size());
154 for (
size_t i = 0; i < recipients.size(); ++i) {
155 recs[i] = recipients[i].c_str();
157 multicast_message_string_socket(mp_socket_, recs.size(), &(recs[0]), (
char *)message.c_str());
168 va_start(arg, format);
170 if (vasprintf(&msg, format, arg) == -1) {
175 send_message_string_socket(mp_socket_, recipient.c_str(), msg);
186 va_start(arg, format);
188 if (vasprintf(&msg, format, arg) == -1) {
193 broadcast_message_string_socket(mp_socket_, msg);
206 std::vector<const char *> recs;
207 recs.resize(recipients.size());
208 for (
size_t i = 0; i < recipients.size(); ++i) {
209 recs[i] = recipients[i].c_str();
212 va_start(arg, format);
214 if (vasprintf(&msg, format, arg) == -1) {
219 multicast_message_string_socket(mp_socket_, recs.size(), &(recs[0]), msg);
257 va_start(arg, format);
263 OpenPRSComm::start_recv()
265 sd_mp_socket_.async_read_some(boost::asio::null_buffers(),
266 boost::bind(&OpenPRSComm::handle_recv,
268 boost::asio::placeholders::error));
272 OpenPRSComm::handle_recv(
const boost::system::error_code &err)
276 std::string sender = read_string_from_socket(sd_mp_socket_);
277 std::string message = read_string_from_socket(sd_mp_socket_);
279 sig_rcvd_(sender, message);
280 }
catch (Exception &e) {
282 logger_->
log_warn(name_.c_str(),
"Failed to receive message: %s", e.what_no_backtrace());
285 }
else if (logger_) {
286 logger_->
log_warn(name_.c_str(),
"Failed to receive message: %s", err.message().c_str());
292 OpenPRSComm::read_string_from_socket(boost::asio::posix::stream_descriptor &socket)
295 boost::system::error_code ec;
296 boost::asio::read(socket, boost::asio::buffer(&s_size,
sizeof(s_size)), ec);
298 throw Exception(
"Failed to read string size from socket: %s", ec.message().c_str());
300 s_size = ntohl(s_size);
303 boost::asio::read(socket, boost::asio::buffer(s, s_size), ec);
305 throw Exception(
"Failed to read string content from socket: %s", ec.message().c_str());
Base class for exceptions in Fawkes.
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
void transmit_command(const char *recipient, const char *message)
Transmit a command to an OpenPRS kernel.
void multicast_message(std::vector< const char * > &recipients, const char *message)
Send a message to multiple OpenPRS kernel.
void multicast_message_f(const std::vector< std::string > &recipients, const char *format,...)
Send a message to multiple OpenPRS kernel.
virtual ~OpenPRSComm()
Destructor.
void broadcast_message_f(const char *format,...)
Send a formatted message to all OpenPRS kernels.
void transmit_command_f(const std::string &recipient, const char *format,...)
Transmit a command to an OpenPRS kernel.
void broadcast_message(const char *message)
Send a message to all OpenPRS kernels.
void send_message_f(const std::string &recipient, const char *format,...)
Send a formatted message to an OpenPRS kernel.
OpenPRSComm(const char *local_name, const char *hostname, unsigned short port, OpenPRSServerProxy *server_proxy, Logger *logger=NULL)
Constructor.
void send_message(const char *recipient, const char *message)
Send a message to an OpenPRS kernel.
System ran out of memory and desired operation could not be fulfilled.
Fawkes library namespace.