"""Convert an ID22 HDF5 file to a SPEC .dat file"""
import datetime
import io
import logging
import os
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from typing import Union

import h5py
import numpy
from scipy.interpolate import interp1d
from silx.io.h5py_utils import File
from silx.io.h5py_utils import retry

from . import dirutils
from .utils import bliss_scan
from .utils import spec
logger = logging.getLogger(__name__)
[docs]
def convert_h5(
    raw_filename: str,
    outprefix: Optional[str] = None,
    entries: Optional[List[str]] = None,
    outdirs: Optional[Dict[str, str]] = None,
    primary_outdir: Optional[str] = None,
    retry_timeout: float = 10,
    rebin_filename: Optional[str] = None,
    ascii_extension: str = ".dat",
    overwrite: bool = False,
) -> Tuple[Optional[str], List[str]]:
    """Convert ID22 HDF5 scan entries to SPEC scans.

    A SPEC scan is built from exactly two NXentry sub-scans that share the
    same scan number (for example "1.1" and "1.2"). Scans that cannot be
    extracted are skipped; the remaining ones are still saved and the first
    extraction error is re-raised at the end.

    :param raw_filename: full path of the Nexus file
    :param outprefix: something unique to the proposal/session; when given it
        is prepended (followed by an underscore) to the SPEC file name
    :param entries: for example ["1.1", "1.2", ...]; when empty or ``None``
        the scan names are obtained from ``raw_filename``
    :param outdirs: output directories, handed to ``dirutils.prepare_outdirs``
    :param primary_outdir: primary output directory, handed to
        ``dirutils.prepare_outdirs``
    :param retry_timeout: timeout used when reading the HDF5 file
        (presumably in seconds)
    :param rebin_filename: full path of the id22rebin file; when given the
        SPEC file is named after it instead of ``raw_filename``
    :param ascii_extension: file extension of the SPEC file
    :param overwrite: when ``True``, scans already present in the SPEC file
        are replaced; otherwise they are skipped
    :returns: primary spec output file and new entries added
    :raises Exception: the first error raised while extracting scan data,
        after the scans that could be extracted have been saved
    """
    raw_filename = os.path.abspath(raw_filename)
    spec_basename = rebin_filename or raw_filename
    spec_basename = (
        f"{os.path.splitext(os.path.basename(spec_basename))[0]}{ascii_extension}"
    )
    if outprefix:
        spec_basename = f"{outprefix}_{spec_basename}"

    # Scans that are converted (NXentry names)
    converted_entries: List[str] = []

    outdirs = dirutils.prepare_outdirs(outdirs, primary_outdir)
    local_filename = dirutils.primary_file(spec_basename, outdirs)
    if not local_filename:
        return local_filename, converted_entries

    # Scans to convert (NXentry names), grouped by integer scan number
    if entries:
        entries_to_convert = {}
        for name in entries:
            scannr = int(float(name))
            entries_to_convert.setdefault(scannr, []).append(name)
    else:
        entries_to_convert = bliss_scan.get_scan_names(
            raw_filename, retry_timeout=retry_timeout
        )
    if not entries_to_convert:
        logger.warning("No scans to convert")
        return local_filename, converted_entries

    # Skip scans:
    #  - not two sub-scans
    #  - already existing and overwrite=False
    already_saved = spec.saved_scan_numbers(local_filename)
    spec_exists = bool(already_saved)
    entries_to_convert, to_overwrite = _reduce_entries(
        entries_to_convert, already_saved, overwrite, spec_basename
    )
    if not entries_to_convert:
        logger.warning("No scans to convert")
        return local_filename, converted_entries

    # SPEC scans to be saved
    first_error = None
    scans: Dict[int, List[str]] = {}
    for scannr, subscans in sorted(entries_to_convert.items()):
        # Only an fscan with 2 complete subscans can be converted. Guard here
        # as well so that an unexpected count skips the scan instead of
        # aborting the whole conversion on tuple unpacking.
        if len(subscans) != 2:
            logger.warning(
                "Skipping scan #%d of %s: %d subscans instead of 2",
                scannr,
                spec_basename,
                len(subscans),
            )
            continue
        subscan1, subscan2 = subscans
        try:
            # `retry_timeout` is consumed by the `retry` decorator wrapping
            # `_fscan_spec_lines` and overrides the decorator default.
            scans[scannr] = _fscan_spec_lines(
                raw_filename,
                subscan1,
                subscan2,
                rebin_filename=rebin_filename,
                retry_timeout=retry_timeout,
            )
        except Exception as e:
            # Keep going with the other scans; report every failure but only
            # re-raise the first one once the file has been written.
            logger.error(
                "Failed to extract scan #%d from %s: %s", scannr, raw_filename, e
            )
            if first_error is None:
                first_error = e
            continue
        converted_entries.extend(subscans)

    # Append, overwrite or create SPEC file
    if spec_exists:
        if to_overwrite:
            # Instead of overwriting the specific scans from `to_overwrite`
            # we read the entire scan file, merge the new scans and write again.
            header, existing_scans = spec.read_spec_file(local_filename)
            existing_scans.update(scans)
            spec.write_spec_file(local_filename, header, existing_scans)
        else:
            spec.append_spec_file(local_filename, scans)
    else:
        first_scan = sorted(entries_to_convert.items())[0][1][0]
        start_time = bliss_scan.get_start_time(
            raw_filename, first_scan, retry_timeout=retry_timeout
        )
        header = _create_spec_header(raw_filename, start_time=start_time)
        spec.write_spec_file(local_filename, header, scans)

    # Copy the local (primary) file to other local and remote directories
    dirutils.copy_primary_file(spec_basename, outdirs, overwrite=True)

    # Raise first scan data extraction error
    if first_error is not None:
        raise first_error
    return local_filename, converted_entries
