# Licensed under a 3-clause BSD style license - see LICENSE.rst
# -*- coding: utf-8 -*-
"""
hpsspy.scan
~~~~~~~~~~~
Functions for scanning directory trees to find files in need of backup.
"""
import csv
import json
import logging
import os
import re
import sys
from argparse import ArgumentParser
from pkg_resources import resource_exists, resource_stream
from . import __version__ as hpsspyVersion
from .os import makedirs, walk
from .util import get_tmpdir, hsi, htar
[docs]def validate_configuration(config):
"""Check the configuration file for validity.
Parameters
----------
config : :class:`str`
Name of the configuration file.
Returns
-------
:class:`int`
An integer suitable for passing to :func:`sys.exit`.
"""
logger = logging.getLogger(__name__ + '.files_to_hpss')
foo, xtn = os.path.splitext(os.path.basename(config))
if xtn != '.json':
logger.warning("%s might not be a JSON file!", config)
try:
with open(config) as fp:
try:
json_data = json.load(fp)
# except json.JSONDecodeError:
except ValueError:
logger.critical("%s is not valid JSON.", config)
return 1
# except FileNotFoundError:
except IOError:
logger.critical("%s does not exist. Try again.", config)
return 1
if '__config__' in json_data:
for k in ('root', 'hpss_root', 'physical_disks'):
if k not in json_data['__config__']:
logger.warning("%s '__config__' section does not contain an " +
"entry for '%s'.", config, k)
else:
logger.critical("%s does not contain a '__config__' section.", config)
return 1
for k in json_data:
if k == '__config__':
continue
if '__exclude__' not in json_data[k]:
logger.warning("Section '%s' should at least have an " +
"'__exclude__' entry.", k)
try:
new_map = compile_map(json_data, k)
except re.error:
logger.critical("Regular Expression error detected in " +
"section '%s'!", k)
return 1
return 0
def compile_map(old_map, section):
    """Compile the regular expressions in a map.

    Parameters
    ----------
    old_map : :class:`dict`
        A dictionary containing regular expressions to compile.
    section : :class:`str`
        An initial key to determine the section of the dictionary of interest.
        Typically, this will be a top-level directory.

    Returns
    -------
    :class:`dict`
        A new dictionary containing compiled regular expressions.
    """
    compiled = dict()
    for key, value in old_map[section].items():
        if key == '__exclude__':
            # The exclude list becomes an immutable set of literal names.
            compiled[key] = frozenset(value)
        else:
            # Each entry maps a regex pattern to its HPSS destination;
            # keep them as (compiled_regex, destination) pairs.
            compiled[key] = tuple((re.compile(pattern), destination)
                                  for pattern, destination in value.items())
    return compiled
def files_to_hpss(hpss_map_cache, section):
    """Create a map of files on disk to HPSS files.

    Parameters
    ----------
    hpss_map_cache : :class:`str`
        Data file containing the map.
    section : :class:`str`
        An initial key to determine the section of the dictionary of interest.
        Typically, this will be a top-level directory.

    Returns
    -------
    :func:`tuple`
        A tuple containing the compiled mapping and an additional
        configuration dictionary.
    """
    logger = logging.getLogger(__name__ + '.files_to_hpss')
    if os.path.exists(hpss_map_cache):
        logger.info("Found map file %s.", hpss_map_cache)
        with open(hpss_map_cache) as t:
            hpss_map = json.load(t)
    elif resource_exists('hpsspy', 'data/' + hpss_map_cache):
        # No file on disk; fall back to a map bundled with the package.
        logger.info("Reading from file %s in the hpsspy distribution.",
                    hpss_map_cache)
        t = resource_stream('hpsspy', 'data/' + hpss_map_cache)
        hpss_map = json.loads(t.read().decode())
        t.close()
    else:
        # Nothing found anywhere; return a minimal built-in default map.
        logger.warning("Returning empty map file!")
        hpss_map = {"__config__": {},
                    "dr8": {"__exclude__": [], "casload": {},
                            "apogee": {}, "boss": {}, "sdss": {}},
                    "dr9": {"__exclude__": [], "casload": {},
                            "apogee": {}, "boss": {}, "sdss": {}},
                    "dr10": {"__exclude__": [], "casload": {},
                             "apogee": {}, "boss": {}, "sdss": {}},
                    "dr11": {"__exclude__": [], "casload": {},
                             "apogee": {}, "boss": {}, "marvels": {},
                             "sdss": {}},
                    "dr12": {"__exclude__": [], "casload": {}, "apo": {},
                             "apogee": {}, "boss": {}, "marvels": {},
                             "sdss": {}}}
    return (compile_map(hpss_map, section), hpss_map['__config__'])
