Skip to content
Snippets Groups Projects
geometry.py 10.43 KiB
import numpy as np
from typing import Tuple

from lbmpy.boundaries import NoSlip, UBB
from pystencils.slicing import normalize_slice, shift_slice, slice_intersection, slice_from_direction


def add_box_boundary(boundary_handling, boundary=NoSlip(), replace=True):
    """ Adds wall boundary conditions at all domain boundaries.

    Args:
        boundary_handling: boundary handling object
        boundary: the boundary to set at domain boundaries
        replace: see BoundaryHandling.set_boundary , True overwrites flag field, False only adds the boundary flag

    Returns:
        flag used for the wall boundary
    """
    borders = ['N', 'S', 'E', 'W']
    if boundary_handling.dim == 3:
        borders += ['T', 'B']
    flag = None
    for d in borders:
        flag = boundary_handling.set_boundary(boundary, slice_from_direction(d, boundary_handling.dim), replace=replace)
    assert flag is not None
    return flag


def add_sphere(boundary_handling, midpoint, radius, boundary=NoSlip(), replace=True):
    """Sets boundary in spherical region."""
    def set_sphere(x, y):
        return (x - midpoint[0]) ** 2 + (y - midpoint[1]) ** 2 < radius ** 2
    return boundary_handling.set_boundary(boundary, mask_callback=set_sphere, replace=replace)


def add_pipe_inflow_boundary(boundary_handling, u_max, slice_obj, flow_direction=0, diameter=None):
    """Adds velocity inflow UBB boundary for pipe flow.

    Args:
        boundary_handling: boundary handling object
        u_max: maximum velocity at center of the pipe
        slice_obj: slice describing where the boundary should be set
        flow_direction: dimension index, e.g. 0 for a channel that flows along x,
        diameter: pipe/diameter, if None is passed the maximum diameter is assumed

    Returns:
        flag used for the inflow boundary

    Examples:
        >>> from lbmpy.lbstep import LatticeBoltzmannStep
        >>> from pystencils import make_slice
        >>> pipe = LatticeBoltzmannStep(domain_size=(20, 10, 10), relaxation_rate=1.8, periodicity=(True, False, False))
        >>> flag = add_pipe_inflow_boundary(pipe.boundary_handling, u_max=0.05,
        ...                                 slice_obj=make_slice[0, :, :], flow_direction=0)
    """
    dim = boundary_handling.dim

    def velocity_info_callback(boundary_data):
        for i, name in enumerate(['vel_0', 'vel_1', 'vel_2'][:dim]):
            if i != flow_direction:
                boundary_data[name] = 0.0
        if diameter is None:
            radius = int(round(min(sh for i, sh in enumerate(boundary_handling.shape) if i != flow_direction) / 2))
        else:
            radius = int(round(diameter / 2))

        if dim == 3:
            normal_coord1 = (flow_direction + 1) % 3
            normal_coord2 = (flow_direction + 2) % 3
            y, z = boundary_data.link_positions(normal_coord1), boundary_data.link_positions(normal_coord2)
            centered_normal1 = y - int(round(boundary_handling.shape[normal_coord1] / 2))
            centered_normal2 = z - int(round(boundary_handling.shape[normal_coord2] / 2))
            dist_to_center = np.sqrt(centered_normal1 ** 2 + centered_normal2 ** 2)
        elif dim == 2:
            normal_coord = (flow_direction + 1) % 2
            centered_normal = boundary_data.link_positions(normal_coord) - radius
            dist_to_center = np.sqrt(centered_normal ** 2)
        else:
            raise ValueError("Invalid dimension")

        vel_profile = u_max * (1 - (dist_to_center / radius)**2)
        boundary_data['vel_%d' % (flow_direction,)] = vel_profile

    inflow = UBB(velocity_info_callback, dim=boundary_handling.dim)
    return boundary_handling.set_boundary(inflow, slice_obj=slice_obj, ghost_layers=True)


