# Source code for ewoksid22.h5_to_spec

"""Convert an ID22 HDF5 file to a SPEC .dat file"""

import datetime
import io
import logging
import os
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from typing import Union

import h5py
import numpy
from scipy.interpolate import interp1d
from silx.io.h5py_utils import File
from silx.io.h5py_utils import retry

from . import dirutils
from .utils import bliss_scan
from .utils import spec

logger = logging.getLogger(__name__)


def convert_h5(
    raw_filename: str,
    outprefix: Optional[str] = None,
    entries: Optional[List[str]] = None,
    outdirs: Optional[Dict[str, str]] = None,
    primary_outdir: Optional[str] = None,
    retry_timeout: float = 10,
    rebin_filename: Optional[str] = None,
    ascii_extension: str = ".dat",
    overwrite: bool = False,
) -> Tuple[Optional[str], List[str]]:
    """Convert ID22 HDF5 scan entries to SPEC scans.

    :param raw_filename: full path of the Nexus file
    :param outprefix: something unique to the proposal/session. When given,
        it is prepended to the SPEC file name as ``<outprefix>_<basename>``.
    :param entries: for example ["1.1", "1.2", ...]. When empty or ``None``
        the scan names are discovered with ``bliss_scan.get_scan_names``.
    :param outdirs: forwarded to ``dirutils.prepare_outdirs``
    :param primary_outdir: forwarded to ``dirutils.prepare_outdirs``
    :param retry_timeout: timeout forwarded to the HDF5 reading helpers
    :param rebin_filename: full path of the id22rebin file. When given, the
        SPEC file name is derived from it instead of from ``raw_filename``.
    :param ascii_extension: extension of the SPEC file name
    :param overwrite: when ``True``, scans that already exist in the SPEC
        file are converted again and replaced; otherwise they are skipped.
    :returns: primary spec output file and new entries added
    :raises Exception: the first error encountered while extracting scan
        data. It is raised only after all other scans were processed and
        the SPEC file was written and copied.
    """
    raw_filename = os.path.abspath(raw_filename)
    spec_basename = rebin_filename or raw_filename
    spec_basename = (
        f"{os.path.splitext(os.path.basename(spec_basename))[0]}{ascii_extension}"
    )
    if outprefix:
        spec_basename = f"{outprefix}_{spec_basename}"

    # Scans that are converted (NXentry names)
    converted_entries: List[str] = []

    outdirs = dirutils.prepare_outdirs(outdirs, primary_outdir)
    local_filename = dirutils.primary_file(spec_basename, outdirs)
    if not local_filename:
        return local_filename, converted_entries

    # Scans to convert (NXentry names), grouped by integer scan number
    if entries:
        entries_to_convert: Dict[int, List[str]] = {}
        for name in entries:
            # "12.1" and "12.2" both belong to scan number 12
            scannr = int(float(name))
            entries_to_convert.setdefault(scannr, []).append(name)
    else:
        entries_to_convert = bliss_scan.get_scan_names(
            raw_filename, retry_timeout=retry_timeout
        )
    if not entries_to_convert:
        logger.warning("No scans to convert")
        return local_filename, converted_entries

    # Skip scans:
    #  - not two sub-scans
    #  - already existing and overwrite=False
    already_saved = spec.saved_scan_numbers(local_filename)
    spec_exists = bool(already_saved)
    entries_to_convert, to_overwrite = _reduce_entries(
        entries_to_convert, already_saved, overwrite, spec_basename
    )
    if not entries_to_convert:
        logger.warning("No scans to convert")
        return local_filename, converted_entries

    # SPEC scans to be saved
    first_error = None
    scans: Dict[int, List[str]] = {}
    for scannr, subscans in sorted(entries_to_convert.items()):
        try:
            # fscan with 2 complete subscans. The unpacking is inside the
            # `try` so that a malformed entry is handled like any other
            # extraction failure instead of aborting the whole conversion.
            subscan1, subscan2 = subscans
            scans[scannr] = _fscan_spec_lines(
                raw_filename,
                subscan1,
                subscan2,
                rebin_filename=rebin_filename,
                # Overrides the timeout of the `retry` decorator
                retry_timeout=retry_timeout,
            )
        except Exception as e:
            # Keep converting the other scans; every failure is logged and
            # the first one is re-raised at the end.
            logger.exception(
                "Failed to convert scan #%d of %s", scannr, raw_filename
            )
            if first_error is None:
                first_error = e
            continue
        converted_entries.extend(subscans)

    # Append, overwrite or create SPEC file
    if spec_exists:
        if to_overwrite:
            # Instead of overwriting the specific scans from `to_overwrite`
            # we read the entire scan file, merge the new scans and write again.
            header, existing_scans = spec.read_spec_file(local_filename)
            existing_scans.update(scans)
            spec.write_spec_file(local_filename, header, existing_scans)
        else:
            spec.append_spec_file(local_filename, scans)
    else:
        first_scan = sorted(entries_to_convert.items())[0][1][0]
        start_time = bliss_scan.get_start_time(
            raw_filename, first_scan, retry_timeout=retry_timeout
        )
        header = _create_spec_header(raw_filename, start_time=start_time)
        spec.write_spec_file(local_filename, header, scans)

    # Copy the local (primary) file to other local and remote directories
    dirutils.copy_primary_file(spec_basename, outdirs, overwrite=True)

    # Raise first scan data extraction error
    if first_error is not None:
        raise first_error

    return local_filename, converted_entries


