"""Convert an HDF5 file to a SPEC .dat file
"""
import os
import io
import datetime
import logging
import numpy
from scipy.interpolate import interp1d
from silx.io.h5py_utils import retry, File
from . import dirutils
from . import specutils
logger = logging.getLogger(__name__)
[docs]
def convert_h5(
    raw_filename,
    outprefix=None,
    entries=None,
    outdirs=None,
    primary_outdir=None,
    retry_timeout=10,
    rebin_filename=None,
    ascii_extension=".dat",
):
    """Append the complete fscans of a Nexus file to a SPEC file.

    The SPEC file is named after the rebin file when one is given and after
    the raw file otherwise. A scan is converted only when it has exactly two
    subscans and its number is not yet in the SPEC file. The first error
    raised while reading a scan is re-raised once all scans were processed.

    :param str raw_filename: full path of the Nexus file
    :param str outprefix: something unique to the proposal/session
    :param list entries: for example ["1.1", "1.2", ...]; all scans of the
        file are considered when empty
    :param dict outdirs:
    :param str primary_outdir:
    :param retry_timeout: forwarded to the HDF5 reading helpers
    :param str rebin_filename: full path of the id22rebin file
    :param str ascii_extension: extension of the SPEC file
    :returns str, List[str]: primary spec output file and new entries added
        (only the bare file name when nothing could be saved)
    """
    raw_filename = os.path.abspath(raw_filename)

    # Derive the SPEC file name
    name_source = rebin_filename if rebin_filename else raw_filename
    stem = os.path.splitext(os.path.basename(name_source))[0]
    spec_filename = stem + ascii_extension
    if outprefix:
        spec_filename = outprefix + "_" + spec_filename

    converted_entries = []
    outdirs = dirutils.prepare_outdirs(outdirs, primary_outdir)
    if "primary" not in outdirs:
        logger.warning("No primary output directory: not saving anything")
        return spec_filename, converted_entries

    # Group the subscan names per scan number
    if entries:
        names = {}
        for entry in entries:
            names.setdefault(int(float(entry)), []).append(entry)
    else:
        names = get_scan_names(raw_filename, retry_timeout=retry_timeout)
    if not names:
        logger.warning("No scans to convert")
        return spec_filename, converted_entries

    # Save SPEC header when no scans have been saved yet
    has_new_data = False
    saved = saved_scan_numbers(spec_filename, outdirs)
    ordered_scans = sorted(names.items())
    if not saved:
        first_scan = ordered_scans[0][1][0]
        start_time = get_start_time(
            raw_filename, first_scan, retry_timeout=retry_timeout
        )
        header = create_spec_header(raw_filename, start_time=start_time)
        if header:
            has_new_data = True
        add_to_specfile(spec_filename, header, outdirs)

    # Save scans
    first_error = None
    for scannr, subscans in ordered_scans:
        # Skip fscans already saved and incomplete fscans
        if scannr in saved or len(subscans) != 2:
            continue
        subscan1, subscan2 = subscans
        try:
            scandata = read_fscan_data(
                raw_filename,
                subscan1,
                subscan2,
                rebin_filename=rebin_filename,
                retry_timeout=retry_timeout,
            )
        except Exception as exc:
            # Keep going with the other scans, report the first failure later
            if first_error is None:
                first_error = exc
        else:
            if scandata:
                has_new_data = True
            add_to_specfile(spec_filename, scandata, outdirs)
            converted_entries.extend(subscans)

    if has_new_data:
        dirutils.copy_file(spec_filename, outdirs)
    if first_error is not None:
        raise first_error
    return dirutils.primary_file(spec_filename, outdirs), converted_entries
[docs]
@retry(retry_period=0.5, retry_timeout=10)
def get_scan_names(filename, title=None):
    """Collect the subscan names of the finished scans in the Nexus file.

    A root entry is kept when it can be opened and contains both "end_time"
    and "measurement".

    :param str filename:
    :param title: when truthy, only keep scans whose title contains "fscan"
        or "f2scan" (the value itself is not compared with the title)
    :returns dict: scannr(int)->subscan_names(list)
    """
    scans = {}
    with File(filename, mode="r") as h5file:
        for name in list(h5file["/"]):
            try:
                scan = h5file[name]
            except Exception as e:
                logger.warning(
                    "cannot read scan " + repr(name) + " (cause: " + str(e) + ")"
                )
                continue
            if "end_time" not in scan or "measurement" not in scan:
                continue
            if title:
                stitle = str_from_dataset(scan["title"])
                if "fscan" not in stitle and "f2scan" not in stitle:
                    continue
            scans.setdefault(int(float(name)), []).append(name)
    return scans
