# Source code for hpsspy.scan

# Licensed under a 3-clause BSD style license - see LICENSE.rst
# -*- coding: utf-8 -*-

"""Functions for scanning directory trees to find files in need of backup."""
import csv
import json
import logging
import os
import re
import sys
from argparse import ArgumentParser
from pkg_resources import resource_exists, resource_stream
from . import __version__ as hpsspyVersion
from .os import makedirs, walk
from .util import get_tmpdir, hsi, htar

def validate_configuration(config):
    """Check the configuration file for validity.

    Parameters
    ----------
    config : :class:`str`
        Name of the configuration file.

    Returns
    -------
    :class:`int`
        An integer suitable for passing to :func:`sys.exit`: ``0`` if the
        file is usable, ``1`` if a critical problem was found.
    """
    logger = logging.getLogger(__name__ + '.validate_configuration')
    xtn = os.path.splitext(os.path.basename(config))[1]
    if xtn != '.json':
        logger.warning("%s might not be a JSON file!", config)
    try:
        with open(config) as fp:
            try:
                json_data = json.load(fp)
            except ValueError:
                # json.JSONDecodeError is a subclass of ValueError.
                logger.critical("%s is not valid JSON.", config)
                return 1
    except IOError:
        # FileNotFoundError is a subclass of IOError (OSError).
        logger.critical("%s does not exist. Try again.", config)
        return 1
    #
    # The top-level JSON value must be an object with a '__config__' entry;
    # anything else (list, number, string, ...) cannot be a configuration.
    #
    if not isinstance(json_data, dict) or '__config__' not in json_data:
        logger.critical("%s does not contain a '__config__' section.", config)
        return 1
    for k in ('root', 'hpss_root', 'physical_disks'):
        if k not in json_data['__config__']:
            logger.warning("%s '__config__' section does not contain an "
                           "entry for '%s'.", config, k)
    for k in json_data:
        if k == '__config__':
            continue
        if '__exclude__' not in json_data[k]:
            logger.warning("Section '%s' should at least have an "
                           "'__exclude__' entry.", k)
        try:
            # Only the side effect matters here: does every pattern compile?
            compile_map(json_data, k)
        except re.error:
            logger.critical("Regular Expression error detected in "
                            "section '%s'!", k)
            return 1
    return 0
def compile_map(old_map, section):
    """Compile the regular expressions in a map.

    Parameters
    ----------
    old_map : :class:`dict`
        A dictionary containing regular expressions to compile.
    section : :class:`str`
        An initial key to determine the section of the dictionary of
        interest.  Typically, this will be a top-level directory.

    Returns
    -------
    :class:`dict`
        A new dictionary containing compiled regular expressions.  The
        ``'__exclude__'`` entry becomes a :class:`frozenset`; every other
        entry becomes a :class:`tuple` of ``(compiled pattern, value)``
        pairs.

    Raises
    ------
    :class:`re.error`
        If any pattern in the section is not a valid regular expression.
    """
    new_map = dict()
    for key, value in old_map[section].items():
        if key == '__exclude__':
            new_map[key] = frozenset(value)
        else:
            new_map[key] = tuple((re.compile(pattern), target)
                                 for pattern, target in value.items())
    return new_map
