import logging
import os
import shlex
import shutil
import socket
import subprocess
from typing import Dict
from typing import Iterator
from typing import Optional
from typing import Tuple
# Name of the machine this module is imported on. parse_outdir uses it to
# recognise "user@host:directory" strings that refer to this machine.
HOST_NAME = socket.gethostname()
# A parsed output directory: ("user@host" or None when there is no remote host,
# directory path).
OutDirType = Tuple[Optional[str], str]
# Maps an output directory name (e.g. "primary", "processed") to its parsed form.
OutDirsType = Dict[str, OutDirType]
# Module-level logger shared by all helpers in this file.
logger = logging.getLogger(__name__)
[docs]
def prepare_outdirs(outdirs: Dict[str, str], primary_outdir: str) -> OutDirsType:
    """
    Build the parsed output directory mapping used by the copy helpers.

    :param outdirs: maps names to local directories (/users/opid22/data1/)
                    or remote directories (opid22@diffract22new:/users/opid22/data1/).
                    ``None`` is treated as an empty mapping. The argument itself is
                    never modified, a copy is made first.
    :param primary_outdir: when not empty, registered under the first unused name
                           among "primary", "processed", "processed1", "processed2", etc.
    :returns: maps each name to the result of ``parse_outdir`` for its directory.
    :raise ValueError: the "primary" directory is not local, or a directory name
                       is rejected by ``parse_outdir``.
    """
    raw_dirs = {} if outdirs is None else dict(outdirs)

    if primary_outdir:
        # The key generator is endless, so a free name is always found.
        free_name = next(
            candidate
            for candidate in _outdir_key_generator()
            if candidate not in raw_dirs
        )
        raw_dirs[free_name] = primary_outdir

    parsed_dirs = {}
    for name, dirname in raw_dirs.items():
        parsed_dirs[name] = parse_outdir(dirname)

    primary = parsed_dirs.get("primary")
    if primary and primary[0]:
        # A user@host part means the primary directory lives on another machine.
        raise ValueError("The primary output directory should be locally accessible")
    return parsed_dirs
def _outdir_key_generator() -> Iterator[str]:
    """Yield candidate output directory names without end.

    The sequence is "primary", "processed", "processed1", "processed2", etc.
    """
    yield from ("primary", "processed")
    suffix = 1
    while True:
        yield f"processed{suffix}"
        suffix += 1
[docs]
def parse_outdir(dirname: str) -> OutDirType:
    """
    Split an output directory specification into its host part and its path.

    :param dirname: a local directory (/users/opid22/data1/) or a remote directory
                    in "user@host:directory" notation
                    (opid22@diffract22new:/users/opid22/data1/).
    :returns: ``(userhost, directory)``. ``userhost`` is ``None`` when ``dirname``
              has no host part or when the host part names the current machine
              (``HOST_NAME``).
    :raise ValueError: ``dirname`` contains more than one ":".
    """
    userhost, separator, path = dirname.partition(":")
    if not separator:
        # No ":" at all: a plain local directory.
        return None, dirname
    if ":" in path:
        raise ValueError(f"malformed directory name '{dirname}'")
    # Compare the host name exactly. A suffix match on "user@host" would also
    # treat e.g. "user@otherpc1" as local when this machine is called "pc1".
    host = userhost.rpartition("@")[2]
    if host == HOST_NAME:
        return None, path
    return userhost, path
[docs]
def copy_primary_file(
    filename: str, outdirs: OutDirsType, overwrite: bool = True
) -> None:
    """
    Copy file from the primary output directory (locally accessible) to the others.

    Directories without a user@host part are handled with ``copy_local_file``.
    The others are copied with ``scp``. Nothing is done when ``primary_file``
    finds no primary directory.

    :param filename: only its base name is used, the file is read from the
                     primary output directory.
    :param outdirs: parsed output directories, the "primary" entry is the source.
    :param overwrite: when ``False``, existing destination files are left untouched.
                      For remote destinations this is checked with ``ssh ... test -f``.
    :raise subprocess.CalledProcessError: ``scp`` exits with a non-zero status.
    """
    local_filename = primary_file(filename, outdirs)
    if not local_filename:
        return
    basename = os.path.basename(local_filename)
    for name, (userhost, dirname) in outdirs.items():
        if name == "primary":
            continue
        remote_filename = os.path.join(dirname, basename)
        if not userhost:
            copy_local_file(local_filename, remote_filename, overwrite=overwrite)
            continue

        # ssh concatenates its command arguments and hands them to the remote
        # shell, so the path must be quoted to survive spaces and metacharacters.
        quoted_filename = shlex.quote(remote_filename)
        if not overwrite:
            check_cmd = ["ssh", userhost, "test", "-f", quoted_filename]
            if subprocess.call(check_cmd) == 0:
                logger.warning("Skip copying existing remote file: %s", remote_filename)
                continue

        # shlex.quote only adds quotes when the path needs them and escapes
        # embedded single quotes, unlike wrapping the path in '...' by hand.
        # NOTE(review): scp versions that transfer over SFTP take the remote path
        # literally - confirm quoting behaviour for the deployed OpenSSH version.
        remote_location = f"{userhost}:{quoted_filename}"
        logger.info("Copy %r -> %r", local_filename, remote_location)
        cmd = ["scp", "-q", local_filename, remote_location]
        output = subprocess.check_output(cmd, text=True)
        if output:
            print(output)
[docs]
def copy_file_to_primary(
    filename: str, outdirs: OutDirsType, overwrite: bool = True
) -> None:
    """Copy a file into the primary output directory, keeping its base name.

    The destination is computed by ``primary_file``. When that returns nothing
    (no "primary" entry in ``outdirs``) no copy is made.

    :param filename: path of the file to copy.
    :param outdirs: parsed output directories.
    :param overwrite: forwarded to ``copy_local_file``.
    """
    target = primary_file(filename, outdirs)
    if not target:
        return
    copy_local_file(filename, target, overwrite=overwrite)
[docs]
def primary_file(filename: str, outdirs: OutDirsType) -> Optional[str]:
    """Return the path ``filename`` would have inside the primary output directory.

    Only the base name of ``filename`` is kept.

    :param filename: any path, its directory part is discarded.
    :param outdirs: parsed output directories.
    :returns: the path in the primary directory, or ``None`` (after logging a
              warning) when ``outdirs`` has no "primary" entry.
    """
    location = outdirs.get("primary")
    if location:
        # location is (userhost, directory), only the directory is needed.
        return os.path.join(location[1], os.path.basename(filename))
    logger.warning("No primary output location provided: do not save any results")
    return None
[docs]
def copy_local_file(src: str, dst: str, overwrite: bool = True) -> None:
    """Copy file locally (no SCP).

    Missing parent directories of ``dst`` are created.

    :param src: path of the file to copy.
    :param dst: path of the copy.
    :param overwrite: when ``False`` an existing ``dst`` is left untouched.
    """
    if os.path.exists(dst):
        # shutil.copyfile raises SameFileError when both paths designate the same
        # file (e.g. the source already lives in the destination directory).
        # There is nothing to copy in that case.
        if os.path.exists(src) and os.path.samefile(src, dst):
            logger.debug("Skip copying file onto itself: %s", dst)
            return
        if not overwrite:
            logger.warning("Skip copying existing file: %s", dst)
            return
        logger.warning("Copy and overwriting existing file: %s", dst)
    logger.info("Copy %r -> %r", src, dst)
    dirname = os.path.dirname(dst)
    if dirname:
        os.makedirs(dirname, exist_ok=True)
    shutil.copyfile(src, dst)