Skip to content
Snippets Groups Projects
Forked from pycodegen / lbmpy
10 commits behind the upstream repository.
test_communication.py 8.01 KiB
import pystencils as ps

import numpy as np

from lbmpy.stencils import LBStencil
from pystencils.slicing import get_slice_before_ghost_layer, get_ghost_region_slice
from lbmpy.creationfunctions import create_lb_update_rule, LBMConfig, LBMOptimisation
from lbmpy.advanced_streaming.communication import (
    get_communication_slices,
    _fix_length_one_slices,
    LBMPeriodicityHandling,
    periodic_pdf_gpu_copy_kernel,
)
from lbmpy.advanced_streaming.utility import streaming_patterns, Timestep
from lbmpy.enums import Stencil

import pytest


@pytest.mark.parametrize(
    "stencil", [Stencil.D2Q9, Stencil.D3Q15, Stencil.D3Q19, Stencil.D3Q27]
)
@pytest.mark.parametrize("streaming_pattern", streaming_patterns)
@pytest.mark.parametrize("timestep", [Timestep.EVEN, Timestep.ODD])
def test_slices_not_empty(stencil, streaming_pattern, timestep):
    stencil = LBStencil(stencil)
    arr = np.zeros((4,) * stencil.D + (stencil.Q,))
    slices = get_communication_slices(
        stencil,
        streaming_pattern=streaming_pattern,
        prev_timestep=timestep,
        ghost_layers=1,
    )
    for _, slices_list in slices.items():
        for src, dst in slices_list:
            assert all(s != 0 for s in arr[src].shape)
            assert all(s != 0 for s in arr[dst].shape)


@pytest.mark.parametrize(
    "stencil", [Stencil.D2Q9, Stencil.D3Q15, Stencil.D3Q19, Stencil.D3Q27]
)
@pytest.mark.parametrize("streaming_pattern", streaming_patterns)
@pytest.mark.parametrize("timestep", [Timestep.EVEN, Timestep.ODD])
def test_src_dst_same_shape(stencil, streaming_pattern, timestep):
    stencil = LBStencil(stencil)
    arr = np.zeros((4,) * stencil.D + (stencil.Q,))
    slices = get_communication_slices(
        stencil,
        streaming_pattern=streaming_pattern,
        prev_timestep=timestep,
        ghost_layers=1,
    )
    for _, slices_list in slices.items():
        for src, dst in slices_list:
            src_shape = arr[src].shape
            dst_shape = arr[dst].shape
            assert src_shape == dst_shape


@pytest.mark.parametrize(
    "stencil", [Stencil.D2Q9, Stencil.D3Q15, Stencil.D3Q19, Stencil.D3Q27]
)
def test_pull_communication_slices(stencil):
    stencil = LBStencil(stencil)

    slices = get_communication_slices(
        stencil, streaming_pattern="pull", prev_timestep=Timestep.BOTH, ghost_layers=1
    )
    for i, d in enumerate(stencil):
        if i == 0:
            continue

        for s in slices[d]:
            if s[0][-1] == i:
                src = s[0][:-1]
                dst = s[1][:-1]
                break

        inner_slice = _fix_length_one_slices(
            get_slice_before_ghost_layer(d, ghost_layers=1)
        )
        inv_dir = (-e for e in d)
        gl_slice = _fix_length_one_slices(
            get_ghost_region_slice(inv_dir, ghost_layers=1)
        )
        assert src == inner_slice
        assert dst == gl_slice


@pytest.mark.parametrize("direction", LBStencil(Stencil.D3Q27).stencil_entries)
@pytest.mark.parametrize("pull", [False, True])
def test_gpu_comm_kernels(direction: tuple, pull: bool):
    pytest.importorskip("cupy")

    stencil = LBStencil(Stencil.D3Q27)
    inv_dir = stencil[stencil.inverse_index(direction)]
    target = ps.Target.GPU

    domain_size = (4,) * stencil.D

    dh: ps.datahandling.SerialDataHandling = ps.create_data_handling(
        domain_size,
        periodicity=(True,) * stencil.D,
        parallel=False,
        default_target=target,
    )

    field = dh.add_array("field", values_per_cell=2)

    if pull:
        dst_slice = get_ghost_region_slice(inv_dir)
        src_slice = get_slice_before_ghost_layer(direction)
    else:
        dst_slice = get_slice_before_ghost_layer(direction)
        src_slice = get_ghost_region_slice(inv_dir)

    src_slice += (1,)
    dst_slice += (1,)

    kernel = periodic_pdf_gpu_copy_kernel(field, src_slice, dst_slice)

    dh.cpu_arrays[field.name][src_slice] = 42.0
    dh.all_to_gpu()

    dh.run_kernel(kernel)

    dh.all_to_cpu()
    np.testing.assert_equal(dh.cpu_arrays[field.name][dst_slice], 42.0)


