from datetime import datetime, timedelta
import time
import random
import subprocess
import os
import os.path
import time
from collections import defaultdict
import json
import logging
import numbers
import yaml
from django.db import models
from django.contrib.auth.models import AbstractUser
import pexpect, getpass
import qmpy
from qmpy.db.custom import DictField
from . import queue as queue
import threading
logger = logging.getLogger(__name__)
def is_yes(string):
char = string.lower()[0]
if char == "n":
return False
if char == "y":
return True
return None
class AllocationError(Exception):
"""Problem with the allocation"""
class SubmissionError(Exception):
"""Failed to submit a job"""
[docs]class User(AbstractUser):
"""
User model - stores an oqmd users information.
Relationships:
| :mod:`~qmpy.Account` via account_set
| :mod:`~qmpy.Allocation` via allocation_set
| :mod:`~qmpy.Project` via project_set
Attributes:
| id
| username
| first_name
| last_name
| date_joined
| is_active
| is_staff
| is_superuser
| last_login
| email
"""
class Meta:
app_label = "qmpy"
db_table = "users"
@property
def running(self):
return queue.Job.objects.filter(account__user=self, state=1)
@classmethod
def get(cls, name):
try:
return cls.objects.get(username=name)
except cls.DoesNotExist:
return cls(username=name)
@staticmethod
def create():
username = input("Username: ")
email = input("E-mail address: ")
user, new = User.objects.get_or_create(
username=username, last_login=datetime.now()
)
if not new:
print("User by that name exists!")
print("Please try a new name, or exit with Ctrl-x")
return User.create()
print("Okay, user created!")
user.save()
user.create_accounts()
# user.assign_allocations()
return user
def create_accounts(self):
msg = "Would you like to create cluster accounts for this user?"
ans = is_yes(input(msg + " [y/n]: "))
if ans is False:
return
elif ans is None:
print("I didn't understand that command.")
return self.create_accounts()
msg = "Does user %s have an account on %s? [y/n]: "
msg2 = "What is %s's username on %s?: "
msg3 = "On %s@%s where should calculations be run? (absolute path): "
known = self.account_set.values_list("host__name", flat=True)
for host in Host.objects.exclude(name__in=known):
ans = input(msg % (self.username, host.name))
ans = is_yes(ans)
if ans is False:
continue
uname = input(msg2 % (self.username, host.name))
acct, new = Account.objects.get_or_create(user=self, host=host)
if not new:
print("Account exists!")
continue
path = input(msg3 % (self.username, host.name))
acct.run_path = path
acct.username = uname.strip()
acct.save()
acct.create_passwordless_ssh()
[docs]class Host(models.Model):
"""
Host model - stores all host information for a cluster.
Relationships:
| account
| allocation
Attributes:
| name: Primary key.
| binaries: dict of label:path pairs for vasp binaries.
| check_queue: Path to showq command
| checked_time: datetime object for the last time the queue was
| checked.
| hostname: Full host name.
| ip_address: Full ip address.
| nodes: Total number of nodes.
| ppn: Number of processors per node.
| running: dict of PBS_ID:state pairs.
| sub_script: Path to qsub command
| sub_text: Path to queue file template.
| utilization: Number of active cores (based on showq).
| walltime: Maximum walltime on the machine.
| state: State code. 1=Up, 0=Full (auto-resets to 1 when jobs are
| collected), -1=Down.
"""
name = models.CharField(max_length=63, primary_key=True)
ip_address = models.GenericIPAddressField(null=True)
hostname = models.CharField(max_length=255)
binaries = DictField()
ppn = models.IntegerField(default=8)
nodes = models.IntegerField(default=30)
walltime = models.IntegerField(default=3600 * 24)
sub_script = models.CharField(max_length=120)
sub_text = models.TextField(default="/usr/local/bin/qsub")
check_queue = models.CharField(max_length=180, default="/usr/local/maui/bin/showq")
checked_time = models.DateTimeField(default=datetime.min)
running = DictField()
utilization = models.IntegerField(default=0)
state = models.IntegerField(default=1)
class Meta:
app_label = "qmpy"
db_table = "hosts"
def __str__(self):
return self.name
[docs] @staticmethod
def create():
"""
Classmethod to create a Host model. Script will ask you questions about
the host to add, and will return the created Host.
"""
host = {}
host["name"] = input("Hostname:")
if Host.objects.filter(name=host["name"]).exists():
print("Host by that name already exists!")
exit(-1)
host["ip_address"] = input("IP Address:")
if Host.objects.filter(ip_address=host["ip_address"]).exists():
print("Host at that address already exists!")
exit(-1)
host["ppn"] = input("Processors per node:")
host["nodes"] = input("Max nodes to run on:")
host["sub_script"] = input(
"Command to submit a script " "(e.g. /usr/local/bin/qsub):"
)
host["check_queue"] = input(
"Command for showq (e.g." "/usr/local/maui/bin/showq):"
)
host["sub_text"] = input("Path to qfile template:")
h = Host(**host)
h.save()
@classmethod
def get(cls, name):
try:
return cls.objects.get(name=name)
except cls.DoesNotExist:
return cls(name=name)
@property
def accounts(self):
return list(self.account_set.all())
@property
def jobs(self):
jobs = []
for acct in self.accounts:
jobs += list(acct.job_set.filter(state=1))
return jobs
@property
def active(self):
if self.state < 1:
return False
elif self.utilization > 5 * self.nodes * self.ppn:
return False
else:
return True
@property
def percent_utilization(self):
return 100.0 * float(self.utilization) / (self.nodes * self.ppn)
def get_utilization(self):
util = 0
for acct in self.account_set.all():
for job in acct.job_set.filter(state=1):
util += job.ncpus
self.utilization = util
return util
# < Jiahong
# def get_project(self):
# """
# Out of the active projects able to run on this host,
# select one at random
# Output:
# Project, Active project able to run on this host
# """
# proj = Project.objects.filter(allocations__host=self, state=1)
# proj = proj.filter(task__state=0)
# if proj.exists():
# return random.choice(list(proj.distinct()))
[docs] def get_project(self, maxuse=400):
"""
Out of the active projects able to run on this host,
select one at random
Output:
Project, Active project able to run on this host
"""
proj = Project.objects.filter(allocations__host=self, state=1)
proj = proj.filter(task__state=0)
if proj.exists():
proj_list = list(proj.distinct())
proj_list = [p for p in proj_list if p.running.count() < maxuse]
if len(proj_list) > 0:
return random.choice(proj_list)
def get_tasks(self, project=None):
tasks = queue.Task.objects.filter(state=0)
if project is None:
project = self.get_project()
if project is None:
return
tasks = tasks.filter(project_set=project)
tasks = tasks.filter(project_set__allocations__host=self)
tasks = tasks.filter(project_set__users__account__host=self)
return tasks.order_by("priority", "id")
@property
def qfile(self):
return open(self.sub_text).read()
def get_binary(self, key):
return self.binaries[key]
def _try_login(self, timeout=5.0):
def _login():
self._tmp_acct = Allocation.get("b1004").get_account()
self._tmp_ssh = 'ssh {user}@{host} "{cmd}"'.format(
user=self._tmp_acct.user.username,
host=self._tmp_acct.host.ip_address,
cmd="whoami",
)
self._tmp_proc = subprocess.Popen(
self._tmp_ssh,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
stdout, stderr = self._tmp_proc.communicate()
if stdout.strip() == self._tmp_acct.user.username:
print("quest is up")
self._tmp_thread = threading.Thread(target=_login)
self._tmp_thread.start()
self._tmp_thread.join(timeout)
if self._tmp_thread.is_alive():
print("unable login on quest")
self._tmp_proc.terminate()
self._tmp_thread.join()
return self._tmp_proc.returncode
[docs] def check_host(self):
"""Pings the host to see if it is online. Returns False if it is
offline."""
ret = subprocess.call(
"ping -c 1 -w 1 %s" % self.ip_address,
shell=True,
stdout=open("/dev/null", "w"),
stderr=subprocess.STDOUT,
)
if ret == 0:
self.state = 1
self.save()
write_resources()
return True
else:
"""Sometimes quest refuses to respond to ping requests. So, try
logging into it using an(y) account. Trying executing a command and
see if it is successful."""
if self.name == "quest":
if self._try_login() == 0:
self.state = 1
self.save()
write_resources()
return True
self.state = -2
self.save()
return False
@property
def running_now(self):
if not self.state == 1:
return {}
if datetime.now() + timedelta(seconds=-60) > self.checked_time:
self.check_running()
return self.running
[docs] def check_running(self):
"""
Uses the hosts data and one of the associated accounts to check the PBS
queue on the Host. If it has been checked in the last 2 minutes, it
will return the previously returned result.
"""
self.checked_time = datetime.now()
if not self.state == 1:
self.running = {}
self.save()
return
account = random.choice(self.accounts)
raw_data = account.execute(self.check_queue)
running = {}
if not raw_data:
return
elif type(raw_data) == bytes:
raw_data = raw_data.decode()
## < Mohan
if "squeue" in self.check_queue:
for line in raw_data.strip().split("\n"):
if "JOBID" in line:
continue
line = line.split()
try:
qid = int(line[0])
running[qid] = {
"user": line[3],
"state": line[4],
"proc": int(line[6]) * self.ppn,
}
except:
pass
else:
for line in raw_data.strip().split("\n"):
if "Active Jobs" in line:
continue
line = line.split()
if len(line) != 9:
continue
try:
# < Mohan
if "Moab" in line[0]:
qid = int(line[0].strip().split(".")[1])
else:
qid = int(line[0])
running[qid] = {
"user": line[1],
"state": line[2],
"proc": int(line[3]),
}
# Mohan >
except:
pass
## Mohan >
self.running = running
self.save()
def get_running(self):
if self.running is not None:
return self.running
else:
return {}
[docs] def activate(self):
"""
Allow jobs to be run on this system. Remember to save() to enact change
"""
self.state = 1
[docs] def deactivate(self):
"""
Prevent new jobs from being started on this system.
Remember to save() changes
"""
self.state = -1
@property
def utilization_by_project(self):
utilization = defaultdict(int)
for job in self.jobs:
projects = job.task.project_set.all()
for p in projects:
utilization[str(p.name)] += float(job.ncpus) / len(projects)
if self.ppn * self.nodes > sum(utilization.values()):
utilization["Idle"] = self.ppn * self.nodes - sum(utilization.values())
return utilization
@property
def utilization_json(self):
series = []
for k, v in list(self.utilization_by_project.items()):
series.append({"data": v, "label": k})
return json.dumps(series)
@property
def ncpus(self):
return self.ppn * self.nodes
# ===============================================================================#
[docs]class Account(models.Model):
"""
Base class for a `User` account on a `Host`.
Attributes:
| host
| id
| job
| run_path
| state
| user
| username
"""
user = models.ForeignKey(User, on_delete=models.CASCADE)
host = models.ForeignKey(Host, on_delete=models.CASCADE)
username = models.CharField(max_length=255)
run_path = models.TextField()
state = models.IntegerField(default=1)
class Meta:
app_label = "qmpy"
db_table = "accounts"
def __str__(self):
return "{user}@{host}".format(user=self.user.username, host=self.host.name)
@classmethod
def get(cls, user, host):
try:
return Account.objects.get(user=user, host=host)
except cls.DoesNotExist:
return Account(host=host, user=user)
def create_passwordless_ssh(self, key="id_rsa", origin=None):
msg = "password for {user}@{host}: "
if origin is None:
origin = "/home/{user}/.ssh".format(user=getpass.getuser())
pas = getpass.getpass(msg.format(user=self.username, host=self.host.name))
msg = "/usr/bin/ssh {user}@{host} touch"
msg += " /home/{user}/.ssh/authorized_keys"
p = pexpect.spawn(
msg.format(
origin=origin, key=key, user=self.username, host=self.host.ip_address
)
)
p.expect("assword:")
p.sendline(pas)
time.sleep(2)
p.close()
msg = "/usr/bin/scp {origin}/{key} {user}@{host}:/home/{user}/.ssh/"
p = pexpect.spawn(
msg.format(
origin=origin, key=key, user=self.username, host=self.host.ip_address
)
)
p.expect("assword:")
p.sendline(pas)
time.sleep(2)
p.close()
msg = "/usr/bin/ssh {user}@{host}"
msg += " chmod 600 /home/{user}/.ssh/authorized_keys"
p = pexpect.spawn(
msg.format(
origin=origin, key=key, user=self.username, host=self.host.ip_address
)
)
p.expect("assword:")
p.sendline(pas)
time.sleep(2)
p.close()
msg = "/usr/bin/ssh-copy-id -i {origin}/{key} {user}@{host}"
p = pexpect.spawn(
msg.format(
origin=origin, key=key, user=self.username, host=self.host.ip_address
)
)
p.expect("assword:")
p.sendline(pas)
time.sleep(2)
p.close()
print("Great! Lets test it real quick...")
out = self.execute("whoami")
if out.decode("utf-8") == "%s\n" % self.username:
print("Awesome! It worked!")
else:
print("Something appears to be wrong, talk to Scott...")
@property
def active(self):
if self.state < 1:
return False
elif not self.host.active:
return False
else:
return True
def submit(self, path=None, run_path=None, qfile=None):
self.execute("mkdir %s" % run_path, ignore_output=True)
self.copy(folder=path, file="*", destination=run_path)
cmd = "command cd {path} && {sub} {qfile}".format(
path=run_path, sub=self.host.sub_script, qfile=qfile
)
stdout = self.execute(cmd)
## < Mohan
if "sbatch" in self.host.sub_script:
jid = int(stdout.strip().split()[-1])
return jid
else:
# < Mohan
tmp = stdout.strip().split()[0]
if type(tmp) == bytes:
tmp = tmp.decode()
if "Moab" in tmp:
jid = int(tmp.split(".")[1])
else:
jid = int(tmp.split(".")[0])
# Mohan >
## Mohan >
return jid
def execute(self, command="exit 0", ignore_output=False):
ssh = "ssh {user}@{host} '{cmd}'".format(
user=self.username, host=self.host.ip_address, cmd=command
)
logging.debug(ssh)
call = subprocess.Popen(
ssh, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
stdout, stderr = call.communicate()
logging.debug("stdout: %s" % stdout)
logging.debug("stderr: %s" % stderr)
if stderr and not ignore_output:
logging.warn("WARNING: %s" % stderr)
logging.warn(
"(u, h, cmd) = {}, {}, {}".format(self.username, self.host, command)
)
return stdout
def copy(
self,
destination=None,
to=None, # where to send the stuff
fr=None,
file=None,
folder=None, # what to send
clear_dest_dir=False,
move=False,
): # some conditions on sending it
if destination is None:
destination = self.run_path
if to is None:
to = self
if fr is None:
if to == "local":
fr = self
else:
fr = "local"
assert isinstance(to, Account) or to == "local"
assert isinstance(fr, Account) or fr == "local"
assert not (file is None and folder is None)
send_dir = False
if file is None:
send_dir = True
elif folder is None:
folder = os.path.dirname(file)
file = os.path.basename(file)
if clear_dest_dir:
if to == "local":
command = subprocess.Popen(
'rm -f "%s"/*' % destination,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE,
)
stdout, stderr = command.communicate()
else:
stdout, stderr = self.execute('rm -f "%/"*' % destination)
logging.debug("stdout: %s" % stdout)
if fr == "local":
scp = "scp "
else:
scp = "scp {user}@{host}:".format(user=fr.username, host=fr.host.ip_address)
if not file:
scp += "-r "
if send_dir:
scp += os.path.abspath("{}".format(folder))
else:
scp += '"{path}"/{file}'.format(path=os.path.abspath(folder), file=file)
if to == "local":
scp += " " + destination
else:
scp += ' {user}@{host}:"{path}"'.format(
user=to.username,
host=to.host.ip_address,
path=os.path.abspath(destination),
)
logging.debug("copy command: %s" % scp)
cmd = subprocess.Popen(
scp, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
stdout, stderr = cmd.communicate()
logging.debug("stdout: %s" % stdout)
logging.debug("stderr: %s" % stderr)
if move:
if send_dir:
rmcmd = 'rm -rf "{path}"'.format(path=os.path.abspath(folder))
else:
rmcmd = 'rm -f "{path}/"{file}'.format(
file=file, path=os.path.abspath(folder)
)
logging.debug("wiping source: %s" % rmcmd)
stdout = fr.execute(rmcmd)
logging.debug("output: %s" % stdout)
# ===============================================================================#
[docs]class Allocation(models.Model):
"""
Base class for an Allocation on a computing resources.
Attributes:
| host
| job
| key
| name
| project
| state
| users
"""
name = models.CharField(max_length=63, primary_key=True)
key = models.CharField(max_length=100, default="")
host = models.ForeignKey(Host, on_delete=models.CASCADE)
users = models.ManyToManyField(User)
state = models.IntegerField(default=1)
class Meta:
app_label = "qmpy"
db_table = "allocations"
def __str__(self):
return self.name
@classmethod
def create(self):
name = input("Name your allocation:")
if Allocation.objects.filter(name=name).exists():
print("Allocation by that name already exists!")
exit(-1)
host = input("Which cluster is this allocation on?")
if not Host.objects.filter(name=host).exists():
print("This host doesn't exist!")
exit(-1)
host = Host.objects.get(name=host)
alloc = Allocation(name=name, host=host)
alloc.save()
print("Now we will assign users to this allocation")
for acct in Account.objects.filter(host=host):
inc = input("Can %s use this allocation? y/n [y]:" % acct.user.username)
if inc == "y" or inc == "":
alloc.users.add(acct.user)
print("If this allocation requires a special password, enter", end=" ")
key = input("it now:")
alloc.key = key
alloc.save()
@classmethod
def get(cls, name):
try:
return cls.objects.get(name=name)
except cls.DoesNotExist:
return cls(name=name)
@property
def active(self):
if self.state < 1:
return False
elif not self.host.active:
return False
else:
return True
def get_user(self):
return random.choice(self.users.filter(state=1))
def get_account(self, users=None):
if users is None:
users = self.users.all()
user = random.choice(list(users))
return user.account_set.get(host=self.host)
@property
def percent_utilization(self):
return self.host.percent_utilization
# ===============================================================================#
[docs]class Project(models.Model):
"""
Base class for a project within qmpy.
Attributes:
| allocations
| entry
| name
| priority
| state
| task
| users
"""
name = models.CharField(max_length=63, primary_key=True)
priority = models.IntegerField(default=0)
users = models.ManyToManyField(User)
allocations = models.ManyToManyField(Allocation)
state = models.IntegerField(default=1)
class Meta:
app_label = "qmpy"
db_table = "projects"
def __str__(self):
return self.name
@classmethod
def get(cls, name):
if isinstance(name, cls):
return name
try:
return cls.objects.get(name=name)
except cls.DoesNotExist:
return cls(name=name)
@property
def completed(self):
return self.task_set.filter(state=2)
@property
def running(self):
return self.task_set.filter(state=1)
@property
def waiting(self):
return self.task_set.filter(state=0).order_by("priority")
@property
def failed(self):
return self.task_set.filter(state=-1)
@property
def held(self):
return self.task_set.filter(state=-2)
[docs] @staticmethod
def create():
"""
Create a new project. Prompts user on std-in
for name, users, and allocations of this project.
"""
name = input("Name your project: ")
if Project.objects.filter(name=name).exists():
print("Project by that name already exists!")
exit(-1)
proj = Project(name=name)
proj.save()
proj.priority = input("Project priority (1-100): ")
users = input("List project users (e.g. sjk648 jsaal531 bwm291): ")
for u in users.split():
if not User.objects.filter(username=u).exists():
print("User named", u, "doesn't exist!")
else:
proj.users.add(User.objects.get(username=u))
alloc = input("List project allocations (e.g. byrd victoria b1004): ")
for a in alloc.split():
if not Allocation.objects.filter(name=a).exists():
print("Allocation named", a, "doesn't exist!")
else:
proj.allocations.add(Allocation.objects.get(name=a))
@property
def active(self):
if self.state < 0:
return False
else:
if self.state != 1:
self.state = 1
self.save()
return True
def get_allocation(self):
available = [a for a in self.allocations.all() if a.active]
if available:
return random.choice(available)
else:
return []
# !vih
def write_resources():
current_loc = os.path.dirname(__file__)
######
# headers for various configuration files
######
hosts_header = """# host1:
# binaries:
# bin_name1: /path/to/bin1
# bin_name2: /path/to/bin2
# check_queue: /full/path/to/showq
# hostname: full.host.name
# ip_address: ###.###.##.###
# nodes: # of nodes on machine
# ppn: # of processors per node
# sub_script: /full/path/to/submission/command
# sub_text: filename for qfile to use a template.
# A file named "filename" must be in configuration/qfiles
# walltime: maximum walltime, in seconds
# host2: ...
"""
users_header = """# user1:
# hostname1:
# run_path:/where/to/run/on/host1
# username: usernameonhost1
# hostname2:
# run_path:/where/to/run/on/host2
# username: usernameonhost2
# user2:
# hostname1: ...
"""
allocations_header = """# allocation1:
# host: hostname
# key: key needed for identifying allocation
# users:
# - user1
# - user2
# allocation2: ...
"""
projects_header = """# project1:
# allocations:
# - allocation1
# - allocation2
# priority: Base priority for the project. Lower numbers will be done soonest.
# users:
# - user1
# - user2
# project2: ...
"""
######
# list of values that need to be written into the configuration files
######
host_values = [
"binaries",
"check_queue",
"hostname",
"ip_address",
"nodes",
"ppn",
"sub_script",
"sub_text",
"walltime",
]
user_values = ["run_path", "username"]
allocation_values = ["host", "key", "users"]
project_values = ["allocations", "priority", "users"]
######
# a function to 'clean' the values from type unicode/ long/ etc. to string/ int
######
def clean(val):
if isinstance(val, str):
val = str(val)
elif isinstance(val, numbers.Number):
val = int(val)
return val
######
# write host configurations into hosts.yml
######
hosts = Host.objects.all()
dict1 = {}
for h in hosts:
dict2 = {}
for hv in host_values:
dict2[hv] = clean(h.__getattribute__(hv))
dict1[clean(h.name)] = dict2
with open(current_loc + "/../configuration/resources/hosts.yml", "w") as f_hosts:
f_hosts.write(hosts_header)
f_hosts.write("\n")
yaml.dump(dict1, f_hosts, default_flow_style=False)
######
# write user configurations into users.yml
######
users = User.objects.all()
dict1 = {}
for u in users:
dict2 = {}
accounts = Account.objects.filter(user=u)
for a in accounts:
dict2[clean(a.host.name)] = {
"run_path": clean(a.run_path),
"username": clean(a.username),
}
dict1[clean(u.username)] = dict2
with open(current_loc + "/../configuration/resources/users.yml", "w") as f_users:
f_users.write(users_header)
f_users.write("\n")
yaml.dump(dict1, f_users, default_flow_style=False)
######
# write allocation configurations into allocations.yml
######
alloc = Allocation.objects.all()
dict1 = {}
for a in alloc:
dict2 = {}
dict2["host"] = clean(a.host.name)
dict2["key"] = clean(a.key)
dict2["users"] = [
clean(u) for u in a.users.all().values_list("username", flat=True)
]
dict1[clean(a.name)] = dict2
with open(
current_loc + "/../configuration/resources/allocations.yml", "w"
) as f_allocations:
f_allocations.write(allocations_header)
f_allocations.write("\n")
yaml.dump(dict1, f_allocations, default_flow_style=False)
######
# write project configurations into projects.yml
######
pro = Project.objects.all()
dict1 = {}
for p in pro:
dict2 = {}
dict2["allocations"] = [
clean(a) for a in p.allocations.all().values_list("name", flat=True)
]
dict2["priority"] = clean(p.priority)
dict2["users"] = [
clean(u) for u in p.users.all().values_list("username", flat=True)
]
dict1[clean(p.name)] = dict2
with open(
current_loc + "/../configuration/resources/projects.yml", "w"
) as f_projects:
f_projects.write(projects_header)
f_projects.write("\n")
yaml.dump(dict1, f_projects, default_flow_style=False)