# texture_proc_real_1step_not_aligned_2step_aligned.py
import json
import os

import numpy as np
from scipy.optimize import curve_fit, minimize
from scipy.signal import argrelextrema
from pystog import Pre_Proc
rebin = Pre_Proc.rebin
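# N.B. As used further below, `rebin(x, y, x_min, bin_size, x_max)` from
# `pystog` returns the rebinned `(x_new, y_new)` pair on the regular grid
# defined by its last three arguments.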
# ================
# Input parameters
# vvvvvvvvvvvvvvvv
# ----------
# Some flags
# ----------
testing = False # flag for testing
# ------------------------------------------------------------------
# Directory and config files definition
#
# run_dir           => directory for storing the reduced data files
#                      according to the two theta and azimuthal
#                      angle bins.
# out_dir           => directory for storing the outputs.
# det_map_file      => detector group information for all the two
#                      theta and azimuthal bins.
# ttheta_group_file => config file defining groups for second stage
#                      processing.
# ------------------------------------------------------------------
run_dir = os.path.join(
".",
"texture_proc"
)
out_dir = os.path.join(
run_dir,
"texture_proc_output"
)
det_map_file = os.path.join(
"/SNS/NOM/shared/scripts/texture",
"nom_texture_pointer.json"
)
ttheta_group_file = os.path.join(
".",
"ttheta_group_params_new_new_new.json"
)
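# For reference, an illustrative sketch (NOT the real file) of the structure
# this script expects in `ttheta_group_file`, inferred from how the file is
# consumed below; the group name, keys count, and all numbers are made up.
# Each `LinLimit` entry is either a pair of bounds for the linear term or a
# list containing "Infinity" to disable it:
#
# {
#     "Group-1": {
#         "PAParams": {"LeftBound": 4.9, "RightBound": 5.2},
#         "QRange": {
#             "27": {"LeftBound": 0.5, "RightBound": 20.0},
#             "32": {"LeftBound": 1.0, "RightBound": 22.0}
#         },
#         "QChunkDef": [5.0, 10.0, 15.0, 20.0],
#         "LinLimit": [["Infinity"], [-0.01, 0.01], [-0.01, 0.01], [-0.01, 0.01]]
#     }
# }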
# ---------------------
# Processing parameters
# ---------------------
stem_name = "NOM_Si_640e_"
ttheta_binw = 5. # in degrees
azm_binw = 10. # in degrees
header_num = 2
sph_l_max = 1 # for first stage
tt_sph_l_max = 2 # for second stage
amp_thresh = .9  # Threshold for peak intensity -- within a certain two
                 # theta bin, all patterns whose fitted peak intensity
                 # differs from the median intensity by more than this
                 # fractional threshold will be discarded.
# ^^^^^^^^^^^^^^^^
# Input parameters
# ================
if testing:
print("\n!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
print("[Warning] Testing mode is on.")
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n")
def gaussian(x, a, b, c):
    """Gaussian function.

    Parameters
    ----------
    x : ndarray
        Independent variable.
    a : float
        Amplitude.
    b : float
        Center.
    c : float
        Width.

    Returns
    -------
    ndarray
        Gaussian function value.
    """
    return a * np.exp(-(x - b)**2 / (2 * c**2))
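# A minimal sanity check of the peak model (runs in testing mode only); the
# numbers are illustrative: a Gaussian evaluates to its amplitude at its
# center.
if testing:
    assert np.isclose(gaussian(5.06, 1., 5.06, 0.1), 1.)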
def bkg_expo(x, a, b, c):
    """Exponential background function.

    Parameters
    ----------
    x : ndarray
        Independent variable.
    a : float
        Offset.
    b : float
        Coefficient.
    c : float
        Exponent.

    Returns
    -------
    ndarray
        Exponential background value.
    """
    return a - b * np.exp(-c * x**2.)
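# Likewise for the background model (testing mode only), with illustrative
# numbers: at x = 0 it equals a - b, and at large x it approaches the
# offset a.
if testing:
    assert np.isclose(bkg_expo(0., 1., 0.5, 0.1), 0.5)
    assert np.isclose(bkg_expo(1.e3, 1., 0.5, 0.1), 1.)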
def sph_harm_real(m, l, theta, phi):
    """Real spherical harmonics. See the Wikipedia table,
    https://en.wikipedia.org/wiki/Table_of_spherical_harmonics
    We could follow the standard definition of the spherical harmonics,
    but since this script only needs the first few terms, they are
    implemented directly.

    Parameters
    ----------
    m : int
        Order of the spherical harmonics.
    l : int
        Degree of the spherical harmonics.
    theta : float
        Polar angle in radians.
    phi : float
        Azimuthal angle in radians.

    Returns
    -------
    float
        Real value of the spherical harmonics.
    """
if l == 0 and m == 0:
return np.sqrt(1. / (4. * np.pi))
if l == 1 and m == -1:
return np.sqrt(3. / (4. * np.pi)) * np.sin(phi) * np.sin(theta)
if l == 1 and m == 0:
return np.sqrt(3. / (4. * np.pi)) * np.cos(theta)
if l == 1 and m == 1:
return np.sqrt(3. / (4. * np.pi)) * np.cos(phi) * np.sin(theta)
if l == 2 and m == -2:
part = np.sin(phi) * np.cos(phi) * (np.sin(theta))**2.
return np.sqrt(15. / (4. * np.pi)) * part
if l == 2 and m == -1:
part = np.sin(phi) * np.sin(theta) * np.cos(theta)
return np.sqrt(15. / (4. * np.pi)) * part
if l == 2 and m == 0:
part = 3. * (np.cos(theta))**2. - 1.
return np.sqrt(5. / (16. * np.pi)) * part
if l == 2 and m == 1:
part = np.cos(phi) * np.sin(theta) * np.cos(theta)
return np.sqrt(15. / (4. * np.pi)) * part
    if l == 2 and m == 2:
        part = ((np.cos(phi))**2. - (np.sin(phi))**2.) * (np.sin(theta))**2.
        return np.sqrt(15. / (16. * np.pi)) * part
    # Guard against silently returning `None` for a term not implemented.
    raise ValueError(f"Unsupported spherical harmonics term: l={l}, m={m}")
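# Numerical sanity check of the hand-coded harmonics (testing mode only):
# Y_0^0 is the constant sqrt(1 / (4 * pi)) regardless of the angles, and
# Y_1^0 vanishes at theta = pi / 2. The angle values are arbitrary.
if testing:
    assert np.isclose(
        sph_harm_real(0, 0, 0.3, 1.2), np.sqrt(1. / (4. * np.pi))
    )
    assert np.isclose(sph_harm_real(0, 1, np.pi / 2., 0.), 0.)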
def target_function(params, sph_mat, txt_int):
    """Target function for the optimization of the spherical harmonic
    coefficients.

    Parameters
    ----------
    params : ndarray
        Spherical harmonic coefficients.
    sph_mat : ndarray
        Spherical harmonic matrix.
    txt_int : ndarray
        Textured S(Q) intensity.

    Returns
    -------
    float
        Norm of the residuals.
    """
    residuals = sph_mat @ params - txt_int

    return np.linalg.norm(residuals)
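# A minimal check of the least-squares setup (testing mode only). The 2 x 1
# design matrix and the target vector are made up; `minimize` should recover
# the single coefficient (2.0) almost exactly.
if testing:
    _demo_res = minimize(
        target_function,
        np.array([0.]),
        args=(np.array([[1.], [1.]]), np.array([2., 2.])),
        method='SLSQP'
    )
    assert np.isclose(_demo_res.x[0], 2., atol=1.e-4)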
if not os.path.exists(out_dir):
os.makedirs(out_dir)
# ================
# Read in Q values
# ================
print("[Info] Read in Q values...")
bank = 1
max_bank = int(180. / ttheta_binw) * int(360. / azm_binw)
while True:
    tr_file = os.path.join(
        run_dir,
        stem_name + f"bank{bank}.dat"
    )
    if os.path.exists(tr_file):
        data = np.loadtxt(tr_file, skiprows=header_num, dtype=float)
        q_vals = data[:, 0]
        break
    bank += 1
    if bank > max_bank:
        raise FileNotFoundError(
            f"No '{stem_name}bank*.dat' file found in '{run_dir}'."
        )
# ===================
# Read in S(Q) values
# ===================
print("[Info] Read in all S(Q)'s...")
num_groups = int(180. / ttheta_binw) * int(360. / azm_binw)
all_sofq = dict()
for i in range(num_groups):
group_tmp = i + 1
tr_file = os.path.join(
run_dir,
stem_name + "bank%d.dat" % group_tmp
)
if os.path.exists(tr_file):
data = np.loadtxt(tr_file, skiprows=header_num, dtype=float)
sofq_tmp = data[:, 1]
all_sofq[group_tmp] = sofq_tmp
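# Report how many of the expected two theta/azimuthal groups were actually
# found on disk -- missing groups are simply skipped downstream.
print(f"[Info] Loaded {len(all_sofq)} out of {num_groups} S(Q) patterns.")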
# ===========================================================================
# Read in the detector map, telling the center two theta and azimuthal angle,
# and the ID for each group of detectors.
# ===========================================================================
with open(det_map_file, "r") as f:
    det_pointer = json.load(f)
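# An illustrative sketch (NOT the real file) of the detector map structure,
# inferred from how it is consumed below; the azimuthal key and the numbers
# are made up ("CentrAMZ" is spelled as the code expects):
#
# {
#     "27": {
#         "AZMDict": {
#             "AZM-1": {"GroupID": "181", "CentrAMZ": 5.0}
#         }
#     }
# }
#
# The "Use" and "Offset" entries for each azimuthal bin are filled in by
# the pre-processing step below.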
# =============================================================================
# Determine the valid azimuthal bins for each two theta bin. The criteria are
# as follows:
# 1. The 2theta bin is in use
# 2. The peak fitting succeeded
# 3. The fitted peak intensity is close to the median intensity
# =============================================================================
with open(ttheta_group_file, "r") as f:
ttheta_groups = json.load(f)
print(
"[Info] Pre-processing the data for all "
"the two theta and azimuthal bins..."
)
ttheta_bins = np.arange(0, 181, ttheta_binw)
for i in range(len(ttheta_bins) - 1):
ttheta_cv = (ttheta_bins[i] + ttheta_bins[i + 1]) / 2.
ttheta_cvk = "{:d}".format(int(np.floor(ttheta_cv)))
if ttheta_cvk in det_pointer:
ttheta_valid = False
for tkey, item in ttheta_groups.items():
if ttheta_cvk in item["QRange"].keys():
ttheta_valid = True
ttheta_group = tkey
break
# A valid two theta value means it belongs to one of the defined
# groups in the `ttheta_group_file` file.
if ttheta_valid:
pa_lb = ttheta_groups[ttheta_group]["PAParams"]["LeftBound"]
pa_rb = ttheta_groups[ttheta_group]["PAParams"]["RightBound"]
mask = (q_vals >= pa_lb) & (q_vals <= pa_rb)
q_vals_fit = q_vals[mask]
b0 = (pa_lb + pa_rb) / 2.
c0 = 1.
amp_tmp = dict()
center_tmp = dict()
for key, item in det_pointer[ttheta_cvk]["AZMDict"].items():
# Check whether the data exist for each azimuthal bin. If not,
# we just discard the bin.
if int(item["GroupID"]) in all_sofq.keys():
sofq_fit = all_sofq[int(item["GroupID"])][mask]
a0 = np.max(sofq_fit)
p_init = [a0, b0, c0]
try:
popt, _ = curve_fit(
gaussian,
q_vals_fit,
sofq_fit,
p0=p_init
)
amp_tmp[key] = popt[0]
center_tmp[key] = popt[1]
except RuntimeError:
# We want to handle the peak fitting failure. For
# testing mode, we just set the amplitude to 1 and the
# center to 0. For real data, we just discard the data.
if testing:
amp_tmp[key] = 1.
center_tmp[key] = 0.
else:
det_pointer[ttheta_cvk][
"AZMDict"
][key]["Use"] = False
det_pointer[ttheta_cvk][
"AZMDict"
][key]["Offset"] = 0.
else:
det_pointer[ttheta_cvk][
"AZMDict"
][key]["Use"] = False
det_pointer[ttheta_cvk][
"AZMDict"
][key]["Offset"] = 0.
            if len(center_tmp) > 0:
# center_ref = next(iter(center_tmp.items()))[1] # first item
center_ref = 5.06
                # Here we check the peak intensity. If the intensity is too
                # far from the median intensity, we discard the data. If not,
                # we calculate the offset for each azimuthal bin relative to
                # the fixed reference center `center_ref`, within the same
                # two theta bin.
median_amp = np.median(list(amp_tmp.values()))
used_keys = list()
for key, item in amp_tmp.items():
if abs((item - median_amp) / median_amp) < amp_thresh:
used_keys.append(key)
for key in det_pointer[ttheta_cvk]["AZMDict"].keys():
if key in used_keys:
det_pointer[ttheta_cvk]["AZMDict"][key]["Use"] = True
offset = center_ref - center_tmp[key]
det_pointer[ttheta_cvk][
"AZMDict"
][key]["Offset"] = offset
else:
det_pointer[ttheta_cvk][
"AZMDict"
][key]["Use"] = False
det_pointer[ttheta_cvk][
"AZMDict"
][key]["Offset"] = 0.
else:
                # If none of the detector groups in the two theta bin is
                # valid (either the file does not exist, or the fitting
                # failed), we mask out the whole two theta bin.
for key in det_pointer[ttheta_cvk]["AZMDict"].keys():
det_pointer[ttheta_cvk]["AZMDict"][key]["Use"] = False
det_pointer[ttheta_cvk]["AZMDict"][key]["Offset"] = 0.
else:
# If the two theta bin is invalid, we just discard the bin.
# By 'invalid' two theta bin, we mean that the two theta value
# is not included in the `ttheta_group_file`.
for key in det_pointer[ttheta_cvk]["AZMDict"].keys():
det_pointer[ttheta_cvk]["AZMDict"][key]["Use"] = False
det_pointer[ttheta_cvk]["AZMDict"][key]["Offset"] = 0.
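# Optional diagnostic (testing mode only): report how many azimuthal bins
# survived the validity checks in each two theta bin. This assumes every
# entry in the detector map carries an "AZMDict", as the loop above does.
if testing:
    for tt_k, tt_item in det_pointer.items():
        n_used = sum(
            1 for azm in tt_item["AZMDict"].values()
            if azm.get("Use", False)
        )
        print(f"[Testing] 2Theta {tt_k}: {n_used} azimuthal bins in use.")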
# =============================================================================
# Loop over the two theta bins and optimize the spherical harmonic coefficients
# =============================================================================
print("[Info] Working on the isotropic correction for each two theta bin...")
ttheta_bins = np.arange(0, 181, ttheta_binw)
total_items = len(ttheta_bins)
sofq_corrected_first = list()
ttheta_valid = list()
print("[Info] Processing 2Theta array (0, 180)...")
for i in range(len(ttheta_bins) - 1):
ttheta_cv = (ttheta_bins[i] + ttheta_bins[i + 1]) / 2.
ttheta_cvk = "{:d}".format(int(np.floor(ttheta_cv)))
print(f"[Info] Processing 2Theta: {ttheta_cvk}...")
if ttheta_cvk in det_pointer:
sofq_corrected = list()
s_coefficients = list()
valid_group_found = True
for j in range(len(q_vals)):
sofq_phi = list()
sph_matrix = list()
for _, item in det_pointer[ttheta_cvk]["AZMDict"].items():
                # According to the `Use` flag created for each azimuthal
                # bin in the previous step, we check whether there exist
                # valid azimuthal bins in a two theta bin.
                # NOTE: We may need to add a further check that the
                # number of valid azimuthal bins is greater than the
                # number of parameters to fit.
if int(item["GroupID"]) in all_sofq.keys() and item["Use"]:
azm_v = item["CentrAMZ"]
sph_row = list()
for l in range(sph_l_max + 1):
for m in range(-l, l + 1):
if l > 0 and m == 0:
continue
sph_row.append(
sph_harm_real(
m,
l,
ttheta_cv * np.pi / 180.,
azm_v * np.pi / 180.
)
)
sph_matrix.append(sph_row)
sofq_phi.append(all_sofq[int(item["GroupID"])][j])
sph_matrix = np.array(sph_matrix)
sofq_phi = np.array(sofq_phi)
if len(sph_matrix) == 0 or len(sofq_phi) == 0:
valid_group_found = False
break
num_terms = (sph_l_max + 1)**2 - sph_l_max
params = np.array(
[1 for _ in range(num_terms)]
)
result = minimize(
target_function,
params,
args=(sph_matrix, sofq_phi),
method='SLSQP'
)
optimal_params = result.x
# Calculate the S(Q) corrected, taking into account only the
# isotropic term.
y00 = sph_harm_real(0, 0, 0, 0)
s00 = optimal_params[0]
sofq_corrected.append(y00 * s00)
s_coefficients.append(optimal_params)
        # Output the corrected S(Q) values and the coefficients for
        # each two theta bin.
        # For a certain two theta bin, if no valid azimuthal bin is
        # found for some Q value, `valid_group_found` is set to False
        # and the corresponding two theta bin is marked as invalid,
        # i.e., not included in the valid two theta bin list
        # `ttheta_valid`.
if valid_group_found:
# The valid two theta bin array and the isotropic
# corrected S(Q) array are synchronized, i.e., they
# share the same index, making it convenient for next
# step implementation.
sofq_corrected_first.append(sofq_corrected)
ttheta_valid.append(ttheta_cv)
with open(
os.path.join(
out_dir,
f"{stem_name}2theta_{ttheta_cvk}.dat"
),
"w"
) as f:
f.write(f"{len(q_vals)}\n")
f.write("\n")
for j in range(len(q_vals)):
f.write(
f"{q_vals[j]:.4f} {sofq_corrected[j]:.4f}\n"
)
with open(
os.path.join(
out_dir,
f"{stem_name}2theta_{ttheta_cvk}_coeffs.dat"
),
"w"
) as f:
for j in range(len(q_vals)):
f.write(f"{q_vals[j]:.4f} ")
for k in range(len(s_coefficients[j])):
f.write(
f"{s_coefficients[j][k]:.4f} "
)
f.write("\n")
ttheta_valid = np.array(ttheta_valid)
sofq_corrected_first = np.array(sofq_corrected_first)
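# At this point the two arrays are index-aligned: row i of
# `sofq_corrected_first` holds the isotropic-corrected S(Q) for the two
# theta bin centered at `ttheta_valid[i]`.
if len(ttheta_valid) > 0:
    assert sofq_corrected_first.shape == (len(ttheta_valid), len(q_vals))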
# =============================================================================
# Pre-process the isotropic corrected S(Q) data corresponding to all the two
# theta bins. The steps are:
# 1. For each two theta bin, grab the data in a certain Q range according to
#    the parameters read in from `ttheta_group_file`.
# 2. For each two theta bin, fill the data in the low Q range with the data
#    in the same low Q range from the first two theta bin in the same group.
# =============================================================================
print(
"[Info] Pre-processing (step-1) the isotropic corrected data for the "
"second stage correction..."
)
ttheta_k_valid = [
"{:d}".format(int(np.floor(ttheta))) for ttheta in ttheta_valid
]
for i, ttheta in enumerate(ttheta_valid):
ttheta_k = ttheta_k_valid[i]
for key, item in ttheta_groups.items():
if ttheta_k in item["QRange"].keys():
group = key
break
qrange_tmp = ttheta_groups[group]["QRange"]
left_b = qrange_tmp[ttheta_k]["LeftBound"]
leftmost_b = next(iter(qrange_tmp.items()))[1]["LeftBound"]
right_b_fill = left_b
# Fill the low Q range with the data from the first two theta bin
# in the same group.
ref_key_ig = next(iter(qrange_tmp))
ref_key_id = ttheta_k_valid.index(ref_key_ig)
mask_fill = (q_vals >= leftmost_b) & (q_vals < right_b_fill)
sofq_corrected_first[i][mask_fill] = sofq_corrected_first[
ref_key_id
][mask_fill]
# =============================================================================
# Process each of the two theta groups separately. The steps are:
# 1. For each group, align all the patterns according to the peak fitting.
# 2. Within each group, we,
#    - for each two theta, figure out and remove the background, using an
#      exponential function
#    - integrate intensities in those pre-defined Q chunks
#    - for each Q chunk, fit against the spherical harmonics across two
#      theta angles, just like what we did with each single Q point when
#      correcting the phi part
#    - extract the isotropic term for each Q chunk, calculate the ratio
#      of the original integrated intensity over the isotropic intensity
#      for each two theta value, and then rescale each Q chunk for each
#      two theta
#    - add back the exponential background
# =============================================================================
print(
"[Info] Pre-processing (step-2) the isotropic corrected data for the "
"second stage correction..."
)
sofq_bkg_removed = dict()
bkg_off = np.inf
for tg_key, item in ttheta_groups.items():
print(f"[Info] Processing 2Theta group {tg_key}...")
pa_lb = item["PAParams"]["LeftBound"]
pa_rb = item["PAParams"]["RightBound"]
center_tmp = dict()
for tt_key in item["QRange"].keys():
if tt_key not in ttheta_k_valid:
continue
tt_index = ttheta_k_valid.index(tt_key)
sofq_tmp = sofq_corrected_first[tt_index]
mask = (q_vals >= pa_lb) & (q_vals <= pa_rb)
q_vals_fit = q_vals[mask]
sofq_fit = sofq_tmp[mask]
p_init = [np.max(sofq_fit), (pa_lb + pa_rb) / 2., 1.]
if testing:
center_tmp[tt_key] = 0.
else:
try:
popt, _ = curve_fit(
gaussian,
q_vals_fit,
sofq_fit,
p0=p_init
)
center_tmp[tt_key] = popt[1]
except RuntimeError:
center_tmp[tt_key] = 0.
# center_ref = next(iter(center_tmp.items()))[1]
center_ref = 5.06
for tt_key, c_item in center_tmp.items():
# According to the peak fitting failure handling just above, the
# corresponding peak center would be set to 0.
if c_item == 0.:
offset = 0.
else:
offset = center_ref - c_item
tt_index = ttheta_k_valid.index(tt_key)
q_vals_tmp = q_vals + offset
sofq_tmp = sofq_corrected_first[tt_index].copy()
q_bin_s = q_vals[1] - q_vals[0]
if offset < 0:
while q_vals_tmp[-1] < q_vals[-1]:
q_vals_tmp = np.append(
q_vals_tmp,
q_vals_tmp[-1] + q_bin_s
)
sofq_tmp = np.append(
sofq_tmp,
sofq_corrected_first[tt_index][-1]
)
else:
while q_vals_tmp[0] > q_vals[0]:
q_vals_tmp = np.insert(
q_vals_tmp,
0,
q_vals_tmp[0] - q_bin_s
)
sofq_tmp = np.insert(
sofq_tmp,
0,
sofq_corrected_first[tt_index][0]
)
_, sofq_rebin = rebin(
q_vals_tmp,
sofq_tmp,
q_vals[0],
q_bin_s,
q_vals[-1]
)
sofq_corrected_first[tt_index] = sofq_rebin
for tt_key in item["QRange"].keys():
if tt_key not in ttheta_k_valid:
continue
tt_index = ttheta_k_valid.index(tt_key)
ttheta = ttheta_valid[tt_index]
sofq_ttheta = sofq_corrected_first[tt_index]
bkg_lb = item["QRange"][tt_key]["LeftBound"]
bkg_rb = item["QRange"][tt_key]["RightBound"]
# Here we output the aligned patterns.
# N.B. The alignment only happens internally in each of the defined
# groups. Therefore, a final stage alignment may still be needed to
# align the patterns after all steps.
with open(
os.path.join(
out_dir,
f"{stem_name}ttheta_{tt_key}_corrected_aligned.dat"
),
"w"
) as f:
f.write(f"{len(q_vals)}\n")
f.write("\n")
for i, q_val in enumerate(q_vals):
f.write("{0:10.3F}{1:20.5F}\n".format(q_val, sofq_ttheta[i]))
        # We use `argrelextrema` from `scipy` to locate local valley points
        # on the pattern and fit an exponential background through those
        # valley points.
mask_tmp = q_vals >= bkg_lb
sofq_ttheta_m = sofq_ttheta[mask_tmp]
q_vals_m = q_vals[mask_tmp]
        # The `order` parameter here controls how 'local' a valley is. If
        # set to a very small value, it will reproduce the overall pattern
        # itself, but setting it too high may leave too few points.
q_min = argrelextrema(sofq_ttheta_m, np.less, order=20)
sofq_min = np.array([sofq_ttheta_m[item_i] for item_i in q_min[0]])
q_min = np.array([q_vals_m[item_i] for item_i in q_min[0]])
        # For the background fitting, replace all valley values beyond the
        # specified Q boundary with the last valley value at or below the
        # boundary. This removes the impact of strong noise in the high Q
        # region.
indices = np.where(q_min <= bkg_rb)[0]
max_tmp = np.max(q_min[indices])
up_index = np.where(q_min == max_tmp)[0][0]
for qm_i, qm_val in enumerate(q_min):
if qm_val > bkg_rb:
sofq_min[qm_i] = sofq_min[up_index]
        # Specifying bounds for `curve_fit` helps a lot with fitting
        # convergence.
p_init = [0, 0.1, 0.1]
bounds_l = [-np.inf, 0., 0.]
bounds_u = [np.inf, 1., 1.]
popt, pconv = curve_fit(
bkg_expo,
q_min,
sofq_min,
p0=p_init,
bounds=(bounds_l, bounds_u)
)
        if np.isinf(pconv).any():
            print("[Warning] Infinite covariance in the curve fitting.")
with open(
os.path.join(
out_dir,
f"{stem_name}ttheta_{tt_key}_corrected_bkg_pts.dat"
),
"w"
) as f:
f.write(f"{len(q_min)}\n")
f.write("\n")
for i, q_val in enumerate(q_min):
f.write(
"{0:10.3F}{1:20.5F}\n".format(
q_val, sofq_min[i]
)
)
sofq_ttheta_bkg = bkg_expo(q_vals, popt[0], popt[1], popt[2])
sofq_ttheta_mb = sofq_ttheta - sofq_ttheta_bkg
sofq_bkg_removed[tt_key] = sofq_ttheta_mb
        # We want to keep a record of the background that most closely
        # resembles 1 at high Q.
bkg_hq_val = bkg_expo(q_vals[-1], popt[0], popt[1], popt[2])
bkg_off_tmp = abs(1. - bkg_hq_val)
if bkg_off_tmp < bkg_off:
bkg_off = bkg_off_tmp
bkg_popt = popt
with open(
os.path.join(
out_dir,
f"{stem_name}ttheta_{tt_key}_corrected_bkg_rm_fittedbkg.dat"
),
"w"
) as f:
f.write(f"{len(q_vals)}\n")
f.write("\n")
for i, q_val in enumerate(q_vals):
f.write(
"{0:10.3F}{1:20.5F}{2:20.5F}\n".format(
q_val, sofq_ttheta_mb[i], sofq_ttheta_bkg[i]
)
)
        # Integrate (sum) the background-removed intensities within each of
        # the pre-defined Q chunks. Chunks extending beyond the background
        # fitting range are marked with `np.inf` so they get skipped later.
ttheta_groups[tg_key]["QRange"][tt_key]["QChunkInt"] = list()
for qv_i, q_val in enumerate(q_vals):
v_t = sofq_ttheta_mb[qv_i]
for qc_i, qc_val in enumerate(item["QChunkDef"]):
if qc_i == 0:
if q_val <= qc_val:
if len(item["QRange"][tt_key]["QChunkInt"]) == 0:
ttheta_groups[
tg_key
]["QRange"][tt_key]["QChunkInt"].append(v_t)
else:
ttheta_groups[
tg_key
]["QRange"][tt_key]["QChunkInt"][0] += v_t
break
else:
if item["QChunkDef"][qc_i - 1] < q_val <= qc_val:
if len(item["QRange"][tt_key]["QChunkInt"]) == qc_i:
in_range1 = bkg_lb <= item["QChunkDef"][qc_i - 1]
in_range2 = bkg_rb >= qc_val
if in_range1 and in_range2:
ttheta_groups[
tg_key
]["QRange"][tt_key]["QChunkInt"].append(v_t)
else:
ttheta_groups[
tg_key
]["QRange"][tt_key]["QChunkInt"].append(np.inf)
else:
ttheta_groups[
tg_key
]["QRange"][tt_key]["QChunkInt"][-1] += v_t
break
# Treat each Q chunk as a single 'point' and perform the isotropic correction
# just as before.
sph00 = sph_harm_real(0, 0, 0, 0)
for tg_key, item in ttheta_groups.items():
params_all = list()
qchunks_len = len(item["QChunkDef"])
for i in range(qchunks_len):
sph_matrix = list()
sofq_ttheta = list()
lin_limit = item["LinLimit"][i]
num_eqs = 0
for tt_key in item["QRange"].keys():
if tt_key in ttheta_k_valid:
num_eqs += 1
        # Choose the largest degree that keeps the system over-determined.
        # N.B. This overrides the `tt_sph_l_max` value configured at the
        # top of the script.
        tt_sph_l_max = 0
        while (tt_sph_l_max + 1 + 1)**2 < num_eqs:
            tt_sph_l_max += 1
for tt_key in item["QRange"].keys():
if "Factor" not in item["QRange"][tt_key].keys():
ttheta_groups[tg_key]["QRange"][tt_key]["Factor"] = list()
if tt_key not in ttheta_k_valid:
continue
tt_index = ttheta_k_valid.index(tt_key)
ttheta_tmp = ttheta_valid[tt_index]
v_t = item["QRange"][tt_key]["QChunkInt"][i]
if v_t == np.inf:
continue
sofq_ttheta.append(v_t)
sph_row = list()
for l in range(tt_sph_l_max + 1):
for m in range(-l, l + 1):
sph_row.append(
sph_harm_real(
m,
l,
ttheta_tmp * np.pi / 180.,
0.
)
)
if "Infinity" not in lin_limit:
sph_row.append(ttheta_tmp)
sph_matrix.append(sph_row)
sph_matrix = np.array(sph_matrix)
sofq_ttheta = np.array(sofq_ttheta)
        if len(sph_matrix) > 0 and len(sofq_ttheta) > 0:
if "Infinity" in lin_limit:
num_terms = (tt_sph_l_max + 1)**2
else:
num_terms = (tt_sph_l_max + 1)**2 + 1
params = np.array(
[1 for _ in range(num_terms)]
)
min_b = min(sofq_ttheta) / sph00
max_b = max(sofq_ttheta) / sph00
bounds = [(min_b, max_b)]
if "Infinity" not in lin_limit:
num_b_add = num_terms - 2
else:
num_b_add = num_terms - 1
for _ in range(num_b_add):
bounds.append(
(-float('inf'), float('inf'))
)
if "Infinity" not in lin_limit:
bounds.append((lin_limit[0], lin_limit[1]))
result = minimize(
target_function,
params,
args=(sph_matrix, sofq_ttheta),
method='SLSQP',
bounds=bounds
)
optimal_params = result.x
y00 = sph_harm_real(0, 0, 0, 0)
s00 = optimal_params[0]
if "Infinity" not in lin_limit:
ctt = optimal_params[-1]
else:
y00 = 0.
s00 = 0.
# Calculate the ratio of the original integrated intensity over
# the isotropic intensity.
for tt_key in item["QRange"].keys():
if tt_key not in ttheta_k_valid:
continue
if i == 0:
factor_v = 1.
else:
tt_index = ttheta_k_valid.index(tt_key)
ttheta_tmp = ttheta_valid[tt_index]
if "Infinity" in lin_limit:
top_val = y00 * s00
else:
top_val = y00 * s00 + ctt * ttheta_tmp
bottom_val = item["QRange"][tt_key]["QChunkInt"][i]
factor_v = top_val / bottom_val
# if tg_key == "Group-1":
# if i == 2:
# print("Debugging -> ")
# print(
# f"fit val: {top_val}, init val: {bottom_val}",
# f"y00: {y00}, s00: {s00}",
# f"tt: {ttheta_tmp}"
# )
# print(
# f"fit val: {top_val}, init val: {bottom_val}",
# f"y00: {y00}, s00: {s00}, ctt: {ctt}",
# f"tt: {ttheta_tmp}"
# )
ttheta_groups[tg_key]["QRange"][tt_key]["Factor"].append(
factor_v
)
# Rescale the data by Q chunks according to the ratio obtained above.
for tt_key in item["QRange"].keys():
if tt_key not in ttheta_k_valid:
continue
        for i, q_val in enumerate(q_vals):
            for ii, qc_val in enumerate(item["QChunkDef"]):
                if ii > 0:
                    # N.B. The chunk lower bound comes from `QChunkDef`,
                    # matching the chunk lookup in the merging step below.
                    if item["QChunkDef"][ii - 1] < q_val <= qc_val:
                        sofq_bkg_removed[tt_key][i] *= item[
                            "QRange"
                        ][tt_key]["Factor"][ii]
                        break
# Add back in the exponential background.
bkg_add = bkg_expo(q_vals, bkg_popt[0], bkg_popt[1], bkg_popt[2])
sofq_final = sofq_bkg_removed[tt_key] + bkg_add
with open(
os.path.join(
out_dir,
f"{stem_name}ttheta_{tt_key}_corrected_wo_bkg.dat"
),
"w"
) as f:
f.write(f"{len(q_vals)}\n")
f.write("\n")
for i, q_val in enumerate(q_vals):
f.write(
"{0:10.3F}{1:20.5F}\n".format(
q_val, sofq_bkg_removed[tt_key][i]
)
)
with open(
os.path.join(
out_dir,
f"{stem_name}ttheta_{tt_key}_corrected_final.dat"
),
"w"
) as f:
f.write(f"{len(q_vals)}\n")
f.write("\n")
for i, q_val in enumerate(q_vals):
f.write(
"{0:10.3F}{1:20.5F}\n".format(
q_val, sofq_final[i]
)
)
with open(os.path.join(out_dir, f"{stem_name}_ttheta_1na_2a.json"), "w") as f:
json.dump(ttheta_groups, f, indent=4)
print("\n=========================================================")
print("[Info] Done with the two-stage isotropic S(Q) correction.")
print("[Info] Proceed to merge the corrected S(Q) data by bank.")
print("=========================================================\n")
with open("output_group.json", "r") as f:
group_dict = json.load(f)
bank_dict = dict()
for key in ttheta_k_valid:
for keykey in ttheta_groups.keys():
if key in ttheta_groups[keykey]["QRange"].keys():
key_group = keykey
break
file_t = os.path.join(
out_dir,
f"{stem_name}ttheta_{key}_corrected_final.dat"
)
if os.path.exists(file_t):
y_tmp = np.loadtxt(file_t, skiprows=2, dtype=float)[:, 1]
for gkey, item in group_dict.items():
if item[0] <= float(key) < item[1]:
bank = gkey
break
if bank not in bank_dict:
bank_dict[bank] = {
"y": y_tmp,
"num_bands": 1,
"q_counts": [0 for _ in range(len(q_vals))]
}
else:
bank_dict[bank]["y"] += y_tmp
bank_dict[bank]["num_bands"] += 1
q_chunk_tmp = ttheta_groups[key_group]["QChunkDef"]
for i, q_val in enumerate(q_vals):
for ii, qc_val in enumerate(q_chunk_tmp):
if ii == 0:
if q_val <= qc_val:
q_chunk_id = ii
break
else:
if q_chunk_tmp[ii - 1] < q_val <= qc_val:
q_chunk_id = ii
break
if q_chunk_id == 0:
factor = 1.
else:
factor = ttheta_groups[
key_group
]["QRange"][key]["Factor"][q_chunk_id]
if factor > 0.:
bank_dict[bank]["q_counts"][i] += 1
for key, item in bank_dict.items():
for i in range(len(q_vals)):
if item["q_counts"][i] > 0:
item["y"][i] /= item["q_counts"][i]
else:
item["y"][i] /= item["num_bands"]
with open(
os.path.join(out_dir, f"{stem_name}{key}_merged_1na_2a_ave.dat"),
"w"
) as f:
f.write(f"{len(q_vals)}\n")
f.write("\n")
for i, q_val in enumerate(q_vals):
f.write("{0:10.3F}{1:20.5F}\n".format(q_val, item["y"][i]))
print("=========================================================")
print("[Info] Done with merging the corrected S(Q) by banks.")
print("=========================================================\n")