import logging
import os
import shutil
import subprocess
import tempfile
import time
import h5py
import numpy
from ewokscore import Task
from multianalyzer._multianalyzer import MultiAnalyzer
from multianalyzer.file_io import FSCAN
from multianalyzer.file_io import save_rebin as _save_rebin
from multianalyzer.file_io import topas_parser
try:
from multianalyzer.opencl import OclMultiAnalyzer
except ImportError:
OclMultiAnalyzer = None
from silx.io.h5py_utils import File
from silx.io.h5py_utils import retry
from silx.io.h5py_utils import safe_top_level_names
from . import dirutils
from .roitools import unscramble_roi_collection
HDF5_PLUGIN_PATH = os.environ.get("HDF5_PLUGIN_PATH")
if HDF5_PLUGIN_PATH:
del os.environ["HDF5_PLUGIN_PATH"]
logger = logging.getLogger(__name__)
[docs]
class ID22Rebin(
Task,
input_names=["filename", "parsfile"],
optional_input_names=[
"entries",
"debug",
"wavelength",
"energy",
"step",
"range",
"phi",
"iter",
"startp",
"endp",
"pixel",
"width",
"delta2theta",
"device",
"outdirs",
"primary_outdir",
"outprefix",
"unscramble_method",
"retry_timeout",
"columnorder",
"num_col",
"overwrite",
],
output_names=["outfile", "entries"],
):
"""
Rebin data from area detector ROIs corresponding to multiple analyzer crystals
to produce per-crystal 1D ROIs for each entry (Bliss scan) in the HDF5 file.
Each entry in the input HDF5 file contains 1D ROIs—one per analyzer crystal—
for every diffractometer angle. These 1D ROIs are produced by LIMA summing the
corresponding 2D ROI on the area detector along one axis.
The geometry of the detector and analyzer crystals is accounted for using parameters
from the `parsfile`.
"""
[docs]
def run(self):
outdirs = self.get_input_value("outdirs", {})
primary_outdir = self.get_input_value("primary_outdir", None)
outdirs = dirutils.prepare_outdirs(outdirs, primary_outdir)
overwrite = self.get_input_value("overwrite", False)
param = topas_parser(self.inputs.parsfile)
# Ensure all units are consistent. Here lengths are in millimeters.
L = param["L1"]
L2 = param["L2"]
# Angles are all given in degrees
center = numpy.array(param["centre"])
psi = numpy.rad2deg(param["offset"])
rollx = numpy.rad2deg(param["rollx"])
rolly = numpy.rad2deg(param["rolly"])
tha = numpy.rad2deg(param["manom"])
thd = numpy.rad2deg(param["mantth"])
pixel = self.get_input_value("pixel", 75e-3) # mm
width = self.get_input_value("width", None)
delta2theta = self.get_input_value("delta2theta", 0.0)
sdelta2theta = str(delta2theta).replace(".", "")
outfilename = (
os.path.splitext(os.path.basename(self.inputs.filename))[0]
+ f"_w{sdelta2theta}.h5"
)
if not self.missing_inputs.outprefix:
outfilename = self.inputs.outprefix + "_" + outfilename
outfile = dirutils.primary_file(outfilename, outdirs)
# Initialize the rebinning engine.
if self.get_input_value("device", None) is not None and OclMultiAnalyzer:
mma = OclMultiAnalyzer(
L,
L2,
pixel,
center,
tha,
thd,
psi,
rollx,
rolly,
device=str(self.get_input_value("device")).split(","),
)
print(f"Using device {mma.ctx.devices[0]}")
else:
mma = MultiAnalyzer(L, L2, pixel, center, tha, thd, psi, rollx, rolly)
print("Using Cython+OpenMP")
filename = self.inputs.filename
entries = self.get_input_value("entries", None)
unscramble_method = self.get_input_value("unscramble_method", "xy")
print(f"Unscrambling of ROI collection: {unscramble_method}")
retry_timeout = self.get_input_value("retry_timeout", 10)
exclude_entries = None
if outfile and os.path.exists(outfile):
exclude_entries = safe_top_level_names(outfile, retry_timeout=retry_timeout)
if exclude_entries:
if overwrite:
logger.warning(
"Overwriting existing entries %s in file %s",
exclude_entries,
outfile,
)
exclude_entries = None
else:
logger.warning(
"Skipping existing entries %s in file %s",
exclude_entries,
outfile,
)
hdf5_data = read_data(
filename,
unscramble_method=unscramble_method,
entries=entries,
exclude_entries=exclude_entries,
retry_timeout=retry_timeout,
)
for entry, one_entry in hdf5_data.items():
print(f"Rebin data from {filename}::{entry}")
start_time = time.perf_counter()
arm = one_entry["arm"]
mon = one_entry["mon"]
roicol = one_entry["roicol"]
kept_points = min(len(roicol), len(arm), len(mon))
roicol = roicol[:kept_points]
arm = arm[:kept_points]
mon = mon[:kept_points]
scan = one_entry["scan"]
dtth = self.get_input_value("step", None) or scan.step_size
tth_min = scan.start + psi.min()
tth_max = scan.stop + psi.max()
if self.get_input_value("range", None):
_range = self.get_input_value("range")
if numpy.isfinite(_range[0]):
tth_min = _range[0]
if numpy.isfinite(_range[1]):
tth_max = _range[1]
num_col = self.get_input_value("num_col", None)
if num_col is None:
num_col = 1
columnorder = self.get_input_value("columnorder", None)
if columnorder is None:
columnorder = 0
res = mma.integrate(
roicol,
arm,
mon,
tth_min,
tth_max,
dtth=dtth,
iter_max=self.get_input_value("iter", 250),
roi_min=self.get_input_value("startp", 0),
roi_max=self.get_input_value("endp", 1024),
phi_max=self.get_input_value("phi", 75), # degrees
num_row=512,
num_col=num_col,
columnorder=columnorder, # // 0: (column=31, channel=13, row=512), 1: (channel=13, column=31, row=512), 2: (channel=13, row=512, column=31)
width=width or param.get("wg", 0.0),
dtthw=delta2theta,
)
if outfile:
print(f"Save to {outfile}::{entry}")
save_rebin(
outfile,
entry,
overwrite=overwrite,
beamline="id22",
name="id22rebin",
topas=param,
res=res,
start_time=start_time,
)
self.outputs.outfile = outfile
self.outputs.entries = entries
[docs]
@retry(retry_period=0.5, retry_timeout=10)
def read_data(filename, unscramble_method=None, entries=None, exclude_entries=None):
"""Read data to rebin."""
res = {}
with File(filename, mode="r") as fh:
if entries:
entries = set(entries)
else:
entries = {k for k, v in fh.items() if v.attrs.get("NX_class") == "NXentry"}
if exclude_entries:
entries -= set(exclude_entries)
try:
entries = sorted(entries, key=lambda x: float(x))
except TypeError:
entries = sorted(entries)
entries = [entry for entry in entries if entry.endswith(".1")]
print(f"Reading and unscrambling {len(entries)} entries: {entries}")
for entry in entries:
title = entry + "/title"
if title not in fh:
logger.warning(f"skip scan {entry}: no 'title'")
continue
end_time = entry + "/end_time"
if end_time not in fh:
logger.warning(f"skip scan {entry}: no 'end_time'")
continue
title = fh[title][()]
try:
title = title.decode()
except Exception:
pass
if not title.startswith("fscan") and not title.startswith("f2scan"):
logger.warning(f"skip scan {entry}: {title}")
continue
sort_data = {}
non_sort_data = {}
try:
entry_grp = fh[entry]
sort_data["x"] = entry_grp[
"instrument/eiger_roi_collection/selection/x"
][()]
sort_data["y"] = entry_grp[
"instrument/eiger_roi_collection/selection/y"
][()]
sort_data["roicol"] = entry_grp["measurement/eiger_roi_collection"][()]
scan_col = entry_grp["instrument/fscan_parameters"]
motor_name = scan_col["motor"][()]
try:
motor_name = motor_name.decode()
except AttributeError:
pass
non_sort_data["scan"] = FSCAN(
motor_name,
float(scan_col["start_pos"][()]),
float(scan_col["step_size"][()]),
int(scan_col["npoints"][()]),
float(scan_col["step_time"][()]),
)
non_sort_data["arm"] = entry_grp["measurement/tth"][()]
non_sort_data["mon"] = entry_grp["measurement/mon"][()]
non_sort_data["tha"] = entry_grp["instrument/positioners/manom"][()]
non_sort_data["thd"] = entry_grp["instrument/positioners/mantth"][()]
except KeyError as error:
logger.warning(str(error))
continue
if unscramble_method == "xy":
keys = "x", "y"
elif unscramble_method == "yx":
keys = "y", "x"
else:
keys = None
if keys:
sort_data = unscramble_roi_collection(sort_data, keys)
res[entry] = {**sort_data, **non_sort_data}
for name, val in res[entry].items():
if name == "scan":
print(f"Scan description: {val}")
else:
try:
print(f"{name} shape = {val.shape}")
except (TypeError, AttributeError):
pass
return res
[docs]
def save_rebin(filename: str, entry: str, overwrite: bool = False, **kwargs):
"""Save rebin result."""
if overwrite:
try:
_delete_entry(filename, entry)
except Exception:
logger.exception(
"Failed deleting /%s from HDF5 file %r failed", entry, filename
)
_save_rebin(filename, entry=entry, **kwargs)
def _delete_entry(filename: str, entry: str) -> None:
if not os.path.exists(filename):
return
remove_file = False
with h5py.File(filename, mode="a") as fh:
entries = list(fh)
if entries == [entry]:
remove_file = True
elif entry in entries:
del fh[entry]
if remove_file:
os.remove(filename)
else:
_repack_hdf5(filename)
def _repack_hdf5(filename: str) -> None:
tmp_fd, tmp_name = tempfile.mkstemp(suffix=".h5", dir=os.path.dirname(filename))
os.close(tmp_fd)
try:
subprocess.run(
["h5repack", filename, tmp_name],
check=True,
)
shutil.move(tmp_name, filename)
except Exception:
logger.exception("Repacking HDF5 file %r failed", filename)
finally:
if os.path.exists(tmp_name):
os.remove(tmp_name)