def _reduce_entries(
    entries_to_convert: Dict[int, List[str]],
    already_saved: List[int],
    overwrite: bool,
    basename: str,
) -> Tuple[Dict[int, List[str]], List[int]]:
    """Remove invalid scans and scans that were already saved when overwrite=False.

    A scan is valid only when it has exactly two sub-scans. The validity check
    is applied to every scan, including the ones that are about to be
    overwritten, so callers can always unpack the sub-scans as a pair.

    :param entries_to_convert: scan number -> NXentry names of its sub-scans
    :param already_saved: scan numbers already present in the SPEC file
    :param overwrite: keep (and mark for overwriting) scans that are already
        saved instead of skipping them
    :param basename: SPEC file name, used in log messages only
    :returns: the scans to convert (same mapping layout as the input) and the
        scan numbers among them that already exist in the SPEC file
    """
    reduced_entries_to_convert = {}
    to_overwrite = []
    for scannr, subscans in sorted(entries_to_convert.items()):
        # Validate first: an incomplete scan must never be converted, not
        # even when it is meant to replace an existing one.
        if len(subscans) != 2:
            logger.warning(
                "Skipping scan #%d of %s: %d subscans instead of 2",
                scannr,
                basename,
                len(subscans),
            )
            continue
        if scannr in already_saved:
            if not overwrite:
                logger.warning(
                    "Skipping scan #%d: already exists in %s", scannr, basename
                )
                continue
            logger.warning(
                "Overwriting scan #%d: already exists in %s", scannr, basename
            )
            to_overwrite.append(scannr)
        reduced_entries_to_convert[scannr] = subscans
    return reduced_entries_to_convert, to_overwrite
# Motor names grouped per SPEC header line: row ``i`` provides the names of
# the file-level "#o<i>" line and the order of the per-scan "#P<i>" positions.
_MOTOR_NAMES = [
    row.split()
    for row in (
        "tth om manom mantth mantr Dh Dhd",
        "Dhm Dhu spinp bluspin t1h t1h1 t1h2 t1x",
        "t1y t1rz t1trans robtran s3vg s3vo s3hg s3ho",
        "s4vg s4vo s4hg s4ho u26b chi",
        "d2dtran Dy Dyu Dyd Drx Dry",
        "mos rst rsg redtrans",
        "xtrans ytrans ztrans gasspin DET_Z DET_X DET_Y DET_RZ",
    )
]
def _create_spec_header(
    filename: str, start_time: Optional[datetime.datetime] = None
) -> List[str]:
    """Build the lines of the SPEC file header.

    :param filename: written, between double quotes, on the "#F" line
    :param start_time: written on the "#D" line; when falsy the current local
        time in ISO 8601 format is used instead
    :returns: the "#F", "#D" and "#C" lines, one "#o<i>" line per row of
        ``_MOTOR_NAMES`` and a closing empty line
    """
    timestamp = (
        start_time
        if start_time
        else datetime.datetime.now().astimezone().isoformat()
    )
    header = [
        f'#F "{filename}"\n',
        f"#D {timestamp}\n",
        "#C exp User = opid22\n\n",
    ]
    header += [
        f"#o{index} {' '.join(names)}\n" for index, names in enumerate(_MOTOR_NAMES)
    ]
    header.append("\n")
    return header
