Source code for fury.actor.curved

"""Curved primitives actors."""

from concurrent.futures import ThreadPoolExecutor
import itertools

import numpy as np

from fury.actor import Group, Line, Mesh, actor_from_primitive, create_mesh, read_buffer
from fury.colormap import normalize_colors
from fury.geometry import buffer_to_geometry, line_buffer_separator
from fury.lib import (
    Buffer,
    BufferUsage,
    gfx_wgpu,
    register_wgpu_render_function,
)
from fury.material import (
    StreamlinesMaterial,
    _StreamlineBakedMaterial,
    _StreamtubeBakedMaterial,
    _create_mesh_material,
    validate_opacity,
)
from fury.optpkg import optional_package
import fury.primitive as fp
from fury.shader import (
    StreamlinesShader,
    _StreamlineBakingShader,
    _StreamtubeBakingShader,
    _StreamtubeRenderShader,
)

numba, have_numba, _ = optional_package("numba")

if have_numba:
    njit = numba.njit
else:

    def njit(*args, **kwargs):
        def decorator(func):
            return func

        return decorator


[docs] def sphere( centers, *, colors=(1, 0, 0), radii=1.0, phi=16, theta=16, opacity=None, material="phong", enable_picking=True, smooth=True, wireframe=False, wireframe_thickness=1.0, impostor=True, ): """ Create one or many spheres with different colors and radii. Parameters ---------- centers : ndarray, shape (N, 3) Spheres positions. colors : ndarray, shape (N, 3) or (N, 4) or tuple (3,) or tuple (4,), optional RGB or RGBA colors. Accepts values in [0, 255] (int), [0, 1] (float), or hex strings (e.g. "#FF0000"). radii : float or ndarray, shape (N,), optional Sphere radius. Can be a single value for all spheres or an array of radii for each sphere. phi : int, optional The number of segments in the longitude direction. theta : int, optional The number of segments in the latitude direction. opacity : float, optional Takes values from 0 (fully transparent) to 1 (opaque). If both `opacity` and RGBA are provided, the final alpha will be: final_alpha = alpha_in_RGBA * opacity. material : str, optional The material type for the spheres. Options are 'phong' and 'basic'. enable_picking : bool, optional Whether the spheres should be pickable in a 3D scene. smooth : bool, optional Whether to create a smooth sphere or a faceted sphere. wireframe : bool, optional Whether to render the mesh as a wireframe. wireframe_thickness : float, optional The thickness of the wireframe lines. impostor : bool, optional Render spheres as billboard impostors instead of geometry when ``True``. Returns ------- Actor A mesh actor containing the generated spheres, with the specified material and properties. Examples -------- >>> from fury import window, actor >>> import numpy as np >>> scene = window.Scene() >>> centers = np.random.rand(5, 3) * 10 >>> colors = np.random.rand(5, 3) >>> radii = np.random.rand(5) >>> sphere_actor = actor.sphere(centers=centers, colors=colors, radii=radii) >>> _ = scene.add(sphere_actor) >>> show_manager = window.ShowManager(scene=scene, size=(600, 600)) >>> show_manager.start() """ centers_arr = np.asarray(centers, dtype=np.float32) if centers_arr.ndim == 1: centers_arr = centers_arr.reshape(1, 3) count = len(centers_arr) colors = normalize_colors(colors) radii_arr = np.asarray(radii, dtype=np.float32) if radii_arr.ndim == 0: radii_arr = np.full((count,), float(radii_arr), dtype=np.float32) else: radii_arr = radii_arr.reshape(-1).astype(np.float32) if radii_arr.size == 1 and count > 1: radii_arr = np.full((count,), radii_arr.item(), dtype=np.float32) elif radii_arr.size != count: radii_arr = np.full((count,), radii_arr.flat[0], dtype=np.float32) if impostor: from fury.actor._billboard import billboard_sphere obj = billboard_sphere( centers_arr, colors=colors, radii=radii_arr, opacity=opacity, enable_picking=enable_picking, ) obj.billboard_radii = radii_arr.copy() return obj scales = radii_arr directions = (1, 0, 0) vertices, faces = fp.prim_sphere(phi=phi, theta=theta) obj = actor_from_primitive( vertices, faces, centers=centers_arr, colors=colors, scales=scales, directions=directions, opacity=opacity, material=material, smooth=smooth, enable_picking=enable_picking, wireframe=wireframe, wireframe_thickness=wireframe_thickness, ) return obj
[docs] def ellipsoid( centers, *, orientation_matrices=None, lengths=(4, 2, 2), colors=(1, 0, 0), opacity=None, phi=16, theta=16, material="phong", enable_picking=True, smooth=True, wireframe=False, wireframe_thickness=1.0, ): """ Create ellipsoid actor(s) with specified orientation and scaling. Parameters ---------- centers : ndarray (N, 3) Centers of the ellipsoids. orientation_matrices : ndarray, shape (N, 3, 3) or (3, 3), optional Orthonormal rotation matrices defining the orientation of each ellipsoid. Each 3×3 matrix represents a local coordinate frame, with columns corresponding to the ellipsoid’s x-, y-, and z-axes in world coordinates. Must be right-handed and orthonormal. If a single (3, 3) matrix is provided, it is broadcast to all ellipsoids. lengths : ndarray (N, 3) or (3,) or tuple (3,), optional Scaling factors along each axis. colors : array-like or tuple, optional RGB or RGBA colors. Accepts values in [0, 255] (int), [0, 1] (float), or hex strings (e.g. "#FF0000"). Values above 1.0 are treated as [0, 255] and normalized internally. opacity : float, optional Opacity of the ellipsoids. Takes values from 0 (fully transparent) to 1 (opaque). If both `opacity` and RGBA are provided, the final alpha will be: final_alpha = alpha_in_RGBA * opacity. phi : int, optional The number of segments in the longitude direction. theta : int, optional The number of segments in the latitude direction. material : str, optional The material type for the ellipsoids. Options are 'phong' and 'basic'. enable_picking : bool, optional Allow picking of the ellipsoids in a 3D scene. smooth : bool, optional Whether to create a smooth ellipsoid or a faceted ellipsoid. wireframe : bool, optional Whether to render the mesh as a wireframe. wireframe_thickness : float, optional The thickness of the wireframe lines. Returns ------- Actor A mesh actor containing the generated ellipsoids. Examples -------- >>> from fury import window, actor >>> import numpy as np >>> from fury import actor, window >>> centers = np.array([[0, 0, 0]]) >>> lengths = np.array([[2, 1, 1]]) >>> colors = np.array([[1, 0, 0]]) >>> ellipsoid = actor.ellipsoid(centers=centers, lengths=lengths, colors=colors) >>> window.show([ellipsoid]) """ centers = np.asarray(centers) if orientation_matrices is None: orientation_matrices = np.tile(np.eye(3), (centers.shape[0], 1, 1)) orientation_matrices = np.asarray(orientation_matrices) lengths = np.asarray(lengths) colors = normalize_colors(colors) if centers.ndim == 1: centers = centers.reshape(1, 3) if centers.ndim != 2 or centers.shape[1] != 3: raise ValueError("Centers must be (N, 3) array") if orientation_matrices.ndim == 2: orientation_matrices = np.tile(orientation_matrices, (centers.shape[0], 1, 1)) if orientation_matrices.ndim != 3 or orientation_matrices.shape[1:] != (3, 3): raise ValueError("Axes must be (N, 3, 3) array") if lengths.ndim == 1: lengths = lengths.reshape(1, 3) if lengths.ndim != 2 or lengths.shape[1] != 3: raise ValueError("Lengths must be (N, 3) array") if lengths.size == 3: lengths = np.tile(lengths.reshape(1, -1), (centers.shape[0], 1)) if lengths.shape != centers.shape: raise ValueError("Lengths must match centers shape") if colors.size == 3 or colors.size == 4: colors = np.tile(colors.reshape(1, -1), (centers.shape[0], 1)) base_verts, base_faces = fp.prim_sphere(phi=phi, theta=theta) base_verts = np.asarray(base_verts) base_faces = np.asarray(base_faces) if base_verts.ndim != 2 or base_verts.shape[1] != 3: raise ValueError(f"base_verts has unexpected shape {base_verts.shape}") if isinstance(colors, (list, tuple)): colors = np.asarray(colors) if colors.ndim == 1: colors = np.tile(colors, (centers.shape[0], 1)) n_ellipsoids = centers.shape[0] n_verts = base_verts.shape[0] scaled_transforms = orientation_matrices * lengths[:, np.newaxis, :] transformed = ( np.einsum("nij,mj->nmi", scaled_transforms, base_verts) + centers[:, np.newaxis, :] ) all_vertices = transformed.reshape(-1, 3) all_faces = np.tile(base_faces, (n_ellipsoids, 1)) + ( np.arange(n_ellipsoids)[:, None, None] * n_verts ) all_faces = all_faces.reshape(-1, 3) all_colors = np.repeat(colors, n_verts, axis=0) return actor_from_primitive( centers=centers, vertices=all_vertices, faces=all_faces, colors=all_colors, opacity=opacity, material=material, smooth=smooth, enable_picking=enable_picking, repeat_primitive=False, wireframe=wireframe, wireframe_thickness=wireframe_thickness, )
[docs] def cylinder( centers, *, colors=(1, 1, 1), height=1, sectors=36, radii=0.5, scales=(1, 1, 1), directions=(0, 1, 0), capped=True, opacity=None, material="phong", enable_picking=True, wireframe=False, wireframe_thickness=1.0, ): """ Create one or many cylinders with different features. Parameters ---------- centers : ndarray, shape (N, 3) Cylinder positions. colors : ndarray, shape (N, 3) or (N, 4) or tuple (3,) or tuple (4,), optional RGB or RGBA colors. Accepts values in [0, 255] (int), [0, 1] (float), or hex strings (e.g. "#FF0000"). Values above 1.0 are treated as [0, 255] and normalized internally. height : float or ndarray, shape (N,), optional The height of the cylinder. A single value applies to all cylinders, while an array specifies a height for each cylinder individually. sectors : int, optional The number of divisions around the cylinder's circumference. Higher values produce smoother cylinders. radii : float or ndarray, shape (N,) or tuple, optional The radius of the base of the cylinders. A single value applies to all cylinders, while an array specifies a radius for each cylinder individually. scales : ndarray, shape (N, 3) or tuple (3,) or float, optional The size of the cylinder in each dimension. If a single value is provided, the same size will be used for all cylinders. directions : ndarray, shape (N, 3) or tuple (3,), optional The orientation vector of the cylinder. capped : bool, optional Whether to add caps (circular ends) to the cylinders. opacity : float, optional Takes values from 0 (fully transparent) to 1 (opaque). If both `opacity` and RGBA are provided, the final alpha will be: final_alpha = alpha_in_RGBA * opacity. material : str, optional The material type for the cylinders. Options are 'phong' and 'basic'. enable_picking : bool, optional Whether the cylinders should be pickable in a 3D scene. wireframe : bool, optional Whether to render the mesh as a wireframe. wireframe_thickness : float, optional The thickness of the wireframe lines. Returns ------- Actor A mesh actor containing the generated cylinders, with the specified material and properties. Examples -------- >>> from fury import window, actor >>> import numpy as np >>> scene = window.Scene() >>> centers = np.random.rand(5, 3) * 10 >>> colors = np.random.rand(5, 3) >>> cylinder_actor = actor.cylinder(centers=centers, colors=colors) >>> _ = scene.add(cylinder_actor) >>> show_manager = window.ShowManager(scene=scene, size=(600, 600)) >>> show_manager.start() """ n_centers = len(centers) radii_arr = fp._normalize_geom_param(radii, n_centers, "radii") height_arr = fp._normalize_geom_param(height, n_centers, "height") all_uniform = np.all(radii_arr == radii_arr[0]) and np.all( height_arr == height_arr[0] ) if all_uniform: vertices, faces = fp.prim_cylinder( radius=radii_arr[0], height=height_arr[0], sectors=sectors, capped=capped ) return actor_from_primitive( vertices, faces, centers=centers, colors=colors, scales=scales, directions=directions, opacity=opacity, material=material, enable_picking=enable_picking, wireframe=wireframe, wireframe_thickness=wireframe_thickness, ) _, faces = fp.prim_cylinder( radius=radii_arr[0], height=height_arr[0], sectors=sectors, capped=capped ) all_verts = [ fp.prim_cylinder( radius=radii_arr[i], height=height_arr[i], sectors=sectors, capped=capped )[0] for i in range(n_centers) ] vertices = np.concatenate(all_verts) return actor_from_primitive( vertices, faces, centers=centers, colors=colors, scales=scales, directions=directions, opacity=opacity, material=material, enable_picking=enable_picking, wireframe=wireframe, wireframe_thickness=wireframe_thickness, have_tiled_verts=True, )
[docs] def cone( centers, *, colors=(1, 1, 1), height=1, sectors=10, radii=0.5, scales=(1, 1, 1), directions=(0, 1, 0), opacity=None, material="phong", enable_picking=True, wireframe=False, wireframe_thickness=1.0, ): """ Create one or many cones with different features. Parameters ---------- centers : ndarray, shape (N, 3) Cone positions. colors : ndarray, shape (N, 3) or (N, 4) or tuple (3,) or tuple (4,), optional RGB or RGBA colors. Accepts values in [0, 255] (int), [0, 1] (float), or hex strings (e.g. "#FF0000"). Values above 1.0 are treated as [0, 255] and normalized internally. height : float or ndarray, shape (N,), optional The height of the cone. A single value applies to all cones, while an array specifies a height for each cone individually. sectors : int, optional The number of divisions around the cone's circumference. Higher values produce smoother cones. radii : float or ndarray, shape (N,) or tuple, optional The radius of the base of the cones. A single value applies to all cones, while an array specifies a radius for each cone individually. scales : ndarray, shape (N, 3) or tuple (3,) or float, optional The size of the cone in each dimension. If a single value is provided, the same size will be used for all cones. directions : ndarray, shape (N, 3) or tuple (3,), optional The orientation vector of the cone. opacity : float, optional Takes values from 0 (fully transparent) to 1 (opaque). If both `opacity` and RGBA are provided, the final alpha will be: final_alpha = alpha_in_RGBA * opacity. material : str, optional The material type for the cones. Options are 'phong' and 'basic'. enable_picking : bool, optional Whether the cones should be pickable in a 3D scene. wireframe : bool, optional Whether to render the mesh as a wireframe. wireframe_thickness : float, optional The thickness of the wireframe lines. Returns ------- Actor A mesh actor containing the generated cones, with the specified material and properties. Examples -------- >>> from fury import window, actor >>> import numpy as np >>> scene = window.Scene() >>> centers = np.random.rand(5, 3) * 10 >>> colors = np.random.rand(5, 3) >>> cone_actor = actor.cone(centers=centers, colors=colors) >>> _ = scene.add(cone_actor) >>> show_manager = window.ShowManager(scene=scene, size=(600, 600)) >>> show_manager.start() """ n_centers = len(centers) radii_arr = fp._normalize_geom_param(radii, n_centers, "radii") height_arr = fp._normalize_geom_param(height, n_centers, "height") all_uniform = np.all(radii_arr == radii_arr[0]) and np.all( height_arr == height_arr[0] ) if all_uniform: vertices, faces = fp.prim_cone( radius=radii_arr[0], height=height_arr[0], sectors=sectors ) return actor_from_primitive( vertices, faces, centers=centers, colors=colors, scales=scales, directions=directions, opacity=opacity, material=material, enable_picking=enable_picking, wireframe=wireframe, wireframe_thickness=wireframe_thickness, ) _, faces = fp.prim_cone(radius=radii_arr[0], height=height_arr[0], sectors=sectors) all_verts = [ fp.prim_cone(radius=radii_arr[i], height=height_arr[i], sectors=sectors)[0] for i in range(n_centers) ] vertices = np.concatenate(all_verts) return actor_from_primitive( vertices, faces, centers=centers, colors=colors, scales=scales, directions=directions, opacity=opacity, material=material, enable_picking=enable_picking, wireframe=wireframe, wireframe_thickness=wireframe_thickness, have_tiled_verts=True, )
class Streamlines(Line): """ Create a streamline representation. Parameters ---------- lines : ndarray, shape (N, 3) The positions of the points along the streamline. colors : ndarray, shape (N, 3) or (N, 4), optional RGB or RGBA (for opacity) R, G, B and A should be at the range [0, 1]. thickness : float, optional The thickness of the streamline. opacity : float, optional The opacity of the streamline. outline_thickness : float, optional The thickness of the outline. outline_color : tuple, optional The color of the outline. roi_mask : ndarray, optional 3D array where values > 0 define the ROI voxels. This enables shader-side ROI testing using the volumetric mask, which is assumed to be centered at the origin with unit voxel spacing. roi_origin : array-like of shape (3,), optional World-space coordinates of the ROI voxel (0, 0, 0). When not provided, defaults to (0, 0, 0). enable_picking : bool, optional Whether the streamline should be pickable in a 3D scene. line_lengths : ndarray, optional Optional precomputed point counts for each line (used for ROI baking). When omitted, lengths are inferred from NaN separators. line_offsets : ndarray, optional Optional precomputed offsets into the flattened buffer for each line (used for ROI baking). When omitted, offsets are inferred from NaN separators. """ @staticmethod def _compute_line_metadata(positions, *, line_lengths=None, line_offsets=None): """ Infer per-line lengths and offsets from a flattened buffer with NaNs. Parameters ---------- positions : ndarray, shape (N, 3) The flattened array of line positions, with NaNs separating lines. line_lengths : ndarray, optional Optional precomputed point counts for each line. line_offsets : ndarray, optional Optional precomputed offsets into the flattened buffer for each line. Returns ------- tuple of ndarray A tuple (line_lengths, line_offsets). """ if line_lengths is not None and line_offsets is not None: return ( np.asarray(line_lengths, dtype=np.uint32).reshape(-1), np.asarray(line_offsets, dtype=np.uint32).reshape(-1), ) positions = np.asarray(positions) if positions.ndim != 2 or positions.shape[1] != 3: raise ValueError("lines must be a 2D array of shape (N, 3)") nan_mask = np.any(~np.isfinite(positions), axis=1) split_indices = list(np.where(nan_mask)[0]) + [positions.shape[0]] lengths = [] offsets = [] start = 0 for idx in split_indices: if idx > start: offsets.append(start) lengths.append(idx - start) start = idx + 1 return np.asarray(lengths, dtype=np.uint32), np.asarray( offsets, dtype=np.uint32 ) @staticmethod def _validate_roi_mask(mask): """ Validate and extract the mask. Parameters ---------- mask : ndarray 3D array where values > 0 define the ROI voxels. Returns ------- ndarray The validated ROI mask as a binary array. Raises ------ ValueError If the mask is not a 3D array. """ mask = np.asarray(mask) if mask.ndim != 3: raise ValueError("roi_mask must be a 3D array.") return (mask > 0).astype(np.uint8) def __init__( self, lines, *, colors=None, thickness=2.0, opacity=1.0, outline_thickness=1.0, outline_color=(1, 0, 0), roi_mask=None, roi_origin=None, enable_picking=True, line_lengths=None, line_offsets=None, ): """ Create a streamline representation. Parameters ---------- lines : ndarray, shape (N, 3) The positions of the points along the streamline. colors : ndarray, shape (N, 3) or (N, 4), optional RGB or RGBA (for opacity) R, G, B and A should be at the range [0, 1]. thickness : float, optional The thickness of the streamline. opacity : float, optional The opacity of the streamline. outline_thickness : float, optional The thickness of the outline. outline_color : tuple, optional The color of the outline. roi_mask : ndarray, optional 3D array where values > 0 define the ROI voxels. This enables shader-side ROI testing using the volumetric mask, which is assumed to be centered at the origin with unit voxel spacing. roi_origin : array-like of shape (3,), optional World-space coordinates of the ROI voxel (0, 0, 0). When not provided, defaults to (0, 0, 0). enable_picking : bool, optional Whether the streamline should be pickable in a 3D scene. line_lengths : ndarray, optional Optional precomputed point counts for each line (used for ROI baking). When omitted, lengths are inferred from NaN separators. line_offsets : ndarray, optional Optional precomputed offsets into the flattened buffer for each line (used for ROI baking). When omitted, offsets are inferred from NaN separators. """ super().__init__() if not isinstance(thickness, (int, float)) or thickness <= 0: raise ValueError("thickness must be a positive number") opacity = validate_opacity(opacity) if not isinstance(outline_thickness, (int, float)) or outline_thickness < 0: raise ValueError("outline_thickness must be a non-negative number") outline_color = np.asarray(outline_color, dtype=np.float32) if outline_color.size not in (3, 4): raise ValueError( "outline_color must be a tuple/array of 3 (RGB) or 4 (RGBA) values" ) if not np.all((outline_color >= 0) & (outline_color <= 1)): raise ValueError("outline_color values must be between 0 and 1") if not isinstance(enable_picking, bool): raise TypeError("enable_picking must be a boolean") positions_arr = np.asarray(lines, dtype=np.float32) if colors is None: colors_arr = np.ones((positions_arr.shape[0], 4), dtype=np.float32) else: colors_arr = np.asarray(colors, dtype=np.float32) self._roi_origin = self._resolve_roi_origin(roi_origin) self._input_positions_array = positions_arr self._input_colors_array = colors_arr self._line_lengths, self._line_offsets = self._compute_line_metadata( positions_arr, line_lengths=line_lengths, line_offsets=line_offsets ) self.n_lines = int(self._line_lengths.size) self._out_capacity = int(positions_arr.shape[0]) self._color_channels = int(colors_arr.shape[1]) if colors_arr.ndim == 2 else 3 if roi_mask is None: positions_out = positions_arr colors_out = colors_arr else: positions_out = np.full_like(positions_arr, np.nan, dtype=np.float32) colors_out = np.full_like(colors_arr, np.nan, dtype=np.float32) self.geometry = buffer_to_geometry( positions=positions_out.astype("float32"), colors=colors_out.astype("float32"), ) self.geometry.positions._wgpu_usage |= BufferUsage.COPY_SRC material_kwargs = { "outline_thickness": outline_thickness, "outline_color": outline_color, "pick_write": enable_picking, "opacity": opacity, "thickness": thickness, "color_mode": "vertex", } if roi_mask is None: self.material = StreamlinesMaterial(**material_kwargs) self._needs_gpu_update = False else: self.material = _StreamlineBakedMaterial(**material_kwargs) self._needs_gpu_update = True self._line_lengths_buffer = Buffer(self._line_lengths.astype(np.uint32)) self._line_offsets_buffer = Buffer(self._line_offsets.astype(np.uint32)) self._line_positions_in = Buffer(positions_arr.astype(np.float32).ravel()) self._line_colors_in = Buffer(colors_arr.astype(np.float32).ravel()) self._roi_mask_buffer = None self._roi_auto_detach = True self.roi_mask = roi_mask @property def roi_mask(self): """ Get the ROI mask. Returns ------- ndarray or None The ROI mask as a 3D array, or None if no mask is set. """ if self._roi_mask_buffer is not None: return self._roi_mask_buffer.data return None @roi_mask.setter def roi_mask(self, mask): """ Set the ROI mask. Parameters ---------- mask : ndarray 3D array where values > 0 define the ROI voxels. """ if mask is None: self._roi_mask_buffer = None self.material.roi_dim = (0, 0, 0) self.material.roi_enabled = False self._roi_origin = self._resolve_roi_origin(None) self._needs_gpu_update = False self._restore_full_geometry() if isinstance(self.material, _StreamlineBakedMaterial): self.material = StreamlinesMaterial( outline_thickness=self.material.outline_thickness, outline_color=self.material.outline_color, pick_write=self.material.pick_write, opacity=self.material.opacity, thickness=self.material.thickness, color_mode=self.material.color_mode, ) return mask_arr = self._validate_roi_mask(mask) self._roi_mask_buffer = Buffer(mask_arr.astype(np.uint32).ravel()) self.material.roi_dim = mask_arr.shape self.material.roi_enabled = True self._reset_output_buffers() self._ensure_baked_material() self._needs_gpu_update = True @property def roi_origin(self): """ Get the ROI origin. Returns ------- ndarray or None The ROI origin as a (3,) array, or None if no origin is set. """ return self._roi_origin @roi_origin.setter def roi_origin(self, origin): """ Set the ROI origin (world-space position of voxel (0, 0, 0)). Parameters ---------- origin : array-like of shape (3,) or None The ROI origin coordinates. If None, defaults to (0, 0, 0). """ self._roi_origin = self._resolve_roi_origin(origin) if self._roi_mask_buffer is not None: self._ensure_baked_material() self._needs_gpu_update = True def _reset_output_buffers(self): """Clear output buffers before compute baking.""" if self._out_capacity == 0: return nan_positions = np.full_like( self._input_positions_array, np.nan, dtype=np.float32 ) nan_colors = np.full_like(self._input_colors_array, np.nan, dtype=np.float32) if hasattr(self.geometry.positions, "data"): pos_data = self.geometry.positions.data self.geometry.positions.data[...] = nan_positions.reshape(pos_data.shape) else: self.geometry.positions = Buffer(nan_positions.ravel()) if hasattr(self.geometry.colors, "data"): col_data = self.geometry.colors.data self.geometry.colors.data[...] = nan_colors.reshape(col_data.shape) else: self.geometry.colors = Buffer(nan_colors.ravel()) def _restore_full_geometry(self): """Restore full, unfiltered streamline buffers.""" if hasattr(self.geometry.positions, "data"): pos_data = self.geometry.positions.data self.geometry.positions.data[...] = self._input_positions_array.reshape( pos_data.shape ) else: self.geometry.positions = Buffer(self._input_positions_array.ravel()) if hasattr(self.geometry.colors, "data"): col_data = self.geometry.colors.data self.geometry.colors.data[...] = self._input_colors_array.reshape( col_data.shape ) else: self.geometry.colors = Buffer(self._input_colors_array.ravel()) def _ensure_baked_material(self): """Ensure baked material is active when ROI filtering is requested.""" if isinstance(self.material, _StreamlineBakedMaterial): return self.material = _StreamlineBakedMaterial( outline_thickness=self.material.outline_thickness, outline_color=self.material.outline_color, pick_write=self.material.pick_write, opacity=self.material.opacity, thickness=self.material.thickness, color_mode=self.material.color_mode, ) def _resolve_roi_origin(self, origin): """ Validate and resolve ROI origin from user input. Parameters ---------- origin : array-like of shape (3,) or None The ROI origin coordinates. If None, defaults to (0, 0, 0). Returns ------- ndarray The resolved ROI origin as a (3,) array. """ if origin is None: return np.zeros((3,), dtype=np.float32) origin = np.asarray(origin, dtype=np.float32).reshape(-1) if origin.size != 3: raise ValueError("roi_origin must have exactly three values.") return origin def filtered_streamlines(self): """ Get the currently filtered line ids after ROI baking. Returns ------- tuple of lists A tuple (kept_ids, filter_ids) where kept_ids is a list of line ids that passed the ROI filter and filter_ids is a list of line ids that were filtered out. """ read_buffer(self.geometry.positions) kept_ids = [] filter_ids = [] for line_id, (offset, length) in enumerate( zip(self._line_offsets, self._line_lengths, strict=False) ): segment = self.geometry.positions.data[offset : offset + length] if np.isfinite(segment).all(): kept_ids.append(line_id) else: filter_ids.append(line_id) return kept_ids, filter_ids
[docs] def streamlines( lines, *, colors=(1, 0, 0), thickness=2.0, opacity=1.0, outline_thickness=1.0, outline_color=(0, 0, 0), roi_mask=None, roi_origin=None, enable_picking=True, ): """ Create a streamline representation. Parameters ---------- lines : list of ndarray of shape (P, 3) or ndarray of shape (N, P, 3) Lines points. colors : ndarray, shape (N, 3) or (N, 4) or tuple (3,) or tuple (4,), optional RGB or RGBA (for opacity) R, G, B and A should be at the range [0, 1]. thickness : float, optional The thickness of the streamline. opacity : float, optional The opacity of the streamline. outline_thickness : float, optional The thickness of the outline. outline_color : tuple, optional The color of the outline. roi_mask : ndarray, optional 3D array where values > 0 define the ROI voxels. When provided, streamlines are filtered in a compute baking pass using this volumetric mask, which is assumed to be centered at the origin with unit voxel spacing. roi_origin : array-like of shape (3,), optional Origin of the ROI voxel (0, 0, 0) in world-space coordinates. When not provided, defaults to (0, 0, 0). enable_picking : bool, optional Whether the streamline should be pickable in a 3D scene. Returns ------- Streamline The created streamline object. """ lines_list = ( [np.asarray(seg) for seg in lines] if not isinstance(lines, np.ndarray) or lines.ndim != 3 else [np.asarray(seg) for seg in lines] ) lengths = np.asarray([len(seg) for seg in lines_list], dtype=np.uint32) offsets = np.zeros_like(lengths) running = 0 for i, ln in enumerate(lengths): offsets[i] = running running += int(ln) if i < len(lengths) - 1: running += 1 # separator slot lines_positions, lines_colors = line_buffer_separator(lines_list, color=colors) return Streamlines( lines_positions, colors=lines_colors, thickness=thickness, opacity=opacity, outline_thickness=outline_thickness, outline_color=outline_color, roi_mask=roi_mask, roi_origin=roi_origin, enable_picking=enable_picking, line_lengths=lengths, line_offsets=offsets, )
@register_wgpu_render_function(Streamlines, StreamlinesMaterial) def _register_render_streamline(wobject): """ Register the streamline render function. Parameters ---------- wobject : Streamline The streamline object to register. Returns ------- StreamlineShader The created streamline shader. """ return StreamlinesShader(wobject) @register_wgpu_render_function(Streamlines, _StreamlineBakedMaterial) def _register_streamline_baking_shaders(wobject): """ Register the streamline baking shaders. Parameters ---------- wobject : Streamline The streamline object to register. Returns ------- _StreamlineBakingShader, StreamlinesShader The created streamline shader pipeline. """ compute_shader = _StreamlineBakingShader(wobject) render_shader = StreamlinesShader(wobject) return compute_shader, render_shader @njit(cache=True) def compute_tangents(points): """ Calculate normalized tangent vectors for a series of points. Uses central differences for interior points and forward/backward for endpoints. Parameters ---------- points : ndarray, shape (N, 3) Input points for which to compute tangents. Returns ------- ndarray, shape (N, 3) Normalized tangent vectors for the input points. """ N = points.shape[0] tangents = np.zeros_like(points) if N < 2: return tangents for i in range(1, N - 1): tangents[i] = points[i + 1] - points[i - 1] tangents[0] = points[1] - points[0] tangents[-1] = points[-1] - points[-2] for i in range(N): norm = np.sqrt(np.sum(tangents[i] ** 2)) if norm > 1e-6: tangents[i] /= norm return tangents @njit(cache=True) def parallel_transport_frames(tangents): """ Generate a continuous coordinate frame along a curve defined by tangents. This is a continuous, non-twisting coordinate frame (normal, binormal) along a curve defined by tangents using parallel transport. Parameters ---------- tangents : ndarray, shape (N, 3) Tangent vectors along the curve. Returns ------- normals : ndarray, shape (N, 3) Normal vectors for the curve. binormals : ndarray, shape (N, 3) Binormal vectors for the curve. """ N = tangents.shape[0] normals = np.zeros_like(tangents) binormals = np.zeros_like(tangents) if N == 0: return normals, binormals t0 = tangents[0] ref1 = np.array([0.0, 0.0, 1.0], dtype=tangents.dtype) ref2 = np.array([1.0, 0.0, 0.0], dtype=tangents.dtype) ref = ref1 if np.abs(np.dot(t0, ref1)) < 0.99 else ref2 b0 = np.cross(t0, ref) b0_norm = np.linalg.norm(b0) if b0_norm > 1e-6: b0 /= b0_norm n0 = np.cross(b0, t0) n0_norm = np.linalg.norm(n0) if n0_norm > 1e-6: n0 /= n0_norm normals[0] = n0 binormals[0] = b0 for i in range(1, N): prev_t = tangents[i - 1] curr_t = tangents[i] axis = np.cross(prev_t, curr_t) sin_angle = np.linalg.norm(axis) cos_angle = np.dot(prev_t, curr_t) if sin_angle > 1e-6: axis /= sin_angle prev_n = normals[i - 1] normals[i] = ( prev_n * cos_angle + np.cross(axis, prev_n) * sin_angle + axis * np.dot(axis, prev_n) * (1 - cos_angle) ) prev_b = binormals[i - 1] binormals[i] = ( prev_b * cos_angle + np.cross(axis, prev_b) * sin_angle + axis * np.dot(axis, prev_b) * (1 - cos_angle) ) else: normals[i] = normals[i - 1] binormals[i] = binormals[i - 1] return normals, binormals @njit(cache=True) def generate_tube_geometry(points, number_of_sides, radius, end_caps): """ Generate vertices and triangles for a single tube. This function is Core Numba-optimized function to generate vertices and triangles for a single tube. Parameters ---------- points : ndarray, shape (N, 3) The points defining the centerline of the tube. number_of_sides : int The number of sides for the tube's cross-section. radius : float The radius of the tube. end_caps : bool Whether to include end caps on the tube. Returns ------- vertices : ndarray, shape (V, 3) The vertices of the tube. indices : ndarray, shape (T, 3) The triangle indices for the tube. """ N = points.shape[0] if N < 2: return np.zeros((0, 3), dtype=np.float32), np.zeros((0, 3), dtype=np.int32) tangents = compute_tangents(points) normals, binormals = parallel_transport_frames(tangents) cap_v_count = 2 if end_caps else 0 total_vertices = N * number_of_sides + cap_v_count vertices = np.empty((total_vertices, 3), dtype=np.float32) num_tube_tris = (N - 1) * number_of_sides * 2 cap_tri_count = number_of_sides * 2 if end_caps else 0 indices = np.empty((num_tube_tris + cap_tri_count, 3), dtype=np.int32) step = (2 * np.pi) / number_of_sides angles = np.arange(number_of_sides) * step for i in range(N): for j in range(number_of_sides): offset = normals[i] * np.cos(angles[j]) + binormals[i] * np.sin(angles[j]) vertices[i * number_of_sides + j] = points[i] + radius * offset idx = 0 for i in range(N - 1): for j in range(number_of_sides): v1 = i * number_of_sides + j v2 = i * number_of_sides + (j + 1) % number_of_sides v3 = (i + 1) * number_of_sides + j v4 = (i + 1) * number_of_sides + (j + 1) % number_of_sides indices[idx] = [v1, v2, v4] indices[idx + 1] = [v1, v4, v3] idx += 2 if end_caps: start_cap_v_idx = N * number_of_sides end_cap_v_idx = start_cap_v_idx + 1 vertices[start_cap_v_idx] = points[0] vertices[end_cap_v_idx] = points[-1] for i in range(number_of_sides): indices[idx] = [(i + 1) % number_of_sides, i, start_cap_v_idx] idx += 1 v_start_of_end_ring = (N - 1) * number_of_sides indices[idx] = [ v_start_of_end_ring + i, v_start_of_end_ring + (i + 1) % number_of_sides, end_cap_v_idx, ] idx += 1 return vertices, indices def _estimate_streamtube_buffer_size( line_lengths, segments, end_caps, color_components ): """ Estimate total buffer usage for streamtube geometry. Parameters ---------- line_lengths : ndarray, shape (N,) Number of points in each line. segments : int Number of radial segments used to build the tube cross-section. end_caps : bool If ``True``, caps are generated on both ends of each tube. color_components : int Number of color channels stored per line or vertex (e.g., 3 or 4). Returns ------- int Total bytes across all planned buffer allocations for the provided lines. """ lengths = np.asarray(line_lengths, dtype=np.uint32).reshape(-1) if lengths.size == 0: return 0 tube_sides = int(segments) max_line_length = int(lengths.max(initial=0)) n_lines = int(lengths.size) segments_per_line = np.maximum(lengths - 1, 0) ring_vertices_per_line = lengths * tube_sides cap_vertex_count = 2 if end_caps else 0 vertices_per_line = ring_vertices_per_line + cap_vertex_count ring_triangles_per_line = segments_per_line * tube_sides * 2 cap_triangles_per_line = (tube_sides * 2) if end_caps else 0 triangles_per_line = ring_triangles_per_line + cap_triangles_per_line total_vertices = int(vertices_per_line.astype(np.uint64).sum()) total_triangles = int(triangles_per_line.astype(np.uint64).sum()) line_data_bytes = int(n_lines * max_line_length * 3 * 4) length_buffer_bytes = int(n_lines * 4) color_buffer_bytes = int(n_lines * color_components * 4) vertex_offset_bytes = int(n_lines * 4) triangle_offset_bytes = int(n_lines * 4) positions_bytes = int(total_vertices * 3 * 4) normals_bytes = int(total_vertices * 3 * 4) vertex_colors_bytes = int(total_vertices * color_components * 4) indices_bytes = int(total_triangles * 3 * 4) return ( line_data_bytes + length_buffer_bytes + color_buffer_bytes + vertex_offset_bytes + triangle_offset_bytes + positions_bytes + normals_bytes + vertex_colors_bytes + indices_bytes ) def _split_streamtube_lines( lines, *, segments, end_caps, color_components, max_buffer_size ): """ Split lines into batches derived from total/available buffer ratio. Parameters ---------- lines : list of ndarray Streamtube centerlines to render. segments : int Number of radial segments used to build each tube. end_caps : bool Whether end caps are included. color_components : int Number of color channels stored per line or vertex. max_buffer_size : int Maximum allowed buffer size in bytes. Returns ------- list of list of ndarray Ordered batches of lines. A single batch is returned when no split is necessary. Raises ------ ValueError If even a single line cannot fit within the maximum buffer size. """ line_lengths = np.asarray([len(line) for line in lines], dtype=np.uint32) longest = np.asarray([int(line_lengths.max(initial=0))], dtype=np.uint32) longest_required = _estimate_streamtube_buffer_size( longest, segments=segments, end_caps=end_caps, color_components=color_components, ) if longest_required > max_buffer_size: raise ValueError( "Streamtube data for a single line exceeds the available buffer " "size; consider shortening the line or reducing the number of " "segments." ) full_required = _estimate_streamtube_buffer_size( line_lengths, segments=segments, end_caps=end_caps, color_components=color_components, ) if full_required <= max_buffer_size: return [lines] import math n_batches = max(1, int(math.ceil(full_required / max_buffer_size))) batch_size = int(math.ceil(len(lines) / n_batches)) return [ lines[start : start + batch_size] for start in range(0, len(lines), batch_size) ] def _create_streamtube_baked( lines, *, colors=None, opacity=1.0, radius=0.1, segments=6, end_caps=True, enable_picking=True, flat_shading=False, material="phong", ): """ Internal: Create streamtube geometry on the GPU using compute shaders. This function is used internally by streamtube() and should not be called directly by users. Parameters ---------- lines : sequence of array_like, shape (N_i, 3) Iterable of polylines representing streamline vertices. colors : array_like, str, list of ndarray, or None, optional Specifies how tubes are colored. The following modes are supported: - None : All tubes are colored white (default). - Single color : A tuple or 1-D array of 3 (RGB) or 4 (RGBA) floats in [0, 1], e.g. (1, 0, 0). Applied uniformly to every tube. - Per-line colors : A 2-D array of shape (n_lines, 3) or (n_lines, 4) giving one color per streamline. - Per-point colors : A list/tuple of n_lines arrays, each of shape (N_i, 3) or (N_i, 4) where N_i matches the number of points in the corresponding line. Each vertex receives its own color. - "rgb" : Each vertex is colored by the absolute value of its local tangent direction mapped to RGB channels (orientation-based coloring). opacity : float, optional Opacity multiplier applied to the material. Valid range is [0, 1]. radius : float, optional Tube radius in world units. segments : int, optional Number of radial segments making up the tube cross-section. end_caps : bool, optional If ``True`` flat caps are generated on both ends of each tube. enable_picking : bool, optional Whether the mesh writes to the picking buffer. flat_shading : bool, optional Controls whether flat or smooth shading is used by the Phong material. material : {"phong"}, optional Material type. GPU streamtubes currently only support ``"phong"``. Returns ------- Mesh A pygfx mesh containing GPU-generated streamtube geometry and material. """ if material != "phong": raise ValueError("GPU streamtubes currently support material='phong' only.") opacity = validate_opacity(opacity) lines_arr = [ np.asarray(line, dtype=np.float32).reshape(-1, 3) for line in np.asarray(lines, dtype=object) ] n_lines = len(lines_arr) if n_lines == 0: geometry = buffer_to_geometry( positions=np.zeros((0, 3), dtype=np.float32), normals=np.zeros((0, 3), dtype=np.float32), colors=np.zeros((0, 3), dtype=np.float32), indices=np.zeros((0, 3), dtype=np.uint32), ) material_obj = _StreamtubeBakedMaterial( opacity=opacity, pick_write=enable_picking, flat_shading=flat_shading, color_mode="vertex", ) material_obj.radius = radius material_obj.segments = segments material_obj.end_caps = end_caps return create_mesh(geometry=geometry, material=material_obj) line_lengths = np.array([line.shape[0] for line in lines_arr], dtype=np.uint32) max_line_length = int(line_lengths.max(initial=0)) line_data = np.zeros((n_lines, max_line_length, 3), dtype=np.float32) for idx, line in enumerate(lines_arr): line_data[idx, : line.shape[0]] = line use_rgb_mode = isinstance(colors, str) and colors.lower() == "rgb" use_per_point_colors = ( isinstance(colors, (list, tuple)) and len(colors) > 0 and isinstance(colors[0], np.ndarray) and colors[0].ndim == 2 ) point_color_data = None line_colors = None if colors is None: colors = np.array([1.0, 1.0, 1.0], dtype=np.float32) if not use_rgb_mode and not use_per_point_colors: colors = np.asarray(colors, dtype=np.float32) if colors.ndim == 1: if colors.size == 3: line_colors = np.tile(colors, (n_lines, 1)) elif colors.size == 4: line_colors = np.tile(colors[:3], (n_lines, 1)) else: raise ValueError( "Single color must have 3 (RGB) or 4 (RGBA) components, " f"got {colors.size}" ) elif colors.ndim == 2: if colors.shape[0] == 1: if colors.shape[1] in (3, 4): line_colors = np.tile(colors[0, :3], (n_lines, 1)) else: raise ValueError( "Color must have 3 (RGB) or 4 (RGBA) components, " f"got {colors.shape[1]}" ) elif colors.shape[0] == n_lines: if colors.shape[1] in (3, 4): line_colors = colors[:, :3].astype(np.float32) else: raise ValueError( "Color must have 3 (RGB) or 4 (RGBA) components, " f"got {colors.shape[1]}" ) else: raise ValueError( f"Color array first dimension must be 1 or {n_lines} " f"(number of lines), got {colors.shape[0]}" ) else: raise ValueError(f"Colors must be 1D or 2D array, got {colors.ndim}D array") elif use_per_point_colors: color_arrays = [np.asarray(c, dtype=np.float32) for c in colors] if len(color_arrays) != n_lines: raise ValueError( f"Per-point colors list must have {n_lines} arrays, " f"got {len(color_arrays)}" ) color_components = color_arrays[0].shape[1] if color_components not in (3, 4): raise ValueError( "Per-point colors must have 3 (RGB) or 4 (RGBA) components, " f"got {color_components}" ) if color_components == 4: color_arrays = [c[:, :3] for c in color_arrays] color_components = 3 for idx, (ca, ll) in enumerate(zip(color_arrays, line_lengths, strict=False)): if ca.shape[0] != ll: raise ValueError( f"Per-point color array {idx} has {ca.shape[0]} points " f"but line has {ll} points" ) point_color_data = np.zeros( (n_lines, max_line_length, color_components), dtype=np.float32 ) for idx, ca in enumerate(color_arrays): point_color_data[idx, : ca.shape[0]] = ca elif use_rgb_mode: color_components = 3 if line_colors is not None: line_colors = line_colors.astype(np.float32, copy=False) color_components = line_colors.shape[1] tube_sides = int(segments) segments_per_line = np.maximum(line_lengths - 1, 0).astype(np.uint32) ring_vertices_per_line = line_lengths * tube_sides cap_vertex_count = 2 if end_caps else 0 vertices_per_line = ring_vertices_per_line + cap_vertex_count ring_triangles_per_line = segments_per_line * tube_sides * 2 cap_triangles_per_line = (tube_sides * 2) if end_caps else 0 triangles_per_line = ring_triangles_per_line + cap_triangles_per_line vertex_offsets = np.zeros(n_lines, dtype=np.uint32) triangle_offsets = np.zeros(n_lines, dtype=np.uint32) if n_lines > 1: vertex_offsets[1:] = np.cumsum(vertices_per_line[:-1], dtype=np.uint64).astype( np.uint32 ) triangle_offsets[1:] = np.cumsum( triangles_per_line[:-1], dtype=np.uint64 ).astype(np.uint32) total_vertices = int(vertices_per_line.astype(np.uint64).sum()) total_triangles = int(triangles_per_line.astype(np.uint64).sum()) positions_data = np.zeros((total_vertices, 3), dtype=np.float32) normals_data = np.zeros((total_vertices, 3), dtype=np.float32) colors_data = np.zeros((total_vertices, color_components), dtype=np.float32) indices_data = np.zeros((total_triangles, 3), dtype=np.uint32) min_point = max_point = np.zeros(3, dtype=np.float32) for line in lines_arr: if line.size == 0: continue line_min = line.min(axis=0) line_max = line.max(axis=0) min_point = np.minimum(min_point, line_min) max_point = np.maximum(max_point, line_max) positions_data[0] = min_point positions_data[1] = max_point if use_per_point_colors: vertex_idx = 0 for line_idx in range(n_lines): n_pts = int(line_lengths[line_idx]) for pt_idx in range(n_pts): pt_color = point_color_data[line_idx, pt_idx] start = vertex_idx + pt_idx * tube_sides end = start + tube_sides colors_data[start:end] = pt_color if end_caps: colors_data[vertex_idx + n_pts * tube_sides] = point_color_data[ line_idx, 0 ] colors_data[vertex_idx + n_pts * tube_sides + 1] = point_color_data[ line_idx, n_pts - 1 ] vertex_idx += int(vertices_per_line[line_idx]) elif not use_rgb_mode: vertex_idx = 0 for line_idx in range(n_lines): n_verts = int(vertices_per_line[line_idx]) colors_data[vertex_idx : vertex_idx + n_verts] = line_colors[line_idx] vertex_idx += n_verts geometry = buffer_to_geometry( positions=positions_data, normals=normals_data, colors=colors_data, indices=indices_data, ) material_obj = _StreamtubeBakedMaterial( opacity=opacity, pick_write=enable_picking, flat_shading=flat_shading, color_mode="vertex", ) material_obj.radius = radius material_obj.segments = segments material_obj.end_caps = end_caps mesh_obj = create_mesh(geometry=geometry, material=material_obj) mesh_obj.n_lines = n_lines mesh_obj.max_line_length = max_line_length mesh_obj.tube_sides = tube_sides mesh_obj.radius = float(radius) mesh_obj.line_lengths = line_lengths mesh_obj.vertex_offsets = vertex_offsets mesh_obj.triangle_offsets = triangle_offsets mesh_obj.end_caps = end_caps mesh_obj.lines = lines_arr mesh_obj.line_colors = line_colors mesh_obj.color_components = color_components mesh_obj.use_rgb_mode = use_rgb_mode mesh_obj.use_per_point_colors = use_per_point_colors mesh_obj._needs_gpu_update = True mesh_obj.line_buffer = Buffer(line_data.reshape(-1)) mesh_obj.length_buffer = Buffer(line_lengths) _color_buf = ( line_colors if line_colors is not None else np.zeros((n_lines, color_components), dtype=np.float32) ) mesh_obj.color_buffer = Buffer(_color_buf) if use_per_point_colors: mesh_obj.point_color_buffer = Buffer( point_color_data.reshape(-1).astype(np.float32) ) else: mesh_obj.point_color_buffer = None mesh_obj.vertex_offset_buffer = Buffer(vertex_offsets) mesh_obj.triangle_offset_buffer = Buffer(triangle_offsets) material_obj._setup_compute_shader( line_count=n_lines, max_line_length=max_line_length, tube_segments=tube_sides, ) return mesh_obj def _streamtube_geometry_task(points, segments, radius, end_caps): """ Generate tube geometry for a single line on the CPU backend. Parameters ---------- points : ndarray, shape (M, 3) Centerline points for one streamline. segments : int Number of radial segments used for the tube cross-section. radius : float Tube radius in world units. end_caps : bool If ``True``, flat caps are added to both ends. Returns ------- tuple ``(vertices, indices)`` arrays suitable for mesh construction. """ points_arr = np.asarray(points, dtype=np.float32) return generate_tube_geometry(points_arr, segments, radius, end_caps) def _slice_colors_for_lines(colors, start_idx, end_idx): """ Slice per-line color arrays to match a subset of lines. Parameters ---------- colors : array_like or None Original colors passed to :func:`streamtube`. start_idx : int Inclusive start index of the line subset. end_idx : int Exclusive end index of the line subset. Returns ------- array_like or None Colors aligned with the requested slice, or ``None`` when colors are not provided. """ if colors is None: return None colors_arr = np.asarray(colors) if colors_arr.ndim == 2 and colors_arr.shape[0] > 1: return colors_arr[start_idx:end_idx] return colors def _resolve_color_components_for_streamtube(colors, backend): """ Infer the color channel count used for streamtube buffers. Parameters ---------- colors : array_like or None Color specification provided to :func:`streamtube`. backend : {'cpu', 'gpu'} Active rendering backend. GPU baking always stores RGB colors. Returns ------- int Number of color channels (3 or 4) to allocate. """ if isinstance(colors, str) and colors.lower() == "rgb" and backend != "gpu": raise ValueError("colors='rgb' requires backend='gpu'") if ( isinstance(colors, (list, tuple)) and len(colors) > 0 and isinstance(colors[0], np.ndarray) and colors[0].ndim == 2 and backend != "gpu" ): raise ValueError("Per-point colors (list of arrays) requires backend='gpu'") if backend == "gpu": return 3 if colors is None: return 3 colors_arr = np.asarray(colors) if colors_arr.ndim == 1: return int(colors_arr.shape[0]) if colors_arr.ndim == 2: return int(colors_arr.shape[1]) return 3 def _create_streamtube_cpu( lines, *, colors, opacity, radius, segments, end_caps, flat_shading, material, enable_picking, ): """ Create a streamtube actor using CPU-based geometry generation. Parameters ---------- lines : list of ndarray Streamtube centerlines to render. colors : array_like Per-line or global colors matching ``streamtube`` API rules. opacity : float Opacity applied to the created material. radius : float Tube radius in world units. segments : int Number of radial segments per ring. end_caps : bool Whether flat end caps are generated. flat_shading : bool When ``True`` use flat shading; otherwise smooth shading. material : str Material name forwarded to :func:`_create_mesh_material`. enable_picking : bool Whether the actor writes to the picking buffer. Returns ------- Mesh CPU-generated mesh actor containing the streamtube geometry. """ with ThreadPoolExecutor() as executor: results = list( executor.map( _streamtube_geometry_task, lines, itertools.repeat(segments), itertools.repeat(radius), itertools.repeat(end_caps), ) ) all_vertices = [] all_triangles = [] vertex_offset = 0 if not any(r[0].size > 0 for r in results): geo = buffer_to_geometry( indices=np.zeros((0, 3), dtype=np.int32), positions=np.zeros((0, 3), dtype=np.float32), colors=np.zeros((0, 4), dtype=np.float32), ) mat = _create_mesh_material() return create_mesh(geometry=geo, material=mat) for verts, tris in results: if verts.size > 0 and tris.size > 0: all_vertices.append(verts) all_triangles.append(tris + vertex_offset) vertex_offset += verts.shape[0] final_vertices = np.vstack(all_vertices) final_triangles = np.vstack(all_triangles) n_vertices = final_vertices.shape[0] input_colors = np.asarray(colors) if input_colors.ndim == 1: vertex_colors = np.tile(input_colors, (n_vertices, 1)) elif input_colors.ndim == 2 and input_colors.shape[0] == len(lines): color_dim = input_colors.shape[1] vertex_colors = np.zeros((n_vertices, color_dim), dtype=np.float32) current_v_idx = 0 for i, (verts, _) in enumerate(results): num_verts = verts.shape[0] if num_verts > 0: vertex_colors[current_v_idx : current_v_idx + num_verts] = input_colors[ i ] current_v_idx += num_verts else: raise ValueError( "Colors must be a single tuple (e.g., (1,0,0)) or an array of shape " f"(n_lines, 3|4), but got shape {input_colors.shape} " f"for {len(lines)} lines." ) geo = buffer_to_geometry( indices=final_triangles.astype("int32"), positions=final_vertices.astype("float32"), colors=vertex_colors.astype("float32"), ) mat = _create_mesh_material( material=material, opacity=opacity, enable_picking=enable_picking, flat_shading=flat_shading, ) obj = create_mesh(geometry=geo, material=mat) return obj def _create_streamtube_actor( lines, *, backend, colors, opacity, radius, segments, end_caps, flat_shading, material, enable_picking, ): """ Create a single streamtube actor for the requested backend. Parameters ---------- lines : list of ndarray Streamtube centerlines for this batch. backend : {'cpu', 'gpu'} Backend used to build the geometry. colors : array_like or None Per-line or global colors aligned with ``lines``. opacity : float Opacity applied to the created material. radius : float Tube radius in world units. segments : int Number of radial segments per ring. end_caps : bool Whether flat end caps are generated. flat_shading : bool When ``True`` use flat shading; otherwise smooth shading. material : str Material name forwarded to mesh/material constructors. enable_picking : bool Whether the actor writes to the picking buffer. Returns ------- Mesh Streamtube actor constructed for the requested backend. """ if backend == "gpu": return _create_streamtube_baked( lines, colors=colors, opacity=opacity, radius=radius, segments=segments, end_caps=end_caps, enable_picking=enable_picking, flat_shading=flat_shading, material=material, ) return _create_streamtube_cpu( lines, colors=colors, opacity=opacity, radius=radius, segments=segments, end_caps=end_caps, flat_shading=flat_shading, material=material, enable_picking=enable_picking, )
[docs] def streamtube( lines, *, opacity=1.0, colors=(1, 1, 1), radius=0.2, segments=8, end_caps=True, flat_shading=False, material="phong", enable_picking=True, backend="gpu", ): """ Create a streamtube from a list of lines using parallel processing. Parameters ---------- lines : list of ndarray, shape (N, 3) List of lines, where each line is a set of 3D points. opacity : float, optional Overall opacity of the actor, from 0.0 to 1.0. colors : tuple or ndarray, optional - A single color tuple (e.g., (1,0,0)) for all lines. - An array of colors, one for each line (e.g., [[1,0,0], [0,1,0],...]). radius : float, optional The radius of the tubes. segments : int, optional Number of segments for the tube's cross-section. end_caps : bool, optional If True, adds flat caps to the ends of each tube. flat_shading : bool, optional If True, use flat shading; otherwise, smooth shading is used. material : str, optional Material model (e.g., 'phong', 'basic'). enable_picking : bool, optional If True, the actor can be picked in a 3D scene. backend : {"gpu", "cpu"}, optional Backend selection for streamtube generation. Options: - "gpu": Use GPU compute shaders for baked geometry generation. - "cpu": Force CPU-based geometry generation. Returns ------- Actor or Group A mesh actor containing the generated streamtubes. When the input data exceeds the available buffer size, multiple actors are created, added to a ``Group``, and the group is returned. Notes ----- This function performs streamtube geometry creation internally. By default, it uses GPU compute shaders to bake the geometry once using a compute pass and then renders with a standard material. The backend parameter defaults to ``"gpu"`` for optimal performance. Set ``backend="cpu"`` to force CPU-based geometry generation as a fallback option. When buffers would exceed device limits, the data is split into evenly sized batches based on the ratio of estimated total size to available buffer size. """ if lines is None or not hasattr(lines, "__iter__"): raise ValueError("lines must be an iterable of arrays") lines_list = list(lines) if len(lines_list) == 0: raise ValueError("lines cannot be empty") for line_arr in lines_list: if line_arr.ndim != 2 or line_arr.shape[1] != 3: raise ValueError("Each line must be a 2D array of shape (N, 3)") if radius <= 0: raise ValueError(f"radius must be positive, got {radius}") if segments < 3: raise ValueError(f"segments must be at least 3, got {segments}") if backend not in ("cpu", "gpu"): raise ValueError(f"backend must be 'cpu' or 'gpu', got {backend!r}") color_components = _resolve_color_components_for_streamtube(colors, backend) wgpu_device = gfx_wgpu.get_shared().device max_buffer_size = wgpu_device.limits.get( "max-storage-buffer-binding-size", 256 * 1024 * 1024 ) batches = _split_streamtube_lines( lines_list, segments=segments, end_caps=end_caps, color_components=color_components, max_buffer_size=max_buffer_size, ) if len(batches) == 1: return _create_streamtube_actor( batches[0], backend=backend, colors=colors, opacity=opacity, radius=radius, segments=segments, end_caps=end_caps, flat_shading=flat_shading, material=material, enable_picking=enable_picking, ) group = Group() color_start = 0 for batch in batches: color_end = color_start + len(batch) batch_colors = _slice_colors_for_lines(colors, color_start, color_end) actor = _create_streamtube_actor( batch, backend=backend, colors=batch_colors, opacity=opacity, radius=radius, segments=segments, end_caps=end_caps, flat_shading=flat_shading, material=material, enable_picking=enable_picking, ) group.add(actor) color_start = color_end return group
@register_wgpu_render_function(Mesh, _StreamtubeBakedMaterial) def _register_streamtube_baking_shaders(wobject): """ Internal: Create compute and render shaders for GPU streamtubes. This function is called automatically by the render system and should not be invoked directly by users. Parameters ---------- wobject : Mesh Mesh produced by the internal streamtube function. Returns ------- tuple of BaseShader A ``(compute_shader, render_shader)`` pair ready for registration. """ compute_shader = _StreamtubeBakingShader(wobject) render_shader = _StreamtubeRenderShader(wobject) return compute_shader, render_shader