My Project
shared.cc
Go to the documentation of this file.
1 #include "threadconf.h"
2 #include <iostream>
3 #include "kernel/mod2.h"
4 #include "Singular/ipid.h"
5 #include "Singular/ipshell.h"
7 #include "Singular/lists.h"
8 #include "Singular/blackbox.h"
9 #include "Singular/feOpt.h"
10 #include "Singular/libsingular.h"
11 #include <cstring>
12 #include <string>
13 #include <errno.h>
14 #include <stdio.h>
15 #include <vector>
16 #include <map>
17 #include <iterator>
18 #include <queue>
19 #include <assert.h>
20 #include "thread.h"
21 #include "lintree.h"
22 
23 #include "singthreads.h"
24 
25 using namespace std;
26 
27 #ifdef ENABLE_THREADS
28 extern char *global_argv0;
29 #endif
30 
31 extern "C" void pSingular_initialize_thread();
32 
33 namespace LibThread {
34 
35 #ifdef ENABLE_THREADS
36 const int have_threads = 1;
37 #else
38 const int have_threads = 0;
39 #endif
40 
41 class Command {
42 private:
43  const char *name;
44  const char *error;
47  int argc;
48 public:
49  Command(const char *n, leftv r, leftv a)
50  {
51  name = n;
52  result = r;
53  error = NULL;
54  argc = 0;
55  for (leftv t = a; t != NULL; t = t->next) {
56  argc++;
57  }
58  args = (leftv *) omAlloc0(sizeof(leftv) * argc);
59  int i = 0;
60  for (leftv t = a; t != NULL; t = t->next) {
61  args[i++] = t;
62  }
63  result->rtyp = NONE;
64  result->data = NULL;
65  }
67  omFree(args);
68  }
69  void check_argc(int n) {
70  if (error) return;
71  if (argc != n) error = "wrong number of arguments";
72  }
73  void check_argc(int lo, int hi) {
74  if (error) return;
75  if (argc < lo || argc > hi) error = "wrong number of arguments";
76  }
77  void check_argc_min(int n) {
78  if (error) return;
79  if (argc < n) error = "wrong number of arguments";
80  }
81  void check_arg(int i, int type, const char *err) {
82  if (error) return;
83  if (args[i]->Typ() != type) error = err;
84  }
85  void check_init(int i, const char *err) {
86  if (error) return;
87  leftv arg = args[i];
88  if (arg->Data() == NULL || *(void **)(arg->Data()) == NULL)
89  error = err;
90  }
91  void check_arg(int i, int type, int type2, const char *err) {
92  if (error) return;
93  if (args[i]->Typ() != type && args[i]->Typ() != type2) error = err;
94  }
95  int argtype(int i) {
96  return args[i]->Typ();
97  }
98  int nargs() {
99  return argc;
100  }
101  void *arg(int i) {
102  return args[i]->Data();
103  }
104  template <typename T>
105  T *shared_arg(int i) {
106  return *(T **)(arg(i));
107  }
108  long int_arg(int i) {
109  return (long)(args[i]->Data());
110  }
111  void report(const char *err) {
112  error = err;
113  }
114  // intentionally not bool, so we can also do
115  // q = p + test_arg(p, type);
116  int test_arg(int i, int type) {
117  if (i >= argc) return 0;
118  return args[i]->Typ() == type;
119  }
120  void set_result(long n) {
121  result->rtyp = INT_CMD;
122  result->data = (char *)n;
123  }
124  void set_result(const char *s) {
125  result->rtyp = STRING_CMD;
126  result->data = omStrDup(s);
127  }
128  void set_result(int type, void *p) {
129  result->rtyp = type;
130  result->data = (char *) p;
131  }
132  void set_result(int type, long n) {
133  result->rtyp = type;
134  result->data = (char *) n;
135  }
136  void no_result() {
137  result->rtyp = NONE;
138  }
139  bool ok() {
140  return error == NULL;
141  }
143  if (error) {
144  Werror("%s: %s", name, error);
145  }
146  return error != NULL;
147  }
148  BOOLEAN abort(const char *err) {
149  report(err);
150  return status();
151  }
152 };
153 
155 private:
157  long refcount;
158  int type;
160 public:
161  SharedObject(): lock(), refcount(0) { }
162  virtual ~SharedObject() { }
163  void set_type(int type_init) { type = type_init; }
164  int get_type() { return type; }
165  void set_name(std::string &name_init) { name = name_init; }
166  void set_name(const char *s) {
167  name = std::string(s);
168  }
169  std::string &get_name() { return name; }
170  void incref(int by = 1) {
171  lock.lock();
172  refcount += 1;
173  lock.unlock();
174  }
175  long decref() {
176  int result;
177  lock.lock();
178  result = --refcount;
179  lock.unlock();
180  return result;
181  }
182  long getref() {
183  return refcount;
184  }
185  virtual BOOLEAN op2(int op, leftv res, leftv a1, leftv a2) {
186  return TRUE;
187  }
188  virtual BOOLEAN op3(int op, leftv res, leftv a1, leftv a2, leftv a3) {
189  return TRUE;
190  }
191 };
192 
194  obj->incref();
195 }
196 
198  if (obj->decref() == 0) {
199  // delete obj;
200  }
201 }
202 
203 typedef std::map<std::string, SharedObject *> SharedObjectTable;
204 
205 class Region : public SharedObject {
206 private:
208 public:
210  Region() : SharedObject(), region_lock(), objects() { }
211  virtual ~Region() { }
212  Lock *get_lock() { return &region_lock; }
213  void lock() {
214  if (!region_lock.is_locked())
215  region_lock.lock();
216  }
217  void unlock() {
218  if (region_lock.is_locked())
219  region_lock.unlock();
220  }
221  int is_locked() {
222  return region_lock.is_locked();
223  }
224 };
225 
232 
245 
248 
250  Lock *lock, int type, string &name, SharedConstructor scons)
251 {
252  int was_locked = lock->is_locked();
254  if (!was_locked)
255  lock->lock();
256  if (table.count(name)) {
257  result = table[name];
258  if (result->get_type() != type)
259  result = NULL;
260  } else {
261  result = scons();
262  result->set_type(type);
263  result->set_name(name);
264  table.insert(pair<string,SharedObject *>(name, result));
265  }
266  if (!was_locked)
267  lock->unlock();
268  return result;
269 }
270 
272  Lock *lock, string &name)
273 {
274  int was_locked = lock->is_locked();
276  if (!was_locked)
277  lock->lock();
278  if (table.count(name)) {
279  result = table[name];
280  }
281  if (!was_locked)
282  lock->unlock();
283  return result;
284 }
285 
287 private:
290 protected:
291  int tx_begin() {
292  if (!region)
293  lock->lock();
294  else {
295  if (!lock->is_locked()) {
296  return 0;
297  }
298  }
299  return 1;
300  }
301  void tx_end() {
302  if (!region)
303  lock->unlock();
304  }
305 public:
307  SharedObject(), region(NULL), lock(NULL) {
308  }
309  void set_region(Region *region_init) {
310  region = region_init;
311  if (region_init) {
312  lock = region_init->get_lock();
313  } else {
314  lock = new Lock();
315  }
316  }
317  virtual ~Transactional() { if (!region && lock) delete lock; }
318 };
319 
320 class TxTable: public Transactional {
321 private:
322  std::map<string, string> entries;
323 public:
324  TxTable() : Transactional(), entries() { }
325  virtual ~TxTable() { }
326  int put(string &key, string &value) {
327  int result = 0;
328  if (!tx_begin()) return -1;
329  if (entries.count(key)) {
330  entries[key] = value;
331  } else {
332  entries.insert(pair<string, string>(key, value));
333  result = 1;
334  }
335  tx_end();
336  return result;
337  }
338  int get(string &key, string &value) {
339  int result = 0;
340  if (!tx_begin()) return -1;
341  if (entries.count(key)) {
342  value = entries[key];
343  result = 1;
344  }
345  tx_end();
346  return result;
347  }
348  int check(string &key) {
349  int result;
350  if (!tx_begin()) return -1;
351  result = entries.count(key);
352  tx_end();
353  return result;
354  }
355 };
356 
357 class TxList: public Transactional {
358 private:
359  vector<string> entries;
360 public:
361  TxList() : Transactional(), entries() { }
362  virtual ~TxList() { }
363  int put(size_t index, string &value) {
364  int result = -1;
365  if (!tx_begin()) return -1;
366  if (index >= 1 && index <= entries.size()) {
367  entries[index-1] = value;
368  result = 1;
369  } else {
370  entries.resize(index+1);
371  entries[index-1] = value;
372  result = 0;
373  }
374  tx_end();
375  return result;
376  }
377  int get(size_t index, string &value) {
378  int result = 0;
379  if (!tx_begin()) return -1;
380  if (index >= 1 && index <= entries.size()) {
381  result = (entries[index-1].size() != 0);
382  if (result)
383  value = entries[index-1];
384  }
385  tx_end();
386  return result;
387  }
388  long size() {
389  long result;
390  if (!tx_begin()) return -1;
391  result = (long) entries.size();
392  tx_end();
393  return result;
394  }
395 };
396 
398 private:
399  queue<string> q;
402 public:
403  SingularChannel(): SharedObject(), lock(), cond(&lock) { }
404  virtual ~SingularChannel() { }
405  void send(string item) {
406  lock.lock();
407  q.push(item);
408  cond.signal();
409  lock.unlock();
410  }
411  string receive() {
412  lock.lock();
413  while (q.empty()) {
414  cond.wait();
415  }
416  string result = q.front();
417  q.pop();
418  if (!q.empty())
419  cond.signal();
420  lock.unlock();
421  return result;
422  }
423  long count() {
424  lock.lock();
425  long result = q.size();
426  lock.unlock();
427  return result;
428  }
429 };
430 
432 private:
433  string value;
434  int init;
437 public:
438  SingularSyncVar(): SharedObject(), init(0), lock(), cond(&lock) { }
439  virtual ~SingularSyncVar() { }
440  void acquire() {
441  lock.lock();
442  }
443  void release() {
444  lock.unlock();
445  }
446  void wait_init() {
447  while (!init)
448  cond.wait();
449  }
450  leftv get() {
451  if (value.size() == 0) return NULL;
452  return LinTree::from_string(value);
453  }
454  void update(leftv val) {
455  value = LinTree::to_string(val);
456  init = 1;
457  cond.broadcast();
458  }
459  int write(string item) {
460  int result = 0;
461  lock.lock();
462  if (!init) {
463  value = item;
464  init = 1;
465  cond.broadcast();
466  result = 1;
467  }
468  lock.unlock();
469  return result;
470  }
471  string read() {
472  lock.lock();
473  while (!init)
474  cond.wait();
475  string result = value;
476  lock.unlock();
477  return result;
478  }
479  int check() {
480  lock.lock();
481  int result = init;
482  lock.unlock();
483  return result;
484  }
485 };
486 
487 void *shared_init(blackbox *b) {
488  return omAlloc0(sizeof(SharedObject *));
489 }
490 
492  acquireShared(obj);
493  void *result = omAlloc0(sizeof(SharedObject *));
494  *(SharedObject **)result = obj;
495  return result;
496 }
497 
498 void shared_destroy(blackbox *b, void *d) {
499  SharedObject *obj = *(SharedObject **)d;
500  if (obj) {
501  releaseShared(*(SharedObject **)d);
502  *(SharedObject **)d = NULL;
503  }
504 }
505 
506 void rlock_destroy(blackbox *b, void *d) {
507  SharedObject *obj = *(SharedObject **)d;
508  ((Region *) obj)->unlock();
509  if (obj) {
510  releaseShared(*(SharedObject **)d);
511  *(SharedObject **)d = NULL;
512  }
513 }
514 
515 void *shared_copy(blackbox *b, void *d) {
516  SharedObject *obj = *(SharedObject **)d;
517  void *result = shared_init(b);
518  *(SharedObject **)result = obj;
519  if (obj)
520  acquireShared(obj);
521  return result;
522 }
523 
525  if (r->Typ() == l->Typ()) {
526  if (l->rtyp == IDHDL) {
527  omFree(IDDATA((idhdl)l->data));
528  IDDATA((idhdl)l->data) = (char*)shared_copy(NULL,r->Data());
529  } else {
530  leftv ll=l->LData();
531  if (ll==NULL)
532  {
533  return TRUE; // out of array bounds or similiar
534  }
535  if (ll->data) {
536  shared_destroy(NULL, ll->data);
537  omFree(ll->data);
538  }
539  ll->data = shared_copy(NULL,r->Data());
540  }
541  } else {
542  Werror("assign %s(%d) = %s(%d)",
543  Tok2Cmdname(l->Typ()),l->Typ(),Tok2Cmdname(r->Typ()),r->Typ());
544  return TRUE;
545  }
546  return FALSE;
547 }
548 
550  if (r->Typ() == l->Typ()) {
551  if (l->rtyp == IDHDL) {
552  omFree(IDDATA((idhdl)l->data));
553  IDDATA((idhdl)l->data) = (char*)shared_copy(NULL,r->Data());
554  } else {
555  leftv ll=l->LData();
556  if (ll==NULL)
557  {
558  return TRUE; // out of array bounds or similiar
559  }
560  rlock_destroy(NULL, ll->data);
561  omFree(ll->data);
562  ll->data = shared_copy(NULL,r->Data());
563  }
564  } else {
565  Werror("assign %s(%d) = %s(%d)",
566  Tok2Cmdname(l->Typ()),l->Typ(),Tok2Cmdname(r->Typ()),r->Typ());
567  return TRUE;
568  }
569  return FALSE;
570 }
571 
572 
574  int lt = l->Typ();
575  int rt = r->Typ();
576  if (lt != DEF_CMD && lt != rt) {
577  const char *rn=Tok2Cmdname(rt);
578  const char *ln=Tok2Cmdname(lt);
579  Werror("cannot assign %s (%d) to %s (%d)\n", rn, rt, ln, lt);
580  return TRUE;
581  }
582  return FALSE;
583 }
584 
585 BOOLEAN shared_op2(int op, leftv res, leftv a1, leftv a2) {
586  SharedObject *obj = *(SharedObject **)a1->Data();
587  return obj->op2(op, res, a1, a2);
588 }
589 
590 BOOLEAN shared_op3(int op, leftv res, leftv a1, leftv a2, leftv a3) {
591  SharedObject *obj = *(SharedObject **)a1->Data();
592  return obj->op3(op, res, a1, a2, a3);
593 }
594 
595 char *shared_string(blackbox *b, void *d) {
596  char buf[80];
597  SharedObject *obj = *(SharedObject **)d;
598  if (!obj)
599  return omStrDup("<uninitialized shared object>");
600  int type = obj->get_type();
601  string &name = obj->get_name();
602  const char *type_name = "unknown";
603  if (type == type_channel)
604  type_name = "channel";
605  else if (type == type_atomic_table)
606  type_name = "atomic_table";
607  else if (type == type_shared_table)
608  type_name = "shared_table";
609  else if (type == type_atomic_list)
610  type_name = "atomic_list";
611  else if (type == type_shared_list)
612  type_name = "shared_list";
613  else if (type == type_syncvar)
614  type_name = "syncvar";
615  else if (type == type_region)
616  type_name = "region";
617  else if (type == type_regionlock)
618  type_name = "regionlock";
619  else if (type == type_thread) {
620  sprintf(buf, "<thread #%s>", name.c_str());
621  return omStrDup(buf);
622  }
623  else if (type == type_threadpool) {
624  if (name.size() > 0) {
625  name_lock.lock();
626  sprintf(buf, "<threadpool \"%.40s\"@%p>", name.c_str(), obj);
627  name_lock.unlock();
628  } else
629  sprintf(buf, "<threadpool @%p>", obj);
630  return omStrDup(buf);
631  }
632  else if (type == type_job) {
633  if (name.size() > 0) {
634  name_lock.lock();
635  sprintf(buf, "<job \"%.40s\"@%p>", name.c_str(), obj);
636  name_lock.unlock();
637  } else
638  sprintf(buf, "<job @%p>", obj);
639  return omStrDup(buf);
640  }
641  else if (type == type_trigger) {
642  if (name.size() > 0) {
643  name_lock.lock();
644  sprintf(buf, "<trigger \"%.40s\"@%p>", name.c_str(), obj);
645  name_lock.unlock();
646  } else
647  sprintf(buf, "<trigger @%p>", obj);
648  return omStrDup(buf);
649  } else {
650  sprintf(buf, "<unknown type %d>", type);
651  return omStrDup(buf);
652  }
653  sprintf(buf, "<%s \"%.40s\">", type_name, name.c_str());
654  return omStrDup(buf);
655 }
656 
657 char *rlock_string(blackbox *b, void *d) {
658  char buf[80];
659  SharedObject *obj = *(SharedObject **)d;
660  if (!obj)
661  return omStrDup("<uninitialized region lock>");
662  sprintf(buf, "<region lock \"%.40s\">", obj->get_name().c_str());
663  return omStrDup(buf);
664 }
665 
666 void report(const char *fmt, const char *name) {
667  char buf[80];
668  sprintf(buf, fmt, name);
669  WerrorS(buf);
670 }
671 
672 int wrong_num_args(const char *name, leftv arg, int n) {
673  for (int i=1; i<=n; i++) {
674  if (!arg) {
675  report("%s: too few arguments", name);
676  return TRUE;
677  }
678  arg = arg->next;
679  }
680  if (arg) {
681  report("%s: too many arguments", name);
682  return TRUE;
683  }
684  return FALSE;
685 }
686 
687 int not_a_uri(const char *name, leftv arg) {
688  if (arg->Typ() != STRING_CMD) {
689  report("%s: not a valid URI", name);
690  return TRUE;
691  }
692  return FALSE;
693 }
694 
695 int not_a_region(const char *name, leftv arg) {
696  if (arg->Typ() != type_region || !arg->Data()) {
697  report("%s: not a region", name);
698  return TRUE;
699  }
700  return FALSE;
701 }
702 
703 
704 char *str(leftv arg) {
705  return (char *)(arg->Data());
706 }
707 
709  return new TxTable();
710 }
711 
713  return new TxList();
714 }
715 
717  return new SingularChannel();
718 }
719 
721  return new SingularSyncVar();
722 }
723 
725  return new Region();
726 }
727 
728 static void appendArg(vector<leftv> &argv, string &s) {
729  if (s.size() == 0) return;
731  if (val->Typ() == NONE) {
732  omFreeBin(val, sleftv_bin);
733  return;
734  }
735  argv.push_back(val);
736 }
737 
738 static void appendArg(vector<leftv> &argv, leftv arg) {
739  argv.push_back(arg);
740 }
741 
742 static void appendArgCopy(vector<leftv> &argv, leftv arg) {
744  val->Copy(arg);
745  argv.push_back(val);
746 }
747 
748 
750  const char *procname, const vector<leftv> &argv)
751 {
752  leftv procnode = (leftv) omAlloc0Bin(sleftv_bin);
753  procnode->name = omStrDup(procname);
754  procnode->req_packhdl = basePack;
755  int error = procnode->Eval();
756  if (error) {
757  Werror("procedure \"%s\" not found", procname);
758  omFreeBin(procnode, sleftv_bin);
759  return TRUE;
760  }
761  memset(&result, 0, sizeof(result));
762  leftv *tail = &procnode->next;
763  for (int i = 0; i < argv.size(); i++) {
764  *tail = argv[i];
765  tail = &(*tail)->next;
766  }
767  *tail = NULL;
768  error = iiExprArithM(&result, procnode, '(');
769  procnode->CleanUp();
770  omFreeBin(procnode, sleftv_bin);
771  if (error) {
772  Werror("procedure call of \"%s\" failed", procname);
773  return TRUE;
774  }
775  return FALSE;
776 }
777 
779  if (wrong_num_args("makeAtomicTable", arg, 1))
780  return TRUE;
781  if (not_a_uri("makeAtomicTable", arg))
782  return TRUE;
783  string uri = str(arg);
786  ((TxTable *) obj)->set_region(NULL);
787  result->rtyp = type_atomic_table;
788  result->data = new_shared(obj);
789  return FALSE;
790 }
791 
793  if (wrong_num_args("makeAtomicList", arg, 1))
794  return TRUE;
795  if (not_a_uri("makeAtomicList", arg))
796  return TRUE;
797  string uri = str(arg);
800  ((TxList *) obj)->set_region(NULL);
801  result->rtyp = type_atomic_list;
802  result->data = new_shared(obj);
803  return FALSE;
804 }
805 
807  if (wrong_num_args("makeSharedTable", arg, 2))
808  return TRUE;
809  if (not_a_region("makeSharedTable", arg))
810  return TRUE;
811  if (not_a_uri("makeSharedTable", arg->next))
812  return TRUE;
813  Region *region = *(Region **) arg->Data();
814  fflush(stdout);
815  string s = str(arg->next);
816  SharedObject *obj = makeSharedObject(region->objects,
817  region->get_lock(), type_shared_table, s, consTable);
818  ((TxTable *) obj)->set_region(region);
819  result->rtyp = type_shared_table;
820  result->data = new_shared(obj);
821  return FALSE;
822 }
823 
825  if (wrong_num_args("makeSharedList", arg, 2))
826  return TRUE;
827  if (not_a_region("makeSharedList", arg))
828  return TRUE;
829  if (not_a_uri("makeSharedList", arg->next))
830  return TRUE;
831  Region *region = *(Region **) arg->Data();
832  string s = str(arg->next);
833  SharedObject *obj = makeSharedObject(region->objects,
834  region->get_lock(), type_shared_list, s, consList);
835  ((TxList *) obj)->set_region(region);
836  result->rtyp = type_shared_list;
837  result->data = new_shared(obj);
838  return FALSE;
839 }
840 
842  if (wrong_num_args("makeChannel", arg, 1))
843  return TRUE;
844  if (not_a_uri("makeChannel", arg))
845  return TRUE;
846  string uri = str(arg);
849  result->rtyp = type_channel;
850  result->data = new_shared(obj);
851  return FALSE;
852 }
853 
855  if (wrong_num_args("makeSyncVar", arg, 1))
856  return TRUE;
857  if (not_a_uri("makeSyncVar", arg))
858  return TRUE;
859  string uri = str(arg);
862  result->rtyp = type_syncvar;
863  result->data = new_shared(obj);
864  return FALSE;
865 }
866 
868  if (wrong_num_args("makeRegion", arg, 1))
869  return TRUE;
870  if (not_a_uri("makeRegion", arg))
871  return TRUE;
872  string uri = str(arg);
875  result->rtyp = type_region;
876  result->data = new_shared(obj);
877  return FALSE;
878 }
879 
881  if (wrong_num_args("findSharedObject", arg, 1))
882  return TRUE;
883  if (not_a_uri("findSharedObject", arg))
884  return TRUE;
885  string uri = str(arg);
887  &global_objects_lock, uri);
888  result->rtyp = INT_CMD;
889  result->data = (char *)(long)(obj != NULL);
890  return FALSE;
891 }
892 
894  if (wrong_num_args("findSharedObject", arg, 1))
895  return TRUE;
896  if (not_a_uri("findSharedObject", arg))
897  return TRUE;
898  string uri = str(arg);
900  &global_objects_lock, uri);
901  int type = obj ? obj->get_type() : -1;
902  const char *type_name = "undefined";
903  if (type == type_channel)
904  type_name = "channel";
905  else if (type == type_atomic_table)
906  type_name = "atomic_table";
907  else if (type == type_shared_table)
908  type_name = "shared_table";
909  else if (type == type_atomic_list)
910  type_name = "atomic_list";
911  else if (type == type_shared_list)
912  type_name = "shared_list";
913  else if (type == type_syncvar)
914  type_name = "syncvar";
915  else if (type == type_region)
916  type_name = "region";
917  else if (type == type_regionlock)
918  type_name = "regionlock";
919  result->rtyp = STRING_CMD;
920  result->data = (char *)(omStrDup(type_name));
921  return FALSE;
922 }
923 
925  if (wrong_num_args("bindSharedObject", arg, 1))
926  return TRUE;
927  if (not_a_uri("bindSharedObject", arg))
928  return TRUE;
929  string uri = str(arg);
931  &global_objects_lock, uri);
932  if (!obj) {
933  WerrorS("bindSharedObject: cannot find object");
934  return TRUE;
935  }
936  result->rtyp = obj->get_type();
937  result->data = new_shared(obj);
938  return FALSE;
939 }
940 
942  if (wrong_num_args("getTable", arg, 2))
943  return TRUE;
944  if (arg->Typ() != type_atomic_table && arg->Typ() != type_shared_table) {
945  WerrorS("getTable: not a valid table");
946  return TRUE;
947  }
948  if (arg->next->Typ() != STRING_CMD) {
949  WerrorS("getTable: not a valid table key");
950  return TRUE;
951  }
952  TxTable *table = *(TxTable **) arg->Data();
953  if (!table) {
954  WerrorS("getTable: table has not been initialized");
955  return TRUE;
956  }
957  string key = (char *)(arg->next->Data());
958  string value;
959  int success = table->get(key, value);
960  if (success < 0) {
961  WerrorS("getTable: region not acquired");
962  return TRUE;
963  }
964  if (success == 0) {
965  WerrorS("getTable: key not found");
966  return TRUE;
967  }
968  leftv tmp = LinTree::from_string(value);
969  result->rtyp = tmp->Typ();
970  result->data = tmp->Data();
971  return FALSE;
972 }
973 
975  if (wrong_num_args("inTable", arg, 2))
976  return TRUE;
977  if (arg->Typ() != type_atomic_table && arg->Typ() != type_shared_table) {
978  WerrorS("inTable: not a valid table");
979  return TRUE;
980  }
981  if (arg->next->Typ() != STRING_CMD) {
982  WerrorS("inTable: not a valid table key");
983  return TRUE;
984  }
985  TxTable *table = *(TxTable **) arg->Data();
986  if (!table) {
987  WerrorS("inTable: table has not been initialized");
988  return TRUE;
989  }
990  string key = (char *)(arg->next->Data());
991  int success = table->check(key);
992  if (success < 0) {
993  WerrorS("inTable: region not acquired");
994  return TRUE;
995  }
996  result->rtyp = INT_CMD;
997  result->data = (char *)(long)(success);
998  return FALSE;
999 }
1000 
1002  if (wrong_num_args("putTable", arg, 3))
1003  return TRUE;
1004  if (arg->Typ() != type_atomic_table && arg->Typ() != type_shared_table) {
1005  WerrorS("putTable: not a valid table");
1006  return TRUE;
1007  }
1008  if (arg->next->Typ() != STRING_CMD) {
1009  WerrorS("putTable: not a valid table key");
1010  return TRUE;
1011  }
1012  TxTable *table = *(TxTable **) arg->Data();
1013  if (!table) {
1014  WerrorS("putTable: table has not been initialized");
1015  return TRUE;
1016  }
1017  string key = (char *)(arg->next->Data());
1018  string value = LinTree::to_string(arg->next->next);
1019  int success = table->put(key, value);
1020  if (success < 0) {
1021  WerrorS("putTable: region not acquired");
1022  return TRUE;
1023  }
1024  result->rtyp = NONE;
1025  return FALSE;
1026 }
1027 
1029  if (wrong_num_args("getList", arg, 2))
1030  return TRUE;
1031  if (arg->Typ() != type_atomic_list && arg->Typ() != type_shared_list) {
1032  WerrorS("getList: not a valid list (atomic or shared)");
1033  return TRUE;
1034  }
1035  if (arg->next->Typ() != INT_CMD) {
1036  WerrorS("getList: index must be an integer");
1037  return TRUE;
1038  }
1039  TxList *list = *(TxList **) arg->Data();
1040  if (!list) {
1041  WerrorS("getList: list has not been initialized");
1042  return TRUE;
1043  }
1044  long index = (long)(arg->next->Data());
1045  string value;
1046  int success = list->get(index, value);
1047  if (success < 0) {
1048  WerrorS("getList: region not acquired");
1049  return TRUE;
1050  }
1051  if (success == 0) {
1052  WerrorS("getList: no value at position");
1053  return TRUE;
1054  }
1055  leftv tmp = LinTree::from_string(value);
1056  result->rtyp = tmp->Typ();
1057  result->data = tmp->Data();
1058  return FALSE;
1059 }
1060 
1062  if (wrong_num_args("putList", arg, 3))
1063  return TRUE;
1064  if (arg->Typ() != type_atomic_list && arg->Typ() != type_shared_list) {
1065  WerrorS("putList: not a valid list (shared or atomic)");
1066  return TRUE;
1067  }
1068  if (arg->next->Typ() != INT_CMD) {
1069  WerrorS("putList: index must be an integer");
1070  return TRUE;
1071  }
1072  TxList *list = *(TxList **) arg->Data();
1073  if (!list) {
1074  WerrorS("putList: list has not been initialized");
1075  return TRUE;
1076  }
1077  long index = (long)(arg->next->Data());
1078  string value = LinTree::to_string(arg->next->next);
1079  int success = list->put(index, value);
1080  if (success < 0) {
1081  WerrorS("putList: region not acquired");
1082  return TRUE;
1083  }
1084  result->rtyp = NONE;
1085  return FALSE;
1086 }
1087 
1089  if (wrong_num_args("lockRegion", arg, 1))
1090  return TRUE;
1091  if (not_a_region("lockRegion", arg))
1092  return TRUE;
1093  Region *region = *(Region **)arg->Data();
1094  if (region->is_locked()) {
1095  WerrorS("lockRegion: region is already locked");
1096  return TRUE;
1097  }
1098  region->lock();
1099  result->rtyp = NONE;
1100  return FALSE;
1101 }
1102 
1104  if (wrong_num_args("lockRegion", arg, 1))
1105  return TRUE;
1106  if (not_a_region("lockRegion", arg))
1107  return TRUE;
1108  Region *region = *(Region **)arg->Data();
1109  if (region->is_locked()) {
1110  WerrorS("lockRegion: region is already locked");
1111  return TRUE;
1112  }
1113  region->lock();
1114  result->rtyp = type_regionlock;
1115  result->data = new_shared(region);
1116  return FALSE;
1117 }
1118 
1119 
1121  if (wrong_num_args("unlockRegion", arg, 1))
1122  return TRUE;
1123  if (not_a_region("unlockRegion", arg))
1124  return TRUE;
1125  Region *region = *(Region **)arg->Data();
1126  if (!region->is_locked()) {
1127  WerrorS("unlockRegion: region is not locked");
1128  return TRUE;
1129  }
1130  region->unlock();
1131  result->rtyp = NONE;
1132  return FALSE;
1133 }
1134 
1136  if (wrong_num_args("sendChannel", arg, 2))
1137  return TRUE;
1138  if (arg->Typ() != type_channel) {
1139  WerrorS("sendChannel: argument is not a channel");
1140  return TRUE;
1141  }
1142  SingularChannel *channel = *(SingularChannel **)arg->Data();
1143  if (!channel) {
1144  WerrorS("sendChannel: channel has not been initialized");
1145  return TRUE;
1146  }
1147  channel->send(LinTree::to_string(arg->next));
1148  result->rtyp = NONE;
1149  return FALSE;
1150 }
1151 
1153  if (wrong_num_args("receiveChannel", arg, 1))
1154  return TRUE;
1155  if (arg->Typ() != type_channel) {
1156  WerrorS("receiveChannel: argument is not a channel");
1157  return TRUE;
1158  }
1159  SingularChannel *channel = *(SingularChannel **)arg->Data();
1160  if (!channel) {
1161  WerrorS("receiveChannel: channel has not been initialized");
1162  return TRUE;
1163  }
1164  string item = channel->receive();
1165  leftv val = LinTree::from_string(item);
1166  result->rtyp = val->Typ();
1167  result->data = val->Data();
1168  return FALSE;
1169 }
1170 
1172  if (wrong_num_args("statChannel", arg, 1))
1173  return TRUE;
1174  if (arg->Typ() != type_channel) {
1175  WerrorS("statChannel: argument is not a channel");
1176  return TRUE;
1177  }
1178  SingularChannel *channel = *(SingularChannel **)arg->Data();
1179  if (!channel) {
1180  WerrorS("receiveChannel: channel has not been initialized");
1181  return TRUE;
1182  }
1183  long n = channel->count();
1184  result->rtyp = INT_CMD;
1185  result->data = (char *)n;
1186  return FALSE;
1187 }
1188 
1190  if (wrong_num_args("writeSyncVar", arg, 2))
1191  return TRUE;
1192  if (arg->Typ() != type_syncvar) {
1193  WerrorS("writeSyncVar: argument is not a syncvar");
1194  return TRUE;
1195  }
1196  SingularSyncVar *syncvar = *(SingularSyncVar **)arg->Data();
1197  if (!syncvar) {
1198  WerrorS("writeSyncVar: syncvar has not been initialized");
1199  return TRUE;
1200  }
1201  if (!syncvar->write(LinTree::to_string(arg->next))) {
1202  WerrorS("writeSyncVar: variable already has a value");
1203  return TRUE;
1204  }
1205  result->rtyp = NONE;
1206  return FALSE;
1207 }
1208 
1210  Command cmd("updateSyncVar", result, arg);
1211  cmd.check_argc_min(2);
1212  cmd.check_arg(0, type_syncvar, "first argument must be a syncvar");
1213  cmd.check_init(0, "syncvar has not been initialized");
1214  cmd.check_arg(1, STRING_CMD, "second argument must be a string");
1215  if (cmd.ok()) {
1216  SingularSyncVar *syncvar = cmd.shared_arg<SingularSyncVar>(0);
1217  char *procname = (char *) cmd.arg(1);
1218  arg = arg->next->next;
1219  syncvar->acquire();
1220  syncvar->wait_init();
1221  vector<leftv> argv;
1222  appendArg(argv, syncvar->get());
1223  while (arg) {
1224  appendArgCopy(argv, arg);
1225  arg = arg->next;
1226  }
1227  int error = executeProc(*result, procname, argv);
1228  if (!error) {
1229  syncvar->update(result);
1230  }
1231  syncvar->release();
1232  return error;
1233  }
1234  return cmd.status();
1235 }
1236 
1237 
1239  if (wrong_num_args("readSyncVar", arg, 1))
1240  return TRUE;
1241  if (arg->Typ() != type_syncvar) {
1242  WerrorS("readSyncVar: argument is not a syncvar");
1243  return TRUE;
1244  }
1245  SingularSyncVar *syncvar = *(SingularSyncVar **)arg->Data();
1246  if (!syncvar) {
1247  WerrorS("readSyncVar: syncvar has not been initialized");
1248  return TRUE;
1249  }
1250  string item = syncvar->read();
1251  leftv val = LinTree::from_string(item);
1252  result->rtyp = val->Typ();
1253  result->data = val->Data();
1254  return FALSE;
1255 }
1256 
1258  if (wrong_num_args("statSyncVar", arg, 1))
1259  return TRUE;
1260  if (arg->Typ() != type_syncvar) {
1261  WerrorS("statSyncVar: argument is not a syncvar");
1262  return TRUE;
1263  }
1264  SingularSyncVar *syncvar = *(SingularSyncVar **)arg->Data();
1265  if (!syncvar) {
1266  WerrorS("statSyncVar: syncvar has not been initialized");
1267  return TRUE;
1268  }
1269  int init = syncvar->check();
1270  result->rtyp = INT_CMD;
1271  result->data = (char *)(long) init;
1272  return FALSE;
1273 }
1274 
1275 void encode_shared(LinTree::LinTree &lintree, leftv val) {
1276  SharedObject *obj = *(SharedObject **)(val->Data());
1277  acquireShared(obj);
1278  lintree.put(obj);
1279 }
1280 
1282  int type = lintree.get_prev<int>();
1283  SharedObject *obj = lintree.get<SharedObject *>();
1285  result->rtyp = type;
1286  result->data = (void *)new_shared(obj);
1287  return result;
1288 }
1289 
1290 void ref_shared(LinTree::LinTree &lintree, int by) {
1291  SharedObject *obj = lintree.get<SharedObject *>();
1292  while (by > 0) {
1293  obj->incref();
1294  by--;
1295  }
1296  while (by < 0) {
1297  obj->decref();
1298  by++;
1299  }
1300 }
1301 
1302 void installShared(int type) {
1304 }
1305 
1306 void makeSharedType(int &type, const char *name) {
1307  if (type != 0) return;
1308  blackbox *b=(blackbox*)omAlloc0(sizeof(blackbox));
1309  b->blackbox_Init = shared_init;
1310  b->blackbox_destroy = shared_destroy;
1311  b->blackbox_Copy = shared_copy;
1312  b->blackbox_String = shared_string;
1313  b->blackbox_Assign = shared_assign;
1314  b->blackbox_CheckAssign = shared_check_assign;
1315  // b->blackbox_Op2 = shared_op2;
1316  // b->blackbox_Op3 = shared_op3;
1317  type = setBlackboxStuff(b, name);
1318  installShared(type);
1319 }
1320 
1321 void makeRegionlockType(int &type, const char *name) {
1322  if (type != 0) return;
1323  blackbox *b=(blackbox*)omAlloc0(sizeof(blackbox));
1324  b->blackbox_Init = shared_init;
1325  b->blackbox_destroy = rlock_destroy;
1326  b->blackbox_Copy = shared_copy;
1327  b->blackbox_String = shared_string;
1328  b->blackbox_Assign = rlock_assign;
1329  b->blackbox_CheckAssign = shared_check_assign;
1330  type = setBlackboxStuff(b, name);
1331  installShared(type);
1332 }
1333 
1334 #define MAX_THREADS 128
1335 
1337 public:
1338  bool active;
1339  bool running;
1340  int index;
1341  void *(*thread_func)(ThreadState *, void *);
1342  void *arg, *result;
1343  pthread_t id;
1344  pthread_t parent;
1348  queue<string> to_thread;
1349  queue<string> from_thread;
1350  ThreadState() : lock(), to_cond(&lock), from_cond(&lock),
1351  to_thread(), from_thread() {
1352  active = false;
1353  running = false;
1354  index = -1;
1355  }
1357  // We do nothing here. This is to prevent the condition
1358  // variable destructor from firing upon program exit,
1359  // which would invoke undefined behavior if the thread
1360  // is still running.
1361  }
1362 };
1363 
1365 
1367 
1368 void setOption(int ch) {
1369  int index = feGetOptIndex(ch);
1370  feSetOptValue((feOptIndex) index, (int) 1);
1371 }
1372 
1373 void thread_init() {
1374  master_lock.lock();
1376  master_lock.unlock();
1377 #ifdef ENABLE_THREADS
1379  siInit(global_argv0);
1380 #endif
1381  setOption('q');
1382  // setOption('b');
1383 }
1384 
1385 void *thread_main(void *arg) {
1386  ThreadState *ts = (ThreadState *)arg;
1387  thread_init();
1388  return ts->thread_func(ts, ts->arg);
1389 }
1390 
// Message loop of an interpreter thread.
//
// Messages arrive in pairs on ts->to_thread: a command string whose first
// character selects the action, followed by a lintree-encoded payload.
//   '\0' or 'q' : release ts->lock and return NULL
//   'x'         : decode (and thereby evaluate) the payload, discard result
//   'e'         : decode the payload and push the re-encoded result onto
//                 ts->from_thread
// ts->lock is held for the whole loop except while waiting on to_cond.
// `arg` is not used.
1391 void *interpreter_thread(ThreadState *ts, void *arg) {
1392  ts->lock.lock();
1393  for (;;) {
1394  bool eval = false;
// Block until there is at least one message.
1395  while (ts->to_thread.empty())
1396  ts->to_cond.wait();
1397  /* TODO */
1398  string expr = ts->to_thread.front();
// Any first character other than the ones listed leaves eval == false.
1399  switch (expr[0]) {
1400  case '\0': case 'q':
1401  ts->lock.unlock();
1402  return NULL;
1403  case 'x':
1404  eval = false;
1405  break;
1406  case 'e':
1407  eval = true;
1408  break;
1409  }
1410  ts->to_thread.pop();
// NOTE(review): front() is called without re-checking emptiness; this
// assumes the sender queued the payload together with the command.
1411  expr = ts->to_thread.front();
1412  /* this will implicitly eval commands */
1413  leftv val = LinTree::from_string(expr);
1414  expr = LinTree::to_string(val);
1415  ts->to_thread.pop();
1416  if (eval)
1417  ts->from_thread.push(expr);
// from_cond is signalled for 'x' messages as well.
1418  ts->from_cond.signal();
1419  }
// Not reached: the loop above only exits through the return in the switch.
1420  ts->lock.unlock();
1421  return NULL;
1422 }
1423 
1425 private:
1427 public:
1428  InterpreterThread(ThreadState *ts_init) : SharedObject(), ts(ts_init) { }
1429  virtual ~InterpreterThread() { }
1430  ThreadState *getThreadState() { return ts; }
1432  ts = NULL;
1433  }
1434 };
1435 
1436 static ThreadState *newThread(void *(*thread_func)(ThreadState *, void *),
1437  void *arg, const char **error) {
1438  ThreadState *ts = NULL;
1439  if (error) *error = NULL;
1440  thread_lock.lock();
1441  for (int i=0; i<MAX_THREADS; i++) {
1442  if (!thread_state[i].active) {
1443  ts = thread_state + i;
1444  ts->index = i;
1445  ts->parent = pthread_self();
1446  ts->active = true;
1447  ts->running = true;
1448  ts->to_thread = queue<string>();
1449  ts->from_thread = queue<string>();
1450  ts->thread_func = thread_func;
1451  ts->arg = arg;
1452  ts->result = NULL;
1453  if (pthread_create(&ts->id, NULL, thread_main, ts)<0) {
1454  if (error)
1455  *error = "createThread: internal error: failed to create thread";
1456  goto fail;
1457  }
1458  goto exit;
1459  }
1460  }
1461  if (error) *error = "createThread: too many threads";
1462  fail:
1463  ts = NULL;
1464  exit:
1465  thread_lock.unlock();
1466  return ts;
1467 }
1468 
1469 ThreadState *createThread(void *(*thread_func)(ThreadState *, void *),
1470  void *arg) {
1471  return newThread(thread_func, arg, NULL);
1472 }
1473 
1475  void *result;
1476  pthread_join(ts->id, NULL);
1477  result = ts->result;
1478  thread_lock.lock();
1479  ts->running = false;
1480  ts->active = false;
1481  thread_lock.unlock();
1482  return result;
1483 }
1484 
1487  if (*error) return NULL;
1488  InterpreterThread *thread = new InterpreterThread(ts);
1489  char buf[10];
1490  sprintf(buf, "%d", ts->index);
1491  string name(buf);
1492  thread->set_name(name);
1493  thread->set_type(type_thread);
1494  return thread;
1495 }
1496 
1498  Command cmd("createThread", result, arg);
1499  cmd.check_argc(0);
1500  const char *error;
1501  if (!have_threads)
1502  cmd.report("thread support not available");
1503  if (!cmd.ok()) return cmd.status();
1505  if (error) {
1506  return cmd.abort(error);
1507  }
1508  cmd.set_result(type_thread, new_shared(thread));
1509  return cmd.status();
1510 }
1511 
1513  ThreadState *ts = thread->getThreadState();
1514  if (ts && ts->parent != pthread_self()) {
1515  return false;
1516  }
1517  ts->lock.lock();
1518  string quit("q");
1519  ts->to_thread.push(quit);
1520  ts->to_cond.signal();
1521  ts->lock.unlock();
1522  pthread_join(ts->id, NULL);
1523  thread_lock.lock();
1524  ts->running = false;
1525  ts->active = false;
1526  thread->clearThreadState();
1527  thread_lock.unlock();
1528  return true;
1529 }
1530 
1532  if (wrong_num_args("joinThread", arg, 1))
1533  return TRUE;
1534  if (arg->Typ() != type_thread) {
1535  WerrorS("joinThread: argument is not a thread");
1536  return TRUE;
1537  }
1538  InterpreterThread *thread = *(InterpreterThread **)arg->Data();
1539  if (!joinInterpreterThread(thread)) {
1540  WerrorS("joinThread: can only be called from parent thread");
1541  return TRUE;
1542  }
1543  return FALSE;
1544 }
1545 
1546 class ThreadPool;
1547 class Trigger;
1548 
1549 class Job : public SharedObject {
1550 public:
1552  long prio;
1553  size_t id;
1555  vector<Job *> deps;
1556  vector<Job *> notify;
1557  vector<Trigger *> triggers;
1558  vector<string> args;
1559  string result; // lintree-encoded
1560  void *data;
1561  bool fast;
1562  bool done;
1563  bool queued;
1564  bool running;
1566  Job() : SharedObject(), pool(NULL), deps(), pending_index(-1), fast(false),
1567  done(false), running(false), queued(false), cancelled(false), data(NULL),
1568  result(), args(), notify(), triggers(), prio(0)
1569  { set_type(type_job); }
1570  ~Job();
1571  void addDep(Job *job) {
1572  deps.push_back(job);
1573  }
1574  void addDep(vector<Job *> &jobs);
1575  void addDep(long ndeps, Job **jobs);
1576  void addNotify(vector<Job *> &jobs);
1577  void addNotify(Job *job);
1578  virtual bool ready();
1579  virtual void execute() = 0;
1580  void run();
1581 };
1582 
1583 struct JobCompare {
1584  bool operator()(const Job* lhs, const Job* rhs) {
1585  if (lhs->fast < rhs->fast) {
1586  return true;
1587  }
1588  if (lhs->prio < rhs->prio) {
1589  return true;
1590  }
1591  if (lhs->prio == rhs->prio) {
1592  return lhs->id > rhs->id;
1593  }
1594  return false;
1595  }
1596 };
1597 
1598 class Trigger : public Job {
1599 public:
1600  virtual bool accept(leftv arg) = 0;
1601  virtual void activate(leftv arg) = 0;
1602  Trigger() : Job() { set_type(type_trigger); fast = true; }
1603 };
1604 
1605 bool Job::ready() {
1606  vector<Job *>::iterator it;
1607  for (it = deps.begin(); it != deps.end(); it++) {
1608  if (!(*it)->done) return false;
1609  }
1610  return true;
1611 }
1612 
1613 Job::~Job() {
1614  vector<Job *>::iterator it;
1615  for (it = deps.begin(); it != deps.end(); it++) {
1616  releaseShared(*it);
1617  }
1618 }
1619 
1620 typedef queue<Job *> JobQueue;
1621 
1622 class Scheduler;
1623 
1624 struct SchedInfo {
1627  int num;
1628 };
1629 
1632 
1633 class ThreadPool : public SharedObject {
1634 public:
1637  ThreadPool(Scheduler *sched, int n);
1638  ThreadPool(int n);
1639  ~ThreadPool();
1640  ThreadState *getThread(int i);
1641  void shutdown(bool wait);
1642  void addThread(ThreadState *thread);
1643  void attachJob(Job *job);
1644  void detachJob(Job *job);
1645  void queueJob(Job *job);
1646  void broadcastJob(Job *job);
1647  void cancelDeps(Job * job);
1648  void cancelJob(Job *job);
1649  void waitJob(Job *job);
1650  void clearThreadState();
1651 };
1652 
1653 
1654 class Scheduler : public SharedObject {
1655 private:
1657  size_t jobid;
1660  int running;
1663  vector<ThreadState *> threads;
1664  vector<ThreadPool *> thread_owners;
1665  priority_queue<Job *, vector<Job *>, JobCompare> global_queue;
1666  vector<JobQueue *> thread_queues;
1667  vector<Job *> pending;
1670  friend class Job;
1671 public:
// Creates a scheduler for n worker threads.
// n == 0 selects single-threaded mode; nthreads is then recorded as 1.
// maxconcurrency starts out as n (so 0 in single-threaded mode).
// NOTE(review): lock(true) presumably requests a recursive lock, which
// cancelJob/cancelDeps rely on -- confirm against the Lock class.
1673  Scheduler(int n) :
1674  SharedObject(), threads(), thread_owners(), global_queue(), thread_queues(),
1675  single_threaded(n==0), nthreads(n == 0 ? 1 : n),
1676  lock(true), cond(&lock), response(&lock),
1677  shutting_down(false), shutdown_counter(0), jobid(0),
1678  maxconcurrency(n), running(0)
1679  {
// One queue exists before any thread is added, so thread_queues always
// holds one more entry than threads / thread_owners.
1680  thread_queues.push_back(new JobQueue());
1681  }
1682  void set_maxconcurrency(int n) {
1683  maxconcurrency = n;
1684  }
1686  return maxconcurrency;
1687  }
1689  int n;
1690  for (int i = 0; i <thread_owners.size(); i++) {
1691  if (thread_owners[i] == pool)
1692  n++;
1693  }
1694  return n;
1695  }
1696  virtual ~Scheduler() {
1697  for (int i = 0; i < thread_queues.size(); i++) {
1698  JobQueue *q = thread_queues[i];
1699  while (!q->empty()) {
1700  Job *job = q->front();
1701  q->pop();
1702  releaseShared(job);
1703  }
1704  }
1705  thread_queues.clear();
1706  threads.clear();
1707  }
1708  ThreadState *getThread(int i) { return threads[i]; }
1709  void shutdown(bool wait) {
1710  if (single_threaded) {
1711  SchedInfo *info = new SchedInfo();
1712  info->num = 0;
1713  info->scheduler = this;
1714  acquireShared(this);
1715  info->job = NULL;
1717  return;
1718  }
1719  lock.lock();
1720  if (wait) {
1721  while (!global_queue.empty()) {
1722  response.wait();
1723  }
1724  }
1725  shutting_down = true;
1726  while (shutdown_counter < nthreads) {
1727  cond.broadcast();
1728  response.wait();
1729  }
1730  lock.unlock();
1731  for (int i = 0; i <threads.size(); i++) {
1732  joinThread(threads[i]);
1733  }
1734  }
1735  void addThread(ThreadPool *owner, ThreadState *thread) {
1736  lock.lock();
1737  thread_owners.push_back(owner);
1738  threads.push_back(thread);
1739  thread_queues.push_back(new JobQueue());
1740  lock.unlock();
1741  }
1742  void attachJob(ThreadPool *pool, Job *job) {
1743  lock.lock();
1744  job->pool = pool;
1745  job->id = jobid++;
1746  acquireShared(job);
1747  if (job->ready()) {
1748  global_queue.push(job);
1749  cond.signal();
1750  }
1751  else if (job->pending_index < 0) {
1752  job->pool = pool;
1753  job->pending_index = pending.size();
1754  pending.push_back(job);
1755  }
1756  lock.unlock();
1757  }
1758  void detachJob(Job *job) {
1759  lock.lock();
1760  long i = job->pending_index;
1761  job->pending_index = -1;
1762  if (i >= 0) {
1763  job = pending.back();
1764  pending.resize(pending.size()-1);
1765  pending[i] = job;
1766  job->pending_index = i;
1767  }
1768  lock.unlock();
1769  }
1770  void queueJob(Job *job) {
1771  lock.lock();
1772  global_queue.push(job);
1773  cond.signal();
1774  lock.unlock();
1775  }
1776  void broadcastJob(ThreadPool *pool, Job *job) {
1777  lock.lock();
1778  for (int i = 0; i <thread_queues.size(); i++) {
1779  if (thread_owners[i] == pool) {
1780  acquireShared(job);
1781  thread_queues[i]->push(job);
1782  }
1783  }
1784  lock.unlock();
1785  }
1786  void cancelDeps(Job * job) {
1787  vector<Job *> &notify = job->notify;
1788  for (int i = 0; i <notify.size(); i++) {
1789  Job *next = notify[i];
1790  if (!next->cancelled) {
1791  cancelJob(next);
1792  }
1793  }
1794  }
1795  void cancelJob(Job *job) {
1796  lock.lock();
1797  if (!job->cancelled) {
1798  job->cancelled = true;
1799  if (!job->running && !job->done) {
1800  job->done = true;
1801  cancelDeps(job);
1802  }
1803  }
1804  lock.unlock();
1805  }
1806  void waitJob(Job *job) {
1807  if (single_threaded) {
1808  SchedInfo *info = new SchedInfo();
1809  info->num = 0;
1810  info->scheduler = this;
1811  acquireShared(this);
1812  info->job = job;
1814  } else {
1815  lock.lock();
1816  for (;;) {
1817  if (job->done || job->cancelled) {
1818  break;
1819  }
1820  response.wait();
1821  }
1822  response.signal(); // forward signal
1823  lock.unlock();
1824  }
1825  }
1827  threads.clear();
1828  }
// Follow-up work after `job` has run: queue the jobs in job->notify that
// became ready, and feed the job's result to its triggers.
// Scheduler::main calls this with scheduler->lock held, and queueJob()
// takes the same lock again.
1829  static void notifyDeps(Scheduler *scheduler, Job *job) {
1830  vector<Job *> &notify = job->notify;
// NOTE(review): adds notify.size() references to `job`; no matching
// release is visible in this function -- confirm where they are dropped.
1831  job->incref(notify.size());
1832  for (int i = 0; i <notify.size(); i++) {
1833  Job *next = notify[i];
// Queue each dependent at most once, and never a cancelled one.
1834  if (!next->queued && next->ready() && !next->cancelled) {
1835  next->queued = true;
1836  scheduler->queueJob(next);
1837  }
1838  }
1839  vector<Trigger *> &triggers = job->triggers;
// The result is decoded only if there are triggers and a non-empty result;
// otherwise triggers are offered a NULL argument.
1840  leftv arg = NULL;
1841  if (triggers.size() > 0 && job->result.size() > 0)
1842  arg = LinTree::from_string(job->result);
1843  for (int i = 0; i < triggers.size(); i++) {
1844  Trigger *trigger = triggers[i];
1845  if (trigger->accept(arg)) {
1846  trigger->activate(arg);
1847  if (trigger->ready())
1848  scheduler->queueJob(trigger);
1849  }
1850  }
// Free the decoded result, if one was created.
1851  if (arg) {
1852  arg->CleanUp();
1853  omFreeBin(arg, sleftv_bin);
1854  }
1855  }
// Worker loop of the scheduler.
//
// `arg` is a heap-allocated SchedInfo which this function deletes before
// returning: info->scheduler is the scheduler to serve, info->num selects
// the private queue in thread_queues, and info->job (may be NULL) is a job
// whose completion ends the loop. `ts` is not used. Always returns NULL.
//
// The scheduler lock is held for the whole loop except inside job->run()
// and while waiting on `cond`.
1856  static void *main(ThreadState *ts, void *arg) {
1857  SchedInfo *info = (SchedInfo *) arg;
1858  Scheduler *scheduler = info->scheduler;
1859  ThreadPool *oldThreadPool = currentThreadPoolRef;
1860  // TODO: set current thread pool
1861  // currentThreadPoolRef = pool;
1862  Lock &lock = scheduler->lock;
1863  ConditionVariable &cond = scheduler->cond;
1864  ConditionVariable &response = scheduler->response;
1865  JobQueue *my_queue = scheduler->thread_queues[info->num];
// In single-threaded mode the per-thread initialization is skipped.
1866  if (!scheduler->single_threaded)
1867  thread_init();
1868  lock.lock();
1869  for (;;) {
// Stop once the job we were asked to wait for has finished.
1870  if (info->job && info->job->done)
1871  break;
// On shutdown, count this worker as stopped and signal `response`.
1872  if (scheduler->shutting_down) {
1873  scheduler->shutdown_counter++;
1874  scheduler->response.signal();
1875  break;
1876  }
// Jobs on this worker's private queue are served before the global queue.
1877  if (!my_queue->empty()) {
1878  Job *job = my_queue->front();
1879  my_queue->pop();
// Signal `cond` if the global queue still has work.
1880  if (!scheduler->global_queue.empty())
1881  cond.signal();
1882  currentJobRef = job;
1883  job->run();
1884  currentJobRef = NULL;
1885  notifyDeps(scheduler, job);
1886  releaseShared(job);
1887  scheduler->response.signal();
1888  continue;
1889  } else if (!scheduler->global_queue.empty()) {
// Otherwise take the top job from the shared priority queue.
1890  Job *job = scheduler->global_queue.top();
1891  scheduler->global_queue.pop();
1892  if (!scheduler->global_queue.empty())
1893  cond.signal();
1894  currentJobRef = job;
1895  job->run();
1896  currentJobRef = NULL;
1897  notifyDeps(scheduler, job);
1898  releaseShared(job);
1899  scheduler->response.signal();
1900  continue;
1901  } else {
// No work: in single-threaded mode return to the caller instead of
// blocking; otherwise wait on `cond`.
1902  if (scheduler->single_threaded) {
1903  break;
1904  }
1905  cond.wait();
1906  }
1907  }
1908  // TODO: correct current thread pool
1909  // releaseShared(currentThreadPoolRef);
1910  currentThreadPoolRef = oldThreadPool;
1911  scheduler->lock.unlock();
1912  delete info;
1913  return NULL;
1914  }
1915 };
1916 
1917 ThreadPool::ThreadPool(int n) : SharedObject(), nthreads(n) {
1918  scheduler = new Scheduler(n);
1920 }
1921 ThreadPool::ThreadPool(Scheduler *sched, int n) : SharedObject(), nthreads(n) {
1922  scheduler = sched;
1923  acquireShared(sched);
1924 }
1927 }
1931  scheduler->addThread(this, thread);
1932 }
1934  scheduler->attachJob(this, job);
1935 }
1937  scheduler->detachJob(job);
1938 }
1940  scheduler->queueJob(job);
1941 }
1943  scheduler->broadcastJob(this, job);
1944 }
1946  scheduler->cancelDeps(job);
1947 }
1949  scheduler->cancelJob(job);
1950 }
1952  scheduler->waitJob(job);
1953 }
1956 }
1957 
1958 void Job::addDep(vector<Job *> &jobs) {
1959  deps.insert(deps.end(), jobs.begin(), jobs.end());
1960 }
1961 
1962 void Job::addDep(long ndeps, Job **jobs) {
1963  for (long i = 0; i < ndeps; i++) {
1964  deps.push_back(jobs[i]);
1965  }
1966 }
1967 
1968 void Job::addNotify(vector<Job *> &jobs) {
1969  notify.insert(notify.end(), jobs.begin(), jobs.end());
1970  if (done) {
1972  }
1973 }
1974 
1975 void Job::addNotify(Job *job) {
1976  notify.push_back(job);
1977  if (done) {
1979  }
1980 }
1981 
1982 void Job::run() {
1983  if (!cancelled) {
1984  running = true;
1985  pool->scheduler->lock.unlock();
1986  pool->scheduler->running++;
1987  execute();
1988  pool->scheduler->running--;
1989  pool->scheduler->lock.lock();
1990  running = false;
1991  }
1992  done = true;
1993 }
1994 
1995 class AccTrigger : public Trigger {
1996 private:
1997  long count;
1998 public:
1999  AccTrigger(long count_init): Trigger(), count(count_init) {
2000  }
2001  virtual bool ready() {
2002  if (!Trigger::ready()) return false;
2003  return args.size() >= count;
2004  }
2005  virtual bool accept(leftv arg) {
2006  return true;
2007  }
2008  virtual void activate(leftv arg) {
2009  while (arg != NULL && !ready()) {
2010  args.push_back(LinTree::to_string(arg));
2011  if (ready()) {
2012  return;
2013  }
2014  arg = arg->next;
2015  }
2016  }
2017  virtual void execute() {
2019  l->Init(args.size());
2020  for (int i = 0; i < args.size(); i++) {
2021  leftv val = LinTree::from_string(args[i]);
2022  memcpy(&l->m[i], val, sizeof(*val));
2023  omFreeBin(val, sleftv_bin);
2024  }
2025  sleftv val;
2026  memset(&val, 0, sizeof(val));
2027  val.rtyp = LIST_CMD;
2028  val.data = l;
2029  result = LinTree::to_string(&val);
2030  // val.CleanUp();
2031  }
2032 };
2033 
2034 class CountTrigger : public Trigger {
2035 private:
2036  long count;
2037 public:
2038  CountTrigger(long count_init): Trigger(), count(count_init) {
2039  }
2040  virtual bool ready() {
2041  if (!Trigger::ready()) return false;
2042  return count <= 0;
2043  }
2044  virtual bool accept(leftv arg) {
2045  return arg == NULL;
2046  }
2047  virtual void activate(leftv arg) {
2048  if (!ready()) {
2049  count--;
2050  }
2051  }
2052  virtual void execute() {
2053  // do nothing
2054  }
2055 };
2056 
2057 class SetTrigger : public Trigger {
2058 private:
2059  vector<bool> set;
2060  long count;
2061 public:
2062  SetTrigger(long count_init) : Trigger(), count(0),
2063  set(count_init) {
2064  }
2065  virtual bool ready() {
2066  if (!Trigger::ready()) return false;
2067  return count == set.size();
2068  }
2069  virtual bool accept(leftv arg) {
2070  return arg->Typ() == INT_CMD;
2071  }
2072  virtual void activate(leftv arg) {
2073  if (!ready()) {
2074  long value = (long) arg->Data();
2075  if (value < 0 || value >= count) return;
2076  if (set[value]) return;
2077  set[value] = true;
2078  count++;
2079  }
2080  }
2081  virtual void execute() {
2082  // do nothing
2083  }
2084 };
2085 
2086 
// Trigger whose readiness is decided by an interpreter procedure: every
// activation calls the named procedure, and the trigger becomes ready once
// a call returns either no value or a non-zero integer.
2087 class ProcTrigger : public Trigger {
2088 private:
2089  string procname;
// Set once a call to the procedure has reported success.
2090  bool success;
2091 public:
2092  ProcTrigger(const char *p) : Trigger(), procname(p), success(false) {
2093  }
2094  virtual bool ready() {
2095  if (!Trigger::ready()) return false;
2096  return success;
2097  }
// Every activation value is accepted, including NULL.
2098  virtual bool accept(leftv arg) {
2099  return TRUE;
2100  }
// Calls the procedure with the stored args followed by copies of the
// values chained from `arg`. The scheduler lock is released for the
// duration of the call and re-taken afterwards, so the caller must hold it.
2101  virtual void activate(leftv arg) {
2102  if (!ready()) {
2103  pool->scheduler->lock.unlock();
2104  vector<leftv> argv;
2105  for (int i = 0; i < args.size(); i++) {
2106  appendArg(argv, args[i]);
2107  }
// `error` is still false at the check before executeProc below.
2108  int error = false;
2109  while (arg) {
2110  appendArgCopy(argv, arg);
2111  arg = arg->next;
2112  }
// NOTE(review): val is not zero-initialized before executeProc --
// presumably executeProc fills it in; confirm.
2113  sleftv val;
2114  if (!error)
2115  error = executeProc(val, procname.c_str(), argv);
2116  if (!error) {
// Success means no return value or a non-zero integer.
2117  if (val.Typ() == NONE || (val.Typ() == INT_CMD &&
2118  (long) val.Data()))
2119  {
2120  success = true;
2121  }
2122  val.CleanUp();
2123  }
2124  pool->scheduler->lock.lock();
2125  }
2126  }
2127  virtual void execute() {
2128  // do nothing
2129  }
2130 };
2131 
2133  long n;
2134  Command cmd("createThreadPool", result, arg);
2135  cmd.check_argc(1, 2);
2136  cmd.check_arg(0, INT_CMD, "first argument must be an integer");
2137  if (cmd.ok()) {
2138  n = (long) cmd.arg(0);
2139  if (n < 0) cmd.report("number of threads must be non-negative");
2140  else if (n >= 256) cmd.report("number of threads too large");
2141  if (!have_threads && n != 0)
2142  cmd.report("in single-threaded mode, number of threads must be zero");
2143  }
2144  if (cmd.ok()) {
2145  ThreadPool *pool = new ThreadPool((int) n);
2146  pool->set_type(type_threadpool);
2147  for (int i = 0; i <n; i++) {
2148  const char *error;
2149  SchedInfo *info = new SchedInfo();
2150  info->scheduler = pool->scheduler;
2151  acquireShared(pool->scheduler);
2152  info->job = NULL;
2153  info->num = i;
2155  if (!thread) {
2156  // TODO: clean up bad pool
2157  return cmd.abort(error);
2158  }
2159  pool->addThread(thread);
2160  }
2162  }
2163  return cmd.status();
2164 }
2165 
2167  Command cmd("createThreadPoolSet", result, arg);
2168  cmd.check_argc(2);
2169  cmd.check_arg(0, INT_CMD, "first argument must be an integer");
2170  cmd.check_arg(1, LIST_CMD, "second argument must be a list of integers");
2171  lists l;
2172  int n;
2173  if (cmd.ok()) {
2174  l = (lists) (cmd.arg(1));
2175  n = lSize(l)+1;
2176  if (n == 0)
2177  return cmd.abort("second argument must not be empty");
2178  for (int i = 0; i < n; i++) {
2179  if (l->m[i].Typ() != INT_CMD)
2180  return cmd.abort("second argument must be a list of integers");
2181  }
2182  }
2183  lists pools = (lists) omAlloc0Bin(slists_bin);
2184  pools->Init(n);
2185  if (cmd.ok()) {
2186  long s = 0;
2187  for (int i = 0; i < n; i++) {
2188  s += (long) (l->m[i].Data());
2189  }
2190  Scheduler *sched = new Scheduler((int)s);
2191  sched->set_maxconcurrency(cmd.int_arg(0));
2192  for (int i = 0; i < n; i++) {
2193  long m = (long) (l->m[i].Data());
2194  ThreadPool *pool = new ThreadPool(sched, (int) m);
2195  pool->set_type(type_threadpool);
2196  for (int j = 0; j < m; j++) {
2197  const char *error;
2198  SchedInfo *info = new SchedInfo();
2199  info->scheduler = pool->scheduler;
2200  acquireShared(pool->scheduler);
2201  info->job = NULL;
2202  info->num = i;
2204  if (!thread) {
2205  // TODO: clean up bad pool
2206  return cmd.abort(error);
2207  }
2208  pool->addThread(thread);
2209  }
2210  pools->m[i].rtyp = type_threadpool;
2211  pools->m[i].data = new_shared(pool);
2212  }
2213  cmd.set_result(LIST_CMD, pools);
2214  }
2215  return cmd.status();
2216 }
2217 
2218 ThreadPool *createThreadPool(int nthreads, int prioThreads = 0) {
2219  ThreadPool *pool = new ThreadPool((int) nthreads);
2220  pool->set_type(type_threadpool);
2221  for (int i = 0; i <nthreads; i++) {
2222  const char *error;
2223  SchedInfo *info = new SchedInfo();
2224  info->scheduler = pool->scheduler;
2225  acquireShared(pool);
2226  info->job = NULL;
2227  info->num = i;
2229  if (!thread) {
2230  return NULL;
2231  }
2232  pool->addThread(thread);
2233  }
2234  return pool;
2235 }
2236 
2237 void release(ThreadPool *pool) {
2238  releaseShared(pool);
2239 }
2240 
2241 void retain(ThreadPool *pool) {
2242  acquireShared(pool);
2243 }
2244 
2246  return currentThreadPoolRef;
2247 }
2248 
2250  Command cmd("getThreadPoolWorkers", result, arg);
2251  cmd.check_argc(1);
2252  cmd.check_arg(0, type_threadpool, "argument must be a threadpool");
2253  cmd.check_init(0, "threadpool not initialized");
2254  int r = 0;
2255  if (cmd.ok()) {
2256  ThreadPool *pool = cmd.shared_arg<ThreadPool>(0);
2257  Scheduler *sched = pool->scheduler;
2258  sched->lock.lock();
2259  r = sched->threadpool_size(pool);
2260  sched->lock.unlock();
2261  cmd.set_result(INT_CMD, r);
2262  }
2263  return cmd.status();
2264 }
2265 
2267  Command cmd("setThreadPoolWorkers", result, arg);
2268  cmd.check_argc(2);
2269  cmd.check_arg(0, type_threadpool, "first argument must be a threadpool");
2270  cmd.check_arg(1, INT_CMD, "second argument must be an integer");
2271  cmd.check_init(0, "threadpool not initialized");
2272  if (cmd.ok()) {
2273  ThreadPool *pool = cmd.shared_arg<ThreadPool>(0);
2274  Scheduler *sched = pool->scheduler;
2275  // TODO: count/add threads
2276  cmd.no_result();
2277  }
2278  return cmd.status();
2279 }
2280 
2282  Command cmd("getThreadPoolConcurrency", result, arg);
2283  cmd.check_argc(1);
2284  cmd.check_arg(0, type_threadpool, "argument must be a threadpool");
2285  cmd.check_init(0, "threadpool not initialized");
2286  if (cmd.ok()) {
2287  ThreadPool *pool = cmd.shared_arg<ThreadPool>(0);
2288  Scheduler *sched = pool->scheduler;
2289  sched->lock.lock();
2290  cmd.set_result(INT_CMD, sched->get_maxconcurrency());
2291  sched->lock.unlock();
2292  }
2293  return cmd.status();
2294 }
2295 
2297  Command cmd("setThreadPoolWorkers", result, arg);
2298  cmd.check_argc(2);
2299  cmd.check_arg(0, type_threadpool, "first argument must be a threadpool");
2300  cmd.check_arg(1, INT_CMD, "second argument must be an integer");
2301  cmd.check_init(0, "threadpool not initialized");
2302  if (cmd.ok()) {
2303  ThreadPool *pool = cmd.shared_arg<ThreadPool>(0);
2304  Scheduler *sched = pool->scheduler;
2305  sched->lock.lock();
2306  sched->set_maxconcurrency(cmd.int_arg(1));
2307  sched->lock.unlock();
2308  cmd.no_result();
2309  }
2310  return cmd.status();
2311 }
2312 
2314  Command cmd("closeThreadPool", result, arg);
2315  cmd.check_argc(1, 2);
2316  cmd.check_arg(0, type_threadpool, "first argument must be a threadpool");
2317  cmd.check_init(0, "threadpool not initialized");
2318  if (cmd.nargs() > 1)
2319  cmd.check_arg(1, INT_CMD, "optional argument must be an integer");
2320  if (cmd.ok()) {
2321  ThreadPool *pool = *(ThreadPool **)(cmd.arg(0));
2322  bool wait = cmd.nargs() == 2 ? (cmd.int_arg(1) != 0) : 1;
2323  pool->shutdown(wait);
2324  cmd.no_result();
2325  }
2326  return cmd.status();
2327 }
2328 
2329 void closeThreadPool(ThreadPool *pool, bool wait) {
2330  pool->shutdown(wait);
2331 }
2332 
2333 
2335  Command cmd("currentThreadPool", result, arg);
2336  cmd.check_argc(0);
2338  if (pool) {
2340  } else {
2341  cmd.report("no current threadpool");
2342  }
2343  return cmd.status();
2344 }
2345 
2347  Command cmd("setCurrentThreadPool", result, arg);
2348  cmd.check_argc(1);
2349  cmd.check_init(0, "threadpool not initialized");
2350  if (cmd.ok()) {
2351  ThreadPool *pool = *(ThreadPool **)(cmd.arg(0));
2352  acquireShared(pool);
2355  currentThreadPoolRef = pool;
2356  }
2357  return cmd.status();
2358 }
2359 
2360 class EvalJob : public Job {
2361 public:
2362  EvalJob() : Job() { }
2363  virtual void execute() {
2364  leftv val = LinTree::from_string(args[0]);
2365  result = (LinTree::to_string(val));
2366  val->CleanUp();
2367  omFreeBin(val, sleftv_bin);
2368  }
2369 };
2370 
2371 class ExecJob : public Job {
2372 public:
2373  ExecJob() : Job() { }
2374  virtual void execute() {
2375  leftv val = LinTree::from_string(args[0]);
2376  val->CleanUp();
2377  omFreeBin(val, sleftv_bin);
2378  }
2379 };
2380 
2381 class ProcJob : public Job {
2382  string procname;
2383 public:
2384  ProcJob(const char *procname_init) : Job(),
2385  procname(procname_init) {
2386  set_name(procname_init);
2387  }
2388  virtual void execute() {
2389  vector<leftv> argv;
2390  for (int i = 0; i <args.size(); i++) {
2391  appendArg(argv, args[i]);
2392  }
2393  for (int i = 0; i < deps.size(); i++) {
2394  appendArg(argv, deps[i]->result);
2395  }
2396  sleftv val;
2397  int error = executeProc(val, procname.c_str(), argv);
2398  if (!error) {
2399  result = (LinTree::to_string(&val));
2400  val.CleanUp();
2401  }
2402  }
2403 };
2404 
2405 class KernelJob : public Job {
2406 private:
2407  void (*cfunc)(leftv result, leftv arg);
2408 public:
2409  KernelJob(void (*func)(leftv result, leftv arg)) : cfunc(func) { }
2410  virtual void execute() {
2411  vector<leftv> argv;
2412  for (int i = 0; i <args.size(); i++) {
2413  appendArg(argv, args[i]);
2414  }
2415  for (int i = 0; i < deps.size(); i++) {
2416  appendArg(argv, deps[i]->result);
2417  }
2418  sleftv val;
2419  memset(&val, 0, sizeof(val));
2420  if (argv.size() > 0) {
2421  leftv *tail = &argv[0]->next;
2422  for (int i = 1; i < argv.size(); i++) {
2423  *tail = argv[i];
2424  tail = &(*tail)->next;
2425  }
2426  *tail = NULL;
2427  }
2428  cfunc(&val, argv[0]);
2429  result = (LinTree::to_string(&val));
2430  val.CleanUp();
2431  }
2432 };
2433 
2434 class RawKernelJob : public Job {
2435 private:
2436  void (*cfunc)(long ndeps, Job **deps);
2437 public:
2438  RawKernelJob(void (*func)(long ndeps, Job **deps)) : cfunc(func) { }
2439  virtual void execute() {
2440  long ndeps = deps.size();
2441  Job **jobs = (Job **) omAlloc0(sizeof(Job *) * ndeps);
2442  for (long i = 0; i < ndeps; i++)
2443  jobs[i] = deps[i];
2444  cfunc(ndeps, jobs);
2445  omFree(jobs);
2446  }
2447 };
2448 
2450  Command cmd("createJob", result, arg);
2451  cmd.check_argc_min(1);
2452  cmd.check_arg(0, STRING_CMD, COMMAND,
2453  "job name must be a string or quote expression");
2454  if (cmd.ok()) {
2455  if (cmd.test_arg(0, STRING_CMD)) {
2456  ProcJob *job = new ProcJob((char *)(cmd.arg(0)));
2457  for (leftv a = arg->next; a != NULL; a = a->next) {
2458  job->args.push_back(LinTree::to_string(a));
2459  }
2460  cmd.set_result(type_job, new_shared(job));
2461  } else {
2462  cmd.check_argc(1);
2463  Job *job = new EvalJob();
2464  job->args.push_back(LinTree::to_string(arg));
2465  cmd.set_result(type_job, new_shared(job));
2466  }
2467  }
2468  return cmd.status();
2469 }
2470 
2471 Job *createJob(void (*func)(leftv result, leftv arg)) {
2472  KernelJob *job = new KernelJob(func);
2473  return job;
2474 }
2475 
2476 Job *createJob(void (*func)(long ndeps, Job **deps)) {
2477  RawKernelJob *job = new RawKernelJob(func);
2478  return job;
2479 }
2480 
2481 Job *startJob(ThreadPool *pool, Job *job, leftv arg) {
2482  if (job->pool) return NULL;
2483  while (arg) {
2484  job->args.push_back(LinTree::to_string(arg));
2485  arg = arg->next;
2486  }
2487  pool->attachJob(job);
2488  return job;
2489 }
2490 
2491 Job *startJob(ThreadPool *pool, Job *job) {
2492  return startJob(pool, job, NULL);
2493 }
2494 
2495 // Job *scheduleJob(ThreadPool *pool, Job *job, long ndeps, Job **deps) {
2496 // if (job->pool) return NULL;
2497 // pool->scheduler->lock.lock();
2498 // bool cancelled = false;
2499 // job->addDep(ndeps, deps);
2500 // for (long i = 0; i < ndeps; i++) {
2501 // deps[i]->addNotify(job);
2502 // cancelled |= deps[i]->cancelled;
2503 // }
2504 // if (cancelled) {
2505 // job->pool = pool;
2506 // pool->cancelJob(job);
2507 // }
2508 // else
2509 // pool->attachJob(job);
2510 // pool->scheduler->lock.unlock();
2511 // return FIXME: missing/unclear what this is supposed to be
2512 // }
2513 
2514 void cancelJob(Job *job) {
2515  ThreadPool *pool = job->pool;
2516  if (pool) pool->cancelJob(job);
2517 }
2518 
2520  return currentJobRef;
2521 }
2522 
// Interpreter command "startJob".
// NOTE(review): the signature line (listing line 2523) is absent from this extract.
// Argument layout: [threadpool,] [int priority,] job-or-string, then the job's arguments.
2524  Command cmd("startJob", result, arg);
2525  cmd.check_argc_min(1);
      // has_pool and has_prio are used below as index offsets for the arguments that follow.
