Skip to content
Snippets Groups Projects
Forked from pycodegen / lbmpy
96 commits behind, 1 commit ahead of the upstream repository.
test_shear_flow.py 7.18 KiB
"""
Test shear flow velocity and pressureagainst analytical solutions
"""


import numpy as np
import pytest
import sympy as sp

from lbmpy.boundaries import UBB
from lbmpy.boundaries.boundaryhandling import LatticeBoltzmannBoundaryHandling
from lbmpy.creationfunctions import create_lb_update_rule, create_stream_pull_with_output_kernel,\
    LBMConfig, LBMOptimisation
from lbmpy.enums import Method, Stencil
from lbmpy.macroscopic_value_kernels import macroscopic_values_setter
from lbmpy.relaxationrates import relaxation_rate_from_lattice_viscosity
from lbmpy.stencils import LBStencil

import pystencils as ps


def shear_flow(x, t, nu, v, h, k_max):
    """
    Analytical solution for driven shear flow between two plates.

    Parameters
    ----------
    x : :obj:`float`
        Position from the left plane.
    t : :obj:`float`
        Time since start of the shearing.
    nu : :obj:`float`
        Kinematic viscosity.
    v : :obj:`float`
        Shear rate.
    h : :obj:`float`
        Distance between the plates.
    k_max : :obj:`int`
        Maximum considered wave number.

    Returns
    -------
    :obj:`double` : Analytical velocity

    """

    u = x / h - 0.5
    for k in np.arange(1, k_max + 1):
        u += 1.0 / (np.pi * k) * np.exp(
            -4 * np.pi ** 2 * nu * k ** 2 / h ** 2 * t) * np.sin(2 * np.pi / h * k * x)
    return -v * u


rho_0 = 2.2  # density
eta = 1.6  # kinematic viscosity
width = 40  # of box
wall_thickness = 2
actual_width = width - wall_thickness  # subtract boundary layer from box width
shear_velocity = 0.2  # scale by width to keep stable
t_max = 2000


