import healpy as hp
import numpy as np
[docs]
class alm:
def __init__(self, backend=None, lmax=24, nside=None, limit_range=1e10):
if backend is None:
import foscat.scat_cov as sc
self.sc = sc.funct()
self.backend = self.sc.backend
else:
self.backend = backend.backend
self._logtab = {}
self.lth = {}
self.lph = {}
self.matrix_shift_ph = {}
self.ratio_mm = {}
self.P_mm = {}
self.A = {}
self.B = {}
if nside is not None:
self.maxlog = 6 * nside + 1
self.lmax = 3 * nside
else:
self.lmax = lmax
self.maxlog = 2 * lmax + 1
for k in range(1, self.maxlog):
self._logtab[k] = self.backend.bk_log(self.backend.bk_cast(k))
self._logtab[0] = 0.0
if nside is not None:
self.ring_th(nside)
self.ring_ph(nside)
self.shift_ph(nside)
self._limit_range = 1 / limit_range
self._log_limit_range = np.log(limit_range)
self.Yp = {}
self.Ym = {}
[docs]
def ring_th(self, nside):
if nside not in self.lth:
n = 0
ith = []
for k in range(nside - 1):
N = 4 * (k + 1)
ith.append(n)
n += N
for k in range(2 * nside + 1):
N = 4 * nside
ith.append(n)
n += N
for k in range(nside - 1):
N = 4 * (nside - 1 - k)
ith.append(n)
n += N
th, ph = hp.pix2ang(nside, ith)
self.lth[nside] = th
return self.lth[nside]
[docs]
def ring_ph(self, nside):
if nside not in self.lph:
n = 0
iph = []
for k in range(nside - 1):
N = 4 * (k + 1)
iph.append(n)
n += N
for k in range(2 * nside + 1):
N = 4 * nside
iph.append(n)
n += N
for k in range(nside - 1):
N = 4 * (nside - 1 - k)
iph.append(n)
n += N
th, ph = hp.pix2ang(nside, iph)
self.lph[nside] = ph
[docs]
def shift_ph(self, nside):
if nside not in self.matrix_shift_ph:
self.ring_th(nside)
self.ring_ph(nside)
x = (-1j * np.arange(3 * nside)).reshape(1, 3 * nside)
self.matrix_shift_ph[nside] = self.backend.bk_exp(
self.backend.bk_cast(x * self.lph[nside].reshape(4 * nside - 1, 1))
)
self.lmax = 3 * nside - 1
# ratio_mm = {}
for m in range(3 * nside):
val = np.zeros([self.lmax - m + 1])
aval = np.zeros([self.lmax - m + 1])
bval = np.zeros([self.lmax - m + 1])
if m > 0:
val[0] = self.double_factorial_log(2 * m - 1) - 0.5 * np.sum(
np.log(1 + np.arange(2 * m))
)
else:
val[0] = self.double_factorial_log(2 * m - 1)
if m < self.lmax:
aval[1] = 2 * m + 1
val[1] = val[0] - 0.5 * self.log(2 * m + 1)
for ell in range(m + 2, self.lmax + 1):
aval[ell - m] = (2 * ell - 1) / (ell - m)
bval[ell - m] = (ell + m - 1) / (ell - m)
val[ell - m] = (
val[ell - m - 1]
+ 0.5 * self.log(ell - m)
- 0.5 * self.log(ell + m)
)
self.A[nside, m] = self.backend.bk_constant(aval)
self.B[nside, m] = self.backend.bk_constant(bval)
self.ratio_mm[nside, m] = self.backend.bk_constant(
np.sqrt(4 * np.pi) * np.expand_dims(np.exp(val), 1)
)
# Calcul de P_{mm}(x)
P_mm = np.ones([3 * nside, 4 * nside - 1])
x = np.cos(self.lth[nside])
if m == 0:
P_mm[m] = 1.0
for m in range(3 * nside - 1):
P_mm[m] = (0.5 - m % 2) * 2 * (1 - x**2) ** (m / 2)
self.P_mm[nside] = self.backend.bk_constant(P_mm)
[docs]
def init_Ys(self, s, nside):
if (s, nside) not in self.Yp:
import quaternionic
import spherical
ell_max = 3 * nside - 1 # Use the largest ℓ value you expect to need
wigner = spherical.Wigner(ell_max)
# th,ph=hp.pix2ang(nside,np.arange(12*nside*nside))
lth = self.ring_th(nside)
R = quaternionic.array.from_spherical_coordinates(lth, 0 * lth)
self.Yp[s, nside] = {}
self.Ym[s, nside] = {}
iplus = (wigner.sYlm(s, R) * (4 * np.pi / (12 * nside**2))).T.real
imoins = (wigner.sYlm(-s, R) * (4 * np.pi / (12 * nside**2))).T.real
for m in range(ell_max + 1):
idx = np.array([wigner.Yindex(k, m) for k in range(m, ell_max + 1)])
vnorm = 1 / np.expand_dims(
np.sqrt(2 * (np.arange(ell_max - m + 1) + m) + 1), 1
)
self.Yp[s, nside][m] = self.backend.bk_cast(iplus[idx] * vnorm + 0j)
self.Ym[s, nside][m] = self.backend.bk_cast(imoins[idx] * vnorm + 0j)
del iplus
del imoins
del wigner
[docs]
def log(self, v):
return np.log(v)
if isinstance(v, np.ndarray):
return np.array([self.backend.bk_log(self.backend.bk_cast(k)) for k in v])
if v < self.maxlog:
return self._logtab[v]
else:
self._logtab[v] = self.backend.bk_log(self.backend.bk_cast(v))
return self._logtab[v]
# Fonction pour calculer la double factorielle
[docs]
def double_factorial_log(self, n):
if n <= 0:
return 0.0
result = 0.0
for i in range(n, 0, -2):
result += np.log(i)
return result
[docs]
def recurrence_fn(self, states, inputs):
"""
Recurrence function for tf.scan.
states: un tuple (U_{n-1}, U_{n-2}) de forme [m]
inputs: a tuple (a_n(x), b_n) where a_n(x) has shape [m]
"""
U_prev, U_prev2 = states
a_n, b_n = inputs # a_n est de forme [m], b_n est un scalaire
U_n = a_n * U_prev - b_n * U_prev2
return (U_n, U_prev) # Advance the states
# Calcul des P_{lm}(x) pour tout l inclus dans [m,lmax]
[docs]
def compute_legendre_m(self, x, m, lmax, nside):
result = np.zeros([lmax - m + 1, x.shape[0]])
ratio = np.zeros([lmax - m + 1, 1])
ratio[0, 0] = self.double_factorial_log(2 * m - 1) - 0.5 * np.sum(
self.log(1 + np.arange(2 * m))
)
# Step 1: Compute P_{mm}(x)
if m == 0:
Pmm = 1.0
else:
# Pmm = (-1)**m * (1 - x**2)**(m/2)
Pmm = (0.5 - m % 2) * 2 * (1 - x**2) ** (m / 2)
# Si l == m, c'est directement P_{mm}
result[0] = Pmm
if m == lmax:
return result * np.exp(ratio) * np.sqrt(4 * np.pi) + 0j
# Step 2: Compute P_{l+1, m}(x)
result[1] = x * (2 * m + 1) * result[0]
ratio[1, 0] = ratio[0, 0] - 0.5 * self.log(2 * m + 1)
# Step 3: Recurrence for l > m + 1
for ell in range(m + 2, lmax + 1):
result[ell - m] = (
(2 * ell - 1) * x * result[ell - m - 1]
- (ell + m - 1) * result[ell - m - 2]
) / (ell - m)
ratio[ell - m, 0] = (
0.5 * self.log(ell - m)
- 0.5 * self.log(ell + m)
+ ratio[ell - m - 1, 0]
)
if np.max(abs(result[ell - m])) > self._limit_range:
result[ell - m - 1] *= self._limit_range
result[ell - m] *= self._limit_range
ratio[ell - m - 1, 0] += self._log_limit_range
ratio[ell - m, 0] += self._log_limit_range
return result * np.exp(ratio) * np.sqrt(4 * np.pi) + 0j
# Calcul des P_{lm}(x) pour tout l inclus dans [m,lmax]
[docs]
def compute_legendre_m_old2(self, x, m, lmax, nside):
result = {}
# Si l == m, c'est directement P_{mm}
result[0] = self.P_mm[nside][m]
if m == lmax:
v = self.backend.bk_reshape(
result[0] * self.ratio_mm[nside, m][0], [1, 4 * nside - 1]
)
return self.backend.bk_complex(v, 0 * v)
# Step 2: Compute P_{l+1, m}(x)
result[1] = x * self.A[nside, m][1] * result[0]
# Step 3: Recurrence for l > m + 1
for ell in range(m + 2, lmax + 1):
result[ell - m] = (
self.A[nside, m][ell - m] * x * result[ell - m - 1]
- self.B[nside, m][ell - m] * result[ell - m - 2]
)
result = self.backend.bk_reshape(
self.backend.bk_concat([result[k] for k in range(lmax + 1 - m)], axis=0),
[lmax + 1 - m, 4 * nside - 1],
)
return self.backend.bk_complex(result * self.ratio_mm[nside, m], 0 * result)
[docs]
def compute_legendre_m_old(self, x, m, lmax, nside):
import tensorflow as tf
result = {}
# Si l == m, c'est directement P_{mm}
U_0 = self.P_mm[nside][m]
if m == lmax:
v = self.backend.bk_reshape(
U_0 * self.ratio_mm[nside, m][0], [1, 4 * nside - 1]
)
return self.backend.bk_complex(v, 0 * v)
# Step 2: Compute P_{l+1, m}(x)
U_1 = x * self.A[nside, m][1] * U_0
if m == lmax - 1:
result = tf.concat(
[
self.backend.bk_expand_dims(U_0, 0),
self.backend.bk_expand_dims(U_1, 0),
],
0,
)
return self.backend.bk_complex(result * self.ratio_mm[nside, m], 0 * result)
a_values = self.backend.bk_expand_dims(
self.A[nside, m], 1
) * self.backend.bk_expand_dims(x, 0)
# Initialise states with (U_1, U_0) for each m
initial_states = (U_1, U_0)
inputs = (a_values[2:], self.B[nside, m][2:])
# Appliquer tf.scan
result = tf.scan(self.recurrence_fn, inputs, initializer=initial_states)
# The first element of result contains U[n]
result = tf.concat(
[
self.backend.bk_expand_dims(U_0, 0),
self.backend.bk_expand_dims(U_1, 0),
result[0],
],
axis=0,
)
"""
# Step 3: Recurrence for l > m + 1
for l in range(m + 2, lmax+1):
result[l-m] = self.A[nside,m][l-m] * x * result[l-m-1] - self.B[nside,m][l-m] * result[l-m-2]
if np.max(abs(result[l-m]))>self._limit_range:
result[l-m-1]*= self._limit_range
result[l-m]*= self._limit_range
ratio[l-m-1]+= self._log_limit_range
ratio[l-m]+= self._log_limit_range
result=self.backend.bk_reshape(self.backend.bk_concat([result[k] for k in range(lmax+1-m)],axis=0),[lmax+1-m,4*nside-1])
"""
return self.backend.bk_complex(result * self.ratio_mm[nside, m], 0 * result)
# Calcul des s_P_{lm}(x) pour tout l inclus dans [m,lmax]
[docs]
def compute_legendre_spin2_m(self, co_th, si_th, m, lmax):
result = np.zeros([lmax - m + 2, co_th.shape[0]])
ratio = np.zeros([lmax - m + 2, 1])
ratio[1, 0] = self.double_factorial_log(2 * m - 1) - 0.5 * np.sum(
self.log(1 + np.arange(2 * m))
)
# Step 1: Compute P_{mm}(x)
if m == 0:
Pmm = 1.0
else:
# Pmm = (-1)**m * (1 - x**2)**(m/2)
Pmm = (0.5 - m % 2) * 2 * (1 - co_th**2) ** (m / 2)
# Si l == m, c'est directement P_{mm}
result[1] = Pmm
if m == lmax:
ylm = result * np.exp(ratio)
ylm[1:] *= (
np.sqrt(4 * np.pi * (2 * (np.arange(lmax - m + 1) + m) + 1))
).reshape(lmax + 1 - m, 1)
else:
# Step 2: Compute P_{l+1, m}(x)
result[2] = co_th * (2 * m + 1) * result[0]
ratio[2, 0] = ratio[1, 0] - self.log(2 * m + 1) / 2
# Step 3: Recurrence for l > m + 1
for ell in range(m + 2, lmax + 1):
result[ell - m + 1] = (
(2 * ell - 1) * co_th * result[ell - m]
- (ell + m - 1) * result[ell - m - 1]
) / (ell - m)
ratio[ell - m + 1, 0] = (
self.log(ell - m) - self.log(ell + m)
) / 2 + ratio[ell - m, 0]
if np.max(abs(result[ell - m + 1])) > self._limit_range:
result[ell - m] *= self._limit_range
result[ell - m + 1] *= self._limit_range
ratio[ell - m, 0] += self._log_limit_range
ratio[ell - m + 1, 0] += self._log_limit_range
ylm = result * np.exp(ratio)
ylm[1:] *= (
np.sqrt(4 * np.pi * (2 * (np.arange(lmax - m + 1) + m) + 1))
).reshape(lmax + 1 - m, 1)
ell = (np.arange(lmax + 1 - m) + m).reshape(lmax + 1 - m, 1)
cot_th = co_th / si_th
si2_th = si_th * si_th
a = (2 * m**2 - ell * (ell + 1)) / (
si2_th.reshape(1, si2_th.shape[0])
) + ell * (ell - 1) * cot_th * cot_th
b = 2 * m * (ell - 1) * cot_th / si_th
w = np.zeros([lmax + 1 - m, 1])
l_ell = ell[ell > 1]
w[ell > 1] = np.sqrt(1 / ((l_ell + 2) * (l_ell + 1) * (l_ell) * (l_ell - 1)))
w = w.reshape(lmax + 1 - m, 1)
alpha_plus = w * (a + b)
alpha_moins = w * (a - b)
a = 2 * np.sqrt((2 * ell + 1) / (2 * ell - 1) * (ell * ell - m * m))
b = m / si2_th
beta_plus = w * a * (cot_th / si_th + b)
beta_moins = w * a * (cot_th / si_th - b)
ylm_plus = alpha_plus * ylm[1:] + beta_plus * ylm[:-1]
ylm_moins = alpha_moins * ylm[1:] + beta_moins * ylm[:-1]
return ylm_plus, ylm_moins
[docs]
def rfft2fft(self, val, axis=0):
r = self.backend.bk_rfft(val)
if axis == 0:
r_inv = self.backend.bk_reverse(
self.backend.bk_conjugate(r[..., 1:-1]), axis=-1
)
else:
r_inv = self.backend.bk_reverse(
self.backend.bk_conjugate(r[..., 1:-1]), axis=-1
)
return self.backend.bk_concat([r, r_inv], axis=axis + 1)
[docs]
def irfft2fft(self, val, N, axis=0):
if axis == 0:
return self.backend.bk_irfft(val[0 : N // 2 + 1])
else:
return self.backend.bk_irfft(val[:, 0 : N // 2 + 1])
[docs]
def comp_tf(self, im, nside, realfft=False):
# im is [Nimage,12*nside**2]
self.shift_ph(nside)
n = 0
ft_im = []
for k in range(nside - 1):
N = 4 * (k + 1)
if realfft:
tmp = self.rfft2fft(im[:, n : n + N])
else:
tmp = self.backend.bk_fft(im[:, n : n + N])
l_n = tmp.shape[1]
if l_n < 3 * nside + 1:
repeat_n = 3 * nside // l_n + 1
tmp = self.backend.bk_tile(tmp, repeat_n, axis=1)
ft_im.append(tmp[:, None, 0 : 3 * nside])
n += N
N = 4 * nside * (2 * nside + 1)
v = self.backend.bk_reshape(
im[:, n : n + N], [im.shape[0], 2 * nside + 1, 4 * nside]
)
if realfft:
v_fft = self.rfft2fft(v, axis=1)[:, :, : 3 * nside]
else:
v_fft = self.backend.bk_fft(v)[:, :, : 3 * nside]
n += N
ft_im.append(v_fft)
if nside > 1:
for k in range(nside - 1):
N = 4 * (nside - 1 - k)
if realfft:
tmp = self.rfft2fft(im[:, n : n + N])
else:
tmp = self.backend.bk_fft(im[:, n : n + N])
l_n = tmp.shape[1]
if l_n < 3 * nside + 1:
repeat_n = 3 * nside // l_n + 1
tmp = self.backend.bk_tile(tmp, repeat_n, axis=1)
ft_im.append(tmp[:, None, 0 : 3 * nside])
n += N
return (
self.backend.bk_concat(ft_im, axis=1)
* self.matrix_shift_ph[nside][None, :, :]
)
[docs]
def icomp_tf(self, i_im, nside, realfft=False):
self.shift_ph(nside)
n = 0
im = []
ft_im = i_im * self.backend.bk_conjugate(self.matrix_shift_ph[nside])
for k in range(nside - 1):
N = 4 * (k + 1)
if realfft:
tmp = self.irfft2fft(ft_im[k], N)
else:
tmp = self.backend.bk_ifft(im[k], N)
im.append(tmp[0:N])
n += N
if nside > 1:
result = self.backend.bk_concat(im, axis=0)
N = 4 * nside * (2 * nside + 1)
v = ft_im[nside - 1 : 3 * nside, 0 : 2 * nside + 1]
if realfft:
v_fft = self.backend.bk_reshape(
self.irfft2fft(v, N, axis=1), [4 * nside * (2 * nside + 1)]
)
else:
v_fft = self.backend.bk_ifft(v)
n += N
if nside > 1:
result = self.backend.bk_concat([result, v_fft], axis=0)
else:
result = v_fft
if nside > 1:
im = []
for k in range(nside - 1):
N = 4 * (nside - 1 - k)
if realfft:
tmp = self.irfft2fft(ft_im[k + 3 * nside], N)
else:
tmp = self.backend.bk_ifft(im[k + 3 * nside], N)
im.append(tmp[0:N])
n += N
return self.backend.bk_concat([result] + im, axis=0)
else:
return result
[docs]
def anafast(self, im, map2=None, nest=False, spin=2, axes=0):
"""The `anafast` function computes the L1 and L2 norm power spectra.
Currently, it is not optimized for single-pass computation due to the relatively inefficient computation of (Y_{lm}).
Nonetheless, it utilizes TensorFlow and can be integrated into gradient computations.
Input:
- `im`: a vector of size ([N_image, 12 \times \text{Nside}^2]) for scalar data, or of size ([N_image, 2, 12 \times \text{Nside}^2]) for Q,U polar data,
or of size ([N_image,3, 12 \times \text{Nside}^2]) for I,Q,U polar data.
- `map2` (optional): a vector of size ([12 \times \text{Nside}^2]) for scalar data, or of size
([3, 12 \times \text{Nside}^2]) for polar data. If provided, cross power spectra will be computed.
- `nest=True`: alters the ordering of the input maps.
- `spin=2` for 1/2 spin data as Q and U. Spin=1 for seep fields
Output:
-A tensor of size ([l_{\text{max}} \times (l_{\text{max}}-1)) formatted as ([6, ldots]),
ordered as TT, EE, BB, TE, EB.TBanafast function computes L1 and L2 norm powerspctra.
"""
no_input_column = False
i_im = self.backend.bk_cast(im)
if map2 is not None:
i_map2 = self.backend.bk_cast(map2)
doT = True
if len(i_im.shape) - axes == 1: # nopol
nside = int(np.sqrt(i_im.shape[axes] // 12))
else:
if len(i_im.shape) - axes == 2:
doT = False
nside = int(np.sqrt(i_im.shape[axes + 1] // 12))
do_all_pol = False
if i_im.shape[axes] == 3:
do_all_pol = True
self.shift_ph(nside)
if doT or do_all_pol:
if len(i_im.shape) == 1 + int(do_all_pol): # no pol if 1 all pol if 2
if do_all_pol:
l_im = i_im[None, 0, ...]
if map2 is not None:
l_map2 = i_map2[None, 0, ...]
else:
l_im = i_im[None, ...]
if map2 is not None:
l_map2 = i_map2[None, ...]
no_input_column = True
N_image = 1
else:
if do_all_pol:
l_im = i_im[:, 0]
if map2 is not None:
l_map2 = i_map2[:, 0]
N_image = i_im.shape[0]
else:
l_im = i_im
if map2 is not None:
l_map2 = i_map2
N_image = i_im.shape[0]
if nest:
idx = hp.ring2nest(nside, np.arange(12 * nside**2))
ft_im = self.comp_tf(
self.backend.bk_gather(l_im, idx, axis=1), nside, realfft=True
)
if map2 is not None:
ft_im2 = self.comp_tf(
self.backend.bk_gather(l_map2, idx, axis=1), nside, realfft=True
)
else:
ft_im = self.comp_tf(l_im, nside, realfft=True)
if map2 is not None:
ft_im2 = self.comp_tf(l_map2, nside, realfft=True)
lth = self.ring_th(nside)
co_th = np.cos(lth)
lmax = 3 * nside - 1
cl2 = None
if not doT: # polarize case
self.init_Ys(spin, nside)
if len(i_im.shape) == 2:
l_im = i_im[None, :, :]
if map2 is not None:
l_map2 = i_map2[None, :, :]
no_input_column = True
N_image = 1
else:
l_im = i_im
if map2 is not None:
l_map2 = i_map2
N_image = i_im.shape[0]
if nest:
idx = hp.ring2nest(nside, np.arange(12 * nside**2))
l_Q = self.backend.bk_gather(l_im[:, int(do_all_pol)], idx, axis=1)
l_U = self.backend.bk_gather(l_im[:, 1 + int(do_all_pol)], idx, axis=1)
ft_im_Pp = self.comp_tf(self.backend.bk_complex(l_Q, l_U), nside)
ft_im_Pm = self.comp_tf(self.backend.bk_complex(l_Q, -l_U), nside)
if map2 is not None:
l_Q = self.backend.bk_gather(
l_map2[:, int(do_all_pol)], idx, axis=1
)
l_U = self.backend.bk_gather(
l_map2[:, 1 + int(do_all_pol)], idx, axis=1
)
ft_im2_Pp = self.comp_tf(self.backend.bk_complex(l_Q, l_U), nside)
ft_im2_Pm = self.comp_tf(self.backend.bk_complex(l_Q, -l_U), nside)
else:
ft_im_Pp = self.comp_tf(
self.backend.bk_complex(
l_im[:, int(do_all_pol)], l_im[:, 1 + int(do_all_pol)]
),
nside,
)
ft_im_Pm = self.comp_tf(
self.backend.bk_complex(
l_im[:, int(do_all_pol)], -l_im[:, 1 + int(do_all_pol)]
),
nside,
)
if map2 is not None:
ft_im2_Pp = self.comp_tf(
self.backend.bk_complex(
l_map2[:, int(do_all_pol)], l_map2[:, 1 + int(do_all_pol)]
),
nside,
)
ft_im2_Pm = self.comp_tf(
self.backend.bk_complex(
l_map2[:, int(doT)], -l_map2[:, 1 + int(do_all_pol)]
),
nside,
)
l_cl = []
for m in range(lmax + 1):
plm = self.backend.bk_cast(
self.compute_legendre_m(co_th, m, 3 * nside - 1, nside)
/ (12 * nside**2)
)
if doT or do_all_pol:
tmp = self.backend.bk_reduce_sum(
plm[None, :, :] * ft_im[:, None, :, m], 2
)
if map2 is not None:
tmp2 = self.backend.bk_reduce_sum(
plm[None, :, :] * ft_im2[:, None, :, m], 2
)
else:
tmp2 = tmp
if not doT: # polarize case
plmp = self.Yp[spin, nside][m]
plmm = self.Ym[spin, nside][m]
tmpp = self.backend.bk_reduce_sum(
plmp[None, :, :] * ft_im_Pp[:, None, :, m], 2
)
tmpm = self.backend.bk_reduce_sum(
plmm[None, :, :] * ft_im_Pm[:, None, :, m], 2
)
almE = -(tmpp + tmpm) / 2.0
almB = (tmpp - tmpm) / (2j)
if map2 is not None:
tmpp2 = self.backend.bk_reduce_sum(
plmp[None, :, :] * ft_im2_Pp[:, None, :, m], 2
)
tmpm2 = self.backend.bk_reduce_sum(
plmm[None, :, :] * ft_im2_Pm[:, None, :, m], 2
)
almE2 = -(tmpp2 + tmpm2) / 2.0
almB2 = (tmpp2 - tmpm2) / (2j)
else:
almE2 = almE
almB2 = almB
if do_all_pol:
tmpTT = self.backend.bk_real(tmp * self.backend.bk_conjugate(tmp2))
tmpTE = self.backend.bk_real(tmp * self.backend.bk_conjugate(almE2))
tmpTB = -self.backend.bk_real(
tmp * self.backend.bk_conjugate(almB2)
)
tmpEE = self.backend.bk_real(almE * self.backend.bk_conjugate(almE2))
tmpBB = self.backend.bk_real(almB * self.backend.bk_conjugate(almB2))
tmpEB = -self.backend.bk_real(almE * self.backend.bk_conjugate(almB2))
if map2 is not None:
tmpEB = (
tmpEB
- self.backend.bk_real(almE2 * self.backend.bk_conjugate(almB))
) / 2
if do_all_pol:
tmpTE = (
tmpTE
+ self.backend.bk_real(
tmp2 * self.backend.bk_conjugate(almE)
)
) / 2
tmpTB = (
tmpTB
- self.backend.bk_real(
tmp2 * self.backend.bk_conjugate(almB)
)
) / 2
if m == 0:
if do_all_pol:
l_cl.append(tmpTT)
l_cl.append(tmpEE)
l_cl.append(tmpBB)
l_cl.append(tmpTE)
l_cl.append(tmpEB)
l_cl.append(tmpTB)
else:
l_cl.append(tmpEE)
l_cl.append(tmpBB)
l_cl.append(tmpEB)
else:
offset_tensor = self.backend.bk_zeros(
(N_image, m), dtype=self.backend.all_bk_type
)
if do_all_pol:
l_cl.append(offset_tensor)
l_cl.append(2 * tmpTT)
l_cl.append(offset_tensor)
l_cl.append(2 * tmpEE)
l_cl.append(offset_tensor)
l_cl.append(2 * tmpBB)
l_cl.append(offset_tensor)
l_cl.append(2 * tmpTE)
l_cl.append(offset_tensor)
l_cl.append(2 * tmpEB)
l_cl.append(offset_tensor)
l_cl.append(2 * tmpTB)
else:
l_cl.append(offset_tensor)
l_cl.append(2 * tmpEE)
l_cl.append(offset_tensor)
l_cl.append(2 * tmpBB)
l_cl.append(offset_tensor)
l_cl.append(2 * tmpEB)
else:
tmp = self.backend.bk_real(tmp * self.backend.bk_conjugate(tmp2))
if m == 0:
l_cl.append(tmp)
else:
offset_tensor = self.backend.bk_zeros(
(N_image, m), dtype=self.backend.all_bk_type
)
l_cl.append(offset_tensor)
l_cl.append(2 * tmp)
l_cl = self.backend.bk_concat(l_cl, 1)
if doT:
cl2 = self.backend.bk_reshape(l_cl, [N_image, lmax + 1, lmax + 1])
cl2 = self.backend.bk_reduce_sum(cl2, 1)
else:
if do_all_pol:
cl2 = self.backend.bk_reshape(l_cl, [N_image, lmax + 1, 6, lmax + 1])
else:
cl2 = self.backend.bk_reshape(l_cl, [N_image, lmax + 1, 3, lmax + 1])
cl2 = self.backend.bk_reduce_sum(cl2, 1)
if no_input_column:
cl2 = cl2[0]
cl2_l1 = self.backend.bk_L1(cl2)
return cl2, cl2_l1
[docs]
def map2alm(self, im, nest=False):
nside = int(np.sqrt(im.shape[0] // 12))
# ph = self.shift_ph(nside)
if nest:
idx = hp.ring2nest(nside, np.arange(12 * nside**2))
ft_im = self.comp_tf(
self.backend.bk_cast(self.backend.bk_gather(im, idx)),
nside,
realfft=True,
)
else:
ft_im = self.comp_tf(self.backend.bk_cast(im), nside, realfft=True)
lth = self.ring_th(nside)
co_th = np.cos(lth)
lmax = 3 * nside - 1
alm = None
for m in range(lmax + 1):
plm = self.compute_legendre_m(co_th, m, 3 * nside - 1, nside) / (
12 * nside**2
)
tmp = self.backend.bk_reduce_sum(plm * ft_im[:, m], 1)
if m == 0:
alm = tmp
else:
alm = self.backend.bk_concat([alm, tmp], axis=0)
return alm
[docs]
def alm2map(self, nside, alm):
lth = self.ring_th(nside)
co_th = np.cos(lth)
ft_im = []
n = 0
lmax = 3 * nside - 1
for m in range(lmax + 1):
plm = self.compute_legendre_m(co_th, m, 3 * nside - 1, nside) / (
12 * nside**2
)
print(alm[n : n + lmax - m + 1].shape, plm.shape)
ft_im.append(
self.backend.bk_reduce_sum(
self.backend.bk_reshape(
alm[n : n + lmax - m + 1], [lmax - m + 1, 1]
)
* plm,
0,
)
)
n = n + lmax - m + 1
return self.backend.bk_reshape(
self.backend.bk_concat(ft_im, 0), [lmax + 1, 4 * nside - 1]
)
"""
if nest:
idx = hp.ring2nest(nside, np.arange(12 * nside**2))
ft_im = self.comp_tf(
self.backend.bk_cast(self.backend.bk_gather(im, idx)),
nside,
realfft=True,
)
else:
ft_im = self.comp_tf(self.backend.bk_cast(im), nside, realfft=True)
lmax = 3 * nside - 1
alm = None
for m in range(lmax + 1):
plm = self.compute_legendre_m(co_th, m, 3 * nside - 1, nside) / (
12 * nside**2
)
tmp = self.backend.bk_reduce_sum(plm * ft_im[:, m], 1)
if m == 0:
alm = tmp
else:
alm = self.backend.bk_concat([alm, tmp], axis=0)
return o_map
"""
[docs]
def map2alm_spin(self, im_Q, im_U, spin=2, nest=False):
if spin == 0:
return self.map2alm(im_Q, nest=nest), self.map2alm(im_U, nest=nest)
nside = int(np.sqrt(im_Q.shape[0] // 12))
# lth = self.ring_th(nside)
# co_th = np.cos(lth)
if nest:
idx = hp.ring2nest(nside, np.arange(12 * nside**2))
l_Q = self.backend.bk_gather(im_Q, idx)
l_U = self.backend.bk_gather(im_U, idx)
ft_im_1 = self.comp_tf(self.backend.bk_complex(l_Q, l_U), nside)
ft_im_2 = self.comp_tf(self.backend.bk_complex(l_Q, -l_U), nside)
else:
ft_im_1 = self.comp_tf(self.backend.bk_complex(im_Q, im_U), nside)
ft_im_2 = self.comp_tf(self.backend.bk_complex(im_Q, -im_U), nside)
lmax = 3 * nside - 1
# alm = None
for m in range(lmax + 1):
# not yet debug use spherical
# plmp1,plmm1=self.compute_legendre_spin2_m(co_th,si_th,m,3*nside-1)
# plmp1/=(12*nside**2)
# plmm1/=(12*nside**2)
plmp = self.Yp[spin, nside][m]
plmm = self.Ym[spin, nside][m]
tmpp = self.backend.bk_reduce_sum(plmp * ft_im_1[:, m], 1)
tmpm = self.backend.bk_reduce_sum(plmm * ft_im_2[:, m], 1)
if m == 0:
almE = -(tmpp + tmpm) / 2.0
almB = (tmpp - tmpm) / (2j)
else:
almE = self.backend.bk_concat([almE, -(tmpp + tmpm) / 2], axis=0)
almB = self.backend.bk_concat([almB, (tmpp - tmpm) / (2j)], axis=0)
return almE, almB