Source code for ewoksid22.rebin

import logging
import os
import shutil
import subprocess
import tempfile
import time

import h5py
import numpy
from ewokscore import Task
from multianalyzer._multianalyzer import MultiAnalyzer
from multianalyzer.file_io import FSCAN
from multianalyzer.file_io import save_rebin as _save_rebin
from multianalyzer.file_io import topas_parser

try:
    from multianalyzer.opencl import OclMultiAnalyzer
except ImportError:
    OclMultiAnalyzer = None

from silx.io.h5py_utils import File
from silx.io.h5py_utils import retry
from silx.io.h5py_utils import safe_top_level_names

from . import dirutils
from .roitools import unscramble_roi_collection

HDF5_PLUGIN_PATH = os.environ.get("HDF5_PLUGIN_PATH")
if HDF5_PLUGIN_PATH:
    del os.environ["HDF5_PLUGIN_PATH"]


logger = logging.getLogger(__name__)


[docs] class ID22Rebin( Task, input_names=["filename", "parsfile"], optional_input_names=[ "entries", "debug", "wavelength", "energy", "step", "range", "phi", "iter", "startp", "endp", "pixel", "width", "delta2theta", "device", "outdirs", "primary_outdir", "outprefix", "unscramble_method", "retry_timeout", "columnorder", "num_col", "overwrite", ], output_names=["outfile", "entries"], ): """ Rebin data from area detector ROIs corresponding to multiple analyzer crystals to produce per-crystal 1D ROIs for each entry (Bliss scan) in the HDF5 file. Each entry in the input HDF5 file contains 1D ROIs—one per analyzer crystal— for every diffractometer angle. These 1D ROIs are produced by LIMA summing the corresponding 2D ROI on the area detector along one axis. The geometry of the detector and analyzer crystals is accounted for using parameters from the `parsfile`. """
[docs] def run(self): outdirs = self.get_input_value("outdirs", {}) primary_outdir = self.get_input_value("primary_outdir", None) outdirs = dirutils.prepare_outdirs(outdirs, primary_outdir) overwrite = self.get_input_value("overwrite", False) param = topas_parser(self.inputs.parsfile) # Ensure all units are consistent. Here lengths are in millimeters. L = param["L1"] L2 = param["L2"] # Angles are all given in degrees center = numpy.array(param["centre"]) psi = numpy.rad2deg(param["offset"]) rollx = numpy.rad2deg(param["rollx"]) rolly = numpy.rad2deg(param["rolly"]) tha = numpy.rad2deg(param["manom"]) thd = numpy.rad2deg(param["mantth"]) pixel = self.get_input_value("pixel", 75e-3) # mm width = self.get_input_value("width", None) delta2theta = self.get_input_value("delta2theta", 0.0) sdelta2theta = str(delta2theta).replace(".", "") outfilename = ( os.path.splitext(os.path.basename(self.inputs.filename))[0] + f"_w{sdelta2theta}.h5" ) if not self.missing_inputs.outprefix: outfilename = self.inputs.outprefix + "_" + outfilename outfile = dirutils.primary_file(outfilename, outdirs) # Initialize the rebinning engine. if self.get_input_value("device", None) is not None and OclMultiAnalyzer: mma = OclMultiAnalyzer( L, L2, pixel, center, tha, thd, psi, rollx, rolly, device=str(self.get_input_value("device")).split(","), ) print(f"Using device {mma.ctx.devices[0]}") else: mma = MultiAnalyzer(L, L2, pixel, center, tha, thd, psi, rollx, rolly) print("Using Cython+OpenMP") filename = self.inputs.filename entries = self.get_input_value("entries", None) unscramble_method = self.get_input_value("unscramble_method", "xy") print(f"Unscrambling of ROI collection: {unscramble_method}") retry_timeout = self.get_input_value("retry_timeout", 10) exclude_entries = None if outfile and os.path.exists(outfile): exclude_entries = safe_top_level_names(outfile, retry_timeout=retry_timeout) if exclude_entries: if overwrite: logger.warning( "Overwriting existing entries %s in file %s", exclude_entries, outfile, ) exclude_entries = None else: logger.warning( "Skipping existing entries %s in file %s", exclude_entries, outfile, ) hdf5_data = read_data( filename, unscramble_method=unscramble_method, entries=entries, exclude_entries=exclude_entries, retry_timeout=retry_timeout, ) for entry, one_entry in hdf5_data.items(): print(f"Rebin data from {filename}::{entry}") start_time = time.perf_counter() arm = one_entry["arm"] mon = one_entry["mon"] roicol = one_entry["roicol"] kept_points = min(len(roicol), len(arm), len(mon)) roicol = roicol[:kept_points] arm = arm[:kept_points] mon = mon[:kept_points] scan = one_entry["scan"] dtth = self.get_input_value("step", None) or scan.step_size tth_min = scan.start + psi.min() tth_max = scan.stop + psi.max() if self.get_input_value("range", None): _range = self.get_input_value("range") if numpy.isfinite(_range[0]): tth_min = _range[0] if numpy.isfinite(_range[1]): tth_max = _range[1] num_col = self.get_input_value("num_col", None) if num_col is None: num_col = 1 columnorder = self.get_input_value("columnorder", None) if columnorder is None: columnorder = 0 res = mma.integrate( roicol, arm, mon, tth_min, tth_max, dtth=dtth, iter_max=self.get_input_value("iter", 250), roi_min=self.get_input_value("startp", 0), roi_max=self.get_input_value("endp", 1024), phi_max=self.get_input_value("phi", 75), # degrees num_row=512, num_col=num_col, columnorder=columnorder, # // 0: (column=31, channel=13, row=512), 1: (channel=13, column=31, row=512), 2: (channel=13, row=512, column=31) width=width or param.get("wg", 0.0), dtthw=delta2theta, ) if outfile: print(f"Save to {outfile}::{entry}") save_rebin( outfile, entry, overwrite=overwrite, beamline="id22", name="id22rebin", topas=param, res=res, start_time=start_time, ) self.outputs.outfile = outfile self.outputs.entries = entries
[docs] @retry(retry_period=0.5, retry_timeout=10) def read_data(filename, unscramble_method=None, entries=None, exclude_entries=None): """Read data to rebin.""" res = {} with File(filename, mode="r") as fh: if entries: entries = set(entries) else: entries = {k for k, v in fh.items() if v.attrs.get("NX_class") == "NXentry"} if exclude_entries: entries -= set(exclude_entries) try: entries = sorted(entries, key=lambda x: float(x)) except TypeError: entries = sorted(entries) entries = [entry for entry in entries if entry.endswith(".1")] print(f"Reading and unscrambling {len(entries)} entries: {entries}") for entry in entries: title = entry + "/title" if title not in fh: logger.warning(f"skip scan {entry}: no 'title'") continue end_time = entry + "/end_time" if end_time not in fh: logger.warning(f"skip scan {entry}: no 'end_time'") continue title = fh[title][()] try: title = title.decode() except Exception: pass if not title.startswith("fscan") and not title.startswith("f2scan"): logger.warning(f"skip scan {entry}: {title}") continue sort_data = {} non_sort_data = {} try: entry_grp = fh[entry] sort_data["x"] = entry_grp[ "instrument/eiger_roi_collection/selection/x" ][()] sort_data["y"] = entry_grp[ "instrument/eiger_roi_collection/selection/y" ][()] sort_data["roicol"] = entry_grp["measurement/eiger_roi_collection"][()] scan_col = entry_grp["instrument/fscan_parameters"] motor_name = scan_col["motor"][()] try: motor_name = motor_name.decode() except AttributeError: pass non_sort_data["scan"] = FSCAN( motor_name, float(scan_col["start_pos"][()]), float(scan_col["step_size"][()]), int(scan_col["npoints"][()]), float(scan_col["step_time"][()]), ) non_sort_data["arm"] = entry_grp["measurement/tth"][()] non_sort_data["mon"] = entry_grp["measurement/mon"][()] non_sort_data["tha"] = entry_grp["instrument/positioners/manom"][()] non_sort_data["thd"] = entry_grp["instrument/positioners/mantth"][()] except KeyError as error: logger.warning(str(error)) continue if unscramble_method == "xy": keys = "x", "y" elif unscramble_method == "yx": keys = "y", "x" else: keys = None if keys: sort_data = unscramble_roi_collection(sort_data, keys) res[entry] = {**sort_data, **non_sort_data} for name, val in res[entry].items(): if name == "scan": print(f"Scan description: {val}") else: try: print(f"{name} shape = {val.shape}") except (TypeError, AttributeError): pass return res
[docs] def save_rebin(filename: str, entry: str, overwrite: bool = False, **kwargs): """Save rebin result.""" if overwrite: try: _delete_entry(filename, entry) except Exception: logger.exception( "Failed deleting /%s from HDF5 file %r failed", entry, filename ) _save_rebin(filename, entry=entry, **kwargs)
def _delete_entry(filename: str, entry: str) -> None: if not os.path.exists(filename): return remove_file = False with h5py.File(filename, mode="a") as fh: entries = list(fh) if entries == [entry]: remove_file = True elif entry in entries: del fh[entry] if remove_file: os.remove(filename) else: _repack_hdf5(filename) def _repack_hdf5(filename: str) -> None: tmp_fd, tmp_name = tempfile.mkstemp(suffix=".h5", dir=os.path.dirname(filename)) os.close(tmp_fd) try: subprocess.run( ["h5repack", filename, tmp_name], check=True, ) shutil.move(tmp_name, filename) except Exception: logger.exception("Repacking HDF5 file %r failed", filename) finally: if os.path.exists(tmp_name): os.remove(tmp_name)