def find_missing(hpss_map, hpss_files, disk_files_cache, missing_files,
                 report=10000, limit=1024.0):
    """Compare HPSS files to disk files.

    Parameters
    ----------
    hpss_map : :class:`dict`
        A mapping of file names to HPSS files.
    hpss_files : :class:`dict`
        The list of actual HPSS files.
    disk_files_cache : :class:`str`
        Name of the disk cache file.
    missing_files : :class:`str`
        Name of the file that will contain the list of missing files.
    report : :class:`int`, optional
        Print an informational message when N files have been scanned.
    limit : :class:`float`, optional
        HPSS archive files should be smaller than this size (in GB).

    Returns
    -------
    :class:`bool`
        ``True`` if no serious problems were found.
    """
    logger = logging.getLogger(__name__ + '.find_missing')
    nfiles = 0      # total disk files scanned
    nmissing = 0    # disk files matched by no pattern at all
    nmultiple = 0   # disk files matched by more than one pattern
    # backups maps an HPSS archive name to
    # {'files': [...], 'size': int, 'newer': bool, 'exists': bool}.
    backups = dict()
    # Count how often each regex pattern matched, to warn about dead patterns.
    pattern_used = dict()
    # Remember which unconfigured sections were already warned about,
    # so each warning is emitted only once.
    section_warning = set()
    with open(disk_files_cache, newline='') as t:
        # The cache is a CSV with columns Name, Size, Mtime (see scan_disk).
        reader = csv.DictReader(t)
        for row in reader:
            f = row['Name']
            nfiles += 1
            if (nfiles % report) == 0:
                logger.info("%9d files scanned.", nfiles)
            # Global excludes are literal file names, not patterns.
            if f in hpss_map["__exclude__"]:
                logger.info("%s is excluded.", f)
                continue
            # The section is the file's top-level directory.
            section = f.split('/')[0]
            if section == f:
                #
                # Top-level section containing no subdirectories.
                #
                section = '__top__'
            try:
                s = hpss_map[section]
            except KeyError:
                #
                # If the section is not described, that's not
                # good, but continue.
                #
                if section not in section_warning:
                    section_warning.add(section)
                    logger.warning("Directory %s is not " +
                                   "described in the configuration!",
                                   section)
                continue
            if not s:
                #
                # If the section is blank, that's OK.
                #
                if section not in section_warning:
                    section_warning.add(section)
                    logger.warning("Directory %s is not configured!", section)
                continue
            #
            # Now check if it is mapped.
            #
            mapped = 0
            for r in s:
                # r is a (compiled_regex, destination) pair from compile_map.
                if r[0].pattern not in pattern_used:
                    pattern_used[r[0].pattern] = 0
                m = r[0].match(f)
                if m is not None:
                    logger.debug("pattern_used[r'%s'] += 1", r[0].pattern)
                    logger.debug("r[1] = r'%s'", r[1])
                    pattern_used[r[0].pattern] += 1
                    mapped += 1
                    if r[1] == "EXCLUDE":
                        # Sentinel destination: deliberately not backed up.
                        logger.debug("%s is excluded from backups.", f)
                    elif r[1] == "AUTOMATED":
                        # Sentinel destination: handled outside this tool.
                        logger.debug("%s is backed up by some other " +
                                     "automated process.", f)
                    else:
                        # Substitute the match into the destination template
                        # to get the HPSS archive name for this file.
                        reName = r[0].sub(r[1], f)
                        logger.debug("%s in %s.", f, reName)
                        exists = reName in hpss_files
                        if exists:
                            # hpss_files values are (size, mtime) tuples.
                            newer = int(row['Mtime']) > hpss_files[reName][1]
                        else:
                            newer = False
                        if newer:
                            logger.warning("%s is newer than %s, " +
                                           "marking as missing!",
                                           f, reName)
                        if reName in backups:
                            backups[reName]['files'].append(f)
                            backups[reName]['size'] += int(row['Size'])
                            #
                            # 'newer' can change from False to True, but
                            # it should never change back to False.
                            #
                            if newer:
                                backups[reName]['newer'] = newer
                        else:
                            backups[reName] = {'files': [f],
                                               'size': int(row['Size']),
                                               'newer': newer,
                                               'exists': exists}
            if mapped == 0:
                logger.error("%s is not mapped to any file on HPSS!", f)
                nmissing += 1
            if mapped > 1:
                logger.error("%s is mapped to multiple files on HPSS!", f)
                nmultiple += 1
    for p in pattern_used:
        if pattern_used[p] == 0:
            logger.info("Pattern '%s' was never used, " +
                        "maybe files have been removed from disk?", p)
    #
    # Eliminate backups that exist and have no newer files on disk.
    #
    missing = dict()
    nbackups = 0
    for k, v in backups.items():
        if v['exists'] and not v['newer']:
            logger.debug("%s is a valid backup.", k)
        else:
            logger.info('%s is %d bytes.', k, v['size'])
            # Enforce the per-archive size limit (limit is in GB).
            if v['size']/1024/1024/1024 > limit:
                # NOTE(review): an over-limit archive is skipped but does not
                # make the function return False — confirm that is intended.
                logger.error("HPSS file %s would be too large, " +
                             "skipping backup!", k)
            else:
                logger.debug("Adding %s to missing backups.", k)
                nbackups += len(v['files'])
                missing[k] = v
    if nbackups > 0:
        logger.info('%d files selected for backup.', nbackups)
    # Persist the missing-backup plan for process_missing() to consume.
    with open(missing_files, 'w') as fp:
        json.dump(missing, fp, indent=2, separators=(',', ': '))
    if nmissing > 0:
        logger.critical("Not all files would be backed up with " +
                        "this configuration!")
        return False
    if nmultiple > 0:
        logger.critical("Some files would be backed up more than " +
                        "once with this configuration!")
        return False
    # Both counters are zero here, so this is always True.
    return (nmissing == 0) and (nmultiple == 0)
