Source code for hpsspy.util

# Licensed under a 3-clause BSD style license - see LICENSE.rst
# -*- coding: utf-8 -*-
"""
hpsspy.util
~~~~~~~~~~~

Low-level utilities.
"""
import os
import stat
import re
from datetime import datetime
from subprocess import call
from tempfile import TemporaryFile
import pytz
from . import HpssOSError


[docs]class HpssFile(object): """This class is used to store and access an HPSS file's metadata. Parameters ---------- args : iterable This object this will normally be initialized by a tuple produced by :func:`hpsspy.os.listdir`. Attributes ---------- hpss_path : :class:`str` Path on the HPSS filesystem. raw_type : :class:`str` Raw type string. raw_permission : :class:`str` Raw permission string. st_nlink : :class:`int` Number of hard links. st_uid : :class:`str` Owner's name. st_gid : :class:`str` Group name. st_size : :class:`int` File size in bytes. raw_dow : :class:`str` Day-of-week of modification time. raw_month : :class:`str` Month of modification time. raw_day : :class:`int` Day of modification time. raw_hms : :class:`str` H:M:S of modification time. raw_year : :class:`int` Year of modification time. raw_name : :class:`str` Name of file. ishtar : :class:`bool` ``True`` if the file is an htar file. """ _htarre = re.compile(r"""HTAR:\s([dl-]) # file type ([rwxsStT-]+)\s+ # file permissions ([^/]+)/(\S+)\s+ # user/group (\d+)\s+ # size (\d+)-(\d+)-(\d+)\s+ # Year-month-day ([0-9:]+)\s+ # HH:MM (\S.*)$ # filename """, re.VERBOSE) _file_modes = {'l': stat.S_IFLNK, 'd': stat.S_IFDIR, '-': stat.S_IFREG} _months = ('Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec') _pacific = pytz.timezone('US/Pacific') def __init__(self, *args): self.hpss_path = args[0] self.raw_type = args[1] self.raw_permission = args[2] self.st_nlink = int(args[3]) self.st_uid = args[4] self.st_gid = args[5] self.st_size = int(args[6]) self.raw_dow = args[7] self.raw_month = args[8] self.raw_day = int(args[9]) self.raw_hms = args[10] self.raw_year = int(args[11]) self.raw_name = args[12] self.ishtar = False self._contents = None # placeholder for htar file contents. self._property_cache = dict() return def __repr__(self): return ("HpssFile('{0.hpss_path}', '{0.raw_type}', " + "'{0.raw_permission}', {0.st_nlink:d}, '{0.st_uid}', " + "'{0.st_gid}', {0.st_size:d}, '{0.raw_dow}', '{0.raw_month}', " + "{0.raw_day:d}, '{0.raw_hms}', {0.raw_year:d}, " + "'{0.raw_name}')").format(self) def __str__(self): return self.name @property def islink(self): """``True`` if the file is a symbolic link. """ return self.raw_type == 'l' @property def isdir(self): """``True`` if the file is a directory or a symbolic link that points to a directory. """ from .os import stat as hpss_stat if self.islink: new_path = self.readlink if new_path.startswith('/'): return hpss_stat(new_path).isdir else: return hpss_stat(os.path.join(self.hpss_path, new_path)).isdir else: return self.raw_type == 'd' @property def readlink(self): """Destination of symbolic link. """ if self.islink: return self.raw_name.split('->')[1].strip() else: raise HpssOSError("Invalid argument: '{0}'".format(self.raw_name)) @property def name(self): """Name of the file. """ if self.islink: return self.raw_name.split('->')[0].rstrip('@ ') else: return self.raw_name @property def path(self): """Full path to the file. """ return os.path.join(self.hpss_path, self.name) @property def st_mode(self): """File permission mode. """ if 'st_mode' not in self._property_cache: try: mode = self._file_modes[self.raw_type] except KeyError: raise AttributeError(("Unknown file type, {0.raw_type}, " + "for {0.name}!").format(self)) if self.raw_permission[0] == 'r': mode |= stat.S_IRUSR if self.raw_permission[1] == 'w': mode |= stat.S_IWUSR if self.raw_permission[2] == 'x': mode |= stat.S_IXUSR if self.raw_permission[2] == 'S': mode |= stat.S_ISUID if self.raw_permission[2] == 's': mode |= (stat.S_IXUSR | stat.S_ISUID) if self.raw_permission[3] == 'r': mode |= stat.S_IRGRP if self.raw_permission[4] == 'w': mode |= stat.S_IWGRP if self.raw_permission[5] == 'x': mode |= stat.S_IXGRP if self.raw_permission[5] == 'S': mode |= stat.S_ISGID if self.raw_permission[5] == 's': mode |= (stat.S_IXGRP | stat.S_ISGID) if self.raw_permission[6] == 'r': mode |= stat.S_IROTH if self.raw_permission[7] == 'w': mode |= stat.S_IWOTH if self.raw_permission[8] == 'x': mode |= stat.S_IXOTH if self.raw_permission[8] == 'T': mode |= stat.S_ISVTX if self.raw_permission[8] == 't': mode |= (stat.S_IXOTH | stat.S_ISVTX) self._property_cache['st_mode'] = mode return self._property_cache['st_mode'] @property def st_mtime(self): """File modification time. """ if 'st_mtime' not in self._property_cache: # seconds = 0 month = self._months.index(self.raw_month) + 1 h, m, s = map(int, self.raw_hms.split(':')) mtime = int(datetime(self.raw_year, month, self.raw_day, h, m, s, tzinfo=self._pacific).strftime('%s')) self._property_cache['st_mtime'] = mtime return self._property_cache['st_mtime']
[docs] def htar_contents(self): """Return (and cache) the contents of an htar file. Returns ------- :class:`list` List containing the contents. """ if self.ishtar: if self._contents is None: out, err = htar('-t', '-f', self.path) self._contents = list() for line in out.split('\n'): m = self._htarre.match(line) if m is not None: self._contents.append(m.groups()) return self._contents else: return None
[docs]def get_hpss_dir(): """Return the directory containing HPSS commands. Returns ------- :class:`str` Full path to the directory containing HPSS commands. Raises ------ KeyError If the :envvar:`HPSS_DIR` environment variable has not been set. """ return os.path.join(os.environ['HPSS_DIR'], 'bin')
[docs]def get_tmpdir(**kwargs): """Return the path to a suitable temporary directory. Resolves the path to the temporary directory in the following order: 1. If ``tmpdir`` is present as a keyword argument, the value is returned. 2. If :envvar:`TMPDIR` is set, its value is returned. 3. If neither are set, ``/tmp`` is returned. Parameters ---------- kwargs : :class:`dict` Keyword arguments from another function may be passed to this function. If ``tmpdir`` is present as a key, its value will be returned. Returns ------- :class:`str` The name of a temporary directory. """ if 'tmpdir' in kwargs: t = kwargs['tmpdir'] elif 'TMPDIR' in os.environ: t = os.environ['TMPDIR'] else: t = '/tmp' return t
[docs]def hsi(*args, **kwargs): """Run :command:`hsi` with arguments. Parameters ---------- args : :func:`tuple` Arguments to be passed to :command:`hsi`. tmpdir : :class:`str`, optional Write temporary files to this directory. Defaults to the value returned by :func:`hpsspy.util.get_tmpdir`. This option must be passed as a keyword! Returns ------- :class:`str` The standard output from :command:`hsi`. Raises ------ KeyError If the :envvar:`HPSS_DIR` environment variable has not been set. """ path = get_hpss_dir() ofile = os.path.join(get_tmpdir(**kwargs), 'hsi.txt') base_command = [os.path.join(path, 'hsi'), '-O', ofile, '-s', 'archive'] command = base_command + list(args) status = call(command) with open(ofile) as o: out = o.read() if os.path.exists(ofile): os.remove(ofile) return out
[docs]def htar(*args): """Run :command:`htar` with arguments. Parameters ---------- args : :func:`tuple` Arguments to be passed to :command:`htar`. Returns ------- :func:`tuple` The standard output and standard error from :command:`htar`. Raises ------ KeyError If the :envvar:`HPSS_DIR` environment variable has not been set. """ outfile = TemporaryFile() errfile = TemporaryFile() path = get_hpss_dir() command = [os.path.join(path, 'htar')] + list(args) status = call(command, stdout=outfile, stderr=errfile) outfile.seek(0) out = outfile.read() errfile.seek(0) err = errfile.read() outfile.close() errfile.close() return (out.decode('utf8'), err.decode('utf8'))