Source code for ewoksid22.utils.bliss_scan

import logging
from typing import Any
from typing import Callable
from typing import Dict
from typing import Generator
from typing import List
from typing import Optional
from typing import Tuple

import h5py
import numpy
from silx.io.h5py_utils import File
from silx.io.h5py_utils import retry

logger = logging.getLogger(__name__)


@retry(retry_period=0.5, retry_timeout=10)
def get_scan_names(filename: str, title: Optional[str] = None) -> Dict[int, List[str]]:
    """Get the subscan names for all finished scans in the Nexus file.

    A top-level entry is kept only when it can be opened and contains both
    an ``end_time`` and a ``measurement`` item.

    :param filename: path of the HDF5/Nexus file, opened read-only
    :param title: when truthy, additionally require the scan's ``title`` to
        contain ``"fscan"`` or ``"f2scan"``.
        NOTE(review): the *value* of ``title`` is never compared with the
        scan title, it only switches the filter on -- confirm this is intended.
    :returns: scan number -> list of top-level entry names belonging to that
        scan; the scan number is ``int(float(name))``
    """
    with File(filename, mode="r") as h5file:

        def _is_selected(name: str) -> bool:
            try:
                scan = h5file[name]
            except Exception as e:
                # Best effort: an unreadable entry is reported and skipped
                # instead of aborting the whole listing.
                logger.warning("cannot read scan %r (cause: %s)", name, e)
                return False
            if "end_time" not in scan or "measurement" not in scan:
                return False
            if title:
                stitle = str_from_dataset(scan["title"])
                if "fscan" not in stitle and "f2scan" not in stitle:
                    return False
            return True

        scans: Dict[int, List[str]] = {}
        # Snapshot the entry names before inspecting the individual entries.
        for name in list(h5file["/"]):
            if _is_selected(name):
                scannr = int(float(name))
                scans.setdefault(scannr, []).append(name)
    return scans
def read_counters(
    group: h5py.Group, ctrs: List[Tuple[str, bool]], npts: int
) -> Generator[Tuple[int, numpy.ndarray], None, None]:
    """Read the first ``npts`` points of each counter dataset in ``group``.

    :param group: container indexed by counter name
    :param ctrs: ``(name, must_exist)`` pairs
    :param npts: number of leading points to read from each dataset
    :returns: generator of ``(index, data)`` where ``index`` is the position
        of the counter in ``ctrs`` and ``data`` is the array that was read,
        with NaN values replaced by 0
    :raises KeyError: when a counter flagged ``must_exist`` is missing

    Counters that are missing (and optional), cannot be read, or contain no
    data are skipped; the last two cases are logged as warnings.
    """
    for i, (name, must_exist) in enumerate(ctrs):
        try:
            dset = group[name]
        except KeyError:
            if must_exist:
                raise
            continue
        try:
            data = dset[:npts]
        except Exception as e:
            # Best effort: a failing read must not stop the other counters.
            logger.warning("skip counter data %r (cause: %s)", name, e)
            continue
        if not len(data):
            logger.warning("no data in %r", name)
            continue
        data[numpy.isnan(data)] = 0
        yield i, data
def read_position(
    grp: h5py.Group,
    key: str,
    fmt: str,
    modif: Optional[Callable[[float], Any]] = None,
) -> str:
    """Read a motor position from ``grp[key]`` and return it as a string.

    :param grp: container of position datasets
    :param key: name of the position to read
    :param fmt: ``str.format`` template applied to the raw value; the result
        is parsed back with ``float``
    :param modif: optional callable applied to the parsed float before it is
        converted to a string
    :returns: ``"-999"`` when ``key`` is missing or the stored value is
        ``"*DIS*"`` or ``"ERR"``, otherwise ``str`` of the (modified) number
    :raises RuntimeError: when the value cannot be formatted or parsed
    """
    missing = str(-999)
    if key not in grp:
        return missing
    pos = grp[key][()]
    if isinstance(pos, bytes):
        pos = pos.decode()
    if pos in ("*DIS*", "ERR"):
        return missing
    try:
        num = float(fmt.format(pos))
    except Exception as e:
        raise RuntimeError(f"Error in formatting motor position '{key}' = {pos}") from e
    # Compare with None rather than testing truthiness, so that a callable
    # object that happens to evaluate as false is still applied.
    if modif is not None:
        num = modif(num)
    return str(num)
def min_npts_ctrs(group: h5py.Group, ctrs: List[Tuple[str, bool]]) -> int:
    """Smallest number of points (size of the last axis) of a group of datasets.

    :param group: container indexed by counter name
    :param ctrs: ``(name, must_exist)`` pairs
    :returns: minimum of ``shape[-1]`` over the counters found in ``group``;
        0 when ``ctrs`` is empty or none of the counters exist
    :raises KeyError: when a counter flagged ``must_exist`` is missing
    """
    npts = []
    for name, must_exist in ctrs:
        try:
            dset = group[name]
        except KeyError:
            if must_exist:
                raise
        else:
            npts.append(dset.shape[-1])
    # ``default`` covers both an empty ``ctrs`` and the case where every
    # optional counter is absent, which would otherwise raise ValueError.
    return min(npts, default=0)
@retry(retry_period=0.5, retry_timeout=10)
def get_start_time(filename: str, entry: str) -> str:
    """Return ``start_time`` of the group ``entry`` in ``filename`` as a string.

    The file is opened read-only. The call is wrapped by ``retry`` with a
    period of 0.5 and a timeout of 10 (presumably seconds -- see the silx
    documentation for which errors trigger a retry).
    """
    with File(filename, mode="r") as h5file:
        scan = h5file[entry]
        start_time = scan["start_time"]
        # Convert while the file is still open: the dataset handle is only
        # readable inside the ``with`` block.
        return str_from_dataset(start_time)
def str_from_dataset(dataset: h5py.Dataset) -> str:
    """Read a dataset as a string.

    :param dataset: the dataset to read; a plain ``str`` is returned as is
        and ``None`` yields ``"UNKNOWN"``
    :returns: the result of ``dataset.asstr()[()]`` when that works,
        otherwise the raw value ``dataset[()]`` converted to text (bytes are
        decoded as UTF-8, anything else goes through ``str``)
    """
    if isinstance(dataset, str):
        return dataset
    if dataset is None:
        return "UNKNOWN"
    try:
        return dataset.asstr()[()]
    except (AttributeError, TypeError):
        # No usable ``asstr`` on this object (presumably an older h5py or a
        # non-string dataset): fall back to the raw value.
        value = dataset[()]
    if isinstance(value, bytes):
        # ``str(b"abc")`` would give the repr "b'abc'", not the text.
        return value.decode("utf-8", errors="replace")
    return str(value)