def process_missing(missing_cache, disk_root, hpss_root, dirmode='2770',
                    test=False):
    """Convert missing files into HPSS commands.

    Parameters
    ----------
    missing_cache : :class:`str`
        Name of a JSON file containing the missing file data.
    disk_root : :class:`str`
        Missing files are relative to this root on disk.
    hpss_root : :class:`str`
        Missing files are relative to this root on HPSS.
    dirmode : :class:`str`, optional
        Create directories on HPSS with this mode (default ``drwxrws---``).
    test : :class:`bool`, optional
        Test mode. Try not to make any changes.
    """
    logger = logging.getLogger(__name__ + '.process_missing')
    logger.debug("Processing missing files from %s.", missing_cache)
    # missing_cache is the JSON file written by find_missing():
    # HPSS archive name -> {'files', 'size', 'newer', 'exists'}.
    with open(missing_cache) as fp:
        missing = json.load(fp)
    # Track HPSS directories already created so we only mkdir once each.
    created_directories = set()
    # The htar branches chdir into the source directory; remember where we
    # started so we can chdir back at the end.
    start_directory = os.getcwd()
    for h in missing:
        h_file = os.path.join(hpss_root, h)
        if h.endswith('.tar'):
            # Archive target: bundle one or more files/directories via htar.
            disk_chdir = os.path.dirname(h)
            full_chdir = os.path.join(disk_root, disk_chdir)
            if h.endswith('_files.tar'):
                # '_files.tar' archives an explicit list of loose files,
                # passed to htar via a temporary '-L' list file.
                Lfile = os.path.join(get_tmpdir(),
                                     os.path.basename(h.replace('.tar',
                                                                '.txt')))
                logger.debug(Lfile)
                htar_dir = None
                Lfile_lines = ('\n'.join([os.path.basename(f)
                                          for f in missing[h]['files']]) +
                               '\n')
                if test:
                    logger.debug(Lfile_lines)
                else:
                    with open(Lfile, 'w') as fp:
                        fp.write(Lfile_lines)
            else:
                Lfile = None
                #
                # Be careful, because the directory name may itself
                # contain underscore characters, or X characters.
                #
                # extract_directory_name() is defined elsewhere in this
                # module; it recovers the directory name encoded in the
                # archive file name.
                b = extract_directory_name(h)
                htar_dir = []
                if os.path.isdir(os.path.join(full_chdir, b)):
                    htar_dir = [b]
                elif b.endswith('X'):
                    # A trailing 'X' is a single-character wildcard; match
                    # all sibling directories that fit the pattern.
                    htar_re = re.compile(b.replace('X', '.') + '$')
                    htar_dir = [d for d in os.listdir(full_chdir)
                                if os.path.isdir(os.path.join(full_chdir, d))
                                and htar_re.match(d) is not None]
                    # NOTE(review): if nothing matches, htar_dir is empty and
                    # htar is still invoked with no member dirs — confirm.
                else:
                    logger.error(("Could not find directories corresponding " +
                                  "to %s!"), h)
                    continue
            logger.debug("os.chdir('%s')", full_chdir)
            os.chdir(full_chdir)
            #
            # Avoid adding a trailing slash.
            #
            if disk_chdir:
                h_dir = os.path.join(hpss_root, disk_chdir)
            else:
                h_dir = hpss_root
            if h_dir not in created_directories:
                logger.debug("makedirs('%s', mode='%s')", h_dir, dirmode)
                if not test:
                    makedirs(h_dir, mode=dirmode)
                created_directories.add(h_dir)
            if Lfile is None:
                # Directory-based archive: name the member dirs directly.
                # (The comprehension's 'h' is scoped to the comprehension in
                # Python 3, so the outer loop variable is not clobbered.)
                logger.info("htar('-cvf', '%s', '-H', " +
                            "'crc:verify=all', %s)", h_file,
                            ', '.join(["'{0}'".format(h) for h in htar_dir]))
                if test:
                    out, err = ('Test mode, skipping htar command.', '')
                else:
                    out, err = htar('-cvf', h_file, '-H', 'crc:verify=all',
                                    *htar_dir)
            else:
                # File-list archive: pass the temporary list file with '-L'.
                logger.info("htar('-cvf', '%s', '-H', 'crc:verify=all', " +
                            "'-L', '%s')", h_file, Lfile)
                if test:
                    out, err = ('Test mode, skipping htar command.', '')
                else:
                    out, err = htar('-cvf', h_file, '-H', 'crc:verify=all',
                                    '-L', Lfile)
            logger.debug(out)
            if err:
                logger.warning(err)
            if Lfile is not None:
                # Clean up the temporary '-L' list file.
                logger.debug("os.remove('%s')", Lfile)
                if not test:
                    os.remove(Lfile)
        else:
            # Non-tar target: copy the single file to HPSS with hsi put.
            d_h_file = os.path.dirname(h_file)
            if d_h_file not in created_directories:
                logger.debug("makedirs('%s', mode='%s')", d_h_file, dirmode)
                if not test:
                    makedirs(d_h_file, mode=dirmode)
                created_directories.add(d_h_file)
            logger.info("hsi('put', '%s', ':', '%s')",
                        os.path.join(disk_root, missing[h]['files'][0]),
                        h_file)
            if test:
                out = "Test mode, skipping hsi command."
            else:
                out = hsi('put',
                          os.path.join(disk_root, missing[h]['files'][0]),
                          ':', h_file)
            logger.debug(out)
    # Restore the working directory we started from.
    logger.debug("os.chdir('%s')", start_directory)
    os.chdir(start_directory)
    return
