Source code for hpsspy.os._os

# Licensed under a 3-clause BSD style license - see LICENSE.rst
# -*- coding: utf-8 -*-
"""
hpsspy.os._os
~~~~~~~~~~~~~

Contains the actual functions in :mod:`hpsspy.os`.
"""
from os.path import join
from .path import islink
from .. import HpssOSError
from ..util import HpssFile, hsi
import re

__all__ = ['chmod', 'listdir', 'makedirs', 'mkdir', 'stat', 'lstat', 'walk']

linere = re.compile(r"""([dl-])           # file type
                        ([rwxsStT-]+)\s+  # file permissions
                        (\d+)\s+          # number of links
                        (\S+)\s+          # user
                        (\S+)\s+          # group
                        (\d+)\s+          # size
                        ([A-Za-z]+)\s+    # day of week
                        ([A-Za-z]+)\s+    # month
                        (\d+)\s+          # day
                        ([0-9:]+)\s       # H:M:S
                        ([0-9]+)\s        # year
                        (.*)$             # filename
                        """, re.VERBOSE)


[docs]def chmod(path, mode): """Reproduces the behavior of :func:`os.chmod` for HPSS files. Parameters ---------- path : :class:`str` File to chmod. mode : :class:`str` or :class:`int` Desired file permissions. This mode will be converted to a string. Raises ------ :class:`~hpsspy.HpssOSError` If the underlying :command:`hsi` reports an error. """ out = hsi('chmod', str(mode), path) if out.startswith('**'): raise HpssOSError(out) return
def _ls(path, options=''): """Perform :command:`hsi ls` and parse the results. Parameters ---------- path : :class:`str` Directory or file to examine. options : :class:`str`, optional Options to ``ls`` that will be appended to a base set of options. The base set is ``-D``, which is needed for high-precision timestamps. Returns ------- :class:`list` A list of :class:`~hpsspy.util.HpssFile` objects. """ out = hsi('ls', '-D' + options, path) if out.startswith('**'): raise HpssOSError(out) lines = out.split('\n') lspath = path # sometimes you don't get the path echoed back. files = list() for f in lines: if len(f) == 0: continue m = linere.match(f) if m is None: if f.endswith(':'): lspath = f.strip(': ') else: raise HpssOSError("Could not match line!\n{0}".format(f)) else: g = m.groups() files.append(HpssFile(lspath, *g)) return files
[docs]def listdir(path): """List the contents of an HPSS directory, similar to :func:`os.listdir`. Parameters ---------- path : :class:`str` Directory to examine. Returns ------- :class:`list` A list of :class:`~hpsspy.util.HpssFile` objects. Raises ------ :class:`~hpsspy.HpssOSError` If the underlying :command:`hsi` reports an error. """ files = _ls(path, options='a') # # Create a unique set of filenames for use below. # fileset = set([f.name for f in files]) # # Go back and identify htar files # for f in files: if f.name.endswith('.tar') and f.name + '.idx' in fileset: f.ishtar = True return files
[docs]def makedirs(path, mode=None): """Reproduces the behavior of :func:`os.makedirs`. Parameters ---------- path : :class:`str` Directory to create. mode : :class:`str`, optional String representation of the octal directory mode. Raises ------ :class:`~hpsspy.HpssOSError` If the underlying :command:`hsi` reports an error. Notes ----- Unlike :func:`os.makedirs`, attempts to create existing directories raise no exception. """ if mode is None: out = hsi('mkdir', '-p', path) else: out = hsi('mkdir', '-p', '-m', mode, path) if out.startswith('**'): raise HpssOSError(out) return
[docs]def mkdir(path, mode=None): """Reproduces the behavior of :func:`os.mkdir`. Parameters ---------- path : :class:`str` Directory to create. mode : :class:`str`, optional String representation of the octal directory mode. Raises ------ :class:`~hpsspy.HpssOSError` If the underlying :command:`hsi` reports an error. Notes ----- Unlike :func:`os.mkdir`, attempts to create existing directories raise no exception. """ if mode is None: out = hsi('mkdir', path) else: out = hsi('mkdir', '-m', mode, path) if out.startswith('**'): raise HpssOSError(out) return
[docs]def stat(path, follow_symlinks=True): """Perform the equivalent of :func:`os.stat` on the HPSS file `path`. Parameters ---------- path : :class:`str` Path to file or directory. follow_symlinks : :class:`bool`, optional If ``False``, makes :func:`stat` behave like :func:`os.lstat`. Returns ------- :class:`~hpsspy.util.HpssFile` An object that contains information similar to the data returned by :func:`os.stat`. Raises ------ :class:`~hpsspy.HpssOSError` If the underlying :command:`hsi ls` reports an error. """ files = _ls(path, options='d') if len(files) != 1: raise HpssOSError("Non-unique response for {0}!".format(path)) if files[0].islink and follow_symlinks: return stat(files[0].readlink) else: return files[0]
[docs]def lstat(path): """Perform the equivalent of :func:`os.lstat` on the HPSS file `path`. Parameters ---------- path : :class:`str` Path to file or directory. Returns ------- :class:`~hpsspy.util.HpssFile` An object that contains information similar to the data returned by :func:`os.stat`. Raises ------ :class:`~hpsspy.HpssOSError` If the underlying :command:`hsi` reports an error. """ return stat(path, follow_symlinks=False)
[docs]def walk(top, topdown=True, onerror=None, followlinks=False): """Traverse a directory tree on HPSS, similar to :func:`os.walk`. Parameters ---------- top : :class:`str` Starting directory. topdown : :class:`bool`, optional Direction to traverse the directory tree. onerror : callable, optional Call this function if an error is detected. followlinks : :class:`bool`, optional If ``True`` symlinks to directories are treated as directories. Returns ------- iterable This function can be used in the same way as :func:`os.walk`. """ # # We may not have read permission for top, in which case we can't # get a list of the files the directory contains. os.path.walk # always suppressed the exception then, rather than blow up for a # minor reason when (say) a thousand readable directories are still # left to visit. That logic is copied here. # try: names = listdir(top) except HpssOSError as err: if onerror is not None: onerror(err) return dirs, nondirs = [], [] for name in names: if name.isdir: dirs.append(name) else: nondirs.append(name) if topdown: yield top, dirs, nondirs for name in dirs: new_path = join(top, str(name)) if followlinks or not islink(new_path): for x in walk(new_path, topdown, onerror, followlinks): yield x if not topdown: yield top, dirs, nondirs