My Project
Public Member Functions | Static Public Member Functions | Data Fields | Private Attributes | Friends
LibThread::Scheduler Class Reference

Public Member Functions

 Scheduler (int n)
 
void set_maxconcurrency (int n)
 
int get_maxconcurrency ()
 
int threadpool_size (ThreadPool *pool)
 
virtual ~Scheduler ()
 
ThreadStategetThread (int i)
 
void shutdown (bool wait)
 
void addThread (ThreadPool *owner, ThreadState *thread)
 
void attachJob (ThreadPool *pool, Job *job)
 
void detachJob (Job *job)
 
void queueJob (Job *job)
 
void broadcastJob (ThreadPool *pool, Job *job)
 
void cancelDeps (Job *job)
 
void cancelJob (Job *job)
 
void waitJob (Job *job)
 
void clearThreadState ()
 
- Public Member Functions inherited from LibThread::SharedObject
 SharedObject ()
 
virtual ~SharedObject ()
 
void set_type (int type_init)
 
int get_type ()
 
void set_name (std::string &name_init)
 
void set_name (const char *s)
 
std::stringget_name ()
 
void incref (int by=1)
 
long decref ()
 
long getref ()
 
virtual BOOLEAN op2 (int op, leftv res, leftv a1, leftv a2)
 
virtual BOOLEAN op3 (int op, leftv res, leftv a1, leftv a2, leftv a3)
 

Static Public Member Functions

static void notifyDeps (Scheduler *scheduler, Job *job)
 
static void * main (ThreadState *ts, void *arg)
 

Data Fields

Lock lock
 

Private Attributes

bool single_threaded
 
size_t jobid
 
int nthreads
 
int maxconcurrency
 
int running
 
bool shutting_down
 
int shutdown_counter
 
vector< ThreadState * > threads
 
vector< ThreadPool * > thread_owners
 
priority_queue< Job *, vector< Job * >, JobCompareglobal_queue
 
vector< JobQueue * > thread_queues
 
vector< Job * > pending
 
ConditionVariable cond
 
ConditionVariable response
 

Friends

class Job
 

Detailed Description

Definition at line 1654 of file shared.cc.

Constructor & Destructor Documentation

◆ Scheduler()

LibThread::Scheduler::Scheduler ( int  n)
inline

Definition at line 1673 of file shared.cc.

1673  :
1675  single_threaded(n==0), nthreads(n == 0 ? 1 : n),
1676  lock(true), cond(&lock), response(&lock),
1677  shutting_down(false), shutdown_counter(0), jobid(0),
1678  maxconcurrency(n), running(0)
1679  {
1680  thread_queues.push_back(new JobQueue());
1681  }
vector< ThreadPool * > thread_owners
Definition: shared.cc:1664
vector< JobQueue * > thread_queues
Definition: shared.cc:1666
ConditionVariable response
Definition: shared.cc:1669
priority_queue< Job *, vector< Job * >, JobCompare > global_queue
Definition: shared.cc:1665
ConditionVariable cond
Definition: shared.cc:1668
vector< ThreadState * > threads
Definition: shared.cc:1663
queue< Job * > JobQueue
Definition: shared.cc:1620

◆ ~Scheduler()

virtual LibThread::Scheduler::~Scheduler ( )
inlinevirtual

Definition at line 1696 of file shared.cc.

1696  {
1697  for (int i = 0; i < thread_queues.size(); i++) {
1698  JobQueue *q = thread_queues[i];
1699  while (!q->empty()) {
1700  Job *job = q->front();
1701  q->pop();
1702  releaseShared(job);
1703  }
1704  }
1705  thread_queues.clear();
1706  threads.clear();
1707  }
int i
Definition: cfEzgcd.cc:132
void releaseShared(SharedObject *obj)
Definition: shared.cc:197

Member Function Documentation

◆ addThread()

void LibThread::Scheduler::addThread ( ThreadPool owner,
ThreadState thread 
)
inline

Definition at line 1735 of file shared.cc.

1735  {
1736  lock.lock();
1737  thread_owners.push_back(owner);
1738  threads.push_back(thread);
1739  thread_queues.push_back(new JobQueue());
1740  lock.unlock();
1741  }
void lock()
Definition: thread.h:46
void unlock()
Definition: thread.h:57

