| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
|
|
| from functools import partial |
| from typing import Optional, Tuple |
|
|
| import numpy as np |
| import torch |
|
|
| from .image_util import get_tv_resample_method, resize_max_res |
|
|
|
|
def inter_distances(tensors: torch.Tensor) -> torch.Tensor:
    """
    Compute the element-wise difference between every unordered pair of depth maps.

    Pairs are enumerated in the order produced by `torch.combinations`, i.e. `(0, 1), (0, 2), ..., (1, 2), ...`,
    and each entry of the result is `tensors[i] - tensors[j]` for one such pair `(i, j)` with `i < j`.

    Args:
        tensors (`torch.Tensor`):
            Stack of maps indexed along the first dimension.

    Returns:
        `torch.Tensor`: Tensor whose first dimension enumerates the pairs and whose remaining dimensions match
        `tensors.shape[1:]`. When fewer than two maps are given there are no pairs, and an empty tensor with a
        leading dimension of 0 is returned instead of raising.
    """
    count = tensors.shape[0]
    if count < 2:
        # No pairs exist; concatenating an empty list of differences would raise.
        return tensors.new_empty((0, *tensors.shape[1:]))

    # One row per pair: column 0 holds the first index, column 1 the second.
    pairs = torch.combinations(torch.arange(count, device=tensors.device))
    return tensors[pairs[:, 0]] - tensors[pairs[:, 1]]