@retry(retry_period=0.5, retry_timeout=10)
def _fscan_spec_lines(
    raw_filename: str,
    subscan1: str,
    subscan2: str,
    rebin_filename: Optional[str] = None,
) -> List[str]:
    """Open the HDF5 file(s) and return the SPEC lines of one fscan.

    The function is wrapped by the silx ``retry`` decorator with a retry
    period of 0.5 and a retry timeout of 10.

    :param raw_filename: HDF5 file holding both sub-scans
    :param subscan1: name of the first sub-scan entry
    :param subscan2: name of the second sub-scan entry
    :param rebin_filename: optional HDF5 file with rebinned data, looked up
        under the name of ``subscan1``
    :returns: the SPEC lines of the scan, or an empty list when
        ``rebin_filename`` is given but has no entry named ``subscan1``
    """
    with File(raw_filename, mode="r") as raw_h5:
        first_entry = raw_h5[subscan1]
        second_entry = raw_h5[subscan2]

        # No rebinned data requested: the raw entries are all that is needed
        if not rebin_filename:
            return _entry_to_spec_lines(first_entry, second_entry)

        with File(rebin_filename, mode="r") as rebin_h5:
            try:
                rebin_entry = rebin_h5[subscan1]
            except KeyError:
                # The scan is absent from the rebin file: nothing to write
                return []
            # Convert while both files are still open
            return _entry_to_spec_lines(
                first_entry, second_entry, rebinscan=rebin_entry
            )
