From 40161692745eb9fedc0820d540e7c70184247b55 Mon Sep 17 00:00:00 2001
From: peelstnac
Date: Sat, 10 Feb 2024 15:22:26 -0500
Subject: [PATCH 1/2] Add class "SuperQueue", a queue with arbitrary index
 insertions and retrievals.

---
 gpu_queue/queue.py | 121 +++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 121 insertions(+)
 create mode 100644 gpu_queue/queue.py

diff --git a/gpu_queue/queue.py b/gpu_queue/queue.py
new file mode 100644
index 0000000..b6b405d
--- /dev/null
+++ b/gpu_queue/queue.py
@@ -0,0 +1,121 @@
+from queue import Empty, Full, Queue
+from time import monotonic as time
+
+
+class SuperQueue(Queue):
+    """Queue with arbitrary insertion and removal.
+
+    Behaves like queue.Queue, but put() and get() accept an optional
+    'index' so items can be inserted at, or removed from, any position.
+    Empty and Full are the standard queue exceptions, so callers that
+    already catch queue.Empty / queue.Full keep working.
+
+    The put() and get() bodies mirror queue.Queue; the shutdown checks
+    added to queue.Queue in Python 3.13 are not replicated here.
+    """
+
+    def put(self, item, block=True, timeout=None, index=None):
+        """Put an item into the queue.
+
+        If optional args 'block' is true and 'timeout' is None (the default),
+        block if necessary until a free slot is available. If 'timeout' is
+        a non-negative number, it blocks at most 'timeout' seconds and raises
+        the Full exception if no free slot was available within that time.
+        Otherwise ('block' is false), put an item on the queue if a free slot
+        is immediately available, else raise the Full exception ('timeout'
+        is ignored in that case).
+
+        If optional arg 'index' is not None, then item will be inserted at
+        specified index instead of being appended at the end.
+        """
+        with self.not_full:
+            if self.maxsize > 0:
+                if not block:
+                    if self._qsize() >= self.maxsize:
+                        raise Full
+                elif timeout is None:
+                    while self._qsize() >= self.maxsize:
+                        self.not_full.wait()
+                elif timeout < 0:
+                    raise ValueError("'timeout' must be a non-negative number")
+                else:
+                    endtime = time() + timeout
+                    while self._qsize() >= self.maxsize:
+                        remaining = endtime - time()
+                        if remaining <= 0.0:
+                            raise Full
+                        self.not_full.wait(remaining)
+
+            self._put(item, index=index)
+            self.unfinished_tasks += 1
+            self.not_empty.notify()
+
+    def get(self, block=True, timeout=None, index=None):
+        """Remove and return an item from the queue.
+
+        If optional args 'block' is true and 'timeout' is None (the default),
+        block if necessary until an item is available. If 'timeout' is
+        a non-negative number, it blocks at most 'timeout' seconds and raises
+        the Empty exception if no item was available within that time.
+        Otherwise ('block' is false), return an item if one is immediately
+        available, else raise the Empty exception ('timeout' is ignored
+        in that case).
+
+        If optional arg 'index' is not None, then item will be retrieved at
+        specified index instead of from the front. Waiting only guarantees
+        that the queue is non-empty; an out-of-range index raises IndexError.
+        """
+        with self.not_empty:
+            if not block:
+                if not self._qsize():
+                    raise Empty
+            elif timeout is None:
+                while not self._qsize():
+                    self.not_empty.wait()
+            elif timeout < 0:
+                raise ValueError("'timeout' must be a non-negative number")
+            else:
+                endtime = time() + timeout
+                while not self._qsize():
+                    remaining = endtime - time()
+                    if remaining <= 0.0:
+                        raise Empty
+                    self.not_empty.wait(remaining)
+            item = self._get(index=index)
+            self.not_full.notify()
+            return item
+
+    def put_nowait(self, item, index=None):
+        """Put an item into the queue without blocking.
+
+        Only enqueue the item if a free slot is immediately available.
+        Otherwise raise the Full exception.
+
+        If optional arg 'index' is not None, insert the item at that index.
+        """
+        return self.put(item, block=False, index=index)
+
+    def get_nowait(self, index=None):
+        """Remove and return an item from the queue without blocking.
+
+        Only get an item if one is immediately available. Otherwise
+        raise the Empty exception.
+
+        If optional arg 'index' is not None, remove the item at that index.
+        """
+        return self.get(block=False, index=index)
+
+    # Put a new item in the queue: append when index is None, else insert.
+    def _put(self, item, index=None):
+        if index is None:
+            self.queue.append(item)
+            return
+        self.queue.insert(index, item)
+
+    # Get an item from the queue: pop the front when index is None.
+    def _get(self, index=None):
+        if index is None:
+            return self.queue.popleft()
+        item = self.queue[index]
+        del self.queue[index]
+        return item
From e2394db58a1e1e7fbdc54b23b27fccb5c3ac1c78 Mon Sep 17 00:00:00 2001
From: peelstnac
Date: Tue, 26 Mar 2024 19:08:16 -0400
Subject: [PATCH 2/2] allow swapping out gpu_queue

