Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision

Target

Select target project
  • ravi.k.ayyala/lbmpy
  • brendan-waters/lbmpy
  • anirudh.jonnalagadda/lbmpy
  • jbadwaik/lbmpy
  • alexander.reinauer/lbmpy
  • itischler/lbmpy
  • he66coqe/lbmpy
  • ev81oxyl/lbmpy
  • Bindgen/lbmpy
  • da15siwa/lbmpy
  • holzer/lbmpy
  • RudolfWeeber/lbmpy
  • pycodegen/lbmpy
13 results
Select Git revision
Show changes
Showing
with 1286 additions and 15 deletions
"""
Test Shan-Chen two-component implementation against reference implementation
"""
import lbmpy
import pystencils as ps
import sympy as sp
import numpy as np
def test_shan_chen_two_component():
from lbmpy.enums import Stencil
from lbmpy import LBMConfig, ForceModel, create_lb_update_rule
from lbmpy.macroscopic_value_kernels import macroscopic_values_setter
from lbmpy.creationfunctions import create_stream_pull_with_output_kernel
from lbmpy.maxwellian_equilibrium import get_weights
N = 64
omega_a = 1.
omega_b = 1.
# interaction strength
g_aa = 0.
g_ab = g_ba = 6.
g_bb = 0.
rho0 = 1.
stencil = lbmpy.LBStencil(Stencil.D2Q9)
weights = get_weights(stencil, c_s_sq=sp.Rational(1, 3))
dim = stencil.D
dh = ps.create_data_handling((N,) * dim, periodicity=True, default_target=ps.Target.CPU)
src_a = dh.add_array('src_a', values_per_cell=stencil.Q)
dst_a = dh.add_array_like('dst_a', 'src_a')
src_b = dh.add_array('src_b', values_per_cell=stencil.Q)
dst_b = dh.add_array_like('dst_b', 'src_b')
ρ_a = dh.add_array('rho_a')
ρ_b = dh.add_array('rho_b')
u_a = dh.add_array('u_a', values_per_cell=stencil.D)
u_b = dh.add_array('u_b', values_per_cell=stencil.D)
u_bary = dh.add_array_like('u_bary', u_a.name)
f_a = dh.add_array('f_a', values_per_cell=stencil.D)
f_b = dh.add_array_like('f_b', f_a.name)
def psi(dens):
return rho0 * (1. - sp.exp(-dens / rho0))
zero_vec = sp.Matrix([0] * stencil.D)
force_a = zero_vec
for factor, ρ in zip([g_aa, g_ab], [ρ_a, ρ_b]):
force_a += sum((psi(ρ[d]) * w_d * sp.Matrix(d)
for d, w_d in zip(stencil, weights)),
zero_vec) * psi(ρ_a.center) * -1 * factor
force_b = zero_vec
for factor, ρ in zip([g_ba, g_bb], [ρ_a, ρ_b]):
force_b += sum((psi(ρ[d]) * w_d * sp.Matrix(d)
for d, w_d in zip(stencil, weights)),
zero_vec) * psi(ρ_b.center) * -1 * factor
f_expressions = ps.AssignmentCollection([
ps.Assignment(f_a.center_vector, force_a),
ps.Assignment(f_b.center_vector, force_b)
])
# calculate the velocity without force correction
u_temp = ps.AssignmentCollection(ps.Assignment(u_bary.center_vector,
(ρ_a.center * u_a.center_vector
- f_a.center_vector / 2 + ρ_b.center * u_b.center_vector
- f_b.center_vector / 2) / (ρ_a.center + ρ_b.center)))
# add the force correction to the velocity
u_corr = ps.AssignmentCollection(ps.Assignment(u_bary.center_vector,
u_bary.center_vector
+ (f_a.center_vector / 2 + f_b.center_vector / 2) / (
ρ_a.center + ρ_b.center)))
lbm_config_a = LBMConfig(stencil=stencil, relaxation_rate=omega_a, compressible=True,
velocity_input=u_bary, density_input=ρ_a, force_model=ForceModel.GUO,
force=f_a, kernel_type='collide_only')
lbm_config_b = LBMConfig(stencil=stencil, relaxation_rate=omega_b, compressible=True,
velocity_input=u_bary, density_input=ρ_b, force_model=ForceModel.GUO,
force=f_b, kernel_type='collide_only')
collision_a = create_lb_update_rule(lbm_config=lbm_config_a,
optimization={'symbolic_field': src_a})
collision_b = create_lb_update_rule(lbm_config=lbm_config_b,
optimization={'symbolic_field': src_b})
stream_a = create_stream_pull_with_output_kernel(collision_a.method, src_a, dst_a,
{'density': ρ_a, 'velocity': u_a})
stream_b = create_stream_pull_with_output_kernel(collision_b.method, src_b, dst_b,
{'density': ρ_b, 'velocity': u_b})
config = ps.CreateKernelConfig(target=dh.default_target)
stream_a_kernel = ps.create_kernel(stream_a, config=config).compile()
stream_b_kernel = ps.create_kernel(stream_b, config=config).compile()
collision_a_kernel = ps.create_kernel(collision_a, config=config).compile()
collision_b_kernel = ps.create_kernel(collision_b, config=config).compile()
force_kernel = ps.create_kernel(f_expressions, config=config).compile()
u_temp_kernel = ps.create_kernel(u_temp, config=config).compile()
u_corr_kernel = ps.create_kernel(u_corr, config=config).compile()
init_a = macroscopic_values_setter(collision_a.method, velocity=(0, 0),
pdfs=src_a.center_vector, density=ρ_a.center)
init_b = macroscopic_values_setter(collision_b.method, velocity=(0, 0),
pdfs=src_b.center_vector, density=ρ_b.center)
init_a_kernel = ps.create_kernel(init_a, ghost_layers=0).compile()
init_b_kernel = ps.create_kernel(init_b, ghost_layers=0).compile()
sync_pdfs = dh.synchronization_function([src_a.name, src_b.name])
sync_ρs = dh.synchronization_function([ρ_a.name, ρ_b.name])
dh.fill(ρ_a.name, 0.1, slice_obj=ps.make_slice[:, :0.5])
dh.fill(ρ_a.name, 0.9, slice_obj=ps.make_slice[:, 0.5:])
dh.fill(ρ_b.name, 0.9, slice_obj=ps.make_slice[:, :0.5])
dh.fill(ρ_b.name, 0.1, slice_obj=ps.make_slice[:, 0.5:])
dh.fill(u_a.name, 0.0)
dh.fill(u_b.name, 0.0)
dh.fill(f_a.name, 0.0)
dh.fill(f_b.name, 0.0)
dh.run_kernel(u_temp_kernel)
dh.run_kernel(init_a_kernel)
dh.run_kernel(init_b_kernel)
for i in range(1000):
sync_ρs()
dh.run_kernel(force_kernel)
dh.run_kernel(u_corr_kernel)
dh.run_kernel(collision_a_kernel)
dh.run_kernel(collision_b_kernel)
sync_pdfs()
dh.run_kernel(stream_a_kernel)
dh.run_kernel(stream_b_kernel)
dh.run_kernel(u_temp_kernel)
dh.swap(src_a.name, dst_a.name)
dh.swap(src_b.name, dst_b.name)
# reference generated from https://github.com/lbm-principles-practice/code/blob/master/chapter9/shanchen.cpp with
# const int nsteps = 1000;
# const int noutput = 1000;
# const int nfluids = 2;
# const double gA = 0;
ref_a = np.array([0.213948, 0.0816724, 0.0516763, 0.0470179, 0.0480882, 0.0504771, 0.0531983, 0.0560094, 0.0588071,
0.0615311, 0.064102, 0.0664467, 0.0684708, 0.070091, 0.0712222, 0.0718055, 0.0718055, 0.0712222,
0.070091, 0.0684708, 0.0664467, 0.064102, 0.0615311, 0.0588071, 0.0560094, 0.0531983, 0.0504771,
0.0480882, 0.0470179, 0.0516763, 0.0816724, 0.213948, 0.517153, 0.833334, 0.982884, 1.0151,
1.01361, 1.0043, 0.993178, 0.981793, 0.970546, 0.959798, 0.949751, 0.940746, 0.933035, 0.926947,
0.922713, 0.920548, 0.920548, 0.922713, 0.926947, 0.933035, 0.940746, 0.949751, 0.959798,
0.970546, 0.981793, 0.993178, 1.0043, 1.01361, 1.0151, 0.982884, 0.833334, 0.517153])
ref_b = np.array([0.517153, 0.833334, 0.982884, 1.0151, 1.01361, 1.0043, 0.993178, 0.981793, 0.970546, 0.959798,
0.949751, 0.940746, 0.933035, 0.926947, 0.922713, 0.920548, 0.920548, 0.922713, 0.926947,
0.933035, 0.940746, 0.949751, 0.959798, 0.970546, 0.981793, 0.993178, 1.0043, 1.01361, 1.0151,
0.982884, 0.833334, 0.517153, 0.213948, 0.0816724, 0.0516763, 0.0470179, 0.0480882, 0.0504771,
0.0531983, 0.0560094, 0.0588071, 0.0615311, 0.064102, 0.0664467, 0.0684708, 0.070091, 0.0712222,
0.0718055, 0.0718055, 0.0712222, 0.070091, 0.0684708, 0.0664467, 0.064102, 0.0615311, 0.0588071,
0.0560094, 0.0531983, 0.0504771, 0.0480882, 0.0470179, 0.0516763, 0.0816724, 0.213948])
assert np.allclose(dh.gather_array(ρ_a.name)[0], ref_a)
assert np.allclose(dh.gather_array(ρ_b.name)[0], ref_b)
"""
Test Shan-Chen two-phase implementation against reference implementation
"""
import lbmpy
import pystencils as ps
import sympy as sp
import numpy as np
def test_shan_chen_two_phase():
from lbmpy.enums import Stencil
from lbmpy import LBMConfig, ForceModel, create_lb_update_rule
from lbmpy.macroscopic_value_kernels import macroscopic_values_setter
from lbmpy.creationfunctions import create_stream_pull_with_output_kernel, create_lb_method
from lbmpy.maxwellian_equilibrium import get_weights
N = 64
omega = 1.
g_aa = -4.7
rho0 = 1.
stencil = lbmpy.LBStencil(Stencil.D2Q9)
weights = get_weights(stencil, c_s_sq=sp.Rational(1, 3))
dh = ps.create_data_handling((N, ) * stencil.D, periodicity=True, default_target=ps.Target.CPU)
src = dh.add_array('src', values_per_cell=stencil.Q)
dst = dh.add_array_like('dst', 'src')
ρ = dh.add_array('rho')
def psi(dens):
return rho0 * (1. - sp.exp(-dens / rho0))
zero_vec = sp.Matrix([0] * stencil.D)
force = sum((psi(ρ[d]) * w_d * sp.Matrix(d)
for d, w_d in zip(stencil, weights)), zero_vec) * psi(ρ.center) * -1 * g_aa
lbm_config = LBMConfig(stencil=stencil, relaxation_rate=omega, compressible=True,
force_model=ForceModel.GUO, force=force, kernel_type='collide_only')
collision = create_lb_update_rule(lbm_config=lbm_config,
optimization={'symbolic_field': src})
stream = create_stream_pull_with_output_kernel(collision.method, src, dst, {'density': ρ})
config = ps.CreateKernelConfig(target=dh.default_target, cpu_openmp=False)
stream_kernel = ps.create_kernel(stream, config=config).compile()
collision_kernel = ps.create_kernel(collision, config=config).compile()
method_without_force = create_lb_method(LBMConfig(stencil=stencil, relaxation_rate=omega, compressible=True))
init_assignments = macroscopic_values_setter(method_without_force, velocity=(0, 0),
pdfs=src.center_vector, density=ρ.center)
init_kernel = ps.create_kernel(init_assignments, ghost_layers=0, config=config).compile()
for x in range(N):
for y in range(N):
if (x - N / 2)**2 + (y - N / 2)**2 <= 15**2:
dh.fill(ρ.name, 2.1, slice_obj=[x, y])
else:
dh.fill(ρ.name, 0.15, slice_obj=[x, y])
dh.run_kernel(init_kernel)
sync_pdfs = dh.synchronization_function([src.name])
sync_ρs = dh.synchronization_function([ρ.name])
for i in range(1000):
sync_ρs()
dh.run_kernel(collision_kernel)
sync_pdfs()
dh.run_kernel(stream_kernel)
dh.swap(src.name, dst.name)
# reference generated from https://github.com/lbm-principles-practice/code/blob/master/chapter9/shanchen.cpp with
# const int nsteps = 1000;
# const int noutput = 1000;
ref = np.array([0.185757, 0.185753, 0.185743, 0.185727, 0.185703, 0.185672, 0.185636, 0.185599, 0.185586, 0.185694,
0.186302, 0.188901, 0.19923, 0.238074, 0.365271, 0.660658, 1.06766, 1.39673, 1.56644, 1.63217,
1.65412, 1.66064, 1.66207, 1.66189, 1.66123, 1.66048, 1.65977, 1.65914, 1.65861, 1.6582, 1.6579,
1.65772, 1.65766, 1.65772, 1.6579, 1.6582, 1.65861, 1.65914, 1.65977, 1.66048, 1.66123, 1.66189,
1.66207, 1.66064, 1.65412, 1.63217, 1.56644, 1.39673, 1.06766, 0.660658, 0.365271, 0.238074,
0.19923, 0.188901, 0.186302, 0.185694, 0.185586, 0.185599, 0.185636, 0.185672, 0.185703, 0.185727,
0.185743, 0.185753])
assert np.allclose(dh.gather_array(ρ.name)[N // 2], ref)
File moved
import numpy as np
import pytest
from lbmpy.boundaries import (NoSlip, NoSlipLinearBouzidi, QuadraticBounceBack,
UBB, SimpleExtrapolationOutflow, ExtrapolationOutflow, FixedDensity, DiffusionDirichlet,
NeumannByCopy, StreamInConstant, FreeSlip)
from lbmpy.boundaries.boundaryhandling import LatticeBoltzmannBoundaryHandling, create_lattice_boltzmann_boundary_kernel
from lbmpy.creationfunctions import create_lb_function, create_lb_method, LBMConfig
from lbmpy.enums import Stencil, Method
from lbmpy.geometry import add_box_boundary
from lbmpy.lbstep import LatticeBoltzmannStep
from lbmpy.stencils import LBStencil
import pystencils as ps
from pystencils import create_data_handling, make_slice, Target, CreateKernelConfig
from pystencils.slicing import slice_from_direction
from pystencils.stencil import inverse_direction
def mirror_stencil(direction, mirror_axis):
for i, n in enumerate(mirror_axis):
if n != 0:
direction[i] = -direction[i]
return tuple(direction)
@pytest.mark.parametrize("target", [Target.GPU, Target.CPU])
def test_simple(target):
if target == Target.GPU:
import pytest
pytest.importorskip('cupy')
dh = create_data_handling((4, 4), parallel=False, default_target=target)
dh.add_array('pdfs', values_per_cell=9, cpu=True, gpu=target != Target.CPU)
for i in range(9):
dh.fill("pdfs", i, value_idx=i, ghost_layers=True)
if target == Target.GPU:
dh.all_to_gpu()
lbm_config = LBMConfig(stencil=LBStencil(Stencil.D2Q9), compressible=False, zero_centered=False,
delta_equilibrium=False, relaxation_rate=1.8)
config = CreateKernelConfig(target=target)
lb_func = create_lb_function(lbm_config=lbm_config, config=config)
bh = LatticeBoltzmannBoundaryHandling(lb_func.method, dh, 'pdfs', target=target)
wall = NoSlip()
moving_wall = UBB((1, 0), density=1.0)
bh.set_boundary(wall, make_slice[0, :])
bh.set_boundary(wall, make_slice[-1, :])
bh.set_boundary(wall, make_slice[:, 0])
bh.set_boundary(moving_wall, make_slice[:, -1])
bh.prepare()
bh()
if target == Target.GPU:
dh.all_to_cpu()
# left lower corner
assert (dh.cpu_arrays['pdfs'][0, 0, 6] == 7)
assert (dh.cpu_arrays['pdfs'][0, 1, 4] == 3)
assert (dh.cpu_arrays['pdfs'][0, 1, 6] == 7)
assert (dh.cpu_arrays['pdfs'][1, 0, 1] == 2)
assert (dh.cpu_arrays['pdfs'][1, 0, 6] == 7)
# left side
assert (all(dh.cpu_arrays['pdfs'][0, 2:4, 4] == 3))
assert (all(dh.cpu_arrays['pdfs'][0, 2:4, 6] == 7))
assert (all(dh.cpu_arrays['pdfs'][0, 2:4, 5] == 5))
# left upper corner
assert (dh.cpu_arrays['pdfs'][0, 4, 4] == 3)
assert (dh.cpu_arrays['pdfs'][0, 4, 8] == 5)
assert (dh.cpu_arrays['pdfs'][0, 5, 8] == 5 + 6 / 36)
assert (dh.cpu_arrays['pdfs'][1, 5, 8] == 5 + 6 / 36)
assert (dh.cpu_arrays['pdfs'][1, 5, 2] == 1)
# top side
assert (all(dh.cpu_arrays['pdfs'][2:4, 5, 2] == 1))
assert (all(dh.cpu_arrays['pdfs'][2:4, 5, 7] == 6 - 6 / 36))
assert (all(dh.cpu_arrays['pdfs'][2:4, 5, 8] == 5 + 6 / 36))
# right upper corner
assert (dh.cpu_arrays['pdfs'][4, 5, 2] == 1)
assert (dh.cpu_arrays['pdfs'][4, 5, 7] == 6 - 6 / 36)
assert (dh.cpu_arrays['pdfs'][5, 5, 7] == 6 - 6 / 36)
assert (dh.cpu_arrays['pdfs'][5, 4, 3] == 4)
assert (dh.cpu_arrays['pdfs'][5, 4, 7] == 6)
# right side
assert (all(dh.cpu_arrays['pdfs'][5, 2:4, 3] == 4))
assert (all(dh.cpu_arrays['pdfs'][5, 2:4, 5] == 8))
assert (all(dh.cpu_arrays['pdfs'][5, 2:4, 7] == 6))
# right lower corner
assert (dh.cpu_arrays['pdfs'][5, 1, 3] == 4)
assert (dh.cpu_arrays['pdfs'][5, 1, 5] == 8)
assert (dh.cpu_arrays['pdfs'][5, 0, 5] == 8)
assert (dh.cpu_arrays['pdfs'][4, 0, 1] == 2)
assert (dh.cpu_arrays['pdfs'][4, 0, 5] == 8)
# lower side
assert (all(dh.cpu_arrays['pdfs'][0, 2:4, 4] == 3))
assert (all(dh.cpu_arrays['pdfs'][0, 2:4, 6] == 7))
assert (all(dh.cpu_arrays['pdfs'][0, 2:4, 8] == 5))
@pytest.mark.parametrize("given_normal", [True, False])
def test_free_slip(given_normal):
# check if Free slip BC is applied correctly
stencil = LBStencil(Stencil.D2Q9)
dh = create_data_handling(domain_size=(4, 4),)
src1 = dh.add_array('src1', values_per_cell=stencil.Q)
dh.fill('src1', 0.0, ghost_layers=True)
shape = dh.gather_array('src1', ghost_layers=True).shape
num = 0
for x in range(shape[0]):
for y in range(shape[1]):
for direction in range(shape[2]):
dh.cpu_arrays[src1.name][x, y, direction] = num
num += 1
method = create_lb_method(lbm_config=LBMConfig(stencil=stencil, method=Method.SRT, relaxation_rate=1.8))
bh = LatticeBoltzmannBoundaryHandling(method, dh, 'src1', name="bh1")
if given_normal:
free_slipN = FreeSlip(stencil=stencil, normal_direction=(0, -1))
free_slipS = FreeSlip(stencil=stencil, normal_direction=(0, 1))
free_slipE = FreeSlip(stencil=stencil, normal_direction=(-1, 0))
free_slipW = FreeSlip(stencil=stencil, normal_direction=(1, 0))
bh.set_boundary(free_slipN, slice_from_direction('N', dh.dim))
bh.set_boundary(free_slipS, slice_from_direction('S', dh.dim))
bh.set_boundary(free_slipE, slice_from_direction('E', dh.dim))
bh.set_boundary(free_slipW, slice_from_direction('W', dh.dim))
else:
free_slip = FreeSlip(stencil=stencil)
bh.set_boundary(free_slip, slice_from_direction('N', dh.dim))
bh.set_boundary(free_slip, slice_from_direction('S', dh.dim))
bh.set_boundary(free_slip, slice_from_direction('E', dh.dim))
bh.set_boundary(free_slip, slice_from_direction('W', dh.dim))
bh()
mirrored_dirN = {6: 8, 1: 2, 5: 7}
mirrored_dirS = {7: 5, 2: 1, 8: 6}
mirrored_dirE = {6: 5, 4: 3, 8: 7}
mirrored_dirW = {5: 6, 3: 4, 7: 8}
# check North
assert dh.cpu_arrays[src1.name][1, -1, mirrored_dirN[6]] == dh.cpu_arrays[src1.name][1, -2, 6]
assert dh.cpu_arrays[src1.name][1, -1, mirrored_dirN[1]] == dh.cpu_arrays[src1.name][1, -2, 1]
for i in range(2, 4):
assert dh.cpu_arrays[src1.name][i, -1, mirrored_dirN[6]] == dh.cpu_arrays[src1.name][i, -2, 6]
assert dh.cpu_arrays[src1.name][i, -1, mirrored_dirN[1]] == dh.cpu_arrays[src1.name][i, -2, 1]
assert dh.cpu_arrays[src1.name][i, -1, mirrored_dirN[5]] == dh.cpu_arrays[src1.name][i, -2, 5]
assert dh.cpu_arrays[src1.name][4, -1, mirrored_dirN[1]] == dh.cpu_arrays[src1.name][4, -2, 1]
assert dh.cpu_arrays[src1.name][4, -1, mirrored_dirN[5]] == dh.cpu_arrays[src1.name][4, -2, 5]
# check East
assert dh.cpu_arrays[src1.name][-1, 1, mirrored_dirE[6]] == dh.cpu_arrays[src1.name][-2, 1, 6]
assert dh.cpu_arrays[src1.name][-1, 1, mirrored_dirE[4]] == dh.cpu_arrays[src1.name][-2, 1, 4]
for i in range(2, 4):
assert dh.cpu_arrays[src1.name][-1, i, mirrored_dirE[6]] == dh.cpu_arrays[src1.name][-2, i, 6]
assert dh.cpu_arrays[src1.name][-1, i, mirrored_dirE[4]] == dh.cpu_arrays[src1.name][-2, i, 4]
assert dh.cpu_arrays[src1.name][-1, i, mirrored_dirE[8]] == dh.cpu_arrays[src1.name][-2, i, 8]
assert dh.cpu_arrays[src1.name][-1, 4, mirrored_dirE[4]] == dh.cpu_arrays[src1.name][-2, 4, 4]
assert dh.cpu_arrays[src1.name][-1, 4, mirrored_dirE[8]] == dh.cpu_arrays[src1.name][-2, 4, 8]
# check South
assert dh.cpu_arrays[src1.name][1, 0, mirrored_dirS[8]] == dh.cpu_arrays[src1.name][1, 1, 8]
assert dh.cpu_arrays[src1.name][1, 0, mirrored_dirS[2]] == dh.cpu_arrays[src1.name][1, 1, 2]
for i in range(2, 4):
assert dh.cpu_arrays[src1.name][i, 0, mirrored_dirS[7]] == dh.cpu_arrays[src1.name][i, 1, 7]
assert dh.cpu_arrays[src1.name][i, 0, mirrored_dirS[2]] == dh.cpu_arrays[src1.name][i, 1, 2]
assert dh.cpu_arrays[src1.name][i, 0, mirrored_dirS[8]] == dh.cpu_arrays[src1.name][i, 1, 8]
assert dh.cpu_arrays[src1.name][4, 0, mirrored_dirS[2]] == dh.cpu_arrays[src1.name][4, 1, 2]
assert dh.cpu_arrays[src1.name][4, 0, mirrored_dirS[7]] == dh.cpu_arrays[src1.name][4, 1, 7]
# check West
assert dh.cpu_arrays[src1.name][0, 1, mirrored_dirW[5]] == dh.cpu_arrays[src1.name][1, 1, 5]
assert dh.cpu_arrays[src1.name][0, 1, mirrored_dirW[3]] == dh.cpu_arrays[src1.name][1, 1, 3]
for i in range(2, 4):
assert dh.cpu_arrays[src1.name][0, i, mirrored_dirW[5]] == dh.cpu_arrays[src1.name][1, i, 5]
assert dh.cpu_arrays[src1.name][0, i, mirrored_dirW[3]] == dh.cpu_arrays[src1.name][1, i, 3]
assert dh.cpu_arrays[src1.name][0, i, mirrored_dirW[7]] == dh.cpu_arrays[src1.name][1, i, 7]
assert dh.cpu_arrays[src1.name][0, 4, mirrored_dirW[3]] == dh.cpu_arrays[src1.name][1, 4, 3]
assert dh.cpu_arrays[src1.name][0, 4, mirrored_dirW[7]] == dh.cpu_arrays[src1.name][1, 4, 7]
if given_normal:
# check corners --> determined by the last boundary applied there.
# SouthWest --> West
assert dh.cpu_arrays[src1.name][0, 0, mirrored_dirW[5]] == dh.cpu_arrays[src1.name][1, 0, 5]
# NorthWest --> West
assert dh.cpu_arrays[src1.name][0, -1, mirrored_dirW[7]] == dh.cpu_arrays[src1.name][1, -1, 7]
# NorthEast --> East
assert dh.cpu_arrays[src1.name][-1, -1, mirrored_dirE[8]] == dh.cpu_arrays[src1.name][-2, -1, 8]
# SouthEast --> East
assert dh.cpu_arrays[src1.name][-1, 0, mirrored_dirE[6]] == dh.cpu_arrays[src1.name][-2, 0, 6]
else:
# check corners --> this time the normals are calculated correctly in the corners
# SouthWest --> Normal = (1, 1); dir 7 --> 6
assert dh.cpu_arrays[src1.name][0, 0, 6] == dh.cpu_arrays[src1.name][1, 1, 7]
# NorthWest --> Normal = (1, -1); dir 8 --> 5
assert dh.cpu_arrays[src1.name][0, -1, 8] == dh.cpu_arrays[src1.name][1, -2, 5]
# NorthEast --> Normal = (-1, -1); dir 7 --> 6
assert dh.cpu_arrays[src1.name][-1, -1, 7] == dh.cpu_arrays[src1.name][-2, -2, 6]
# SouthEast --> Normal = (-1, 1); dir 5 --> 8
assert dh.cpu_arrays[src1.name][-1, 0, 5] == dh.cpu_arrays[src1.name][-2, 1, 8]
def test_free_slip_index_list():
stencil = LBStencil(Stencil.D2Q9)
dh = create_data_handling(domain_size=(4, 4), periodicity=(False, False))
src = dh.add_array('src', values_per_cell=len(stencil), alignment=True)
dh.fill('src', 0.0, ghost_layers=True)
lbm_config = LBMConfig(stencil=stencil, method=Method.SRT, relaxation_rate=1.8)
method = create_lb_method(lbm_config=lbm_config)
bh = LatticeBoltzmannBoundaryHandling(method, dh, 'src', name="bh")
free_slip = FreeSlip(stencil=stencil)
add_box_boundary(bh, free_slip)
bh.prepare()
for b in dh.iterate():
for b_obj, idx_arr in b[bh._index_array_name].boundary_object_to_index_list.items():
index_array = idx_arr
# normal directions
normal_west = (1, 0)
normal_east = (-1, 0)
normal_south = (0, 1)
normal_north = (0, -1)
normal_south_west = (1, 1)
normal_north_west = (1, -1)
normal_south_east = (-1, 1)
normal_north_east = (-1, -1)
for cell in index_array:
direction = stencil[cell[2]]
inv_dir = inverse_direction(direction)
boundary_cell = (cell[0] + direction[0], cell[1] + direction[1])
normal = (cell[3], cell[4])
# the data is written on the inverse direction of the fluid cell near the boundary
# the data is read from the mirrored direction of the inverse direction where the mirror axis is the normal
assert cell[5] == stencil.index(mirror_stencil(list(inv_dir), normal))
if boundary_cell[0] == 0 and 0 < boundary_cell[1] < 5:
assert normal == normal_west
if boundary_cell[0] == 5 and 0 < boundary_cell[1] < 5:
assert normal == normal_east
if 0 < boundary_cell[0] < 5 and boundary_cell[1] == 0:
assert normal == normal_south
if 0 < boundary_cell[0] < 5 and boundary_cell[1] == 5:
assert normal == normal_north
if boundary_cell == (0, 0):
assert cell[2] == cell[5]
assert normal == normal_south_west
if boundary_cell == (5, 0):
assert cell[2] == cell[5]
assert normal == normal_south_east
if boundary_cell == (0, 5):
assert cell[2] == cell[5]
assert normal == normal_north_west
if boundary_cell == (5, 5):
assert cell[2] == cell[5]
assert normal == normal_north_east
def test_free_slip_index_list_convex_corner():
stencil = LBStencil(Stencil.D2Q9)
dh = create_data_handling(domain_size=(4, 4))
src = dh.add_array('src', values_per_cell=len(stencil))
dh.fill('src', 0.0, ghost_layers=True)
lbm_config = LBMConfig(stencil=stencil, method=Method.SRT, relaxation_rate=1.8)
method = create_lb_method(lbm_config=lbm_config)
def bh_callback(x, y):
radius = 2
x_mid = 2
y_mid = 2
return (x - x_mid) ** 2 + (y - y_mid) ** 2 > radius ** 2
bh = LatticeBoltzmannBoundaryHandling(method, dh, 'src', name="bh")
free_slip = FreeSlip(stencil=stencil)
bh.set_boundary(free_slip, mask_callback=bh_callback)
bh.prepare()
for b in dh.iterate():
for b_obj, idx_arr in b[bh._index_array_name].boundary_object_to_index_list.items():
index_array = idx_arr
# correct index array for this case with convex corners
test = [(2, 1, 2, 0, 1, 2), (2, 1, 3, 1, 0, 3), (2, 1, 7, 1, 1, 7),
(2, 1, 8, 0, 1, 7), (3, 1, 2, 0, 1, 2), (3, 1, 4, -1, 0, 4),
(3, 1, 7, 0, 1, 8), (3, 1, 8, -1, 1, 8), (1, 2, 2, 0, 1, 2),
(1, 2, 3, 1, 0, 3), (1, 2, 5, 1, 0, 7), (1, 2, 7, 1, 1, 7),
(2, 2, 7, 1, 1, 7), (3, 2, 8, -1, 1, 8), (4, 2, 2, 0, 1, 2),
(4, 2, 4, -1, 0, 4), (4, 2, 6, -1, 0, 8), (4, 2, 8, -1, 1, 8),
(1, 3, 1, 0, -1, 1), (1, 3, 3, 1, 0, 3), (1, 3, 5, 1, -1, 5),
(1, 3, 7, 1, 0, 5), (2, 3, 5, 1, -1, 5), (3, 3, 6, -1, -1, 6),
(4, 3, 1, 0, -1, 1), (4, 3, 4, -1, 0, 4), (4, 3, 6, -1, -1, 6),
(4, 3, 8, -1, 0, 6), (2, 4, 1, 0, -1, 1), (2, 4, 3, 1, 0, 3),
(2, 4, 5, 1, -1, 5), (2, 4, 6, 0, -1, 5), (3, 4, 1, 0, -1, 1),
(3, 4, 4, -1, 0, 4), (3, 4, 5, 0, -1, 6), (3, 4, 6, -1, -1, 6)]
for i, cell in enumerate(index_array):
for j in range(len(cell)):
assert cell[j] == test[i][j]
def test_free_slip_equivalence():
# check if Free slip BC does the same if the normal direction is specified or not
stencil = LBStencil(Stencil.D2Q9)
dh = create_data_handling(domain_size=(4, 4), periodicity=(False, False))
src1 = dh.add_array('src1', values_per_cell=stencil.Q, alignment=True)
src2 = dh.add_array('src2', values_per_cell=stencil.Q, alignment=True)
dh.fill('src1', 0.0, ghost_layers=True)
dh.fill('src2', 0.0, ghost_layers=True)
shape = dh.gather_array('src1', ghost_layers=True).shape
num = 0
for x in range(shape[0]):
for y in range(shape[1]):
for direction in range(shape[2]):
dh.cpu_arrays['src1'][x, y, direction] = num
dh.cpu_arrays['src2'][x, y, direction] = num
num += 1
method = create_lb_method(lbm_config=LBMConfig(stencil=stencil, method=Method.SRT, relaxation_rate=1.8))
bh1 = LatticeBoltzmannBoundaryHandling(method, dh, 'src1', name="bh1")
free_slip1 = FreeSlip(stencil=stencil)
bh1.set_boundary(free_slip1, slice_from_direction('N', dh.dim))
bh2 = LatticeBoltzmannBoundaryHandling(method, dh, 'src2', name="bh2")
free_slip2 = FreeSlip(stencil=stencil, normal_direction=(0, -1))
bh2.set_boundary(free_slip2, slice_from_direction('N', dh.dim))
bh1()
bh2()
assert np.array_equal(dh.gather_array('src1'), dh.gather_array('src2'))
def test_exotic_boundaries():
step = LatticeBoltzmannStep((50, 50), relaxation_rate=1.8, compressible=False, zero_centered=True, periodicity=False)
add_box_boundary(step.boundary_handling, NeumannByCopy())
step.boundary_handling.set_boundary(StreamInConstant(0), make_slice[0, :])
step.run(100)
assert np.max(step.velocity[:, :, :]) < 1e-13
def test_boundary_utility_functions():
stencil = LBStencil(Stencil.D2Q9)
method = create_lb_method(lbm_config=LBMConfig(stencil=stencil))
noslip = NoSlip("noslip")
assert noslip == NoSlip("noslip")
assert not noslip == NoSlip("test")
assert not noslip == UBB((0, 0), name="ubb")
assert noslip.name == "noslip"
noslip.name = "test name setter"
assert noslip.name == "test name setter"
ubb = UBB((0, 0), name="ubb")
assert ubb == UBB((0, 0), name="ubb")
assert not noslip == UBB((0, 0), name="test")
assert not ubb == NoSlip("noslip")
simple_extrapolation = SimpleExtrapolationOutflow(normal_direction=stencil[4], stencil=stencil, name="simple")
assert simple_extrapolation == SimpleExtrapolationOutflow(normal_direction=stencil[4],
stencil=stencil, name="simple")
assert not simple_extrapolation == SimpleExtrapolationOutflow(normal_direction=stencil[4],
stencil=stencil, name="test")
assert not simple_extrapolation == NoSlip("noslip")
outflow = ExtrapolationOutflow(normal_direction=stencil[4], lb_method=method, name="outflow")
assert outflow == ExtrapolationOutflow(normal_direction=stencil[4], lb_method=method, name="outflow")
assert not outflow == ExtrapolationOutflow(normal_direction=stencil[4], lb_method=method, name="test")
assert not outflow == simple_extrapolation
density = FixedDensity(density=1.0, name="fixedDensity")
assert density == FixedDensity(density=1.0, name="fixedDensity")
assert not density == FixedDensity(density=1.0, name="test")
assert not density == UBB((0, 0), name="ubb")
diffusion = DiffusionDirichlet(concentration=1.0, name="diffusion")
assert diffusion == DiffusionDirichlet(concentration=1.0, name="diffusion")
assert not diffusion == DiffusionDirichlet(concentration=1.0, name="test")
assert not diffusion == density
neumann = NeumannByCopy(name="Neumann")
assert neumann == NeumannByCopy(name="Neumann")
assert not neumann == NeumannByCopy(name="test")
assert not neumann == diffusion
stream = StreamInConstant(constant=1.0, name="stream")
assert stream == StreamInConstant(constant=1.0, name="stream")
assert not stream == StreamInConstant(constant=1.0, name="test")
assert not stream == noslip
@pytest.mark.parametrize("given_force_vector", [True, False])
@pytest.mark.parametrize("dtype", ["float32", "float64"])
def test_force_on_boundary(given_force_vector, dtype):
stencil = LBStencil(Stencil.D2Q9)
pdfs = ps.fields(f"pdfs_src({stencil.Q}): {dtype}[{stencil.D}D]", layout='fzyx')
method = create_lb_method(lbm_config=LBMConfig(stencil=stencil, method=Method.SRT, relaxation_rate=1.8))
noslip = NoSlip(name="noslip", calculate_force_on_boundary=True)
bouzidi = NoSlipLinearBouzidi(name="bouzidi", calculate_force_on_boundary=True)
qq_bounce_Back = QuadraticBounceBack(name="qqBB", relaxation_rate=1.8, calculate_force_on_boundary=True)
boundary_objects = [noslip, bouzidi, qq_bounce_Back]
for boundary in boundary_objects:
if given_force_vector:
force_vector_type = np.dtype([(f"F_{i}", dtype) for i in range(stencil.D)], align=True)
force_vector = ps.Field('forceVector', ps.FieldType.INDEXED, force_vector_type, layout=[0],
shape=(ps.TypedSymbol("forceVectorSize", "int32"), 1), strides=(1, 1))
else:
force_vector = None
index_struct_dtype = _numpy_data_type_for_boundary_object(boundary, stencil.D)
index_field = ps.Field('indexVector', ps.FieldType.INDEXED, index_struct_dtype, layout=[0],
shape=(ps.TypedSymbol("indexVectorSize", "int32"), 1), strides=(1, 1))
create_lattice_boltzmann_boundary_kernel(pdfs, index_field, method, boundary, force_vector=force_vector)
def _numpy_data_type_for_boundary_object(boundary_object, dim):
boundary_index_array_coordinate_names = ["x", "y", "z"]
direction_member_name = "dir"
default_index_array_dtype = np.int32
coordinate_names = boundary_index_array_coordinate_names[:dim]
return np.dtype(
[(name, default_index_array_dtype) for name in coordinate_names]
+ [(direction_member_name, default_index_array_dtype)]
+ [(i[0], i[1].numpy_dtype) for i in boundary_object.additional_data],
align=True,)
import numpy as np
import sympy as sp
from lbmpy.creationfunctions import create_lb_method, LBMConfig
from lbmpy.enums import Method, Stencil
from lbmpy.forcemodels import Luo
from lbmpy.maxwellian_equilibrium import get_weights
from lbmpy.moments import moment_matrix, set_up_shift_matrix
from lbmpy.methods.creationfunctions import cascaded_moment_sets_literature
from lbmpy.scenarios import create_lid_driven_cavity
from lbmpy.stencils import LBStencil
from lbmpy.moment_transforms import BinomialChimeraTransform
def test_central_moment_ldc():
sc_central_moment = create_lid_driven_cavity((20, 20), method=Method.CENTRAL_MOMENT,
relaxation_rate=1.8, equilibrium_order=4,
compressible=True, force=(-1e-10, 0))
sc_central_moment_3d = create_lid_driven_cavity((20, 20, 3), method=Method.CENTRAL_MOMENT,
relaxation_rate=1.8, equilibrium_order=4,
compressible=True, force=(-1e-10, 0, 0))
sc_central_moment.run(1000)
sc_central_moment_3d.run(1000)
assert np.isfinite(np.max(sc_central_moment.velocity[:, :]))
assert np.isfinite(np.max(sc_central_moment_3d.velocity[:, :, :]))
def test_central_moment_class():
stencil = LBStencil(Stencil.D2Q9)
lbm_config = LBMConfig(stencil=stencil, method=Method.CENTRAL_MOMENT, relaxation_rates=[1.2, 1.3, 1.4, 1.5],
equilibrium_order=4, compressible=True, zero_centered=True)
method = create_lb_method(lbm_config=lbm_config)
lbm_config = LBMConfig(stencil=stencil, method=Method.SRT, relaxation_rate=1.2,
equilibrium_order=4, compressible=True, zero_centered=True)
srt = create_lb_method(lbm_config=lbm_config)
rho = method.zeroth_order_equilibrium_moment_symbol
u = method.first_order_equilibrium_moment_symbols
cs_sq = sp.Rational(1, 3)
force_model = Luo(force=sp.symbols(f"F_:{2}"))
eq = (rho, 0, 0, 0, 0, 2 * rho * cs_sq, 0, 0, rho * cs_sq ** 2)
default_moments = cascaded_moment_sets_literature(stencil)
default_moments = [item for sublist in default_moments for item in sublist]
assert method.central_moment_transform_class == BinomialChimeraTransform
assert method.conserved_quantity_computation.density_symbol == rho
assert method.conserved_quantity_computation.velocity_symbols == u
assert method.moment_equilibrium_values == eq
assert method.force_model is None
method.set_force_model(force_model)
assert method.force_model == force_model
assert method.relaxation_matrix[0, 0] == 0
assert method.relaxation_matrix[1, 1] == 0
assert method.relaxation_matrix[2, 2] == 0
method.set_conserved_moments_relaxation_rate(1.9)
assert method.relaxation_matrix[0, 0] == 1.9
assert method.relaxation_matrix[1, 1] == 1.9
assert method.relaxation_matrix[2, 2] == 1.9
moments = list()
for i in method.relaxation_info_dict:
moments.append(i)
assert moments == default_moments
for i in range(len(stencil)):
assert method.relaxation_rates[i] == method.relaxation_matrix[i, i]
M = method.moment_matrix
N = method.shift_matrix
assert M == moment_matrix(moments, stencil=stencil)
assert N == set_up_shift_matrix(moments, stencil=stencil)
assert get_weights(stencil) == method.weights
cqc = method.conserved_quantity_computation
subs = {cqc.density_deviation_symbol : cqc.density_symbol - cqc.background_density}
eq_terms_central = method.get_equilibrium_terms()
eq_terms_srt = srt.get_equilibrium_terms()
assert (eq_terms_central - eq_terms_srt).subs(subs).expand() == sp.Matrix((0,) * stencil.Q)
method = create_lb_method(lbm_config=LBMConfig(stencil=LBStencil(Stencil.D2Q9), method=Method.CENTRAL_MOMENT,
relaxation_rates=[1.7, 1.8, 1.2, 1.3, 1.4], equilibrium_order=4,
compressible=True))
assert method.relaxation_matrix[0, 0] == 0
assert method.relaxation_matrix[1, 1] == 0
assert method.relaxation_matrix[2, 2] == 0
method = create_lb_method(lbm_config=LBMConfig(stencil=LBStencil(Stencil.D2Q9), method=Method.CENTRAL_MOMENT,
relaxation_rates=[1.3] * 9, equilibrium_order=4,
compressible=True))
np.testing.assert_almost_equal(sum(method.relaxation_rates), 1.3 * 9)
import pytest
import sympy as sp
from lbmpy.enums import Stencil
from lbmpy.stencils import LBStencil
from lbmpy.methods.creationfunctions import create_central_moment
from lbmpy.maxwellian_equilibrium import get_weights
from lbmpy.moment_transforms import PdfsToMomentsByChimeraTransform
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q19])
@pytest.mark.parametrize('compressible', [True, False])
def test_full_and_delta_equilibrium_equivalence(stencil, compressible):
stencil = LBStencil(stencil)
zero_centered = True
omega = sp.Symbol('omega')
r_rates = (omega,) * stencil.Q
rho = sp.Symbol("rho")
rho_background = sp.Integer(1)
delta_rho = sp.Symbol("delta_rho")
subs = {delta_rho: rho - rho_background}
eqs = []
for delta_eq in [False, True]:
method = create_central_moment(stencil, r_rates, continuous_equilibrium=True, compressible=compressible,
zero_centered=zero_centered, delta_equilibrium=delta_eq, equilibrium_order=None)
eq = method.get_equilibrium_terms()
eqs.append(eq.subs(subs))
assert (eqs[0] - eqs[1]).expand() == sp.Matrix((0,) * stencil.Q)
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q19])
@pytest.mark.parametrize('compressible', [True, False])
@pytest.mark.parametrize('delta_eq', [True, False])
def test_zero_centering_equilibrium_equivalence(stencil, compressible, delta_eq):
stencil = LBStencil(stencil)
omega = sp.Symbol('omega')
r_rates = (omega,) * stencil.Q
weights = sp.Matrix(get_weights(stencil))
rho = sp.Symbol("rho")
rho_background = sp.Integer(1)
delta_rho = sp.Symbol("delta_rho")
subs = {delta_rho: rho - rho_background}
eqs = []
for zero_centered in [False, True]:
method = create_central_moment(stencil, r_rates, continuous_equilibrium=True, compressible=compressible,
zero_centered=zero_centered, delta_equilibrium=delta_eq and zero_centered,
equilibrium_order=None)
eq = method.get_equilibrium_terms()
eqs.append(eq.subs(subs))
assert (eqs[0] - eqs[1]).expand() == weights
import sympy as sp
from pystencils.stencil import have_same_entries
from lbmpy.moments import get_default_moment_set_for_stencil
from lbmpy.methods.creationfunctions import cascaded_moment_sets_literature
import pytest
from lbmpy.enums import Stencil
from lbmpy.stencils import LBStencil
from lbmpy.moment_transforms import (
PdfsToCentralMomentsByMatrix, FastCentralMomentTransform,
BinomialChimeraTransform, PdfsToCentralMomentsByShiftMatrix)
@pytest.mark.parametrize('central_moments', ['monomial', 'polynomial'])
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q15, Stencil.D3Q19, Stencil.D3Q27])
@pytest.mark.parametrize('transform_class', [BinomialChimeraTransform, FastCentralMomentTransform, PdfsToCentralMomentsByShiftMatrix])
def test_forward_transform(central_moments, stencil, transform_class):
stencil = LBStencil(stencil)
if central_moments == 'monomial':
moment_polynomials = get_default_moment_set_for_stencil(stencil)
elif central_moments == 'polynomial':
moment_polynomials = [item for sublist in cascaded_moment_sets_literature(stencil)
for item in sublist]
pdfs = sp.symbols(f"f_:{stencil.Q}")
rho = sp.Symbol('rho')
u = sp.symbols(f"u_:{stencil.D}")
matrix_transform = PdfsToCentralMomentsByMatrix(stencil, moment_polynomials, rho, u)
test_transform = transform_class(stencil, moment_polynomials, rho, u)
if central_moments == 'monomial' and not have_same_entries(stencil, LBStencil(Stencil.D3Q15)):
assert test_transform.mono_to_poly_matrix == sp.eye(stencil.Q)
else:
assert not test_transform.mono_to_poly_matrix == sp.eye(stencil.Q)
f_to_k_matrix = matrix_transform.forward_transform(pdfs)
f_to_k_matrix = f_to_k_matrix.new_without_subexpressions().main_assignments_dict
f_to_k_test = test_transform.forward_transform(pdfs)
f_to_k_test = f_to_k_test.new_without_subexpressions().main_assignments_dict
cm_symbols = matrix_transform.pre_collision_symbols
for moment_symbol in cm_symbols:
rhs_matrix = f_to_k_matrix[moment_symbol].expand()
rhs_test = f_to_k_test[moment_symbol].expand()
assert (rhs_matrix - rhs_test) == 0, \
f"Mismatch between matrix transform and {transform_class.__name__} at {moment_symbol}."
@pytest.mark.parametrize('central_moments', ['monomial', 'polynomial'])
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q15, Stencil.D3Q19, Stencil.D3Q27])
@pytest.mark.parametrize('transform_class', [BinomialChimeraTransform, FastCentralMomentTransform, PdfsToCentralMomentsByShiftMatrix])
def test_backward_transform(central_moments, stencil, transform_class):
stencil = LBStencil(stencil)
if central_moments == 'monomial':
moment_polynomials = get_default_moment_set_for_stencil(stencil)
elif central_moments == 'polynomial':
moment_polynomials = [item for sublist in cascaded_moment_sets_literature(stencil)
for item in sublist]
pdfs = sp.symbols(f"f_:{stencil.Q}")
rho = sp.Symbol('rho')
u = sp.symbols(f"u_:{stencil.D}")
matrix_transform = PdfsToCentralMomentsByMatrix(stencil, moment_polynomials, rho, u)
test_transform = transform_class(stencil, moment_polynomials, rho, u)
k_to_f_matrix = matrix_transform.backward_transform(pdfs)
k_to_f_matrix = k_to_f_matrix.new_without_subexpressions().main_assignments_dict
k_to_f_test = test_transform.backward_transform(pdfs)
k_to_f_test = k_to_f_test.new_without_subexpressions().main_assignments_dict
for f in pdfs:
rhs_matrix = k_to_f_matrix[f].expand()
rhs_test = k_to_f_test[f].expand()
assert (rhs_matrix - rhs_test) == 0, \
f"Mismatch between matrix transform and {transform_class.__name__} at {f}."
......@@ -10,31 +10,36 @@ from lbmpy.chapman_enskog.chapman_enskog_higher_order import (
determine_higher_order_moments, get_solvability_conditions)
from lbmpy.chapman_enskog.chapman_enskog_steady_state import (
SteadyStateChapmanEnskogAnalysis, SteadyStateChapmanEnskogAnalysisSRT)
from lbmpy.creationfunctions import create_lb_method
from lbmpy.creationfunctions import create_lb_method, LBMConfig
from lbmpy.enums import Method, Stencil
from lbmpy.forcemodels import Guo
from lbmpy.relaxationrates import lattice_viscosity_from_relaxation_rate
from lbmpy.stencils import LBStencil
from pystencils.fd import Diff, normalize_diff_order
from pystencils.sympyextensions import multidimensional_sum
def test_srt():
for stencil in ['D2Q9', 'D3Q19', 'D3Q27']:
for continuous_eq in (False, True):
omega = sp.Symbol("omega")
print("Analysing %s, ContMaxwellianConstruction %d" % (stencil, continuous_eq))
method = create_lb_method(method='srt', stencil=stencil, compressible=True,
relaxation_rate=omega, maxwellian_moments=continuous_eq)
analysis = ChapmanEnskogAnalysis(method)
omega_value = analysis.relaxation_rate_from_kinematic_viscosity(1)[omega]
assert omega_value, sp.Rational(2 == 7)
@pytest.mark.parametrize('continuous_eq', [False, True])
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q15, Stencil.D3Q19, Stencil.D3Q27])
def test_srt(continuous_eq, stencil):
omega = sp.Symbol("omega")
print(f"Analysing {stencil}, ContMaxwellianConstruction {continuous_eq}")
lbm_config = LBMConfig(stencil=LBStencil(stencil), method=Method.SRT, compressible=True,
zero_centered=False, relaxation_rate=omega, continuous_equilibrium=continuous_eq)
method = create_lb_method(lbm_config=lbm_config)
analysis = ChapmanEnskogAnalysis(method)
omega_value = analysis.relaxation_rate_from_kinematic_viscosity(1)[omega]
assert omega_value, sp.Rational(2 == 7)
@pytest.mark.longrun
def test_steady_state_silva_paper_comparison():
eps, tau, lambda_plus, f = sp.symbols("epsilon tau Lambda f")
method = create_lb_method(stencil="D3Q19", compressible=False, relaxation_rate=1 / tau,
maxwellian_moments=False)
lbm_config = LBMConfig(stencil=LBStencil(Stencil.D3Q19), compressible=False, relaxation_rate=1 / tau,
continuous_equilibrium=False, zero_centered=False)
method = create_lb_method(lbm_config=lbm_config)
analysis = SteadyStateChapmanEnskogAnalysis(method)
dim = 3
......@@ -142,7 +147,8 @@ def test_steady_state_silva_paper_comparison():
def test_higher_order_moment_computation():
"""In chapman_enskog_higher_order.py there are some functions to generalize the std Chapman Enskog expansion
These are not used by the Chapman Enskog class yet."""
method = create_lb_method(stencil='D2Q9', method='trt', maxwellian_moments=False)
method = create_lb_method(lbm_config=LBMConfig(stencil=LBStencil(Stencil.D2Q9), zero_centered=False,
method=Method.TRT, compressible=False))
mom_comp = LbMethodEqMoments(method)
dim = method.dim
order = 2
......@@ -168,7 +174,8 @@ def test_higher_order_moment_computation():
def test_steady_state():
rr = sp.symbols("omega")
method = create_lb_method(stencil='D2Q9', method='srt', relaxation_rate=rr)
method = create_lb_method(lbm_config=LBMConfig(stencil=LBStencil(Stencil.D2Q9),
method=Method.SRT, relaxation_rate=rr))
a1 = SteadyStateChapmanEnskogAnalysis(method, order=2)
a2 = SteadyStateChapmanEnskogAnalysisSRT(method, order=2)
......
%% Cell type:code id: tags:
``` python
import pystencils as ps
from lbmpy.session import *
from lbmpy.boundaries.boundaries_in_kernel import update_rule_with_push_boundaries
from lbmpy.macroscopic_value_kernels import macroscopic_values_getter, macroscopic_values_setter
from collections import OrderedDict
from time import perf_counter
```
%% Cell type:markdown id: tags:
# Version 1: compile-in boundaries
%% Cell type:code id: tags:
``` python
domain_size = (32, 32, 32)
relaxation_rate = 1.8
time_steps = 100
lid_velocity = 0.05
stencil = LBStencil(Stencil.D3Q19)
```
%% Cell type:code id: tags:
``` python
dh = create_data_handling(domain_size, default_target=ps.Target.CPU)
pdfs = dh.add_array('pdfs', values_per_cell=stencil.Q)
u = dh.add_array('u', values_per_cell=stencil.D)
streaming_pattern = 'aa'
```
%% Cell type:code id: tags:
``` python
boundaries = OrderedDict((
((0, 1, 0), UBB([lid_velocity, 0, 0])),
((1, 0, 0), NoSlip()),
((-1, 0, 0), NoSlip()),
((0, -1, 0), NoSlip()),
((0, 0, 1), NoSlip()),
((0, 0, -1), NoSlip()),
))
lbm_opt = LBMOptimisation(symbolic_field=pdfs, cse_global=False, cse_pdfs=False)
lbm_config = LBMConfig(stencil=stencil, relaxation_rate=relaxation_rate, compressible=False)
cr_even = create_lb_collision_rule(lbm_config=lbm_config, lbm_optimisation=lbm_opt)
cr_odd = create_lb_collision_rule(lbm_config=lbm_config, lbm_optimisation=lbm_opt)
update_rule_aa_even = update_rule_with_push_boundaries(cr_even, pdfs, boundaries, streaming_pattern, Timestep.EVEN)
update_rule_aa_odd = update_rule_with_push_boundaries(cr_odd, pdfs, boundaries, streaming_pattern, Timestep.ODD)
getter_assignments = macroscopic_values_getter(update_rule_aa_even.method, velocity=u.center_vector,
pdfs=pdfs, density=None,
streaming_pattern=streaming_pattern,
previous_timestep=Timestep.EVEN)
config = ps.CreateKernelConfig(target=dh.default_target)
getter_kernel = ps.create_kernel(getter_assignments, config=config).compile()
even_kernel = ps.create_kernel(update_rule_aa_even, config=config).compile()
odd_kernel = ps.create_kernel(update_rule_aa_odd, config=config).compile()
```
%% Cell type:code id: tags:
``` python
def init():
dh.fill(pdfs.name, 0, ghost_layers=True)
def aa_time_loop(steps=100):
assert steps % 2 == 0, "Works only for an even number of time steps"
dh.all_to_gpu()
for i in range(steps // 2):
dh.run_kernel(odd_kernel)
dh.run_kernel(even_kernel)
dh.run_kernel(getter_kernel)
dh.all_to_cpu()
```
%% Cell type:code id: tags:
``` python
init()
aa_time_loop(time_steps)
vel_version1 = dh.gather_array(u.name, ghost_layers=False).copy()
plt.vector_field_magnitude(vel_version1[:, :, domain_size[2]//2, :])
plt.colorbar()
```
%% Output
<matplotlib.colorbar.Colorbar at 0x11b578710>
%% Cell type:markdown id: tags:
# Version 2: Normal boundary handling
%% Cell type:code id: tags:
``` python
ldc = create_lid_driven_cavity(domain_size, relaxation_rate=relaxation_rate, lid_velocity=lid_velocity)
ldc.run(time_steps)
vel_version2 = ldc.velocity[:, :, :, :]
plt.vector_field_magnitude(vel_version2[:, :, domain_size[2]//2, :])
plt.colorbar()
```
%% Output
<matplotlib.colorbar.Colorbar at 0x12e912a90>
%% Cell type:code id: tags:
``` python
np.testing.assert_almost_equal(vel_version1[1:-1, 1:-1, :], vel_version2[1:-1, 1:-1, :], decimal=2)
```
"""
The update equations should not change if a relaxation rate of a conserved quantity (density/velocity)
changes. This test checks that for moment-based methods
"""
from copy import copy
import pytest
import sympy as sp
import math
from lbmpy.enums import Stencil, Method
from lbmpy.methods import create_srt, create_trt, create_trt_kbc, \
create_with_default_polynomial_cumulants
from lbmpy.methods.momentbased.momentbasedmethod import MomentBasedLbMethod
from lbmpy.methods.cumulantbased import CumulantBasedLbMethod
from lbmpy.moments import MOMENT_SYMBOLS
from lbmpy.simplificationfactory import create_simplification_strategy
from lbmpy.stencils import LBStencil
def __change_relaxation_rate_of_conserved_moments(method, new_relaxation_rate=sp.Symbol("test_omega")):
conserved_moments = (sp.Rational(1, 1),) + MOMENT_SYMBOLS[:method.dim]
rr_dict = copy(method.relaxation_rate_dict)
for conserved_moment in conserved_moments:
rr_dict[conserved_moment] = new_relaxation_rate
if isinstance(method, MomentBasedLbMethod):
changed_method = MomentBasedLbMethod(method.stencil, method.equilibrium_distribution, rr_dict,
method.conserved_quantity_computation,
force_model=method.force_model)
elif isinstance(method, CumulantBasedLbMethod):
changed_method = CumulantBasedLbMethod(method.stencil, method.equilibrium_distribution, rr_dict,
method.conserved_quantity_computation,
force_model=method.force_model,
zero_centered=True)
else:
raise ValueError("Not a moment or cumulant-based method")
return changed_method
def check_for_collision_rule_equivalence(collision_rule1, collision_rule2, use_numeric_subs=False):
collision_rule1 = collision_rule1.new_without_subexpressions()
collision_rule2 = collision_rule2.new_without_subexpressions()
if use_numeric_subs:
free_symbols = collision_rule1.free_symbols
free_symbols.update(collision_rule2.free_symbols)
subs_dict = dict()
value = 10.0
for symbol in free_symbols:
subs_dict.update({symbol: value})
value += 1.1
collision_rule1 = collision_rule1.subs(subs_dict)
collision_rule2 = collision_rule2.subs(subs_dict)
for eq1, eq2 in zip(collision_rule1.main_assignments, collision_rule2.main_assignments):
diff = sp.cancel(sp.expand(eq1.rhs - eq2.rhs))
if use_numeric_subs:
assert math.isclose(diff, 0, rel_tol=0.0, abs_tol=1e-10)
else:
assert diff == 0
def check_method_equivalence(m1, m2, do_simplifications, use_numeric_subs=False):
cr1 = m1.get_collision_rule()
cr2 = m2.get_collision_rule()
if do_simplifications:
cr1 = create_simplification_strategy(m1)(cr1)
cr2 = create_simplification_strategy(m2)(cr2)
check_for_collision_rule_equivalence(cr1, cr2, use_numeric_subs)
@pytest.mark.longrun
def test_cumulant():
stencil = LBStencil(Stencil.D2Q9)
original_method = create_with_default_polynomial_cumulants(stencil, [sp.Symbol("omega")], zero_centered=True)
changed_method = __change_relaxation_rate_of_conserved_moments(original_method)
check_method_equivalence(original_method, changed_method, True, use_numeric_subs=True)
check_method_equivalence(original_method, changed_method, False, use_numeric_subs=True)
@pytest.mark.longrun
def test_srt():
stencil = LBStencil(Stencil.D3Q27)
original_method = create_srt(stencil, sp.Symbol("omega"), compressible=True,
continuous_equilibrium=True)
changed_method = __change_relaxation_rate_of_conserved_moments(original_method)
check_method_equivalence(original_method, changed_method, True, use_numeric_subs=True)
check_method_equivalence(original_method, changed_method, False, use_numeric_subs=True)
def test_srt_short():
stencil = LBStencil(Stencil.D2Q9)
original_method = create_srt(stencil, sp.Symbol("omega"), compressible=True,
continuous_equilibrium=True)
changed_method = __change_relaxation_rate_of_conserved_moments(original_method)
check_method_equivalence(original_method, changed_method, True, use_numeric_subs=False)
check_method_equivalence(original_method, changed_method, False, use_numeric_subs=False)
@pytest.mark.parametrize('stencil_name', [Stencil.D2Q9, Stencil.D3Q19, Stencil.D3Q27])
@pytest.mark.parametrize('continuous_moments', [False, True])
@pytest.mark.longrun
def test_trt(stencil_name, continuous_moments):
stencil = LBStencil(stencil_name)
original_method = create_trt(stencil, sp.Symbol("omega1"), sp.Symbol("omega2"),
continuous_equilibrium=continuous_moments)
changed_method = __change_relaxation_rate_of_conserved_moments(original_method)
check_method_equivalence(original_method, changed_method, True)
check_method_equivalence(original_method, changed_method, False)
@pytest.mark.parametrize('method_name', [Method.TRT_KBC_N1, Method.TRT_KBC_N2, Method.TRT_KBC_N3, Method.TRT_KBC_N4])
def test_trt_kbc(method_name):
dim = 2
method_nr = method_name.name[-1]
original_method = create_trt_kbc(dim, sp.Symbol("omega1"), sp.Symbol("omega2"),
method_name='KBC-N' + method_nr,
continuous_equilibrium=False)
changed_method = __change_relaxation_rate_of_conserved_moments(original_method)
check_method_equivalence(original_method, changed_method, True)
check_method_equivalence(original_method, changed_method, False)
@pytest.mark.parametrize('method_name', [Method.TRT_KBC_N1, Method.TRT_KBC_N2, Method.TRT_KBC_N3, Method.TRT_KBC_N4])
@pytest.mark.longrun
def test_trt_kbc_long(method_name):
dim = 3
method_nr = method_name.name[-1]
original_method = create_trt_kbc(dim, sp.Symbol("omega1"), sp.Symbol("omega2"),
method_name='KBC-N' + method_nr,
continuous_equilibrium=False)
changed_method = __change_relaxation_rate_of_conserved_moments(original_method)
check_method_equivalence(original_method, changed_method, True)
check_method_equivalence(original_method, changed_method, False)