[docs]
def saved_scan_numbers(filename, outdirs):
    """Scan numbers already present in the primary SPEC file.

    :param str filename: SPEC file name, resolved with `dirutils.primary_file`
    :param dict outdirs:
    :returns: result of `specutils.saved_scan_numbers` for the primary file
    """
    return specutils.saved_scan_numbers(dirutils.primary_file(filename, outdirs))
[docs]
def add_to_specfile(spec_filename, specdata, outdirs):
    """Write chunks of SPEC data to the primary SPEC file.

    Nothing is done when `outdirs` is empty. The parent directory is created
    when missing.

    :param str spec_filename:
    :param list(2-tuple) specdata: (open mode, lines) pairs, the file is
        opened once per pair with that mode
    :param dict outdirs:
    """
    if not outdirs:
        return
    target = dirutils.primary_file(spec_filename, outdirs)
    parent = os.path.dirname(target)
    if parent:
        os.makedirs(parent, exist_ok=True)
    for open_mode, chunk in specdata:
        with open(target, open_mode) as fh:
            fh.writelines(chunk)
# Motor names whose start positions are written in the SPEC scan header by
# `_read_fscan_data`: row `i` of this table becomes the "#P<i>" line, with the
# positions in the order given here.
MOTOR_NAMES = [
    ["tth", "om", "manom", "mantth", "mantr", "Dh", "Dhd"],
    ["Dhm", "Dhu", "spinp", "bluspin", "t1h", "t1h1", "t1h2", "t1x"],
    ["t1y", "t1rz", "t1trans", "robtran", "s3vg", "s3vo", "s3hg", "s3ho"],
    ["s4vg", "s4vo", "s4hg", "s4ho", "u26b", "chi"],
    ["d2dtran", "Dy", "Dyu", "Dyd", "Drx", "Dry"],
    ["mos", "rst", "rsg", "redtrans"],
    ["xtrans", "ytrans", "ztrans", "gasspin", "DET_Z", "DET_X", "DET_Y", "DET_RZ"],
]
[docs]
@retry(retry_period=0.5, retry_timeout=10)
def get_start_time(filename, scan):
    """Read the "start_time" of a scan as a string.

    :param str filename: HDF5 file to open in read mode
    :param str scan: name of the scan group
    :returns str:
    """
    with File(filename, mode="r") as h5file:
        start_time = h5file[scan]["start_time"]
        return str_from_dataset(start_time)
[docs]
@retry(retry_period=0.5, retry_timeout=10)
def read_fscan_data(raw_filename, subscan1, subscan2, rebin_filename=None):
    """Read the SPEC data of one fscan from the raw (and rebin) file.

    :param str raw_filename:
    :param str subscan1:
    :param str subscan2:
    :param str rebin_filename: when given, the scan `subscan1` of this file
        provides the rebinned data
    :returns list(2-tuple): empty when the rebin file has no `subscan1`
    """
    with File(raw_filename, mode="r") as h5file:
        gsubscan1 = h5file[subscan1]
        gsubscan2 = h5file[subscan2]
        if not rebin_filename:
            return _read_fscan_data(gsubscan1, gsubscan2)
        with File(rebin_filename, mode="r") as h5filerebin:
            try:
                rebinscan = h5filerebin[subscan1]
            except KeyError:
                # Scan not rebinned: nothing to save
                return []
            return _read_fscan_data(gsubscan1, gsubscan2, rebinscan=rebinscan)