2526  int has_pool = cmd.test_arg(0, type_threadpool);
2527  cmd.check_argc_min(1+has_pool);
2528  if (has_pool)
2529  cmd.check_init(0, "threadpool not initialized");
2530  int has_prio = cmd.test_arg(has_pool, INT_CMD);
2531  long prio = has_prio ? (long) cmd.arg(has_pool) : 0L;
2532  int first_arg = has_pool + has_prio;
2533  cmd.check_arg(first_arg, type_job, STRING_CMD,
2534  "job argument must be a job or string");
2535  if (cmd.ok() && cmd.argtype(first_arg) == type_job)
2536  cmd.check_init(first_arg, "job not initialized");
2537  if (!cmd.ok()) return cmd.status();
      // Use the explicit pool when given, otherwise fall back to currentThreadPoolRef.
2538  ThreadPool *pool;
2539  if (has_pool)
2540  pool = cmd.shared_arg<ThreadPool>(0);
2541  else {
2542  if (!currentThreadPoolRef)
2543  return cmd.abort("no current threadpool defined");
2544  pool = currentThreadPoolRef;
2545  }
      // A job argument is used as is; a string argument is wrapped in a new ProcJob.
2546  Job *job;
2547  if (cmd.argtype(first_arg) == type_job)
2548  job = *(Job **)(cmd.arg(first_arg));
2549  else
2550  job = new ProcJob((char *)(cmd.arg(first_arg)));
      // Skip the pool, priority and job arguments; everything after them is
      // serialized with LinTree::to_string and appended to job->args.