def files_to_hpss(hpss_map_cache, section):
    """Create a map of files on disk to HPSS files.

    The map is read from `hpss_map_cache` if that file exists; otherwise
    from a file of the same name in the ``data/`` directory of the hpsspy
    distribution; otherwise a built-in, empty map is used.

    Parameters
    ----------
    hpss_map_cache : :class:`str`
        Data file containing the map.
    section : :class:`str`
        An initial key to determine the section of the dictionary of
        interest.  Typically, this will be a top-level directory.

    Returns
    -------
    :func:`tuple`
        A tuple containing the compiled mapping and an additional
        configuration dictionary.
    """
    logger = logging.getLogger(__name__ + '.files_to_hpss')
    if os.path.exists(hpss_map_cache):
        logger.info("Found map file %s.", hpss_map_cache)
        with open(hpss_map_cache) as t:
            hpss_map = json.load(t)
    elif resource_exists('hpsspy', 'data/' + hpss_map_cache):
        logger.info("Reading from file %s in the hpsspy distribution.",
                    hpss_map_cache)
        t = resource_stream('hpsspy', 'data/' + hpss_map_cache)
        try:
            # resource_stream returns a binary stream; decode before parsing.
            hpss_map = json.loads(t.read().decode('utf-8'))
        finally:
            t.close()
    else:
        logger.warning("Returning empty map file!")
        hpss_map = {"__config__": {},
                    "dr8": {"__exclude__": [], "casload": {}, "apogee": {},
                            "boss": {}, "sdss": {}},
                    "dr9": {"__exclude__": [], "casload": {}, "apogee": {},
                            "boss": {}, "sdss": {}},
                    "dr10": {"__exclude__": [], "casload": {}, "apogee": {},
                             "boss": {}, "sdss": {}},
                    "dr11": {"__exclude__": [], "casload": {}, "apogee": {},
                             "boss": {}, "marvels": {}, "sdss": {}},
                    "dr12": {"__exclude__": [], "casload": {}, "apo": {},
                             "apogee": {}, "boss": {}, "marvels": {},
                             "sdss": {}}}
    return (compile_map(hpss_map, section), hpss_map['__config__'])
def find_missing(hpss_map, hpss_files, disk_files_cache, missing_files,
                 report=10000, limit=1024.0):
    """Compare HPSS files to disk files.

    Parameters
    ----------
    hpss_map : :class:`dict`
        A mapping of file names to HPSS files.
    hpss_files : :class:`dict`
        The list of actual HPSS files.
    disk_files_cache : :class:`str`
        Name of the disk cache file.
    missing_files : :class:`str`
        Name of the file that will contain the list of missing files.
    report : :class:`int`, optional
        Print an informational message when N files have been scanned.
        A value of ``0`` disables these messages.
    limit : :class:`float`, optional
        HPSS archive files should be smaller than this size (in GB).

    Returns
    -------
    :class:`bool`
        ``True`` if no serious problems were found.
    """
    logger = logging.getLogger(__name__ + '.find_missing')
    nfiles = 0
    nmissing = 0
    nmultiple = 0
    backups = dict()
    pattern_used = dict()
    section_warning = set()
    # A section without an '__exclude__' entry simply excludes nothing.
    excluded = hpss_map.get('__exclude__', frozenset())
    with open(disk_files_cache, newline='') as t:
        reader = csv.DictReader(t)
        for row in reader:
            f = row['Name']
            nfiles += 1
            if report and (nfiles % report) == 0:
                logger.info("%9d files scanned.", nfiles)
            if f in excluded:
                logger.info("%s is excluded.", f)
                continue
            section = f.split('/')[0]
            if section == f:
                #
                # Top-level section containing no subdirectories.
                #
                section = '__top__'
            try:
                s = hpss_map[section]
            except KeyError:
                #
                # If the section is not described, that's not
                # good, but continue.
                #
                if section not in section_warning:
                    section_warning.add(section)
                    logger.warning("Directory %s is not "
                                   "described in the configuration!",
                                   section)
                continue
            if not s:
                #
                # If the section is blank, that's OK.
                #
                if section not in section_warning:
                    section_warning.add(section)
                    logger.warning("Directory %s is not configured!",
                                   section)
                continue
            #
            # Now check if it is mapped.
            #
            mapped = 0
            for regex, target in s:
                pattern_used.setdefault(regex.pattern, 0)
                if regex.match(f) is None:
                    continue
                logger.debug("pattern_used[r'%s'] += 1", regex.pattern)
                logger.debug("r[1] = r'%s'", target)
                pattern_used[regex.pattern] += 1
                mapped += 1
                if target == "EXCLUDE":
                    logger.debug("%s is excluded from backups.", f)
                elif target == "AUTOMATED":
                    logger.debug("%s is backed up by some other "
                                 "automated process.", f)
                else:
                    reName = regex.sub(target, f)
                    logger.debug("%s in %s.", f, reName)
                    exists = reName in hpss_files
                    if exists:
                        newer = int(row['Mtime']) > hpss_files[reName][1]
                    else:
                        newer = False
                    if newer:
                        logger.warning("%s is newer than %s, "
                                       "marking as missing!", f, reName)
                    if reName in backups:
                        backups[reName]['files'].append(f)
                        backups[reName]['size'] += int(row['Size'])
                        #
                        # 'newer' can change from False to True, but
                        # it should never change back to False.
                        #
                        if newer:
                            backups[reName]['newer'] = newer
                    else:
                        backups[reName] = {'files': [f],
                                           'size': int(row['Size']),
                                           'newer': newer,
                                           'exists': exists}
            if mapped == 0:
                logger.error("%s is not mapped to any file on HPSS!", f)
                nmissing += 1
            if mapped > 1:
                logger.error("%s is mapped to multiple files on HPSS!", f)
                nmultiple += 1
    for p, count in pattern_used.items():
        if count == 0:
            logger.info("Pattern '%s' was never used, "
                        "maybe files have been removed from disk?", p)
    #
    # Eliminate backups that exist and have no newer files on disk.
    #
    missing = dict()
    nbackups = 0
    for k, v in backups.items():
        if v['exists'] and not v['newer']:
            logger.debug("%s is a valid backup.", k)
        else:
            logger.info('%s is %d bytes.', k, v['size'])
            if v['size']/1024/1024/1024 > limit:
                logger.error("HPSS file %s would be too large, "
                             "skipping backup!", k)
            else:
                logger.debug("Adding %s to missing backups.", k)
                nbackups += len(v['files'])
                missing[k] = v
    if nbackups > 0:
        logger.info('%d files selected for backup.', nbackups)
    #
    # Always (re)write the file, even when it is empty, so that a later
    # processing step never reads a stale or nonexistent list.
    #
    with open(missing_files, 'w') as fp:
        json.dump(missing, fp, indent=2, separators=(',', ': '))
    ok = True
    if nmissing > 0:
        logger.critical("Not all files would be backed up with "
                        "this configuration!")
        ok = False
    if nmultiple > 0:
        logger.critical("Some files would be backed up more than "
                        "once with this configuration!")
        ok = False
    return ok
