"""Ripplemapper contours module."""
import heapq
import cv2
import numpy as np
from scipy.spatial import cKDTree
from skimage import measure
from ripplemapper.image import detect_edges, process_edges
__all__ = ["find_contours", "compute_recursive_midpoints", "extend_contour", "combine_contours", "smooth_contour", "distance_map", "neighbors", "a_star", "get_next_node", "find_boundaries", "find_bump_limits", "smooth_bumps", "average_boundaries"]
[docs]
def find_contours(edges_cleaned: np.ndarray, level: float = 0.5) -> np.ndarray:
"""
Find contours in the edge image and approximate them to simplify.
Parameters
----------
edges_cleaned : np.ndarray
Processed edge image.
level : float, optional
Contour level parameter for the find_contours function, by default 0.5.
Returns
-------
np.ndarray
Approximated contour vertices.
"""
contours = measure.find_contours(edges_cleaned, level=level)
contours = sorted(contours, key=lambda x: len(x), reverse=True)
return contours
[docs]
def compute_recursive_midpoints(poly_a: np.ndarray, poly_b: np.ndarray, iterations: int) -> np.ndarray:
"""
Compute midpoints between two contours recursively, addressing the shape mismatch.
Parameters
----------
poly_a : np.ndarray
Vertices of the first contour.
poly_b : np.ndarray
Vertices of the second contour.
iterations : int
Number of iterations for recursion.
Returns
-------
np.ndarray
Midpoint vertices after the final iteration.
"""
if iterations == 0:
return poly_a, poly_b
tree_a = cKDTree(poly_a)
tree_b = cKDTree(poly_b)
midpoints_a = np.empty_like(poly_a)
midpoints_b = np.empty_like(poly_b)
for i, point in enumerate(poly_a):
dist, index = tree_b.query(point)
nearest_point = poly_b[index]
midpoints_a[i] = (point + nearest_point) / 2
for i, point in enumerate(poly_b):
dist, index = tree_a.query(point)
nearest_point = poly_a[index]
midpoints_b[i] = (point + nearest_point) / 2
return compute_recursive_midpoints(midpoints_a, midpoints_b, iterations - 1)
[docs]
def extend_contour(contour: np.ndarray, shape: tuple) -> np.ndarray:
"""
Extend the contour to the edges of the image region defined by shape.
Parameters
----------
contour : np.ndarray
Points marking the vertices of the contour.
shape : tuple
Length and width of the image.
Returns
-------
np.ndarray
Extended contour vertices.
"""
if contour[0][1] > shape[1] / 2:
new_first_point = [contour[0][1], shape[1]]
new_last_point = [contour[-1][1], 0]
else:
new_first_point = [contour[0][1], 0]
new_last_point = [contour[-1][1], shape[0]]
contour = np.vstack([new_first_point, contour, new_last_point])
return contour
[docs]
def combine_contours(contour1: np.ndarray, contour2: np.ndarray) -> np.ndarray:
"""
Combine two contours into one.
Parameters
----------
contour1 : np.ndarray
Points marking the vertices of the first contour.
contour2 : np.ndarray
Points marking the vertices of the second contour.
Returns
-------
np.ndarray
Combined contour vertices.
"""
if contour1[0][1] > contour1[0][-1]:
contour1 = np.flip(contour1)
if contour2[0][1] < contour2[0][-1]:
contour2 = np.flip(contour2)
contour = np.vstack([contour1, contour2])
return contour
[docs]
def smooth_contour(contour: np.ndarray, window: int = 3) -> np.ndarray:
"""
Smooth a contour by convolving with a small window.
Parameters
----------
contour : np.ndarray
Points marking the vertices of the contour.
window : int, optional
Size of the smoothing window, by default 3.
Returns
-------
np.ndarray
Smoothed contour vertices.
"""
x = np.convolve(contour[:, 0], np.ones(window) / window, mode='valid')
y = np.convolve(contour[:, 1], np.ones(window) / window, mode='valid')
return np.vstack([x, y]).T
[docs]
def distance_map(binary_map: np.ndarray) -> np.ndarray:
"""
Compute the distance map of a binary image.
Parameters
----------
binary_map : np.ndarray
Binary image with interiors marked as 1's and exteriors as 0's.
Returns
-------
np.ndarray
Normalized distance map.
"""
binary_map = binary_map.astype(np.uint8)
distance_map = cv2.distanceTransform(binary_map, cv2.DIST_L2, cv2.DIST_MASK_PRECISE) ** 2
norm_distance_map = cv2.normalize(distance_map, None, 0, 1.0, cv2.NORM_MINMAX)
return norm_distance_map
[docs]
def neighbors(node: tuple, grid_shape: tuple) -> tuple:
"""
Generate neighbors for a given node.
Parameters
----------
node : tuple
The current node coordinates.
grid_shape : tuple
Shape of the grid.
Yields
------
tuple
Neighboring node coordinates.
"""
directions = [(-1, -1), (-1, 0), (-1, 1), (0, -1), (0, 1), (1, -1), (1, 0), (1, 1)]
for dx, dy in directions:
nx, ny = node[0] + dx, node[1] + dy
if 0 <= nx < grid_shape[0] and 0 <= ny < grid_shape[1]:
yield (nx, ny)
[docs]
def a_star(start: tuple, goal: tuple, grid: np.ndarray) -> list:
"""
A simple A* algorithm for pathfinding.
Parameters
----------
start : tuple
Starting node coordinates.
goal : tuple
Goal node coordinates.
grid : np.ndarray
Grid representing the map.
Returns
-------
list
Path from start to goal.
"""
open_set = []
heapq.heappush(open_set, (0, start))
came_from = {}
g_score = {start: 0}
f_score = {start: np.linalg.norm(np.array(start) - np.array(goal))}
while open_set:
current = heapq.heappop(open_set)[1]
if current == goal:
path = []
while current in came_from:
path.append(current)
current = came_from[current]
return path[::-1]
for neighbor in neighbors(current, grid.shape):
tentative_g_score = g_score[current] + 1 / (grid[neighbor] + 0.01)
if neighbor not in g_score or tentative_g_score < g_score[neighbor]:
came_from[neighbor] = current
g_score[neighbor] = tentative_g_score
f_score[neighbor] = tentative_g_score + np.linalg.norm(np.array(neighbor) - np.array(goal))
heapq.heappush(open_set, (f_score[neighbor], neighbor))
return []
[docs]
def get_next_node(node: tuple, dx: int, dy: int) -> tuple:
"""
Get the next node coordinates by moving in the given direction.
Parameters
----------
node : tuple
Current node coordinates.
dx : int
Change in x-coordinate.
dy : int
Change in y-coordinate.
Returns
-------
tuple
Next node coordinates, or None if out of bounds.
"""
try:
nx, ny = node[0] + dx, node[1] + dy
except IndexError:
return None
return nx, ny
[docs]
def find_boundaries(gray_image: np.ndarray) -> tuple:
"""
Find the upper and lower boundaries of the edge region.
Parameters
----------
gray_image : np.ndarray
Grayscale image.
Returns
-------
tuple
Upper and lower boundaries as numpy arrays.
"""
edges_gray = detect_edges(gray_image)
edges_cleaned = process_edges(edges_gray)
contours = find_contours(edges_cleaned, level=0.5)
if np.mean(contours[0][:, 0]) > np.mean(contours[1][:, 0]):
upper, lower = contours[0], contours[1]
else:
upper, lower = contours[1], contours[0]
return upper, lower
[docs]
def find_bump_limits(large_changes: np.array, current: int = 0, max_size: int = 10, bumps: list[tuple[int, int]] = []) -> list[tuple[int, int]]:
"""
Recursive function to find the limits of "small" bumps in the data.
Parameters
----------
large_changes : np.array
Array of indices where large changes in the data occur.
current : int, optional
Current index to start from, by default 0.
max_size : int, optional
Maximum size of a bump, by default 10.
bumps : list[tuple[int, int]], optional
List of tuples representing the start and end indices of each bump, by default [].
Returns
-------
list[tuple[int, int]]
List of tuples representing the start and end indices of each bump.
"""
if not (large_changes > current).any():
return bumps
start = large_changes[(large_changes > current)][0]
end = False
for i in np.arange(1, max_size):
if start + i > large_changes[-1]:
return bumps
if start + i in large_changes:
end = start + i
if end:
bumps.append((start, end))
current = end or start + max_size
if current > large_changes[-1]:
return bumps
return find_bump_limits(large_changes, current=current, bumps=bumps, max_size=max_size)
[docs]
def smooth_bumps(contour, max_size: int = 40, std_factor: float = 2.0):
"""
Function to smooth out bumps in the contour data.
Parameters
----------
contour : RippleContour
Contour object containing the data to be smoothed.
max_size : int, optional
Maximum size of a bump, by default 40.
std_factor : float, optional
Standard deviation factor for identifying large changes, by default 2.0.
Returns
-------
RippleContour
The smoothed RippleContour object.
"""
moving_avg = np.convolve(contour.values[0, :], np.ones(100) / 100, mode='valid')
diffs = contour.values[0, :len(moving_avg)] - moving_avg
gradients = np.gradient(diffs)
large_changes = np.where(np.abs(gradients) > std_factor * np.std(gradients))[0]
if len(large_changes) == 0:
return contour
bumps = find_bump_limits(large_changes, max_size=max_size, bumps=[])
indices = []
for bump in bumps:
indices += list(np.arange(bump[0], bump[1]))
indices = np.array(indices)
print("num removed", indices.shape)
contour.values = np.delete(contour.values, indices[indices < contour.values.shape[1]], axis=1)
return contour
[docs]
def average_boundaries(self, contour_a = None, contour_b = None, iterations: int = 3, save_both: bool = True) -> np.ndarray:
"""
Average the two contours to get a more accurate representation of the interface.
Parameters
----------
contour_a : RippleContour, optional
The first contour to average, by default self.contours[0].
contour_b : RippleContour, optional
The second contour to average, by default self.contours[1].
iterations : int, optional
The number of iterations to average the contours over, by default 3.
save_both : bool, optional
Whether to save both averaged contours, by default True.
Returns
-------
np.ndarray
The averaged contour.
"""
from ripplemapper.classes import RippleContour
if not contour_a or not contour_b:
try:
contour_a = self.contours[0]
contour_b = self.contours[1]
except IndexError:
raise ValueError("No contours found in RippleImage.contours and no explicit contours passed.")
poly_a = contour_a.values
poly_b = contour_b.values
midpoints_a, midpoints_b = compute_recursive_midpoints(poly_a, poly_b, iterations)
self.contours.append(RippleContour(midpoints_a, "averaged", self))
if save_both:
self.contours.append(RippleContour(midpoints_b, "averaged", self))