def _entry_to_spec_lines(
    subscan1: h5py.Group,
    subscan2: h5py.Group,
    rebinscan: Optional[h5py.Group] = None,
) -> List[str]:
    """Read NXentry data and return SPEC file lines.

    The returned lines form one SPEC scan: the "#S", "#D", "#T" and "#Q"
    header lines, one "#P<i>" line per row of ``_MOTOR_NAMES``, the "#UMI"
    and "#CR" information lines, the "#N"/"#L" column header and the data
    table.

    The table columns are, in this order: the rebinned counters (only when
    ``rebinscan`` is given), the fast counters read from ``subscan1`` and the
    slow counters read from ``subscan2``. Without rebinned data there is one
    row per fast point. With rebinned data there is one row per rebinned
    point and the fast counters are nearest-neighbour interpolated at the
    rebinned 2-theta values. The slow counters are always nearest-neighbour
    interpolated at the values of the "Epoch" column.

    A counter that cannot be interpolated is reported with a warning and its
    column is left filled with zeros.

    :param subscan1: entry providing the fast counters, the scan parameters,
        the start positions and the machine/robot information
    :param subscan2: entry providing the slow counters
    :param rebinscan: optional entry providing "id22rebin/data"
    :returns: the lines of the SPEC scan
    """
    fast_data = subscan1["measurement"]
    slow_data = subscan2["measurement"]
    rebin_data = rebinscan["id22rebin/data"] if rebinscan is not None else None
    fscan_params = subscan1["instrument/fscan_parameters"]
    positioners_start = subscan1["instrument/positioners_start"]
    # Optional groups: fall back to an empty mapping when missing
    machine = subscan1.get("instrument/machine", {})
    robot = subscan1.get("instrument/robot", {})

    # Entry name "/<scan>.<subscan>" -> "<scan>"
    scannr = subscan1.name[1:].split(".")[0]
    start_time = bliss_scan.str_from_dataset(subscan1["start_time"])
    start_pos = float(fscan_params["start_pos"][()])
    step = float(fscan_params["step_size"][()])
    npoints = float(fscan_params["npoints"][()])
    acq_time = float(fscan_params["acq_time"][()])
    end_pos = f"{start_pos + step * npoints:.2f}"
    deg_per_min = f"{step / acq_time * 60:.2f}"

    # Scan header
    spec_lines = [
        (
            f"#S {scannr} hookscan "
            f"{bliss_scan.str_from_dataset(fscan_params['motor'])} "
            f"{bliss_scan.read_position(fscan_params, 'start_pos', '{:.2f}')} "
            f"{end_pos} {deg_per_min} "
            f"{bliss_scan.read_position(fscan_params, 'acq_time', '{:.5f}', modif=lambda x: x * 1000)} \n"
        ),
        f"#D {start_time}\n",
        f"#T {bliss_scan.read_position(fscan_params, 'acq_time', '{:.5f}')} (Seconds)\n",
        "#Q \n",
    ]
    for i, motor_names in enumerate(_MOTOR_NAMES):
        positions = " ".join(
            bliss_scan.read_position(positioners_start, name, "{:.4f}")
            for name in motor_names
        )
        spec_lines.append(f"#P{i} {positions}\n")
    spec_lines.extend(
        [
            "#UMI0 Current AutoM Shutter U26B_GAP \n",
            (
                "#UMI1 "
                f"{bliss_scan.read_position(machine, 'current', '{:.4f}')} "
                f"{bliss_scan.str_from_dataset(machine.get('automatic_mode'))} "
                f"{bliss_scan.str_from_dataset(machine.get('front_end'))} "
                f"{bliss_scan.read_position(positioners_start, 'u26b', '{:.4f}')}\n"
            ),
            (
                "#UMI2 Refill in "
                f"{bliss_scan.str_from_dataset(machine.get('refill_countdown'))} sec, "
                f"Fill Mode: {bliss_scan.str_from_dataset(machine.get('mode'))}, "
                f"Op. Message: {bliss_scan.str_from_dataset(machine.get('message'))}\n"
            ),
            (
                "#CR Last robot sample loaded: "
                f"{bliss_scan.str_from_dataset(robot.get('sample_label'))}\n"
            ),
        ]
    )

    # Slow counters: SPEC column names, formats and HDF5 dataset names
    # (the three lists are index-aligned)
    slow_ctrs_spec = [
        "blowerT",
        "Cryostream",
        "Cryostat",
        "Press_in",
        "Press_out",
        "monin",
        "bmon",
    ]
    slow_ctrs_fmt = ["%4.3f", "%4.3f", "%4.3f", "%7.4f", "%7.4f", "%.5e", "%.5e"]
    # NOTE(review): the boolean of each pair is interpreted by the bliss_scan
    # helpers (presumably "counter is required") - confirm against bliss_scan.
    slow_ctrs_h5 = [
        ("blower_in", False),
        ("ox700", False),
        ("ls340_A", False),
        ("pace_in", False),
        ("pace_press", False),
        ("monin", False),
        ("bmon", False),
    ]
    npts_slow = bliss_scan.min_npts_ctrs(slow_data, slow_ctrs_h5)

    # Fast counters
    prefix, nchannels = ("eiger_roi", 13) if "eiger" in fast_data else ("ma", 9)
    if rebin_data is None:
        fast_ctrs_spec = (
            ["2_theta"]
            + [f"MA{i}" for i in range(nchannels)]
            + ["Monitor", "Epoch", "Omega"]
        )
        fast_ctrs_fmt = ["%3.8f"] + ["%i"] * nchannels + ["%i", "%15.8f", "%3.8f"]
        fast_ctrs_h5 = (
            ["tth"]
            + [f"{prefix}{i}" for i in range(nchannels)]
            + ["mon", "epoch_trig", "om"]
        )
    else:
        # 2-theta and the channels come from the rebinned data instead
        fast_ctrs_spec = ["Epoch", "Omega"]
        fast_ctrs_fmt = ["%3.8f"] * len(fast_ctrs_spec)
        fast_ctrs_h5 = ["epoch_trig", "om"]
    fast_ctrs_h5 = [(name, name != "om") for name in fast_ctrs_h5]
    npts_fast = bliss_scan.min_npts_ctrs(fast_data, fast_ctrs_h5)

    # Rebin counters
    if rebin_data is None:
        rebin_ctrs_spec = []
        rebin_ctrs_fmt = []
        rebin_ctrs_h5 = []
        npts_rebin = 0
        nrows = npts_fast
    else:
        rebin_ctrs_h5 = ["2th", "I_sum", "norm"]
        # "2_theta", then MA0..MA<n-1>, MAav, Mon0..Mon<n-1>, Monav
        rebin_ctrs_spec = ["2_theta"] + [
            f"{name}{i}"
            for name in ("MA", "Mon")
            for i in list(range(nchannels)) + ["av"]
        ]
        rebin_ctrs_fmt = (
            ["%3.8f"] + ["%i"] * nchannels + ["%3.8f"] + ["%i"] * nchannels + ["%3.8f"]
        )
        rebin_ctrs_h5 = [(name, True) for name in rebin_ctrs_h5]
        npts_rebin = bliss_scan.min_npts_ctrs(rebin_data, rebin_ctrs_h5)
        nrows = npts_rebin

    # Prepare data: columns are [rebin | fast | slow]
    ctrs_spec = rebin_ctrs_spec + fast_ctrs_spec + slow_ctrs_spec
    ctrs_fmt = rebin_ctrs_fmt + fast_ctrs_fmt + slow_ctrs_fmt
    rebinoff = 0
    fastoff = len(rebin_ctrs_spec)
    slowoff = len(rebin_ctrs_spec) + len(fast_ctrs_spec)
    data = numpy.zeros((nrows, len(ctrs_spec)))

    # Read rebin data. A multi-dimensional counter expands to one column per
    # entry of its last axis (after transposition) followed by their mean,
    # hence the running column offset.
    off = rebinoff
    for _, idata in bliss_scan.read_counters(rebin_data, rebin_ctrs_h5, npts_rebin):
        if idata.ndim == 3:
            idata = idata.sum(axis=1)
        if idata.ndim == 2:
            idata = idata.T
            nadd = idata.shape[-1]
            data[:, off : off + nadd] = idata
            off += nadd
            data[:, off] = numpy.mean(idata, axis=1)
            off += 1
        else:
            data[:, off] = idata
            off += 1

    # Read fast data + interpolate at rebinned 2-theta
    if npts_rebin:
        xnew = list(bliss_scan.read_counters(rebin_data, [("2th", True)], npts_rebin))[
            0
        ][-1]
        xold = list(bliss_scan.read_counters(fast_data, [("tth", True)], npts_fast))[0][
            -1
        ]
        for i, idata in bliss_scan.read_counters(fast_data, fast_ctrs_h5, npts_fast):
            func = interp1d(xold, idata, kind="nearest", fill_value="extrapolate")
            try:
                data[:, fastoff + i] = func(xnew)
            except Exception as exc:
                # Best effort: keep the scan, but do not hide the failure
                logger.warning(
                    "Scan #%s: cannot interpolate fast counter '%s' "
                    "at the rebinned 2-theta (column left at zero): %s",
                    scannr,
                    ctrs_spec[fastoff + i],
                    exc,
                )
    else:
        for i, idata in bliss_scan.read_counters(fast_data, fast_ctrs_h5, npts_fast):
            data[:, fastoff + i] = idata

    # Read slow data + interpolate at fast epoch
    xold = list(bliss_scan.read_counters(slow_data, [("epoch", True)], npts_slow))[0][
        -1
    ]
    xnew = data[:, ctrs_spec.index("Epoch")]
    for i, idata in bliss_scan.read_counters(slow_data, slow_ctrs_h5, npts_slow):
        func = interp1d(xold, idata, kind="nearest", fill_value="extrapolate")
        try:
            data[:, slowoff + i] = func(xnew)
        except Exception as exc:
            # Best effort: keep the scan, but do not hide the failure
            logger.warning(
                "Scan #%s: cannot interpolate slow counter '%s' "
                "at the fast epoch (column left at zero): %s",
                scannr,
                ctrs_spec[slowoff + i],
                exc,
            )

    # Scan data header
    spec_lines.append(f"#N {len(ctrs_spec)}\n")
    spec_lines.append(f"#L {' '.join(ctrs_spec)}\n")

    # Scan data: the whole table is appended as a single multi-line string
    buf = io.StringIO()
    numpy.savetxt(buf, data, delimiter=" ", fmt=" ".join(ctrs_fmt))
    spec_lines.append(buf.getvalue())
    spec_lines.append("\n")
    return spec_lines