def _reduce_entries(
    entries_to_convert: Dict[int, List[str]],
    already_saved: List[int],
    overwrite: bool,
    basename: str,
) -> Tuple[Dict[int, List[str]], List[int]]:
    """Remove invalid scans and scans that were already saved when overwrite=False.

    :param entries_to_convert: maps scan number to the names of its subscans
    :param already_saved: scan numbers already present in the SPEC file
    :param overwrite: keep (and report) scans that were already saved
    :param basename: SPEC file name, only used in log messages
    :returns: the scans to convert and the scan numbers among them that
        already exist in the SPEC file
    """
    saved = set(already_saved)
    reduced_entries_to_convert: Dict[int, List[str]] = {}
    to_overwrite: List[int] = []
    for scannr, subscans in sorted(entries_to_convert.items()):
        exists = scannr in saved
        if exists and not overwrite:
            logger.warning("Skipping scan #%d: already exists in %s", scannr, basename)
            continue
        # This check also applies to scans that are being overwritten: the
        # caller unpacks exactly two subscans per scan.
        if len(subscans) != 2:
            logger.warning(
                "Skipping scan #%d of %s: %d subscans instead of 2",
                scannr,
                basename,
                len(subscans),
            )
            continue
        if exists:
            to_overwrite.append(scannr)
            logger.warning(
                "Overwriting scan #%d: already exists in %s", scannr, basename
            )
        reduced_entries_to_convert[scannr] = subscans
    return reduced_entries_to_convert, to_overwrite


# Motor names written in the "#o<i>" lines of the file header. The positions
# in the "#P<i>" lines of each scan follow the same grouping and order.
_MOTOR_NAMES = [
    ["tth", "om", "manom", "mantth", "mantr", "Dh", "Dhd"],
    ["Dhm", "Dhu", "spinp", "bluspin", "t1h", "t1h1", "t1h2", "t1x"],
    ["t1y", "t1rz", "t1trans", "robtran", "s3vg", "s3vo", "s3hg", "s3ho"],
    ["s4vg", "s4vo", "s4hg", "s4ho", "u26b", "chi"],
    ["d2dtran", "Dy", "Dyu", "Dyd", "Drx", "Dry"],
    ["mos", "rst", "rsg", "redtrans"],
    ["xtrans", "ytrans", "ztrans", "gasspin", "DET_Z", "DET_X", "DET_Y", "DET_RZ"],
]


