import json
import os

import numpy as np
from scipy.optimize import minimize, curve_fit
from scipy.signal import argrelextrema
from pystog import Pre_Proc

rebin = Pre_Proc.rebin

# ================
# Input parameters
# vvvvvvvvvvvvvvvv

# ----------
# Some flags
# ----------
testing = False  # flag for testing

# ------------------------------------------------------------------
# Directory and config files definition
#
# run_dir           => directory for storing the reduced data files
#                      according to the two theta and azimuthal
#                      angle bins.
# out_dir           => directory for storing the outputs.
# det_map_file      => detector group information for all the two
#                      theta and azimuthal bins.
# ttheta_group_file => config file defining groups for second stage
#                      processing.
# ------------------------------------------------------------------
run_dir = os.path.join(".", "texture_proc")
out_dir = os.path.join(run_dir, "texture_proc_output")
det_map_file = os.path.join(
    "/SNS/NOM/shared/scripts/texture",
    "nom_texture_pointer.json"
)
ttheta_group_file = os.path.join(
    ".", "ttheta_group_params_new_new_new.json"
)

# ---------------------
# Processing parameters
# ---------------------
stem_name = "NOM_Si_640e_"
ttheta_binw = 5.   # two theta bin width, in degrees
azm_binw = 10.     # azimuthal bin width, in degrees
header_num = 2     # number of header lines in the reduced data files
sph_l_max = 1      # max degree l for the first stage
tt_sph_l_max = 2   # max degree l for the second stage
amp_thresh = .9    # Threshold for peak intensity -- within a certain two
                   # theta bin, all patterns with a fitted peak intensity
                   # differing from the median intensity by more than this
                   # fractional threshold will be discarded.

# ^^^^^^^^^^^^^^^^
# Input parameters
# ================

if testing:
    print("\n!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
    print("[Warning] Testing mode is on.")
    print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n")


def gaussian(x, a, b, c):
    """Gaussian function.

    Parameters
    ----------
    x : ndarray
        Independent variable.
    a : float
        Amplitude.
    b : float
        Center.
    c : float
        Width.

    Returns
    -------
    ndarray
        Gaussian function values.
    """
    return a * np.exp(-(x - b)**2 / (2 * c**2))


def bkg_expo(x, a, b, c):
    """Background exponential function.

    Parameters
    ----------
    x : ndarray
        Independent variable.
    a : float
        Offset.
    b : float
        Coefficient.
    c : float
        Exponent.

    Returns
    -------
    ndarray
        Exponential background values.
    """
    return a - b * np.exp(-c * x**2.)


def sph_harm_real(m, l, theta, phi):
    """Real spherical harmonics.

    Refer to the Wikipedia table of spherical harmonics,
    https://en.wikipedia.org/wiki/Table_of_spherical_harmonics

    We could follow the standard definition of the spherical harmonics
    to define the function here, but for the purpose of this script we
    only need the first few terms, so we implement them directly.

    Parameters
    ----------
    m : int
        Order of the spherical harmonics.
    l : int
        Degree of the spherical harmonics.
    theta : float
        Polar angle in radians.
    phi : float
        Azimuthal angle in radians.

    Returns
    -------
    float
        Real spherical harmonics value.
    """
    if l == 0 and m == 0:
        return np.sqrt(1. / (4. * np.pi))
    if l == 1 and m == -1:
        return np.sqrt(3. / (4. * np.pi)) * np.sin(phi) * np.sin(theta)
    if l == 1 and m == 0:
        return np.sqrt(3. / (4. * np.pi)) * np.cos(theta)
    if l == 1 and m == 1:
        return np.sqrt(3. / (4. * np.pi)) * np.cos(phi) * np.sin(theta)
    if l == 2 and m == -2:
        part = np.sin(phi) * np.cos(phi) * (np.sin(theta))**2.
        return np.sqrt(15. / (4. * np.pi)) * part
    if l == 2 and m == -1:
        part = np.sin(phi) * np.sin(theta) * np.cos(theta)
        return np.sqrt(15. / (4. * np.pi)) * part
    if l == 2 and m == 0:
        part = 3. * (np.cos(theta))**2. - 1.
        return np.sqrt(5. / (16. * np.pi)) * part
    if l == 2 and m == 1:
        part = np.cos(phi) * np.sin(theta) * np.cos(theta)
        return np.sqrt(15. / (4. * np.pi)) * part
    if l == 2 and m == 2:
        part = ((np.cos(phi))**2. - (np.sin(phi))**2.) * (np.sin(theta))**2.
        return np.sqrt(15. / (16. * np.pi)) * part
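
# A quick sanity check for `sph_harm_real`, from the analytic normalization
# constants (shown for reference, not executed):
#
#     >>> sph_harm_real(0, 0, 0., 0.)   # Y_00 = sqrt(1 / (4 pi))
#     0.28209479177387814
#     >>> sph_harm_real(0, 1, 0., 0.)   # Y_10 at theta = 0
#     0.4886025119029199
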
def target_function(params, sph_mat, txt_int):
    """Target function for the optimization of the spherical harmonic
    coefficients.

    Parameters
    ----------
    params : ndarray
        Spherical harmonic coefficients.
    sph_mat : ndarray
        Spherical harmonic matrix.
    txt_int : ndarray
        Textured S(Q) intensity.

    Returns
    -------
    float
        Norm of the residuals.
    """
    result = sph_mat @ params
    residuals = result - txt_int
    return np.linalg.norm(residuals)
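
# Note: `target_function` is the 2-norm of the residual of a linear system,
# so the unconstrained minimization below is ordinary linear least squares.
# In that case the same solution could be obtained directly (a minimal
# sketch, equivalent only when no bounds are imposed):
#
#     params, *_ = np.linalg.lstsq(sph_mat, txt_int, rcond=None)
#
# The iterative `minimize` route is kept here since the second-stage fit
# further down also imposes bounds on the coefficients.
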
if not os.path.exists(out_dir):
    os.makedirs(out_dir)

# ================
# Read in Q values
# ================
print("[Info] Read in Q values...")

bank = 1
while True:
    tr_file = os.path.join(run_dir, stem_name + f"bank{bank}.dat")
    if os.path.exists(tr_file):
        data = np.loadtxt(tr_file, skiprows=header_num, dtype=float)
        q_vals = data[:, 0]
        break
    else:
        bank += 1

# ===================
# Read in S(Q) values
# ===================
print("[Info] Read in all S(Q)'s...")

num_groups = int(180. / ttheta_binw) * int(360. / azm_binw)
all_sofq = dict()
for i in range(num_groups):
    group_tmp = i + 1
    tr_file = os.path.join(run_dir, stem_name + "bank%d.dat" % group_tmp)
    if os.path.exists(tr_file):
        data = np.loadtxt(tr_file, skiprows=header_num, dtype=float)
        sofq_tmp = data[:, 1]
        all_sofq[group_tmp] = sofq_tmp

# ===========================================================================
# Read in the detector map, telling the center two theta and azimuthal
# angle, and the ID for each group of detectors.
# ===========================================================================
det_pointer = json.load(open(det_map_file, "r"))

# =============================================================================
# Determine the valid azimuthal bins for each two theta bin, following these
# criteria:
#   1. The two theta bin is in use.
#   2. The peak fitting succeeded.
#   3. The fitted peak intensity is close to the median intensity.
# =============================================================================
with open(ttheta_group_file, "r") as f:
    ttheta_groups = json.load(f)

print(
    "[Info] Pre-processing the data for all "
    "the two theta and azimuthal bins..."
)
ttheta_bins = np.arange(0, 181, ttheta_binw)
for i in range(len(ttheta_bins) - 1):
    ttheta_cv = (ttheta_bins[i] + ttheta_bins[i + 1]) / 2.
    ttheta_cvk = "{:d}".format(int(np.floor(ttheta_cv)))
    if ttheta_cvk in det_pointer:
        ttheta_valid = False
        for tkey, item in ttheta_groups.items():
            if ttheta_cvk in item["QRange"].keys():
                ttheta_valid = True
                ttheta_group = tkey
                break

        # A valid two theta value means it belongs to one of the defined
        # groups in the `ttheta_group_file` file.
        if ttheta_valid:
            pa_lb = ttheta_groups[ttheta_group]["PAParams"]["LeftBound"]
            pa_rb = ttheta_groups[ttheta_group]["PAParams"]["RightBound"]
            mask = (q_vals >= pa_lb) & (q_vals <= pa_rb)
            q_vals_fit = q_vals[mask]
            b0 = (pa_lb + pa_rb) / 2.
            c0 = 1.
            amp_tmp = dict()
            center_tmp = dict()
            for key, item in det_pointer[ttheta_cvk]["AZMDict"].items():
                # Check whether the data exist for each azimuthal bin.
                # If not, we just discard the bin.
                if int(item["GroupID"]) in all_sofq.keys():
                    sofq_fit = all_sofq[int(item["GroupID"])][mask]
                    a0 = np.max(sofq_fit)
                    p_init = [a0, b0, c0]
                    try:
                        popt, _ = curve_fit(
                            gaussian, q_vals_fit, sofq_fit, p0=p_init
                        )
                        amp_tmp[key] = popt[0]
                        center_tmp[key] = popt[1]
                    except RuntimeError:
                        # Handle the peak fitting failure. In testing mode,
                        # we just set the amplitude to 1 and the center to
                        # 0. For real data, we just discard the data.
                        if testing:
                            amp_tmp[key] = 1.
                            center_tmp[key] = 0.
                        else:
                            det_pointer[ttheta_cvk][
                                "AZMDict"
                            ][key]["Use"] = False
                            det_pointer[ttheta_cvk][
                                "AZMDict"
                            ][key]["Offset"] = 0.
                else:
                    det_pointer[ttheta_cvk][
                        "AZMDict"
                    ][key]["Use"] = False
                    det_pointer[ttheta_cvk][
                        "AZMDict"
                    ][key]["Offset"] = 0.

            if len(center_tmp) > 0:
                # center_ref = next(iter(center_tmp.items()))[1]  # 1st item
                center_ref = 5.06

                # Here we check the peak intensity. If the intensity is too
                # far from the median intensity, we discard the data. If
                # not, we calculate the offset for each azimuthal bin,
                # relative to the reference peak center `center_ref`,
                # within the same two theta bin.
                median_amp = np.median(list(amp_tmp.values()))
                used_keys = list()
                for key, item in amp_tmp.items():
                    if abs((item - median_amp) / median_amp) < amp_thresh:
                        used_keys.append(key)
                for key in det_pointer[ttheta_cvk]["AZMDict"].keys():
                    if key in used_keys:
                        det_pointer[ttheta_cvk]["AZMDict"][key]["Use"] = True
                        offset = center_ref - center_tmp[key]
                        det_pointer[ttheta_cvk][
                            "AZMDict"
                        ][key]["Offset"] = offset
                    else:
                        det_pointer[ttheta_cvk][
                            "AZMDict"
                        ][key]["Use"] = False
                        det_pointer[ttheta_cvk][
                            "AZMDict"
                        ][key]["Offset"] = 0.
            else:
                # If none of the detector groups in the two theta bin is
                # valid (either file not existing, or fitting failed),
                # we mask out the whole two theta bin.
                for key in det_pointer[ttheta_cvk]["AZMDict"].keys():
                    det_pointer[ttheta_cvk]["AZMDict"][key]["Use"] = False
                    det_pointer[ttheta_cvk]["AZMDict"][key]["Offset"] = 0.
        else:
            # If the two theta bin is invalid, we just discard the bin. By
            # an 'invalid' two theta bin, we mean that the two theta value
            # is not included in the `ttheta_group_file`.
            for key in det_pointer[ttheta_cvk]["AZMDict"].keys():
                det_pointer[ttheta_cvk]["AZMDict"][key]["Use"] = False
                det_pointer[ttheta_cvk]["AZMDict"][key]["Offset"] = 0.
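
# =============================================================================
# Note on the first-stage correction below: for each single Q point, the
# textured intensity across azimuthal bins is modeled as a real spherical
# harmonics expansion of the detector angles,
#
#   S(Q; theta, phi) ~ sum_{l, m} s_lm(Q) * Y_lm(theta, phi),
#
# where the m = 0 terms are skipped for l > 0 -- within a two theta bin the
# polar angle is fixed, so those theta-only terms are degenerate with the
# isotropic Y_00 column. This leaves (sph_l_max + 1)**2 - sph_l_max unknowns
# per Q point (3 for sph_l_max = 1), and the isotropic signal is recovered
# as s_00(Q) * Y_00.
# =============================================================================
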
# =============================================================================
# Loop over the two theta bins and optimize the spherical harmonic
# coefficients.
# =============================================================================
print("[Info] Working on the isotropic correction for each two theta bin...")

ttheta_bins = np.arange(0, 181, ttheta_binw)
total_items = len(ttheta_bins)
sofq_corrected_first = list()
ttheta_valid = list()
print("[Info] Processing 2Theta array (0, 180)...")
for i in range(len(ttheta_bins) - 1):
    ttheta_cv = (ttheta_bins[i] + ttheta_bins[i + 1]) / 2.
    ttheta_cvk = "{:d}".format(int(np.floor(ttheta_cv)))
    print(f"[Info] Processing 2Theta: {ttheta_cvk}...")
    if ttheta_cvk in det_pointer:
        sofq_corrected = list()
        s_coefficients = list()
        valid_group_found = True
        for j in range(len(q_vals)):
            sofq_phi = list()
            sph_matrix = list()
            for _, item in det_pointer[ttheta_cvk]["AZMDict"].items():
                # According to the `Use` flag created for each azimuthal
                # bin in the previous step, we check whether there exist
                # valid azimuthal bins in a two theta bin.
                # NOTE: We may need to add a further check for whether the
                # number of valid azimuthal bins is greater than the number
                # of parameters to fit.
                if int(item["GroupID"]) in all_sofq.keys() and item["Use"]:
                    azm_v = item["CentrAMZ"]
                    sph_row = list()
                    for l in range(sph_l_max + 1):
                        for m in range(-l, l + 1):
                            if l > 0 and m == 0:
                                continue
                            sph_row.append(
                                sph_harm_real(
                                    m, l,
                                    ttheta_cv * np.pi / 180.,
                                    azm_v * np.pi / 180.
                                )
                            )
                    sph_matrix.append(sph_row)
                    sofq_phi.append(all_sofq[int(item["GroupID"])][j])
                else:
                    pass

            sph_matrix = np.array(sph_matrix)
            sofq_phi = np.array(sofq_phi)
            if len(sph_matrix) == 0 or len(sofq_phi) == 0:
                valid_group_found = False
                break

            num_terms = (sph_l_max + 1)**2 - sph_l_max
            params = np.array([1 for _ in range(num_terms)])
            result = minimize(
                target_function,
                params,
                args=(sph_matrix, sofq_phi),
                method='SLSQP'
            )
            optimal_params = result.x

            # Calculate the corrected S(Q), taking into account only the
            # isotropic term.
            y00 = sph_harm_real(0, 0, 0, 0)
            s00 = optimal_params[0]
            sofq_corrected.append(y00 * s00)
            s_coefficients.append(optimal_params)

        # Output the corrected S(Q) values and the coefficients for each
        # two theta bin.
        # For a certain two theta bin, if no valid azimuthal bin is found
        # for any Q value, `valid_group_found` will be False and thus the
        # corresponding two theta bin will be marked as invalid, i.e., not
        # included in the valid two theta bin list `ttheta_valid`.
        if valid_group_found:
            # The valid two theta bin array and the isotropic corrected
            # S(Q) array are synchronized, i.e., they share the same
            # index, making it convenient for the next step.
            sofq_corrected_first.append(sofq_corrected)
            ttheta_valid.append(ttheta_cv)

            with open(
                os.path.join(
                    out_dir,
                    f"{stem_name}2theta_{ttheta_cvk}.dat"
                ),
                "w"
            ) as f:
                f.write(f"{len(q_vals)}\n")
                f.write("\n")
                for j in range(len(q_vals)):
                    f.write(f"{q_vals[j]:.4f} {sofq_corrected[j]:.4f}\n")

            with open(
                os.path.join(
                    out_dir,
                    f"{stem_name}2theta_{ttheta_cvk}_coeffs.dat"
                ),
                "w"
            ) as f:
                for j in range(len(q_vals)):
                    f.write(f"{q_vals[j]:.4f} ")
                    for k in range(len(s_coefficients[j])):
                        f.write(f"{s_coefficients[j][k]:.4f} ")
                    f.write("\n")

ttheta_valid = np.array(ttheta_valid)
sofq_corrected_first = np.array(sofq_corrected_first)

# =============================================================================
# Pre-process the isotropic corrected S(Q) data corresponding to all the two
# theta bins. Here is what to do:
#   1. For each two theta bin, grab the data in a certain Q range according
#      to the read-in parameters from `ttheta_group_file`.
#   2. For each two theta bin, fill the data in the low Q range with the
#      data in the same low Q range that comes from the first two theta bin
#      in the same group.
# =============================================================================
print(
    "[Info] Pre-processing (step-1) the isotropic corrected data for the "
    "second stage correction..."
)
ttheta_k_valid = [
    "{:d}".format(int(np.floor(ttheta))) for ttheta in ttheta_valid
]
for i, ttheta in enumerate(ttheta_valid):
    ttheta_k = ttheta_k_valid[i]
    for key, item in ttheta_groups.items():
        if ttheta_k in item["QRange"].keys():
            group = key
            break
    qrange_tmp = ttheta_groups[group]["QRange"]
    left_b = qrange_tmp[ttheta_k]["LeftBound"]
    leftmost_b = next(iter(qrange_tmp.items()))[1]["LeftBound"]
    right_b_fill = left_b

    # Fill the low Q range with the data from the first two theta bin
    # in the same group.
    ref_key_ig = next(iter(qrange_tmp))
    ref_key_id = ttheta_k_valid.index(ref_key_ig)
    mask_fill = (q_vals >= leftmost_b) & (q_vals < right_b_fill)
    sofq_corrected_first[i][mask_fill] = sofq_corrected_first[
        ref_key_id
    ][mask_fill]
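
# A worked example of the low-Q fill above (hypothetical numbers): if the
# group's first two theta bin has LeftBound = 0.5 and the current bin has
# LeftBound = 1.2, the current bin's S(Q) on 0.5 <= Q < 1.2 is replaced by
# the first bin's values, so that every bin in a group covers the same Q
# range before the group-wise alignment and background removal.
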
# =============================================================================
# Process each of the two theta groups separately. Here is what to do:
#   1. For each group, align all the patterns according to the peak fitting.
#   2. Within each group,
#      - for each two theta, figure out and remove the background, using an
#        exponential function
#      - integrate intensities in those pre-defined Q chunks
#      - for each Q chunk, fit against the spherical harmonics across two
#        theta angles, just like what we did with each single Q point when
#        correcting the phi part
#      - extract the isotropic term for each Q chunk, calculate the ratio
#        of the original integrated intensity over the isotropic intensity
#        for each two theta value, then rescale each Q chunk for each two
#        theta
#      - add back the exponential background
# =============================================================================
print(
    "[Info] Pre-processing (step-2) the isotropic corrected data for the "
    "second stage correction..."
)
sofq_bkg_removed = dict()
bkg_off = np.inf
for tg_key, item in ttheta_groups.items():
    print(f"[Info] Processing 2Theta group {tg_key}...")
    pa_lb = item["PAParams"]["LeftBound"]
    pa_rb = item["PAParams"]["RightBound"]
    center_tmp = dict()
    for tt_key in item["QRange"].keys():
        if tt_key not in ttheta_k_valid:
            continue
        tt_index = ttheta_k_valid.index(tt_key)
        sofq_tmp = sofq_corrected_first[tt_index]
        mask = (q_vals >= pa_lb) & (q_vals <= pa_rb)
        q_vals_fit = q_vals[mask]
        sofq_fit = sofq_tmp[mask]
        p_init = [np.max(sofq_fit), (pa_lb + pa_rb) / 2., 1.]
        if testing:
            center_tmp[tt_key] = 0.
        else:
            try:
                popt, _ = curve_fit(
                    gaussian, q_vals_fit, sofq_fit, p0=p_init
                )
                center_tmp[tt_key] = popt[1]
            except RuntimeError:
                center_tmp[tt_key] = 0.

    # center_ref = next(iter(center_tmp.items()))[1]
    center_ref = 5.06
    for tt_key, c_item in center_tmp.items():
        # According to the peak fitting failure handling just above, the
        # corresponding peak center would have been set to 0.
        if c_item == 0.:
            offset = 0.
        else:
            offset = center_ref - c_item
        tt_index = ttheta_k_valid.index(tt_key)
        q_vals_tmp = q_vals + offset
        sofq_tmp = sofq_corrected_first[tt_index].copy()
        q_bin_s = q_vals[1] - q_vals[0]
        if offset < 0:
            while q_vals_tmp[-1] < q_vals[-1]:
                q_vals_tmp = np.append(
                    q_vals_tmp, q_vals_tmp[-1] + q_bin_s
                )
                sofq_tmp = np.append(
                    sofq_tmp, sofq_corrected_first[tt_index][-1]
                )
        else:
            while q_vals_tmp[0] > q_vals[0]:
                q_vals_tmp = np.insert(
                    q_vals_tmp, 0, q_vals_tmp[0] - q_bin_s
                )
                sofq_tmp = np.insert(
                    sofq_tmp, 0, sofq_corrected_first[tt_index][0]
                )
        _, sofq_rebin = rebin(
            q_vals_tmp, sofq_tmp, q_vals[0], q_bin_s, q_vals[-1]
        )
        sofq_corrected_first[tt_index] = sofq_rebin
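
    # Note on the alignment above: each pattern is shifted in Q by
    # (center_ref - fitted center), padded at the leading or trailing edge
    # with its end value so that the shifted grid still covers the common
    # grid, and then rebinned back onto `q_vals` with pystog's `rebin`. A
    # minimal sketch (hypothetical numbers): a pattern whose fitted peak
    # center is 5.08 gets offset = -0.02, i.e., the whole pattern slides
    # down in Q until its peak sits at center_ref = 5.06.
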
    for tt_key in item["QRange"].keys():
        if tt_key not in ttheta_k_valid:
            continue
        tt_index = ttheta_k_valid.index(tt_key)
        ttheta = ttheta_valid[tt_index]
        sofq_ttheta = sofq_corrected_first[tt_index]
        bkg_lb = item["QRange"][tt_key]["LeftBound"]
        bkg_rb = item["QRange"][tt_key]["RightBound"]

        # Here we output the aligned patterns.
        # N.B. The alignment only happens internally in each of the defined
        # groups. Therefore, a final stage alignment may still be needed to
        # align the patterns after all steps.
        with open(
            os.path.join(
                out_dir,
                f"{stem_name}ttheta_{tt_key}_corrected_aligned.dat"
            ),
            "w"
        ) as f:
            f.write(f"{len(q_vals)}\n")
            f.write("\n")
            for i, q_val in enumerate(q_vals):
                f.write(
                    "{0:10.3F}{1:20.5F}\n".format(q_val, sofq_ttheta[i])
                )

        # We use the `argrelextrema` method in `scipy` to figure out local
        # valley points on the pattern and use those valley points for
        # fitting an exponential background.
        mask_tmp = q_vals >= bkg_lb
        sofq_ttheta_m = sofq_ttheta[mask_tmp]
        q_vals_m = q_vals[mask_tmp]

        # The `order` parameter here controls how 'local' the valleys will
        # be. If set to a very small value, the valley points will
        # reproduce the overall pattern itself, but setting the value too
        # high may yield too few points.
        q_min = argrelextrema(sofq_ttheta_m, np.less, order=20)
        sofq_min = np.array([sofq_ttheta_m[item_i] for item_i in q_min[0]])
        q_min = np.array([q_vals_m[item_i] for item_i in q_min[0]])

        # For the background fitting purpose, we set all S(Q) values above
        # the specified threshold to the value just below (inclusive) the
        # boundary. This removes the impact of the strong noise in the
        # high Q region.
        indices = np.where(q_min <= bkg_rb)[0]
        max_tmp = np.max(q_min[indices])
        up_index = np.where(q_min == max_tmp)[0][0]
        for qm_i, qm_val in enumerate(q_min):
            if qm_val > bkg_rb:
                sofq_min[qm_i] = sofq_min[up_index]

        # Specifying the bounds parameter for `curve_fit` helps a lot with
        # fitting convergence.
        p_init = [0, 0.1, 0.1]
        bounds_l = [-np.inf, 0., 0.]
        bounds_u = [np.inf, 1., 1.]
        popt, pconv = curve_fit(
            bkg_expo,
            q_min,
            sofq_min,
            p0=p_init,
            bounds=(bounds_l, bounds_u)
        )
        is_infinite = False
        for row in pconv:
            for element in row:
                if np.isinf(element):
                    is_infinite = True
                    break
        if is_infinite:
            print("[Warning] Infinite covariance in the curve fitting.")

        with open(
            os.path.join(
                out_dir,
                f"{stem_name}ttheta_{tt_key}_corrected_bkg_pts.dat"
            ),
            "w"
        ) as f:
            f.write(f"{len(q_min)}\n")
            f.write("\n")
            for i, q_val in enumerate(q_min):
                f.write(
                    "{0:10.3F}{1:20.5F}\n".format(q_val, sofq_min[i])
                )

        sofq_ttheta_bkg = bkg_expo(q_vals, popt[0], popt[1], popt[2])
        sofq_ttheta_mb = sofq_ttheta - sofq_ttheta_bkg
        sofq_bkg_removed[tt_key] = sofq_ttheta_mb

        # We keep a record of the background that most closely resembles 1
        # at high Q.
        bkg_hq_val = bkg_expo(q_vals[-1], popt[0], popt[1], popt[2])
        bkg_off_tmp = abs(1. - bkg_hq_val)
        if bkg_off_tmp < bkg_off:
            bkg_off = bkg_off_tmp
            bkg_popt = popt

        with open(
            os.path.join(
                out_dir,
                f"{stem_name}ttheta_{tt_key}_corrected_bkg_rm_fittedbkg.dat"
            ),
            "w"
        ) as f:
            f.write(f"{len(q_vals)}\n")
            f.write("\n")
            for i, q_val in enumerate(q_vals):
                f.write(
                    "{0:10.3F}{1:20.5F}{2:20.5F}\n".format(
                        q_val, sofq_ttheta_mb[i], sofq_ttheta_bkg[i]
                    )
                )
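
        # A small illustration of the valley picking above (hypothetical
        # data, not executed): with order=1,
        #
        #     argrelextrema(np.array([3., 1., 2., 0., 5.]), np.less, order=1)
        #
        # returns (array([1, 3]),), i.e., the indices of the two strict
        # local minima. The order=20 used here requires each valley to be
        # the minimum within 20 points on either side, giving a sparse set
        # of anchor points for the exponential background fit.
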
        # Integrate intensities inside Q chunks.
        ttheta_groups[tg_key]["QRange"][tt_key]["QChunkInt"] = list()
        for qv_i, q_val in enumerate(q_vals):
            v_t = sofq_ttheta_mb[qv_i]
            for qc_i, qc_val in enumerate(item["QChunkDef"]):
                if qc_i == 0:
                    if q_val <= qc_val:
                        if len(item["QRange"][tt_key]["QChunkInt"]) == 0:
                            ttheta_groups[
                                tg_key
                            ]["QRange"][tt_key]["QChunkInt"].append(v_t)
                        else:
                            ttheta_groups[
                                tg_key
                            ]["QRange"][tt_key]["QChunkInt"][0] += v_t
                        break
                else:
                    if item["QChunkDef"][qc_i - 1] < q_val <= qc_val:
                        if len(item["QRange"][tt_key]["QChunkInt"]) == qc_i:
                            in_range1 = bkg_lb <= item["QChunkDef"][qc_i - 1]
                            in_range2 = bkg_rb >= qc_val
                            if in_range1 and in_range2:
                                ttheta_groups[
                                    tg_key
                                ]["QRange"][tt_key]["QChunkInt"].append(v_t)
                            else:
                                ttheta_groups[
                                    tg_key
                                ]["QRange"][tt_key]["QChunkInt"].append(
                                    np.inf
                                )
                        else:
                            ttheta_groups[
                                tg_key
                            ]["QRange"][tt_key]["QChunkInt"][-1] += v_t
                        break
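
# Note on the second-stage correction below: each Q chunk integral is
# treated as a single 'point', and its variation across the two theta bins
# of a group is fitted with spherical harmonics evaluated at phi = 0. Note
# that at phi = 0 the sin(m * phi) factors vanish, so the m < 0 columns of
# the design matrix are identically zero and only the m >= 0 terms
# effectively contribute. When `LinLimit` does not contain "Infinity", an
# extra term linear in two theta is appended, with its coefficient bounded
# by the `LinLimit` pair. The degree `tt_sph_l_max` is grown only as far as
# the number of available two theta bins can support.
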
# Treat each Q chunk as a single 'point' and perform the isotropic
# correction just as before.
sph00 = sph_harm_real(0, 0, 0, 0)
for tg_key, item in ttheta_groups.items():
    params_all = list()
    qchunks_len = len(item["QChunkDef"])
    for i in range(qchunks_len):
        sph_matrix = list()
        sofq_ttheta = list()
        lin_limit = item["LinLimit"][i]

        num_eqs = 0
        for tt_key in item["QRange"].keys():
            if tt_key in ttheta_k_valid:
                num_eqs += 1
        tt_sph_l_max = 0
        while (tt_sph_l_max + 1 + 1)**2 < num_eqs:
            tt_sph_l_max += 1

        for tt_key in item["QRange"].keys():
            if "Factor" not in item["QRange"][tt_key].keys():
                ttheta_groups[tg_key]["QRange"][tt_key]["Factor"] = list()
            if tt_key not in ttheta_k_valid:
                continue
            tt_index = ttheta_k_valid.index(tt_key)
            ttheta_tmp = ttheta_valid[tt_index]
            v_t = item["QRange"][tt_key]["QChunkInt"][i]
            if v_t == np.inf:
                continue
            sofq_ttheta.append(v_t)
            sph_row = list()
            for l in range(tt_sph_l_max + 1):
                for m in range(-l, l + 1):
                    sph_row.append(
                        sph_harm_real(
                            m, l, ttheta_tmp * np.pi / 180., 0.
                        )
                    )
            if "Infinity" not in lin_limit:
                sph_row.append(ttheta_tmp)
            sph_matrix.append(sph_row)

        sph_matrix = np.array(sph_matrix)
        sofq_ttheta = np.array(sofq_ttheta)
        if len(sph_matrix) > 0 or len(sofq_ttheta) > 0:
            if "Infinity" in lin_limit:
                num_terms = (tt_sph_l_max + 1)**2
            else:
                num_terms = (tt_sph_l_max + 1)**2 + 1
            params = np.array([1 for _ in range(num_terms)])
            min_b = min(sofq_ttheta) / sph00
            max_b = max(sofq_ttheta) / sph00
            bounds = [(min_b, max_b)]
            if "Infinity" not in lin_limit:
                num_b_add = num_terms - 2
            else:
                num_b_add = num_terms - 1
            for _ in range(num_b_add):
                bounds.append((-float('inf'), float('inf')))
            if "Infinity" not in lin_limit:
                bounds.append((lin_limit[0], lin_limit[1]))
            result = minimize(
                target_function,
                params,
                args=(sph_matrix, sofq_ttheta),
                method='SLSQP',
                bounds=bounds
            )
            optimal_params = result.x
            y00 = sph_harm_real(0, 0, 0, 0)
            s00 = optimal_params[0]
            if "Infinity" not in lin_limit:
                ctt = optimal_params[-1]
        else:
            y00 = 0.
            s00 = 0.

        # Calculate the ratio of the original integrated intensity over
        # the isotropic intensity.
        for tt_key in item["QRange"].keys():
            if tt_key not in ttheta_k_valid:
                continue
            if i == 0:
                factor_v = 1.
            else:
                tt_index = ttheta_k_valid.index(tt_key)
                ttheta_tmp = ttheta_valid[tt_index]
                if "Infinity" in lin_limit:
                    top_val = y00 * s00
                else:
                    top_val = y00 * s00 + ctt * ttheta_tmp
                bottom_val = item["QRange"][tt_key]["QChunkInt"][i]
                factor_v = top_val / bottom_val
            ttheta_groups[tg_key]["QRange"][tt_key]["Factor"].append(
                factor_v
            )

    # Rescale the data by Q chunks according to the ratio obtained above.
    for tt_key in item["QRange"].keys():
        if tt_key not in ttheta_k_valid:
            continue
        for i, q_val in enumerate(q_vals):
            for ii, qc_val in enumerate(item["QChunkDef"]):
                if ii > 0:
                    # Compare against the previous Q chunk boundary.
                    if item["QChunkDef"][ii - 1] < q_val <= qc_val:
                        sofq_bkg_removed[tt_key][i] *= item[
                            "QRange"
                        ][tt_key]["Factor"][ii]
                        break

        # Add back in the exponential background.
        bkg_add = bkg_expo(q_vals, bkg_popt[0], bkg_popt[1], bkg_popt[2])
        sofq_final = sofq_bkg_removed[tt_key] + bkg_add

        with open(
            os.path.join(
                out_dir,
                f"{stem_name}ttheta_{tt_key}_corrected_wo_bkg.dat"
            ),
            "w"
        ) as f:
            f.write(f"{len(q_vals)}\n")
            f.write("\n")
            for i, q_val in enumerate(q_vals):
                f.write(
                    "{0:10.3F}{1:20.5F}\n".format(
                        q_val, sofq_bkg_removed[tt_key][i]
                    )
                )

        with open(
            os.path.join(
                out_dir,
                f"{stem_name}ttheta_{tt_key}_corrected_final.dat"
            ),
            "w"
        ) as f:
            f.write(f"{len(q_vals)}\n")
            f.write("\n")
            for i, q_val in enumerate(q_vals):
                f.write(
                    "{0:10.3F}{1:20.5F}\n".format(
                        q_val, sofq_final[i]
                    )
                )

with open(os.path.join(out_dir, f"{stem_name}_ttheta_1na_2a.json"), "w") as f:
    json.dump(ttheta_groups, f, indent=4)

print("\n=========================================================")
print("[Info] Done with the two-stage isotropic S(Q) correction.")
print("[Info] Proceed to merge the corrected S(Q) data by bank.")
print("=========================================================\n")

with open("output_group.json", "r") as f:
    group_dict = json.load(f)
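
# The layout of `output_group.json` is inferred from the usage below: each
# entry is assumed to map a bank key to its two theta range (hypothetical
# values),
#
#     {"bank1": [0., 30.], "bank2": [30., 65.]}
#
# so a two theta key `key` is assigned to bank `gkey` when
# item[0] <= float(key) < item[1].
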
bank_dict = dict()
for key in ttheta_k_valid:
    for keykey in ttheta_groups.keys():
        if key in ttheta_groups[keykey]["QRange"].keys():
            key_group = keykey
            break
    file_t = os.path.join(
        out_dir,
        f"{stem_name}ttheta_{key}_corrected_final.dat"
    )
    if os.path.exists(file_t):
        y_tmp = np.loadtxt(file_t, skiprows=2, dtype=float)[:, 1]
        for gkey, item in group_dict.items():
            if item[0] <= float(key) < item[1]:
                bank = gkey
                break
        if bank not in bank_dict:
            bank_dict[bank] = {
                "y": y_tmp,
                "num_bands": 1,
                "q_counts": [0 for _ in range(len(q_vals))]
            }
        else:
            bank_dict[bank]["y"] += y_tmp
            bank_dict[bank]["num_bands"] += 1

        q_chunk_tmp = ttheta_groups[key_group]["QChunkDef"]
        for i, q_val in enumerate(q_vals):
            for ii, qc_val in enumerate(q_chunk_tmp):
                if ii == 0:
                    if q_val <= qc_val:
                        q_chunk_id = ii
                        break
                else:
                    if q_chunk_tmp[ii - 1] < q_val <= qc_val:
                        q_chunk_id = ii
                        break
            if q_chunk_id == 0:
                factor = 1.
            else:
                factor = ttheta_groups[
                    key_group
                ]["QRange"][key]["Factor"][q_chunk_id]
            if factor > 0.:
                bank_dict[bank]["q_counts"][i] += 1

for key, item in bank_dict.items():
    for i in range(len(q_vals)):
        if item["q_counts"][i] > 0:
            item["y"][i] /= item["q_counts"][i]
        else:
            item["y"][i] /= item["num_bands"]

    with open(
        os.path.join(out_dir, f"{stem_name}{key}_merged_1na_2a_ave.dat"),
        "w"
    ) as f:
        f.write(f"{len(q_vals)}\n")
        f.write("\n")
        for i, q_val in enumerate(q_vals):
            f.write("{0:10.3F}{1:20.5F}\n".format(q_val, item["y"][i]))

print("=========================================================")
print("[Info] Done with merging the corrected S(Q) by banks.")
print("=========================================================\n")