2551  leftv a = arg->next;
2552  if (has_pool) a = a->next;
2553  if (has_prio) a = a->next;
2554  for (; a != NULL; a = a->next) {
2555  job->args.push_back(LinTree::to_string(a));
2556  }
      // NOTE(review): this check runs after the arguments were appended above, so an
      // already-scheduled job has its args modified before the command aborts.
2557  if (job->pool)
2558  return cmd.abort("job has already been scheduled");
2559  job->prio = prio;
2560  pool->attachJob(job);
2561  cmd.set_result(type_job, new_shared(job));
2562  return cmd.status();
2563 }
2564 
// Interpreter command "waitJob": takes exactly one job argument, waits on it
// through its pool, and then reports either no result or a value taken from `res`.
// NOTE(review): the signature line (listing line 2565) is absent from this extract.
2566  Command cmd("waitJob", result, arg);
2567  cmd.check_argc(1);
2568  cmd.check_arg(0, type_job, "argument must be a job");
2569  cmd.check_init(0, "job not initialized");
2570  if (cmd.ok()) {
2571  Job *job = *(Job **)(cmd.arg(0));
2572  ThreadPool *pool = job->pool;
2573  if (!pool) {
2574  return cmd.abort("job has not yet been started or scheduled");
2575  }
2576  pool->waitJob(job);
2577  if (job->cancelled) {
2578  return cmd.abort("job has been cancelled");
2579  }
      // An empty result string means the job produced nothing to return.
2580  if (job->result.size() == 0)
2581  cmd.no_result();
2582  else {
      // NOTE(review): listing line 2583, which presumably declares `res` from
      // job->result, is missing from this extract - confirm against the real file.
2584  cmd.set_result(res->Typ(), res->Data());
2585  }
2586  }
2587  return cmd.status();
2588 }
2589 
2590 void waitJob(Job *job) {
2591  assert(job->pool != NULL);
2592  job->pool->waitJob(job);
2593 }
2594 
// Interpreter command "cancelJob": takes exactly one job argument and cancels
// it through the pool it is attached to; produces no result value.
// NOTE(review): the signature line (listing line 2595) is absent from this extract.
2596  Command cmd("cancelJob", result, arg);
2597  cmd.check_argc(1);
2598  cmd.check_arg(0, type_job, "argument must be a job");
2599  cmd.check_init(0, "job not initialized");
2600  if (cmd.ok()) {
2601  Job *job = cmd.shared_arg<Job>(0);
2602  ThreadPool *pool = job->pool;
      // A job without a pool was never started or scheduled, so there is nothing to cancel.
2603  if (!pool) {
2604  return cmd.abort("job has not yet been started or scheduled");
2605  }
2606  pool->cancelJob(job);
2607  cmd.no_result();
2608  }
2609  return cmd.status();
2610 }
2611 
// Interpreter command "jobCancelled": with one argument it inspects that job,
// with none it inspects currentJobRef. The result is job->cancelled as an integer.
// NOTE(review): the signature line (listing line 2612) is absent from this extract.
2613  Job *job;
2614  Command cmd("jobCancelled", result, arg);
2615  cmd.check_argc(0, 1);
2616  if (cmd.nargs() == 1) {
2617  cmd.check_arg(0, type_job, "argument must be a job");
2618  cmd.check_init(0, "job not initialized");
      // NOTE(review): shared_arg is called even when the checks above recorded an
      // error; `job` is only dereferenced later inside the cmd.ok() branch.
2619  job = cmd.shared_arg<Job>(0);
2620  } else {
2621  job = currentJobRef;
2622  if (!job)
2623  cmd.report("no current job");
2624  }
2625  if (cmd.ok()) {
2626  ThreadPool *pool = job->pool;
2627  if (!pool) {
2628  return cmd.abort("job has not yet been started or scheduled");
2629  }
      // Read the flag while holding the scheduler lock of the job's pool.
2630  pool->scheduler->lock.lock();
2631  cmd.set_result((long) job->cancelled);
2632  pool->scheduler->lock.unlock();
2633  }
2634  return cmd.status();
2635 }
2636 
2637 bool getJobCancelled(Job *job) {
2638  ThreadPool *pool = job->pool;
2639  if (pool) pool->scheduler->lock.lock();
2640  bool result = job->cancelled;
2641  if (pool) pool->scheduler->lock.unlock();
2642  return result;
2643 }
2644 
// Closing brace of the zero-argument getJobCancelled() overload.
// NOTE(review): its signature and body (listing lines 2645-2646) are absent from this extract.
2647 }
2648 
2649 void setJobData(Job *job, void *data) {
2650  ThreadPool *pool = job->pool;
2651  if (pool) pool->scheduler->lock.lock();
2652  job->data = data;
2653  if (pool) pool->scheduler->lock.unlock();
2654 }
2655 
2656 
2657 void *getJobData(Job *job) {
2658  ThreadPool *pool = job->pool;
2659  if (pool) pool->scheduler->lock.lock();
2660  void *result = job->data;
2661  if (pool) pool->scheduler->lock.unlock();
2662  return result;
2663 }
2664 
2665 void addJobArgs(Job *job, leftv arg) {
2666  ThreadPool *pool = job->pool;
2667  if (pool) pool->scheduler->lock.lock();
2668  while (arg) {
2669  job->args.push_back(LinTree::to_string(arg));
2670  arg = arg->next;
2671  }
2672  if (pool) pool->scheduler->lock.unlock();
2673 }
2674 
// Fragment of a getter that returns `result` after optionally taking the
// scheduler lock of the job's pool.
// NOTE(review): the signature (listing line 2675) and the line that defines `result`
// (listing line 2678) are absent from this extract; presumably this is the job-result
// getter - confirm against the real file.
2676  ThreadPool *pool = job->pool;
2677  if (pool) pool->scheduler->lock.lock();
2679  if (pool) pool->scheduler->lock.unlock();
2680  return result;
2681 }
2682 
2683 const char *getJobName(Job *job) {
2684  // TODO
2685  return "";
2686 }
2687 
2688 void setJobName(Job *job, const char *name) {
2689  // TODO
2690 }
2691 
// Interpreter command "createTrigger".
// NOTE(review): the signature line (listing line 2692) is absent from this extract.
// Argument layout: [threadpool,] subtype-string, subtype-argument.
// Known subtypes: "acc", "count", "set" (integer argument) and "proc" (string argument).
2693  Command cmd("createTrigger", result, arg);
2694  cmd.check_argc_min(1);
2695  int has_pool = cmd.test_arg(0, type_threadpool);
2696  ThreadPool *pool;
2697  if (has_pool) {
2698  cmd.check_init(0, "threadpool not initialized");
2699  pool = cmd.shared_arg<ThreadPool>(0);
2700  } else {
2701  pool = currentThreadPoolRef;
2702  if (!pool)
2703  return cmd.abort("no default threadpool");
2704  }
2705  cmd.check_argc(has_pool + 2);
2706  cmd.check_arg(has_pool + 0, STRING_CMD, "trigger subtype must be a string");
      // NOTE(review): `kind` is fetched and passed to strcmp before cmd.ok() is
      // consulted, i.e. even if the argument checks above already failed.