def process_missing(missing_cache, disk_root, hpss_root, dirmode='2770',
                    test=False):
    """Convert missing files into HPSS commands.

    Entries whose name ends in ``.tar`` are archived with ``htar``; an entry
    ending in ``_files.tar`` is built from an explicit list of files (passed
    to ``htar`` with ``-L``), any other ``.tar`` entry from one or more
    directories.  All remaining entries are copied with ``hsi put``.

    Parameters
    ----------
    missing_cache : :class:`str`
        Name of a JSON file containing the missing file data.
    disk_root : :class:`str`
        Missing files are relative to this root on disk.
    hpss_root : :class:`str`
        Missing files are relative to this root on HPSS.
    dirmode : :class:`str`, optional
        Create directories on HPSS with this mode (default ``drwxrws---``).
    test : :class:`bool`, optional
        Test mode.  Try not to make any changes.
    """
    logger = logging.getLogger(__name__ + '.process_missing')
    logger.debug("Processing missing files from %s.", missing_cache)
    with open(missing_cache) as fp:
        missing = json.load(fp)
    created_directories = set()
    start_directory = os.getcwd()
    try:
        for h in missing:
            h_file = os.path.join(hpss_root, h)
            if not h.endswith('.tar'):
                #
                # Plain file: copy it directly.
                #
                d_h_file = os.path.dirname(h_file)
                if d_h_file not in created_directories:
                    logger.debug("makedirs('%s', mode='%s')",
                                 d_h_file, dirmode)
                    if not test:
                        makedirs(d_h_file, mode=dirmode)
                    created_directories.add(d_h_file)
                source = os.path.join(disk_root, missing[h]['files'][0])
                logger.info("hsi('put', '%s', ':', '%s')", source, h_file)
                if test:
                    out = "Test mode, skipping hsi command."
                else:
                    out = hsi('put', source, ':', h_file)
                logger.debug(out)
                continue
            disk_chdir = os.path.dirname(h)
            full_chdir = os.path.join(disk_root, disk_chdir)
            if h.endswith('_files.tar'):
                Lfile = os.path.join(get_tmpdir(),
                                     os.path.basename(h.replace('.tar',
                                                                '.txt')))
                logger.debug(Lfile)
                htar_dir = None
                Lfile_lines = ('\n'.join([os.path.basename(f)
                                          for f in missing[h]['files']]) +
                               '\n')
                if test:
                    logger.debug(Lfile_lines)
                else:
                    with open(Lfile, 'w') as fp:
                        fp.write(Lfile_lines)
            else:
                Lfile = None
                #
                # Be careful, because the directory name may itself
                # contain underscore characters, or X characters.
                #
                b = extract_directory_name(h)
                htar_dir = []
                if os.path.isdir(os.path.join(full_chdir, b)):
                    htar_dir = [b]
                elif b.endswith('X'):
                    # 'X' is a single-character wildcard in the name.
                    htar_re = re.compile(b.replace('X', '.') + '$')
                    htar_dir = [d for d in os.listdir(full_chdir)
                                if os.path.isdir(os.path.join(full_chdir, d))
                                and htar_re.match(d) is not None]
                if not htar_dir:
                    #
                    # Never call htar without any members to archive.
                    #
                    logger.error("Could not find directories corresponding "
                                 "to %s!", h)
                    continue
            try:
                logger.debug("os.chdir('%s')", full_chdir)
                os.chdir(full_chdir)
                #
                # Avoid adding a trailing slash.
                #
                if disk_chdir:
                    h_dir = os.path.join(hpss_root, disk_chdir)
                else:
                    h_dir = hpss_root
                if h_dir not in created_directories:
                    logger.debug("makedirs('%s', mode='%s')", h_dir, dirmode)
                    if not test:
                        makedirs(h_dir, mode=dirmode)
                    created_directories.add(h_dir)
                if Lfile is None:
                    logger.info("htar('-cvf', '%s', '-H', "
                                "'crc:verify=all', %s)", h_file,
                                ', '.join(["'{0}'".format(member)
                                           for member in htar_dir]))
                    if test:
                        out, err = ('Test mode, skipping htar command.', '')
                    else:
                        out, err = htar('-cvf', h_file, '-H',
                                        'crc:verify=all', *htar_dir)
                else:
                    logger.info("htar('-cvf', '%s', '-H', 'crc:verify=all', "
                                "'-L', '%s')", h_file, Lfile)
                    if test:
                        out, err = ('Test mode, skipping htar command.', '')
                    else:
                        out, err = htar('-cvf', h_file, '-H',
                                        'crc:verify=all', '-L', Lfile)
                logger.debug(out)
                if err:
                    logger.warning(err)
            finally:
                #
                # Remove the temporary list file even if htar failed.
                #
                if Lfile is not None:
                    logger.debug("os.remove('%s')", Lfile)
                    if not test:
                        os.remove(Lfile)
    finally:
        #
        # Always return to where we started, even after an exception.
        #
        logger.debug("os.chdir('%s')", start_directory)
        os.chdir(start_directory)
    return