def _create_spec_header(
    filename: str, start_time: Optional[Union[datetime.datetime, str]] = None
) -> List[str]:
    """Return the lines of the SPEC file header.

    :param filename: written in the "#F" line
    :param start_time: written in the "#D" line using its string
        representation. Defaults to the current local time in ISO format.
    :returns: header lines, each one terminated by a newline
    """
    if not start_time:
        start_time = datetime.datetime.now().astimezone().isoformat()
    spec_lines = [
        f'#F "{filename}"\n',
        f"#D {start_time}\n",
        "#C exp User = opid22\n\n",
    ]
    for i, motor_names in enumerate(_MOTOR_NAMES):
        spec_lines.append(f"#o{i} {' '.join(motor_names)}\n")
    spec_lines.append("\n")
    return spec_lines


@retry(retry_period=0.5, retry_timeout=10)
def _fscan_spec_lines(
    raw_filename: str,
    subscan1: str,
    subscan2: str,
    rebin_filename: Optional[str] = None,
) -> List[str]:
    """Read both subscans of an fscan and return the SPEC lines of the scan.

    :param raw_filename: HDF5 file that contains both subscans
    :param subscan1: name of the first subscan group in ``raw_filename``
    :param subscan2: name of the second subscan group in ``raw_filename``
    :param rebin_filename: optional HDF5 file in which the rebinned data is
        looked up under the name ``subscan1``
    :returns: SPEC lines of the scan, or an empty list when
        ``rebin_filename`` is given but has no group named ``subscan1``
    """
    with File(raw_filename, mode="r") as h5file:
        gsubscan1 = h5file[subscan1]
        gsubscan2 = h5file[subscan2]
        if rebin_filename:
            with File(rebin_filename, mode="r") as h5filerebin:
                try:
                    rebinscan = h5filerebin[subscan1]
                except KeyError:
                    return []
                return _entry_to_spec_lines(gsubscan1, gsubscan2, rebinscan=rebinscan)
        return _entry_to_spec_lines(gsubscan1, gsubscan2)


