Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Showing with 2156 additions and 13 deletions
import pytest
import sympy as sp
from lbmpy.creationfunctions import create_lb_method, LBMConfig
from lbmpy.moments import is_shear_moment, get_order
from lbmpy.enums import Method, Stencil
from lbmpy.relaxationrates import get_shear_relaxation_rate
from lbmpy.stencils import LBStencil
def test_relaxation_rate():
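# An MRT_RAW method that relaxes every moment with a distinct rate has no unique shear relaxation rate,
# so get_shear_relaxation_rate must raise; for a regular MRT method the shear rate (omegas[0]) is returned.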
lbm_config = LBMConfig(stencil=LBStencil(Stencil.D3Q19), method=Method.MRT_RAW,
relaxation_rates=[1 + i / 10 for i in range(19)])
method = create_lb_method(lbm_config=lbm_config)
with pytest.raises(ValueError) as e:
get_shear_relaxation_rate(method)
assert 'Shear moments are relaxed with different relaxation' in str(e.value)
omegas = sp.symbols("omega_:4")
lbm_config = LBMConfig(stencil=LBStencil(Stencil.D3Q19), method=Method.MRT,
relaxation_rates=omegas)
method = create_lb_method(lbm_config=lbm_config)
assert get_shear_relaxation_rate(method) == omegas[0]
@pytest.mark.parametrize("method", [Method.MRT, Method.CENTRAL_MOMENT, Method.CUMULANT])
def test_default_mrt_behaviour(method):
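# Default behaviour of moment-based methods: conserved moments (order <= 1) are not relaxed (rate 0),
# shear moments are relaxed with the symbol omega, and all remaining moments default to 1.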
lbm_config = LBMConfig(stencil=LBStencil(Stencil.D3Q19), method=method, compressible=True)
method = create_lb_method(lbm_config=lbm_config)
for moment, relax_info in method.relaxation_info_dict.items():
if get_order(moment) <= 1:
assert relax_info.relaxation_rate == 0
elif is_shear_moment(moment, method.dim):
assert relax_info.relaxation_rate == sp.Symbol('omega')
else:
assert relax_info.relaxation_rate == 1
"""
Test shear flow velocity and pressure against analytical solutions
"""
import numpy as np
import pytest
import sympy as sp
from lbmpy.boundaries import UBB
from lbmpy.boundaries.boundaryhandling import LatticeBoltzmannBoundaryHandling
from lbmpy.creationfunctions import create_lb_update_rule, create_stream_pull_with_output_kernel,\
LBMConfig, LBMOptimisation
from lbmpy.enums import Method, Stencil
from lbmpy.macroscopic_value_kernels import macroscopic_values_setter
from lbmpy.relaxationrates import relaxation_rate_from_lattice_viscosity
from lbmpy.stencils import LBStencil
import pystencils as ps
def shear_flow(x, t, nu, v, h, k_max):
"""
Analytical solution for driven shear flow between two plates.
Parameters
----------
x : :obj:`float`
Position from the left plane.
t : :obj:`float`
Time since start of the shearing.
nu : :obj:`float`
Kinematic viscosity.
v : :obj:`float`
Shear rate.
h : :obj:`float`
Distance between the plates.
k_max : :obj:`int`
Maximum considered wave number.
Returns
-------
:obj:`double` : Analytical velocity
"""
u = x / h - 0.5
for k in np.arange(1, k_max + 1):
u += 1.0 / (np.pi * k) * np.exp(
-4 * np.pi ** 2 * nu * k ** 2 / h ** 2 * t) * np.sin(2 * np.pi / h * k * x)
return -v * u
rho_0 = 2.2 # density
eta = 1.6 # kinematic viscosity
width = 40 # of box
wall_thickness = 2
actual_width = width - wall_thickness # subtract boundary layer from box width
shear_velocity = 0.2 # scale by width to keep stable
t_max = 2000
@pytest.mark.parametrize('target', (ps.Target.CPU, ps.Target.GPU))
@pytest.mark.parametrize('stencil_name', (Stencil.D2Q9, Stencil.D3Q19))
@pytest.mark.parametrize('zero_centered', [True, False])
def test_shear_flow(target, stencil_name, zero_centered):
# GPU target requires cupy
if target == ps.Target.GPU:
pytest.importorskip("cupy")
# LB parameters
stencil = LBStencil(stencil_name)
if stencil.D == 2:
L = (4, width)
elif stencil.D == 3:
L = (4, width, 4)
else:
raise Exception()
periodicity = [True, False] + [True] * (stencil.D - 2)
omega = relaxation_rate_from_lattice_viscosity(eta)
# ## Data structures
dh = ps.create_data_handling(L, periodicity=periodicity, default_target=target)
src = dh.add_array('src', values_per_cell=stencil.Q)
dst = dh.add_array_like('dst', 'src')
ρ = dh.add_array('rho', latex_name='\\rho', values_per_cell=1)
u = dh.add_array('u', values_per_cell=stencil.D)
p = dh.add_array('p', values_per_cell=stencil.D**2)
# LB Setup
lbm_config = LBMConfig(stencil=stencil, relaxation_rate=omega, method=Method.TRT,
compressible=True, zero_centered=zero_centered,
kernel_type='collide_only')
lbm_opt = LBMOptimisation(symbolic_field=src)
collision = create_lb_update_rule(lbm_config=lbm_config, lbm_optimisation=lbm_opt)
stream = create_stream_pull_with_output_kernel(collision.method, src, dst, {'velocity': u})
config = ps.CreateKernelConfig(cpu_openmp=False, target=dh.default_target)
stream_kernel = ps.create_kernel(stream, config=config).compile()
collision_kernel = ps.create_kernel(collision, config=config).compile()
# Boundaries
lbbh = LatticeBoltzmannBoundaryHandling(collision.method, dh, src.name, target=dh.default_target)
# Second moment test setup
cqc = collision.method.conserved_quantity_computation
getter_eqs = cqc.output_equations_from_pdfs(src.center_vector,
{'moment2': p})
kernel_compute_p = ps.create_kernel(getter_eqs, config=config).compile()
# ## Set up the simulation
init = macroscopic_values_setter(collision.method, velocity=(0,) * dh.dim,
pdfs=src.center_vector, density=ρ.center)
init_kernel = ps.create_kernel(init, ghost_layers=0).compile()
vel_vec = sp.Matrix([0.5 * shear_velocity] + [0] * (stencil.D - 1))
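# Drive the shear with two opposing UBB walls: the lower wall moves with +v/2 and the upper wall
# with -v/2 along the first coordinate axis.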
if stencil.D == 2:
lbbh.set_boundary(UBB(velocity=vel_vec), ps.make_slice[:, :wall_thickness])
lbbh.set_boundary(UBB(velocity=-vel_vec), ps.make_slice[:, -wall_thickness:])
elif stencil.D == 3:
lbbh.set_boundary(UBB(velocity=vel_vec), ps.make_slice[:, :wall_thickness, :])
lbbh.set_boundary(UBB(velocity=-vel_vec), ps.make_slice[:, -wall_thickness:, :])
else:
raise Exception()
for bh in (lbbh, ):
assert len(bh._boundary_object_to_boundary_info) == 2, "Restart kernel to clear boundaries"
def init():
dh.fill(ρ.name, rho_0)
dh.fill(u.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
dh.fill(u.name, 0)
dh.run_kernel(init_kernel)
sync_pdfs = dh.synchronization_function([src.name])
# Time loop
def time_loop(steps):
dh.all_to_gpu()
for i in range(steps):
dh.run_kernel(collision_kernel)
sync_pdfs()
lbbh()
dh.run_kernel(stream_kernel)
dh.run_kernel(kernel_compute_p)
dh.swap(src.name, dst.name)
if u.name in dh.gpu_arrays:
dh.to_cpu(u.name)
uu = dh.gather_array(u.name)
# average periodic directions
if stencil.D == 3:  # don't swap order
uu = np.average(uu, axis=2)
uu = np.average(uu, axis=0)
if p.name in dh.gpu_arrays:
dh.to_cpu(p.name)
pp = dh.gather_array(p.name)
# average periodic directions
if stencil.D == 3:  # don't swap order
pp = np.average(pp, axis=2)
pp = np.average(pp, axis=0)
# cut off wall regions
uu = uu[wall_thickness:-wall_thickness]
pp = pp[wall_thickness:-wall_thickness]
if stencil.D == 2:
pp = pp.reshape((len(pp), 2, 2))
if stencil.D == 3:
pp = pp.reshape((len(pp), 3, 3))
return uu, pp
init()
# Simulation
profile, pressure_profile = time_loop(t_max)
expected = shear_flow(x=(np.arange(0, actual_width) + .5),
t=t_max,
nu=eta / rho_0,
v=shear_velocity,
h=actual_width,
k_max=100)
if stencil.D == 2:
shear_direction = np.array((1, 0), dtype=float)
shear_plane_normal = np.array((0, 1), dtype=float)
if stencil.D == 3:
shear_direction = np.array((1, 0, 0), dtype=float)
shear_plane_normal = np.array((0, 1, 0), dtype=float)
shear_rate = shear_velocity / actual_width
dynamic_viscosity = eta * rho_0
correction_factor = eta / (eta - 1. / 6.)
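# Expected pressure tensor: isotropic part rho_0/3 * I (c_s^2 = 1/3) plus the symmetric viscous stress
# produced by the imposed shear rate, rescaled by the lattice correction factor defined above.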
p_expected = rho_0 * np.identity(dh.dim) / 3.0 + dynamic_viscosity * shear_rate / correction_factor * (
np.outer(shear_plane_normal, shear_direction) + np.transpose(np.outer(shear_plane_normal, shear_direction)))
# Subtract the tensor product of the velocity to get the pressure
pressure_profile[:, 0, 0] -= rho_0 * profile[:, 0]**2
np.testing.assert_allclose(profile[:, 0], expected[1:-1], atol=1E-9)
for i in range(actual_width - 2):
np.testing.assert_allclose(pressure_profile[i], p_expected, atol=1E-9, rtol=1E-3)
import sympy as sp
import pytest
from lbmpy.stencils import LBStencil, Stencil
from lbmpy.maxwellian_equilibrium import get_weights
from lbmpy.equilibrium import default_background_distribution
from lbmpy.moments import get_default_moment_set_for_stencil
from lbmpy.moment_transforms import (
PdfsToMomentsByMatrixTransform, PdfsToMomentsByChimeraTransform,
PdfsToCentralMomentsByMatrix, PdfsToCentralMomentsByShiftMatrix,
FastCentralMomentTransform
)
transforms = [
PdfsToMomentsByMatrixTransform, PdfsToMomentsByChimeraTransform,
PdfsToCentralMomentsByMatrix, PdfsToCentralMomentsByShiftMatrix,
FastCentralMomentTransform
]
def check_shifts(stencil, transform_class):
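# Compare each transform with and without the background-distribution shift: the forward transforms must
# agree when f equals the lattice weights and g is zero, and the unshifted and shifted backward transforms
# must differ exactly by the lattice weights.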
weights = get_weights(stencil)
bd = default_background_distribution(stencil.D)
rho = bd.density
u = bd.velocity
moments = get_default_moment_set_for_stencil(stencil)
fs = sp.symbols(f'f_:{stencil.Q}')
gs = sp.symbols(f'g_:{stencil.Q}')
transform_unshifted = transform_class(stencil=stencil,
equilibrium_density=rho,
equilibrium_velocity=u,
moment_polynomials=moments)
transform_shifted = transform_class(stencil=stencil,
equilibrium_density=rho,
equilibrium_velocity=u,
moment_polynomials=moments,
background_distribution=bd)
# Test forward transforms
fw_unshifted = transform_unshifted.forward_transform(fs).new_without_subexpressions()
fw_shifted = transform_shifted.forward_transform(gs).new_without_subexpressions()
fw_delta = [(a.rhs - b.rhs).expand() for a, b in zip(fw_unshifted, fw_shifted)]
fw_subs = {f: w for f, w in zip(fs, weights)}
fw_subs.update({g: sp.Integer(0) for g in gs})
fw_delta = [eq.subs(fw_subs).expand() for eq in fw_delta]
for i, eq in enumerate(fw_delta):
assert eq == sp.Integer(0), f"Error at index {i}"
# Test backward transforms
bw_unshifted = transform_unshifted.backward_transform(fs).new_without_subexpressions()
bw_shifted = transform_shifted.backward_transform(fs).new_without_subexpressions()
bw_delta = [(a.rhs - b.rhs).expand() for a, b in zip(bw_unshifted, bw_shifted)]
assert bw_delta == weights
@pytest.mark.parametrize('stencil', [LBStencil(Stencil.D2Q9)])
@pytest.mark.parametrize('transform_class', transforms)
def test_shifted_transform_fast(stencil, transform_class):
check_shifts(stencil, transform_class)
@pytest.mark.longrun
@pytest.mark.parametrize('stencil', [LBStencil(Stencil.D3Q19), LBStencil(Stencil.D3Q27)])
@pytest.mark.parametrize('transform_class', transforms)
def test_shifted_transform_long(stencil, transform_class):
check_shifts(stencil, transform_class)
import numpy as np
from pystencils import Target, CreateKernelConfig
from lbmpy.creationfunctions import create_lb_function, LBMConfig
from lbmpy.enums import Method, Stencil
from lbmpy.stencils import LBStencil
import pytest
@pytest.mark.parametrize('target', [Target.CPU, Target.GPU])
@pytest.mark.parametrize('method', [Method.SRT, Method.MRT, Method.CENTRAL_MOMENT, Method.CUMULANT])
@pytest.mark.parametrize('compressible', [False, True])
@pytest.mark.parametrize('delta_equilibrium', [False, True])
def test_simple_equilibrium_conservation(target, method, compressible, delta_equilibrium):
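# With zero-centered PDF storage an all-zero PDF field represents the background equilibrium, so a single
# LB update must leave the destination field exactly zero.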
if target == Target.GPU:
pytest.importorskip("cupy")
if method == Method.SRT and not delta_equilibrium:
pytest.skip()
if method == Method.CUMULANT and (not compressible or delta_equilibrium):
pytest.skip()
src = np.zeros((3, 3, 9))
dst = np.zeros_like(src)
config = CreateKernelConfig(target=target)
lbm_config = LBMConfig(stencil=LBStencil(Stencil.D2Q9), method=method,
relaxation_rate=1.8, compressible=compressible,
zero_centered=True, delta_equilibrium=delta_equilibrium)
func = create_lb_function(lbm_config=lbm_config, config=config)
if target == Target.GPU:
import cupy
gpu_src, gpu_dst = cupy.asarray(src), cupy.asarray(dst)
func(src=gpu_src, dst=gpu_dst)
src[:] = gpu_src.get()
dst[:] = gpu_dst.get()
else:
func(src=src, dst=dst)
np.testing.assert_allclose(np.sum(np.abs(dst)), 0.0, atol=1e-13)
import sympy as sp
from pystencils.simp import insert_constants
from lbmpy import create_lb_collision_rule, LBMConfig, LBStencil, Stencil, Method, SubgridScaleModel
def test_smagorinsky_with_constant_omega():
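# Substituting omega = 1.5 into the symbolic Smagorinsky collision rule must reproduce the collision rule
# that is created directly with a constant relaxation rate of 1.5.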
stencil = LBStencil(Stencil.D2Q9)
config = LBMConfig(stencil=stencil, method=Method.SRT, subgrid_scale_model=SubgridScaleModel.SMAGORINSKY,
relaxation_rate=sp.Symbol("omega"))
collision_rule = create_lb_collision_rule(lbm_config=config)
config = LBMConfig(stencil=stencil, method=Method.SRT, subgrid_scale_model=SubgridScaleModel.SMAGORINSKY,
relaxation_rate=1.5)
collision_rule2 = create_lb_collision_rule(lbm_config=config)
collision_rule = collision_rule.subs({sp.Symbol("omega"): 1.5})
collision_rule = insert_constants(collision_rule)
assert collision_rule == collision_rule2
%% Cell type:code id: tags:
``` python
from lbmpy.session import *
from lbmpy.sparse import *
from pystencils.field import FieldType
import pystencils as ps
```
%% Cell type:code id: tags:
``` python
try:
import cupy
except ImportError:
cupy = None
target = ps.Target.CPU
print('No cupy installed')
if cupy:
target = ps.Target.GPU
```
%% Output
No cupy installed
%% Cell type:markdown id: tags:
# Sparse (list based) LBM
%% Cell type:code id: tags:
``` python
domain_size = (20, 20)
omega = 1.8
target = ps.Target.CPU
ghost_layers = 1
arr_size = tuple(ds + 2 * ghost_layers for ds in domain_size)
lid_velocity = 0.01
force = 1e-6
channel = True
if channel:
lbm_config = LBMConfig(stencil=LBStencil(Stencil.D2Q9), relaxation_rate=omega,
compressible=False, force=(force, 0))
else:
lbm_config = LBMConfig(stencil=LBStencil(Stencil.D2Q9), relaxation_rate=omega, compressible=False)
method = create_lb_method(lbm_config=lbm_config)
```
%% Cell type:code id: tags:
``` python
ubb = UBB( (lid_velocity, 0) )
noslip = NoSlip()
flags = {
'fluid': 1,
noslip: 2,
ubb: 4,
}
flag_arr = np.zeros(arr_size, dtype=np.uint16)
flag_arr.fill(flags['fluid'])
if channel:
flag_arr[0, :] = 0
flag_arr[-1, :] = 0
flag_arr[:, 0] = flags[noslip]
flag_arr[:, -1] = flags[noslip]
else:
flag_arr[:, -1] = flags[ubb]
flag_arr[:, 0] = flags[noslip]
flag_arr[0, :] = flags[noslip]
flag_arr[-1, :] = flags[noslip]
plt.scalar_field(flag_arr)
plt.colorbar();
```
%% Output
%% Cell type:markdown id: tags:
### Mappings
%% Cell type:code id: tags:
``` python
mapping = SparseLbMapper(method.stencil, flag_arr, flags['fluid'], flags[noslip], flags[ubb])
index_arr = mapping.create_index_array(ghost_layers)
# Arrays
#index_arr = index_arr_linear.reshape([len(method.stencil), mapping.num_fluid_cells])
#index_arr = index_arr.swapaxes(0, 1)
pdf_arr = np.empty((len(mapping), len(method.stencil)), order='f')
pdf_arr_tmp = np.empty_like(pdf_arr)
vel_arr = np.ones([mapping.num_fluid_cells, method.dim], order='f')
```
%% Cell type:code id: tags:
``` python
pdf_field, pdf_field_tmp, vel_field = ps.fields("f(9), d(9), u(2): [1D]",
#f=pdf_arr[:mapping.num_fluid_cells],
#d=pdf_arr_tmp[:mapping.num_fluid_cells],
#u=vel_arr
)
pdf_field.field_type = FieldType.CUSTOM
pdf_field_tmp.field_type = FieldType.CUSTOM
```
%% Cell type:markdown id: tags:
### Macroscopic quantities
%% Cell type:code id: tags:
``` python
cqc = method.conserved_quantity_computation
inp_eqs = cqc.equilibrium_input_equations_from_init_values(force_substitution=False)
setter_eqs = method.get_equilibrium(conserved_quantity_equations=inp_eqs)
setter_eqs = setter_eqs.new_with_substitutions({sym: pdf_field(i)
for i, sym in enumerate(method.post_collision_pdf_symbols)})
config = ps.CreateKernelConfig(ghost_layers=((0, 0),))
kernel_initialize = ps.create_kernel(setter_eqs, config=config).compile()
def init():
kernel_initialize(f=pdf_arr[:mapping.num_fluid_cells])
init()
getter_eqs = cqc.output_equations_from_pdfs(pdf_field.center_vector,
{'velocity': vel_field})
kernel_compute_u = ps.create_kernel(getter_eqs, config=config).compile()
def get_velocity(arr=pdf_arr):
fluid_cell_arr = mapping.coordinates
kernel_compute_u(f=pdf_arr[:mapping.num_fluid_cells], u=vel_arr)
full_velocity_arr = np.zeros(flag_arr.shape + (2,), dtype=np.float64)
full_velocity_arr.fill(np.nan)
arr = fluid_cell_arr[:mapping.num_fluid_cells]
full_velocity_arr[arr['x'], arr['y']] = vel_arr
return full_velocity_arr[1:-1, 1:-1]
```
%% Cell type:markdown id: tags:
### Stream Collide Kernel
%% Cell type:code id: tags:
``` python
#index_field = ps.Field.create_from_numpy_array("idx", index_arr, index_dimensions=1)
index_field = ps.Field.create_generic("idx", spatial_dimensions=1, index_dimensions=1, dtype=index_arr.dtype)
collision_rule = method.get_collision_rule()
Q = len(method.stencil)
symbol_subs = {sym: pdf_field.absolute_access((index_field(i-1),),())
for i, sym in enumerate(method.pre_collision_pdf_symbols)}
symbol_subs.update({sym: pdf_field_tmp(i) for i, sym in enumerate(method.post_collision_pdf_symbols)})
symbol_subs[method.pre_collision_pdf_symbols[0]] = pdf_field(0) # special case for center
symbol_subs
```
%% Output
$\displaystyle \left\{ d_{0} : {d}_{0}^{0}, \ d_{1} : {d}_{0}^{1}, \ d_{2} : {d}_{0}^{2}, \ d_{3} : {d}_{0}^{3}, \ d_{4} : {d}_{0}^{4}, \ d_{5} : {d}_{0}^{5}, \ d_{6} : {d}_{0}^{6}, \ d_{7} : {d}_{0}^{7}, \ d_{8} : {d}_{0}^{8}, \ f_{0} : {f}_{0}^{0}, \ f_{1} : {f}_{\mathbf{idx}_{0}^{0}}^{0}, \ f_{2} : {f}_{\mathbf{idx}_{0}^{1}}^{0}, \ f_{3} : {f}_{\mathbf{idx}_{0}^{2}}^{0}, \ f_{4} : {f}_{\mathbf{idx}_{0}^{3}}^{0}, \ f_{5} : {f}_{\mathbf{idx}_{0}^{4}}^{0}, \ f_{6} : {f}_{\mathbf{idx}_{0}^{5}}^{0}, \ f_{7} : {f}_{\mathbf{idx}_{0}^{6}}^{0}, \ f_{8} : {f}_{\mathbf{idx}_{0}^{7}}^{0}\right\}$
%% Cell type:code id: tags:
``` python
collision_rule = method.get_collision_rule()
update_rule = collision_rule.new_with_substitutions(symbol_subs)
from lbmpy._compat import IS_PYSTENCILS_2
if IS_PYSTENCILS_2:
config = ps.CreateKernelConfig(ghost_layers=((0, 0),), target=target, index_dtype="uint32")
else:
config = ps.CreateKernelConfig(ghost_layers=((0, 0),), target=target)
kernel_stream_collide = ps.create_kernel(update_rule, config=config).compile()
```
%% Cell type:markdown id: tags:
### Boundary Kernels
%% Cell type:code id: tags:
``` python
if not channel:
if target == ps.Target.GPU:
raise NotImplementedError("UBB on GPU not working yet")
ubb_mapper = SparseLbBoundaryMapper(ubb, method, pdf_field)
# TODO the following line is wrong: the kernel contains accesses to index_field and pdf_field, which have
# different sizes. A correct kernel is generated when, by chance, the shape is taken from the index field;
# when it is taken from the pdf field, a wrong kernel is generated.
ubb_kernel = ps.create_kernel(ubb_mapper.assignments(), ghost_layers=0).compile()
ubb_idx_arr = ubb_mapper.create_index_arr(mapping, flags[ubb])
ps.show_code(ubb_kernel.ast)
def handle_ubb():
ubb_kernel(indexField=ubb_idx_arr, f=pdf_arr[:mapping.num_fluid_cells])
else:
def handle_ubb():
pass
```
%% Cell type:markdown id: tags:
### Time Loop
%% Cell type:code id: tags:
``` python
def time_step():
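# One sparse LB step: apply the UBB boundary, run the fused stream-collide kernel over the list of
# fluid cells (on the GPU if available), then swap the PDF arrays.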
global pdf_arr, pdf_arr_tmp, index_arr
handle_ubb()
if target == ps.Target.GPU:
gpu_pdf_arr = cupy.asarray(pdf_arr)
gpu_pdf_arr_tmp = cupy.asarray(pdf_arr_tmp)
gpu_index_arr = cupy.asarray(index_arr)
kernel_stream_collide(f=gpu_pdf_arr[:mapping.num_fluid_cells],
d=gpu_pdf_arr_tmp[:mapping.num_fluid_cells],
idx=gpu_index_arr)
pdf_arr[:] = gpu_pdf_arr.get()
pdf_arr_tmp[:] = gpu_pdf_arr_tmp.get()
index_arr[:] = gpu_index_arr.get()
else:
kernel_stream_collide(f=pdf_arr[:mapping.num_fluid_cells],
d=pdf_arr_tmp[:mapping.num_fluid_cells],
idx=index_arr)
pdf_arr_tmp, pdf_arr = pdf_arr, pdf_arr_tmp
def run(steps=100):
for t in range(steps):
time_step()
return get_velocity()
```
%% Cell type:code id: tags:
``` python
init()
```
%% Cell type:code id: tags:
``` python
init()
result = run(100)
plt.vector_field(result, step=1);
```
%% Output
%% Cell type:markdown id: tags:
### Check against reference
%% Cell type:code id: tags:
``` python
if channel:
reference = create_channel(domain_size, force=force, lb_method=method)
else:
reference = create_lid_driven_cavity(domain_size, relaxation_rate=omega, lid_velocity=lid_velocity,
compressible=False)
reference.run(100)
```
%% Cell type:code id: tags:
``` python
np.testing.assert_almost_equal(reference.velocity[:, :], result)
```
import numpy as np
import pytest
from lbmpy.creationfunctions import create_lb_ast, LBMConfig, LBMOptimisation
from lbmpy.enums import ForceModel, Method, Stencil
from lbmpy.scenarios import create_lid_driven_cavity
from lbmpy.stencils import LBStencil
from lbmpy._compat import IS_PYSTENCILS_2
from pystencils.sympyextensions import count_operations_in_ast
from sympy.core.cache import clear_cache
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q19])
@pytest.mark.parametrize('compressible', [True, False])
@pytest.mark.parametrize('method', [Method.SRT, Method.TRT])
@pytest.mark.xfail(IS_PYSTENCILS_2, reason="Loop splitting is not available yet")
def test_split_number_of_operations(stencil, compressible, method):
# For the following configurations the number of operations for the split and unsplit versions is
# exactly equal. This is not true for D3Q15 and D3Q27, where some sub-expressions are computed in
# multiple split inner loops.
lbm_config = LBMConfig(stencil=LBStencil(stencil), method=method, compressible=compressible,
force_model=ForceModel.LUO, force=(1e-6, 1e-5, 1e-7))
lbm_opt_split = LBMOptimisation(split=True)
lbm_opt = LBMOptimisation(split=False)
ast_with_splitting = create_lb_ast(lbm_config=lbm_config, lbm_optimisation=lbm_opt_split)
ast_without_splitting = create_lb_ast(lbm_config=lbm_config, lbm_optimisation=lbm_opt)
op_with_splitting = count_operations_in_ast(ast_with_splitting)
op_without_splitting = count_operations_in_ast(ast_without_splitting)
assert op_without_splitting['muls'] == op_with_splitting['muls']
assert op_without_splitting['adds'] == op_with_splitting['adds']
assert op_without_splitting['divs'] == op_with_splitting['divs']
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q15, Stencil.D3Q19, Stencil.D3Q27])
@pytest.mark.parametrize('compressible', [True, False])
@pytest.mark.parametrize('method', [Method.SRT, Method.TRT])
@pytest.mark.parametrize('force', [(0, 0, 0), (1e-6, 1e-7, 2e-6)])
@pytest.mark.longrun
@pytest.mark.xfail(IS_PYSTENCILS_2, reason="Loop splitting is not available yet")
def test_equivalence(stencil, compressible, method, force):
relaxation_rates = [1.8, 1.7, 1.0, 1.0, 1.0, 1.0]
stencil = LBStencil(stencil)
clear_cache()
domain_size = (10, 20) if stencil.D == 2 else (5, 10, 7)
lbm_config = LBMConfig(stencil=stencil, method=method, compressible=compressible,
relaxation_rates=relaxation_rates,
force_model=ForceModel.GUO, force=force)
lbm_opt_split = LBMOptimisation(split=True)
lbm_opt = LBMOptimisation(split=False)
with_split = create_lid_driven_cavity(domain_size=domain_size,
lbm_config=lbm_config, lbm_optimisation=lbm_opt_split)
without_split = create_lid_driven_cavity(domain_size=domain_size,
lbm_config=lbm_config, lbm_optimisation=lbm_opt)
with_split.run(100)
without_split.run(100)
np.testing.assert_almost_equal(with_split.velocity_slice(), without_split.velocity_slice())
@pytest.mark.parametrize('setup', [(Stencil.D2Q9, True, Method.SRT, 1e-7), (Stencil.D3Q19, False, Method.MRT, 0)])
@pytest.mark.xfail(IS_PYSTENCILS_2, reason="Loop splitting is not available yet")
def test_equivalence_short(setup):
relaxation_rates = [1.8, 1.7, 1.0, 1.0, 1.0, 1.0]
stencil = LBStencil(setup[0])
compressible = setup[1]
method = setup[2]
force = (setup[3], 0) if stencil.D == 2 else (setup[3], 0, 0)
domain_size = (20, 30) if stencil.D == 2 else (10, 13, 7)
lbm_config = LBMConfig(stencil=stencil, method=method, compressible=compressible,
relaxation_rates=relaxation_rates,
force_model=ForceModel.GUO, force=force)
lbm_opt_split = LBMOptimisation(split=True)
lbm_opt = LBMOptimisation(split=False)
with_split = create_lid_driven_cavity(domain_size=domain_size,
lbm_config=lbm_config, lbm_optimisation=lbm_opt_split)
without_split = create_lid_driven_cavity(domain_size=domain_size,
lbm_config=lbm_config, lbm_optimisation=lbm_opt)
with_split.run(100)
without_split.run(100)
np.testing.assert_almost_equal(with_split.velocity_slice(), without_split.velocity_slice())
@@ -4,11 +4,13 @@ known acceptable values.
"""
import sympy as sp
from lbmpy.enums import Stencil, CollisionSpace
from lbmpy.forcemodels import Luo
from lbmpy.methods import create_srt, create_trt, create_trt_with_magic_number
from lbmpy.methods.creationfunctions import CollisionSpaceInfo
from lbmpy.methods.momentbased.momentbasedsimplifications import cse_in_opposing_directions
from lbmpy.simplificationfactory import create_simplification_strategy
from lbmpy.stencils import get_stencil
from lbmpy.stencils import LBStencil
def check_method(method, limits_default, limits_cse):
@@ -29,39 +31,56 @@ def check_method(method, limits_default, limits_cse):
assert ops_cse['divs'] <= limits_cse[2]
def test_simplifications_srt_d2q9_incompressible():
def test_simplifications_srt_d2q9_incompressible_regular():
omega = sp.symbols('omega')
method = create_srt(get_stencil("D2Q9"), omega, compressible=False, equilibrium_order=2)
check_method(method, [53, 46, 0], [49, 30, 0])
method = create_srt(LBStencil(Stencil.D2Q9), omega, compressible=False,
zero_centered=False, equilibrium_order=2)
check_method(method, [53, 46, 0], [53, 38, 0])
def test_simplifications_srt_d2q9_compressible():
def test_simplifications_srt_d2q9_incompressible_zc():
omega = sp.symbols('omega')
method = create_srt(get_stencil("D2Q9"), omega, compressible=True, equilibrium_order=2)
method = create_srt(LBStencil(Stencil.D2Q9), omega, compressible=False,
zero_centered=True, delta_equilibrium=True, equilibrium_order=2)
check_method(method, [53, 46, 0], [53, 38, 0])
def test_simplifications_srt_d2q9_compressible_regular():
omega = sp.symbols('omega')
method = create_srt(LBStencil(Stencil.D2Q9), omega, compressible=True,
equilibrium_order=2)
check_method(method, [53, 58, 1], [53, 42, 1])
def test_simplifications_srt_d2q9_compressible_zc():
omega = sp.symbols('omega')
method = create_srt(LBStencil(Stencil.D2Q9), omega, compressible=True,
zero_centered=True, delta_equilibrium=True, equilibrium_order=2)
check_method(method, [54, 58, 1], [54, 42, 1])
def test_simplifications_trt_d2q9_incompressible():
o1, o2 = sp.symbols("omega_1 omega_2")
method = create_trt(get_stencil("D2Q9"), o1, o2, compressible=False)
method = create_trt(LBStencil(Stencil.D2Q9), o1, o2, compressible=False)
check_method(method, [77, 86, 0], [65, 38, 0])
def test_simplifications_trt_d2q9_compressible():
o1, o2 = sp.symbols("omega_1 omega_2")
method = create_trt(get_stencil("D2Q9"), o1, o2, compressible=True)
check_method(method, [77, 106, 1], [65, 56, 1])
method = create_trt(LBStencil(Stencil.D2Q9), o1, o2, compressible=True)
check_method(method, [77, 106, 1], [65, 50, 1])
def test_simplifications_trt_d3q19_force_incompressible():
o1, o2 = sp.symbols("omega_1 omega_2")
force_model = Luo([sp.Rational(1, 3), sp.Rational(1, 2), sp.Rational(1, 5)])
method = create_trt(get_stencil("D3Q19"), o1, o2, compressible=False, force_model=force_model)
check_method(method, [268, 281, 0], [241, 175, 1])
method = create_trt(LBStencil(Stencil.D3Q19), o1, o2, compressible=False, force_model=force_model, continuous_equilibrium=False)
check_method(method, [246, 243, 0], [219, 137, 1])
def test_simplifications_trt_d3q19_force_compressible():
o1, o2 = sp.symbols("omega_1 omega_2")
force_model = Luo([sp.Rational(1, 3), sp.Rational(1, 2), sp.Rational(1, 5)])
method = create_trt_with_magic_number(get_stencil("D3Q19"), o1, compressible=False, force_model=force_model)
check_method(method, [270, 284, 1], [243, 178, 1])
method = create_trt_with_magic_number(LBStencil(Stencil.D3Q19), o1, compressible=False,
force_model=force_model, continuous_equilibrium=False)
check_method(method, [248, 246, 1], [221, 140, 1])
import warnings
import pytest
import sympy as sp
import matplotlib.pyplot as plt
import pystencils as ps
from lbmpy.enums import Stencil
from lbmpy.stencils import LBStencil
def get_3d_stencils():
return LBStencil(Stencil.D3Q15), LBStencil(Stencil.D3Q19), LBStencil(Stencil.D3Q27)
def get_all_stencils():
return [
LBStencil(Stencil.D2Q9, 'walberla'),
LBStencil(Stencil.D3Q15, 'walberla'),
LBStencil(Stencil.D3Q19, 'walberla'),
LBStencil(Stencil.D3Q27, 'walberla'),
LBStencil(Stencil.D2Q9, 'counterclockwise'),
LBStencil(Stencil.D2Q9, 'braunschweig'),
LBStencil(Stencil.D2Q9, 'uk'),
LBStencil(Stencil.D3Q15, 'premnath'),
LBStencil(Stencil.D3Q15, 'fakhari'),
LBStencil(Stencil.D3Q19, 'braunschweig'),
LBStencil(Stencil.D3Q19, 'premnath'),
LBStencil(Stencil.D3Q27, "premnath"),
LBStencil(Stencil.D3Q27, "fakhari"),
]
def test_sizes():
assert LBStencil(Stencil.D2Q9).Q == 9
assert LBStencil(Stencil.D3Q15).Q == 15
assert LBStencil(Stencil.D3Q19).Q == 19
assert LBStencil(Stencil.D3Q27).Q == 27
def test_dimensionality():
for d in LBStencil(Stencil.D2Q9).stencil_entries:
assert len(d) == 2
assert LBStencil(Stencil.D2Q9).D == 2
for stencil in get_3d_stencils():
assert stencil.D == 3
def test_uniqueness():
for stencil in get_3d_stencils():
direction_set = set(stencil.stencil_entries)
assert len(direction_set) == len(stencil.stencil_entries)
def test_run_self_check():
for st in get_all_stencils():
assert ps.stencil.is_valid(st.stencil_entries, max_neighborhood=1)
assert ps.stencil.is_symmetric(st.stencil_entries)
def test_inverse_direction():
stencil = LBStencil(Stencil.D2Q9)
for i in range(stencil.Q):
assert ps.stencil.inverse_direction(stencil.stencil_entries[i]) == stencil.inverse_stencil_entries[i]
def test_free_functions():
assert not ps.stencil.is_symmetric([(1, 0), (0, 1)])
assert not ps.stencil.is_valid([(1, 0), (1, 1, 0)])
assert not ps.stencil.is_valid([(2, 0), (0, 1)], max_neighborhood=1)
with pytest.raises(ValueError) as e:
LBStencil("name_that_does_not_exist")
assert "No such stencil" in str(e.value)
def test_visualize():
plt.clf()
plt.cla()
d2q9, d3q19 = LBStencil(Stencil.D2Q9), LBStencil(Stencil.D3Q19)
figure = plt.gcf()
with warnings.catch_warnings():
warnings.simplefilter("ignore")
d2q9.plot(figure=figure, data=[str(i) for i in range(9)])
d3q19.plot(figure=figure, data=sp.symbols("a_:19"))
def test_comparability_of_stencils():
stencil_1 = LBStencil(Stencil.D2Q9)
stencil_2 = LBStencil(Stencil.D2Q9)
stencil_3 = LBStencil(Stencil.D2Q9, ordering="braunschweig")
stencil_4 = LBStencil(stencil_1.stencil_entries)
stencil_5 = LBStencil(stencil_3.stencil_entries)
stencil_6 = LBStencil(stencil_1.stencil_entries)
assert stencil_1 == stencil_2
assert stencil_1 != stencil_3
assert stencil_1 != stencil_4
assert stencil_1 != stencil_5
assert stencil_4 == stencil_6
import pytest
import pystencils as ps
from lbmpy.advanced_streaming.utility import get_timesteps, streaming_patterns, get_accessor, \
is_inplace, AccessPdfValues
from lbmpy.enums import Stencil
from lbmpy.stencils import LBStencil
from lbmpy.updatekernels import create_stream_only_kernel
from pystencils import create_kernel, Target
@pytest.mark.parametrize('streaming_pattern', streaming_patterns)
def test_stream_only_kernel(streaming_pattern):
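# For every streaming pattern and timestep parity: write a distinct value into each PDF direction of one
# cell, run the pure streaming kernel, and check that every value can be read back from the corresponding
# output location.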
domain_size = (4, 4)
stencil = LBStencil(Stencil.D2Q9)
dh = ps.create_data_handling(domain_size, default_target=Target.CPU)
pdfs = dh.add_array('pdfs', values_per_cell=len(stencil))
pdfs_tmp = dh.add_array_like('pdfs_tmp', 'pdfs')
for t in get_timesteps(streaming_pattern):
accessor = get_accessor(streaming_pattern, t)
src = pdfs
dst = pdfs if is_inplace(streaming_pattern) else pdfs_tmp
dh.fill(src.name, 0.0)
dh.fill(dst.name, 0.0)
stream_kernel = create_stream_only_kernel(stencil, src, dst, accessor=accessor)
stream_func = create_kernel(stream_kernel).compile()
# Check functionality
acc_in = AccessPdfValues(stencil, streaming_dir='in', accessor=accessor)
for i in range(len(stencil)):
acc_in.write_pdf(dh.cpu_arrays[src.name], (1,1), i, i)
dh.run_kernel(stream_func)
acc_out = AccessPdfValues(stencil, streaming_dir='out', accessor=accessor)
for i in range(len(stencil)):
assert acc_out.read_pdf(dh.cpu_arrays[dst.name], (1,1), i) == i
import numpy as np
import pytest
from dataclasses import replace
import pystencils as ps
from lbmpy._compat import get_supported_instruction_sets, IS_PYSTENCILS_2
from lbmpy.scenarios import create_lid_driven_cavity
from lbmpy.creationfunctions import LBMConfig, LBMOptimisation
vector_isas = get_supported_instruction_sets()
if vector_isas is None:
vector_isas = []
@pytest.mark.skipif(not vector_isas, reason="cannot detect CPU instruction set")
def test_lbm_vectorization_short():
print("Computing reference solutions")
size1 = (64, 32)
relaxation_rate = 1.8
ldc1_ref = create_lid_driven_cavity(size1, relaxation_rate=relaxation_rate)
ldc1_ref.run(10)
lbm_config = LBMConfig(relaxation_rate=relaxation_rate)
config = ps.CreateKernelConfig(
cpu_vectorize_info={
"instruction_set": get_supported_instruction_sets()[-1],
"assume_aligned": True,
"nontemporal": True,
"assume_inner_stride_one": True,
"assume_sufficient_line_padding": False,
}
)
ldc1 = create_lid_driven_cavity(
size1, lbm_config=lbm_config, config=config, fixed_loop_sizes=False
)
ldc1.run(10)
@pytest.mark.skipif(not vector_isas, reason="cannot detect CPU instruction set")
@pytest.mark.xfail(reason="Loop splitting is not available yet")
@pytest.mark.parametrize("instruction_set", vector_isas)
@pytest.mark.parametrize(
"aligned_and_padding", [[False, False], [True, False], [True, True]]
)
@pytest.mark.parametrize("nontemporal", [False, True])
@pytest.mark.parametrize("double_precision", [False, True])
@pytest.mark.parametrize("fixed_loop_sizes", [False, True])
@pytest.mark.longrun
def test_lbm_vectorization(
instruction_set,
aligned_and_padding,
nontemporal,
double_precision,
fixed_loop_sizes,
):
vectorization_options = {
"instruction_set": instruction_set,
"assume_aligned": aligned_and_padding[0],
"nontemporal": nontemporal,
"assume_inner_stride_one": True,
"assume_sufficient_line_padding": aligned_and_padding[1],
}
time_steps = 100
size1 = (64, 32)
size2 = (666, 34)
relaxation_rate = 1.8
print("Computing reference solutions")
ldc1_ref = create_lid_driven_cavity(size1, relaxation_rate=relaxation_rate)
ldc1_ref.run(time_steps)
ldc2_ref = create_lid_driven_cavity(size2, relaxation_rate=relaxation_rate)
ldc2_ref.run(time_steps)
lbm_config = LBMConfig(relaxation_rate=relaxation_rate)
config = ps.CreateKernelConfig(
data_type="float64" if double_precision else "float32",
cpu_vectorize_info=vectorization_options,
)
if not IS_PYSTENCILS_2:
config = replace(
config,
default_number_float="float64" if double_precision else "float32",
)
lbm_opt_split = LBMOptimisation(cse_global=True, split=True)
lbm_opt = LBMOptimisation(cse_global=True, split=False)
print(
f"Vectorization test, double precision {double_precision}, vectorization {vectorization_options}, "
f"fixed loop sizes {fixed_loop_sizes}"
)
ldc1 = create_lid_driven_cavity(
size1,
fixed_loop_sizes=fixed_loop_sizes,
lbm_config=lbm_config,
lbm_optimisation=lbm_opt,
config=config,
)
ldc1.run(time_steps)
np.testing.assert_almost_equal(ldc1_ref.velocity[:, :], ldc1.velocity[:, :])
ldc2 = create_lid_driven_cavity(
size2,
fixed_loop_sizes=fixed_loop_sizes,
lbm_config=lbm_config,
lbm_optimisation=lbm_opt_split,
config=config,
)
ldc2.run(time_steps)
np.testing.assert_almost_equal(ldc2_ref.velocity[:, :], ldc2.velocity[:, :])
import lbmpy
def test_version_string():
version = lbmpy.__version__
print(version)
numeric_version = [int(x, 10) for x in version.split('.')[0:1]]
test_version = sum(x * (100 ** i) for i, x in enumerate(numeric_version))
assert test_version >= 1
import pytest
import pystencils as ps
from lbmpy import Stencil, LBStencil, LBMConfig, Method, lattice_viscosity_from_relaxation_rate, \
LatticeBoltzmannStep, pdf_initialization_assignments, ForceModel
from lbmpy.boundaries.boundaryconditions import UBB, WallFunctionBounce, NoSlip, FreeSlip
from lbmpy.boundaries.wall_function_models import SpaldingsLaw, LogLaw, MoninObukhovSimilarityTheory, MuskerLaw
from pystencils.slicing import slice_from_direction
@pytest.mark.parametrize('stencil', [Stencil.D3Q19, Stencil.D3Q27])
@pytest.mark.parametrize('wfb_type', ['wfb_i', 'wfb_ii', 'wfb_iii', 'wfb_iv'])
def test_wfb(stencil, wfb_type):
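# Forced-channel sanity check for WallFunctionBounce: the wall-adjacent velocity produced by the wall
# function must lie between the no-slip and the free-slip reference solutions.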
stencil = LBStencil(stencil)
periodicity = (True, False, True)
domain_size = (30, 20, 15)
dim = len(domain_size)
omega = 1.1
nu = lattice_viscosity_from_relaxation_rate(omega)
# pressure gradient for laminar channel flow with u_max = 0.1
ext_force_density = 0.1 * 2 * nu / ((domain_size[1] - 2) / 2) ** 2
lbm_config = LBMConfig(stencil=stencil,
method=Method.SRT,
force_model=ForceModel.GUO,
force=(ext_force_density, 0, 0),
relaxation_rate=omega,
compressible=True)
wall_north = NoSlip()
normal = (0, 1, 0)
# NO-SLIP
lb_step_noslip = LatticeBoltzmannStep(domain_size=domain_size, periodicity=periodicity,
lbm_config=lbm_config, compute_velocity_in_every_step=True)
noslip = NoSlip()
lb_step_noslip.boundary_handling.set_boundary(noslip, slice_from_direction('S', dim))
lb_step_noslip.boundary_handling.set_boundary(wall_north, slice_from_direction('N', dim))
# FREE-SLIP
lb_step_freeslip = LatticeBoltzmannStep(domain_size=domain_size, periodicity=periodicity,
lbm_config=lbm_config, compute_velocity_in_every_step=True)
freeslip = FreeSlip(stencil=stencil, normal_direction=normal)
lb_step_freeslip.boundary_handling.set_boundary(freeslip, slice_from_direction('S', dim))
lb_step_freeslip.boundary_handling.set_boundary(wall_north, slice_from_direction('N', dim))
# WFB
lb_step_wfb = LatticeBoltzmannStep(domain_size=domain_size, periodicity=periodicity,
lbm_config=lbm_config, compute_velocity_in_every_step=True)
# pdf initialisation
init = pdf_initialization_assignments(lb_step_wfb.method, 1.0, (0.025, 0, 0),
lb_step_wfb.data_handling.fields[lb_step_wfb._pdf_arr_name].center_vector)
config = ps.CreateKernelConfig(target=lb_step_wfb.data_handling.default_target, cpu_openmp=False)
ast_init = ps.create_kernel(init, config=config)
kernel_init = ast_init.compile()
lb_step_wfb.data_handling.run_kernel(kernel_init)
# potential mean velocity field
mean_vel_field = lb_step_wfb.data_handling.fields[lb_step_wfb.velocity_data_name]
# mean_vel_field = lb_step_wfb.data_handling.add_array('mean_velocity_field', values_per_cell=stencil.D)
# lb_step_wfb.data_handling.fill('mean_velocity_field', 0.005, value_idx=0, ghost_layers=True)
lb_step_wfb.data_handling.fill(lb_step_wfb.velocity_data_name, 0.025, value_idx=0, ghost_layers=True)
# wfb arguments
wfb_args = {
'wfb_i': {'wall_function_model': SpaldingsLaw(viscosity=nu),
'weight_method': WallFunctionBounce.WeightMethod.GEOMETRIC_WEIGHT,
'name': "wall"},
'wfb_ii': {'wall_function_model': MuskerLaw(viscosity=nu),
'weight_method': WallFunctionBounce.WeightMethod.GEOMETRIC_WEIGHT,
'mean_velocity': mean_vel_field,
'name': "wall"},
'wfb_iii': {'wall_function_model': LogLaw(viscosity=nu),
'weight_method': WallFunctionBounce.WeightMethod.LATTICE_WEIGHT,
'mean_velocity': mean_vel_field.center,
'sampling_shift': 2},
'wfb_iv': {'wall_function_model': MoninObukhovSimilarityTheory(z0=1e-2),
'weight_method': WallFunctionBounce.WeightMethod.LATTICE_WEIGHT,
'mean_velocity': mean_vel_field,
'maronga_sampling_shift': 2}
}
wall = WallFunctionBounce(lb_method=lb_step_wfb.method, normal_direction=normal,
pdfs=lb_step_wfb.data_handling.fields[lb_step_wfb._pdf_arr_name],
**wfb_args[wfb_type])
lb_step_wfb.boundary_handling.set_boundary(wall, slice_from_direction('S', dim))
lb_step_wfb.boundary_handling.set_boundary(wall_north, slice_from_direction('N', dim))
# run cases
timesteps = 4000
lb_step_noslip.run(timesteps)
lb_step_freeslip.run(timesteps)
lb_step_wfb.run(timesteps)
noslip_velocity = lb_step_noslip.velocity[domain_size[0] // 2, :, domain_size[2] // 2, 0]
freeslip_velocity = lb_step_freeslip.velocity[domain_size[0] // 2, :, domain_size[2] // 2, 0]
wfb_velocity = lb_step_wfb.velocity[domain_size[0] // 2, :, domain_size[2] // 2, 0]
assert wfb_velocity[0] > noslip_velocity[0], "WFB enforced velocity below no-slip velocity"
assert wfb_velocity[0] < freeslip_velocity[0], "WFB enforced velocity above free-slip velocity"
import pytest
from lbmpy.creationfunctions import create_lb_method, LBMConfig
from lbmpy.enums import Method, Stencil
from lbmpy.maxwellian_equilibrium import get_weights
from lbmpy.stencils import LBStencil
def compare_weights(method, zero_centered, continuous_equilibrium, stencil_name):
stencil = LBStencil(stencil_name)
hardcoded_weights = get_weights(stencil)
method = create_lb_method(LBMConfig(stencil=stencil, method=method, zero_centered=zero_centered,
continuous_equilibrium=continuous_equilibrium))
weights = method.weights
for i in range(len(weights)):
assert hardcoded_weights[i] == weights[i]
@pytest.mark.parametrize('method', [Method.SRT, Method.TRT])
@pytest.mark.parametrize('zero_centered', [False, True])
@pytest.mark.parametrize('continuous_equilibrium', [False, True])
@pytest.mark.parametrize('stencil_name', [Stencil.D2Q9, Stencil.D3Q7])
def test_weight_calculation(method, zero_centered, continuous_equilibrium, stencil_name):
compare_weights(method, zero_centered, continuous_equilibrium, stencil_name)
@pytest.mark.parametrize('method', [Method.MRT, Method.CENTRAL_MOMENT])
@pytest.mark.parametrize('continuous_equilibrium', [False, True])
@pytest.mark.parametrize('zero_centered', [False, True])
@pytest.mark.parametrize('stencil_name', [Stencil.D3Q15, Stencil.D3Q19, Stencil.D3Q27])
@pytest.mark.longrun
def test_weight_calculation_longrun(method, zero_centered, continuous_equilibrium, stencil_name):
compare_weights(method, zero_centered, continuous_equilibrium, stencil_name)
import pytest
import numpy as np
import pystencils as ps
from lbmpy.flow_statistics import welford_assignments
@pytest.mark.parametrize('order', [1, 2, 3])
@pytest.mark.parametrize('dim', [1, 2, 3])
def test_welford(order, dim):
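# Feed normally distributed samples one at a time into the Welford update kernel and compare the
# accumulated mean, covariance and third-order central moments with direct NumPy computations.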
target = ps.Target.CPU
# create data handling and fields
dh = ps.create_data_handling((1, 1, 1), periodicity=False, default_target=target)
vector_field = dh.add_array('vector', values_per_cell=dim)
dh.fill(vector_field.name, 0.0, ghost_layers=False)
mean_field = dh.add_array('mean', values_per_cell=dim)
dh.fill(mean_field.name, 0.0, ghost_layers=False)
if order >= 2:
sos = dh.add_array('sos', values_per_cell=dim**2)
dh.fill(sos.name, 0.0, ghost_layers=False)
if order == 3:
soc = dh.add_array('soc', values_per_cell=dim**3)
dh.fill(soc.name, 0.0, ghost_layers=False)
else:
soc = None
else:
sos = None
soc = None
welford = welford_assignments(field=vector_field, mean_field=mean_field,
sum_of_squares_field=sos, sum_of_cubes_field=soc)
welford_kernel = ps.create_kernel(welford).compile()
# set random seed
np.random.seed(42)
n = int(1e4)
x = np.random.normal(size=n * dim).reshape(n, dim)
analytical_mean = np.zeros(dim)
analytical_covariance = np.zeros(dim**2)
analytical_third_order_moments = np.zeros(dim**3)
# calculate analytical mean
for i in range(dim):
analytical_mean[i] = np.mean(x[:, i])
# calculate analytical covariances
for i in range(dim):
for j in range(dim):
analytical_covariance[i * dim + j] \
= (np.sum((x[:, i] - analytical_mean[i]) * (x[:, j] - analytical_mean[j]))) / n
# calculate analytical third-order central moments
for i in range(dim):
for j in range(dim):
for k in range(dim):
analytical_third_order_moments[(i * dim + j) * dim + k] \
= (np.sum((x[:, i] - analytical_mean[i])
* (x[:, j] - analytical_mean[j])
* (x[:, k] - analytical_mean[k]))) / n
# Time loop
counter = 1
for i in range(n):
for d in range(dim):
new_value = x[i, d]
if dim > 1:
dh.fill(array_name=vector_field.name, val=new_value, value_idx=d, ghost_layers=False)
else:
dh.fill(array_name=vector_field.name, val=new_value, ghost_layers=False)
dh.run_kernel(welford_kernel, counter=counter)
counter += 1
welford_mean = dh.gather_array(mean_field.name).flatten()
np.testing.assert_allclose(welford_mean, analytical_mean)
if order >= 2:
welford_covariance = dh.gather_array(sos.name).flatten() / n
np.testing.assert_allclose(welford_covariance, analytical_covariance)
if order == 3:
welford_third_order_moments = dh.gather_array(soc.name).flatten() / n
np.testing.assert_allclose(welford_third_order_moments, analytical_third_order_moments)
if __name__ == '__main__':
test_welford(1, 1)
import pytest
import numpy as np
from pystencils import CreateKernelConfig, Target
from lbmpy import Method, LBMConfig
from lbmpy.stencils import Stencil, LBStencil
from lbmpy.maxwellian_equilibrium import get_weights
from lbmpy.scenarios import create_fully_periodic_flow
from numpy.testing import assert_allclose
@pytest.mark.parametrize('method', [Method.SRT, Method.MRT, Method.CENTRAL_MOMENT, Method.CUMULANT])
@pytest.mark.parametrize('delta_equilibrium', [False, True])
def test_periodic_shear_layers(method, delta_equilibrium):
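# Run the same shear-layer setup with regular and zero-centered PDF storage; after 20 steps the two PDF
# fields must differ exactly by the lattice weights, i.e. the subtracted background distribution.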
if method == Method.SRT and not delta_equilibrium:
pytest.skip()
if method == Method.CUMULANT and delta_equilibrium:
pytest.skip()
stencil = LBStencil(Stencil.D3Q27)
omega_shear = 1.3
# Velocity Field
L = (128, 32, 32)
velocity_magnitude = 0.02
velocity = np.zeros(L + (3,))
velocity[:,:,:,0] = velocity_magnitude
velocity[:, L[1]//3 : L[1]//3*2, L[2]//3 : L[2]//3*2, 0] = - velocity_magnitude
velocity[:, :, :, 1] = 0.1 * velocity_magnitude * np.random.rand(*L)
kernel_config = CreateKernelConfig(target=Target.CPU)
config_full = LBMConfig(stencil=stencil, method=method, relaxation_rate=omega_shear,
compressible=True, zero_centered=False)
scenario_full = create_fully_periodic_flow(velocity, lbm_config=config_full, config=kernel_config)
config_zero_centered = LBMConfig(stencil=stencil, method=method, relaxation_rate=omega_shear,
compressible=True, zero_centered=True, delta_equilibrium=delta_equilibrium)
scenario_zero_centered = create_fully_periodic_flow(velocity, lbm_config=config_zero_centered,
config=kernel_config)
scenario_full.run(20)
scenario_zero_centered.run(20)
pdfs_full = scenario_full.data_handling.cpu_arrays[scenario_full.pdf_array_name]
pdfs_zero_centered = scenario_zero_centered.data_handling.cpu_arrays[scenario_zero_centered.pdf_array_name]
difference = pdfs_full - pdfs_zero_centered
weights = np.array(get_weights(stencil))
reference = np.zeros(L + (stencil.Q, ))
reference[:,:,:] = weights
if delta_equilibrium and method == Method.CENTRAL_MOMENT:
# Much less agreement is expected here, as the delta-equilibrium's velocity dependence
# lets the CM method's numerical quality degrade.
assert_allclose(difference[1:-1, 1:-1, 1:-1], reference, rtol=1e-5)
else:
assert_allclose(difference[1:-1, 1:-1, 1:-1], reference)