Source code for ewoksid22.id22sum

"""Script for batch runs with id22sum and id22sumall:
processes powder diffraction data from a multianalyser instrument

Fortran source code: https://github.com/jonwright/id31sum

Written by Ola G. Grendal @ ID22 ESRF
"""

import os
import sys
import subprocess
import glob

import numpy
import h5py
import hdf5plugin  # noqa F401

try:
    import matplotlib.pyplot as plt
except ImportError:
    plt = None


from . import specutils


[docs] def usage_msg(name=None): """Function for re-defining the usage-message when no and/or wrong input is given""" return """id22sumepy [-h, for more help than given here] -i, start of, or full filename. Ex: ch5900_S1_ or ch5900_S1_RT_0001.dat -b, binsize. Ex: 0.002 [-int, True for interactive mode, default is False] """
[docs] def epilog_msg(name=None): """Function for re-defining the usage-message when no and/or wrong input is given""" return """ PRO TIP #1): Put filename in quotes with * or ? for linux wildcard expansion. Ex: -i \"hc4000_sample*_\" """
[docs] def generate_sum_cmd( program, programdir, filename, binsize, first_scan, last_scan, lowtth, scaling, excludedetector, excludescan, zingit, advanced=None, ): """Fortran sum program command""" if first_scan is None: return "" if programdir: program = os.path.join(programdir, program) cmd = [ program, filename, str(binsize), str(first_scan), str(last_scan), f"lowtth={lowtth}", ] if scaling is None: cmd.append("scalmon") elif scaling.startswith("scal"): cmd.append("{}".format(scaling)) if excludescan: cmd.append("es={}".format(excludescan)) if excludedetector: cmd.append("ed={}".format(excludedetector)) if zingit: cmd.append("ef={}.zin".format(os.path.basename(filename))) if advanced is not None: for i in advanced: cmd.append(i) return cmd
def _iter_numbers(list_of_numbers): for numbers in list_of_numbers: if numbers is None: continue if isinstance(numbers, int): yield numbers continue if isinstance(numbers, str): numbers = numbers.split(",") for number in numbers: if isinstance(number, str): number = int(number) yield number
[docs] def parse_scan_numbers( first_scan, last_scan, include_scans=tuple(), exclude_scans=tuple() ): """Exclude scan numbers: exclude from start, missing from spec file, interactive prompt""" if include_scans: include_numbers = sorted(set(_iter_numbers(include_scans))) if include_numbers: first_scan = max(min(include_numbers), first_scan) last_scan = min(max(include_numbers), last_scan) exclude_numbers = sorted( {i for i in _iter_numbers(exclude_scans) if i >= first_scan and i <= last_scan} ) if exclude_numbers: if len(exclude_numbers) == (last_scan - first_scan + 1): first_scan, last_scan, exclude_numbers = None, None, None else: exclude_numbers = ",".join(list(map(str, exclude_numbers))) else: exclude_numbers = None return first_scan, last_scan, exclude_numbers
[docs] def yes_no(question): """Yes/no prompt. Returns True or False.""" answers = ["n", "no", "y", "yes"] while True: try: answer = input(question) except ValueError: print("Sorry, I did not understand that!") continue answer = answer.lower() if answer not in answers: print("Your response must be [y]es or [n]o") continue else: break if answer.startswith("y"): return True else: return False
[docs] def new_bin(question): """Retrieve new binsize""" while True: try: answer = float(input(question)) break except ValueError: print("Please type a valid number. Ex: 0.002") continue return str(answer)
[docs] def removed_scans(question): """Retrieve scans to be removed""" while True: answer = input(question) try: scannrs = [int(i) for i in answer.split(",")] break except ValueError: print("Please type a valid number or list of numbers. Ex: 2 or 1,2,5") return ",".join(map(str, scannrs))
[docs] def plot_xy( filename, first_scan, last_scan, es_scans, sum_single, sum_all, binsize=None, show=True, ): """Reads XY data and plots it""" if es_scans is None: exclude_scans = [] else: exclude_scans = [int(x) for x in es_scans.split(",")] all_scans = list(range(first_scan, last_scan + 1)) plot_scans = [x for x in all_scans if x not in exclude_scans] if sum_all: for i in plot_scans: xyfilename = "{}_{}.xye".format(os.path.splitext(filename)[0], i) data = numpy.loadtxt(xyfilename, usecols=(0, 1)).T plt.plot(data[0], data[1], label="Scan: {}".format(i)) if sum_single: if binsize: xyfilename = "{}_b{}.xye".format( os.path.splitext(filename)[0], str(binsize).replace(".", "") ) else: xyfilename = "{}.xye".format(os.path.splitext(filename)[0]) data = numpy.loadtxt(xyfilename, usecols=(0, 1)).T plt.plot(data[0], data[1], label="Final pattern") plt.plot( [], [], " ", label="To continue press: Ctrl + w" ) # Simple hack for adding explanatory text to legend plt.legend() plt.xlim(data[0].min(), data[0].max()) plt.title(str(filename)) if show: plt.show()
[docs] def find_h5_files(user_str): all_files = [] if any(element in user_str for element in ["*", "?", "["]): all_files = glob.glob(user_str + ".h5") else: basename = os.path.basename(user_str) dirname = os.path.dirname(user_str) if not dirname: dirname = "." for filename in os.listdir(dirname): if filename.startswith(basename) and filename.endswith(".h5"): all_files.append(os.path.join(dirname, filename)) print("id22sumpy: Found {} files starting with {}".format(len(all_files), user_str)) return all_files
[docs] def get_all_scans_h5(file_path): with h5py.File(file_path, "r") as h: all_scans = sorted(h.keys(), key=lambda x: float(x)) return all_scans
[docs] def read_h5(file_path, entry="1.1"): """Read h5-file and returns dict with data""" print("id22sumpy: Reading data...") res = {} with h5py.File(file_path, "r") as h: res["I"] = numpy.nan_to_num( h["/{}/id22rebin/data/I_MA".format(entry)][()], nan=0 ) res["I_sum"] = h["/{}/id22rebin/data/I_sum".format(entry)][()] res["tth"] = h["/{}/id22rebin/data/2th".format(entry)][()] res["norm"] = h["/{}/id22rebin/data/norm".format(entry)][()] return res
[docs] def save_spec(file_path, data, monav, entry="1.1"): scannr = int(float(entry)) header_1 = "#S {} hookscan \n".format(scannr) header_2 = "#N 28\n" header_3 = "#L 2_theta MA0 MA1 MA2 MA3 MA4 MA5 MA6 MA7 MA8 MA9 MA10 MA11 MA12 Mon0 Mon1 Mon2 Mon3 Mon4 Mon5 Mon6 Mon7 Mon8 Mon9 Mon10 Mon11 Mon12 Monav" header = header_1 + header_2 + header_3 fmt = "%3.8f" + 13 * " %i" + 14 * " %i" if not os.path.exists(file_path): dirname = os.path.dirname(file_path) if dirname: os.makedirs(dirname, exist_ok=True) with open(file_path, "wb+") as dat: numpy.savetxt( dat, numpy.c_[ data["tth"], numpy.transpose(data["I_sum"]), numpy.transpose(data["norm"]), numpy.transpose(monav), ], header=header, comments="", fmt=fmt, ) else: with open(file_path, "ab+") as dat: numpy.savetxt( dat, numpy.c_[ data["tth"], numpy.transpose(data["I_sum"]), numpy.transpose(data["norm"]), numpy.transpose(monav), ], header="\n" + header, comments="", fmt=fmt, ) print("id22sumpy: Saved scan {} to {}".format(scannr, file_path)) return scannr
[docs] def execute_cmd(cmd, wd): cmd = list(map(str, cmd)) print("\nid22sumpy: Execute " + " ".join(cmd)) output = subprocess.check_output(cmd, universal_newlines=True, cwd=wd) if output: print(output)
[docs] def run( filename, binsize, lowtth=0, scaling=None, entries=None, excludescan=None, excludedetector=None, interactive=0, sum_single=1, # extract+sum all scans sum_all=1, # extract individual scans zingit=0, advanced=tuple(), outdir=None, resbasename="temp.res", ascii_extension=".dat", programdir=None, ): if outdir is None: outdir = os.getcwd() resfile = os.path.join(outdir, resbasename) do_sum = sum_single or sum_all if do_sum and not os.path.isfile(resfile): raise RuntimeError("Cannot find {}".format(resfile)) print("\nid22sumpy: processing file " + filename) if entries: requested_scans = sorted(set(map(int, entries))) else: requested_scans = None if filename.endswith(ascii_extension): specfile = filename scan_numbers = specutils.saved_scan_numbers(specfile) else: # Convert HDF5 to ASCII scans_h5 = get_all_scans_h5(filename) if not scans_h5: print("id22sumpy: No scans to process") return specfile = os.path.splitext(filename)[0] + ascii_extension specfile = os.path.join(outdir, os.path.basename(specfile)) processed = specutils.saved_scan_numbers(specfile) scan_numbers = list() for entry in scans_h5: if not entry.endswith(".1"): continue scannr = int(float(entry)) if scannr in processed: continue data = read_h5(filename, entry) monav = numpy.mean(data["norm"], axis=0) save_spec(specfile, data, monav, entry) scan_numbers.append(scannr) if not scan_numbers: print("id22sumpy: no scans to process") return specfile if zingit: program = "zingit" if programdir: program = os.path.join(programdir) cmd = [program, specfile, resbasename] execute_cmd(cmd, outdir) first = True while first or ( interactive and not yes_no("Are you happy with selected binsize and scans[y/n]? ") ): if not first: if yes_no("Do you want to change binsize[y/n]? "): sbinsize = new_bin( f"Please type new binsize for {specfile} [Ex:0.002]: " ) if yes_no("Do you want to exclude specific scans[y/n]? "): es_new = removed_scans( f"Please type scan(s) to be removed from {specfile} [Ex:2 or 1,2,5]: " ) else: es_new = None first = False sbinsize = str(binsize) cmdfilename = os.path.abspath(specfile) if sum_all: # Only process the scans for which the xye file does not exist basename = os.path.splitext( os.path.join(outdir, os.path.basename(specfile)) )[0] xybfile_sum_all_format = basename + "_b{}_{{}}.xye".format( sbinsize.replace(".", "") ) scan_numbers_sum_all = [ number for number in scan_numbers if not os.path.exists(xybfile_sum_all_format.format(number)) ] first_scan = min(scan_numbers_sum_all) last_scan = max(scan_numbers_sum_all) missing_scans = sorted( set(range(first_scan, last_scan + 1)) - set(scan_numbers_sum_all) ) ( first_scan_parsed, last_scan_parsed, excludescan_parsed, ) = parse_scan_numbers( first_scan, last_scan, include_scans=(requested_scans,), exclude_scans=(excludescan, missing_scans, es_new), ) cmd = generate_sum_cmd( "id22sumalle", programdir=programdir, filename=cmdfilename, binsize=binsize, first_scan=first_scan_parsed, last_scan=last_scan_parsed, lowtth=lowtth, scaling=scaling, excludedetector=excludedetector, excludescan=excludescan_parsed, zingit=zingit, advanced=advanced, ) if cmd: execute_cmd(cmd, outdir) else: print("id22sumpy: no scans to process (sum all)") if sum_single: scan_numbers_sum_single = scan_numbers first_scan = min(scan_numbers_sum_single) last_scan = max(scan_numbers_sum_single) missing_scans = sorted( set(range(first_scan, last_scan + 1)) - set(scan_numbers_sum_single) ) ( first_scan_parsed, last_scan_parsed, excludescan_parsed, ) = parse_scan_numbers( first_scan, last_scan, exclude_scans=(excludescan, missing_scans, es_new), ) cmd = generate_sum_cmd( "id22sume", programdir=programdir, filename=cmdfilename, binsize=binsize, first_scan=first_scan_parsed, last_scan=last_scan_parsed, lowtth=lowtth, scaling=scaling, excludedetector=excludedetector, excludescan=excludescan_parsed, zingit=zingit, advanced=advanced, ) if cmd: execute_cmd(cmd, outdir) else: print("id22sumpy: no scans to process (sum single)") if interactive: plot_xy(filename, first_scan, last_scan, excludescan, sum_single, sum_all) return specfile
[docs] def main(argv=None): import argparse from datetime import datetime if argv is None: argv = sys.argv starttime = datetime.now() parser = argparse.ArgumentParser( description="Script for processing of high resolution diffraction data using id22sume @ ID22, ESRF ", usage=usage_msg(), epilog=epilog_msg(), formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument( "-i", "--filename", type=str, required=True, help="Full filename of a .h5-file, or start of filename for processing all files with same starting filename. Ex: hc4000_sample1_", ) parser.add_argument( "-b", "--binsize", type=str, required=True, help="Binsize. Ex: 0.002" ) parser.add_argument( "-lowtth", "--lowtth", type=float, default=0, help="Set low tth. Default is 0." ) parser.add_argument( "-scal", "--scaling", type=str, default=None, help="Pattern scaling options (default is option 1 with default value for XXX): 1) scalmon=XXX: for counts per XXX monitor counts (default = 100000) 2) scalpk: to scale highest peak to correct number of obs counts 3) scaltot: to make total number of counts in scan correct 4) no: for no pattern scaling", ) parser.add_argument( "-es", "--excludescan", type=str, help="Scans to be excluded. Ex: 3 or 3,5,8. Default is none.", ) parser.add_argument( "-ed", "--excludedetector", type=str, help="Detectors to be excluded. Ex: 3 or 3,5,8. Default is none.", ) parser.add_argument( "-int", "--interactive", type=int, default=0, help="Set to 1 if you want plotting and the option to change binsize and/or selected scans for each .dat-file being processed. Default is not interactive mode.", ) parser.add_argument( "-all", "--sum_all", type=int, default=1, help="Set to 0 if you do NOT want to do id22sumall. Default is to do id22sumall.", ) parser.add_argument( "-single", "--sum_single", type=int, default=1, help="Set to 0 if you do NOT want to do id22sum. Default is to do id22sum.", ) parser.add_argument( "-zin", "--zingit", type=int, default=0, help="Set 1 if you need to do zingit (looking for saturated pixels in data). Default is not to do it. !!SHOULD NOT BE NEEDED ANYMORE!!", ) parser.add_argument( "-a", "--advanced", type=str, default=None, nargs="+", help="For using any of the id22sum options not directly supported with this python-script. Ex: hightth=120 zap=1 gsas. Type id22sum+Enter for full list of options.", ) parser.add_argument( "--programdir", type=str, default=None, help="Directory of the fortran programs.", ) args = parser.parse_args(argv[1:]) programdir = args.programdir if not programdir and os.path.exists("/users/opid22/bin"): programdir = "/users/opid22/bin" all_h5_files = find_h5_files(args.filename) if not all_h5_files: print( "Did not find any files starting/matching with {}...".format(args.filename) ) for filename in all_h5_files: run( filename, binsize=args.binsize, lowtth=args.lowtth, scaling=args.scaling, excludescan=args.excludescan, excludedetector=args.excludedetector, interactive=args.interactive, sum_single=args.sum_single, sum_all=args.sum_all, zingit=args.zingit, advanced=args.advanced, programdir=args.programdir, ) print( "Finished! :) Time taken in hh:min:sec : {} ".format(datetime.now() - starttime) )
if __name__ == "__main__": sys.exit(main())