Skip to content
Snippets Groups Projects
Select Git revision
  • 4857c7903d9e9742110756d5ab1b68239d5fc15c
  • master default protected
  • rangersbach/c-interfacing
  • fhennig/devel
  • v0.1a4
  • v0.1a3
  • v0.1a2
  • v0.1a1
8 results

std_mdspan.py

Blame
  • test_datahandling.py 13.15 KiB
    import os
    from tempfile import TemporaryDirectory
    from pathlib import Path
    
    import numpy as np
    
    import pystencils as ps
    from pystencils import create_data_handling, create_kernel
    from pystencils.gpu.gpu_array_handler import GPUArrayHandler
    from pystencils.enums import Target
    
    try:
        import pytest
    except ImportError:
        import unittest.mock
        pytest = unittest.mock.MagicMock()
    
    try:
        import cupy.cuda.runtime
        device_numbers = range(cupy.cuda.runtime.getDeviceCount())
    except ImportError:
        device_numbers = []
    
    SCRIPT_FOLDER = Path(__file__).parent.absolute()
    INPUT_FOLDER = SCRIPT_FOLDER / "test_data"
    
    
    def basic_iteration(dh):
        dh.add_array('basic_iter_test_gl_default')
        dh.add_array('basic_iter_test_gl_3', ghost_layers=3)
    
        for b in dh.iterate():
            assert b.shape == b['basic_iter_test_gl_3'].shape
            assert b.shape == b['basic_iter_test_gl_default'].shape
    
    
    def access_and_gather(dh, domain_size):
        dh.add_array('f1', dtype=np.dtype(np.int8))
        dh.add_array_like('f2', 'f1')
        dh.add_array('v1', values_per_cell=3, dtype=np.int64, ghost_layers=2)
        dh.add_array_like('v2', 'v1')
    
        dh.swap('f1', 'f2')
        dh.swap('v1', 'v2')
    
        # Check symbolic field properties
        assert dh.fields.f1.index_dimensions == 0
        assert dh.fields.f1.spatial_dimensions == len(domain_size)
        assert dh.fields.f1.dtype.numpy_dtype == np.int8
    
        assert dh.fields.v1.index_dimensions == 1
        assert dh.fields.v1.spatial_dimensions == len(domain_size)
        assert dh.fields.v1.dtype.numpy_dtype == np.int64
    
        for b in dh.iterate(ghost_layers=0):
            val = sum(b.cell_index_arrays)
            np.copyto(b['f1'], val)
            for i, coord_arr in enumerate(b.cell_index_arrays):
                np.copyto(b['v1'][..., i], coord_arr)
    
        full_arr = dh.gather_array('v1')
        if full_arr is not None:
            expected_shape = domain_size + (3,)
            assert full_arr.shape == expected_shape
            for x in range(full_arr.shape[0]):
                for y in range(full_arr.shape[1]):
                    if len(domain_size) == 3:
                        for z in range(full_arr.shape[2]):
                            assert full_arr[x, y, z, 0] == x
                            assert full_arr[x, y, z, 1] == y
                            assert full_arr[x, y, z, 2] == z
                    else:
                        assert len(domain_size) == 2
                        assert full_arr[x, y, 0] == x
                        assert full_arr[x, y, 1] == y
    
        full_arr = dh.gather_array('f1')
        if full_arr is not None:
            expected_shape = domain_size
            assert full_arr.shape == expected_shape
            for x in range(full_arr.shape[0]):
                for y in range(full_arr.shape[1]):
                    if len(domain_size) == 3:
                        for z in range(full_arr.shape[2]):
                            assert full_arr[x, y, z] == x + y + z
                    else:
                        assert len(domain_size) == 2
                        assert full_arr[x, y] == x + y
    
    
    def synchronization(dh, test_gpu=False):
        field_name = 'comm_field_test'
        if test_gpu:
            pytest.importorskip("cupy")
            field_name += 'Gpu'
    
        dh.add_array(field_name, ghost_layers=1, dtype=np.int8, cpu=True, gpu=test_gpu)
    
        # initialize everything with 1
        for b in dh.iterate(ghost_layers=1):
            b[field_name].fill(1)
        for b in dh.iterate(ghost_layers=0):
            b[field_name].fill(42)
    
        if test_gpu:
            dh.to_gpu(field_name)
            dh.synchronization_function_gpu(field_name)()
        else:
            dh.synchronization_function_cpu(field_name)()
    
    
        if test_gpu:
            dh.to_cpu(field_name)
    
        for b in dh.iterate(ghost_layers=1):
            np.testing.assert_equal(42, b[field_name])
    
    
    def kernel_execution_jacobi(dh, target):
    
        test_gpu = target == Target.GPU
        dh.add_array('f', gpu=test_gpu)
        dh.add_array('tmp', gpu=test_gpu)
    
        if test_gpu:
            assert dh.is_on_gpu('f')
            assert dh.is_on_gpu('tmp')
    
        stencil_2d = [(1, 0), (-1, 0), (0, 1), (0, -1)]
        stencil_3d = [(1, 0, 0), (-1, 0, 0), (0, 1, 0), (0, -1, 0), (0, 0, 1), (0, 0, -1)]
        stencil = stencil_2d if dh.dim == 2 else stencil_3d
    
        @ps.kernel
        def jacobi():
            dh.fields.tmp.center @= sum(dh.fields.f.neighbors(stencil)) / len(stencil)
    
        kernel = create_kernel(jacobi, config=ps.CreateKernelConfig(target=target)).compile()
        for b in dh.iterate(ghost_layers=1):
            b['f'].fill(42)
        dh.run_kernel(kernel)
        for b in dh.iterate(ghost_layers=0):
            np.testing.assert_equal(b['f'], 42)
    
    
    def vtk_output(dh):
        pytest.importorskip('pyevtk')
        dh.add_array('scalar_field')
        dh.add_array('vector_field', values_per_cell=dh.dim)
        dh.add_array('multiple_scalar_field', values_per_cell=9)
        dh.add_array('flag_field', dtype=np.uint16)
    
        fields_names = ['scalar_field', 'vector_field', 'multiple_scalar_field', 'flag_field']
        with TemporaryDirectory() as tmp_dir:
            writer1 = dh.create_vtk_writer(os.path.join(tmp_dir, "out1"), fields_names, ghost_layers=True)
            writer2 = dh.create_vtk_writer(os.path.join(tmp_dir, "out2"), fields_names, ghost_layers=False)
            masks_to_name = {1: 'flag1', 5: 'some_mask'}
            writer3 = dh.create_vtk_writer_for_flag_array(os.path.join(tmp_dir, "out3"), 'flag_field', masks_to_name)
            writer1(1)
            writer2(1)
            writer3(1)
    
    
    def reduction(dh):
        float_seq = [1.0, 2.0, 3.0]
        int_seq = [1, 2, 3]
        for op in ('min', 'max', 'sum'):
            assert (dh.reduce_float_sequence(float_seq, op) == float_seq).all()
            assert (dh.reduce_int_sequence(int_seq, op) == int_seq).all()
    
    
    def test_symbolic_fields():
        dh = create_data_handling(domain_size=(5, 7))
        dh.add_array('f1', values_per_cell=dh.dim)
        assert dh.fields['f1'].spatial_dimensions == dh.dim
        assert dh.fields['f1'].index_dimensions == 1
    
        dh.add_array_like("f_tmp", "f1", latex_name=r"f_{tmp}")
        assert dh.fields['f_tmp'].spatial_dimensions == dh.dim
        assert dh.fields['f_tmp'].index_dimensions == 1
    
        dh.swap('f1', 'f_tmp')
    
    
    def test_access():
        for domain_shape in [(2, 3, 4), (2, 4)]:
            for f_size in (1, 4):
                dh = create_data_handling(domain_size=domain_shape)
                dh.add_array('f1', values_per_cell=f_size)
                assert dh.dim == len(domain_shape)
    
                for b in dh.iterate(ghost_layers=1):
                    if f_size > 1:
                        assert b['f1'].shape == tuple(ds+2 for ds in domain_shape) + (f_size,)
                    else:
                        assert b['f1'].shape == tuple(ds + 2 for ds in domain_shape)
    
                for b in dh.iterate(ghost_layers=0):
                    if f_size > 1:
                        assert b['f1'].shape == domain_shape + (f_size,)
                    else:
                        assert b['f1'].shape == domain_shape
    
    
    def test_access_and_gather():
        for domain_shape in [(2, 2, 3), (2, 3)]:
            dh = create_data_handling(domain_size=domain_shape, periodicity=True)
            access_and_gather(dh, domain_shape)
            synchronization(dh, test_gpu=False)
            synchronization(dh, test_gpu=True)
    
    
    def test_kernel():
        for domain_shape in [(4, 5), (3, 4, 5)]:
            dh = create_data_handling(domain_size=domain_shape, periodicity=True)
            assert all(dh.periodicity)
            kernel_execution_jacobi(dh, Target.CPU)
            reduction(dh)
    
            try:
                import cupy
                dh = create_data_handling(domain_size=domain_shape, periodicity=True)
                kernel_execution_jacobi(dh, Target.GPU)
            except ImportError:
                pass
    
    
    @pytest.mark.parametrize('target', (Target.CPU, Target.GPU))
    def test_kernel_param(target):
        for domain_shape in [(4, 5), (3, 4, 5)]:
            if target == Target.GPU:
                pytest.importorskip('cupy')
    
            dh = create_data_handling(domain_size=domain_shape, periodicity=True, default_target=target)
            kernel_execution_jacobi(dh, target)
            reduction(dh)
    
    
    def test_vtk_output():
        pytest.importorskip('pyevtk')
        for domain_shape in [(4, 5), (3, 4, 5)]:
            dh = create_data_handling(domain_size=domain_shape, periodicity=True)
            vtk_output(dh)
    
    
    def test_add_arrays():
        domain_shape = (3, 4, 5)
        field_description = 'x, y(9)'
    
        dh = create_data_handling(domain_size=domain_shape, default_ghost_layers=0, default_layout='numpy')
        x_, y_ = dh.add_arrays(field_description)
    
        x, y = ps.fields(field_description + ': [3,4,5]')
    
        assert x_ == x
        assert y_ == y
        assert x == dh.fields['x']
        assert y == dh.fields['y']
    
    
    @pytest.mark.parametrize('shape', [(17, 12), (7, 11, 18)])
    @pytest.mark.parametrize('layout', ['zyxf', 'fzyx'])
    def test_add_arrays_with_layout(shape, layout):
        pytest.importorskip('cupy')
    
        dh = create_data_handling(domain_size=shape, default_layout=layout, default_target=ps.Target.GPU)
        f1 = dh.add_array("f1", values_per_cell=19)
        dh.fill(f1.name, 1.0)
    
        assert dh.cpu_arrays[f1.name].shape == dh.gpu_arrays[f1.name].shape
        assert dh.cpu_arrays[f1.name].strides == dh.gpu_arrays[f1.name].strides
        assert dh.cpu_arrays[f1.name].dtype == dh.gpu_arrays[f1.name].dtype
    
    
    def test_get_kwarg():
        domain_shape = (10, 10)
        field_description = 'src, dst'
    
        dh = create_data_handling(domain_size=domain_shape, default_ghost_layers=1)
        src, dst = dh.add_arrays(field_description)
        dh.fill("src", 1.0, ghost_layers=True)
        dh.fill("dst", 0.0, ghost_layers=True)
    
        with pytest.raises(ValueError):
            dh.add_array('src', values_per_cell=1)
    
        ur = ps.Assignment(src.center, dst.center)
        kernel = ps.create_kernel(ur).compile()
    
        kw = dh.get_kernel_kwargs(kernel)
        assert np.all(kw[0]['src'] == dh.cpu_arrays['src'])
        assert np.all(kw[0]['dst'] == dh.cpu_arrays['dst'])
    
    
    def test_add_custom_data():
        pytest.importorskip('cupy')
        import cupy as cp
    
        def cpu_data_create_func():
            return np.ones((2, 2), dtype=np.float64)
    
        def gpu_data_create_func():
            return cp.zeros((2, 2), dtype=np.float64)
    
        def cpu_to_gpu_transfer_func(gpuarr, cpuarray):
            gpuarr.set(cpuarray)
    
        def gpu_to_cpu_transfer_func(gpuarr, cpuarray):
            cpuarray[:] = gpuarr.get()
    
        dh = create_data_handling(domain_size=(10, 10))
        dh.add_custom_data('custom_data',
                           cpu_data_create_func,
                           gpu_data_create_func,
                           cpu_to_gpu_transfer_func,
                           gpu_to_cpu_transfer_func)
    
        assert np.all(dh.custom_data_cpu['custom_data'] == 1)
        assert np.all(dh.custom_data_gpu['custom_data'].get() == 0)
    
        dh.to_cpu(name='custom_data')
        dh.to_gpu(name='custom_data')
    
        assert 'custom_data' in dh.custom_data_names
    
    
    def test_log():
        dh = create_data_handling(domain_size=(10, 10))
        dh.log_on_root()
        assert dh.is_root
        assert dh.world_rank == 0
    
    
    def test_save_data():
        domain_shape = (2, 2)
    
        dh = create_data_handling(domain_size=domain_shape, default_ghost_layers=1)
        dh.add_array("src", values_per_cell=9)
        dh.fill("src", 1.0, ghost_layers=True)
        dh.add_array("dst", values_per_cell=9)
        dh.fill("dst", 1.0, ghost_layers=True)
    
        dh.save_all(str(INPUT_FOLDER) + '/datahandling_save_test')
    
    
    def test_load_data():
        domain_shape = (2, 2)
    
        dh = create_data_handling(domain_size=domain_shape, default_ghost_layers=1)
        dh.add_array("src", values_per_cell=9)
        dh.fill("src", 0.0, ghost_layers=True)
        dh.add_array("dst", values_per_cell=9)
        dh.fill("dst", 0.0, ghost_layers=True)
    
        dh.load_all(str(INPUT_FOLDER) + '/datahandling_load_test')
        assert np.all(dh.cpu_arrays['src']) == 1
        assert np.all(dh.cpu_arrays['dst']) == 1
    
        domain_shape = (3, 3)
    
        dh = create_data_handling(domain_size=domain_shape, default_ghost_layers=1)
        dh.add_array("src", values_per_cell=9)
        dh.fill("src", 0.0, ghost_layers=True)
        dh.add_array("dst", values_per_cell=9)
        dh.fill("dst", 0.0, ghost_layers=True)
        dh.add_array("dst2", values_per_cell=9)
        dh.fill("dst2", 0.0, ghost_layers=True)
    
        dh.load_all(str(INPUT_FOLDER) + '/datahandling_load_test')
        assert np.all(dh.cpu_arrays['src']) == 0
        assert np.all(dh.cpu_arrays['dst']) == 0
        assert np.all(dh.cpu_arrays['dst2']) == 0
    
    
    @pytest.mark.parametrize("device_number", device_numbers)
    def test_array_handler(device_number):
        size = (2, 2)
        pytest.importorskip('cupy')
        array_handler = GPUArrayHandler(device_number)
    
        zero_array = array_handler.zeros(size)
        cpu_array = np.empty(size)
        array_handler.download(zero_array, cpu_array)
        assert np.all(cpu_array) == 0
    
        ones_array = array_handler.ones(size)
        cpu_array = np.empty(size)
        array_handler.download(ones_array, cpu_array)
        assert np.all(cpu_array) == 1
    
        empty = array_handler.empty(size)
        assert empty.strides == (16, 8)
        empty = array_handler.empty(shape=size, order="F")
        assert empty.strides == (8, 16)
    
        random_array = array_handler.randn(size)
    
        cpu_array = np.empty((20, 40), dtype=np.float64)
        gpu_array = array_handler.to_gpu(cpu_array)
    
        assert cpu_array.base is None
        assert gpu_array.base is None
        assert gpu_array.strides == cpu_array.strides
    
        cpu_array2 = np.empty((20, 40), dtype=np.float64)
        cpu_array2 = cpu_array2.swapaxes(0, 1)
        gpu_array2 = array_handler.to_gpu(cpu_array2)
    
        assert cpu_array2.base is not None
        assert gpu_array2.base is not None
        assert gpu_array2.strides == cpu_array2.strides