◆ attachJob()

void LibThread::Scheduler::attachJob ( ThreadPool pool,
Job job 
)
inline

Definition at line 1742 of file shared.cc.

1742  {
1743  lock.lock();
1744  job->pool = pool;
1745  job->id = jobid++;
1746  acquireShared(job);
1747  if (job->ready()) {
1748  global_queue.push(job);
1749  cond.signal();
1750  }
1751  else if (job->pending_index < 0) {
1752  job->pool = pool;
1753  job->pending_index = pending.size();
1754  pending.push_back(job);
1755  }
1756  lock.unlock();
1757  }
void signal()
Definition: thread.h:97
ThreadPool * pool
Definition: shared.cc:1551
long pending_index
Definition: shared.cc:1554
virtual bool ready()
Definition: shared.cc:1605
vector< Job * > pending
Definition: shared.cc:1667
void acquireShared(SharedObject *obj)
Definition: shared.cc:193

◆ broadcastJob()

void LibThread::Scheduler::broadcastJob ( ThreadPool pool,
Job job 
)
inline

Definition at line 1776 of file shared.cc.

1776  {
1777  lock.lock();
1778  for (int i = 0; i <thread_queues.size(); i++) {
1779  if (thread_owners[i] == pool) {
1780  acquireShared(job);
1781  thread_queues[i]->push(job);
1782  }
1783  }
1784  lock.unlock();
1785  }

◆ cancelDeps()

void LibThread::Scheduler::cancelDeps ( Job job)
inline

Definition at line 1786 of file shared.cc.

1786  {
1787  vector<Job *> &notify = job->notify;
1788  for (int i = 0; i <notify.size(); i++) {
1789  Job *next = notify[i];
1790  if (!next->cancelled) {
1791  cancelJob(next);
1792  }
1793  }
1794  }
vector< Job * > notify
Definition: shared.cc:1556
void cancelJob(Job *job)
Definition: shared.cc:1795
ListNode * next
Definition: janet.h:31

◆ cancelJob()

void LibThread::Scheduler::cancelJob ( Job job)
inline

Definition at line 1795 of file shared.cc.

1795  {
1796  lock.lock();
1797  if (!job->cancelled) {
1798  job->cancelled = true;
1799  if (!job->running && !job->done) {
1800  job->done = true;
1801  cancelDeps(job);
1802  }
1803  }
1804  lock.unlock();
1805  }
bool cancelled
Definition: shared.cc:1565
void cancelDeps(Job *job)
Definition: shared.cc:1786

◆ clearThreadState()

void LibThread::Scheduler::clearThreadState ( )
inline

Definition at line 1826 of file shared.cc.

1826  {
1827  threads.clear();
1828  }

◆ detachJob()

void LibThread::Scheduler::detachJob ( Job job)
inline

Definition at line 1758 of file shared.cc.

1758  {
1759  lock.lock();
1760  long i = job->pending_index;
1761  job->pending_index = -1;
1762  if (i >= 0) {
1763  job = pending.back();
1764  pending.resize(pending.size()-1);
1765  pending[i] = job;
1766  job->pending_index = i;
1767  }
1768  lock.unlock();
1769  }

◆ get_maxconcurrency()

int LibThread::Scheduler::get_maxconcurrency ( )
inline

Definition at line 1685 of file shared.cc.

1685  {
1686  return maxconcurrency;
1687  }

◆ getThread()

ThreadState* LibThread::Scheduler::getThread ( int  i)
inline

Definition at line 1708 of file shared.cc.

1708 { return threads[i]; }

◆ main()

static void* LibThread::Scheduler::main ( ThreadState ts,
void *  arg 
)
inlinestatic

Definition at line 1856 of file shared.cc.