def _entry_to_spec_lines(
    subscan1: h5py.Group,
    subscan2: h5py.Group,
    rebinscan: Optional[h5py.Group] = None,
) -> List[str]:
    """Read NXentry data and return SPEC file lines.

    The data table has one column per counter, ordered as: rebin counters
    (only when ``rebinscan`` is given), fast counters, slow counters. Slow
    counters are resampled (nearest neighbour) at the "Epoch" column. When
    ``rebinscan`` is given, fast counters are resampled (nearest neighbour)
    at the rebinned "2th" values.

    :param subscan1: group providing the fast counters ("measurement"), the
        scan parameters and the positioners
    :param subscan2: group providing the slow counters ("measurement")
    :param rebinscan: optional group providing "id22rebin/data"
    :returns: scan header lines followed by the data table as one string
    """
    fast_data = subscan1["measurement"]
    slow_data = subscan2["measurement"]
    rebin_data = rebinscan["id22rebin/data"] if rebinscan is not None else None
    fscan_params = subscan1["instrument/fscan_parameters"]
    positioners_start = subscan1["instrument/positioners_start"]
    machine = subscan1.get("instrument/machine", {})
    robot = subscan1.get("instrument/robot", {})

    # The group name looks like "/<scannr>.<subscan>"
    scannr = subscan1.name[1:].split(".")[0]
    start_time = bliss_scan.str_from_dataset(subscan1["start_time"])
    start_pos = float(fscan_params["start_pos"][()])
    step = float(fscan_params["step_size"][()])
    npoints = float(fscan_params["npoints"][()])
    acq_time = float(fscan_params["acq_time"][()])
    end_pos = f"{start_pos + step * npoints:.2f}"
    deg_per_min = f"{step / acq_time * 60:.2f}"

    # Scan header
    spec_lines = [
        (
            f"#S {scannr} hookscan "
            f"{bliss_scan.str_from_dataset(fscan_params['motor'])} "
            f"{bliss_scan.read_position(fscan_params, 'start_pos', '{:.2f}')} "
            f"{end_pos} {deg_per_min} "
            f"{bliss_scan.read_position(fscan_params, 'acq_time', '{:.5f}', modif=lambda x: x * 1000)} \n"
        ),
        f"#D {start_time}\n",
        f"#T {bliss_scan.read_position(fscan_params, 'acq_time', '{:.5f}')} (Seconds)\n",
        "#Q \n",
    ]
    for i, motor_names in enumerate(_MOTOR_NAMES):
        positions = " ".join(
            bliss_scan.read_position(positioners_start, name, "{:.4f}")
            for name in motor_names
        )
        spec_lines.append(f"#P{i} {positions}\n")
    spec_lines.extend(
        [
            "#UMI0 Current AutoM Shutter U26B_GAP \n",
            (
                "#UMI1 "
                f"{bliss_scan.read_position(machine, 'current', '{:.4f}')} "
                f"{bliss_scan.str_from_dataset(machine.get('automatic_mode'))} "
                f"{bliss_scan.str_from_dataset(machine.get('front_end'))} "
                f"{bliss_scan.read_position(positioners_start, 'u26b', '{:.4f}')}\n"
            ),
            (
                "#UMI2 Refill in "
                f"{bliss_scan.str_from_dataset(machine.get('refill_countdown'))} sec, "
                f"Fill Mode: {bliss_scan.str_from_dataset(machine.get('mode'))}, "
                f"Op. Message: {bliss_scan.str_from_dataset(machine.get('message'))}\n"
            ),
            (
                "#CR Last robot sample loaded: "
                f"{bliss_scan.str_from_dataset(robot.get('sample_label'))}\n"
            ),
        ]
    )

    # Slow counters: SPEC column names, formats and HDF5 dataset names.
    # NOTE(review): the meaning of the boolean paired with each HDF5 name is
    # defined by `bliss_scan.read_counters` / `bliss_scan.min_npts_ctrs`.
    slow_ctrs_spec = [
        "blowerT",
        "Cryostream",
        "Cryostat",
        "Press_in",
        "Press_out",
        "monin",
        "bmon",
    ]
    slow_ctrs_fmt = ["%4.3f", "%4.3f", "%4.3f", "%7.4f", "%7.4f", "%.5e", "%.5e"]
    slow_ctrs_h5 = [
        ("blower_in", False),
        ("ox700", False),
        ("ls340_A", False),
        ("pace_in", False),
        ("pace_press", False),
        ("monin", False),
        ("bmon", False),
    ]
    npts_slow = bliss_scan.min_npts_ctrs(slow_data, slow_ctrs_h5)

    # Fast counters
    prefix, nchannels = ("eiger_roi", 13) if "eiger" in fast_data else ("ma", 9)
    if rebin_data is None:
        fast_ctrs_spec = (
            ["2_theta"]
            + [f"MA{i}" for i in range(nchannels)]
            + ["Monitor", "Epoch", "Omega"]
        )
        fast_ctrs_fmt = ["%3.8f"] + ["%i"] * nchannels + ["%i", "%15.8f", "%3.8f"]
        fast_ctrs_h5 = (
            ["tth"]
            + [f"{prefix}{i}" for i in range(nchannels)]
            + ["mon", "epoch_trig", "om"]
        )
    else:
        # 2-theta and the channels come from the rebinned data instead
        fast_ctrs_spec = ["Epoch", "Omega"]
        fast_ctrs_fmt = ["%3.8f"] * len(fast_ctrs_spec)
        fast_ctrs_h5 = ["epoch_trig", "om"]
    fast_ctrs_h5 = [(name, name != "om") for name in fast_ctrs_h5]
    npts_fast = bliss_scan.min_npts_ctrs(fast_data, fast_ctrs_h5)

    # Rebin counters
    if rebin_data is None:
        rebin_ctrs_spec = []
        rebin_ctrs_fmt = []
        rebin_ctrs_h5 = []
        npts_rebin = 0
        nrows = npts_fast
    else:
        rebin_ctrs_h5 = ["2th", "I_sum", "norm"]
        # "2_theta", then "MA0".."MA<n-1>", "MAav", "Mon0".."Mon<n-1>", "Monav"
        rebin_ctrs_spec = ["2_theta"] + [
            f"{name}{i}"
            for name in ("MA", "Mon")
            for i in list(range(nchannels)) + ["av"]
        ]
        rebin_ctrs_fmt = (
            ["%3.8f"] + ["%i"] * nchannels + ["%3.8f"] + ["%i"] * nchannels + ["%3.8f"]
        )
        rebin_ctrs_h5 = [(name, True) for name in rebin_ctrs_h5]
        npts_rebin = bliss_scan.min_npts_ctrs(rebin_data, rebin_ctrs_h5)
        nrows = npts_rebin

    # Prepare data: columns that cannot be filled stay zero
    ctrs_spec = rebin_ctrs_spec + fast_ctrs_spec + slow_ctrs_spec
    ctrs_fmt = rebin_ctrs_fmt + fast_ctrs_fmt + slow_ctrs_fmt
    rebinoff = 0
    fastoff = len(rebin_ctrs_spec)
    slowoff = len(rebin_ctrs_spec) + len(fast_ctrs_spec)
    data = numpy.zeros((nrows, len(ctrs_spec)))

    # Read rebin data
    off = rebinoff
    for _, idata in bliss_scan.read_counters(rebin_data, rebin_ctrs_h5, npts_rebin):
        if idata.ndim == 3:
            idata = idata.sum(axis=1)
        if idata.ndim == 2:
            # One column per entry of the last axis, followed by their mean
            idata = idata.T
            nadd = idata.shape[-1]
            data[:, off : off + nadd] = idata
            off += nadd
            data[:, off] = numpy.mean(idata, axis=1)
            off += 1
        else:
            data[:, off] = idata
            off += 1

    # Read fast data + interpolate at rebinned 2-theta
    if npts_rebin:
        xnew = list(bliss_scan.read_counters(rebin_data, [("2th", True)], npts_rebin))[
            0
        ][-1]
        xold = list(bliss_scan.read_counters(fast_data, [("tth", True)], npts_fast))[0][
            -1
        ]
        for i, idata in bliss_scan.read_counters(fast_data, fast_ctrs_h5, npts_fast):
            func = interp1d(xold, idata, kind="nearest", fill_value="extrapolate")
            try:
                data[:, fastoff + i] = func(xnew)
            except Exception as e:
                # Best effort: the column stays zero
                logger.warning(
                    "Scan #%s: cannot interpolate fast counter '%s': %s",
                    scannr,
                    ctrs_spec[fastoff + i],
                    e,
                )
    else:
        for i, idata in bliss_scan.read_counters(fast_data, fast_ctrs_h5, npts_fast):
            data[:, fastoff + i] = idata

    # Read slow data + interpolate at fast epoch
    xold = list(bliss_scan.read_counters(slow_data, [("epoch", True)], npts_slow))[0][
        -1
    ]
    xnew = data[:, ctrs_spec.index("Epoch")]
    for i, idata in bliss_scan.read_counters(slow_data, slow_ctrs_h5, npts_slow):
        func = interp1d(xold, idata, kind="nearest", fill_value="extrapolate")
        try:
            data[:, slowoff + i] = func(xnew)
        except Exception as e:
            # Best effort: the column stays zero
            logger.warning(
                "Scan #%s: cannot interpolate slow counter '%s': %s",
                scannr,
                ctrs_spec[slowoff + i],
                e,
            )

    # Scan data header
    spec_lines.append(f"#N {len(ctrs_spec)}\n")
    spec_lines.append(f"#L {' '.join(ctrs_spec)}\n")

    # Scan data
    buf = io.StringIO()
    numpy.savetxt(buf, data, delimiter=" ", fmt=" ".join(ctrs_fmt))
    spec_lines.append(buf.getvalue())
    spec_lines.append("\n")
    return spec_lines