def _read_fscan_data(subscan1, subscan2, rebinscan=None):
    """Build the SPEC representation (header + data table) of one fscan.

    The data table has one column per counter, in the order rebin counters,
    fast counters, slow counters. With rebinned data the rows are the
    rebinned points and the fast counters are interpolated (nearest) at the
    rebinned 2-theta; otherwise the rows are the fast points. Slow counters
    are always interpolated (nearest) at the "Epoch" column. The table is
    zero-initialised, so a counter that is missing or cannot be interpolated
    is saved as zeros.

    :param h5py.Group subscan1: provides the fast counters ("measurement"),
        the scan parameters and the instrument information
    :param h5py.Group subscan2: provides the slow counters ("measurement")
    :param h5py.Group rebinscan: provides "id22rebin/data" when given
    :returns list(2-tuple): (file open mode, lines) pairs
    """
    specdata = []
    fast_data = subscan1["measurement"]
    slow_data = subscan2["measurement"]
    if rebinscan is None:
        rebin_data = None
    else:
        rebin_data = rebinscan["id22rebin/data"]
    fscan_params = subscan1["instrument/fscan_parameters"]
    positioners_start = subscan1["instrument/positioners_start"]
    # Machine and robot information is optional: an empty dict makes
    # `read_position` return "-999" and `str_from_dataset` return "UNKNOWN"
    try:
        machine = subscan1["instrument/machine"]
    except KeyError:
        machine = {}
    try:
        robot = subscan1["instrument/robot"]
    except KeyError:
        robot = {}

    # Scan parameters
    scannr = subscan1.name[1:].split(".")[0]
    start_time = str_from_dataset(subscan1["start_time"])
    start_pos = float(fscan_params["start_pos"][()])
    step = float(fscan_params["step_size"][()])
    no_scan_points = float(fscan_params["npoints"][()])
    acq_time = float(fscan_params["acq_time"][()])
    end_pos = "{:.2f}".format(start_pos + step * no_scan_points)
    deg_per_min = "{:.2f}".format(step / acq_time * 60)

    # Scan header
    lines = []
    specdata.append(("a", lines))
    lines.append(
        "#S "
        + scannr
        + " hookscan "
        + str_from_dataset(fscan_params["motor"])
        + " "
        + read_position(fscan_params, "start_pos", "{:.2f}")
        + " "
        + end_pos
        + " "
        + deg_per_min
        + " "
        + read_position(fscan_params, "acq_time", "{:.5f}", modif=lambda x: x * 1000)
        + " "
        + "\n"
    )
    lines.append("#D " + start_time + "\n")
    lines.append(
        "#T " + read_position(fscan_params, "acq_time", "{:.5f}") + " (Seconds)\n"
    )
    lines.append("#Q \n")
    for i, names in enumerate(MOTOR_NAMES):
        positions = " ".join(
            [read_position(positioners_start, name, "{:.4f}") for name in names]
        )
        lines.append("#P{} ".format(i) + positions + "\n")
    lines.append("#UMI0 Current AutoM Shutter U26B_GAP \n")
    lines.append(
        "#UMI1"
        + " "
        + read_position(machine, "current", "{:.4f}")
        + " "
        + str_from_dataset(machine.get("automatic_mode"))
        + " "
        + str_from_dataset(machine.get("front_end"))
        + " "
        + read_position(positioners_start, "u26b", "{:.4f}")
        + "\n"
    )
    lines.append(
        "#UMI2"
        + " Refill in "
        + str_from_dataset(machine.get("refill_countdown"))
        + " sec,"
        + " Fill Mode: "
        + str_from_dataset(machine.get("mode"))
        + ","
        + " Op. Message: "
        + str_from_dataset(machine.get("message"))
        + "\n"
    )
    lines.append(
        "#CR"
        + " Last robot sample loaded: "
        + str_from_dataset(robot.get("sample_label"))
        + "\n"
    )

    # Slow counters: SPEC names, formats and HDF5 names are parallel lists
    # (none of the slow counters is required to exist)
    slow_ctrs_spec = [
        "blowerT",
        "Cryostream",
        "Cryostat",
        "Press_in",
        "Press_out",
        "monin",
        "bmon",
    ]
    slow_ctrs_fmt = ["%4.3f", "%4.3f", "%4.3f", "%7.4f", "%7.4f", "%.5e", "%.5e"]
    slow_ctrs_h5 = [
        ("blower_in", False),
        ("ox700", False),
        ("ls340_A", False),
        ("pace_in", False),
        ("pace_press", False),
        ("monin", False),
        ("bmon", False),
    ]
    nslow_ctrs = len(slow_ctrs_spec)
    npts_slow = min_npts_ctrs(slow_data, slow_ctrs_h5)

    # Fast counters
    if "eiger" in fast_data:
        prefix = "eiger_roi"
        nchannels = 13
    else:
        prefix = "ma"
        nchannels = 9
    if rebin_data is None:
        fast_ctrs_spec = (
            ["2_theta"]
            + ["MA{}".format(i) for i in range(nchannels)]
            + ["Monitor", "Epoch", "Omega"]
        )
        fast_ctrs_fmt = ["%3.8f"] + ["%i"] * nchannels + ["%i", "%15.8f", "%3.8f"]
        fast_ctrs_h5 = (
            ["tth"]
            + [prefix + str(i) for i in range(nchannels)]
            + ["mon", "epoch_trig", "om"]
        )
    else:
        # 2-theta and the channels come from the rebinned data
        fast_ctrs_spec = ["Epoch", "Omega"]
        fast_ctrs_fmt = ["%3.8f"] * len(fast_ctrs_spec)
        fast_ctrs_h5 = ["epoch_trig", "om"]
    not_required = ["om"]
    fast_ctrs_h5 = [(name, name not in not_required) for name in fast_ctrs_h5]
    nfast_ctrs = len(fast_ctrs_spec)
    npts_fast = min_npts_ctrs(fast_data, fast_ctrs_h5)

    # Rebin counters
    if rebin_data is None:
        rebin_ctrs_h5 = []
        rebin_ctrs_spec = []
        rebin_ctrs_fmt = []
    else:
        rebin_ctrs_h5 = ["2th", "I_sum", "norm"]
        # 2_theta, MA0..MA<n-1>, MAav, Mon0..Mon<n-1>, Monav
        rebin_ctrs_spec = ["2_theta"] + [
            f"{name}{i}"
            for name in ["MA", "Mon"]
            for i in list(range(nchannels)) + ["av"]
        ]
        rebin_ctrs_fmt = (
            ["%3.8f"] + ["%i"] * nchannels + ["%3.8f"] + ["%i"] * nchannels + ["%3.8f"]
        )
    rebin_ctrs_h5 = [(name, True) for name in rebin_ctrs_h5]
    nrebin_ctrs = len(rebin_ctrs_spec)
    npts_rebin = min_npts_ctrs(rebin_data, rebin_ctrs_h5)

    # Prepare data
    if npts_rebin:
        nrows = npts_rebin
    else:
        nrows = npts_fast
    ncols = nrebin_ctrs + nfast_ctrs + nslow_ctrs
    data = numpy.zeros((nrows, ncols))
    ctrs_spec = rebin_ctrs_spec + fast_ctrs_spec + slow_ctrs_spec
    ctrs_fmt = rebin_ctrs_fmt + fast_ctrs_fmt + slow_ctrs_fmt
    rebinoff = 0
    fastoff = nrebin_ctrs
    slowoff = nrebin_ctrs + nfast_ctrs

    # Read rebin data
    off = rebinoff
    for _, idata in read_ctrs(rebin_data, rebin_ctrs_h5, npts_rebin):
        if idata.ndim == 2:
            # One column per channel followed by the channel average
            idata = idata.T
            nadd = idata.shape[-1]
            data[:, off : off + nadd] = idata
            off += nadd
            data[:, off] = numpy.mean(idata, axis=1)
            off += 1
        else:
            data[:, off] = idata
            off += 1

    # Read fast data + interpolate at rebinned 2-theta
    if npts_rebin:
        xnew = list(read_ctrs(rebin_data, [("2th", True)], npts_rebin))[0][-1]
        xold = list(read_ctrs(fast_data, [("tth", True)], npts_fast))[0][-1]
        for i, idata in read_ctrs(fast_data, fast_ctrs_h5, npts_fast):
            func = interp1d(xold, idata, kind="nearest", fill_value="extrapolate")
            try:
                data[:, fastoff + i] = func(xnew)
            except Exception as e:
                # Best effort: the column keeps its zero fill
                logger.warning(
                    "cannot interpolate fast counter "
                    + repr(fast_ctrs_h5[i][0])
                    + " (cause: "
                    + str(e)
                    + ")"
                )
    else:
        for i, idata in read_ctrs(fast_data, fast_ctrs_h5, npts_fast):
            data[:, fastoff + i] = idata

    # Read slow data + interpolate at fast epoch
    xold = list(read_ctrs(slow_data, [("epoch", True)], npts_slow))[0][-1]
    xnew = data[:, ctrs_spec.index("Epoch")]
    for i, idata in read_ctrs(slow_data, slow_ctrs_h5, npts_slow):
        func = interp1d(xold, idata, kind="nearest", fill_value="extrapolate")
        try:
            data[:, slowoff + i] = func(xnew)
        except Exception as e:
            # Best effort: the column keeps its zero fill
            logger.warning(
                "cannot interpolate slow counter "
                + repr(slow_ctrs_h5[i][0])
                + " (cause: "
                + str(e)
                + ")"
            )

    # Scan data header
    lines = []
    specdata.append(("a", lines))
    lines.append("#N {}\n".format(ncols))
    lines.append("#L " + " ".join(ctrs_spec) + "\n")

    # Scan data: formatted in memory and appended in binary mode
    lines = []
    specdata.append(("ab", lines))
    f = io.BytesIO()
    numpy.savetxt(f, data, delimiter=" ", fmt=" ".join(ctrs_fmt))
    lines.append(f.getbuffer())
    lines.append(b"\n")
    return specdata
