mirror of https://github.com/nodejs/node.git
220 lines
5.0 KiB
Python
220 lines
5.0 KiB
Python
#!/usr/bin/env python
|
|
# encoding: utf-8
|
|
# Thomas Nagy, 2005-2008 (ita)
|
|
|
|
"Execute the tasks"
|
|
|
|
import sys, random, time, threading, traceback
|
|
try: from Queue import Queue
|
|
except ImportError: from queue import Queue
|
|
import Build, Utils, Logs, Options
|
|
from Logs import debug, error
|
|
from Constants import *
|
|
|
|
GAP = 15
|
|
|
|
run_old = threading.Thread.run
|
|
def run(*args, **kwargs):
|
|
try:
|
|
run_old(*args, **kwargs)
|
|
except (KeyboardInterrupt, SystemExit):
|
|
raise
|
|
except:
|
|
sys.excepthook(*sys.exc_info())
|
|
threading.Thread.run = run
|
|
|
|
class TaskConsumer(threading.Thread):
|
|
def __init__(self, m):
|
|
threading.Thread.__init__(self)
|
|
self.setDaemon(1)
|
|
self.master = m
|
|
self.start()
|
|
|
|
def run(self):
|
|
try:
|
|
self.loop()
|
|
except:
|
|
pass
|
|
|
|
def loop(self):
|
|
m = self.master
|
|
while 1:
|
|
tsk = m.ready.get()
|
|
if m.stop:
|
|
m.out.put(tsk)
|
|
continue
|
|
|
|
try:
|
|
tsk.generator.bld.printout(tsk.display())
|
|
if tsk.__class__.stat: ret = tsk.__class__.stat(tsk)
|
|
# actual call to task's run() function
|
|
else: ret = tsk.call_run()
|
|
except Exception, e:
|
|
tsk.err_msg = Utils.ex_stack()
|
|
tsk.hasrun = EXCEPTION
|
|
|
|
# TODO cleanup
|
|
m.error_handler(tsk)
|
|
m.out.put(tsk)
|
|
continue
|
|
|
|
if ret:
|
|
tsk.err_code = ret
|
|
tsk.hasrun = CRASHED
|
|
else:
|
|
try:
|
|
tsk.post_run()
|
|
except Utils.WafError:
|
|
pass
|
|
except Exception:
|
|
tsk.err_msg = Utils.ex_stack()
|
|
tsk.hasrun = EXCEPTION
|
|
else:
|
|
tsk.hasrun = SUCCESS
|
|
if tsk.hasrun != SUCCESS:
|
|
m.error_handler(tsk)
|
|
|
|
m.out.put(tsk)
|
|
|
|
class Parallel(object):
|
|
"""
|
|
keep the consumer threads busy, and avoid consuming cpu cycles
|
|
when no more tasks can be added (end of the build, etc)
|
|
"""
|
|
def __init__(self, bld, j=2):
|
|
|
|
# number of consumers
|
|
self.numjobs = j
|
|
|
|
self.manager = bld.task_manager
|
|
self.manager.current_group = 0
|
|
|
|
self.total = self.manager.total()
|
|
|
|
# tasks waiting to be processed - IMPORTANT
|
|
self.outstanding = []
|
|
self.maxjobs = MAXJOBS
|
|
|
|
# tasks that are awaiting for another task to complete
|
|
self.frozen = []
|
|
|
|
# tasks waiting to be run by the consumers
|
|
self.ready = Queue(0)
|
|
self.out = Queue(0)
|
|
|
|
self.count = 0 # tasks not in the producer area
|
|
|
|
self.processed = 1 # progress indicator
|
|
|
|
self.consumers = None # the consumer threads, created lazily
|
|
|
|
self.stop = False # error condition to stop the build
|
|
self.error = False # error flag
|
|
|
|
def get_next(self):
|
|
"override this method to schedule the tasks in a particular order"
|
|
if not self.outstanding:
|
|
return None
|
|
return self.outstanding.pop(0)
|
|
|
|
def postpone(self, tsk):
|
|
"override this method to schedule the tasks in a particular order"
|
|
# TODO consider using a deque instead
|
|
if random.randint(0, 1):
|
|
self.frozen.insert(0, tsk)
|
|
else:
|
|
self.frozen.append(tsk)
|
|
|
|
def refill_task_list(self):
|
|
"called to set the next group of tasks"
|
|
|
|
while self.count > self.numjobs + GAP or self.count >= self.maxjobs:
|
|
self.get_out()
|
|
|
|
while not self.outstanding:
|
|
if self.count:
|
|
self.get_out()
|
|
|
|
if self.frozen:
|
|
self.outstanding += self.frozen
|
|
self.frozen = []
|
|
elif not self.count:
|
|
(jobs, tmp) = self.manager.get_next_set()
|
|
if jobs != None: self.maxjobs = jobs
|
|
if tmp: self.outstanding += tmp
|
|
break
|
|
|
|
def get_out(self):
|
|
"the tasks that are put to execute are all collected using get_out"
|
|
ret = self.out.get()
|
|
self.manager.add_finished(ret)
|
|
if not self.stop and getattr(ret, 'more_tasks', None):
|
|
self.outstanding += ret.more_tasks
|
|
self.total += len(ret.more_tasks)
|
|
self.count -= 1
|
|
|
|
def error_handler(self, tsk):
|
|
"by default, errors make the build stop (not thread safe so be careful)"
|
|
if not Options.options.keep:
|
|
self.stop = True
|
|
self.error = True
|
|
|
|
def start(self):
|
|
"execute the tasks"
|
|
|
|
while not self.stop:
|
|
|
|
self.refill_task_list()
|
|
|
|
# consider the next task
|
|
tsk = self.get_next()
|
|
if not tsk:
|
|
if self.count:
|
|
# tasks may add new ones after they are run
|
|
continue
|
|
else:
|
|
# no tasks to run, no tasks running, time to exit
|
|
break
|
|
|
|
if tsk.hasrun:
|
|
# if the task is marked as "run", just skip it
|
|
self.processed += 1
|
|
self.manager.add_finished(tsk)
|
|
continue
|
|
|
|
try:
|
|
st = tsk.runnable_status()
|
|
except Exception, e:
|
|
tsk.err_msg = Utils.ex_stack()
|
|
tsk.hasrun = EXCEPTION
|
|
self.processed += 1
|
|
self.error_handler(tsk)
|
|
self.manager.add_finished(tsk)
|
|
continue
|
|
|
|
if st == ASK_LATER:
|
|
self.postpone(tsk)
|
|
elif st == SKIP_ME:
|
|
self.processed += 1
|
|
tsk.hasrun = SKIPPED
|
|
self.manager.add_finished(tsk)
|
|
else:
|
|
# run me: put the task in ready queue
|
|
tsk.position = (self.processed, self.total)
|
|
self.count += 1
|
|
self.ready.put(tsk)
|
|
self.processed += 1
|
|
|
|
# create the consumer threads only if there is something to consume
|
|
if not self.consumers:
|
|
self.consumers = [TaskConsumer(self) for i in xrange(self.numjobs)]
|
|
|
|
# self.count represents the tasks that have been made available to the consumer threads
|
|
# collect all the tasks after an error else the message may be incomplete
|
|
while self.error and self.count:
|
|
self.get_out()
|
|
|
|
#print loop
|
|
assert (self.count == 0 or self.stop)
|
|
|