def iterrsplit(s, c):
    """Split string `s` on `c` and rejoin on `c` from the end of `s`.

    Parameters
    ----------
    s : :class:`str`
        String to split
    c : :class:`str`
        Split on this string.

    Returns
    -------
    :class:`str`
        Iteratively return the joined parts of `s`.
    """
    parts = s.split(c)
    # Yield progressively longer suffixes: the last part alone, then the
    # last two joined, and so on up to the full original string.
    for n in range(1, len(parts) + 1):
        yield c.join(parts[-n:])
[docs]def scan_disk(disk_roots, disk_files_cache, overwrite=False):
"""Scan a directory tree on disk and cache the files found there.
Parameters
----------
disk_roots : :class:`list`
Name(s) of a directory in which to start the scan.
disk_files_cache : :class:`str`
Name of a file to hold the cache.
overwrite : :class:`bool`, optional
If ``True``, ignore any existing cache files.
Returns
-------
:class:`bool`
Returns ``True`` if the cache is populated and ready to read.
"""
logger = logging.getLogger(__name__ + '.scan_disk')
if os.path.exists(disk_files_cache) and not overwrite:
logger.debug("Using existing file cache: %s", disk_files_cache)
return True
else:
logger.info("No disk cache file, starting scan.")
with open(disk_files_cache, 'w', newline='') as t:
writer = csv.writer(t)
writer.writerow(['Name', 'Size', 'Mtime'])
for disk_root in disk_roots:
logger.debug("Starting os.walk at %s.", disk_root)
try:
for root, dirs, files in os.walk(disk_root):
logger.debug("Scanning disk directory %s.", root)
for f in files:
fullname = os.path.join(root, f)
if not os.path.islink(fullname):
cachename = fullname.replace(disk_root+'/', '')
try:
s = os.stat(fullname)
except PermissionError as perr:
logger.error("%s: %s",
perr.strerror, perr.filename)
continue
try:
writer.writerow([cachename,
s.st_size,
int(s.st_mtime)])
except UnicodeEncodeError as e:
logger.error("Could not write %s to cache file due to unusual characters!",
fullname.encode(errors='surrogatepass'))
logger.error("Message was: %s.", str(e))
except OSError as oerr:
logger.error('Exception encountered while traversing %s!', disk_root)
logger.error(oerr.strerror)
return False
return True
[docs]def scan_hpss(hpss_root, hpss_files_cache, overwrite=False):
"""Scan a directory on HPSS and return the files found there.
Parameters
----------
hpss_root : :class:`str`
Name of a directory in which to start the scan.
hpss_files_cache : :class:`str`
Name of a file to hold the cache.
overwrite : :class:`bool`, optional
If ``True``, ignore any existing cache files.
Returns
-------
:class:`dict`
The set of files found on HPSS, with size and modification time.
"""
logger = logging.getLogger(__name__ + '.scan_hpss')
hpss_files = dict()
if os.path.exists(hpss_files_cache) and not overwrite:
logger.info("Found cache file %s.", hpss_files_cache)
with open(hpss_files_cache, newline='') as t:
reader = csv.DictReader(t)
for row in reader:
hpss_files[row['Name']] = (int(row['Size']), int(row['Mtime']))
else:
logger.info("No HPSS cache file, starting scan at %s.", hpss_root)
with open(hpss_files_cache, 'w', newline='') as t:
w = csv.writer(t)
w.writerow(['Name', 'Size', 'Mtime'])
for root, dirs, files in walk(hpss_root):
logger.debug("Scanning HPSS directory %s.", root)
for f in files:
if not f.path.endswith('.idx'):
ff = f.path.replace(hpss_root+'/', '')
hpss_files[ff] = (f.st_size, f.st_mtime)
w.writerow([ff, f.st_size, f.st_mtime])
return hpss_files
def physical_disks(release_root, config):
    """Convert a root path into a list of physical disks containing data.

    Parameters
    ----------
    release_root : :class:`str`
        The "official" path to the data.
    config : :class:`dict`
        A dictionary containing path information.

    Returns
    -------
    :func:`tuple`
        A tuple containing the physical disk paths.
    """
    # A missing or empty 'physical_disks' entry means the official path
    # is the only physical location.
    pd = config.get('physical_disks')
    if not pd:
        return (release_root,)
    broot = os.path.basename(config['root'])
    # A single entry equal to the root's basename is a no-op alias.
    if len(pd) == 1 and pd[0] == broot:
        return (release_root,)
    if pd[0].startswith('/'):
        # Absolute disk paths: append the release's basename to each.
        roots = [os.path.join(disk, os.path.basename(release_root))
                 for disk in pd]
    else:
        # Relative names: substitute each for the root's basename.
        roots = [release_root.replace(broot, disk) for disk in pd]
    #
    # Is any root a pure symlink to another root?
    #
    symlinked = list()
    for r in roots:
        if os.path.islink(r):
            target = os.readlink(r)
            if target.startswith('.'):
                # Resolve relative link targets against the configured root.
                target = os.path.normpath(os.path.join(config['root'],
                                                       target))
            if target in roots:
                symlinked.append(r)
    surviving = set(roots)
    for r in symlinked:
        surviving.remove(r)
    return tuple(surviving)