2707  const char *kind = (const char *)(cmd.arg(has_pool + 0));
2708  if (0 == strcmp(kind, "proc")) {
2709  cmd.check_arg(has_pool + 1, STRING_CMD, "proc trigger argument must be a string");
2710  } else {
2711  cmd.check_arg(has_pool + 1, INT_CMD, "trigger argument must be an integer");
2712  }
2713  if (cmd.ok()) {
2714  Trigger *trigger;
      // NOTE(review): for "proc" triggers the second argument is a string, so `n`
      // holds that pointer cast to long and still goes through the n < 0 test.
2715  long n = (long) (cmd.arg(has_pool + 1));
2716  if (n < 0)
2717  return cmd.abort("trigger argument must be a non-negative integer");
2718  if (0 == strcmp(kind, "acc")) {
2719  trigger = new AccTrigger(n);
2720  } else if (0 == strcmp(kind, "count")) {
2721  trigger = new CountTrigger(n);
2722  } else if (0 == strcmp(kind, "set")) {
2723  trigger = new SetTrigger(n);
2724  } else if (0 == strcmp(kind, "proc")) {
2725  trigger = new ProcTrigger((const char *) cmd.arg(has_pool + 1));
2726  } else {
2727  return cmd.abort("unknown trigger subtype");
2728  }
      // The new trigger is attached to the pool and returned as a trigger object.
2729  pool->attachJob(trigger);
2730  cmd.set_result(type_trigger, new_shared(trigger));
2731  }
2732  return cmd.status();
2733 }
2734 
// Interpreter command "updateTrigger": first argument is a trigger, the
// remaining arguments (arg->next) are offered to it.
// NOTE(review): the signature line (listing line 2735) is absent from this extract.
2736  Command cmd("updateTrigger", result, arg);
2737  cmd.check_argc_min(1);
2738  cmd.check_arg(0, type_trigger, "first argument must be a trigger");
2739  cmd.check_init(0, "trigger not initialized");
2740  if (cmd.ok()) {
2741  Trigger *trigger = cmd.shared_arg<Trigger>(0);
      // Everything below runs under the scheduler lock of the trigger's pool.
2742  trigger->pool->scheduler->lock.lock();
2743  if (!trigger->accept(arg->next))
2744  cmd.report("incompatible argument type(s) for this trigger");
2745  else {
2746  trigger->activate(arg->next);
      // Once the trigger reports ready, run it and notify its dependents.
2747  if (trigger->ready()) {
2748  trigger->run();
2749  Scheduler::notifyDeps(trigger->pool->scheduler, trigger);
2750  }
2751  }
2752  trigger->pool->scheduler->lock.unlock();
2753  }
2754  return cmd.status();
2755 }
2756 
// Interpreter command "chainTrigger": takes a trigger and a trigger-or-job and
// appends the trigger to the second argument's `triggers` list.
// NOTE(review): the signature line (listing line 2757) is absent from this extract.
2758  Command cmd("chainTrigger", result, arg);
2759  cmd.check_argc(2);
2760  cmd.check_arg(0, type_trigger, "first argument must be a trigger");
      // NOTE(review): listing line 2761 (the start of the check for argument 1) is
      // missing from this extract; the next line is the tail of that call.