def extract_directory_name(filename):
    """Extract a directory name from a HTAR `filename` that may contain
    various prefixes.

    The prefix is built from the directory part of `filename` with ``/``
    replaced by ``_``.  If that prefix does not occur in the base name, the
    prefix with its first component removed is tried instead.

    Parameters
    ----------
    filename : :class:`str`
        Name of HTAR file, including directory path.

    Returns
    -------
    :class:`str`
        Name of a directory.
    """
    directory, tarball = os.path.split(filename)
    basefile = tarball.rsplit('.', 1)[0]  # remove .tar
    if not directory:
        return basefile
    full_prefix = directory.replace('/', '_') + '_'
    short_prefix = '_'.join(full_prefix.split('_')[1:])
    for prefix in (full_prefix, short_prefix):
        i = basefile.find(prefix)
        if i >= 0:
            return basefile[(i + len(prefix)):]
    return basefile
def iterrsplit(s, c):
    """Split string `s` on `c` and rejoin on `c` from the end of `s`.

    For example, ``'a_b_c'`` split on ``'_'`` produces ``'c'``, ``'b_c'``
    and finally ``'a_b_c'``.

    Parameters
    ----------
    s : :class:`str`
        String to split
    c : :class:`str`
        Split on this string.

    Yields
    ------
    :class:`str`
        Iteratively return the joined parts of `s`.
    """
    parts = s.split(c)
    for n in range(1, len(parts) + 1):
        yield c.join(parts[-n:])
