Source code for ewoksid22.extract

import logging
import os
import subprocess

from ewokscore import Task

from . import bin
from . import dirutils
from . import topas_extract

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)


class ID22TopasExtract(
    Task,
    input_names=["filename", "inp_file"],
    optional_input_names=[
        "entries",
        "tth_min",
        "tth_max",
        "full_tth",
        "outdirs",
        "outprefix",
        "primary_outdir",
        "inp_step",
        "startp",
        "overwrite",
    ],
):
    """
    Extract data from area detector ROIs corresponding to multiple analyzer
    crystals to produce per-crystal 1D ROIs for each entry (Bliss scan) in
    the HDF5 file. The result is a XYE file for each analyzer crystal and scan.

    Each entry in the input HDF5 file contains 1D ROIs—one per analyzer
    crystal—for every diffractometer angle. These 1D ROIs are produced by
    LIMA summing the corresponding 2D ROI on the area detector along one axis.

    Note: although ``entries`` is declared as an optional input, the task
    currently requires it to resolve to exactly one scan number.
    """

    def run(self):
        """Run the extraction for a single scan and generate ``xdds.inp``.

        Steps:

        1. Validate that ``entries`` resolves to exactly one scan number.
        2. Create ``<primary outdir>/<out_file>/topas_extract`` and
           ``.../topas_refine``.
        3. Call ``topas_extract.run`` to write the extracted data into the
           ``topas_extract`` directory.
        4. Copy ``inp_file`` into ``topas_refine`` and run ``gen_xdds_file``
           there; its standard output is saved as ``xdds.inp``.

        :raises ValueError: when ``entries`` is missing, empty, or resolves
            to more than one scan number.
        """
        in_file = self.get_input_value("filename", None)

        # Entries may be given as e.g. "3.1" or 3.0: reduce them to unique,
        # sorted integer scan numbers. A missing/empty value becomes [] so it
        # is rejected below with a clear error instead of failing later on
        # `scans[0]` (possibly after the extraction has already been done).
        scans = self.get_input_value("entries", None)
        scans = sorted({int(float(s)) for s in scans}) if scans else []
        if len(scans) != 1:
            raise ValueError(f"Extract only supports 1 scan instead of {scans}")

        tth_min = self.get_input_value("tth_min", None)
        tth_max = self.get_input_value("tth_max", None)
        full_tth = self.get_input_value("full_tth", False)
        outdirs = self.get_input_value("outdirs", dict())
        primary_outdir = self.get_input_value("primary_outdir", None)
        outdirs = dirutils.prepare_outdirs(outdirs, primary_outdir)
        overwrite = self.get_input_value("overwrite", False)

        # Output base name: [<outprefix>_]<input file stem>_calib
        out_file = os.path.splitext(os.path.basename(in_file))[0]
        if not self.missing_inputs.outprefix:
            out_file = self.inputs.outprefix + "_" + out_file
        out_file = out_file + "_calib"

        # TODO: use all outdirs
        rootpath = os.path.join(outdirs["primary"][1], out_file)
        savepath_xye = os.path.join(rootpath, "topas_extract")
        savepath_inp = os.path.join(rootpath, "topas_refine")
        os.makedirs(savepath_xye, exist_ok=True)
        os.makedirs(savepath_inp, exist_ok=True)

        topas_extract.run(
            in_file,
            out_file,
            scans=scans,
            savepath=savepath_xye,
            min_usr_tth=tth_min,
            max_usr_tth=tth_max,
            full_tth=full_tth,
            overwrite=overwrite,
        )

        scannr = scans[0]
        # gen_xdds_file is executed from within the refine directory, so the
        # extracted files are referenced relative to that directory.
        savepath_xye = os.path.relpath(savepath_xye, savepath_inp)
        xye_files = os.path.join(savepath_xye, out_file + f"_s{scannr}_MA")

        inp_file = self.inputs.inp_file
        inp_step = self.get_input_value("inp_step", 2)
        startp = self.get_input_value("startp", 31)

        inp_file_cp = os.path.join(savepath_inp, os.path.basename(inp_file))
        dirutils.copy_local_file(inp_file, inp_file_cp, overwrite=True)

        # Modify inp_file_cp and save xdds.inp
        cmd = [
            "gen_xdds_file",
            xye_files,
            os.path.basename(inp_file_cp),
            0,
            topas_extract.NUM_CHANNELS - 1,
            startp,
            topas_extract.X_HIGH,
            inp_step,
        ]
        cwd = os.path.dirname(inp_file_cp)
        outs = execute_cmd(cmd, cwd)
        # The program's stdout is the content of the generated input file.
        with open(os.path.join(savepath_inp, "xdds.inp"), "wb") as f:
            f.write(outs)
def execute_cmd(cmd, cwd):
    """Run an external program and return its standard output.

    The first element of ``cmd`` is a program name which is resolved to an
    executable path through ``bin.program_path``; all elements are converted
    to strings before execution.

    :param cmd: sequence of program name followed by its arguments
        (arguments may be non-string, e.g. numbers).
    :param cwd: working directory in which the program is executed.
    :returns: the captured standard output as ``bytes``.
    :raises subprocess.TimeoutExpired: when the program does not finish
        within 120 seconds (the process is killed first).
    :raises RuntimeError: when the program exits with a non-zero return
        code; the message contains the captured standard error.
    """
    cmd = list(map(str, cmd))
    # Keep the requested name for error reporting: cmd[0] is replaced by the
    # resolved executable path below.
    program = cmd[0]
    with bin.program_path(program) as path:
        cmd[0] = path
        with subprocess.Popen(
            cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=cwd
        ) as proc:
            try:
                outs, errs = proc.communicate(timeout=120)
            except subprocess.TimeoutExpired:
                # Leaving the Popen context waits for the killed process.
                proc.kill()
                raise
            if proc.returncode != 0:
                # Never let undecodable stderr bytes hide the real failure.
                stderr_text = errs.decode(errors="replace")
                raise RuntimeError(f"{program} failed: {stderr_text}")
            return outs