def _options():
    """Parse command-line options.

    Returns
    -------
    :class:`argparse.Namespace`
        The parsed command-line arguments.
    """
    prsr = ArgumentParser(prog=os.path.basename(sys.argv[0]),
                          description='Verify the presence of files on HPSS.')
    prsr.add_argument('-c', '--cache-dir', action='store', dest='cache',
                      metavar='DIR',
                      default=os.path.join(os.environ['HOME'], 'cache'),
                      help='Write cache files to DIR (Default: %(default)s).')
    prsr.add_argument('-D', '--overwrite-disk', action='store_true',
                      dest='overwrite_disk',
                      help='Ignore any existing disk cache files.')
    prsr.add_argument('-E', '--exit-on-error', action='store_true',
                      dest='errexit',
                      help='Exit if an error is detected in the file analysis stages.')
    prsr.add_argument('-H', '--overwrite-hpss', action='store_true',
                      dest='overwrite_hpss',
                      help='Ignore any existing HPSS cache files.')
    prsr.add_argument('-l', '--size-limit', action='store', type=float,
                      dest='limit', metavar='N', default=1024.0,
                      help="Do not allow archive files larger than N GB (Default: %(default)s GB).")
    prsr.add_argument('-p', '--process', action='store_true',
                      dest='process',
                      help='Process the list of missing files to produce HPSS commands.')
    prsr.add_argument('-r', '--report', action='store', type=int,
                      dest='report', metavar='N', default=10000,
                      help="Print an informational message after every N files (Default: %(default)s).")
    prsr.add_argument('-t', '--test', action='store_true',
                      dest='test',
                      help="Test mode. Try not to make any changes.")
    prsr.add_argument('-v', '--verbose', action='store_true',
                      dest='verbose',
                      help="Increase verbosity. Increase it a lot.")
    prsr.add_argument('-V', '--version', action='version',
                      version="%(prog)s " + hpsspyVersion)
    prsr.add_argument('config', metavar='FILE',
                      help="Read configuration from FILE.")
    prsr.add_argument('release', metavar='SECTION',
                      help="Read SECTION from the configuration file.")
    return prsr.parse_args()