def add_pipe_walls(boundary_handling, diameter, boundary=NoSlip()):
    """Sets boundary for the wall of a pipe with flow in x direction.

    Args:
        boundary_handling: boundary handling object, works for serial and parallel versions
        diameter: pipe diameter, can be either a constant value or a callback function.
                  the callback function has the signature (x_coord_array, domain_shape_in_cells) and has to return
                  a array of same shape as the received x_coord_array, with the diameter for each x position
        boundary: boundary object that is set at the wall, defaults to NoSlip (bounce back)

    Returns:
        flag used for the wall boundary
    """
    domain_shape = boundary_handling.shape
    dim = len(domain_shape)
    assert dim in (2, 3)

    def callback(*coordinates):
        flow_coord = coordinates[0]

        if callable(diameter):
            flow_coord_line = flow_coord[:, 0, 0] if dim == 3 else flow_coord[:, 0]
            diameter_value = diameter(flow_coord_line, domain_shape)
            diameter_value = diameter_value[:, np.newaxis, np.newaxis] if dim == 3 else diameter_value[:, np.newaxis]
        else:
            diameter_value = diameter

        radius_sq = (diameter_value / 2) ** 2

        mid = [domain_shape[i] // 2 for i in range(1, dim)]
        distance = sum((c_i - mid_i) ** 2 for c_i, mid_i in zip(coordinates[1:], mid))
        return distance > radius_sq

    return boundary_handling.set_boundary(boundary, mask_callback=callback)


def get_pipe_velocity_field(domain_size, u_max, flow_direction=0, diameter=None):
    """Analytic velocity field for 2D or 3D pipe flow.

    Args:
        domain_size: 2-tuple or 3-tuple: shape of the domain
        u_max: velocity in the middle of the pipe
        flow_direction: dimension index, e.g. 0 for a channel that flows along x,
        diameter: pipe/diameter, if None is passed the maximum diameter is assumed

    Returns:
        numpy array with velocity field, velocity values outside the pipe are invalid

    Examples:
        Set up channel flow with analytic solution
        >>> from lbmpy.scenarios import create_channel
        >>> domain_size = (10, 10, 5)
        >>> u_max = 0.05
        >>> initial_velocity = get_pipe_velocity_field(domain_size, u_max)
        >>> scenario = create_channel(domain_size=domain_size, u_max=u_max, relaxation_rate=1.8,
        ...                           initial_velocity=initial_velocity)
    """
    if diameter is None:
        radius = int(round(min(sh for i, sh in enumerate(domain_size) if i != flow_direction) / 2))
    else:
        radius = int(round(diameter / 2))

    params = [np.arange(s) + 0.5 for s in domain_size]
    grid: Tuple[np.array] = np.meshgrid(*params, indexing='ij')

    dist = 0
    for i in range(len(domain_size)):
        if i == flow_direction:
            continue
        center = int(round(domain_size[i] / 2))
        dist += (grid[i] - center) ** 2
    dist = np.sqrt(dist)

    u = np.zeros(tuple(domain_size) + (len(domain_size),))
    u[..., flow_direction] = u_max * (1 - (dist / radius) ** 2)
    return u


def add_black_and_white_image(boundary_handling, image_file, target_slice=None, plane=(0, 1), boundary=NoSlip(),
                              keep_aspect_ratio=False):
    """Sets boundary from a black and white image, setting boundary where image is black.

    For 3D domains the image is extruded along a coordinate. Requires scipy for image resizing.

    Args:
        boundary_handling: boundary handling object
        image_file: path to image file
        target_slice: rectangular sub-region where the image should be set, or None for everywhere
        plane: 2-tuple with coordinate indices, (0, 1) means that the image is mapped into the x-y plane and
               extruded in z direction
        boundary: boundary object to set, where the image has black pixels
        keep_aspect_ratio: by default the image is reshaped to match the target_slice. If this parameter is True the
                           aspect ratio is kept, effectively making the target_slice smaller

    Returns:
        numpy array with velocity field, velocity values outside the pipe are invalid
    """
    from scipy.ndimage import zoom

    domain_size = boundary_handling.shape
    if target_slice is None:
        target_slice = [slice(None, None, None)] * len(domain_size)

    dim = boundary_handling.dim

    image_slice = normalize_slice(target_slice, domain_size)
    target_size = [image_slice[i].stop - image_slice[i].start for i in plane]

    img_arr = _read_image(image_file, flatten=True).astype(int)
    img_arr = np.rot90(img_arr, 3)

    zoom_factor = [target_size[i] / img_arr.shape[i] for i in range(2)]
    if keep_aspect_ratio:
        zoom_factor = min(zoom_factor)
    zoomed_image = zoom(img_arr, zoom_factor, order=0)

    # binarize
    zoomed_image[zoomed_image <= 254] = 0
    zoomed_image[zoomed_image > 254] = 1
    zoomed_image = np.logical_not(zoomed_image.astype(np.bool))

    # resize necessary if aspect ratio should be constant
    if zoomed_image.shape != target_size:
        resized_image = np.zeros(target_size, dtype=np.bool)
        mid = [(ts - s) // 2 for ts, s in zip(target_size, zoomed_image.shape)]
        resized_image[mid[0]:zoomed_image.shape[0] + mid[0], mid[1]:zoomed_image.shape[1] + mid[1]] = zoomed_image
        zoomed_image = resized_image

    def callback(*coordinates):
        result = np.zeros_like(coordinates[0], dtype=np.bool)
        mask_start = [int(coordinates[i][(0,) * dim] - 0.5) for i in range(dim)]
        mask_end = [int(coordinates[i][(-1,) * dim] + 1 - 0.5) for i in range(dim)]

        mask_slice = [slice(start, stop) for start, stop in zip(mask_start, mask_end)]
        intersection_slice = slice_intersection(mask_slice, image_slice)
        if intersection_slice is None:
            return result
        else:
            mask_target_slice = shift_slice(intersection_slice, [-e for e in mask_start])
            image_target_slice = shift_slice(intersection_slice, [-s.start for s in image_slice])
            mask_target_slice = [mask_target_slice[i] if i in plane else slice(None, None) for i in range(dim)]
            image_target_slice = [image_target_slice[i] if i in plane else np.newaxis for i in range(dim)]
            result[mask_target_slice] = zoomed_image[image_target_slice]
            return result

    return boundary_handling.set_boundary(boundary, slice_obj=image_slice, mask_callback=callback,
                                          ghost_layers=False, inner_ghost_layers=True)


def _read_image(path, flatten=False):
    try:
        from PIL import Image
    except ImportError:
        raise ImportError("Image loading failed. Required package 'pillow' is missing")

    im = Image.open(path)
    if flatten:
        im = im.convert('F')
    return np.array(im)