def scan_disk(disk_roots, disk_files_cache, overwrite=False):
    """Scan a directory tree on disk and cache the files found there.

    The cache is a CSV file with columns ``Name`` (path relative to the
    disk root), ``Size`` and ``Mtime``.  Symbolic links are not recorded.

    Parameters
    ----------
    disk_roots : :class:`list`
        Name(s) of a directory in which to start the scan.
    disk_files_cache : :class:`str`
        Name of a file to hold the cache.
    overwrite : :class:`bool`, optional
        If ``True``, ignore any existing cache files.

    Returns
    -------
    :class:`bool`
        Returns ``True`` if the cache is populated and ready to read.
    """
    logger = logging.getLogger(__name__ + '.scan_disk')
    if os.path.exists(disk_files_cache) and not overwrite:
        logger.debug("Using existing file cache: %s", disk_files_cache)
        return True
    logger.info("No disk cache file, starting scan.")
    with open(disk_files_cache, 'w', newline='') as t:
        writer = csv.writer(t)
        writer.writerow(['Name', 'Size', 'Mtime'])
        for disk_root in disk_roots:
            logger.debug("Starting os.walk at %s.", disk_root)
            prefix = disk_root + '/'
            try:
                for root, dirs, files in os.walk(disk_root):
                    logger.debug("Scanning disk directory %s.", root)
                    for f in files:
                        fullname = os.path.join(root, f)
                        if os.path.islink(fullname):
                            continue
                        #
                        # Strip only the leading root; the same string may
                        # legitimately appear again deeper in the path.
                        #
                        if fullname.startswith(prefix):
                            cachename = fullname[len(prefix):]
                        else:
                            cachename = fullname
                        try:
                            s = os.stat(fullname)
                        except PermissionError as perr:
                            logger.error("%s: %s",
                                         perr.strerror, perr.filename)
                            continue
                        try:
                            writer.writerow([cachename, s.st_size,
                                             int(s.st_mtime)])
                        except UnicodeEncodeError as e:
                            logger.error("Could not write %s to cache file due to unusual characters!",
                                         fullname.encode(errors='surrogatepass'))
                            logger.error("Message was: %s.", str(e))
            except OSError as oerr:
                logger.error('Exception encountered while traversing %s!',
                             disk_root)
                logger.error(oerr.strerror)
                return False
    return True