1856  {
1857  SchedInfo *info = (SchedInfo *) arg;
1858  Scheduler *scheduler = info->scheduler;
1859  ThreadPool *oldThreadPool = currentThreadPoolRef;
1860  // TODO: set current thread pool
1861  // currentThreadPoolRef = pool;
1862  Lock &lock = scheduler->lock;
1863  ConditionVariable &cond = scheduler->cond;
1864  ConditionVariable &response = scheduler->response;
1865  JobQueue *my_queue = scheduler->thread_queues[info->num];
1866  if (!scheduler->single_threaded)
1867  thread_init();
1868  lock.lock();
1869  for (;;) {
1870  if (info->job && info->job->done)
1871  break;
1872  if (scheduler->shutting_down) {
1873  scheduler->shutdown_counter++;
1874  scheduler->response.signal();
1875  break;
1876  }
1877  if (!my_queue->empty()) {
1878  Job *job = my_queue->front();
1879  my_queue->pop();
1880  if (!scheduler->global_queue.empty())
1881  cond.signal();
1882  currentJobRef = job;
1883  job->run();
1884  currentJobRef = NULL;
1885  notifyDeps(scheduler, job);
1886  releaseShared(job);
1887  scheduler->response.signal();
1888  continue;
1889  } else if (!scheduler->global_queue.empty()) {
1890  Job *job = scheduler->global_queue.top();
1891  scheduler->global_queue.pop();
1892  if (!scheduler->global_queue.empty())
1893  cond.signal();
1894  currentJobRef = job;
1895  job->run();
1896  currentJobRef = NULL;
1897  notifyDeps(scheduler, job);
1898  releaseShared(job);
1899  scheduler->response.signal();
1900  continue;
1901  } else {
1902  if (scheduler->single_threaded) {
1903  break;
1904  }
1905  cond.wait();
1906  }
1907  }
1908  // TODO: correct current thread pool
1909  // releaseShared(currentThreadPoolRef);
1910  currentThreadPoolRef = oldThreadPool;
1911  scheduler->lock.unlock();
1912  delete info;
1913  return NULL;
1914  }
void wait()
Definition: thread.h:88
void run()
Definition: shared.cc:1982
static void notifyDeps(Scheduler *scheduler, Job *job)
Definition: shared.cc:1829
Definition: thread.h:17
const ExtensionInfo & info
< [in] sqrfree poly
STATIC_VAR Job * currentJobRef
Definition: shared.cc:1631
STATIC_VAR ThreadPool * currentThreadPoolRef
Definition: shared.cc:1630
void thread_init()
Definition: shared.cc:1373
#define NULL
Definition: omList.c:12

◆ notifyDeps()

static void LibThread::Scheduler::notifyDeps ( Scheduler scheduler,
Job job 
)
inlinestatic

Definition at line 1829 of file shared.cc.

1829  {
1830  vector<Job *> &notify = job->notify;
1831  job->incref(notify.size());
1832  for (int i = 0; i <notify.size(); i++) {
1833  Job *next = notify[i];
1834  if (!next->queued && next->ready() && !next->cancelled) {
1835  next->queued = true;
1836  scheduler->queueJob(next);
1837  }
1838  }
1839  vector<Trigger *> &triggers = job->triggers;
1840  leftv arg = NULL;
1841  if (triggers.size() > 0 && job->result.size() > 0)
1842  arg = LinTree::from_string(job->result);
1843  for (int i = 0; i < triggers.size(); i++) {
1844  Trigger *trigger = triggers[i];
1845  if (trigger->accept(arg)) {
1846  trigger->activate(arg);
1847  if (trigger->ready())
1848  scheduler->queueJob(trigger);
1849  }
1850  }
1851  if (arg) {
1852  arg->CleanUp();
1853  omFreeBin(arg, sleftv_bin);
1854  }
1855  }
string result
Definition: shared.cc:1559
vector< Trigger * > triggers
Definition: shared.cc:1557
void queueJob(Job *job)
Definition: shared.cc:1770
void incref(int by=1)
Definition: shared.cc:170
virtual void activate(leftv arg)=0
virtual bool accept(leftv arg)=0
Class used for (list of) interpreter objects.
Definition: subexpr.h:83
void CleanUp(ring r=currRing)
Definition: subexpr.cc:348
EXTERN_VAR omBin sleftv_bin
Definition: ipid.h:145
leftv from_string(std::string &str)
Definition: lintree.cc:854
#define omFreeBin(addr, bin)
Definition: omAllocDecl.h:259

◆ queueJob()

void LibThread::Scheduler::queueJob ( Job job)
inline

Definition at line 1770 of file shared.cc.

1770  {
1771  lock.lock();
1772  global_queue.push(job);
1773  cond.signal();
1774  lock.unlock();
1775  }

◆ set_maxconcurrency()