2762  "second argument must be a trigger or job");
2763  cmd.check_init(0, "trigger not initialized");
2764  cmd.check_init(1, "trigger/job not initialized");
2765  if (cmd.ok()) {
2766  Trigger *trigger = cmd.shared_arg<Trigger>(0);
2767  Job *job = cmd.shared_arg<Job>(1);
      // Both objects must belong to the same pool.
2768  if (trigger->pool != job->pool)
2769  return cmd.abort("arguments use different threadpools");
2770  ThreadPool *pool = trigger->pool;
2771  pool->scheduler->lock.lock();
2772  job->triggers.push_back(trigger);
2773  pool->scheduler->lock.unlock();
2774  }
2775  return cmd.status();
2776 }
2777 
// Interpreter command "testTrigger": returns trigger->ready() as an integer,
// evaluated under the scheduler lock of the trigger's pool.
// NOTE(review): the signature line (listing line 2778) is absent from this extract.
2779  Command cmd("testTrigger", result, arg);
2780  cmd.check_argc(1);
2781  cmd.check_arg(0, type_trigger, "argument must be a trigger");
2782  cmd.check_init(0, "trigger not initialized");
2783  if (cmd.ok()) {
2784  Trigger *trigger = cmd.shared_arg<Trigger>(0);
2785  ThreadPool *pool = trigger->pool;
2786  pool->scheduler->lock.lock();
2787  cmd.set_result((long)trigger->ready());
2788  pool->scheduler->lock.unlock();
2789  }
2790  return cmd.status();
2791 }
2792 
2793 
// Interpreter command "scheduleJob".
// NOTE(review): the signature line (listing line 2794) is absent from this extract.
// Argument layout: [threadpool,] [int priority,] job | string | list of jobs,
// followed by dependencies (jobs, triggers, or lists of them).
// Returns the first scheduled job as a job object.
2795  vector<Job *> jobs;
2796  vector<Job *> deps;
2797  Command cmd("scheduleJob", result, arg);
2798  cmd.check_argc_min(1);
2799  int has_pool = cmd.test_arg(0, type_threadpool);
2800  if (has_pool) {
2801  cmd.check_arg(0, type_threadpool, "first argument must be a threadpool");
2802  cmd.check_init(0, "threadpool not initialized");
2803  }
2804  cmd.check_argc_min(has_pool+1);
2805  int has_prio = cmd.test_arg(has_pool, INT_CMD);
      // Use the explicit pool when given, otherwise fall back to currentThreadPoolRef.