[docs]
def read_ctrs(group, ctrs, npts):
    """Read counter datasets, truncated to `npts` points.

    The points are taken along the last axis, like `min_npts_ctrs` counts
    them, so that multi-dimensional counters are truncated as well. NaN
    values are replaced by 0. Counters that cannot be read or have no data
    are skipped with a warning; missing optional counters are skipped
    silently.

    :param h5py.Group group:
    :param list(2-tuple) ctrs: (dataset name, must exist) pairs
    :param int npts: maximal number of points to read
    :yield (int, numpy.ndarray): index of the counter in `ctrs` and its data
    :raises KeyError: a counter which must exist is missing
    """
    for i, (name, must_exist) in enumerate(ctrs):
        try:
            dset = group[name]
        except KeyError:
            if must_exist:
                raise
            continue
        try:
            data = dset[..., :npts]
        except Exception as e:
            logger.warning(
                "skip counter data " + repr(name) + " (cause: " + str(e) + ")"
            )
            continue
        if not numpy.size(data):
            logger.warning("no data in " + repr(name))
            continue
        data[numpy.isnan(data)] = 0
        yield i, data
[docs]
def min_npts_ctrs(group, ctrs):
    """Smallest number of points of a group of datasets.

    The number of points of a dataset is the length of its last dimension.

    :param h5py.Group group:
    :param list(2-tuple) ctrs: (dataset name, must exist) pairs
    :returns int: 0 when `ctrs` is empty or none of the datasets exist
    :raises KeyError: a dataset which must exist is missing
    """
    npts = []
    for name, must_exist in ctrs:
        try:
            dset = group[name]
        except KeyError:
            if must_exist:
                raise
        else:
            npts.append(dset.shape[-1])
    # All counters can be optional and missing: no points instead of a
    # ValueError from min() on an empty list
    return min(npts, default=0)