---
 gpu_queue/main.py | 28 ++++++++++++++++++++++++++--
 1 file changed, 26 insertions(+), 2 deletions(-)

diff --git a/gpu_queue/main.py b/gpu_queue/main.py
index 0c10ebe..a9f1cfa 100644
--- a/gpu_queue/main.py
+++ b/gpu_queue/main.py
@@ -64,10 +64,11 @@ def __init__(
         for job in self.job_array:
             self.job_queue.put(job)
 
+        self.gpu_queue_id = 0
         self.gpu_queue = Queue()
 
         for gpu in self.available_gpus:
-            self.gpu_queue.put(gpu)
+            self.gpu_queue.put((gpu, self.gpu_queue_id))
         print("%d jobs has been loaded" % len(self.job_array))
 
         self.result_dict = {}
@@ -106,8 +107,28 @@ def submit_jobs(self):
             if self.verbose:
                 self._print(f_dict)
 
+    def update_available_gpus(self, available_gpus: str | int | t.List[str | int]):
+        """Swap in a fresh GPU queue built from 'available_gpus'.
+
+        Bumps 'gpu_queue_id' so that jobs started against the previous queue
+        can tell their GPU belongs to a retired queue and do not recycle it
+        into the new one.
+
+        NOTE(review): 'available_gpus' is iterated directly, so a bare int
+        (allowed by the annotation) would raise TypeError and a str is
+        consumed character by character -- confirm callers pass a list.
+        """
+        self.gpu_queue_id += 1
+        gpu_queue = Queue()
+        for gpu in available_gpus:
+            gpu_queue.put((gpu, self.gpu_queue_id))
+        self.available_gpus = available_gpus
+        self.gpu_queue = gpu_queue
+
     @threaded(daemon=False, name="submitter")
     def _process_daemon(self, job, gpu):
+        # Queue entries are (gpu, gpu_queue_id) pairs.
+        gpu, gpu_queue_id = gpu
         new_environment = os.environ.copy()
         new_environment["CUDA_VISIBLE_DEVICES"] = str(gpu)
         # with log_writer(job, save_dir=self.save_dir) as writer:
@@ -118,7 +139,10 @@ def _process_daemon(self, job, gpu):
         )
         self.result_dict[job] = result_code.returncode
         # Recycling GPU num
-        self.gpu_queue.put(gpu)
+        # Skip GPUs taken from a queue that has since been replaced.
+        if gpu_queue_id == self.gpu_queue_id:
+            # Re-wrap as (gpu, id): _process_daemon unpacks that pair.
+            self.gpu_queue.put((gpu, gpu_queue_id))
 
     def _print(self, result_dict):
         for k, v in result_dict.items():