2806  ThreadPool *pool;
2807  if (has_pool)
2808  pool = cmd.shared_arg<ThreadPool>(0);
2809  else {
2810  if (!currentThreadPoolRef)
2811  return cmd.abort("no current threadpool defined");
2812  pool = currentThreadPoolRef;
2813  }
2814  long prio = has_prio ? (long) cmd.arg(has_pool) : 0L;
2815  int first_arg = has_pool + has_prio;
      // Collect the jobs to schedule: a single job, a procedure name wrapped in a
      // new ProcJob, or every element of a list of jobs.
2816  if (cmd.test_arg(first_arg, type_job)) {
2817  jobs.push_back(*(Job **)(cmd.arg(first_arg)));
2818  } else if (cmd.test_arg(first_arg, STRING_CMD)) {
2819  jobs.push_back(new ProcJob((char *)(cmd.arg(first_arg))));
2820  } else if (cmd.test_arg(first_arg, LIST_CMD)) {
2821  lists l = (lists) (cmd.arg(first_arg));
2822  int n = lSize(l);
      // First pass validates the element types, second pass collects the jobs.
2823  for (int i = 0; i < n; i++) {
2824  if (l->m[i].Typ() != type_job)
2825  return cmd.abort("job argument must be a job, string, or list of jobs");
2826  }
2827  for (int i = 0; i < n; i++) {
2828  Job *job = *(Job **) (l->m[i].Data());
2829  if (!job)
2830  return cmd.abort("job not initialized");
2831  jobs.push_back(job);
2832  }
2833  } else {
2834  return cmd.abort("job argument must be a job, string, or list of jobs");
2835  }
      // Collect dependencies from the remaining arguments.
      // A top-level argument that is neither job, trigger nor list is skipped
      // without an error; only a bad element inside a list sets `error`.
2836  bool error = false;
2837  leftv a = arg->next;
2838  if (has_pool) a = a->next;
2839  if (has_prio) a = a->next;
2840  for (; !error && a; a = a->next) {
2841  if (a->Typ() == type_job || a->Typ() == type_trigger) {
2842  deps.push_back(*(Job **)(a->Data()));
2843  } else if (a->Typ() == LIST_CMD) {
2844  lists l = (lists) a->Data();
2845  int n = lSize(l);
2846  for (int i = 0; i < n; i++) {
2847  if (l->m[i].Typ() == type_job || l->m[i].Typ() == type_trigger) {
2848  deps.push_back(*(Job **)(l->m[i].Data()));
2849  } else {
2850  error = true;
2851  break;
2852  }
2853  }
2854  }
2855  }
2856  if (error) {
2857  return cmd.abort("illegal dependency");
2858  }
      // Jobs must not be attached to a pool yet; dependencies must already be
      // attached to this same pool.
      // NOTE(review): dependency pointers are dereferenced below without a null check.
2859  for (int i = 0; i < jobs.size(); i++) {
2860  Job *job = jobs[i];
2861  if (job->pool) {
2862  return cmd.abort("job has already been scheduled");
2863  }
2864  job->prio = prio;
2865  }
2866  for (int i = 0; i < deps.size(); i++) {
2867  Job *job = deps[i];
2868  if (!job->pool) {
2869  return cmd.abort("dependency has not yet been scheduled");
2870  }
2871  if (job->pool != pool) {
2872  return cmd.abort("dependency has been scheduled on a different threadpool");
2873  }
2874  }
      // Wire up the dependency graph and attach (or cancel) the jobs under the scheduler lock.
2875  pool->scheduler->lock.lock();
2876  bool cancelled = false;
2877  for (int i = 0; i < jobs.size(); i++) {
2878  jobs[i]->addDep(deps);
2879  }
2880  for (int i = 0; i < deps.size(); i++) {
2881  deps[i]->addNotify(jobs);
2882  cancelled |= deps[i]->cancelled;
2883  }
      // If any dependency is already cancelled, every new job is cancelled as well
      // instead of being attached.
2884  for (int i = 0; i < jobs.size(); i++) {
2885  if (cancelled) {
2886  jobs[i]->pool = pool;
2887  pool->cancelJob(jobs[i]);
2888  }
2889  else
2890  pool->attachJob(jobs[i]);
2891  }
2892  pool->scheduler->lock.unlock();
      // Only the first job is returned, even when a list of jobs was scheduled.
2893  if (jobs.size() > 0)
2894  cmd.set_result(type_job, new_shared(jobs[0]));
2895  return cmd.status();
2896 }
2897 
// Interpreter command "currentJob": takes no arguments and returns
// currentJobRef as a job object, or reports "no current job" when it is null.
// NOTE(review): the signature line (listing line 2898) is absent from this extract.
2899  Command cmd("currentJob", result, arg);
2900  cmd.check_argc(0);
2901  Job *job = currentJobRef;
2902  if (job) {
2903  cmd.set_result(type_job, new_shared(job));
2904  } else {
2905  cmd.report("no current job");
2906  }
2907  return cmd.status();
2908 }
2909 
2910 
// Interpreter command "threadID": takes no arguments and returns thread_id
// as an INT_CMD value. Returns TRUE on a wrong argument count, FALSE otherwise.
// NOTE(review): the signature line (listing line 2911) is absent from this extract.
// NOTE(review): the local `i` is never used in the visible body.
2912  int i;
2913  if (wrong_num_args("threadID", arg, 0))
2914  return TRUE;
2915  result->rtyp = INT_CMD;
2916  result->data = (char *)thread_id;
2917  return FALSE;
2918 }
2919 
// Interpreter command "mainThread": takes no arguments and returns 1 when
// thread_id equals 0, otherwise 0, as an INT_CMD value.
// NOTE(review): the signature line (listing line 2920) is absent from this extract.
// NOTE(review): the local `i` is never used in the visible body.
2921  int i;
2922  if (wrong_num_args("mainThread", arg, 0))
2923  return TRUE;
2924  result->rtyp = INT_CMD;
2925  result->data = (char *)(long)(thread_id == 0L);
2926  return FALSE;
2927 }
2928 
// Interpreter command "threadEval": takes a thread and an expression, and
// queues the tag "e" followed by the serialized expression on the thread's
// to_thread queue. Returns TRUE on error, FALSE on success; no result value.
// NOTE(review): the signature line (listing line 2929) is absent from this extract.
2930  if (wrong_num_args("threadEval", arg, 2))
2931  return TRUE;
2932  if (arg->Typ() != type_thread) {
2933  WerrorS("threadEval: argument is not a thread");
2934  return TRUE;
2935  }
2936  InterpreterThread *thread = *(InterpreterThread **)arg->Data();
2937  string expr = LinTree::to_string(arg->next);
2938  ThreadState *ts = thread->getThreadState();
      // NOTE(review): thread handles are compared with != here; POSIX provides
      // pthread_equal for comparing pthread_t values - confirm this is portable enough.
2939  if (ts && ts->parent != pthread_self()) {
2940  WerrorS("threadEval: can only be called from parent thread");
2941  return TRUE;
2942  }
      // The running/active flags are inspected under the thread state's lock.
2943  if (ts) ts->lock.lock();
2944  if (!ts || !ts->running || !ts->active) {
2945  WerrorS("threadEval: thread is no longer running");
2946  if (ts) ts->lock.unlock();
2947  return TRUE;
2948  }
2949  ts->to_thread.push("e");
2950  ts->to_thread.push(expr);
2951  ts->to_cond.signal();
2952  ts->lock.unlock();
2953  result->rtyp = NONE;
2954  return FALSE;
2955 }
2956 
// Interpreter command "threadExec": same flow as threadEval in this file, but
// the tag pushed ahead of the serialized expression is "x" instead of "e".
// Returns TRUE on error, FALSE on success; no result value.
// NOTE(review): the signature line (listing line 2957) is absent from this extract.
2958  if (wrong_num_args("threadExec", arg, 2))
2959  return TRUE;
2960  if (arg->Typ() != type_thread) {
2961  WerrorS("threadExec: argument is not a thread");
2962  return TRUE;
2963  }
2964  InterpreterThread *thread = *(InterpreterThread **)arg->Data();
2965  string expr = LinTree::to_string(arg->next);
2966  ThreadState *ts = thread->getThreadState();
      // NOTE(review): thread handles are compared with != here; POSIX provides
      // pthread_equal for comparing pthread_t values - confirm this is portable enough.
2967  if (ts && ts->parent != pthread_self()) {
2968  WerrorS("threadExec: can only be called from parent thread");
2969  return TRUE;
2970  }
      // The running/active flags are inspected under the thread state's lock.
2971  if (ts) ts->lock.lock();
2972  if (!ts || !ts->running || !ts->active) {
2973  WerrorS("threadExec: thread is no longer running");
2974  if (ts) ts->lock.unlock();
2975  return TRUE;
2976  }
2977  ts->to_thread.push("x");
2978  ts->to_thread.push(expr);
2979  ts->to_cond.signal();
2980  ts->lock.unlock();
2981  result->rtyp = NONE;
2982  return FALSE;
2983 }
2984 
// Interpreter command "threadPoolExec": takes [threadpool,] expression and
// broadcasts a new ExecJob carrying the serialized expression to the pool.
// NOTE(review): the signature line (listing line 2985) is absent from this extract.
2986  Command cmd("threadPoolExec", result, arg);
2987  ThreadPool *pool;
2988  cmd.check_argc(1, 2);
      // Two arguments means the first one is the pool; otherwise use currentThreadPoolRef.
2989  int has_pool = cmd.nargs() == 2;
2990  if (has_pool) {
2991  cmd.check_arg(0, type_threadpool, "first argument must be a threadpool");
2992  cmd.check_init(0, "threadpool not initialized");
2993  pool = cmd.shared_arg<ThreadPool>(0);
2994  } else {
2995  pool = currentThreadPoolRef;
2996  if (!pool)
2997  return cmd.abort("no current threadpool");
2998  }
2999  if (cmd.ok()) {
3000  string expr = LinTree::to_string(has_pool ? arg->next : arg);
3001  Job* job = new ExecJob();
3002  job->args.push_back(expr);
      // The pool is set directly on the job before broadcasting, rather than via attachJob.
3003  job->pool = pool;
3004  pool->broadcastJob(job);
3005  }
3006  return cmd.status();
3007 }
3008 
// Interpreter command "threadResult": takes a thread, waits until its
// from_thread queue is non-empty, pops the front entry and returns the value
// deserialized from it. Returns TRUE on error, FALSE on success.
// NOTE(review): the signature line (listing line 3009) is absent from this extract.
3010  if (wrong_num_args("threadResult", arg, 1))
3011  return TRUE;
3012  if (arg->Typ() != type_thread) {
3013  WerrorS("threadResult: argument is not a thread");
3014  return TRUE;
3015  }
3016  InterpreterThread *thread = *(InterpreterThread **)arg->Data();
3017  ThreadState *ts = thread->getThreadState();
      // NOTE(review): thread handles are compared with != here; POSIX provides
      // pthread_equal for comparing pthread_t values - confirm this is portable enough.
3018  if (ts && ts->parent != pthread_self()) {
3019  WerrorS("threadResult: can only be called from parent thread");
3020  return TRUE;
3021  }
3022  if (ts) ts->lock.lock();
3023  if (!ts || !ts->running || !ts->active) {
3024  WerrorS("threadResult: thread is no longer running");
3025  if (ts) ts->lock.unlock();
3026  return TRUE;
3027  }
      // Wait on from_cond (with ts->lock held by this thread) until a result is queued.
3028  while (ts->from_thread.empty()) {
3029  ts->from_cond.wait();
3030  }
3031  string expr = ts->from_thread.front();
3032  ts->from_thread.pop();
3033  ts->lock.unlock();
      // Deserialize outside the lock and hand type and data to the interpreter.
