Source code for qmpy.computing.queue

#!/usr/bin/env python

from django.db import models
import json
import os.path
import time
from datetime import datetime, timedelta
import random

from .resources import Project, Account, Allocation
from qmpy.analysis.vasp import Calculation
from qmpy.db.custom import *
import qmpy


class TaskError(Exception):
    """A project was needed but not provided"""


class ResourceUnavailableError(Exception):
    """Resource is occupied"""


class AllocationError(Exception):
    """No binary is available for the job's host (raised by Job.create)."""


class Task(models.Model):
    """
    Model for a :Task: to be done.

    A :Task: consists of a module, which is the name of a computing script,
    and a set of keyword arguments, specified as a dictionary in the `kwargs`
    attribute of the task. In order for a Task to be completed, it must also
    be assigned one or more :Project:s.

    Relationships:
        | :mod:`~qmpy.Entry` via entry
        | :mod:`~qmpy.Job` via job_set
        | :mod:`~qmpy.Project` via project_set

    Attributes:
        | id
        | created: datetime object for when the task was created.
        | finished: datetime object for when the task was completed.
        | module: The name of a function in :mod:`~qmpy.computing.scripts`
        | kwargs: dict of keyword:value pairs to pass to the calculation
        |   module.
        | priority: Priority of the task. Lower values are more urgent.
        | state: State code, given by the table below.

    Task codes:

        +------+-------------------+
        | Code | Description       |
        +======+===================+
        | -2   | being held        |
        +------+-------------------+
        | -1   | encountered error |
        +------+-------------------+
        |  0   | ready to run      |
        +------+-------------------+
        |  1   | jobs running      |
        +------+-------------------+
        |  2   | completed         |
        +------+-------------------+

    """

    module = models.CharField(max_length=60)
    kwargs = DictField()
    state = models.IntegerField(default=0)
    priority = models.IntegerField(default=50)
    created = models.DateTimeField(blank=True, auto_now_add=True)
    finished = models.DateTimeField(blank=True, null=True)

    entry = models.ForeignKey("Entry", on_delete=models.CASCADE)
    project_set = models.ManyToManyField(Project)

    _projects = None

    class Meta:
        app_label = "qmpy"
        db_table = "tasks"
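
    # Usage sketch (illustrative, not part of qmpy): a Task pairs a computing
    # script name with the keyword arguments that get_jobs() later forwards to
    # the entry via entry.do(task.module, **task.kwargs). The entry object and
    # kwargs below are placeholders.
    #
    #   >>> task = Task(entry=some_entry, module="static", kwargs={})
    #   >>> task.state   # 0 => ready to run (see the table above)
    #   0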

    def save(self, *args, **kwargs):
        super(Task, self).save(*args, **kwargs)
        self.project_set.set([Project.get(p) for p in self.projects])

    @property
    def projects(self):
        """List of related projects."""
        if self._projects is None:
            self._projects = list(self.project_set.all())
        return self._projects

    @projects.setter
    def projects(self, projects):
        self._projects = projects

    def get_project(self):
        projects = self.project_set.filter(state=1)
        projects = [p for p in projects if p.active]
        if not projects:
            return
        return random.choice(projects)

    @property
    def eligible_to_run(self):
        if self.state != 0:
            return False
        if self.entry.holds:
            return False
        return True

    @staticmethod
    def create(entry, module="static", kwargs={}, priority=None, projects=None):
        if projects is None:
            projects = entry.projects
        elif isinstance(projects, str):
            projects = Project.objects.get(name=projects)
        if priority is None:
            priority = len(entry.input)
        task, created = Task.objects.get_or_create(
            entry=entry, kwargs=kwargs, module=module
        )
        if created:
            task.projects = projects
        else:
            task.projects += projects
        task.priority = priority
        return task
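
    # Illustrative sketch of the factory above (the entry and priority are
    # placeholders): create() reuses an existing (entry, module, kwargs) Task
    # when one exists, otherwise it makes a new one and attaches the entry's
    # projects; save() then syncs project_set.
    #
    #   >>> task = Task.create(entry, module="static", priority=10)
    #   >>> task.save()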

    def complete(self):
        """Sets the Task state to 2 and populates the finished field."""
        self.state = 2
        self.finished = datetime.now()

    def hold(self):
        self.state = -2

    def fail(self):
        self.state = -1

    def __str__(self):
        return "%s (%s: %s)" % (self.module, self.entry, self.entry.path)

    @property
    def jobs(self):
        """List of jobs related to the task."""
        return self.job_set.all()

    @property
    def last_job_state(self):
        if self.job_set.all():
            return self.job_set.all().order_by("-id")[0].state

    @property
    def errors(self):
        """List of errors encountered by related calculations."""
        return self.entry.errors

    def get_jobs(self, project=None, allocation=None, account=None, host=None):
        """
        Check the calculation module specified by the `Task` and return a
        list of :class:`Job` objects accordingly.

        Calls the task's entry's "do" method with `Task.module` as the first
        argument, passing `Task.kwargs` as keyword arguments.

        Returns:
            List of Job objects. When nothing is left to do for the task,
            returns an empty list.

        Raises:
            ResourceUnavailableError: Raised if there are no available cores
            for the specified project, allocation, account and/or host.
        """
        if host is not None:
            if not project:
                projects = self.project_set.filter(allocations__host=host, state=1)
                project = random.choice(list(projects))
            if not allocation:
                allocations = project.allocations.filter(host=host, state=1)
                allocation = random.choice(list(allocations))
        elif project is not None:
            allocation = project.get_allocation()
            if not allocation:
                raise ResourceUnavailableError
        else:
            project = self.get_project()

        if account is None:
            if project is None:
                account = allocation.get_account()
            elif allocation is not None:
                account = allocation.get_account(users=list(project.users.all()))

        calc = self.entry.do(self.module, **self.kwargs)

        # Special case: adjustments for certain clusters
        if allocation is not None:
            if host.name == "quest":
                # Special MPI call for quest Slurm
                calc.instructions["mpi"] = "mpirun -np $NPROCS"

                if allocation.name == "b1004":
                    # Can only run parallel VASP on the b1004 allocation
                    calc.instructions["serial"] = False
                    calc.instructions["binary"] = "vasp_53"
                    # queue type for b1004 is 'buyin'
                    calc.instructions["queuetype"] = "buyin"
                elif allocation.name == "d20829":
                    # Sheel doesn't have access to b1004 binaries
                    calc.instructions["binary"] = "~/vasp_53"
                    calc.instructions["queuetype"] = "normal"
                elif allocation.name == "p30919":
                    calc.instructions["queuetype"] = "short"
                    calc.instructions["serial"] = False
                    calc.instructions["nodes"] = 1
                    calc.instructions["ntasks"] = 4
                    calc.instructions["walltime"] = 3600 * 4
                    # calc.instructions["binary"] = "vasp_53"
                elif allocation.name == "p31151":
                    calc.instructions["queuetype"] = "short"
                    calc.instructions["serial"] = False
                    calc.instructions["nodes"] = 1
                    calc.instructions["ntasks"] = 4
                    calc.instructions["walltime"] = 3600 * 4
                elif allocation.name == "p30475":
                    calc.instructions["queuetype"] = "short"
                    calc.instructions["serial"] = False
                    calc.instructions["nodes"] = 1
                    calc.instructions["ntasks"] = 16
                    calc.instructions["walltime"] = 3600 * 4
                elif allocation.name == "p30649":
                    calc.instructions["queuetype"] = "short"
                    calc.instructions["serial"] = False
                    if "fast" in self.entry.keywords:
                        calc.instructions["nodes"] = 2
                        calc.instructions["ntasks"] = 32
                    else:
                        calc.instructions["nodes"] = 1
                        calc.instructions["ntasks"] = 16
                    calc.instructions["walltime"] = 3600 * 4
                    calc.settings["kpar"] = 4
                elif allocation.name == "p31102":
                    calc.instructions["queuetype"] = "short"
                    calc.instructions["serial"] = False
                    calc.instructions["nodes"] = 1
                    calc.instructions["ntasks"] = 16
                    calc.instructions["walltime"] = 3600 * 4
                elif calc.entry.natoms < 9:
                    calc.instructions["queuetype"] = "short"
                    calc.instructions["serial"] = False
                    calc.instructions["nodes"] = 1
                    calc.instructions["ntasks"] = 4
                    calc.instructions["walltime"] = 3600 * 4
                elif calc.entry.natoms < 13:
                    calc.instructions["queuetype"] = "short"
                    calc.instructions["serial"] = False
                    calc.instructions["nodes"] = 1
                    calc.instructions["ntasks"] = 16
                    calc.instructions["walltime"] = 3600 * 4
                elif Project.get("pyrochlore") in calc.entry.projects:
                    calc.instructions["queuetype"] = "short"
                    calc.instructions["serial"] = False
                    calc.instructions["nodes"] = 1
                    calc.instructions["ntasks"] = 16
                    calc.instructions["walltime"] = 3600 * 4
                elif calc.entry.natoms > 19:
                    calc.instructions["queuetype"] = "normal"
                    calc.instructions["nodes"] = 1
                    calc.instructions["ntasks"] = 16
                else:
                    calc.instructions["queuetype"] = "normal"

            if allocation.name == "bebop":
                # Special MPI call for bebop
                calc.instructions["mpi"] = "mpirun -psm2 -np $NPROCS"
            if allocation.name == "xsede":
                # Special MPI call for xsede
                calc.instructions["mpi"] = "mpirun -np $NPROCS"
            if allocation.name == "babbage":
                # Check if calculation is parallel
                if "serial" in calc.instructions and not calc.instructions["serial"]:
                    # Different MPI call on Babbage
                    calc.instructions["mpi"] = "mpirun -np $NPROCS -machinefile $PBS_NODEFILE -tmpdir /scratch"

        jobs = []
        if calc.instructions:
            self.state = 1
            new_job = Job.create(
                task=self,
                allocation=allocation,
                account=account,
                entry=self.entry,
                **calc.instructions,
            )
            jobs.append(new_job)
            calc.save()
        elif calc.converged:
            self.complete()
        else:
            self.state = -1
        return jobs
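
# Illustrative sketch (not part of qmpy): turning a ready task into queued
# jobs. "quest" is a host the code above special-cases; Host is assumed to be
# the host model from qmpy.computing.resources, and the queryset filter is a
# placeholder policy.
#
#   >>> task = Task.objects.filter(state=0).first()
#   >>> if task is not None and task.eligible_to_run:
#   ...     for job in task.get_jobs(host=Host.objects.get(name="quest")):
#   ...         job.submit()
#   ...         job.save()
#
# An empty list from get_jobs() means the calculation either converged (the
# task is marked complete) or hit an error (the task state is set to -1).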

class Job(models.Model):
    """
    Base class for a job submitted to a compute cluster.

    Relationships:
        | :mod:`~qmpy.Task` via task
        | :mod:`~qmpy.Account` via account. The account the calculation is
        |   performed on.
        | :mod:`~qmpy.Allocation` via allocation. The allocation on which the
        |   calculation is being performed.
        | :mod:`~qmpy.Entry` via entry

    Attributes:
        | id
        | created: datetime object for when the job was created.
        | finished: datetime object for when the job was completed.
        | ncpus: # of processors assigned.
        | path: Origination path of the calculation.
        | run_path: Path of the calculation on the compute resource.
        | qid: PBS queue ID number.
        | walltime: Max walltime (in seconds).
        | state: State code, defined as in the table below.

    Job codes:

        +------+-------------------+
        | Code | Description       |
        +======+===================+
        | -1   | encountered error |
        +------+-------------------+
        |  0   | ready to submit   |
        +------+-------------------+
        |  1   | currently running |
        +------+-------------------+
        |  2   | completed         |
        +------+-------------------+

    """

    qid = models.IntegerField(default=0)
    walltime = models.DateTimeField(blank=True)
    path = models.CharField(max_length=200)
    run_path = models.CharField(max_length=200)
    ncpus = models.IntegerField(blank=True)
    created = models.DateTimeField(blank=True, auto_now_add=True)
    finished = models.DateTimeField(blank=True, null=True)
    state = models.IntegerField(default=0)

    task = models.ForeignKey(Task, on_delete=models.CASCADE)
    entry = models.ForeignKey("Entry", on_delete=models.CASCADE)
    account = models.ForeignKey(Account, on_delete=models.CASCADE)
    allocation = models.ForeignKey(Allocation, on_delete=models.CASCADE)

    class Meta:
        app_label = "qmpy"
        db_table = "jobs"

    @staticmethod
    def create(
        task=None,
        allocation=None,
        entry=None,
        account=None,
        path=None,
        serial=None,
        walltime=3600,
        queuetype=None,
        nodes=None,
        ntasks=None,
        header=None,
        mpi=None,
        binary=None,
        pipes=None,
        footer=None,
    ):
        if entry is None:
            entry = task.entry

        # assert isinstance(allocation, Allocation)
        # assert isinstance(task, Task)
        # assert path is not None
        # if account is None:
        #     account = allocation.get_account()

        job = Job(
            path=path,
            walltime=walltime,
            allocation=allocation,
            account=account,
            entry=entry,
            task=task,
        )

        # if walltime < 3600:
        #     nodes = 1
        #     ppn = int(walltime/3600.*job.account.host.ppn)
        #     walltime = walltime/ppn
        # else:
        #     ppn = job.account.host.ppn
        #     nodes = 1+int(walltime/float(job.account.host.walltime))
        #     walltime = walltime/float(ppn*nodes)

        if serial:
            ppn = 1
            nodes = 1
            walltime = 3600 * 24 * 4
            # change queuetype to long for quest machine
            if job.allocation.host.name == "quest":
                queuetype = "long"
            if job.allocation.name == "p20746":
                walltime = 3600 * 24
            if job.allocation.name == "p20747":
                walltime = 3600 * 24
        else:
            if nodes is None:
                nodes = 1
            ppn = job.account.host.ppn
            if job.allocation.name == "b1004":
                ppn = 4
            if walltime is None:
                walltime = job.account.host.walltime

        # < Mohan
        # Set a HARD upper bound for walltime.
        # If a longer walltime is needed, please modify the following code!
        walltime = min(walltime, job.account.host.walltime)
        # Mohan >

        binary = job.account.host.get_binary(binary)
        if not binary:
            raise AllocationError

        sec = timedelta(seconds=walltime)
        d = datetime(1, 1, 1) + sec
        job.walltime = d

        ## walltime format for quest is hh:mm:ss (Mohan)
        if job.allocation.host.name == "quest":
            walltime = "%d:%02d:%02d" % ((d.day - 1) * 24 + d.hour, d.minute, d.second)
        else:
            walltime = "%02d:%02d:%02d:%02d" % (d.day - 1, d.hour, d.minute, d.second)

        if not ntasks and job.allocation.host.name == "quest":
            ntasks = nodes * ppn

        qp = qmpy.INSTALL_PATH + "/configuration/qfiles/"
        text = open(qp + job.account.host.sub_text + ".q", "r").read()
        qfile = text.format(
            host=allocation.host.name,
            key=allocation.key,
            name=job.description,
            queuetype=queuetype,
            ntasks=ntasks,
            walltime=walltime,
            nodes=nodes,
            ppn=ppn,
            header=header,
            mpi=mpi,
            binary=binary,
            pipes=pipes,
            footer=footer,
        )
        qf = open(job.path + "/auto.q", "w")
        qf.write(qfile)
        qf.close()

        job.ncpus = ppn * nodes
        job.run_path = job.account.run_path + "/" + job.description
        return job

    @property
    def walltime_expired(self):
        from datetime import datetime, timedelta

        elapsed = datetime.now() - self.created
        if elapsed.total_seconds() > self.walltime:
            return True
        else:
            return False

    @property
    def calculation(self):
        try:
            return Calculation.objects.get(path=self.path)
        except:
            return

    @property
    def subdir(self):
        return self.path.replace(self.entry.path, "")

    @property
    def description(self):
        uniq = ""
        if self.task.kwargs:
            uniq = "_" + "_".join(
                ["%s:%s" % (k, v) for k, v in list(self.task.kwargs.items())]
            )
        return "{entry}_{subdir}{uniq}".format(
            entry=self.entry.id,
            subdir=self.subdir.strip("/").replace("/", "_"),
            uniq=uniq,
        )

    def __str__(self):
        return "%s on %s" % (self.description, self.account)

    def is_done(self):
        # Ensure the calculation has had time to show up in showq
        if datetime.now() + timedelta(seconds=-600) < self.created:
            return False
        # then check to see if it is still there
        if self.qid in self.account.host.running_now:
            return False
        else:
            return True

    def submit(self):
        if not self.account.host.active:
            return
        self.created = datetime.now()
        self.qid = self.account.submit(
            path=self.path, run_path=self.run_path, qfile="auto.q"
        )
        self.task.state = 1
        self.state = 1

    def collect(self):
        self.task.state = 0
        self.task.save()
        self.state = 2
        self.account.copy(
            move=True, to="local", destination=self.path, folder=self.run_path, file="*"
        )
        self.account.execute("rm -rf %s" % self.run_path, ignore_output=True)
        self.finished = datetime.now()
        self.save()
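
# Illustrative end-to-end sketch (not part of qmpy): a minimal polling pass
# built only from the methods defined above; running it from a periodic
# driver is an assumption.
#
#   >>> for job in Job.objects.filter(state=1):
#   ...     if job.is_done():
#   ...         job.collect()   # copies results back, frees the remote dir,
#   ...                         # and returns the task to state 0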