def main():
    """Entry-point for command-line scripts.

    Returns
    -------
    :class:`int`
        An integer suitable for passing to :func:`sys.exit`.
    """
    #
    # Options
    #
    options = _options()
    #
    # Logging
    #
    # Default to WARNING; --test raises to INFO, --verbose to DEBUG.
    ll = logging.WARNING
    if options.test:
        ll = logging.INFO
    if options.verbose:
        ll = logging.DEBUG
    log_format = '%(asctime)s %(name)s %(levelname)s: %(message)s'
    logging.basicConfig(level=ll, format=log_format,
                        datefmt='%Y-%m-%dT%H:%M:%S')
    logger = logging.getLogger(__name__)
    #
    # Config file
    #
    # Abort immediately on an invalid configuration file.
    status = validate_configuration(options.config)
    if status > 0:
        return status
    hpss_map, config = files_to_hpss(options.config, options.release)
    release_root = os.path.join(config['root'], options.release)
    hpss_release_root = os.path.join(config['hpss_root'], options.release)
    #
    # Read HPSS files and cache.
    #
    if options.test:
        # In test mode, no HPSS scan: every file will look un-backed-up.
        logger.info("Test mode. Pretending no files exist on HPSS.")
        hpss_files = dict()
    else:
        logger.debug("Cache files will be written to %s.", options.cache)
        hpss_files_cache = os.path.join(options.cache,
                                        ('hpss_files_' +
                                         '{0}.csv').format(options.release))
        logger.debug("hpss_files_cache = '%s'", hpss_files_cache)
        hpss_files = scan_hpss(hpss_release_root, hpss_files_cache,
                               overwrite=options.overwrite_hpss)
    #
    # Read disk files and cache.
    #
    disk_files_cache = os.path.join(options.cache,
                                    ('disk_files_' +
                                     '{0}.csv').format(options.release))
    logger.debug("disk_files_cache = '%s'", disk_files_cache)
    # The release may span several physical disks; scan all of them.
    disk_roots = physical_disks(release_root, config)
    status = scan_disk(disk_roots, disk_files_cache,
                       overwrite=options.overwrite_disk)
    if options.errexit and not status:
        return 1
    #
    # See if the files are on HPSS.
    #
    missing_files_cache = os.path.join(options.cache,
                                       ('missing_files_' +
                                        '{0}.json').format(options.release))
    logger.debug("missing_files_cache = '%s'", missing_files_cache)
    status = find_missing(hpss_map, hpss_files, disk_files_cache,
                          missing_files_cache, options.report, options.limit)
    if options.errexit and not status:
        return 1
    #
    # Post process to generate HPSS commands
    #
    # --test implies processing, so the generated commands can be inspected.
    if options.process or options.test:
        process_missing(missing_files_cache, release_root, hpss_release_root,
                        test=options.test)
    return 0