@pytest.mark.parametrize("stencil", [Stencil.D2Q9, Stencil.D3Q19])
@pytest.mark.parametrize("streaming_pattern", streaming_patterns)
def test_direct_copy_and_kernels_equivalence(stencil: Stencil, streaming_pattern: str):
    pytest.importorskip("cupy")

    target = ps.Target.GPU
    stencil = LBStencil(stencil)
    domain_size = (4,) * stencil.D

    dh: ps.datahandling.SerialDataHandling = ps.create_data_handling(
        domain_size,
        periodicity=(True,) * stencil.D,
        parallel=False,
        default_target=target,
    )

    pdfs_a = dh.add_array("pdfs_a", values_per_cell=stencil.Q)
    pdfs_b = dh.add_array("pdfs_b", values_per_cell=stencil.Q)

    dh.fill(pdfs_a.name, 0.0, ghost_layers=True)
    dh.fill(pdfs_b.name, 0.0, ghost_layers=True)

    for q in range(stencil.Q):
        sl = ps.make_slice[:4, :4, q] if stencil.D == 2 else ps.make_slice[:4, :4, :4, q]
        dh.cpu_arrays[pdfs_a.name][sl] = q
        dh.cpu_arrays[pdfs_b.name][sl] = q

    dh.all_to_gpu()

    direct_copy = LBMPeriodicityHandling(stencil, dh, pdfs_a.name, streaming_pattern, cupy_direct_copy=True)
    kernels_copy = LBMPeriodicityHandling(stencil, dh, pdfs_b.name, streaming_pattern, cupy_direct_copy=False)

    direct_copy(Timestep.EVEN)
    kernels_copy(Timestep.EVEN)

    dh.all_to_cpu()

    np.testing.assert_equal(
        dh.cpu_arrays[pdfs_a.name],
        dh.cpu_arrays[pdfs_b.name]
    )


@pytest.mark.parametrize(
    "stencil_name", [Stencil.D2Q9, Stencil.D3Q15, Stencil.D3Q19, Stencil.D3Q27]
)
def test_optimised_and_full_communication_equivalence(stencil_name):
    target = ps.Target.CPU
    stencil = LBStencil(stencil_name)
    domain_size = (4,) * stencil.D

    dh = ps.create_data_handling(
        domain_size,
        periodicity=(True,) * stencil.D,
        parallel=False,
        default_target=target,
    )

    pdf = dh.add_array("pdf", values_per_cell=len(stencil), dtype=np.int64)
    dh.fill("pdf", 0, ghost_layers=True)
    pdf_tmp = dh.add_array("pdf_tmp", values_per_cell=len(stencil), dtype=np.int64)
    dh.fill("pdf_tmp", 0, ghost_layers=True)

    gl = dh.ghost_layers_of_field("pdf")

    num = 0
    for idx, x in np.ndenumerate(dh.cpu_arrays["pdf"]):
        dh.cpu_arrays["pdf"][idx] = num
        dh.cpu_arrays["pdf_tmp"][idx] = num
        num += 1

    lbm_config = LBMConfig(stencil=stencil, kernel_type="stream_pull_only")
    lbm_opt = LBMOptimisation(symbolic_field=pdf, symbolic_temporary_field=pdf_tmp)
    config = ps.CreateKernelConfig(target=dh.default_target, cpu_openmp=True)

    ac = create_lb_update_rule(lbm_config=lbm_config, lbm_optimisation=lbm_opt)
    ast = ps.create_kernel(ac, config=config)
    stream = ast.compile()

    full_communication = dh.synchronization_function(
        pdf.name, target=dh.default_target, optimization={"openmp": True}
    )
    full_communication()

    dh.run_kernel(stream)
    dh.swap("pdf", "pdf_tmp")
    pdf_full_communication = np.copy(dh.cpu_arrays["pdf"])

    num = 0
    for idx, x in np.ndenumerate(dh.cpu_arrays["pdf"]):
        dh.cpu_arrays["pdf"][idx] = num
        dh.cpu_arrays["pdf_tmp"][idx] = num
        num += 1

    optimised_communication = LBMPeriodicityHandling(
        stencil=stencil,
        data_handling=dh,
        pdf_field_name=pdf.name,
        streaming_pattern="pull",
    )
    optimised_communication()
    dh.run_kernel(stream)
    dh.swap("pdf", "pdf_tmp")

    if stencil.D == 3:
        for i in range(gl, domain_size[0]):
            for j in range(gl, domain_size[1]):
                for k in range(gl, domain_size[2]):
                    for f in range(len(stencil)):
                        assert (
                            dh.cpu_arrays["pdf"][i, j, k, f]
                            == pdf_full_communication[i, j, k, f]
                        ), print(f)
    else:
        for i in range(gl, domain_size[0]):
            for j in range(gl, domain_size[1]):
                for f in range(len(stencil)):
                    assert (
                        dh.cpu_arrays["pdf"][i, j, f] == pdf_full_communication[i, j, f]
                    )