3034  leftv val = LinTree::from_string(expr);
3035  result->rtyp = val->Typ();
3036  result->data = val->Data();
3037  return FALSE;
3038 }
3039 
// Interpreter command "setSharedName": takes a job, trigger or threadpool plus
// a string, and sets the object's name while holding name_lock.
// NOTE(review): the signature line (listing line 3040) is absent from this extract.
3041  Command cmd("setSharedName", result, arg);
3042  cmd.check_argc(2);
      // NOTE(review): argtype(0) is read before cmd.ok() is consulted, i.e. even
      // if the argument-count check above has already failed.
3043  int type = cmd.argtype(0);
3044  cmd.check_init(0, "first argument is not initialized");
3045  if (type != type_job && type != type_trigger && type != type_threadpool) {
3046  cmd.report("first argument must be a job, trigger, or threadpool");
3047  }
3048  cmd.check_arg(1, STRING_CMD, "second argument must be a string");
3049  if (cmd.ok()) {
3050  SharedObject *obj = cmd.shared_arg<SharedObject>(0);
3051  name_lock.lock();
3052  obj->set_name((char *) cmd.arg(1));
3053  name_lock.unlock();
3054  }
3055  return cmd.status();
3056 }
3057 
// Interpreter command "getSharedName": takes a job, trigger or threadpool and
// returns its name as a string, read while holding name_lock.
// NOTE(review): the signature line (listing line 3058) is absent from this extract.
3059  Command cmd("getSharedName", result, arg);
3060  cmd.check_argc(1);
      // NOTE(review): argtype(0) is read before cmd.ok() is consulted, i.e. even
      // if the argument-count check above has already failed.
