Fawkes API  Fawkes Development Version
openprs_server_proxy.cpp
1 
2 /***************************************************************************
3  * openprs_server_proxy.cpp - OpenPRS server proxy
4  *
5  * Created: Tue Aug 19 16:59:27 2014
6  * Copyright 2014 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version. A runtime exception applies to
13  * this software (see LICENSE.GPL_WRE file mentioned below for details).
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  * GNU Library General Public License for more details.
19  *
20  * Read the full text in the LICENSE.GPL_WRE file in the doc directory.
21  */
22 
#include "openprs_server_proxy.h"

#include <core/exception.h>
#include <core/exceptions/system.h>
#include <core/threading/mutex_locker.h>
#include <logging/logger.h>

#include <boost/bind/bind.hpp>
#include <boost/lexical_cast.hpp>

#include <algorithm>
#include <exception>
#include <string>
32 
33 using namespace boost::asio;
34 
35 // Types copied from OPRS because they are not public there
36 /// @cond EXTERN
37 namespace OPRS {
38 typedef enum { MESSAGE_MT = 1, BROADCAST_MT, MULTICAST_MT, DISCONNECT_MT } Message_Type;
39 typedef enum { REGISTER_OK, REGISTER_NAME_CONFLICT, REGISTER_DENIED } Register_Type;
40 typedef enum { MESSAGES_PT, STRINGS_PT } Protocol_Type;
41 } // namespace OPRS
42 /// @endcond
43 
44 namespace fawkes {
45 
46 /** @class OpenPRSServerProxy "openprs_server_proxy.h"
47  * Proxy for the OpenPRS server communication.
48  * Using this proxy makes it possible to inject commands into the
49  * communication between oprs-server and oprs (or xoprs).
50  * @author Tim Niemueller
51  */
52 
53 /** Constructor.
54  * @param tcp_port port to listen on for incoming connections
55  * @param server_host host of oprs-server to connect to
56  * @param server_port TCP port that oprs-server listens on
57  * @param logger logger for informational messages
58  */
59 OpenPRSServerProxy::OpenPRSServerProxy(unsigned short tcp_port,
60  const std::string &server_host,
61  unsigned short server_port,
62  fawkes::Logger * logger)
63 : io_service_work_(io_service_),
64  acceptor_(io_service_, ip::tcp::endpoint(ip::tcp::v6(), tcp_port)),
65  server_host_(server_host),
66  server_port_(server_port),
67  logger_(logger)
68 {
69  acceptor_.set_option(socket_base::reuse_address(true));
70  io_service_thread_ = std::thread([this]() { this->io_service_.run(); });
71  start_accept();
72 }
73 
74 /** Destructor. */
76 {
77  io_service_.stop();
78  io_service_thread_.join();
79 }
80 
81 /** Check if a kernel connected to the proxy.
82  * @param kernel_name name of the kernel to look for
83  * @return true if the kernel connected, false otherwise
84  */
85 bool
86 OpenPRSServerProxy::has_kernel(const std::string &kernel_name)
87 {
88  auto map_it =
89  find_if(mappings_.begin(), mappings_.end(), [&kernel_name](const Mapping::Ptr &mapping) {
90  return mapping->client_name == kernel_name;
91  });
92  return (map_it != mappings_.end());
93 }
94 
95 OpenPRSServerProxy::Mapping::Ptr
96 OpenPRSServerProxy::find_mapping(const std::string &recipient)
97 {
98  auto map_it =
99  find_if(mappings_.begin(), mappings_.end(), [&recipient](const Mapping::Ptr &mapping) {
100  return mapping->client_name == recipient;
101  });
102  if (map_it != mappings_.end()) {
103  return *map_it;
104  } else {
105  throw Exception("Client %s is not connected to OpenPRS server proxy", recipient.c_str());
106  }
107 }
108 
109 /** Transmit a command to an OpenPRS kernel.
110  * This works equivalent to the transmit oprs-server console command.
111  * @param recipient OpenPRS kernel name to send to
112  * @param command command to send, cf. OpenPRS manual for valid commands
113  */
114 void
115 OpenPRSServerProxy::transmit_command(const std::string &recipient, const std::string &command)
116 {
117  MutexLocker lock(mappings_.mutex());
118  Mapping::Ptr mapping = find_mapping(recipient);
119  mapping->transmit_command(command);
120 }
121 
122 /** Transmit a command to an OpenPRS kernel.
123  * This works equivalent to the transmit oprs-server console command.
124  * This function allows to pass a format according to the sprintf()
125  * format and its arguments.
126  * @param recipient OpenPRS kernel name to send to
127  * @param format format string for the command, must be followed by the
128  * appropriate number and types of arguments.
129  */
130 void
131 OpenPRSServerProxy::transmit_command_f(const std::string &recipient, const char *format, ...)
132 {
133  MutexLocker lock(mappings_.mutex());
134  Mapping::Ptr mapping = find_mapping(recipient);
135 
136  va_list arg;
137  va_start(arg, format);
138 
139  char *msg;
140  if (vasprintf(&msg, format, arg) == -1) {
141  va_end(arg);
142  throw OutOfMemoryException("Cannot format OpenPRS client command string");
143  }
144  va_end(arg);
145  std::string command = msg;
146  free(msg);
147 
148  mapping->transmit_command(command);
149 }
150 
151 /** Transmit a command to an OpenPRS kernel.
152  * This works equivalent to the transmit oprs-server console command.
153  * This function allows to pass a format according to the sprintf()
154  * format and its arguments. The arguments are read from the @p arg list.
155  * @param recipient OpenPRS kernel name to send to
156  * @param format format string for the command, must be followed by the
157  * appropriate number and types of arguments.
158  * @param arg argument list for the string format
159  */
160 void
161 OpenPRSServerProxy::transmit_command_v(const std::string &recipient,
162  const char * format,
163  va_list arg)
164 {
165  MutexLocker lock(mappings_.mutex());
166  Mapping::Ptr mapping = find_mapping(recipient);
167 
168  char *msg;
169  if (vasprintf(&msg, format, arg) == -1) {
170  throw OutOfMemoryException("Cannot format OpenPRS client command string");
171  }
172  std::string command = msg;
173  free(msg);
174 
175  mapping->transmit_command(command);
176 }
177 
178 /** Start accepting connections. */
179 void
180 OpenPRSServerProxy::start_accept()
181 {
182  Mapping::Ptr mapping(new Mapping(io_service_, server_host_, server_port_, logger_));
183  acceptor_.async_accept(mapping->client_socket,
184  boost::bind(&OpenPRSServerProxy::handle_accept,
185  this,
186  mapping,
187  boost::asio::placeholders::error));
188 }
189 
190 void
191 OpenPRSServerProxy::handle_accept(Mapping::Ptr mapping, const boost::system::error_code &error)
192 {
193  if (!error) {
194  MutexLocker lock(mappings_.mutex());
195  mappings_.push_back(mapping);
196  mapping->start();
197  }
198 
199  start_accept();
200 }
201 
202 OpenPRSServerProxy::Mapping::Mapping(boost::asio::io_service &io_service,
203  const std::string & server_host,
204  unsigned short server_port,
205  fawkes::Logger * logger)
206 : io_service_(io_service),
207  resolver_(io_service_),
208  server_host_(server_host),
209  server_port_(server_port),
210  logger_(logger),
211  client_socket(io_service_),
212  server_socket(io_service_)
213 {
214 }
215 
216 /** Destruct mapping.
217  * This closes both, client and server sockets. This destructor
218  * assumes that the io_service has been cancelled.
219  */
220 OpenPRSServerProxy::Mapping::~Mapping()
221 {
222  boost::system::error_code err;
223  client_socket.shutdown(ip::tcp::socket::shutdown_both, err);
224  client_socket.close();
225  server_socket.shutdown(ip::tcp::socket::shutdown_both, err);
226  server_socket.close();
227 }
228 
229 /** A client has connected, start this mapping. */
230 void
231 OpenPRSServerProxy::Mapping::start()
232 {
233  logger_->log_info("OPRS-server-proxy", "Client connected, connecting to server");
234  ip::tcp::resolver::query query(server_host_, boost::lexical_cast<std::string>(server_port_));
235  resolver_.async_resolve(query,
236  boost::bind(&OpenPRSServerProxy::Mapping::handle_resolve,
237  this,
238  boost::asio::placeholders::error,
239  boost::asio::placeholders::iterator));
240 }
241 
242 bool
243 OpenPRSServerProxy::Mapping::alive() const
244 {
245  return client_socket.is_open();
246 }
247 
248 void
249 OpenPRSServerProxy::Mapping::disconnect()
250 {
251  logger_->log_info("OPRS-server-proxy", "Disconnecting %s", client_name.c_str());
252  boost::system::error_code ec;
253  client_socket.shutdown(ip::tcp::socket::shutdown_both, ec);
254  client_socket.close();
255 }
256 
257 void
258 OpenPRSServerProxy::Mapping::handle_resolve(const boost::system::error_code &err,
259  ip::tcp::resolver::iterator endpoint_iterator)
260 {
261  if (!err) {
262  // Attempt a connection to each endpoint in the list until we
263  // successfully establish a connection.
264 #if BOOST_ASIO_VERSION > 100409
265  boost::asio::async_connect(server_socket,
266  endpoint_iterator,
267 #else
268  server_socket.async_connect(*endpoint_iterator,
269 #endif
270  boost::bind(&OpenPRSServerProxy::Mapping::handle_connect,
271  this,
272  boost::asio::placeholders::error));
273  } else {
274  disconnect();
275  }
276 }
277 
278 void
279 OpenPRSServerProxy::Mapping::handle_connect(const boost::system::error_code &err)
280 {
281  if (!err) {
282  try {
283  // forward greeting
284  std::string greeting = read_string_from_socket(server_socket);
285  logger_->log_info("OPRS-server-proxy", "Forwarding greeting '%s'", greeting.c_str());
286  write_string_to_socket(client_socket, greeting);
287 
288  int client_pid = 0;
289  int client_use_x = 0;
290 
291  logger_->log_info("OPRS-server-proxy", "Reading client details");
292  // now read connection details
293  client_name = read_string_from_socket(client_socket);
294  client_pid = read_int_from_socket(client_socket);
295  client_use_x = read_int_from_socket(client_socket);
296 
297  logger_->log_info("OPRS-server-proxy",
298  "Got client info: %s %i %s",
299  client_name.c_str(),
300  client_pid,
301  client_use_x ? "XOPRS" : "OPRS");
302 
303  // forward to server
304  write_string_to_socket(server_socket, client_name);
305  write_int_to_socket(server_socket, client_pid);
306  write_int_to_socket(server_socket, client_use_x);
307 
308  start_recv_client();
309  start_recv_server();
310  } catch (Exception &e) {
311  disconnect();
312  }
313  } else {
314  disconnect();
315  }
316 }
317 
318 void
319 OpenPRSServerProxy::Mapping::start_recv_client()
320 {
321  boost::asio::async_read(client_socket,
322  boost::asio::buffer(&client_in_num_completions_,
323  sizeof(client_in_num_completions_)),
324  boost::bind(&OpenPRSServerProxy::Mapping::handle_recv_client,
325  this,
326  boost::asio::placeholders::error));
327 }
328 
329 void
330 OpenPRSServerProxy::Mapping::start_recv_server()
331 {
332  boost::asio::async_read_until(server_socket,
333  server_buffer_,
334  '\n',
335  boost::bind(&OpenPRSServerProxy::Mapping::handle_recv_server,
336  this,
337  boost::asio::placeholders::error));
338 }
339 
340 void
341 OpenPRSServerProxy::Mapping::handle_recv_server(const boost::system::error_code &err)
342 {
343  if (!err) {
344  std::string line;
345  std::istream in_stream(&server_buffer_);
346  std::getline(in_stream, line);
347 
348  logger_->log_info("OPRS-server-proxy", "Forwarding S->C line '%s'", line.c_str());
349  write_string_newline_to_socket(client_socket, line);
350 
351  start_recv_server();
352  } else {
353  disconnect();
354  }
355 }
356 
357 void
358 OpenPRSServerProxy::Mapping::handle_recv_client(const boost::system::error_code &err)
359 {
360  if (!err) {
361  client_in_num_completions_ = ntohl(client_in_num_completions_);
362  for (int i = 0; i < client_in_num_completions_; ++i) {
363  std::string c = read_string_from_socket(client_socket);
364  write_string_to_socket(server_socket, c);
365  }
366 
367  start_recv_client();
368  } else {
369  disconnect();
370  }
371 }
372 
373 void
374 OpenPRSServerProxy::Mapping::transmit_command(const std::string &command)
375 {
376  write_string_newline_to_socket(client_socket, command);
377 }
378 
379 /** Read an int from a given socket.
380  * @param socket socket to read from
381  * @return read value
382  */
383 int
384 OpenPRSServerProxy::read_int_from_socket(boost::asio::ip::tcp::socket &socket)
385 {
386  int32_t value;
387  boost::system::error_code ec;
388  boost::asio::read(socket, boost::asio::buffer(&value, sizeof(value)), ec);
389  if (ec) {
390  throw Exception("Failed to read int from socket: %s", ec.message().c_str());
391  } else {
392  return ntohl(value);
393  }
394 }
395 
396 /** Read a string from a given socket.
397  * @param socket socket to read from
398  * @return read value
399  */
400 std::string
401 OpenPRSServerProxy::read_string_from_socket(boost::asio::ip::tcp::socket &socket)
402 {
403  uint32_t s_size = 0;
404  boost::system::error_code ec;
405  boost::asio::read(socket, boost::asio::buffer(&s_size, sizeof(s_size)), ec);
406  if (ec) {
407  throw Exception("Failed to read string size from socket: %s", ec.message().c_str());
408  }
409  s_size = ntohl(s_size);
410 
411  char s[s_size + 1];
412  boost::asio::read(socket, boost::asio::buffer(s, s_size), ec);
413  if (ec) {
414  throw Exception("Failed to read string content from socket: %s", ec.message().c_str());
415  }
416  s[s_size] = 0;
417 
418  return s;
419 }
420 
421 /** Write an int to a given socket.
422  * @param socket socket to write to
423  * @param i value to write
424  */
425 void
426 OpenPRSServerProxy::write_int_to_socket(boost::asio::ip::tcp::socket &socket, int i)
427 {
428  boost::system::error_code ec;
429  int32_t value = htonl(i);
430  boost::asio::write(socket, boost::asio::buffer(&value, sizeof(value)), ec);
431  if (ec) {
432  throw Exception("Failed to write int to socket: %s", ec.message().c_str());
433  }
434 }
435 
436 /** Write a string to a given socket.
437  * @param socket socket to write to
438  * @param str string value to write
439  */
440 void
441 OpenPRSServerProxy::write_string_to_socket(boost::asio::ip::tcp::socket &socket,
442  const std::string & str)
443 {
444  boost::system::error_code ec;
445  uint32_t s_size = htonl(str.size());
446  std::array<boost::asio::const_buffer, 2> buffers;
447  buffers[0] = boost::asio::buffer(&s_size, sizeof(s_size));
448  buffers[1] = boost::asio::buffer(str.c_str(), str.size());
449 
450  boost::asio::write(socket, buffers, ec);
451  if (ec) {
452  throw Exception("Failed to write string to socket: %s", ec.message().c_str());
453  }
454 }
455 
456 /** Write a string followed by a newline character to a given socket.
457  * @param socket socket to write to
458  * @param str string value to write
459  */
460 void
461 OpenPRSServerProxy::write_string_newline_to_socket(boost::asio::ip::tcp::socket &socket,
462  const std::string & str)
463 {
464  boost::system::error_code ec;
465  std::string s = str + "\n";
466  boost::asio::write(socket, boost::asio::buffer(s.c_str(), s.size()), ec);
467  if (ec) {
468  throw Exception("Failed to write string to socket: %s", ec.message().c_str());
469  }
470 }
471 
472 } // end namespace fawkes
Base class for exceptions in Fawkes.
Definition: exception.h:36
RefPtr< Mutex > mutex() const
Get access to the internal mutex.
Definition: lock_list.h:172
Interface for logging.
Definition: logger.h:42
Mutex locking helper.
Definition: mutex_locker.h:34
void transmit_command_f(const std::string &client_name, const char *format,...)
Transmit a command to an OpenPRS kernel.
virtual ~OpenPRSServerProxy()
Destructor.
static void write_string_newline_to_socket(boost::asio::ip::tcp::socket &socket, const std::string &str)
Write a string followed by a newline character to a given socket.
static void write_int_to_socket(boost::asio::ip::tcp::socket &socket, int i)
Write an int to a given socket.
static int read_int_from_socket(boost::asio::ip::tcp::socket &socket)
Read an int from a given socket.
static std::string read_string_from_socket(boost::asio::ip::tcp::socket &socket)
Read a string from a given socket.
void transmit_command(const std::string &client_name, const std::string &command)
Transmit a command to an OpenPRS kernel.
void transmit_command_v(const std::string &client_name, const char *format, va_list arg)
Transmit a command to an OpenPRS kernel.
bool has_kernel(const std::string &kernel_name)
Check if a kernel connected to the proxy.
static void write_string_to_socket(boost::asio::ip::tcp::socket &socket, const std::string &str)
Write a string to a given socket.
System ran out of memory and desired operation could not be fulfilled.
Definition: system.h:32
Fawkes library namespace.