@pytest.mark.parametrize('target', (ps.Target.CPU, ps.Target.GPU))
@pytest.mark.parametrize('stencil_name', (Stencil.D2Q9, Stencil.D3Q19))
@pytest.mark.parametrize('zero_centered', [True, False])
def test_shear_flow(target, stencil_name, zero_centered):

    # Cuda
    if target == ps.Target.GPU:
        pytest.importorskip("pycuda")

    # LB parameters
    stencil = LBStencil(stencil_name)

    if stencil.D == 2:
        L = (4, width)
    elif stencil.D == 3:
        L = (4, width, 4)
    else:
        raise Exception()
    periodicity = [True, False] + [True] * (stencil.D - 2)

    omega = relaxation_rate_from_lattice_viscosity(eta)

    # ## Data structures
    dh = ps.create_data_handling(L, periodicity=periodicity, default_target=target)

    src = dh.add_array('src', values_per_cell=stencil.Q)
    dst = dh.add_array_like('dst', 'src')
    ρ = dh.add_array('rho', latex_name='\\rho', values_per_cell=1)
    u = dh.add_array('u', values_per_cell=stencil.D)
    p = dh.add_array('p', values_per_cell=stencil.D**2)

    # LB Setup
    lbm_config = LBMConfig(stencil=stencil, relaxation_rate=omega, method=Method.TRT,
                           compressible=True, zero_centered=zero_centered,
                           kernel_type='collide_only')
    lbm_opt = LBMOptimisation(symbolic_field=src)
    collision = create_lb_update_rule(lbm_config=lbm_config, lbm_optimisation=lbm_opt)

    stream = create_stream_pull_with_output_kernel(collision.method, src, dst, {'velocity': u})
    config = ps.CreateKernelConfig(cpu_openmp=False, target=dh.default_target)

    stream_kernel = ps.create_kernel(stream, config=config).compile()
    collision_kernel = ps.create_kernel(collision, config=config).compile()

    # Boundaries
    lbbh = LatticeBoltzmannBoundaryHandling(collision.method, dh, src.name, target=dh.default_target)

    # Second moment test setup
    cqc = collision.method.conserved_quantity_computation
    getter_eqs = cqc.output_equations_from_pdfs(src.center_vector,
                                                {'moment2': p})

    kernel_compute_p = ps.create_kernel(getter_eqs, config=config).compile()

    # ## Set up the simulation

    init = macroscopic_values_setter(collision.method, velocity=(0,) * dh.dim,
                                     pdfs=src.center_vector, density=ρ.center)
    init_kernel = ps.create_kernel(init, ghost_layers=0).compile()

    vel_vec = sp.Matrix([0.5 * shear_velocity] + [0] * (stencil.D - 1))
    if stencil.D == 2:
        lbbh.set_boundary(UBB(velocity=vel_vec), ps.make_slice[:, :wall_thickness])
        lbbh.set_boundary(UBB(velocity=-vel_vec), ps.make_slice[:, -wall_thickness:])
    elif stencil.D == 3:
        lbbh.set_boundary(UBB(velocity=vel_vec), ps.make_slice[:, :wall_thickness, :])
        lbbh.set_boundary(UBB(velocity=-vel_vec), ps.make_slice[:, -wall_thickness:, :])
    else:
        raise Exception()

    for bh in lbbh, :
        assert len(bh._boundary_object_to_boundary_info) == 2, "Restart kernel to clear boundaries"

    def init():
        dh.fill(ρ.name, rho_0)
        dh.fill(u.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
        dh.fill(u.name, 0)

        dh.run_kernel(init_kernel)

    sync_pdfs = dh.synchronization_function([src.name])

    # Time loop
    def time_loop(steps):
        dh.all_to_gpu()
        for i in range(steps):
            dh.run_kernel(collision_kernel)
            sync_pdfs()
            lbbh()
            dh.run_kernel(stream_kernel)
            dh.run_kernel(kernel_compute_p)

            dh.swap(src.name, dst.name)

        if u.name in dh.gpu_arrays:
            dh.to_cpu(u.name)
        uu = dh.gather_array(u.name)
        # average periodic directions
        if stencil.D == 3:  # dont' swap order
            uu = np.average(uu, axis=2)
        uu = np.average(uu, axis=0)

        if p.name in dh.gpu_arrays:
            dh.to_cpu(p.name)
        pp = dh.gather_array(p.name)
        # average periodic directions
        if stencil.D == 3:  # dont' swap order
            pp = np.average(pp, axis=2)
        pp = np.average(pp, axis=0)

        # cut off wall regions
        uu = uu[wall_thickness:-wall_thickness]
        pp = pp[wall_thickness:-wall_thickness]

        if stencil.D == 2:
            pp = pp.reshape((len(pp), 2, 2))
        if stencil.D == 3:
            pp = pp.reshape((len(pp), 3, 3))
        return uu, pp

    init()
    # Simulation
    profile, pressure_profile = time_loop(t_max)

    expected = shear_flow(x=(np.arange(0, actual_width) + .5),
                          t=t_max,
                          nu=eta / rho_0,
                          v=shear_velocity,
                          h=actual_width,
                          k_max=100)

    if stencil.D == 2:
        shear_direction = np.array((1, 0), dtype=float)
        shear_plane_normal = np.array((0, 1), dtype=float)
    if stencil.D == 3:
        shear_direction = np.array((1, 0, 0), dtype=float)
        shear_plane_normal = np.array((0, 1, 0), dtype=float)

    shear_rate = shear_velocity / actual_width
    dynamic_viscosity = eta * rho_0
    correction_factor = eta / (eta - 1. / 6.)

    p_expected = rho_0 * np.identity(dh.dim) / 3.0 + dynamic_viscosity * shear_rate / correction_factor * (
        np.outer(shear_plane_normal, shear_direction) + np.transpose(np.outer(shear_plane_normal, shear_direction)))

    # Sustract the tensorproduct of the velosity to get the pressure
    pressure_profile[:, 0, 0] -= rho_0 * profile[:, 0]**2
    
    np.testing.assert_allclose(profile[:, 0], expected[1:-1], atol=1E-9)
    for i in range(actual_width - 2):
        np.testing.assert_allclose(pressure_profile[i], p_expected, atol=1E-9, rtol=1E-3)