3061  int type = cmd.argtype(0);
3062  cmd.check_init(0, "first argument is not initialized");
3063  if (type != type_job && type != type_trigger && type != type_threadpool) {
3064  cmd.report("first argument must be a job, trigger, or threadpool");
3065  }
3066  if (cmd.ok()) {
3067  SharedObject *obj = cmd.shared_arg<SharedObject>(0);
3068  name_lock.lock();
3069  cmd.set_result(obj->get_name().c_str());
3070  name_lock.unlock();
3071  }
3072  return cmd.status();
3073 }
3074 
3075 }
3076 
3077 using namespace LibThread;
3078 
3079 
// Module entry point for "systhreads".
// Registers the shared-object types and every interpreter procedure of this
// library under currPack->libname (or "" when that is null), then calls
// LinTree::init(). master_lock is held from before the type registration
// until after LinTree::init(). Returns MAX_TOK.
3080 extern "C" int SI_MOD_INIT(systhreads)(SModulFunctions *fn)
3081 {
3082  const char *libname = currPack->libname;
3083  if (!libname) libname = "";
3084  master_lock.lock();
      // NOTE(review): listing line 3086, the statement guarded by this `if`, is
      // missing from this extract. As printed, the `if` appears to guard the first
      // makeSharedType call, which is presumably not what the real file does.
3085  if (!thread_state)
3087  makeSharedType(type_atomic_table, "atomic_table");
3088  makeSharedType(type_atomic_list, "atomic_list");
3089  makeSharedType(type_shared_table, "shared_table");
3090  makeSharedType(type_shared_list, "shared_list");
3091  makeSharedType(type_channel, "channel");
3092  makeSharedType(type_syncvar, "syncvar");
3093  makeSharedType(type_region, "region");
3094  makeSharedType(type_thread, "thread");
3095  makeSharedType(type_threadpool, "threadpool");
3096  makeSharedType(type_job, "job");
3097  makeSharedType(type_trigger, "trigger");
3098  makeRegionlockType(type_regionlock, "regionlock");
3099 
      // Access to tables, lists, regions, channels and sync variables.
3100  fn->iiAddCproc(libname, "putTable", FALSE, putTable);
3101  fn->iiAddCproc(libname, "getTable", FALSE, getTable);
3102  fn->iiAddCproc(libname, "inTable", FALSE, inTable);
3103  fn->iiAddCproc(libname, "putList", FALSE, putList);
3104  fn->iiAddCproc(libname, "getList", FALSE, getList);
3105  fn->iiAddCproc(libname, "lockRegion", FALSE, lockRegion);
3106  fn->iiAddCproc(libname, "regionLock", FALSE, regionLock);
3107  fn->iiAddCproc(libname, "unlockRegion", FALSE, unlockRegion);
3108  fn->iiAddCproc(libname, "sendChannel", FALSE, sendChannel);
3109  fn->iiAddCproc(libname, "receiveChannel", FALSE, receiveChannel);
3110  fn->iiAddCproc(libname, "statChannel", FALSE, statChannel);
3111  fn->iiAddCproc(libname, "writeSyncVar", FALSE, writeSyncVar);
3112  fn->iiAddCproc(libname, "updateSyncVar", FALSE, updateSyncVar);
3113  fn->iiAddCproc(libname, "readSyncVar", FALSE, readSyncVar);
3114  fn->iiAddCproc(libname, "statSyncVar", FALSE, statSyncVar);
3115 
      // Constructors and lookup for shared objects.
3116  fn->iiAddCproc(libname, "makeAtomicTable", FALSE, makeAtomicTable);
3117  fn->iiAddCproc(libname, "makeAtomicList", FALSE, makeAtomicList);
3118  fn->iiAddCproc(libname, "makeSharedTable", FALSE, makeSharedTable);
3119  fn->iiAddCproc(libname, "makeSharedList", FALSE, makeSharedList);
3120  fn->iiAddCproc(libname, "makeChannel", FALSE, makeChannel);
3121  fn->iiAddCproc(libname, "makeSyncVar", FALSE, makeSyncVar);
3122  fn->iiAddCproc(libname, "makeRegion", FALSE, makeRegion);
3123  fn->iiAddCproc(libname, "findSharedObject", FALSE, findSharedObject);
3124  fn->iiAddCproc(libname, "bindSharedObject", FALSE, bindSharedObject);
3125  fn->iiAddCproc(libname, "typeSharedObject", FALSE, typeSharedObject);
3126 
      // Threads, thread pools, jobs and triggers.
3127  fn->iiAddCproc(libname, "createThread", FALSE, createThread);
3128  fn->iiAddCproc(libname, "joinThread", FALSE, joinThread);
3129  fn->iiAddCproc(libname, "createThreadPool", FALSE, createThreadPool);
3130  fn->iiAddCproc(libname, "createThreadPoolSet", FALSE, createThreadPoolSet);
      // The two registrations inside #if 0 are compiled out.
3131 #if 0
3132  fn->iiAddCproc(libname, "adjoinThreadPool", FALSE, adjoinThreadPool);
3133  fn->iiAddCproc(libname, "getAdjoinedThreadPools", FALSE, getAdjoinedThreadPools);
3134 #endif
3135  fn->iiAddCproc(libname, "closeThreadPool", FALSE, closeThreadPool);
3136  fn->iiAddCproc(libname, "getThreadPoolWorkers", FALSE, getThreadPoolWorkers);
3137  fn->iiAddCproc(libname, "setThreadPoolWorkers", FALSE, setThreadPoolWorkers);
3138  fn->iiAddCproc(libname, "getThreadPoolConcurrency", FALSE, getThreadPoolConcurrency);
3139  fn->iiAddCproc(libname, "setThreadPoolConcurrency", FALSE, setThreadPoolConcurrency);
3140  fn->iiAddCproc(libname, "currentThreadPool", FALSE, currentThreadPool);
3141  fn->iiAddCproc(libname, "setCurrentThreadPool", FALSE, setCurrentThreadPool);
3142  fn->iiAddCproc(libname, "threadPoolExec", FALSE, threadPoolExec);
3143  fn->iiAddCproc(libname, "threadID", FALSE, threadID);
3144  fn->iiAddCproc(libname, "mainThread", FALSE, mainThread);
3145  fn->iiAddCproc(libname, "threadEval", FALSE, threadEval);
3146  fn->iiAddCproc(libname, "threadExec", FALSE, threadExec);
3147  fn->iiAddCproc(libname, "threadResult", FALSE, threadResult);
3148  fn->iiAddCproc(libname, "createJob", FALSE, createJob);
3149  fn->iiAddCproc(libname, "currentJob", FALSE, currentJob);
3150  fn->iiAddCproc(libname, "setSharedName", FALSE, setSharedName);
3151  fn->iiAddCproc(libname, "getSharedName", FALSE, getSharedName);
3152  fn->iiAddCproc(libname, "startJob", FALSE, startJob);
3153  fn->iiAddCproc(libname, "waitJob", FALSE, waitJob);
3154  fn->iiAddCproc(libname, "cancelJob", FALSE, cancelJob);
3155  fn->iiAddCproc(libname, "jobCancelled", FALSE, jobCancelled);
      // "scheduleJobs" is registered as a second name for the same scheduleJob handler.
3156  fn->iiAddCproc(libname, "scheduleJob", FALSE, scheduleJob);
3157  fn->iiAddCproc(libname, "scheduleJobs", FALSE, scheduleJob);
3158  fn->iiAddCproc(libname, "createTrigger", FALSE, createTrigger);
3159  fn->iiAddCproc(libname, "updateTrigger", FALSE, updateTrigger);
3160  fn->iiAddCproc(libname, "testTrigger", FALSE, testTrigger);
3161  fn->iiAddCproc(libname, "chainTrigger", FALSE, chainTrigger);
3162 
3163  LinTree::init();
3164  master_lock.unlock();
3165 
3166  return MAX_TOK;
3167 }
int BOOLEAN
Definition: auxiliary.h:87
#define TRUE
Definition: auxiliary.h:100
#define FALSE
Definition: auxiliary.h:96
int setBlackboxStuff(blackbox *bb, const char *n)
define a new type
Definition: blackbox.cc:142
int l
Definition: cfEzgcd.cc:100
int m
Definition: cfEzgcd.cc:128
int i
Definition: cfEzgcd.cc:132
int p
Definition: cfModGcd.cc:4080
return false
Definition: cfModGcd.cc:84
CanonicalForm b
Definition: cfModGcd.cc:4105
void wait()
Definition: thread.h:88
void broadcast()
Definition: thread.h:103
void signal()
Definition: thread.h:97
AccTrigger(long count_init)
Definition: shared.cc:1999
virtual void execute()
Definition: shared.cc:2017
virtual void activate(leftv arg)
Definition: shared.cc:2008
virtual bool accept(leftv arg)
Definition: shared.cc:2005
virtual bool ready()
Definition: shared.cc:2001
void set_result(int type, long n)
Definition: shared.cc:132
int test_arg(int i, int type)
Definition: shared.cc:116
void check_init(int i, const char *err)
Definition: shared.cc:85
void report(const char *err)
Definition: shared.cc:111
void * arg(int i)
Definition: shared.cc:101
BOOLEAN abort(const char *err)
Definition: shared.cc:148
void no_result()
Definition: shared.cc:136
long int_arg(int i)
Definition: shared.cc:108
BOOLEAN status()
Definition: shared.cc:142
void check_arg(int i, int type, int type2, const char *err)
Definition: shared.cc:91
T * shared_arg(int i)
Definition: shared.cc:105
leftv * args
Definition: shared.cc:46
void check_arg(int i, int type, const char *err)
Definition: shared.cc:81
const char * error
Definition: shared.cc:44
void set_result(long n)
Definition: shared.cc:120
const char * name
Definition: shared.cc:43
void check_argc_min(int n)
Definition: shared.cc:77
void set_result(int type, void *p)
Definition: shared.cc:128
int argtype(int i)
Definition: shared.cc:95
void set_result(const char *s)
Definition: shared.cc:124
void check_argc(int n)
Definition: shared.cc:69
void check_argc(int lo, int hi)
Definition: shared.cc:73
Command(const char *n, leftv r, leftv a)
Definition: shared.cc:49
CountTrigger(long count_init)
Definition: shared.cc:2038
virtual void execute()
Definition: shared.cc:2052
virtual void activate(leftv arg)
Definition: shared.cc:2047
virtual bool accept(leftv arg)
Definition: shared.cc:2044
virtual bool ready()
Definition: shared.cc:2040
virtual void execute()
Definition: shared.cc:2363
virtual void execute()
Definition: shared.cc:2374
ThreadState * getThreadState()
Definition: shared.cc:1430
InterpreterThread(ThreadState *ts_init)
Definition: shared.cc:1428
vector< string > args
Definition: shared.cc:1558
vector< Job * > deps
Definition: shared.cc:1555
bool cancelled
Definition: shared.cc:1565
void run()
Definition: shared.cc:1982
ThreadPool * pool
Definition: shared.cc:1551
void * data
Definition: shared.cc:1560
string result
Definition: shared.cc:1559
long pending_index
Definition: shared.cc:1554
virtual bool ready()
Definition: shared.cc:1605
void addDep(Job *job)
Definition: shared.cc:1571
virtual void execute()=0
void addNotify(vector< Job * > &jobs)
Definition: shared.cc:1968
vector< Job * > notify
Definition: shared.cc:1556
vector< Trigger * > triggers
Definition: shared.cc:1557
KernelJob(void(*func)(leftv result, leftv arg))
Definition: shared.cc:2409
void(* cfunc)(leftv result, leftv arg)
Definition: shared.cc:2407
virtual void execute()
Definition: shared.cc:2410
ProcJob(const char *procname_init)
Definition: shared.cc:2384
virtual void execute()
Definition: shared.cc:2388
virtual bool accept(leftv arg)
Definition: shared.cc:2098
virtual void execute()
Definition: shared.cc:2127
ProcTrigger(const char *p)
Definition: shared.cc:2092
virtual void activate(leftv arg)
Definition: shared.cc:2101
virtual bool ready()
Definition: shared.cc:2094
virtual void execute()
Definition: shared.cc:2439
void(* cfunc)(long ndeps, Job **deps)
Definition: shared.cc:2436
RawKernelJob(void(*func)(long ndeps, Job **deps))
Definition: shared.cc:2438
SharedObjectTable objects
Definition: shared.cc:209
virtual ~Region()
Definition: shared.cc:211
Lock * get_lock()
Definition: shared.cc:212
virtual ~Scheduler()
Definition: shared.cc:1696
vector< ThreadPool * > thread_owners
Definition: shared.cc:1664
vector< JobQueue * > thread_queues
Definition: shared.cc:1666
void addThread(ThreadPool *owner, ThreadState *thread)
Definition: shared.cc:1735
ConditionVariable response
Definition: shared.cc:1669
ThreadState * getThread(int i)
Definition: shared.cc:1708
void cancelDeps(Job *job)
Definition: shared.cc:1786
priority_queue< Job *, vector< Job * >, JobCompare > global_queue
Definition: shared.cc:1665
static void * main(ThreadState *ts, void *arg)
Definition: shared.cc:1856
ConditionVariable cond
Definition: shared.cc:1668
void set_maxconcurrency(int n)
Definition: shared.cc:1682
void detachJob(Job *job)
Definition: shared.cc:1758
void broadcastJob(ThreadPool *pool, Job *job)
Definition: shared.cc:1776
vector< ThreadState * > threads
Definition: shared.cc:1663
void cancelJob(Job *job)
Definition: shared.cc:1795
void waitJob(Job *job)
Definition: shared.cc:1806
int get_maxconcurrency()
Definition: shared.cc:1685
static void notifyDeps(Scheduler *scheduler, Job *job)
Definition: shared.cc:1829
void clearThreadState()
Definition: shared.cc:1826
void queueJob(Job *job)
Definition: shared.cc:1770
void shutdown(bool wait)
Definition: shared.cc:1709
void attachJob(ThreadPool *pool, Job *job)
Definition: shared.cc:1742
vector< Job * > pending
Definition: shared.cc:1667
int threadpool_size(ThreadPool *pool)
Definition: shared.cc:1688
virtual void execute()
Definition: shared.cc:2081
virtual void activate(leftv arg)
Definition: shared.cc:2072
SetTrigger(long count_init)
Definition: shared.cc:2062
virtual bool ready()
Definition: shared.cc:2065
vector< bool > set
Definition: shared.cc:2059
virtual bool accept(leftv arg)
Definition: shared.cc:2069
std::string name
Definition: shared.cc:159
virtual ~SharedObject()
Definition: shared.cc:162
void set_name(std::string &name_init)
Definition: shared.cc:165
virtual BOOLEAN op3(int op, leftv res, leftv a1, leftv a2, leftv a3)
Definition: shared.cc:188
void set_name(const char *s)
Definition: shared.cc:166
void incref(int by=1)
Definition: shared.cc:170
std::string & get_name()
Definition: shared.cc:169
virtual BOOLEAN op2(int op, leftv res, leftv a1, leftv a2)
Definition: shared.cc:185
void set_type(int type_init)
Definition: shared.cc:163
ConditionVariable cond
Definition: shared.cc:401
queue< string > q
Definition: shared.cc:399
void send(string item)
Definition: shared.cc:405
virtual ~SingularChannel()
Definition: shared.cc:404
int write(string item)
Definition: shared.cc:459
virtual ~SingularSyncVar()
Definition: shared.cc:439
ConditionVariable cond
Definition: shared.cc:436
void update(leftv val)
Definition: shared.cc:454
void attachJob(Job *job)
Definition: shared.cc:1933
void waitJob(Job *job)
Definition: shared.cc:1951
void queueJob(Job *job)
Definition: shared.cc:1939
void broadcastJob(Job *job)
Definition: shared.cc:1942
ThreadState * getThread(int i)
Definition: shared.cc:1928
void cancelJob(Job *job)
Definition: shared.cc:1948
Scheduler * scheduler
Definition: shared.cc:1635
ThreadPool(Scheduler *sched, int n)
Definition: shared.cc:1921
void detachJob(Job *job)
Definition: shared.cc:1936
void cancelDeps(Job *job)
Definition: shared.cc:1945
void shutdown(bool wait)
Definition: shared.cc:1929
void addThread(ThreadState *thread)
Definition: shared.cc:1930
void *(* thread_func)(ThreadState *, void *)
Definition: shared.cc:1341
ConditionVariable to_cond
Definition: shared.cc:1346
queue< string > from_thread
Definition: shared.cc:1349
ConditionVariable from_cond
Definition: shared.cc:1347
queue< string > to_thread
Definition: shared.cc:1348
virtual ~Transactional()
Definition: shared.cc:317
void set_region(Region *region_init)
Definition: shared.cc:309
virtual void activate(leftv arg)=0
virtual bool accept(leftv arg)=0
virtual ~TxList()
Definition: shared.cc:362
int put(size_t index, string &value)
Definition: shared.cc:363
vector< string > entries
Definition: shared.cc:359
int get(size_t index, string &value)
Definition: shared.cc:377
int put(string &key, string &value)
Definition: shared.cc:326
std::map< string, string > entries
Definition: shared.cc:322
int get(string &key, string &value)
Definition: shared.cc:338
int check(string &key)
Definition: shared.cc:348
virtual ~TxTable()
Definition: shared.cc:325
void put(T data)
Definition: lintree.h:61
Definition: thread.h:17
bool is_locked()
Definition: thread.h:68
void lock()
Definition: thread.h:46
void unlock()
Definition: thread.h:57
Definition: idrec.h:35
Class used for (list of) interpreter objects.
Definition: subexpr.h:83
int Typ()
Definition: subexpr.cc:1011
const char * name
Definition: subexpr.h:87
package req_packhdl
Definition: subexpr.h:106
int rtyp
Definition: subexpr.h:91
void * Data()
Definition: subexpr.cc:1154
leftv next
Definition: subexpr.h:86
int Eval()
Definition: subexpr.cc:1908
void Copy(leftv e)
Definition: subexpr.cc:685
void * data
Definition: subexpr.h:88
void CleanUp(ring r=currRing)
Definition: subexpr.cc:348
Definition: lists.h:24
sleftv * m
Definition: lists.h:46
INLINE_THIS void Init(int l=0)
void error(const char *fmt,...)
Definition: emacs.cc:55
return result
Definition: facAbsBiFact.cc:75
const CanonicalForm int s
Definition: facAbsFact.cc:51
CanonicalForm res
Definition: facAbsFact.cc:60
CFList & eval
Definition: facFactorize.cc:47
const ExtensionInfo & info
< [in] sqrfree poly
int j
Definition: facHensel.cc:110
char name(const Variable &v)
Definition: factory.h:196
void WerrorS(const char *s)
Definition: feFopen.cc:24
feOptIndex
Definition: feOptGen.h:15
const char * feSetOptValue(feOptIndex opt, char *optarg)
Definition: feOpt.cc:154
feOptIndex feGetOptIndex(const char *name)
Definition: feOpt.cc:104
const char * Tok2Cmdname(int tok)
Definition: gentable.cc:140
#define STATIC_VAR
Definition: globaldefs.h:7
#define VAR
Definition: globaldefs.h:5
BOOLEAN iiExprArithM(leftv res, leftv a, int op)
Definition: iparith.cc:9352
VAR package basePack
Definition: ipid.cc:58
VAR package currPack
Definition: ipid.cc:57
EXTERN_VAR omBin sleftv_bin
Definition: ipid.h:145
#define IDDATA(a)
Definition: ipid.h:126
lists getList(spectrum &spec)
Definition: ipshell.cc:3480
STATIC_VAR jList * T
Definition: janet.cc:30
ListNode * next
Definition: janet.h:31
#define string
Definition: libparse.cc:1252
void siInit(char *)
Definition: misc_ip.cc:1356
VAR omBin slists_bin
Definition: lists.cc:23
int lSize(lists L)
Definition: lists.cc:25
slists * lists
Definition: mpr_numeric.h:146
BOOLEAN readSyncVar(leftv result, leftv arg)
Definition: shared.cc:1238
void * interpreter_thread(ThreadState *ts, void *arg)
Definition: shared.cc:1391
static BOOLEAN createThread(leftv result, leftv arg)
Definition: shared.cc:1497
static BOOLEAN getThreadPoolWorkers(leftv result, leftv arg)
Definition: shared.cc:2249
void retain(Job *job)
BOOLEAN getTable(leftv result, leftv arg)
Definition: shared.cc:941
int type_thread
Definition: shared.cc:241
static BOOLEAN getThreadPoolConcurrency(leftv result, leftv arg)
Definition: shared.cc:2281
static BOOLEAN waitJob(leftv result, leftv arg)
Definition: shared.cc:2565
int type_region
Definition: shared.cc:233
ThreadState * createThread(void *(*thread_func)(ThreadState *, void *), void *arg)
Definition: shared.cc:1469
Job * getCurrentJob()
Definition: shared.cc:2519
int not_a_uri(const char *name, leftv arg)
Definition: shared.cc:687
static BOOLEAN setThreadPoolConcurrency(leftv result, leftv arg)
Definition: shared.cc:2296
static BOOLEAN scheduleJob(leftv result, leftv arg)
Definition: shared.cc:2794
BOOLEAN makeSyncVar(leftv result, leftv arg)
Definition: shared.cc:854
BOOLEAN shared_check_assign(blackbox *b, leftv l, leftv r)
Definition: shared.cc:573
BOOLEAN makeSharedList(leftv result, leftv arg)
Definition: shared.cc:824
char * shared_string(blackbox *b, void *d)
Definition: shared.cc:595
BOOLEAN shared_assign(leftv l, leftv r)
Definition: shared.cc:524
void ref_shared(LinTree::LinTree &lintree, int by)
Definition: shared.cc:1290
BOOLEAN getList(leftv result, leftv arg)
Definition: shared.cc:1028
Lock global_objects_lock
Definition: shared.cc:226
static InterpreterThread * createInterpreterThread(const char **error)
Definition: shared.cc:1485
int type_atomic_list
Definition: shared.cc:239
static void appendArgCopy(vector< leftv > &argv, leftv arg)
Definition: shared.cc:742
BOOLEAN statChannel(leftv result, leftv arg)
Definition: shared.cc:1171
SharedObject * consSyncVar()
Definition: shared.cc:720
void * shared_copy(blackbox *b, void *d)
Definition: shared.cc:515
BOOLEAN threadEval(leftv result, leftv arg)
Definition: shared.cc:2929
SharedObject * SharedObjectPtr
Definition: shared.cc:246
bool getJobCancelled()
Definition: shared.cc:2645
BOOLEAN unlockRegion(leftv result, leftv arg)
Definition: shared.cc:1120
BOOLEAN sendChannel(leftv result, leftv arg)
Definition: shared.cc:1135
void * joinThread(ThreadState *ts)
Definition: shared.cc:1474
void * shared_init(blackbox *b)
Definition: shared.cc:487
Job * startJob(ThreadPool *pool, Job *job, leftv arg)
Definition: shared.cc:2481
BOOLEAN setSharedName(leftv result, leftv arg)
Definition: shared.cc:3040
static void appendArg(vector< leftv > &argv, string &s)
Definition: shared.cc:728
char * str(leftv arg)
Definition: shared.cc:704
BOOLEAN writeSyncVar(leftv result, leftv arg)
Definition: shared.cc:1189
std::map< std::string, SharedObject * > SharedObjectTable
Definition: shared.cc:203
BOOLEAN makeAtomicTable(leftv result, leftv arg)
Definition: shared.cc:778
BOOLEAN bindSharedObject(leftv result, leftv arg)
Definition: shared.cc:924
void report(const char *fmt, const char *name)
Definition: shared.cc:666
void * new_shared(SharedObject *obj)
Definition: shared.cc:491
BOOLEAN lockRegion(leftv result, leftv arg)
Definition: shared.cc:1088
BOOLEAN currentJob(leftv result, leftv arg)
Definition: shared.cc:2898
long thread_counter
Definition: shared.cc:231
int wrong_num_args(const char *name, leftv arg, int n)
Definition: shared.cc:672
ThreadState * thread_state
Definition: shared.cc:1366
SharedObject * consRegion()
Definition: shared.cc:724
int type_threadpool
Definition: shared.cc:242
Lock name_lock(true)
SharedObject * makeSharedObject(SharedObjectTable &table, Lock *lock, int type, string &name, SharedConstructor scons)
Definition: shared.cc:249
int type_job
Definition: shared.cc:243
static BOOLEAN testTrigger(leftv result, leftv arg)
Definition: shared.cc:2778
ThreadPool * createThreadPool(int threads, int prioThreads=0)
Definition: shared.cc:2218
BOOLEAN threadExec(leftv result, leftv arg)
Definition: shared.cc:2957
BOOLEAN rlock_assign(leftv l, leftv r)
Definition: shared.cc:549
const int have_threads
Definition: shared.cc:38
void setJobData(Job *job, void *data)
Definition: shared.cc:2649
BOOLEAN putTable(leftv result, leftv arg)
Definition: shared.cc:1001
char * rlock_string(blackbox *b, void *d)
Definition: shared.cc:657
BOOLEAN makeAtomicList(leftv result, leftv arg)
Definition: shared.cc:792
ThreadPool * getCurrentThreadPool()
Definition: shared.cc:2245
const char * getJobName()
STATIC_VAR Job * currentJobRef
Definition: shared.cc:1631
static BOOLEAN createTrigger(leftv result, leftv arg)
Definition: shared.cc:2692
int not_a_region(const char *name, leftv arg)
Definition: shared.cc:695
BOOLEAN currentThreadPool(leftv result, leftv arg)
Definition: shared.cc:2334
void addJobArgs(Job *job, leftv arg)
Definition: shared.cc:2665
void rlock_destroy(blackbox *b, void *d)
Definition: shared.cc:506
static BOOLEAN joinThread(leftv result, leftv arg)
Definition: shared.cc:1531
int type_atomic_table
Definition: shared.cc:237
static BOOLEAN setThreadPoolWorkers(leftv result, leftv arg)
Definition: shared.cc:2266
int type_syncvar
Definition: shared.cc:236
static BOOLEAN updateTrigger(leftv result, leftv arg)
Definition: shared.cc:2735
BOOLEAN getSharedName(leftv result, leftv arg)
Definition: shared.cc:3058
SharedObjectTable global_objects
Definition: shared.cc:227
BOOLEAN receiveChannel(leftv result, leftv arg)
Definition: shared.cc:1152
BOOLEAN mainThread(leftv result, leftv arg)
Definition: shared.cc:2920
void releaseShared(SharedObject *obj)
Definition: shared.cc:197
void * getJobData(Job *job)
Definition: shared.cc:2657
void closeThreadPool(ThreadPool *pool, bool wait)
Definition: shared.cc:2329
int type_shared_table
Definition: shared.cc:238
static void appendArg(vector< leftv > &argv, leftv arg)
Definition: shared.cc:738
int type_channel
Definition: shared.cc:235
BOOLEAN threadPoolExec(leftv result, leftv arg)
Definition: shared.cc:2985
void * thread_main(void *arg)
Definition: shared.cc:1385
int type_shared_list
Definition: shared.cc:240
BOOLEAN setCurrentThreadPool(leftv result, leftv arg)
Definition: shared.cc:2346
BOOLEAN regionLock(leftv result, leftv arg)
Definition: shared.cc:1103
BOOLEAN findSharedObject(leftv result, leftv arg)
Definition: shared.cc:880
static BOOLEAN cancelJob(leftv result, leftv arg)
Definition: shared.cc:2595
BOOLEAN threadID(leftv result, leftv arg)
Definition: shared.cc:2911
void makeRegionlockType(int &type, const char *name)
Definition: shared.cc:1321
void encode_shared(LinTree::LinTree &lintree, leftv val)
Definition: shared.cc:1275
BOOLEAN makeSharedTable(leftv result, leftv arg)
Definition: shared.cc:806
BOOLEAN typeSharedObject(leftv result, leftv arg)
Definition: shared.cc:893
Job * createJob(void(*func)(leftv result, leftv arg))
Definition: shared.cc:2471
SharedObject * consChannel()
Definition: shared.cc:716
SharedObject * findSharedObject(SharedObjectTable &table, Lock *lock, string &name)
Definition: shared.cc:271
void release(Job *job)
VAR long thread_id
Definition: shared.cc:230
STATIC_VAR ThreadPool * currentThreadPoolRef
Definition: shared.cc:1630
Scheduler * scheduler
Definition: shared.cc:1625
BOOLEAN statSyncVar(leftv result, leftv arg)
Definition: shared.cc:1257
void setOption(int ch)
Definition: shared.cc:1368
queue< Job * > JobQueue
Definition: shared.cc:1620
BOOLEAN shared_op2(int op, leftv res, leftv a1, leftv a2)
Definition: shared.cc:585
Lock master_lock(true)
SharedObjectPtr(* SharedConstructor)()
Definition: shared.cc:247
int type_regionlock
Definition: shared.cc:234
SharedObject * consTable()
Definition: shared.cc:708
int type_trigger
Definition: shared.cc:244
BOOLEAN makeChannel(leftv result, leftv arg)
Definition: shared.cc:841
BOOLEAN shared_op3(int op, leftv res, leftv a1, leftv a2, leftv a3)
Definition: shared.cc:590
void acquireShared(SharedObject *obj)
Definition: shared.cc:193
static ThreadState * newThread(void *(*thread_func)(ThreadState *, void *), void *arg, const char **error)
Definition: shared.cc:1436
void thread_init()
Definition: shared.cc:1373
static BOOLEAN jobCancelled(leftv result, leftv arg)
Definition: shared.cc:2612
static BOOLEAN chainTrigger(leftv result, leftv arg)
Definition: shared.cc:2757
BOOLEAN makeRegion(leftv result, leftv arg)
Definition: shared.cc:867
BOOLEAN putList(leftv result, leftv arg)
Definition: shared.cc:1061
static bool joinInterpreterThread(InterpreterThread *thread)
Definition: shared.cc:1512
void setJobName(const char *)
static BOOLEAN createThreadPoolSet(leftv result, leftv arg)
Definition: shared.cc:2166
void makeSharedType(int &type, const char *name)
Definition: shared.cc:1306
void shared_destroy(blackbox *b, void *d)
Definition: shared.cc:498
static BOOLEAN executeProc(sleftv &result, const char *procname, const vector< leftv > &argv)
Definition: shared.cc:749
BOOLEAN updateSyncVar(leftv result, leftv arg)
Definition: shared.cc:1209
BOOLEAN inTable(leftv result, leftv arg)
Definition: shared.cc:974
leftv getJobResult(Job *job)
Definition: shared.cc:2675
Lock thread_lock
Definition: shared.cc:1364
void installShared(int type)
Definition: shared.cc:1302
BOOLEAN threadResult(leftv result, leftv arg)
Definition: shared.cc:3009
void cancelJob(Job *job)
Definition: shared.cc:2514
void waitJob(Job *job)
Definition: shared.cc:2590
leftv decode_shared(LinTree::LinTree &lintree)
Definition: shared.cc:1281
SharedObject * consList()
Definition: shared.cc:712
void init()
Definition: lintree.cc:864
std::string to_string(leftv val)
Definition: lintree.cc:843
void install(int typ, LinTreeEncodeFunc enc, LinTreeDecodeFunc dec, LinTreeRefFunc ref)
Definition: lintree.cc:51
leftv from_string(std::string &str)
Definition: lintree.cc:854
#define omStrDup(s)
Definition: omAllocDecl.h:263
#define omAlloc0Bin(bin)
Definition: omAllocDecl.h:206
#define omFree(addr)
Definition: omAllocDecl.h:261
#define omAlloc0(size)
Definition: omAllocDecl.h:211
#define omFreeBin(addr, bin)
Definition: omAllocDecl.h:259
#define NULL
Definition: omList.c:12
int main(int argc, char *argv[])
Definition: omTables.c:165
static int index(p_Length length, p_Ord ord)
Definition: p_Procs_Impl.h:592
void Werror(const char *fmt,...)
Definition: reporter.cc:189
#define MAX_THREADS
Definition: shared.cc:1334
void pSingular_initialize_thread()
int SI_MOD_INIT() systhreads(SModulFunctions *fn)
Definition: shared.cc:3080
int * status
Definition: si_signals.h:51
int status int void * buf
Definition: si_signals.h:59
wait
Definition: si_signals.h:51
bool operator()(const Job *lhs, const Job *rhs)
Definition: shared.cc:1584
sleftv * leftv
Definition: structs.h:62
#define assert(A)
Definition: svd_si.h:3
#define IDHDL
Definition: tok.h:31
@ LIST_CMD
Definition: tok.h:118
@ DEF_CMD
Definition: tok.h:58
@ STRING_CMD
Definition: tok.h:185
@ INT_CMD
Definition: tok.h:96
@ MAX_TOK
Definition: tok.h:218
#define NONE
Definition: tok.h:221
#define COMMAND
Definition: tok.h:29