[docs]
def str_from_dataset(dataset):
    """Convert a dataset to a string.

    Strings are returned untouched and `None` gives "UNKNOWN". Anything else
    is read with `asstr()` when supported, else its value is passed to `str`.

    :param h5py.Dataset dataset:
    :returns str:
    """
    if dataset is None:
        return "UNKNOWN"
    if isinstance(dataset, str):
        return dataset
    try:
        text = dataset.asstr()[()]
    except (AttributeError, TypeError):
        text = str(dataset[()])
    return text
[docs]
def read_position(grp, key, fmt, modif=None):
    """Read a motor position from grp[key], return "-999" when missing.

    "-999" is also returned when the position is the string "*DIS*".

    :param h5py.Group grp:
    :param str key:
    :param str fmt: `str.format` template applied to the position before it
        is converted back to a float (this rounds the value)
    :param callable or None modif: applied to the rounded position
    :returns str:
    :raises RuntimeError: the position cannot be formatted as a number
    """
    if key not in grp:
        return str(-999)
    pos = grp[key][()]
    # String datasets can be read as bytes (h5py >= 3): decode so that the
    # "*DIS*" marker is recognised instead of failing the number formatting
    if isinstance(pos, bytes):
        pos = pos.decode(errors="replace")
    if pos == "*DIS*":
        return str(-999)
    try:
        num = float(fmt.format(pos))
    except Exception as e:
        raise RuntimeError("Error in formatting motor position " + repr(key)) from e
    if modif:
        num = modif(num)
    return str(num)