Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
No results found
Show changes
Showing
with 1714 additions and 68 deletions
from dataclasses import replace
import numpy as np
import pytest
from pystencils import Backend, Target, CreateKernelConfig
from lbmpy.creationfunctions import LBMConfig, LBMOptimisation
from lbmpy.enums import Method, Stencil
from lbmpy.scenarios import create_channel
from lbmpy.stencils import LBStencil
def run_equivalence_test(domain_size, lbm_config, lbm_opt, config, time_steps=13):
config = replace(config, target=Target.CPU)
cpu_scenario = create_channel(domain_size=domain_size, pressure_difference=0.001,
lbm_config=lbm_config, lbm_optimisation=lbm_opt, config=config)
config = replace(config, target=Target.GPU, backend=Backend.CUDA)
gpu_scenario = create_channel(domain_size=domain_size, pressure_difference=0.001,
lbm_config=lbm_config, lbm_optimisation=lbm_opt, config=config)
cpu_scenario.run(time_steps)
gpu_scenario.run(time_steps)
max_vel_error = np.max(np.abs(cpu_scenario.velocity_slice() - gpu_scenario.velocity_slice()))
max_rho_error = np.max(np.abs(cpu_scenario.density_slice() - gpu_scenario.density_slice()))
np.testing.assert_allclose(max_vel_error, 0, atol=1e-14)
np.testing.assert_allclose(max_rho_error, 0, atol=1e-14)
@pytest.mark.parametrize('scenario', [((17, 12), Method.SRT, False, (12, 4), 'reverse_numpy'),
((18, 20), Method.MRT, True, (4, 2), 'zyxf'),
((7, 11, 18), Method.TRT, False, False, 'numpy')])
def test_force_driven_channel_short(scenario):
pytest.importorskip("cupy")
ds = scenario[0]
stencil = LBStencil(Stencil.D2Q9) if len(ds) == 2 else LBStencil(Stencil.D3Q27)
method = scenario[1]
compressible = scenario[2]
block_size = scenario[3]
field_layout = scenario[4]
lbm_config = LBMConfig(stencil=stencil, method=method,
compressible=compressible, relaxation_rates=[1.95, 1.9, 1.92, 1.92])
lbm_opt = LBMOptimisation(field_layout=field_layout)
# Different methods
if block_size is not False:
config = CreateKernelConfig(gpu_indexing_params={'block_size': block_size})
else:
config = CreateKernelConfig(gpu_indexing='line')
run_equivalence_test(domain_size=ds, lbm_config=lbm_config, lbm_opt=lbm_opt, config=config)
import pytest
import sympy as sp
from lbmpy.enums import Stencil
from lbmpy.stencils import LBStencil
from lbmpy.methods.creationfunctions import create_with_monomial_cumulants
from lbmpy.maxwellian_equilibrium import get_weights
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q19])
def test_zero_centering_equilibrium_equivalence(stencil):
stencil = LBStencil(stencil)
omega = sp.Symbol('omega')
r_rates = (omega,) * stencil.Q
weights = sp.Matrix(get_weights(stencil))
rho = sp.Symbol("rho")
rho_background = sp.Integer(1)
delta_rho = sp.Symbol("delta_rho")
subs = {delta_rho: rho - rho_background}
eqs = []
for zero_centered in [False, True]:
method = create_with_monomial_cumulants(stencil, r_rates, zero_centered=zero_centered)
eq = method.get_equilibrium_terms()
eqs.append(eq.subs(subs))
assert (eqs[0] - eqs[1]).expand() == weights
import pytest
from lbmpy.creationfunctions import create_lb_method, LBMConfig
from lbmpy.enums import Method, Stencil
from lbmpy.methods import create_srt
from lbmpy.stencils import LBStencil
from lbmpy.methods.creationfunctions import create_with_default_polynomial_cumulants
@pytest.mark.parametrize('stencil_name', [Stencil.D2Q9, Stencil.D3Q19, Stencil.D3Q27])
def test_weights(stencil_name):
stencil = LBStencil(stencil_name)
cumulant_method = create_with_default_polynomial_cumulants(stencil, [1])
moment_method = create_srt(stencil, 1, compressible=True, continuous_equilibrium=True)
assert cumulant_method.weights == moment_method.weights
import pytest
from itertools import chain
import sympy as sp
from pystencils import AssignmentCollection
from lbmpy.moment_transforms import (
CentralMomentsToCumulantsByGeneratingFunc, PRE_COLLISION_MONOMIAL_CENTRAL_MOMENT,
PRE_COLLISION_CUMULANT
)
from lbmpy.methods import cascaded_moment_sets_literature
from lbmpy.stencils import Stencil, LBStencil
@pytest.mark.parametrize("stencil", [Stencil.D2Q9, Stencil.D3Q19])
def test_identity(stencil):
stencil = LBStencil(stencil)
polys = list(chain.from_iterable(cascaded_moment_sets_literature(stencil)))
rho = sp.Symbol('rho')
u = sp.symbols('u_:2')
transform = CentralMomentsToCumulantsByGeneratingFunc(stencil, polys, rho, u,
post_collision_symbol_base=PRE_COLLISION_CUMULANT)
forward_eqs = transform.forward_transform()
backward_eqs = transform.backward_transform(central_moment_base=PRE_COLLISION_MONOMIAL_CENTRAL_MOMENT)
subexpressions = forward_eqs.all_assignments + backward_eqs.subexpressions
main_assignments = backward_eqs.main_assignments
ac = AssignmentCollection(main_assignments, subexpressions=subexpressions)
ac = ac.new_without_subexpressions()
for lhs, rhs in ac.main_assignments_dict.items():
assert (lhs - rhs).expand() == 0
from lbmpy.creationfunctions import create_lb_method
from lbmpy.cumulants import *
from lbmpy.moments import (
discrete_moment, exponent_to_polynomial_representation, exponents_to_polynomial_representations)
from lbmpy.stencils import get_stencil
from lbmpy.enums import Stencil
from lbmpy.stencils import LBStencil
def test_cumulants_from_pdfs():
"""
......@@ -11,11 +10,10 @@ def test_cumulants_from_pdfs():
- directly pdfs to cumulant
- indirect pdfs -> raw moments -> cumulants
"""
stencil = get_stencil("D2Q9")
dim = len(stencil[0])
indices = moments_up_to_component_order(2, dim=dim)
stencil = LBStencil(Stencil.D2Q9)
indices = moments_up_to_component_order(2, dim=stencil.D)
pdf_symbols = sp.symbols("f_:%d" % (len(stencil),))
pdf_symbols = sp.symbols(f"f_:{stencil.Q}")
direct_version = cumulants_from_pdfs(stencil, pdf_symbols=pdf_symbols, cumulant_indices=indices)
polynomial_moment_indices = exponents_to_polynomial_representations(indices)
direct_version2 = cumulants_from_pdfs(stencil, pdf_symbols=pdf_symbols, cumulant_indices=polynomial_moment_indices)
......@@ -32,11 +30,10 @@ def test_cumulants_from_pdfs():
def test_raw_moment_to_cumulant_transformation():
"""Transforms from raw moments to cumulants and back, then checks for identity"""
for stencil in [get_stencil("D2Q9"), get_stencil("D3Q27")]:
dim = len(stencil[0])
indices = moments_up_to_component_order(2, dim=dim)
for stencil in [LBStencil(Stencil.D2Q9), LBStencil(Stencil.D3Q27)]:
indices = moments_up_to_component_order(2, dim=stencil.D)
symbol_format = "m_%d_%d_%d" if dim == 3 else "m_%d_%d"
symbol_format = "m_%d_%d_%d" if stencil.D == 3 else "m_%d_%d"
moment_symbols = {idx: sp.Symbol(symbol_format % idx) for idx in indices}
forward = {idx: cumulant_as_function_of_raw_moments(idx, moments_dict=moment_symbols)
......@@ -48,11 +45,10 @@ def test_raw_moment_to_cumulant_transformation():
def test_central_moment_to_cumulant_transformation():
"""Transforms from central moments to cumulants and back, then checks for identity"""
for stencil in [get_stencil("D2Q9"), get_stencil("D3Q27")]:
dim = len(stencil[0])
indices = moments_up_to_component_order(2, dim=dim)
for stencil in [LBStencil(Stencil.D2Q9), LBStencil(Stencil.D3Q27)]:
indices = moments_up_to_component_order(2, dim=stencil.D)
symbol_format = "m_%d_%d_%d" if dim == 3 else "m_%d_%d"
symbol_format = "m_%d_%d_%d" if stencil.D == 3 else "m_%d_%d"
moment_symbols = {idx: sp.Symbol(symbol_format % idx) for idx in indices}
forward = {idx: cumulant_as_function_of_central_moments(idx, moments_dict=moment_symbols)
......@@ -65,29 +61,6 @@ def test_central_moment_to_cumulant_transformation():
assert backward[idx] == moment_symbols[idx]
def test_collision_rule():
cumulant_method = create_lb_method(stencil='D2Q9', compressible=True, cumulant=True, relaxation_rate=1.5)
cumulant_method.set_first_moment_relaxation_rate(1.5)
assert cumulant_method.force_model is None
assert cumulant_method.zeroth_order_equilibrium_moment_symbol == sp.symbols("rho")
assert cumulant_method.first_order_equilibrium_moment_symbols == sp.symbols("u_:2")
assert all(e.relaxation_rate == 1.5 for e in cumulant_method.relaxation_info_dict.values())
cumulant_method.get_equilibrium_terms()
cr1 = cumulant_method.get_collision_rule(moment_subexpressions=True, post_collision_subexpressions=True,
pre_collision_subexpressions=True)
cr2 = cumulant_method.get_collision_rule(moment_subexpressions=False, post_collision_subexpressions=False,
pre_collision_subexpressions=False)
cr1_inserted = cr1.new_without_subexpressions()
cr2_inserted = cr2.new_without_subexpressions()
t = cr1_inserted.main_assignments[8].rhs - cr2_inserted.main_assignments[8].rhs
assert t == 0
html = cumulant_method._repr_html_()
assert 'Cumulant' in html
def test_cumulants_from_pdf():
res = cumulants_from_pdfs(get_stencil("D2Q9"))
assert res[(0, 0)] == sp.log(sum(sp.symbols("f_:9")))
res = cumulants_from_pdfs(LBStencil(Stencil.D2Q9))
assert res[(0, 0)] == sp.log(sum(sp.symbols(f"f_:{9}")))
import pytest
import numpy as np
from numpy.testing import assert_allclose
import pystencils as ps
import sympy as sp
from lbmpy.enums import Stencil, Method
from lbmpy.stencils import LBStencil
from lbmpy.creationfunctions import LBMConfig, LBMOptimisation, create_lb_function
from lbmpy.macroscopic_value_kernels import macroscopic_values_setter, macroscopic_values_getter
@pytest.mark.parametrize('zero_centered', [False, True])
def test_density_velocity_input(zero_centered):
stencil = LBStencil(Stencil.D2Q9)
dh = ps.create_data_handling((5,5), default_target=ps.Target.CPU)
rho_in = dh.add_array("rho_in", 1)
rho_out = dh.add_array_like("rho_out", "rho_in")
u_in = dh.add_array("u_in", 2)
u_out = dh.add_array_like("u_out", "u_in")
pdfs = dh.add_array("pdfs", stencil.Q)
lb_config = LBMConfig(stencil=Stencil.D2Q9, method=Method.SRT, zero_centered=zero_centered,
relaxation_rate=sp.Integer(1),
density_input=rho_in.center, velocity_input=u_in.center_vector,
kernel_type="collide_only")
lb_opt = LBMOptimisation(symbolic_field=pdfs)
lb_func = create_lb_function(lbm_config=lb_config, lbm_optimisation=lb_opt)
setter = macroscopic_values_setter(lb_func.method, 1, (0, 0), pdfs.center_vector)
setter_kernel = ps.create_kernel(setter).compile()
getter = macroscopic_values_getter(lb_func.method, rho_out.center, u_out.center_vector, pdfs.center_vector)
getter_kernel = ps.create_kernel(getter).compile()
dh.run_kernel(setter_kernel)
dh.cpu_arrays[rho_in.name][1:-1, 1:-1] = 1.0 + 0.1 * np.random.random_sample((5, 5))
dh.cpu_arrays[u_in.name][1:-1, 1:-1] = 0.05 + 0.01 * np.random.random_sample((5, 5, 2))
dh.run_kernel(lb_func)
dh.run_kernel(getter_kernel)
assert_allclose(dh.cpu_arrays[rho_out.name][1:-1, 1:-1], dh.cpu_arrays[rho_in.name][1:-1, 1:-1])
assert_allclose(dh.cpu_arrays[u_out.name][1:-1, 1:-1], dh.cpu_arrays[u_in.name][1:-1, 1:-1])
import math
import pytest
from dataclasses import replace
import pystencils as ps
from pystencils.slicing import slice_from_direction
from lbmpy import pdf_initialization_assignments
from lbmpy.creationfunctions import create_lb_method, create_lb_update_rule, LBMConfig, LBMOptimisation
from lbmpy.enums import Method, Stencil
from lbmpy.boundaries.boundaryhandling import LatticeBoltzmannBoundaryHandling
from lbmpy.boundaries.boundaryconditions import DiffusionDirichlet, NeumannByCopy
from lbmpy.geometry import add_box_boundary
from lbmpy.stencils import LBStencil
from lbmpy.maxwellian_equilibrium import get_weights
import sympy as sp
import numpy as np
def test_diffusion_boundary():
domain_size = (10, 10)
stencil = LBStencil(Stencil.D2Q9)
weights = get_weights(stencil, c_s_sq=sp.Rational(1, 3))
concentration = 1.0
# Data Handling
dh = ps.create_data_handling(domain_size=domain_size)
dh.add_array('pdfs', values_per_cell=stencil.Q)
dh.fill("pdfs", 0.0, ghost_layers=True)
lbm_config = LBMConfig(stencil=stencil, method=Method.SRT, relaxation_rate=1.8,
compressible=True, zero_centered=False)
method = create_lb_method(lbm_config=lbm_config)
# Boundary Handling
bh = LatticeBoltzmannBoundaryHandling(method, dh, 'pdfs', name="bh")
add_box_boundary(bh, boundary=DiffusionDirichlet(concentration))
bh()
assert all(np.abs(dh.gather_array("pdfs", ghost_layers=True)[0, 1:-2, 4] - 2 * weights[4]) < 1e-14)
assert all(np.abs(dh.gather_array("pdfs", ghost_layers=True)[0, 1:-2, 6] - 2 * weights[6]) < 1e-14)
assert all(np.abs(dh.gather_array("pdfs", ghost_layers=True)[0, 2:-1, 8] - 2 * weights[8]) < 1e-14)
assert all(np.abs(dh.gather_array("pdfs", ghost_layers=True)[-1, 1:-2, 3] - 2 * weights[3]) < 1e-14)
assert all(np.abs(dh.gather_array("pdfs", ghost_layers=True)[-1, 1:-2, 5] - 2 * weights[5]) < 1e-14)
assert all(np.abs(dh.gather_array("pdfs", ghost_layers=True)[-1, 2:-1, 7] - 2 * weights[7]) < 1e-14)
assert all(np.abs(dh.gather_array("pdfs", ghost_layers=True)[1:-2, 0, 1] - 2 * weights[1]) < 1e-14)
assert all(np.abs(dh.gather_array("pdfs", ghost_layers=True)[2:, 0, 5] - 2 * weights[5]) < 1e-14)
assert all(np.abs(dh.gather_array("pdfs", ghost_layers=True)[:-2, 0, 6] - 2 * weights[6]) < 1e-14)
assert all(np.abs(dh.gather_array("pdfs", ghost_layers=True)[1:-2, -1, 2] - 2 * weights[2]) < 1e-14)
assert all(np.abs(dh.gather_array("pdfs", ghost_layers=True)[2:, -1, 7] - 2 * weights[7]) < 1e-14)
assert all(np.abs(dh.gather_array("pdfs", ghost_layers=True)[:-2, -1, 8] - 2 * weights[8]) < 1e-14)
@pytest.mark.longrun
def test_diffusion():
"""
Runs the "Diffusion from Plate in Uniform Flow" benchmark as it is described
in [ch. 8.6.3, The Lattice Boltzmann Method, Krüger et al.].
dC/dy = 0
┌───────────────┐
│ → → → │
C = 0 │ → u → │ dC/dx = 0
│ → → → │
└───────────────┘
C = 1
The analytical solution is given by:
C(x,y) = 1 * erfc(y / sqrt(4Dx/u))
The hydrodynamic field is not simulated, instead a constant velocity is assumed.
"""
pytest.importorskip("cupy")
# Parameters
domain_size = (1600, 160)
omega = 1.38
diffusion = (1 / omega - 0.5) / 3
velocity = 0.05
time_steps = 50000
stencil = LBStencil(Stencil.D2Q9)
target = ps.Target.GPU
# Data Handling
dh = ps.create_data_handling(domain_size=domain_size, default_target=target)
vel_field = dh.add_array('vel_field', values_per_cell=stencil.D)
dh.fill('vel_field', velocity, 0, ghost_layers=True)
dh.fill('vel_field', 0.0, 1, ghost_layers=True)
con_field = dh.add_array('con_field', values_per_cell=1)
dh.fill('con_field', 0.0, ghost_layers=True)
pdfs = dh.add_array('pdfs', values_per_cell=stencil.Q)
dh.fill('pdfs', 0.0, ghost_layers=True)
pdfs_tmp = dh.add_array('pdfs_tmp', values_per_cell=stencil.Q)
dh.fill('pdfs_tmp', 0.0, ghost_layers=True)
# Lattice Boltzmann method
lbm_config = LBMConfig(stencil=stencil, method=Method.MRT, relaxation_rates=[1, 1.5, 1, 1.5, 1],
zero_centered=False,
velocity_input=vel_field, output={'density': con_field}, compressible=True,
weighted=True, kernel_type='stream_pull_collide')
lbm_opt = LBMOptimisation(symbolic_field=pdfs, symbolic_temporary_field=pdfs_tmp)
config = ps.CreateKernelConfig(target=dh.default_target, cpu_openmp=True)
method = create_lb_method(lbm_config=lbm_config)
method.set_conserved_moments_relaxation_rate(omega)
lbm_config = replace(lbm_config, lb_method=method)
update_rule = create_lb_update_rule(lbm_config=lbm_config, lbm_optimisation=lbm_opt)
kernel = ps.create_kernel(update_rule, config=config).compile()
# PDF initalization
init = pdf_initialization_assignments(method, con_field.center,
vel_field.center_vector, pdfs.center_vector)
dh.run_kernel(ps.create_kernel(init).compile())
dh.all_to_gpu()
# Boundary Handling
bh = LatticeBoltzmannBoundaryHandling(update_rule.method, dh, 'pdfs', name="bh", target=dh.default_target)
add_box_boundary(bh, boundary=NeumannByCopy())
bh.set_boundary(DiffusionDirichlet(0), slice_from_direction('W', dh.dim))
bh.set_boundary(DiffusionDirichlet(1), slice_from_direction('S', dh.dim))
# Timeloop
for i in range(time_steps):
bh()
dh.run_kernel(kernel)
dh.swap("pdfs", "pdfs_tmp")
dh.all_to_cpu()
# Verification
x = np.arange(1, domain_size[0], 1)
y = np.arange(0, domain_size[1], 1)
X, Y = np.meshgrid(x, y)
analytical = np.zeros(domain_size)
analytical[1:, :] = np.vectorize(math.erfc)(Y / np.vectorize(math.sqrt)(4 * diffusion * X / velocity)).transpose()
simulated = dh.gather_array('con_field', ghost_layers=False)
residual = 0
for i in x:
for j in y:
residual += (simulated[i, j] - analytical[i, j]) ** 2
residual = math.sqrt(residual / (domain_size[0] * domain_size[1]))
assert residual < 1e-2
import platform
import numpy as np
import sympy as sp
import pytest
from lbmpy.enums import ForceModel, Method
from lbmpy.scenarios import create_lid_driven_cavity
@pytest.mark.parametrize('method', [Method.TRT_KBC_N1, Method.TRT_KBC_N2, Method.TRT_KBC_N3, Method.TRT_KBC_N4])
def test_entropic_methods(method):
if platform.system().lower() == 'windows' and method == Method.TRT_KBC_N4:
pytest.skip("For some reason this test does not run on windows", allow_module_level=True)
sc_kbc = create_lid_driven_cavity((20, 20), method=method,
relaxation_rates=[1.9999, sp.Symbol("omega_free")],
entropic_newton_iterations=3, entropic=True, compressible=True,
zero_centered=False, force=(-1e-10, 0), force_model=ForceModel.LUO)
sc_kbc.run(1000)
assert np.isfinite(np.max(sc_kbc.velocity[:, :]))
from pystencils.stencil import inverse_direction
from lbmpy.stencils import LBStencil
from lbmpy.advanced_streaming.utility import AccessPdfValues, get_timesteps
import pytest
import numpy as np
from pystencils import Target
from pystencils.datahandling import create_data_handling
from lbmpy.boundaries import LatticeBoltzmannBoundaryHandling, SimpleExtrapolationOutflow, ExtrapolationOutflow
from lbmpy.creationfunctions import create_lb_method, LBMConfig
from lbmpy.enums import Method, Stencil
from lbmpy.advanced_streaming.utility import streaming_patterns
from pystencils.slicing import get_ghost_region_slice
from itertools import product
@pytest.mark.parametrize('stencil_enum', [Stencil.D2Q9, Stencil.D3Q27])
@pytest.mark.parametrize('streaming_pattern', streaming_patterns)
def test_pdf_simple_extrapolation(stencil_enum, streaming_pattern):
stencil = LBStencil(stencil_enum)
# Field contains exactly one fluid cell
domain_size = (3,) * stencil.D
for timestep in get_timesteps(streaming_pattern):
dh = create_data_handling(domain_size, default_target=Target.CPU)
lb_method = create_lb_method(lbm_config=LBMConfig(stencil=stencil))
pdf_field = dh.add_array('f', values_per_cell=stencil.Q)
dh.fill(pdf_field.name, np.nan, ghost_layers=True)
bh = LatticeBoltzmannBoundaryHandling(lb_method, dh, pdf_field.name, streaming_pattern, target=Target.CPU)
# Set up outflows in all directions
for normal_dir in stencil[1:]:
boundary_obj = SimpleExtrapolationOutflow(normal_dir, stencil)
boundary_slice = get_ghost_region_slice(normal_dir)
bh.set_boundary(boundary_obj, boundary_slice)
pdf_arr = dh.cpu_arrays[pdf_field.name]
# Set up the domain with artificial PDF values
# center = (1,) * dim
out_access = AccessPdfValues(stencil, streaming_pattern, timestep, 'out')
for cell in product(*(range(1, 4) for _ in range(stencil.D))):
for q in range(stencil.Q):
out_access.write_pdf(pdf_arr, cell, q, q)
# Do boundary handling
bh(prev_timestep=timestep)
# Check PDF values
in_access = AccessPdfValues(stencil, streaming_pattern, timestep.next(), 'in')
# Inbound in center cell
for cell in product(*(range(1, 4) for _ in range(stencil.D))):
for q in range(stencil.Q):
f = in_access.read_pdf(pdf_arr, cell, q)
assert f == q
def test_extrapolation_outflow_initialization_by_copy():
stencil = LBStencil(Stencil.D2Q9)
domain_size = (1, 5)
streaming_pattern = 'esotwist'
zeroth_timestep = 'even'
pdf_acc = AccessPdfValues(stencil, streaming_pattern=streaming_pattern,
timestep=zeroth_timestep, streaming_dir='out')
dh = create_data_handling(domain_size, default_target=Target.CPU)
lb_method = create_lb_method(stencil=stencil)
pdf_field = dh.add_array('f', values_per_cell=stencil.Q)
dh.fill(pdf_field.name, np.nan, ghost_layers=True)
pdf_arr = dh.cpu_arrays[pdf_field.name]
bh = LatticeBoltzmannBoundaryHandling(lb_method, dh, pdf_field.name,
streaming_pattern=streaming_pattern, target=Target.CPU)
for y in range(1, 6):
for j in range(stencil.Q):
pdf_acc.write_pdf(pdf_arr, (1, y), j, j)
normal_dir = (1, 0)
outflow = ExtrapolationOutflow(normal_dir, lb_method, streaming_pattern=streaming_pattern,
zeroth_timestep=zeroth_timestep)
boundary_slice = get_ghost_region_slice(normal_dir)
bh.set_boundary(outflow, boundary_slice)
bh.prepare()
blocks = list(dh.iterate())
index_list = blocks[0][bh._index_array_name].boundary_object_to_index_list[outflow]
assert len(index_list) == 13
for entry in index_list:
direction = stencil[entry["dir"]]
inv_dir = stencil.index(inverse_direction(direction))
assert entry[f'pdf'] == inv_dir
assert entry[f'pdf_nd'] == inv_dir
def test_extrapolation_outflow_initialization_by_callback():
stencil = LBStencil(Stencil.D2Q9)
domain_size = (1, 5)
streaming_pattern = 'esotwist'
zeroth_timestep = 'even'
dh = create_data_handling(domain_size, default_target=Target.CPU)
lb_method = create_lb_method(stencil=stencil)
pdf_field = dh.add_array('f', values_per_cell=stencil.Q)
dh.fill(pdf_field.name, np.nan, ghost_layers=True)
bh = LatticeBoltzmannBoundaryHandling(lb_method, dh, pdf_field.name,
streaming_pattern=streaming_pattern, target=Target.CPU)
normal_dir = (1, 0)
outflow = ExtrapolationOutflow(normal_direction=normal_dir, lb_method=lb_method,
streaming_pattern=streaming_pattern,
zeroth_timestep=zeroth_timestep,
initial_density=lambda x, y: 1,
initial_velocity=lambda x, y: (0, 0))
boundary_slice = get_ghost_region_slice(normal_dir)
bh.set_boundary(outflow, boundary_slice)
bh.prepare()
weights = [w.evalf() for w in lb_method.weights]
blocks = list(dh.iterate())
index_list = blocks[0][bh._index_array_name].boundary_object_to_index_list[outflow]
assert len(index_list) == 13
for entry in index_list:
direction = stencil[entry["dir"]]
inv_dir = stencil.index(inverse_direction(direction))
assert entry[f'pdf_nd'] == weights[inv_dir]
......@@ -87,7 +87,7 @@
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
......@@ -101,7 +101,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.4"
"version": "3.9.7"
}
},
"nbformat": 4,
......
import pytest
import pystencils as ps
from lbmpy.creationfunctions import create_lb_function, LBMConfig
from lbmpy.enums import Method, Stencil
from lbmpy.scenarios import create_lid_driven_cavity
from lbmpy.stencils import LBStencil
@pytest.mark.parametrize('double_precision', [False, True])
@pytest.mark.parametrize('method_enum', [Method.SRT, Method.CENTRAL_MOMENT, Method.CUMULANT])
def test_creation(double_precision, method_enum):
"""Simple test that makes sure that only float variables are created"""
lbm_config = LBMConfig(method=method_enum, relaxation_rate=1.5, compressible=True)
config = ps.CreateKernelConfig(data_type="float64" if double_precision else "float32",
default_number_float="float64" if double_precision else "float32")
func = create_lb_function(lbm_config=lbm_config, config=config)
code = ps.get_code_str(func)
if double_precision:
assert 'float' not in code
assert 'double' in code
else:
assert 'double' not in code
assert 'float' in code
@pytest.mark.parametrize('double_precision', [False, True])
@pytest.mark.parametrize('method_enum', [Method.SRT, Method.CENTRAL_MOMENT, Method.CUMULANT])
def test_scenario(method_enum, double_precision):
lbm_config = LBMConfig(stencil=LBStencil(Stencil.D3Q27), method=method_enum,
relaxation_rate=1.45, compressible=True)
config = ps.CreateKernelConfig(data_type="float64" if double_precision else "float32",
default_number_float="float64" if double_precision else "float32")
sc = create_lid_driven_cavity((16, 16, 8), lbm_config=lbm_config, config=config)
sc.run(1)
code = ps.get_code_str(sc.ast)
if double_precision:
assert 'float' not in code
assert 'double' in code
else:
assert 'double' not in code
assert 'float' in code
"""Tests velocity and stress fluctuations for thermalized LB"""
import warnings
import pytest
import pystencils as ps
from pystencils import get_code_str
from pystencils.backends.simd_instruction_sets import (
get_supported_instruction_sets,
get_vector_instruction_set,
)
from pystencils.cpu.cpujit import get_compiler_config
from pystencils.enums import Target
from pystencils.rng import PhiloxTwoDoubles
from lbmpy.creationfunctions import *
from lbmpy.forcemodels import Guo
from lbmpy.macroscopic_value_kernels import macroscopic_values_setter
import numpy as np
from lbmpy.enums import Stencil
from lbmpy.moments import is_bulk_moment, is_shear_moment, get_order
from lbmpy.stencils import LBStencil
def _skip_instruction_sets_windows(instruction_sets):
if get_compiler_config()["os"] == "windows":
# skip instruction sets supported by the CPU but not by the compiler
if "avx" in instruction_sets and (
"/arch:avx2" not in get_compiler_config()["flags"].lower()
and "/arch:avx512" not in get_compiler_config()["flags"].lower()
):
instruction_sets.remove("avx")
if (
"avx512" in instruction_sets
and "/arch:avx512" not in get_compiler_config()["flags"].lower()
):
instruction_sets.remove("avx512")
return instruction_sets
def single_component_maxwell(x1, x2, kT, mass):
"""Integrate the probability density from x1 to x2 using the trapezoidal rule"""
x = np.linspace(x1, x2, 1000)
try:
trapezoid = np.trapezoid # since numpy 2.0
except AttributeError:
trapezoid = np.trapz
return trapezoid(np.exp(-mass * x**2 / (2.0 * kT)), x) / np.sqrt(
2.0 * np.pi * kT / mass
)
def rr_getter(moment_group):
"""Maps a group of moments to a relaxation rate (shear, bulk, even, odd)
in the 4 relaxation time thermalized LB model
"""
is_shear = [is_shear_moment(m, 3) for m in moment_group]
is_bulk = [is_bulk_moment(m, 3) for m in moment_group]
order = [get_order(m) for m in moment_group]
assert min(order) == max(order)
order = order[0]
if order < 2:
return [0] * len(moment_group)
elif any(is_bulk):
assert all(is_bulk)
return [sp.Symbol("omega_bulk")] * len(moment_group)
elif any(is_shear):
assert all(is_shear)
return [sp.Symbol("omega_shear")] * len(moment_group)
elif order % 2 == 0:
assert order > 2
return [sp.Symbol("omega_even")] * len(moment_group)
else:
return [sp.Symbol("omega_odd")] * len(moment_group)
def second_order_moment_tensor_assignments(function_values, stencil, output_field):
"""Assignments for calculating the pressure tensor"""
assert len(function_values) == len(stencil)
dim = len(stencil[0])
return [
ps.Assignment(
output_field(i, j),
sum(c[i] * c[j] * f for f, c in zip(function_values, stencil)),
)
for i in range(dim)
for j in range(dim)
]
def add_pressure_output_to_collision_rule(collision_rule, pressure_field):
pressure_ouput = second_order_moment_tensor_assignments(
collision_rule.method.pre_collision_pdf_symbols,
collision_rule.method.stencil,
pressure_field,
)
collision_rule.main_assignments = collision_rule.main_assignments + pressure_ouput
def get_fluctuating_lb(
size=None,
kT=None,
omega_shear=None,
omega_bulk=None,
omega_odd=None,
omega_even=None,
rho_0=None,
target=None,
zero_centered: bool = False,
):
# Parameters
stencil = LBStencil(Stencil.D3Q19)
# Setup data handling
dh = ps.create_data_handling(
(size,) * stencil.D, periodicity=True, default_target=target
)
src = dh.add_array("src", values_per_cell=stencil.Q, layout="f")
dst = dh.add_array_like("dst", "src")
rho = dh.add_array("rho", layout="f", latex_name="\\rho", values_per_cell=1)
u = dh.add_array("u", values_per_cell=dh.dim, layout="f")
pressure_field = dh.add_array(
"pressure", values_per_cell=(3, 3), layout="f", gpu=target == Target.GPU
)
force_field = dh.add_array(
"force", values_per_cell=stencil.D, layout="f", gpu=target == Target.GPU
)
# Method setup
lbm_config = LBMConfig(
stencil=stencil,
method=Method.MRT,
compressible=True,
weighted=True,
zero_centered=zero_centered,
relaxation_rates=rr_getter,
force_model=Guo(force=force_field.center_vector),
fluctuating={"temperature": kT},
kernel_type="collide_only",
)
lb_method = create_lb_method(lbm_config=lbm_config)
lbm_config.lb_method = lb_method
lbm_opt = LBMOptimisation(symbolic_field=src, cse_global=True)
collision_rule = create_lb_collision_rule(
lbm_config=lbm_config, lbm_optimisation=lbm_opt
)
# add_pressure_output_to_collision_rule(collision_rule, pressure_field)
collision = create_lb_update_rule(
collision_rule=collision_rule, lbm_config=lbm_config, lbm_optimisation=lbm_opt
)
stream = create_stream_pull_with_output_kernel(
collision.method,
src,
dst,
{"density": rho, "velocity": u, "moment2": pressure_field},
)
config = ps.CreateKernelConfig(cpu_openmp=False, target=dh.default_target)
# Compile kernels
stream_kernel = ps.create_kernel(stream, config=config).compile()
collision_kernel = ps.create_kernel(collision, config=config).compile()
sync_pdfs = dh.synchronization_function([src.name])
# Initialization
init = macroscopic_values_setter(
collision.method,
velocity=(0,) * dh.dim,
pdfs=src.center_vector,
density=rho_0
)
init_kernel = ps.create_kernel(init, ghost_layers=0).compile()
dh.fill(rho.name, rho_0)
dh.fill(u.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
dh.fill(u.name, 0)
dh.fill(force_field.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
dh.fill(force_field.name, 0)
dh.run_kernel(init_kernel)
# time loop
def time_loop(start, steps):
dh.all_to_gpu()
for i in range(start, start + steps):
dh.run_kernel(
collision_kernel,
omega_shear=omega_shear,
omega_bulk=omega_bulk,
omega_odd=omega_odd,
omega_even=omega_even,
seed=42,
time_step=i,
)
sync_pdfs()
dh.run_kernel(stream_kernel)
dh.swap(src.name, dst.name)
return start + steps
return dh, time_loop
@pytest.mark.parametrize(
"zero_centered", [False, True], ids=["regular-storage", "zero-centered"]
)
@pytest.mark.parametrize(
"domain_size", [8, 60]
)
def test_resting_fluid(zero_centered: bool, domain_size: int, target=Target.CPU):
rho_0 = 0.86
kT = 4e-4
L = [domain_size] * 3
dh, time_loop = get_fluctuating_lb(
size=L[0],
target=target,
rho_0=rho_0,
kT=kT,
omega_shear=0.8,
omega_bulk=0.5,
omega_even=0.04,
omega_odd=0.3,
zero_centered=zero_centered,
)
# Test
t = 0
# warm up
t = time_loop(t, 10)
# Measurement
for i in range(10):
t = time_loop(t, 5)
res_u = dh.gather_array("u").reshape((-1, 3))
res_rho = dh.gather_array("rho").reshape((-1,))
# mass conservationo
# density per cell fluctuates, but toal mass is conserved
np.testing.assert_allclose(np.mean(res_rho), rho_0, atol=3E-12)
# momentum conservation
momentum = np.dot(res_rho, res_u)
np.testing.assert_allclose(momentum, [0, 0, 0], atol=1e-10)
# temperature (fluctuates around pre-set kT)
kinetic_energy = 1 / 2 * np.dot(res_rho, res_u * res_u) / np.prod(L)
kT_tol = 0.075 *(16/domain_size)**(3/2)
np.testing.assert_allclose(kinetic_energy, [kT / 2] * 3, rtol=kT_tol)
# velocity distribution
v_hist, v_bins = np.histogram(
res_u, bins=11, range=(-0.075, 0.075), density=True
)
# Calculate expected values from single
v_expected = []
for j in range(len(v_hist)):
# Maxwell distribution
res = (
1.0
/ (v_bins[j + 1] - v_bins[j])
* single_component_maxwell(v_bins[j], v_bins[j + 1], kT, rho_0)
)
v_expected.append(res)
v_expected = np.array(v_expected)
hist_tol_all = 0.75 *(16/domain_size)**(3/2)
np.testing.assert_allclose(v_hist, v_expected, rtol=hist_tol_all)
hist_tol_center = hist_tol_all/10
remove = 3
np.testing.assert_allclose(
v_hist[remove:-remove], v_expected[remove:-remove], rtol=hist_tol_center
)
# pressure tensor against expressions from
# Duenweg, Schiller, Ladd, https://arxiv.org/abs/0707.1581
res_pressure = dh.gather_array("pressure").reshape((-1, 3, 3))
c_s = np.sqrt(1 / 3) # speed of sound
# average of pressure tensor
# Diagonal elements are rho c_s^22 +<u,u>. When the fluid is
# thermalized, the expectation value of <u,u> = kT due to the
# equi-partition theorem.
p_av_expected = np.diag([rho_0 * c_s**2 + kT] * 3)
pressure_atol = c_s**2 / 200 *(16/domain_size)**(3/2)
np.testing.assert_allclose(
np.mean(res_pressure, axis=0), p_av_expected, atol=pressure_atol)
@pytest.mark.parametrize(
"zero_centered", [False, True], ids=["regular-storage", "zero-centered"]
)
@pytest.mark.parametrize(
"domain_size", [8, 60]
)
def test_point_force(zero_centered: bool, domain_size: int, target=Target.CPU):
"""Test momentum balance for thermalized fluid with applied poitn forces"""
rho_0 = 0.86
kT = 4e-4
L = [domain_size] * 3
dh, time_loop = get_fluctuating_lb(
size=L[0],
target=target,
rho_0=rho_0,
kT=kT,
omega_shear=0.8,
omega_bulk=0.5,
omega_even=0.8,
omega_odd=0.8,
zero_centered=zero_centered
)
# Test
t = 0
# warm up
t = time_loop(t, 100)
introduced_momentum = np.zeros(3)
for i in range(100):
point_force = 1e-2/domain_size**(3/2) * (np.random.random(3) - 0.5)
introduced_momentum += point_force
# Note that ghost layers are included in the indexing
force_pos = np.random.randint(1, L[0] - 2, size=3)
dh.cpu_arrays["force"][force_pos[0], force_pos[1], force_pos[2]] = point_force
t = time_loop(t, 1)
res_u = dh.gather_array("u").reshape((-1, 3))
res_rho = dh.gather_array("rho").reshape((-1,))
# mass conservation
np.testing.assert_allclose(np.mean(res_rho), rho_0, atol=3E-12)
# momentum conservation
momentum = np.dot(res_rho, res_u)
np.testing.assert_allclose(
momentum, introduced_momentum + 0.5 * point_force, atol=1e-10
)
dh.cpu_arrays["force"][force_pos[0], force_pos[1], force_pos[2]] = np.zeros(3)
@pytest.mark.skipif(
not get_supported_instruction_sets(), reason="No vector instruction sets supported"
)
@pytest.mark.parametrize("data_type", ("float32", "float64"))
@pytest.mark.parametrize("assume_aligned", (True, False))
@pytest.mark.parametrize("assume_inner_stride_one", (True, False))
@pytest.mark.parametrize("assume_sufficient_line_padding", (True, False))
def test_vectorization(
data_type, assume_aligned, assume_inner_stride_one, assume_sufficient_line_padding
):
stencil = LBStencil(Stencil.D3Q19)
pdfs, pdfs_tmp = ps.fields(
f"pdfs({stencil.Q}), pdfs_tmp({stencil.Q}): {data_type}[3D]", layout="fzyx"
)
method = create_mrt_orthogonal(
stencil=stencil, compressible=True, weighted=True, relaxation_rates=rr_getter
)
rng_node = (
ps.rng.PhiloxTwoDoubles if data_type == "float64" else ps.rng.PhiloxFourFloats
)
lbm_config = LBMConfig(
lb_method=method,
fluctuating={
"temperature": sp.Symbol("kT"),
"rng_node": rng_node,
"block_offsets": tuple([0] * stencil.D),
},
compressible=True,
zero_centered=False,
stencil=method.stencil,
kernel_type="collide_only",
)
lbm_opt = LBMOptimisation(
cse_global=True, symbolic_field=pdfs, symbolic_temporary_field=pdfs_tmp
)
collision = create_lb_update_rule(lbm_config=lbm_config, lbm_optimisation=lbm_opt)
instruction_sets = _skip_instruction_sets_windows(get_supported_instruction_sets())
instruction_set = instruction_sets[-1]
config = ps.CreateKernelConfig(
target=Target.CPU,
data_type=data_type,
default_number_float=data_type,
cpu_vectorize_info={
"instruction_set": instruction_set,
"assume_aligned": assume_aligned,
"assume_inner_stride_one": assume_inner_stride_one,
"assume_sufficient_line_padding": assume_sufficient_line_padding,
},
)
if not assume_inner_stride_one and "storeS" not in get_vector_instruction_set(
data_type, instruction_set
):
with pytest.warns(UserWarning) as pytest_warnings:
ast = ps.create_kernel(collision, config=config)
assert "Could not vectorize loop" in pytest_warnings[0].message.args[0]
else:
ast = ps.create_kernel(collision, config=config)
ast.compile()
code = get_code_str(ast)
print(code)
@pytest.mark.parametrize("data_type", ("float32", "float64"))
@pytest.mark.parametrize("assume_aligned", (True, False))
@pytest.mark.parametrize("assume_inner_stride_one", (True, False))
@pytest.mark.parametrize("assume_sufficient_line_padding", (True, False))
def test_fluctuating_lb_issue_188_wlb(
data_type, assume_aligned, assume_inner_stride_one, assume_sufficient_line_padding
):
stencil = LBStencil(Stencil.D3Q19)
temperature = sp.symbols("temperature")
pdfs, pdfs_tmp = ps.fields(
f"pdfs({stencil.Q}), pdfs_tmp({stencil.Q}): {data_type}[3D]", layout="fzyx"
)
rng_node = (
ps.rng.PhiloxTwoDoubles if data_type == "float64" else ps.rng.PhiloxFourFloats
)
fluctuating = {
"temperature": temperature,
"block_offsets": "walberla",
"rng_node": rng_node,
}
lbm_config = LBMConfig(
stencil=stencil,
method=Method.MRT,
compressible=True,
weighted=True,
zero_centered=False,
relaxation_rate=1.4,
fluctuating=fluctuating,
)
lbm_opt = LBMOptimisation(
symbolic_field=pdfs, symbolic_temporary_field=pdfs_tmp, cse_global=True
)
up = create_lb_update_rule(lbm_config=lbm_config, lbm_optimisation=lbm_opt)
cpu_vectorize_info = {
"instruction_set": "avx",
"assume_inner_stride_one": True,
"assume_aligned": True,
}
config = ps.CreateKernelConfig(
target=ps.Target.CPU,
data_type=data_type,
default_number_float=data_type,
cpu_vectorize_info=cpu_vectorize_info,
)
ast = create_kernel(up, config=config)
code = ps.get_code_str(ast)
# print(code)
if data_type == "float32":
assert "0.5f" in code
assert "_mm256_mul_ps" in code
assert "_mm256_sqrt_ps" in code
else:
assert "0.5f" not in code
assert "_mm256_mul_pd" in code
assert "_mm256_sqrt_pd" in code
import pytest
import numpy as np
from numpy.testing import assert_allclose
import sympy as sp
import pystencils as ps
from pystencils import Target
from lbmpy.creationfunctions import create_lb_method, create_lb_update_rule, LBMConfig, LBMOptimisation
from lbmpy.enums import Stencil, Method, ForceModel
from lbmpy.macroscopic_value_kernels import macroscopic_values_setter, macroscopic_values_getter
from lbmpy.moments import (is_bulk_moment, moments_up_to_component_order,
exponents_to_polynomial_representations, exponent_tuple_sort_key)
from lbmpy.stencils import LBStencil
# all force models available are defined in the ForceModel enum, but Cumulant is not a "real" force model
force_models = [f for f in ForceModel if f is not ForceModel.CENTRALMOMENT]
@pytest.mark.parametrize("method_enum", [Method.SRT, Method.TRT, Method.MRT, Method.CUMULANT])
@pytest.mark.parametrize("zero_centered", [False, True])
@pytest.mark.parametrize("force_model", force_models)
@pytest.mark.parametrize("omega", [0.5, 1.5])
def test_total_momentum(method_enum, zero_centered, force_model, omega):
if method_enum == Method.CUMULANT and \
force_model not in (ForceModel.SIMPLE, ForceModel.LUO, ForceModel.GUO, ForceModel.HE):
return
L = (16, 16)
stencil = LBStencil(Stencil.D2Q9)
F = (2e-4, -3e-4)
dh = ps.create_data_handling(L, periodicity=True, default_target=Target.CPU)
src = dh.add_array('src', values_per_cell=stencil.Q)
dst = dh.add_array_like('dst', 'src')
ρ = dh.add_array('rho', values_per_cell=1)
u = dh.add_array('u', values_per_cell=stencil.D)
lbm_config = LBMConfig(method=method_enum, stencil=stencil, relaxation_rate=omega,
compressible=True, zero_centered=zero_centered,
force_model=force_model, force=F, streaming_pattern='pull')
lbm_opt = LBMOptimisation(symbolic_field=src)
collision = create_lb_update_rule(lbm_config=lbm_config, lbm_optimisation=lbm_opt)
config = ps.CreateKernelConfig(cpu_openmp=False, target=dh.default_target)
collision_kernel = ps.create_kernel(collision, config=config).compile()
fluid_density = 1.1
def init():
dh.fill(ρ.name, fluid_density)
dh.fill(u.name, 0)
setter = macroscopic_values_setter(collision.method, velocity=(0,) * dh.dim,
pdfs=src, density=ρ.center,
set_pre_collision_pdfs=True)
kernel = ps.create_kernel(setter).compile()
dh.run_kernel(kernel)
sync_pdfs = dh.synchronization_function([src.name])
getter = macroscopic_values_getter(collision.method, ρ.center, u.center_vector, src, use_pre_collision_pdfs=True)
getter_kernel = ps.create_kernel(getter).compile()
def time_loop(steps):
dh.all_to_gpu()
for _ in range(steps):
dh.run_kernel(collision_kernel)
dh.swap(src.name, dst.name)
sync_pdfs()
dh.all_to_cpu()
t = 20
init()
time_loop(t)
dh.run_kernel(getter_kernel)
# Check that density did not change
assert_allclose(dh.gather_array(ρ.name), fluid_density)
# Check for correct velocity
total = np.sum(dh.gather_array(u.name), axis=(0, 1))
assert_allclose(total / np.prod(L) / F * fluid_density / t, 1)
@pytest.mark.parametrize("force_model", force_models)
@pytest.mark.parametrize("compressible", [True, False])
def test_modes(force_model, compressible):
"""check force terms in mode space"""
_check_modes(LBStencil(Stencil.D2Q9), force_model, compressible)
@pytest.mark.parametrize("stencil", [Stencil.D3Q15, Stencil.D3Q19, Stencil.D3Q27])
@pytest.mark.parametrize("force_model", force_models)
@pytest.mark.parametrize("compressible", [True, False])
@pytest.mark.longrun
def test_modes_longrun(stencil, force_model, compressible):
"""check force terms in mode space"""
_check_modes(LBStencil(stencil), force_model, compressible)
@pytest.mark.parametrize("force_model", force_models)
def test_momentum_density_shift(force_model):
target = Target.CPU
stencil = LBStencil(Stencil.D2Q9)
domain_size = (4, 4)
dh = ps.create_data_handling(domain_size=domain_size, default_target=target)
rho = dh.add_array('rho', values_per_cell=1)
dh.fill('rho', 0.0, ghost_layers=True)
momentum_density = dh.add_array('momentum_density', values_per_cell=dh.dim)
dh.fill('momentum_density', 0.0, ghost_layers=True)
src = dh.add_array('src', values_per_cell=len(stencil))
dh.fill('src', 0.0, ghost_layers=True)
lbm_config = LBMConfig(method=Method.SRT, compressible=True, force_model=force_model,
force=(1, 2))
method = create_lb_method(lbm_config=lbm_config)
cqc = method.conserved_quantity_computation
momentum_density_getter = cqc.output_equations_from_pdfs(src.center_vector,
{'density': rho.center,
'momentum_density': momentum_density.center_vector})
config = ps.CreateKernelConfig(target=dh.default_target)
momentum_density_ast = ps.create_kernel(momentum_density_getter, config=config)
momentum_density_kernel = momentum_density_ast.compile()
dh.run_kernel(momentum_density_kernel)
assert np.sum(dh.gather_array(momentum_density.name)[:, :, 0]) == np.prod(domain_size) / 2
assert np.sum(dh.gather_array(momentum_density.name)[:, :, 1]) == np.prod(domain_size)
@pytest.mark.parametrize('force_model', force_models)
def test_forcing_space_equivalences(force_model):
if force_model == ForceModel.HE:
# We don't expect equivalence for the He model since its
# moments are derived from the continuous maxwellian
return
stencil = LBStencil(Stencil.D3Q27)
force = sp.symbols(f"F_:{stencil.D}")
lbm_config = LBMConfig(stencil=stencil, method=Method.SRT, force=force, force_model=force_model)
fmodel = lbm_config.force_model
lb_method = create_lb_method(lbm_config=lbm_config)
inv_moment_matrix = lb_method.moment_matrix.inv()
force_pdfs = sp.Matrix(fmodel(lb_method))
force_moments = fmodel.moment_space_forcing(lb_method)
diff = (force_pdfs - (inv_moment_matrix * force_moments)).expand()
for i, d in enumerate(diff):
assert d == 0, f"Mismatch between population and moment space forcing " \
f"in force model {force_model}, population f_{i}"
@pytest.mark.parametrize("force_model", [ForceModel.GUO, ForceModel.BUICK, ForceModel.SHANCHEN])
@pytest.mark.parametrize("stencil", [Stencil.D2Q9, Stencil.D3Q19, Stencil.D3Q27])
@pytest.mark.parametrize("method", [Method.SRT, Method.TRT, Method.MRT])
def test_literature(force_model, stencil, method):
# Be aware that the choice of the conserved moments does not affect the forcing although omega is introduced
# in the forcing vector then. The reason is that:
# m_{100}^{\ast} = m_{100} + \omega ( m_{100}^{eq} - m_{100} ) + \left( 1 - \frac{\omega}{2} \right) F_x
# always simplifies to:
# m_{100}^{\ast} = m_{100} + F_x
# Thus the relaxation rate gets cancled again.
stencil = LBStencil(stencil)
omega_s = sp.Symbol("omega_s")
omega_b = sp.Symbol("omega_b")
omega_o = sp.Symbol("omega_o")
omega_e = sp.Symbol("omega_e")
if method == Method.SRT:
rrs = [omega_s]
omega_o = omega_b = omega_e = omega_s
elif method == Method.TRT:
rrs = [omega_e, omega_o]
omega_s = omega_b = omega_e
else:
rrs = [omega_s, omega_b, omega_o, omega_e, omega_o, omega_e]
F = sp.symbols(f"F_:{stencil.D}")
lbm_config = LBMConfig(method=method, weighted=True, stencil=stencil, relaxation_rates=rrs,
compressible=force_model != ForceModel.BUICK,
force_model=force_model, force=F)
lb_method = create_lb_method(lbm_config=lbm_config)
omega_momentum = list(set(lb_method.relaxation_rates[1:stencil.D + 1]))
assert len(omega_momentum) == 1
omega_momentum = omega_momentum[0]
subs_dict = lb_method.subs_dict_relaxation_rate
force_term = sp.simplify(lb_method.force_model(lb_method).subs(subs_dict))
u = sp.Matrix(lb_method.first_order_equilibrium_moment_symbols)
rho = lb_method.conserved_quantity_computation.density_symbol
# see silva2020 for nomenclature
F = sp.Matrix(F)
uf = sp.Matrix(u).dot(F)
F2 = sp.Matrix(F).dot(sp.Matrix(F))
Fq = sp.zeros(stencil.Q, 1)
uq = sp.zeros(stencil.Q, 1)
for i, cq in enumerate(stencil):
Fq[i] = sp.Matrix(cq).dot(sp.Matrix(F))
uq[i] = sp.Matrix(cq).dot(u)
common_plus = 3 * (1 - omega_e / 2)
common_minus = 3 * (1 - omega_momentum / 2)
result = []
if method == Method.MRT and force_model == ForceModel.GUO:
# check against eq. 4.68 from schiller2008thermal
uf = u.dot(F) * sp.eye(len(F))
G = (u * F.transpose() + F * u.transpose() - uf * sp.Rational(2, lb_method.dim)) * sp.Rational(1, 2) * \
(2 - omega_s) + uf * sp.Rational(1, lb_method.dim) * (2 - omega_b)
for direction, w_i in zip(lb_method.stencil, lb_method.weights):
direction = sp.Matrix(direction)
tr = sp.trace(G * (direction * direction.transpose() - sp.Rational(1, 3) * sp.eye(len(F))))
result.append(3 * w_i * (F.dot(direction) + sp.Rational(3, 2) * tr))
elif force_model == ForceModel.GUO:
# check against table 2 in silva2020 (correct for SRT and TRT), matches eq. 20 from guo2002discrete (for SRT)
Sq_plus = sp.zeros(stencil.Q, 1)
Sq_minus = sp.zeros(stencil.Q, 1)
for i, w_i in enumerate(lb_method.weights):
Sq_plus[i] = common_plus * w_i * (3 * uq[i] * Fq[i] - uf)
Sq_minus[i] = common_minus * w_i * Fq[i]
result = Sq_plus + Sq_minus
elif force_model == ForceModel.BUICK:
# check against table 2 in silva2020 (correct for all collision models due to the simplicity of Buick),
# matches eq. 18 from silva2010 (for SRT)
Sq_plus = sp.zeros(stencil.Q, 1)
Sq_minus = sp.zeros(stencil.Q, 1)
for i, w_i in enumerate(lb_method.weights):
Sq_plus[i] = 0
Sq_minus[i] = common_minus * w_i * Fq[i]
result = Sq_plus + Sq_minus
elif force_model == ForceModel.EDM:
# check against table 2 in silva2020
if method == Method.MRT:
# for mrt no literature terms are known at the time of writing this test case.
# However it is most likly correct since SRT and TRT are derived from the moment space representation
pytest.skip()
Sq_plus = sp.zeros(stencil.Q, 1)
Sq_minus = sp.zeros(stencil.Q, 1)
for i, w_i in enumerate(lb_method.weights):
Sq_plus[i] = common_plus * w_i * (3 * uq[i] * Fq[i] - uf) + ((w_i / (8 * rho)) * (3 * Fq[i] ** 2 - F2))
Sq_minus[i] = common_minus * w_i * Fq[i]
result = Sq_plus + Sq_minus
elif force_model == ForceModel.SHANCHEN:
# check against table 2 in silva2020
if method == Method.MRT:
# for mrt no literature terms are known at the time of writing this test case.
# However it is most likly correct since SRT and TRT are derived from the moment space representation
pytest.skip()
Sq_plus = sp.zeros(stencil.Q, 1)
Sq_minus = sp.zeros(stencil.Q, 1)
for i, w_i in enumerate(lb_method.weights):
Sq_plus[i] = common_plus * w_i * (3 * uq[i] * Fq[i] - uf) + common_plus ** 2 * (
(w_i / (2 * rho)) * (3 * Fq[i] ** 2 - F2))
Sq_minus[i] = common_minus * w_i * Fq[i]
result = Sq_plus + Sq_minus
assert list(sp.simplify(force_term - sp.Matrix(result))) == [0] * len(stencil)
@pytest.mark.parametrize("force_model", force_models)
@pytest.mark.parametrize("compressible", [True, False])
def test_modes_central_moment(force_model, compressible):
"""check force terms in mode space"""
stencil = LBStencil(Stencil.D2Q9)
omega_s = sp.Symbol("omega_s")
F = list(sp.symbols(f"F_:{stencil.D}"))
lbm_config = LBMConfig(method=Method.CENTRAL_MOMENT, stencil=stencil, relaxation_rate=omega_s,
compressible=compressible, force_model=force_model, force=tuple(F))
method = create_lb_method(lbm_config=lbm_config)
subs_dict = method.subs_dict_relaxation_rate
force_moments = method.force_model.central_moment_space_forcing(method)
force_moments = force_moments.subs(subs_dict)
# The mass mode should be zero
assert force_moments[0] == 0
# The momentum moments should contain the force
assert list(force_moments[1:stencil.D + 1]) == F
@pytest.mark.parametrize("force_model", force_models)
@pytest.mark.parametrize("compressible", [True, False])
def test_symmetric_forcing_equivalence(force_model, compressible):
stencil = LBStencil(Stencil.D2Q9)
omega_s = sp.Symbol("omega_s")
F = list(sp.symbols(f"F_:{stencil.D}"))
moments = moments_up_to_component_order(2, dim=2)
moments = sorted(moments, key=exponent_tuple_sort_key)
moment_polys = exponents_to_polynomial_representations(moments)
lbm_config = LBMConfig(method=Method.CENTRAL_MOMENT, stencil=stencil, relaxation_rate=omega_s,
nested_moments=moment_polys, compressible=True, force_model=force_model, force=tuple(F))
method = create_lb_method(lbm_config=lbm_config)
if not method.force_model.has_symmetric_central_moment_forcing:
return
subs_dict = method.subs_dict_relaxation_rate
force_moments = method.force_model.central_moment_space_forcing(method)
force_moments = force_moments.subs(subs_dict)
force_before, force_after = method.force_model.symmetric_central_moment_forcing(method, moments)
d = method.relaxation_matrix
eye = sp.eye(stencil.Q)
force_combined = (eye - d) @ force_before + force_after
assert (force_moments - force_combined).expand() == sp.Matrix([0] * stencil.Q)
@pytest.mark.parametrize("stencil", [Stencil.D3Q15, Stencil.D3Q19, Stencil.D3Q27])
@pytest.mark.parametrize("force_model", force_models)
@pytest.mark.parametrize("compressible", [True, False])
@pytest.mark.longrun
def test_modes_central_moment_longrun(stencil, force_model, compressible):
"""check force terms in mode space"""
stencil = LBStencil(stencil)
omega_s = sp.Symbol("omega_s")
F = list(sp.symbols(f"F_:{stencil.D}"))
lbm_config = LBMConfig(method=Method.CENTRAL_MOMENT, stencil=stencil, relaxation_rate=omega_s,
compressible=compressible, force_model=force_model, force=tuple(F))
method = create_lb_method(lbm_config=lbm_config)
subs_dict = method.subs_dict_relaxation_rate
force_moments = method.force_model.moment_space_forcing(method)
force_moments = force_moments.subs(subs_dict)
# The mass mode should be zero
assert force_moments[0] == 0
# The momentum moments should contain the force
assert list(force_moments[1:stencil.D + 1]) == F
def _check_modes(stencil, force_model, compressible):
omega_s = sp.Symbol("omega_s")
omega_b = sp.Symbol("omega_b")
omega_o = sp.Symbol("omega_o")
omega_e = sp.Symbol("omega_e")
F = list(sp.symbols(f"F_:{stencil.D}"))
lbm_config = LBMConfig(method=Method.MRT, stencil=stencil,
relaxation_rates=[omega_s, omega_b, omega_o, omega_e, omega_o, omega_e],
compressible=compressible, force_model=force_model, force=tuple(F))
method = create_lb_method(lbm_config=lbm_config)
subs_dict = method.subs_dict_relaxation_rate
force_moments = method.force_model.moment_space_forcing(method)
force_moments = force_moments.subs(subs_dict)
# The mass mode should be zero
assert force_moments[0] == 0
# The momentum moments should contain the force
assert list(force_moments[1:stencil.D + 1]) == F
if force_model == ForceModel.GUO:
num_stresses = (stencil.D * stencil.D - stencil.D) // 2 + stencil.D
lambda_s, lambda_b = -omega_s, -omega_b
# The stress moments should match eq. 47 from https://doi.org/10.1023/A:1010414013942
u = method.first_order_equilibrium_moment_symbols
def traceless(m):
tr = sp.simplify(sum([m[i, i] for i in range(stencil.D)]))
return m - tr / m.shape[0] * sp.eye(m.shape[0])
C = sp.Rational(1, 2) * (2 + lambda_s) * (traceless(sp.Matrix(u) * sp.Matrix(F).transpose()) +
traceless(sp.Matrix(F) * sp.Matrix(u).transpose())) + \
sp.Rational(1, method.dim) * (2 + lambda_b) * sp.Matrix(u).dot(F) * sp.eye(method.dim)
subs = {sp.Symbol(chr(ord("x") + i)) * sp.Symbol(chr(ord("x") + j)): C[i, j]
for i in range(stencil.D) for j in range(stencil.D)}
for force_moment, moment in zip(force_moments[stencil.D + 1:stencil.D + 1 + num_stresses],
method.moments[stencil.D + 1:stencil.D + 1 + num_stresses]):
ref = moment.subs(subs)
diff = sp.simplify(ref - force_moment)
if is_bulk_moment(moment, stencil.D):
assert diff == 0 or isinstance(diff, sp.Rational) # difference should be zero or a constant
else:
assert diff == 0 # difference should be zero
ff = method.moment_matrix.inv() * sp.Matrix(method.force_model.moment_space_forcing(method).subs(subs_dict))
# Check eq. 4.53a from schiller2008thermal
assert sp.simplify(sum(ff)) == 0
# Check eq. 4.53b from schiller2008thermal
assert [sp.simplify(sum(ff[i] * stencil[i][j] for i in range(len(stencil)))) for j in range(stencil.D)] == F
# Check eq. 4.61a from schiller2008thermal
ref = (2 + lambda_s) / 2 * (traceless(sp.Matrix(u) * sp.Matrix(F).transpose()) +
traceless(sp.Matrix(F) * sp.Matrix(u).transpose()))
s = sp.zeros(stencil.D)
for i in range(0, len(stencil)):
s += ff[i] * traceless(sp.Matrix(stencil[i]) * sp.Matrix(stencil[i]).transpose())
assert sp.simplify(s - ref) == sp.zeros(stencil.D)
# Check eq. 4.61b from schiller2008thermal
assert sp.simplify(sum(ff[i] * stencil[i][a] ** 2 for i in range(len(stencil)) for a in range(stencil.D))
- (2 + lambda_b) * sp.Matrix(u).dot(F)) == 0
# All other moments should be zero
assert list(force_moments[stencil.D + 1 + num_stresses:]) == [0] * \
(len(stencil) - (stencil.D + 1 + num_stresses))
elif force_model == ForceModel.SIMPLE:
# All other moments should be zero
assert list(force_moments[stencil.D + 1:]) == [0] * (len(stencil) - (stencil.D + 1))
import numpy as np
from lbmpy.boundaries import UBB, NoSlip
from lbmpy.enums import ForceModel
from lbmpy.scenarios import create_channel
from pystencils import make_slice
try:
import waLBerla as wLB
except ImportError:
wLB = None
# try:
# import waLBerla as wLB
# except ImportError:
wLB = None
def calculate_force(step, obstacle):
......@@ -26,10 +27,11 @@ def test_force_on_boundary():
for parallel in (False, True) if wLB else (False,):
for boundary_obj in boundaries:
print("Testing parallel %d, boundary %s" % (parallel, boundary_obj.name))
step = create_channel(domain_size, force=1e-5, relaxation_rate=1.5, parallel=parallel, force_model='buick')
print(f"Testing parallel {parallel}, boundary {boundary_obj.name}")
step = create_channel(domain_size, force=1e-5, relaxation_rate=1.5, parallel=parallel,
force_model=ForceModel.BUICK)
force = calculate_force(step, boundary_obj)
print(" -> force = ", force)
print(f" -> force = {force}")
results.append(force)
for res in results[1:]:
......
from dataclasses import replace
import pystencils as ps
import lbmpy as lp
from pystencils.slicing import slice_from_direction
from lbmpy.boundaries.boundaryhandling import LatticeBoltzmannBoundaryHandling
from lbmpy.boundaries.boundaryconditions import FreeSlip, NoSlip, ExtrapolationOutflow, UBB
from lbmpy.creationfunctions import create_lb_method, create_lb_update_rule
from lbmpy.macroscopic_value_kernels import pdf_initialization_assignments
import numpy as np
def velocity_info_callback(boundary_data, **_):
boundary_data['vel_1'] = 0
boundary_data['vel_2'] = 0
u_max = 0.1
x, y = boundary_data.link_positions(0), boundary_data.link_positions(1)
dist = (15 - y) / 15
boundary_data['vel_0'] = u_max * (1 - dist)
def test_free_slip():
stencil = lp.LBStencil(lp.Stencil.D3Q27)
domain_size = (30, 15, 30)
dim = len(domain_size)
dh = ps.create_data_handling(domain_size=domain_size)
src = dh.add_array('src', values_per_cell=stencil.Q)
dh.fill('src', 0.0, ghost_layers=True)
dst = dh.add_array('dst', values_per_cell=stencil.Q)
dh.fill('dst', 0.0, ghost_layers=True)
velField = dh.add_array('velField', values_per_cell=stencil.D)
dh.fill('velField', 0.0, ghost_layers=True)
lbm_config = lp.LBMConfig(stencil=stencil, method=lp.Method.SRT, relaxation_rate=1.8,
output={'velocity': velField}, kernel_type='stream_pull_collide')
method = create_lb_method(lbm_config=lbm_config)
init = pdf_initialization_assignments(method, 1.0, (0, 0, 0), src.center_vector)
config = ps.CreateKernelConfig(target=dh.default_target, cpu_openmp=False)
ast_init = ps.create_kernel(init, config=config)
kernel_init = ast_init.compile()
dh.run_kernel(kernel_init)
lbm_opt = lp.LBMOptimisation(symbolic_field=src, symbolic_temporary_field=dst)
lbm_config = replace(lbm_config, lb_method=method)
update = create_lb_update_rule(lbm_config=lbm_config, lbm_optimisation=lbm_opt)
ast_kernel = ps.create_kernel(update, config=config)
kernel = ast_kernel.compile()
bh = LatticeBoltzmannBoundaryHandling(method, dh, 'src', name="bh")
inflow = UBB(velocity_info_callback, dim=dim)
outflow = ExtrapolationOutflow(stencil[4], method)
wall = NoSlip("wall")
freeslip = FreeSlip(stencil, (0, -1, 0))
bh.set_boundary(inflow, slice_from_direction('W', dim))
bh.set_boundary(outflow, slice_from_direction('E', dim))
bh.set_boundary(wall, slice_from_direction('S', dim))
bh.set_boundary(wall, slice_from_direction('T', dim))
bh.set_boundary(wall, slice_from_direction('B', dim))
bh.set_boundary(freeslip, slice_from_direction('N', dim))
for i in range(2000):
bh()
dh.run_kernel(kernel)
dh.swap("src", "dst")
vel_profile = dh.gather_array('velField')[-2, :, domain_size[2] // 2, 0]
np.testing.assert_almost_equal(np.gradient(vel_profile)[-1], 0, decimal=3)
import os
import numpy as np
import pytest
from lbmpy.boundaries import NoSlip
from lbmpy.enums import Method
from lbmpy.geometry import add_black_and_white_image, add_pipe_walls
from lbmpy.lbstep import LatticeBoltzmannStep
import lbmpy.plot as plt
from pystencils.slicing import make_slice
......@@ -22,23 +25,20 @@ def test_pipe():
plot = False
for domain_size in [(30, 10, 10), (30, 10)]:
for diameter in [5, 10, diameter_callback]:
sc = LatticeBoltzmannStep(domain_size=domain_size, method='srt', relaxation_rate=1.9,
optimization={})
sc = LatticeBoltzmannStep(domain_size=domain_size, method=Method.SRT, relaxation_rate=1.9)
add_pipe_walls(sc.boundary_handling, diameter)
if plot:
import lbmpy.plot as plt
from pystencils.slicing import make_slice
if len(domain_size) == 2:
plt.boundary_handling(sc.boundary_handling)
plt.title("2D, diameter=%s" % (str(diameter,)))
plt.title(f"2D, diameter={str(diameter,)}")
plt.show()
elif len(domain_size) == 3:
plt.subplot(1, 2, 1)
plt.boundary_handling(sc.boundary_handling, make_slice[0.5, :, :])
plt.title("3D, diameter=%s" % (str(diameter,)))
plt.title(f"3D, diameter={str(diameter,)}")
plt.subplot(1, 2, 2)
plt.boundary_handling(sc.boundary_handling, make_slice[:, 0.5, :])
plt.title("3D, diameter=%s" % (str(diameter, )))
plt.title(f"3D, diameter={str(diameter, )}")
plt.show()
......@@ -49,20 +49,19 @@ def get_test_image_path():
def test_image():
sc = LatticeBoltzmannStep(domain_size=(50, 40), method='srt', relaxation_rate=1.9,
optimization={})
pytest.importorskip('scipy.ndimage')
sc = LatticeBoltzmannStep(domain_size=(50, 40), method=Method.SRT, relaxation_rate=1.9)
add_black_and_white_image(sc.boundary_handling, get_test_image_path(), keep_aspect_ratio=True)
def test_slice_mask_combination():
sc = LatticeBoltzmannStep(domain_size=(30, 30), method='srt', relaxation_rate=1.9,
optimization={})
sc = LatticeBoltzmannStep(domain_size=(30, 30), method=Method.SRT, relaxation_rate=1.9)
def callback(*coordinates):
x = coordinates[0]
print("x", coordinates[0][:, 0])
print("y", coordinates[1][0, :])
print(x.shape)
return np.ones_like(x, dtype=np.bool)
return np.ones_like(x, dtype=bool)
sc.boundary_handling.set_boundary(NoSlip(), make_slice[6:7, -1], callback)
from lbmpy.creationfunctions import create_lb_ast
from lbmpy.creationfunctions import create_lb_ast, LBMConfig
from lbmpy.enums import Method, Stencil
from lbmpy.stencils import LBStencil
import pytest
from pystencils import Target, CreateKernelConfig
def test_gpu_block_size_limiting():
pytest.importorskip("cupy")
too_large = 2048*2048
opt = {'target': 'gpu', 'gpu_indexing_params': {'block_size': (too_large, too_large, too_large)}}
ast = create_lb_ast(stencil='D3Q19', cumulant=True, relaxation_rate=1.8, optimization=opt,
compressible=True, force_model='guo')
lbm_config = LBMConfig(method=Method.CUMULANT, stencil=LBStencil(Stencil.D3Q19),
relaxation_rate=1.8, compressible=True)
config = CreateKernelConfig(target=Target.GPU,
gpu_indexing_params={'block_size': (too_large, too_large, too_large)})
ast = create_lb_ast(lbm_config=lbm_config, config=config)
limited_block_size = ast.indexing.call_parameters((1024, 1024, 1024))
kernel = ast.compile()
assert all(b < too_large for b in limited_block_size['block'])
bs = [too_large, too_large, too_large]
ast.indexing.limit_block_size_by_register_restriction(bs, kernel.num_regs)
bs = ast.indexing.limit_block_size_by_register_restriction(bs, kernel.num_regs)
assert all(b < too_large for b in bs)
from lbmpy.creationfunctions import create_lb_method
import pytest
from lbmpy.creationfunctions import create_lb_method, LBMConfig
from lbmpy.enums import Stencil
from lbmpy.methods.creationfunctions import compare_moment_based_lb_methods
from lbmpy.moments import (
moment_equality_table, moment_equality_table_by_stencil, moments_up_to_component_order)
from lbmpy.stencils import get_stencil
from lbmpy.stencils import LBStencil
def test_moment_comparison_table():
new = create_lb_method(stencil='D3Q19', maxwellian_moments=True)
old = create_lb_method(stencil='D3Q19', maxwellian_moments=False)
pytest.importorskip('ipy_table')
lbm_config_new = LBMConfig(stencil=LBStencil(Stencil.D3Q19), continuous_equilibrium=True)
lbm_config_old = LBMConfig(stencil=LBStencil(Stencil.D3Q19), continuous_equilibrium=False)
new = create_lb_method(lbm_config=lbm_config_new)
old = create_lb_method(lbm_config=lbm_config_old)
assert old.zeroth_order_equilibrium_moment_symbol == new.zeroth_order_equilibrium_moment_symbol
......@@ -19,17 +27,18 @@ def test_moment_comparison_table():
res_all = compare_moment_based_lb_methods(old, new, show_deviations_only=False)
assert len(res_all.array) == 20
d3q27 = create_lb_method(stencil='D3Q27')
d3q27 = create_lb_method(LBMConfig(stencil=LBStencil(Stencil.D3Q27)))
compare_moment_based_lb_methods(d3q27, new, show_deviations_only=False)
compare_moment_based_lb_methods(new, d3q27, show_deviations_only=False)
def test_moment_equality_table():
d3q19 = get_stencil('D3Q19')
pytest.importorskip('ipy_table')
d3q19 = LBStencil(Stencil.D3Q19)
table1 = moment_equality_table(d3q19, max_order=3)
assert len(table1.array) == 5
table2 = moment_equality_table_by_stencil({'D3Q19': d3q19, 'D3Q27': get_stencil("D3Q27")},
table2 = moment_equality_table_by_stencil({'D3Q19': d3q19, 'D3Q27': LBStencil(Stencil.D3Q27)},
moments_up_to_component_order(2, dim=3))
assert len(table2.array) == 11
assert len(table2.array[0]) == 2 + 2
import pytest
import sympy as sp
from itertools import chain
from pystencils.sympyextensions import remove_higher_order_terms
from lbmpy.stencils import Stencil, LBStencil
from lbmpy.equilibrium import ContinuousHydrodynamicMaxwellian, DiscreteHydrodynamicMaxwellian
from lbmpy.moments import moments_up_to_component_order, moments_up_to_order
from lbmpy.maxwellian_equilibrium import get_equilibrium_values_of_maxwell_boltzmann_function
from lbmpy.methods.default_moment_sets import cascaded_moment_sets_literature, mrt_orthogonal_modes_literature
def test_compressible_raw_moment_values():
stencil = LBStencil("D3Q27")
equilibrium = ContinuousHydrodynamicMaxwellian(dim=stencil.D, compressible=True, deviation_only=False)
raw_moments = list(chain.from_iterable(mrt_orthogonal_modes_literature(stencil, False)))
values_a = equilibrium.moments(raw_moments)
values_b = get_equilibrium_values_of_maxwell_boltzmann_function(raw_moments, stencil.D, space="moment")
for m, a, b in zip(raw_moments, values_a, values_b):
assert (a - b).expand() == sp.Integer(0), f"Mismatch at moment {m}."
def test_compressible_central_moment_values():
stencil = LBStencil("D3Q27")
equilibrium = ContinuousHydrodynamicMaxwellian(dim=stencil.D, compressible=True, deviation_only=False)
central_moments = list(chain.from_iterable(cascaded_moment_sets_literature(stencil)))
values_a = equilibrium.central_moments(central_moments)
values_b = get_equilibrium_values_of_maxwell_boltzmann_function(central_moments, stencil.D, space="central moment")
for m, a, b in zip(central_moments, values_a, values_b):
assert (a - b).expand() == sp.Integer(0), f"Mismatch at moment {m}."
def test_compressible_cumulant_values():
stencil = LBStencil("D3Q27")
equilibrium = ContinuousHydrodynamicMaxwellian(dim=stencil.D, compressible=True, deviation_only=False)
cumulants = list(chain.from_iterable(cascaded_moment_sets_literature(stencil)))
values_a = equilibrium.cumulants(cumulants, rescale=False)
values_b = get_equilibrium_values_of_maxwell_boltzmann_function(cumulants, stencil.D, space="cumulant")
for m, a, b in zip(cumulants, values_a, values_b):
assert (a - b).expand() == sp.Integer(0), f"Mismatch at cumulant {m}."
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q15, Stencil.D3Q19, Stencil.D3Q27])
@pytest.mark.parametrize('compressible', [False, True])
@pytest.mark.parametrize('deviation_only', [False, True])
def test_continuous_discrete_moment_equivalence(stencil, compressible, deviation_only):
stencil = LBStencil(stencil)
c_s_sq = sp.Rational(1, 3)
moments = tuple(moments_up_to_order(3, dim=stencil.D, include_permutations=False))
cd = ContinuousHydrodynamicMaxwellian(dim=stencil.D, compressible=compressible, deviation_only=deviation_only,
order=2, c_s_sq=c_s_sq)
cm = sp.Matrix(cd.moments(moments))
dd = DiscreteHydrodynamicMaxwellian(stencil, compressible=compressible, deviation_only=deviation_only,
order=2, c_s_sq=c_s_sq)
dm = sp.Matrix(dd.moments(moments))
rho = cd.density
delta_rho = cd.density_deviation
rho_0 = cd.background_density
subs = { delta_rho : rho - rho_0 }
assert (cm - dm).subs(subs).expand() == sp.Matrix((0,) * len(moments))
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q15, Stencil.D3Q19, Stencil.D3Q27])
@pytest.mark.parametrize('compressible', [False, True])
@pytest.mark.parametrize('deviation_only', [False, True])
def test_continuous_discrete_central_moment_equivalence(stencil, compressible, deviation_only):
stencil = LBStencil(stencil)
c_s_sq = sp.Rational(1, 3)
moments = tuple(moments_up_to_order(3, dim=stencil.D, include_permutations=False))
cd = ContinuousHydrodynamicMaxwellian(dim=stencil.D, compressible=compressible, deviation_only=deviation_only,
order=2, c_s_sq=c_s_sq)
cm = sp.Matrix(cd.central_moments(moments))
dd = DiscreteHydrodynamicMaxwellian(stencil, compressible=compressible, deviation_only=deviation_only,
order=2, c_s_sq=c_s_sq)
dm = sp.Matrix(dd.central_moments(moments))
dm = sp.Matrix([remove_higher_order_terms(t, dd.velocity, order=2) for t in dm])
rho = cd.density
delta_rho = cd.density_deviation
rho_0 = cd.background_density
subs = { delta_rho : rho - rho_0 }
assert (cm - dm).subs(subs).expand() == sp.Matrix((0,) * len(moments))
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q15, Stencil.D3Q19, Stencil.D3Q27])
def test_continuous_discrete_cumulant_equivalence(stencil):
stencil = LBStencil(stencil)
c_s_sq = sp.Rational(1, 3)
compressible = True
deviation_only = False
moments = tuple(moments_up_to_order(3, dim=stencil.D, include_permutations=False))
cd = ContinuousHydrodynamicMaxwellian(dim=stencil.D, compressible=compressible, deviation_only=deviation_only,
order=2, c_s_sq=c_s_sq)
cm = sp.Matrix(cd.cumulants(moments))
dd = DiscreteHydrodynamicMaxwellian(stencil, compressible=compressible, deviation_only=deviation_only,
order=2, c_s_sq=c_s_sq)
dm = sp.Matrix(dd.cumulants(moments))
dm = sp.Matrix([remove_higher_order_terms(t, dd.velocity, order=2) for t in dm])
rho = cd.density
delta_rho = cd.density_deviation
rho_0 = cd.background_density
subs = { delta_rho : rho - rho_0 }
assert (cm - dm).subs(subs).expand() == sp.Matrix((0,) * len(moments))