void LibThread::Scheduler::set_maxconcurrency ( int  n)
inline

Definition at line 1682 of file shared.cc.

1682  {
1683  maxconcurrency = n;
1684  }

◆ shutdown()

void LibThread::Scheduler::shutdown ( bool  wait)
inline

Definition at line 1709 of file shared.cc.

1709  {
1710  if (single_threaded) {
1711  SchedInfo *info = new SchedInfo();
1712  info->num = 0;
1713  info->scheduler = this;
1714  acquireShared(this);
1715  info->job = NULL;
1717  return;
1718  }
1719  lock.lock();
1720  if (wait) {
1721  while (!global_queue.empty()) {
1722  response.wait();
1723  }
1724  }
1725  shutting_down = true;
1726  while (shutdown_counter < nthreads) {
1727  cond.broadcast();
1728  response.wait();
1729  }
1730  lock.unlock();
1731  for (int i = 0; i <threads.size(); i++) {
1732  joinThread(threads[i]);
1733  }
1734  }
void broadcast()
Definition: thread.h:103
static void * main(ThreadState *ts, void *arg)
Definition: shared.cc:1856
void * joinThread(ThreadState *ts)
Definition: shared.cc:1474
wait
Definition: si_signals.h:51

◆ threadpool_size()

int LibThread::Scheduler::threadpool_size ( ThreadPool pool)
inline

Definition at line 1688 of file shared.cc.

1688  {
1689  int n;
1690  for (int i = 0; i <thread_owners.size(); i++) {
1691  if (thread_owners[i] == pool)
1692  n++;
1693  }
1694  return n;
1695  }

◆ waitJob()

void LibThread::Scheduler::waitJob ( Job job)
inline

Definition at line 1806 of file shared.cc.

1806  {
1807  if (single_threaded) {
1808  SchedInfo *info = new SchedInfo();
1809  info->num = 0;
1810  info->scheduler = this;
1811  acquireShared(this);
1812  info->job = job;
1814  } else {
1815  lock.lock();
1816  for (;;) {
1817  if (job->done || job->cancelled) {
1818  break;
1819  }
1820  response.wait();
1821  }
1822  response.signal(); // forward signal
1823  lock.unlock();
1824  }
1825  }

Friends And Related Function Documentation

◆ Job

friend class Job
friend

Definition at line 1670 of file shared.cc.

Field Documentation

◆ cond

ConditionVariable LibThread::Scheduler::cond
private

Definition at line 1668 of file shared.cc.

◆ global_queue

priority_queue<Job *, vector<Job *>, JobCompare> LibThread::Scheduler::global_queue
private

Definition at line 1665 of file shared.cc.

◆ jobid

size_t LibThread::Scheduler::jobid
private

Definition at line 1657 of file shared.cc.

◆ lock

Lock LibThread::Scheduler::lock

Definition at line 1672 of file shared.cc.

◆ maxconcurrency

int LibThread::Scheduler::maxconcurrency
private

Definition at line 1659 of file shared.cc.

◆ nthreads

int LibThread::Scheduler::nthreads
private

Definition at line 1658 of file shared.cc.

◆ pending

vector<Job *> LibThread::Scheduler::pending
private

Definition at line 1667 of file shared.cc.

◆ response

ConditionVariable LibThread::Scheduler::response
private

Definition at line 1669 of file shared.cc.

◆ running

int LibThread::Scheduler::running
private

Definition at line 1660 of file shared.cc.

◆ shutdown_counter

int LibThread::Scheduler::shutdown_counter
private

Definition at line 1662 of file shared.cc.

◆ shutting_down

bool LibThread::Scheduler::shutting_down
private

Definition at line 1661 of file shared.cc.

◆ single_threaded

bool LibThread::Scheduler::single_threaded
private

Definition at line 1656 of file shared.cc.

◆ thread_owners

vector<ThreadPool *> LibThread::Scheduler::thread_owners
private

Definition at line 1664 of file shared.cc.

◆ thread_queues

vector<JobQueue *> LibThread::Scheduler::thread_queues
private

Definition at line 1666 of file shared.cc.

◆ threads

vector<ThreadState *> LibThread::Scheduler::threads
private

Definition at line 1663 of file shared.cc.


The documentation for this class was generated from the following file: