Fawkes API  Fawkes Development Version
plexil_thread.cpp
1 
2 /***************************************************************************
3  * plexil_thread.cpp - PLEXIL executive
4  *
5  * Created: Mon Aug 13 11:20:12 2018
6  * Copyright 2006-2018 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  * GNU Library General Public License for more details.
18  *
19  * Read the full text in the LICENSE.GPL file in the doc directory.
20  */
21 
22 #include "plexil_thread.h"
23 
24 #include "be_adapter.h"
25 #include "clock_adapter.h"
26 #include "log_adapter.h"
27 #include "log_stream.h"
28 #include "thread_adapter.h"
29 #ifdef HAVE_NAVGRAPH
30 # include "navgraph_access_thread.h"
31 # include "navgraph_adapter.h"
32 #endif
33 #include "utils.h"
34 
35 #include <core/threading/mutex_locker.h>
36 #include <utils/sub_process/proc.h>
37 #include <utils/system/dynamic_module/module.h>
38 
39 #include <AdapterConfiguration.hh>
40 #include <Debug.hh>
41 #include <ExecApplication.hh>
42 #include <InterfaceManager.hh>
43 #include <InterfaceSchema.hh>
44 #include <boost/filesystem.hpp>
45 #include <boost/interprocess/sync/file_lock.hpp>
46 #include <cstring>
47 #include <fstream>
48 #include <numeric>
49 #include <pugixml.hpp>
50 
51 using namespace fawkes;
52 namespace fs = boost::filesystem;
53 // for C++17 could be:
54 // namespace fs = std::filesystem;
55 
56 /** @class PlexilExecutiveThread "plexil_thread.h"
57  * Main thread of PLEXIL executive.
58  *
59  * @author Tim Niemueller
60  */
61 
62 /** Constructor. */
64 : Thread("PlexilExecutiveThread", Thread::OPMODE_CONTINUOUS)
65 {
67 }
68 
69 /** Destructor. */
71 {
72 }
73 
74 void
76 {
77  cfg_spec_ = config->get_string("/plexil/spec");
78 
79  std::string cfg_prefix = "/plexil/" + cfg_spec_ + "/";
80 
81  bool cfg_print_xml = config->get_bool_or_default((cfg_prefix + "debug/print-xml").c_str(), false);
82 
83  std::map<std::string, plexil_interface_config> cfg_adapters =
84  read_plexil_interface_configs(cfg_prefix + "adapters/");
85 
86  std::map<std::string, plexil_interface_config> cfg_listeners =
87  read_plexil_interface_configs(cfg_prefix + "listeners/");
88 
89  std::vector<std::string> cfg_lib_path =
90  config->get_strings_or_defaults((cfg_prefix + "plan/lib-path").c_str(), {});
91 
92  std::string cfg_basedir =
93  config->get_string_or_default((cfg_prefix + "plan/basedir").c_str(), "");
94 
95  for (auto &a_item : cfg_adapters) {
96  auto &a = a_item.second;
97  if (a.type == "Utility") {
98  logger->log_warn(name(), "Utility adapter configured, consider using FawkesLogging instead");
99  } else if (a.type == "OSNativeTime") {
100  logger->log_warn(name(),
101  "OSNativeTime adapter configured, consider using FawkesTime instead");
102  } else if (a.type == "FawkesRemoteAdapter") {
103  logger->log_error(name(), "Cannot load FawkesRemoteAdapter when running internally");
104  throw Exception("Plexil: cannot load FawkesRemoteAdapter when running internally");
105  }
106 
107  std::string filename =
108  std::string(LIBDIR) + "/plexil/" + a.type + "." + fawkes::Module::get_file_extension();
109  if (fs::exists(filename)) {
110  a.attr["LibPath"] = filename;
111  }
112  }
113 
114  plexil_.reset(new PLEXIL::ExecApplication);
115 
116  PLEXIL::g_manager->setProperty("::Fawkes::Config", config);
117  PLEXIL::g_manager->setProperty("::Fawkes::Clock", clock);
118  PLEXIL::g_manager->setProperty("::Fawkes::Logger", logger);
119  PLEXIL::g_manager->setProperty("::Fawkes::BlackBoard", blackboard);
120 
121  for (const auto &p : cfg_lib_path) {
122  plexil_->addLibraryPath(p);
123  }
124 
125  pugi::xml_document xml_config;
126  pugi::xml_node xml_interfaces =
127  xml_config.append_child(PLEXIL::InterfaceSchema::INTERFACES_TAG());
128 
129  add_plexil_interface_configs(xml_interfaces,
130  cfg_adapters,
131  PLEXIL::InterfaceSchema::ADAPTER_TAG(),
132  PLEXIL::InterfaceSchema::ADAPTER_TYPE_ATTR());
133  add_plexil_interface_configs(xml_interfaces,
134  cfg_listeners,
135  PLEXIL::InterfaceSchema::LISTENER_TAG(),
136  PLEXIL::InterfaceSchema::LISTENER_TYPE_ATTR());
137 
138  auto navgraph_adapter_config =
139  std::find_if(cfg_adapters.begin(), cfg_adapters.end(), [](const auto &entry) {
140  return entry.second.type == "NavGraphAdapter";
141  });
142  if (navgraph_adapter_config != cfg_adapters.end()) {
143 #ifdef HAVE_NAVGRAPH
144  navgraph_access_thread_ = new PlexilNavgraphAccessThread();
145  thread_collector->add(navgraph_access_thread_);
146  navgraph_ = navgraph_access_thread_->get_navgraph();
147  PLEXIL::g_manager->setProperty("::Fawkes::NavGraph", &navgraph_);
148 #else
149  throw Exception("NavGraph adapter configured, "
150  "but navgraph library not available at compile time");
151 #endif
152  }
153 
154  if (cfg_print_xml) {
155  struct xml_string_writer : pugi::xml_writer
156  {
157  std::string result;
158  virtual void
159  write(const void *data, size_t size)
160  {
161  result.append(static_cast<const char *>(data), size);
162  }
163  };
164 
165  xml_string_writer writer;
166  xml_config.save(writer);
167  logger->log_info(name(), "Interface config XML:\n%s", writer.result.c_str());
168  }
169 
170  if (config->get_bool_or_default((cfg_prefix + "debug/enable").c_str(), false)) {
171  std::vector<std::string> debug_markers =
172  config->get_strings_or_defaults((cfg_prefix + "debug/markers").c_str(), {});
173 
174  std::stringstream dbg_config;
175  for (const auto &m : debug_markers) {
176  dbg_config << m << std::endl;
177  }
178  PLEXIL::readDebugConfigStream(dbg_config);
179  }
180 
181  log_buffer_.reset(new PlexilLogStreamBuffer(logger));
182  log_stream_.reset(new std::ostream(&*log_buffer_));
183  PLEXIL::setDebugOutputStream(*log_stream_);
184 
185  if (!plexil_->initialize(xml_interfaces)) {
186  throw Exception("Failed to initialize Plexil application");
187  }
188 
189  if (config->is_list(cfg_prefix + "plan/ple")) {
190  cfg_plan_ple_ = config->get_strings_or_defaults((cfg_prefix + "plan/ple").c_str(), {});
191  } else {
192  std::string ple = config->get_string_or_default((cfg_prefix + "plan/ple").c_str(), "");
193  if (!ple.empty()) {
194  cfg_plan_ple_ = {ple};
195  }
196  }
197  if (cfg_plan_ple_.empty()) {
198  throw Exception("No PLE configured");
199  }
200  cfg_plan_plx_ = config->get_string((cfg_prefix + "plan/plx").c_str());
201  cfg_plan_auto_compile_ =
202  config->get_bool_or_default((cfg_prefix + "plan/compilation/enable").c_str(), false);
203  cfg_plan_force_compile_ =
204  config->get_bool_or_default((cfg_prefix + "plan/compilation/force").c_str(), false);
205 
206  if (!cfg_plan_plx_.empty()) {
207  cfg_plan_plx_ = cfg_basedir + "/" + cfg_plan_plx_;
208  replace_tokens(cfg_plan_plx_);
209  }
210 
211  std::set<std::string> base_paths;
212 
213  for (auto &p : cfg_plan_ple_) {
214  p = cfg_basedir + "/" + p;
215  replace_tokens(p);
216 
217  fs::path ple_path{p};
218  fs::path plx_path{fs::path{ple_path}.replace_extension(".plx")};
219 
220  // make sure not two processes try to compile at the same time
221  boost::interprocess::file_lock flock(ple_path.string().c_str());
222 
223  base_paths.insert(plx_path.parent_path().string());
224 
225  if (cfg_plan_auto_compile_) {
226  if (cfg_plan_force_compile_ || !fs::exists(plx_path)
227  || fs::last_write_time(plx_path) < fs::last_write_time(ple_path)) {
228  logger->log_info(name(), "Compiling %s", ple_path.string().c_str());
229  plexil_compile(ple_path.string());
230  }
231  } else {
232  if (!fs::exists(plx_path)) {
233  throw Exception("PLX %s does not exist and auto-compile disabled");
234  } else if (fs::last_write_time(plx_path) < fs::last_write_time(ple_path)) {
235  logger->log_warn(name(),
236  "PLX %s older than PLE, auto-compile disabled",
237  plx_path.string().c_str());
238  }
239  }
240  }
241 
242  if (!fs::exists(cfg_plan_plx_)) {
243  throw Exception("PLX %s does not exist", cfg_plan_plx_.c_str());
244  }
245 
246  for (const auto &p : base_paths) {
247  plexil_->addLibraryPath(p);
248  }
249 
250  plan_plx_.reset(new pugi::xml_document);
251  pugi::xml_parse_result parse_result = plan_plx_->load_file(cfg_plan_plx_.c_str());
252  if (parse_result.status != pugi::status_ok) {
253  throw Exception("Failed to parse plan '%s': %s",
254  cfg_plan_plx_.c_str(),
255  parse_result.description());
256  }
257 }
258 
259 void
261 {
262  if (!plexil_->startInterfaces()) {
263  throw Exception("Failed to start Plexil interfaces");
264  }
265  if (!plexil_->run()) {
266  throw Exception("Failed to start Plexil");
267  }
268 
269  if (!plexil_->addPlan(&*plan_plx_)) {
270  logger->log_error(name(), "Failed to add Plexil plan. See log for details");
271  } else {
272  plexil_->notifyExec();
273  }
274 }
275 
276 bool
278 {
279  if (!plexil_->stop()) {
280  logger->log_error(name(), "Failed to stop Plexil");
281  }
282  plexil_->notifyExec();
283  return true;
284 }
285 
286 void
288 {
289  if (!plexil_->shutdown()) {
290  logger->log_error(name(), "Failed to shutdown Plexil");
291  }
292  PLEXIL::g_configuration->clearAdapterRegistry();
293  plexil_->waitForShutdown();
294 
295  // We really should do a reset here, killing off the ExecApplication instance.
296  // However, the executive crashes in a state cache destructor if there is any
297  // active wait (or probably any active LookupOnChange, as here on time).
298  // Therefore, we accept this memleak here under the assumption, that we do not
299  // frequently reload the plexil plugin. This at least avoids the segfaut on quit.
300  plexil_.release();
301  //plexil_.reset();
302  log_stream_.reset();
303  log_buffer_.reset();
304  plan_plx_.reset();
305 #ifdef HAVE_NAVGRAPH
306  if (navgraph_) {
307  navgraph_.clear();
308  thread_collector->remove(navgraph_access_thread_);
309  delete navgraph_access_thread_;
310  }
311 #endif
312 }
313 
314 void
316 {
317  //plexil_->notifyExec();
318  //plexil_->waitForPlanFinished();
319  static PLEXIL::ExecApplication::ApplicationState state = PLEXIL::ExecApplication::APP_SHUTDOWN;
320  PLEXIL::ExecApplication::ApplicationState new_state = plexil_->getApplicationState();
321  if (new_state != state) {
322  logger->log_info(name(), "State changed to %s", plexil_->getApplicationStateName(new_state));
323  state = new_state;
324  }
325 
326  using namespace std::chrono_literals;
327  std::this_thread::sleep_for(500ms);
328 }
329 
330 // Parse adapter configurations
331 std::map<std::string, PlexilExecutiveThread::plexil_interface_config>
332 PlexilExecutiveThread::read_plexil_interface_configs(const std::string &config_prefix)
333 {
334  std::map<std::string, plexil_interface_config> cfg_adapters;
335 
336  std::unique_ptr<Configuration::ValueIterator> cfg_item{config->search(config_prefix)};
337  while (cfg_item->next()) {
338  std::string path = cfg_item->path();
339 
340  std::string::size_type start_pos = config_prefix.size();
341  std::string::size_type slash_pos = path.find("/", start_pos + 1);
342  if (slash_pos != std::string::npos) {
343  std::string id = path.substr(start_pos, slash_pos - start_pos);
344 
345  start_pos = slash_pos + 1;
346  slash_pos = path.find("/", start_pos);
347  std::string what = path.substr(start_pos, slash_pos - start_pos);
348 
349  if (what == "type") {
350  cfg_adapters[id].type = cfg_item->get_string();
351  } else if (what == "attr") {
352  start_pos = slash_pos + 1;
353  slash_pos = path.find("/", start_pos);
354  std::string key = path.substr(start_pos, slash_pos - start_pos);
355  cfg_adapters[id].attr[key] = cfg_item->get_as_string();
356  } else if (what == "args") {
357  start_pos = slash_pos + 1;
358  slash_pos = path.find("/", start_pos);
359  std::string key = path.substr(start_pos, slash_pos - start_pos);
360  cfg_adapters[id].args[key] = cfg_item->get_as_string();
361  } else if (what == "verbatim-args") {
362  start_pos = slash_pos + 1;
363  slash_pos = path.find("/", start_pos);
364  std::string verb_id = path.substr(start_pos, slash_pos - start_pos);
365 
366  start_pos = slash_pos + 1;
367  slash_pos = path.find("/", start_pos);
368  std::string verb_what = path.substr(start_pos, slash_pos - start_pos);
369 
370  if (verb_what == "tag") {
371  cfg_adapters[id].verbatim_args[verb_id].tag = cfg_item->get_as_string();
372  } else if (verb_what == "text") {
373  cfg_adapters[id].verbatim_args[verb_id].has_text = true;
374  cfg_adapters[id].verbatim_args[verb_id].text = cfg_item->get_as_string();
375  } else if (verb_what == "attr") {
376  start_pos = slash_pos + 1;
377  slash_pos = path.find("/", start_pos);
378  std::string verb_key = path.substr(start_pos, slash_pos - start_pos);
379  cfg_adapters[id].verbatim_args[verb_id].attr[verb_key] = cfg_item->get_as_string();
380  }
381  } else if (what == "verbatim-xml") {
382  logger->log_warn(name(), "Parsing verbatim");
383  pugi::xml_parse_result parse_result =
384  cfg_adapters[id].verbatim.load_string(cfg_item->get_string().c_str());
385  if (parse_result.status != pugi::status_ok) {
386  throw Exception("Failed to parse verbatim-xml for '%s': %s",
387  cfg_adapters[id].type.c_str(),
388  parse_result.description());
389  }
390  }
391  }
392  }
393  return cfg_adapters;
394 }
395 
396 // Add adapter configurations to Plexil interface XML config
397 void
398 PlexilExecutiveThread::add_plexil_interface_configs(
399  pugi::xml_node & parent,
400  const std::map<std::string, PlexilExecutiveThread::plexil_interface_config> &configs,
401  const char * tag_name,
402  const char * type_attr_name)
403 {
404  for (const auto &a_item : configs) {
405  const auto & a = a_item.second;
406  pugi::xml_node xml_adapter = parent.append_child(tag_name);
407  xml_adapter.append_attribute(type_attr_name).set_value(a.type.c_str());
408  for (const auto &attr : a.attr) {
409  xml_adapter.append_attribute(attr.first.c_str()).set_value(attr.second.c_str());
410  }
411  for (const auto &arg : a.args) {
412  pugi::xml_node xml_adapter_arg = xml_adapter.append_child("Parameter");
413  xml_adapter_arg.append_attribute("key").set_value(arg.first.c_str());
414  xml_adapter_arg.text().set(arg.second.c_str());
415  }
416  for (const auto &arg : a.verbatim_args) {
417  const auto & varg = arg.second;
418  pugi::xml_node xml_adapter_arg = xml_adapter.append_child(varg.tag.c_str());
419  for (const auto &attr : varg.attr) {
420  xml_adapter_arg.append_attribute(attr.first.c_str()).set_value(attr.second.c_str());
421  }
422  if (varg.has_text) {
423  xml_adapter_arg.text().set(varg.text.c_str());
424  }
425  }
426  if (a.verbatim && a.verbatim.children().begin() != a.verbatim.children().end()) {
427  for (const auto &child : a.verbatim.children()) {
428  xml_adapter.append_copy(child);
429  }
430  }
431  }
432 }
433 
434 void
435 PlexilExecutiveThread::plexil_compile(const std::string &ple_file)
436 {
437  std::vector<std::string> argv{"plexilc", ple_file};
438  std::string command_line =
439  std::accumulate(std::next(argv.begin()),
440  argv.end(),
441  argv.front(),
442  [](std::string &s, const std::string &a) { return s + " " + a; });
443  logger->log_debug(name(), "Compiler command: %s", command_line.c_str());
444 
445  SubProcess proc("plexilc", "plexilc", argv, {}, logger);
446  using namespace std::chrono_literals;
447  auto compile_start = std::chrono::system_clock::now();
448  auto now = std::chrono::system_clock::now();
449  do {
450  proc.check_proc();
451  if (!proc.alive()) {
452  if (proc.exit_status() != 0) {
453  throw Exception("Plexil compilation failed, check log for messages.");
454  } else {
455  break;
456  }
457  }
458  now = std::chrono::system_clock::now();
459  std::this_thread::sleep_for(500ms);
460  } while (now < compile_start + 30s);
461  if (proc.alive()) {
462  proc.kill(SIGINT);
463  throw Exception("Plexil compilation timeout after 30s");
464  }
465 }
virtual void init()
Initialize the thread.
virtual ~PlexilExecutiveThread()
Destructor.
PlexilExecutiveThread()
Constructor.
virtual void loop()
Code to execute in the thread.
virtual void finalize()
Finalize the thread.
virtual bool prepare_finalize_user()
Prepare finalization user implementation.
virtual void once()
Execute an action exactly once.
Log Plexil log output to Fawkes logger.
Definition: log_stream.h:31
Access to internal navgraph for Plexil.
BlackBoard * blackboard
This is the BlackBoard instance you can use to interact with the BlackBoard.
Definition: blackboard.h:44
Clock * clock
By means of this member access to the clock is given.
Definition: clock.h:42
Configuration * config
This is the Configuration member used to access the configuration.
Definition: configurable.h:41
virtual const char * path() const =0
Path of value.
virtual std::string get_string_or_default(const char *path, const std::string &default_val)
Get value from configuration which is of type string, or the given default if the path does not exist...
Definition: config.cpp:736
virtual ValueIterator * search(const char *path)=0
Iterator with search results.
virtual bool is_list(const char *path)=0
Check if a value is a list.
virtual bool get_bool_or_default(const char *path, const bool &default_val)
Get value from configuration which is of type bool, or the given default if the path does not exist.
Definition: config.cpp:726
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
virtual std::vector< std::string > get_strings_or_defaults(const char *path, const std::vector< std::string > &default_val)
Get list of values from configuration which is of type string, or the given default if the path does ...
Definition: config.cpp:786
Base class for exceptions in Fawkes.
Definition: exception.h:36
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
virtual void log_error(const char *component, const char *format,...)=0
Log error message.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:41
static const char * get_file_extension()
Get file extension for dl modules.
Definition: module.cpp:260
Sub-process execution with stdin/stdout/stderr redirection.
Definition: proc.h:37
bool alive()
Check if process is alive.
Definition: proc.cpp:353
int exit_status()
Get exit status of process once it ended.
Definition: proc.cpp:365
void check_proc()
Check if the process is still alive.
Definition: proc.cpp:375
void kill(int signum)
Send a signal to the process.
Definition: proc.cpp:188
Thread class encapsulation of pthreads.
Definition: thread.h:46
void set_prepfin_conc_loop(bool concurrent=true)
Set concurrent execution of prepare_finalize() and loop().
Definition: thread.cpp:716
const char * name() const
Get name of thread.
Definition: thread.h:100
Fawkes library namespace.