Source code for foscat.HealBili

"""
HealBili: Bilinear weights from a curvilinear (theta, phi) source grid to arbitrary HEALPix targets.

This module provides a class `HealBili` that, given a *curvilinear* source grid of angular
coordinates (theta[y,x], phi[y,x]) on the sphere (e.g., a tangent-plane grid built on an ellipsoid),
computes **bilinear interpolation weights** to map values from that grid onto arbitrary target
directions specified by HEALPix angles (heal_theta[n], heal_phi[n]).

Key idea
--------
Because the source grid is not rectilinear in index-space, we cannot assume a simple affine mapping
from (i,j) to angles. Instead, for each target direction (theta_h, phi_h), we:
  1) locate a nearby source cell (seed) by nearest neighbor search on the unit sphere;
  2) consider up to 4 candidate quads around the seed: [(i0,j0),(i0+1,j0),(i0,j0+1),(i0+1,j0+1)];
  3) project the 4 corner unit vectors and the target onto a **local tangent plane** built at the
     quad barycenter;
  4) *invert* the bilinear mapping f(s,t) from the quad corners to the plane point using Newton,
     retrieving (s,t) in [0,1]^2;
  5) build the 4 bilinear weights [(1-s)(1-t), s(1-t), (1-s)t, st] and the 4 linear indices
     into the source image (row-major, j*W + i).

If no candidate quad cleanly contains the point, we choose the one with the smallest residual in the
plane and clamp (s,t) to [0,1].

The code is NumPy-only by default, but can optionally use `scipy.spatial.cKDTree` for a faster nearest
neighbor seed search by setting `prefer_kdtree=True` (falls back automatically if SciPy is absent).

Usage
-----
>>> hb = HealBili(src_theta, src_phi, prefer_kdtree=True)
>>> I, W = hb.compute_weights(heal_theta, heal_phi)
>>> # Apply to a source image `img` of shape (H,W):
>>> vals = hb.apply_weights(img, I, W)  # shape (N,)

All angles must be in **radians**. theta is colatitude (0 at north pole), phi is longitude in [0, 2*pi).
"""
from __future__ import annotations

from typing import Tuple
import healpy as hp
import numpy as np

try:
    from scipy.spatial import cKDTree  # optional
    _HAVE_SCIPY = True
except Exception:  # pragma: no cover
    cKDTree = None
    _HAVE_SCIPY = False