def scan_hpss(hpss_root, hpss_files_cache, overwrite=False):
    """Scan a directory on HPSS and return the files found there.

    If a cache file already exists (and `overwrite` is not set) its contents
    are returned without contacting HPSS.  Otherwise HPSS is scanned and the
    cache file is (re)written.  Files ending in ``.idx`` are skipped.

    Parameters
    ----------
    hpss_root : :class:`str`
        Name of a directory in which to start the scan.
    hpss_files_cache : :class:`str`
        Name of a file to hold the cache.
    overwrite : :class:`bool`, optional
        If ``True``, ignore any existing cache files.

    Returns
    -------
    :class:`dict`
        The set of files found on HPSS, with size and modification time.
    """
    logger = logging.getLogger(__name__ + '.scan_hpss')
    hpss_files = dict()
    if os.path.exists(hpss_files_cache) and not overwrite:
        logger.info("Found cache file %s.", hpss_files_cache)
        with open(hpss_files_cache, newline='') as t:
            reader = csv.DictReader(t)
            for row in reader:
                hpss_files[row['Name']] = (int(row['Size']),
                                           int(row['Mtime']))
        return hpss_files
    logger.info("No HPSS cache file, starting scan at %s.", hpss_root)
    prefix = hpss_root + '/'
    with open(hpss_files_cache, 'w', newline='') as t:
        w = csv.writer(t)
        w.writerow(['Name', 'Size', 'Mtime'])
        for root, dirs, files in walk(hpss_root):
            logger.debug("Scanning HPSS directory %s.", root)
            for f in files:
                if f.path.endswith('.idx'):
                    continue
                #
                # Strip only the leading root; the same string may
                # legitimately appear again deeper in the path.
                #
                if f.path.startswith(prefix):
                    ff = f.path[len(prefix):]
                else:
                    ff = f.path
                hpss_files[ff] = (f.st_size, f.st_mtime)
                w.writerow([ff, f.st_size, f.st_mtime])
    return hpss_files
def physical_disks(release_root, config):
    """Convert a root path into a list of physical disks containing data.

    Parameters
    ----------
    release_root : :class:`str`
        The "official" path to the data.
    config : :class:`dict`
        A dictionary containing path information.

    Returns
    -------
    :func:`tuple`
        A tuple containing the physical disk paths, without duplicates, in
        the order given by the ``'physical_disks'`` configuration entry.
    """
    pd = config.get('physical_disks')
    if not pd:
        return (release_root,)
    broot = os.path.basename(config['root'])
    if (len(pd) == 1) and (pd[0] == broot):
        return (release_root,)
    if pd[0].startswith('/'):
        release_name = os.path.basename(release_root)
        roots = [os.path.join(d, release_name) for d in pd]
    else:
        roots = [release_root.replace(broot, d) for d in pd]
    #
    # Is any root a pure symlink to another root?
    #
    remove = set()
    for r in roots:
        if os.path.islink(r):
            rr = os.readlink(r)
            if rr.startswith('.'):
                rr = os.path.normpath(os.path.join(config['root'], rr))
            if rr in roots:
                remove.add(r)
    #
    # Drop symlinked and duplicate roots, but keep the configured order so
    # that the result is deterministic.
    #
    seen = set()
    unique = list()
    for r in roots:
        if r in remove or r in seen:
            continue
        seen.add(r)
        unique.append(r)
    return tuple(unique)