|
|
|
|
def ensemble_depth(
    depth: torch.Tensor,
    scale_invariant: bool = True,
    shift_invariant: bool = True,
    output_uncertainty: bool = False,
    reduction: str = "median",
    regularizer_strength: float = 0.02,
    max_iter: int = 2,
    tol: float = 1e-3,
    max_res: Optional[int] = 1024,
) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
    """
    Ensembles depth maps represented by the `depth` tensor with expected shape `(B, 1, H, W)`, where B is the
    number of ensemble members for a given prediction of size `(H x W)`. Even though the function is designed for
    depth maps, it can also be used with disparity maps as long as the input tensor values are non-negative. The
    alignment happens when the predictions have one or more degrees of freedom, that is when they are either
    affine-invariant (`scale_invariant=True` and `shift_invariant=True`), or just scale-invariant (only
    `scale_invariant=True`). For absolute predictions (`scale_invariant=False` and `shift_invariant=False`)
    alignment is skipped and only ensembling is performed; the result is returned without normalization.

    Args:
        depth (`torch.Tensor`):
            Input ensemble depth maps.
        scale_invariant (`bool`, *optional*, defaults to `True`):
            Whether to treat predictions as scale-invariant.
        shift_invariant (`bool`, *optional*, defaults to `True`):
            Whether to treat predictions as shift-invariant.
        output_uncertainty (`bool`, *optional*, defaults to `False`):
            Whether to output uncertainty map.
        reduction (`str`, *optional*, defaults to `"median"`):
            Reduction method used to ensemble aligned predictions. The accepted values are: `"mean"` and
            `"median"`.
        regularizer_strength (`float`, *optional*, defaults to `0.02`):
            Strength of the regularizer that pulls the aligned predictions to the unit range from 0 to 1.
        max_iter (`int`, *optional*, defaults to `2`):
            Maximum number of the alignment solver steps. Refer to `scipy.optimize.minimize` function, `options`
            argument.
        tol (`float`, *optional*, defaults to `1e-3`):
            Alignment solver tolerance. The solver stops when the tolerance is reached.
        max_res (`int`, *optional*, defaults to `1024`):
            Maximum resolution at which the alignment parameters are estimated; larger inputs are downscaled for
            the solver only. `None` disables the downscaling, so the solver runs at the input resolution.

    Returns:
        A tensor of aligned and ensembled depth maps and optionally a tensor of uncertainties of the same shape:
        `(1, 1, H, W)`. The second element is `None` when `output_uncertainty` is `False`.

    Raises:
        ValueError: If `depth` is not a 4D tensor with a singleton second dimension, if `reduction` is not one of
            the accepted values, or if only `shift_invariant` is set (pure shift-invariance is not supported).
    """
    if depth.dim() != 4 or depth.shape[1] != 1:
        raise ValueError(f"Expecting 4D tensor of shape [B,1,H,W]; got {depth.shape}.")
    if reduction not in ("mean", "median"):
        raise ValueError(f"Unrecognized reduction method: {reduction}.")
    if not scale_invariant and shift_invariant:
        raise ValueError("Pure shift-invariant ensembling is not supported.")

    ensemble_size = depth.shape[0]
    requires_aligning = scale_invariant or shift_invariant

    def init_param(depth: torch.Tensor) -> np.ndarray:
        # Initial guess: map every member onto the unit range using its own extrema.
        flat = depth.reshape(ensemble_size, -1)
        init_min = flat.min(dim=1).values
        init_max = flat.max(dim=1).values

        if scale_invariant and shift_invariant:
            init_s = 1.0 / (init_max - init_min).clamp(min=1e-6)
            init_t = -init_s * init_min
            # Packed layout: first half scales, second half shifts (unpacked in `align`).
            param = torch.cat((init_s, init_t)).cpu().numpy()
        elif scale_invariant:
            init_s = 1.0 / init_max.clamp(min=1e-6)
            param = init_s.cpu().numpy()
        else:
            raise ValueError("Unrecognized alignment.")

        return param

    def align(depth: torch.Tensor, param: np.ndarray) -> torch.Tensor:
        # Apply the per-member scale (and shift) carried by the flat solver vector.
        if scale_invariant and shift_invariant:
            s, t = np.split(param, 2)
            s = torch.from_numpy(s).to(depth).view(ensemble_size, 1, 1, 1)
            t = torch.from_numpy(t).to(depth).view(ensemble_size, 1, 1, 1)
            out = depth * s + t
        elif scale_invariant:
            s = torch.from_numpy(param).to(depth).view(ensemble_size, 1, 1, 1)
            out = depth * s
        else:
            raise ValueError("Unrecognized alignment.")
        return out

    def ensemble(
        depth_aligned: torch.Tensor, return_uncertainty: bool = False
    ) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
        # Reduce over the ensemble dimension; the uncertainty is the standard deviation for "mean"
        # and the median absolute deviation from the prediction for "median".
        uncertainty = None
        if reduction == "mean":
            prediction = torch.mean(depth_aligned, dim=0, keepdim=True)
            if return_uncertainty:
                uncertainty = torch.std(depth_aligned, dim=0, keepdim=True)
        elif reduction == "median":
            prediction = torch.median(depth_aligned, dim=0, keepdim=True).values
            if return_uncertainty:
                uncertainty = torch.median(
                    torch.abs(depth_aligned - prediction), dim=0, keepdim=True
                ).values
        else:
            raise ValueError(f"Unrecognized reduction method: {reduction}.")
        return prediction, uncertainty

    def cost_fn(param: np.ndarray, depth: torch.Tensor, pairs) -> float:
        # Objective: sum of pairwise RMSE between aligned members, plus an optional penalty that
        # keeps the ensembled prediction's extrema close to 0 and 1.
        cost = 0.0
        depth_aligned = align(depth, param)

        for i, j in pairs:
            diff = depth_aligned[i] - depth_aligned[j]
            cost += (diff**2).mean().sqrt().item()

        if regularizer_strength > 0:
            prediction, _ = ensemble(depth_aligned, return_uncertainty=False)
            err_near = (0.0 - prediction.min()).abs().item()
            err_far = (1.0 - prediction.max()).abs().item()
            cost += (err_near + err_far) * regularizer_strength

        return cost

    def compute_param(depth: torch.Tensor) -> np.ndarray:
        # Import the submodule explicitly: a bare `import scipy` does not guarantee that
        # `scipy.optimize` is loaded on every SciPy version.
        import scipy.optimize

        depth_to_align = depth.to(torch.float32)
        if max_res is not None and max(depth_to_align.shape[2:]) > max_res:
            depth_to_align = resize_max_res(
                depth_to_align, max_res, get_tv_resample_method("nearest-exact")
            )

        param = init_param(depth_to_align)

        # The member pairs do not change between cost evaluations, so enumerate them once.
        pairs = torch.combinations(torch.arange(ensemble_size)).tolist()

        res = scipy.optimize.minimize(
            partial(cost_fn, depth=depth_to_align, pairs=pairs),
            param,
            method="BFGS",
            tol=tol,
            options={"maxiter": max_iter, "disp": False},
        )

        return res.x

    if requires_aligning:
        param = compute_param(depth)
        depth = align(depth, param)

    depth, uncertainty = ensemble(depth, return_uncertainty=output_uncertainty)

    if not requires_aligning:
        # Absolute predictions carry meaningful values: ensemble only, no rescaling.
        return depth, uncertainty

    # Pure shift-invariance was rejected above, so reaching this point implies `scale_invariant`.
    # Affine-invariant results are stretched to [0, 1]; scale-invariant ones keep zero anchored.
    depth_max = depth.max()
    depth_min = depth.min() if shift_invariant else 0
    depth_range = (depth_max - depth_min).clamp(min=1e-6)
    depth = (depth - depth_min) / depth_range
    if output_uncertainty:
        # Express the uncertainty in the same normalized units as the prediction.
        uncertainty = uncertainty / depth_range

    return depth, uncertainty
|
|