[docs] class HealBili: """Compute bilinear interpolation weights from a curvilinear (theta, phi) grid to HEALPix targets. Parameters ---------- src_theta : np.ndarray, shape (H, W) Source **colatitude** (radians) at each grid node. src_phi : np.ndarray, shape (H, W) Source **longitude** (radians) at each grid node. prefer_kdtree : bool, default False If True and SciPy is available, use cKDTree on unit vectors for a faster nearest-neighbor seed. Falls back to blocked brute-force dot-product search otherwise. """ def __init__(self, src_theta: np.ndarray, src_phi: np.ndarray, *, prefer_kdtree: bool = False) -> None: if src_theta.shape != src_phi.shape or src_theta.ndim != 2: raise ValueError("src_theta and src_phi must have the same 2D shape (H, W)") self.src_theta = np.asarray(src_theta, dtype=float) self.src_phi = np.asarray(src_phi, dtype=float) self.H, self.W = self.src_theta.shape # Precompute unit vectors of source grid nodes self._Vsrc = self._sph_to_vec(self.src_theta.ravel(), self.src_phi.ravel()) # (H*W, 3) self.prefer_kdtree = bool(prefer_kdtree) and _HAVE_SCIPY if self.prefer_kdtree: # optional acceleration self._kdtree = cKDTree(self._Vsrc) else: self._kdtree = None # ----------------------------- # Public API # -----------------------------
[docs] def compute_weights( self, level, cell_ids: np.ndarray, ) -> Tuple[np.ndarray, np.ndarray]: """Compute bilinear weights/indices for target HEALPix angles. Parameters ---------- cell_ids : np.ndarray, shape (N,) Target **cell_ids** . Returns ------- I : np.ndarray, shape (4, N), dtype=int64 Linear indices of the 4 source corners per target (row-major: j*W + i). Invalid corners are -1. W : np.ndarray, shape (4, N), dtype=float64 Bilinear weights aligned with `I`. Weights are set to 0.0 for invalid corners and normalized to sum to 1 when at least one corner is valid. """ #compute the coordinate of the selected cell_ids heal_theta, heal_phi = hp.pix2ang(2**level,cell_ids,nest=True) ht = np.asarray(heal_theta, dtype=float).ravel() hpt = np.asarray(heal_phi, dtype=float).ravel() if ht.shape != hpt.shape: raise ValueError("heal_theta and heal_phi must have the same 1D shape (N,)") N = ht.size # Target unit vectors Vtgt = self._sph_to_vec(ht, hpt) # (N,3) # 1) Choose a seed node for each target (nearest source grid node on the sphere) seed_flat = self._nearest_source_indices(Vtgt) seed_j, seed_i = np.divmod(seed_flat, self.W) # 2) For each target, test up to 4 candidate quads around the seed; pick the best I = np.full((4, N), -1, dtype=np.int64) W = np.zeros((4, N), dtype=float) candidates = [(0, 0), (-1, 0), (0, -1), (-1, -1)] # offsets for (i0,j0) for n in range(N): v = Vtgt[n] best = None # (score_tuple, s, t, i0, j0, (idx00, idx10, idx01, idx11)) for di, dj in candidates: i0 = seed_i[n] + di j0 = seed_j[n] + dj if i0 < 0 or j0 < 0 or i0 + 1 >= self.W or j0 + 1 >= self.H: continue # out of bounds idx00 = j0 * self.W + i0 idx10 = j0 * self.W + (i0 + 1) idx01 = (j0 + 1) * self.W + i0 idx11 = (j0 + 1) * self.W + (i0 + 1) v00 = self._Vsrc[idx00] v10 = self._Vsrc[idx10] v01 = self._Vsrc[idx01] v11 = self._Vsrc[idx11] # Local tangent plane at the quad barycenter vC = v00 + v10 + v01 + v11 vC /= np.linalg.norm(vC) ex, ey, _ = self._tangent_axes_from_vec(vC) # Project 4 corners + target onto (ex, ey) P00 = np.array(self._project_to_plane(v00, ex, ey)) P10 = np.array(self._project_to_plane(v10, ex, ey)) P01 = np.array(self._project_to_plane(v01, ex, ey)) P11 = np.array(self._project_to_plane(v11, ex, ey)) P = np.array(self._project_to_plane(v, ex, ey)) # Invert bilinear mapping f(s,t) = P s, t, ok, resid = self._invert_bilinear(P00, P10, P01, P11, P) # Prefer in-bounds solutions; otherwise smallest residual score = (0, resid) if ok else (1, resid) if (best is None) or (score < best[0]): best = (score, s, t, i0, j0, (idx00, idx10, idx01, idx11)) if best is None: continue # leave weights at 0 and indices at -1 _, s, t, i0, j0, (idx00, idx10, idx01, idx11) = best # Bilinear weights w00 = (1.0 - s) * (1.0 - t) w10 = s * (1.0 - t) w01 = (1.0 - s) * t w11 = s * t I[:, n] = np.array([idx00, idx10, idx01, idx11], dtype=np.int64) W[:, n] = np.array([w00, w10, w01, w11], dtype=float) # Normalize for numerical safety sW = W[:, n].sum() if sW > 0: W[:, n] /= sW return I, W
[docs] def apply_weights(self, img: np.ndarray, I: np.ndarray, W: np.ndarray) -> np.ndarray: """Apply precomputed (I, W) to a source image to obtain values at the HEALPix targets. Parameters ---------- img : np.ndarray, shape (H, W) Source image values defined on the same grid as (src_theta, src_phi). I : np.ndarray, shape (4, N), dtype=int64 Linear indices (row-major) of corner samples; -1 for invalid corners. W : np.ndarray, shape (4, N), dtype=float64 Bilinear weights aligned with I. Returns ------- vals : np.ndarray, shape (N,) Interpolated values at the target directions. """ if img.shape != (self.H, self.W): raise ValueError(f"img must have shape {(self.H, self.W)}, got {img.shape}") img_flat = img.reshape(-1) N = I.shape[1] vals = np.zeros(N, dtype=float) for k in range(4): idx = I[k] w = W[k] m = idx >= 0 vals[m] += w[m] * img_flat[idx[m]] return vals
# ----------------------------- # Internal helpers (geometry) # ----------------------------- @staticmethod def _sph_to_vec(theta: np.ndarray, phi: np.ndarray) -> np.ndarray: """(theta, phi) -> unit vectors (x,y,z). theta=colat, phi=lon, radians.""" st, ct = np.sin(theta), np.cos(theta) sp, cp = np.sin(phi), np.cos(phi) x = st * cp y = st * sp z = ct return np.stack([x, y, z], axis=-1) @staticmethod def _tangent_axes_from_vec(v: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """Return an orthonormal basis (ex, ey, ez=v_hat) for the tangent plane at v.""" ez = v / np.linalg.norm(v) a = np.array([1.0, 0.0, 0.0], dtype=float) if abs(np.dot(a, ez)) > 0.9: # avoid near-colinearity a = np.array([0.0, 1.0, 0.0], dtype=float) ex = a - np.dot(a, ez) * ez ex /= np.linalg.norm(ex) ey = np.cross(ez, ex) return ex, ey, ez @staticmethod def _project_to_plane(vec: np.ndarray, ex: np.ndarray, ey: np.ndarray) -> Tuple[float, float]: """Project 3D unit vector `vec` onto plane spanned by (ex, ey): returns (x, y).""" return float(np.dot(vec, ex)), float(np.dot(vec, ey)) @staticmethod def _bilinear_f(s: float, t: float, P00: np.ndarray, P10: np.ndarray, P01: np.ndarray, P11: np.ndarray) -> np.ndarray: """Bilinear blend of 4 points in R^2.""" return ((1 - s) * (1 - t)) * P00 + (s * (1 - t)) * P10 + ((1 - s) * t) * P01 + (s * t) * P11 @staticmethod def _bilinear_jacobian(s: float, t: float, P00: np.ndarray, P10: np.ndarray, P01: np.ndarray, P11: np.ndarray) -> np.ndarray: """2x2 Jacobian of the bilinear map at (s,t).""" dFds = (-(1 - t)) * P00 + (1 - t) * P10 + (-t) * P01 + t * P11 dFdt = (-(1 - s)) * P00 + (-s) * P10 + (1 - s) * P01 + s * P11 return np.stack([dFds, dFdt], axis=-1) @classmethod def _invert_bilinear(cls, P00: np.ndarray, P10: np.ndarray, P01: np.ndarray, P11: np.ndarray, P: np.ndarray, max_iter: int = 10, tol: float = 1e-9) -> Tuple[float, float, bool, float]: """Invert the bilinear map f(s,t)=P with a Newton loop; return (s,t, ok, residual).""" # Initial guess from parallelogram (ignore cross term) A = np.column_stack([P10 - P00, P01 - P00]) # 2x2 b = P - P00 try: st0 = np.linalg.lstsq(A, b, rcond=None)[0] s, t = float(st0[0]), float(st0[1]) except np.linalg.LinAlgError: # fallback s, t = 0.5, 0.5 for _ in range(max_iter): F = cls._bilinear_f(s, t, P00, P10, P01, P11) r = P - F if np.linalg.norm(r) < tol: break J = cls._bilinear_jacobian(s, t, P00, P10, P01, P11) try: delta = np.linalg.solve(J, r) except np.linalg.LinAlgError: break s += float(delta[0]) t += float(delta[1]) # Clamp to [0,1] softly and compute residual s_c = min(max(s, 0.0), 1.0) t_c = min(max(t, 0.0), 1.0) F_end = cls._bilinear_f(s_c, t_c, P00, P10, P01, P11) resid = float(np.linalg.norm(P - F_end)) ok = (0.0 <= s <= 1.0) and (0.0 <= t <= 1.0) return s_c, t_c, ok, resid # ----------------------------- # Internal helpers (search) # ----------------------------- def _nearest_source_indices(self, Vtgt: np.ndarray) -> np.ndarray: """Return flat indices of nearest source nodes for each target unit vector.""" if self._kdtree is not None: # fast path _, nn = self._kdtree.query(Vtgt, k=1) return nn.astype(np.int64) # Brute-force in blocks to limit memory N = Vtgt.shape[0] out = np.empty(N, dtype=np.int64) block = 20000 VsrcT = self._Vsrc.T # (3, H*W) for start in range(0, N, block): end = min(N, start + block) D = Vtgt[start:end] @ VsrcT # cosine similarities out[start:end] = np.argmax(D, axis=1) return out
__all__ = ["HealBili"]