def _options():
    """Parse command-line options.

    Returns
    -------
    :class:`argparse.Namespace`
        The parsed command-line arguments.
    """
    desc = 'Verify the presence of files on HPSS.'
    parser = ArgumentParser(prog=os.path.basename(sys.argv[0]),
                            description=desc)
    #
    # expanduser() uses $HOME when it is set, but does not raise an
    # exception when it is not (e.g. in a minimal batch environment).
    #
    parser.add_argument('-c', '--cache-dir', action='store', dest='cache',
                        metavar='DIR',
                        default=os.path.join(os.path.expanduser('~'),
                                             'cache'),
                        help=('Write cache files to DIR (Default: '
                              '%(default)s).'))
    parser.add_argument('-D', '--overwrite-disk', action='store_true',
                        dest='overwrite_disk',
                        help='Ignore any existing disk cache files.')
    parser.add_argument('-E', '--exit-on-error', action='store_true',
                        dest='errexit',
                        help='Exit if an error is detected in the file analysis stages.')
    parser.add_argument('-H', '--overwrite-hpss', action='store_true',
                        dest='overwrite_hpss',
                        help='Ignore any existing HPSS cache files.')
    parser.add_argument('-l', '--size-limit', action='store', type=float,
                        dest='limit', metavar='N', default=1024.0,
                        help=("Do not allow archive files larger than "
                              "N GB (Default: %(default)s GB)."))
    parser.add_argument('-p', '--process', action='store_true',
                        dest='process',
                        help=('Process the list of missing files to produce '
                              'HPSS commands.'))
    parser.add_argument('-r', '--report', action='store', type=int,
                        dest='report', metavar='N', default=10000,
                        help=("Print an informational message after "
                              "every N files (Default: %(default)s)."))
    parser.add_argument('-t', '--test', action='store_true', dest='test',
                        help="Test mode. Try not to make any changes.")
    parser.add_argument('-v', '--verbose', action='store_true',
                        dest='verbose',
                        help="Increase verbosity. Increase it a lot.")
    parser.add_argument('-V', '--version', action='version',
                        version="%(prog)s " + hpsspyVersion)
    parser.add_argument('config', metavar='FILE',
                        help="Read configuration from FILE.")
    parser.add_argument('release', metavar='SECTION',
                        help="Read SECTION from the configuration file.")
    return parser.parse_args()
def main():
    """Entry-point for command-line scripts.

    Validates the configuration, scans HPSS and disk (using cache files
    where available), determines which files are missing from HPSS and,
    if requested, issues the commands needed to back them up.

    Returns
    -------
    :class:`int`
        An integer suitable for passing to :func:`sys.exit`.
    """
    #
    # Options
    #
    options = _options()
    #
    # Logging
    #
    ll = logging.WARNING
    if options.test:
        ll = logging.INFO
    if options.verbose:
        ll = logging.DEBUG
    log_format = '%(asctime)s %(name)s %(levelname)s: %(message)s'
    logging.basicConfig(level=ll, format=log_format,
                        datefmt='%Y-%m-%dT%H:%M:%S')
    logger = logging.getLogger(__name__)
    #
    # Config file
    #
    status = validate_configuration(options.config)
    if status > 0:
        return status
    hpss_map, config = files_to_hpss(options.config, options.release)
    release_root = os.path.join(config['root'], options.release)
    hpss_release_root = os.path.join(config['hpss_root'], options.release)
    #
    # Read HPSS files and cache.
    #
    if options.test:
        logger.info("Test mode. Pretending no files exist on HPSS.")
        hpss_files = dict()
    else:
        logger.debug("Cache files will be written to %s.", options.cache)
        hpss_files_cache = os.path.join(
            options.cache, 'hpss_files_{0}.csv'.format(options.release))
        logger.debug("hpss_files_cache = '%s'", hpss_files_cache)
        hpss_files = scan_hpss(hpss_release_root, hpss_files_cache,
                               overwrite=options.overwrite_hpss)
    #
    # Read disk files and cache.
    #
    disk_files_cache = os.path.join(
        options.cache, 'disk_files_{0}.csv'.format(options.release))
    logger.debug("disk_files_cache = '%s'", disk_files_cache)
    disk_roots = physical_disks(release_root, config)
    status = scan_disk(disk_roots, disk_files_cache,
                       overwrite=options.overwrite_disk)
    if options.errexit and not status:
        return 1
    #
    # See if the files are on HPSS.
    #
    missing_files_cache = os.path.join(
        options.cache, 'missing_files_{0}.json'.format(options.release))
    logger.debug("missing_files_cache = '%s'", missing_files_cache)
    status = find_missing(hpss_map, hpss_files, disk_files_cache,
                          missing_files_cache, options.report, options.limit)
    if options.errexit and not status:
        return 1
    #
    # Post process to generate HPSS commands
    #
    if options.process or options.test:
        process_missing(missing_files_cache, release_root, hpss_release_root,
                        test=options.test)
    return 0