Source code for ewoksid22.dirutils

import logging
import os
import shlex
import shutil
import socket
import subprocess
from typing import Dict
from typing import Iterator
from typing import Optional
from typing import Tuple

# Host name of the machine this module runs on. parse_outdir uses it to
# recognise "user@host:/dir" specifications that point to this machine.
HOST_NAME = socket.gethostname()

# Parsed output directory: (userhost, dirname). parse_outdir returns None as
# userhost for directories it considers local.
OutDirType = Tuple[Optional[str], str]
# Maps an output directory name (e.g. "primary", "processed") to its parsed
# location.
OutDirsType = Dict[str, OutDirType]

# Module-level logger.
logger = logging.getLogger(__name__)


def prepare_outdirs(outdirs: Dict[str, str], primary_outdir: str) -> OutDirsType:
    """Build the parsed output directory mapping.

    The input mapping is copied, never modified in place; ``None`` is treated
    as an empty mapping.

    :param outdirs: maps names to local directories (/users/opid22/data1/)
                    or remote directories
                    (opid22@diffract22new:/users/opid22/data1/)
    :param primary_outdir: when not empty, stored under the first name that is
                           not used yet among "primary", "processed",
                           "processed1", "processed2", etc.
    :returns: maps each name to the ``(userhost, dirname)`` tuple produced by
              :func:`parse_outdir`
    :raise ValueError: a directory name is malformed or the "primary"
                       directory is not local.
    """
    named_dirs = dict() if outdirs is None else dict(outdirs)

    if primary_outdir:
        # The key generator is infinite, so a free name is always found.
        free_name = next(
            key for key in _outdir_key_generator() if key not in named_dirs
        )
        named_dirs[free_name] = primary_outdir

    parsed = {name: parse_outdir(dirname) for name, dirname in named_dirs.items()}

    primary = parsed.get("primary")
    if primary:
        userhost, _ = primary
        if userhost:
            raise ValueError(
                "The primary output directory should be locally accessible"
            )
    return parsed
def _outdir_key_generator() -> Iterator[str]:
    """Yield output directory names in order of preference, without end.

    The sequence is "primary", "processed", "processed1", "processed2", etc.
    """
    yield from ("primary", "processed")
    index = 1
    while True:
        yield f"processed{index}"
        index += 1
def parse_outdir(dirname: str) -> OutDirType:
    """Split an output directory specification into ``(userhost, dirname)``.

    :param dirname: a local directory (/users/opid22/data1/) or a remote
                    directory (opid22@diffract22new:/users/opid22/data1/)
    :returns: ``(None, dirname)`` for a local directory, which includes a
              remote specification whose host is this machine
              (``HOST_NAME``); ``(userhost, dirname)`` otherwise
    :raise ValueError: the specification contains more than one ":".
    """
    userhost, separator, path = dirname.partition(":")
    if not separator:
        # No "user@host:" prefix: plain local directory.
        return None, dirname
    if ":" in path:
        raise ValueError(f"malformed directory name '{dirname}'")

    # Compare only the host part (after an optional "user@") and require an
    # exact match: a suffix match would treat "user@notmyhost" as local when
    # this machine is called "myhost".
    host = userhost.rpartition("@")[2]
    if host == HOST_NAME:
        return None, path
    return userhost, path
def copy_primary_file(
    filename: str, outdirs: OutDirsType, overwrite: bool = True
) -> None:
    """Copy a file from the primary output directory to all other output
    directories.

    The source is the file with the base name of ``filename`` inside the
    "primary" directory. Local destinations are handled by
    :func:`copy_local_file`, remote destinations are written with ``scp``.
    Nothing is copied when there is no primary directory.

    :param filename: only its base name is used
    :param outdirs: maps names to ``(userhost, dirname)``; an empty
                    ``userhost`` means the directory is local
    :param overwrite: when ``False``, destinations that already exist are
                      skipped
    :raises subprocess.CalledProcessError: ``scp`` exited with a non-zero
                                           status
    """
    local_filename = primary_file(filename, outdirs)
    if not local_filename:
        return

    basename = os.path.basename(local_filename)
    for name, (userhost, dirname) in outdirs.items():
        if name == "primary":
            continue
        remote_filename = os.path.join(dirname, basename)

        if not userhost:
            copy_local_file(local_filename, remote_filename, overwrite=overwrite)
            continue

        if not overwrite:
            # ssh joins the command arguments with spaces and the remote shell
            # parses the result again, so the path has to be quoted for that
            # shell (spaces or shell metacharacters would break the test).
            check_cmd = ["ssh", userhost, "test", "-f", shlex.quote(remote_filename)]
            if subprocess.call(check_cmd) == 0:
                logger.warning("Skip copying existing remote file: %s", remote_filename)
                continue

        # NOTE(review): the single quotes presumably protect the path from the
        # remote shell used by scp; confirm this is still wanted with scp
        # versions that transfer over SFTP.
        remote_location = f"{userhost}:'{remote_filename}'"
        logger.info("Copy %r -> %r", local_filename, remote_location)
        cmd = ["scp", "-q", local_filename, remote_location]
        output = subprocess.check_output(cmd, text=True)
        if output:
            print(output)
def copy_file_to_primary(
    filename: str, outdirs: OutDirsType, overwrite: bool = True
) -> None:
    """Copy a file into the primary output directory (which is always locally
    accessible).

    Nothing is copied when no primary directory is defined.

    :param filename: file to copy; it keeps its base name in the primary
                     directory
    :param outdirs: maps names to ``(userhost, dirname)``
    :param overwrite: when ``False``, an existing destination is left untouched
    """
    destination = primary_file(filename, outdirs)
    if not destination:
        return
    copy_local_file(filename, destination, overwrite=overwrite)
def primary_file(filename: str, outdirs: OutDirsType) -> Optional[str]:
    """Return the location of the file inside the primary output directory.

    :param filename: only its base name is used
    :param outdirs: maps names to ``(userhost, dirname)``
    :returns: ``<primary dirname>/<base name>`` or ``None`` (after logging a
              warning) when no primary directory is defined
    """
    location = outdirs.get("primary")
    if location:
        return os.path.join(location[1], os.path.basename(filename))
    logger.warning("No primary output location provided: do not save any results")
    return None
def copy_local_file(src: str, dst: str, overwrite: bool = True) -> None:
    """Copy a file on the local file system (no SCP).

    Missing parent directories of the destination are created.

    :param src: file to copy
    :param dst: destination file name
    :param overwrite: when ``False``, an existing destination is left untouched
    """
    already_there = os.path.exists(dst)
    if already_there and not overwrite:
        logger.warning("Skip copying existing file: %s", dst)
        return
    if already_there:
        logger.warning("Copy and overwriting existing file: %s", dst)

    logger.info("Copy %r -> %r", src, dst)
    parent = os.path.dirname(dst)
    if parent:
        os.makedirs(parent, exist_ok=True)
    shutil.copyfile(src, dst)