Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • Sparse
  • WallLaw
  • improved_comm
  • master
  • mr_refactor_wfb
  • suffa/cumulantfourth_order_correction_with_psm
  • release/0.2.1
  • release/0.2.10
  • release/0.2.11
  • release/0.2.12
  • release/0.2.13
  • release/0.2.14
  • release/0.2.15
  • release/0.2.2
  • release/0.2.3
  • release/0.2.4
  • release/0.2.5
  • release/0.2.6
  • release/0.2.7
  • release/0.2.8
  • release/0.2.9
  • release/0.3.0
  • release/0.3.1
  • release/0.3.2
  • release/0.3.3
  • release/0.3.4
  • release/0.4.0
  • release/0.4.1
  • release/0.4.2
  • release/0.4.3
  • release/0.4.4
  • release/1.0
  • release/1.0.1
  • release/1.1
  • release/1.1.1
  • release/1.2
  • release/1.3
  • release/1.3.1
  • release/1.3.2
  • release/1.3.3
  • release/1.3.4
  • release/1.3.5
  • release/1.3.6
  • release/1.3.7
44 results

Target

Select target project
  • ravi.k.ayyala/lbmpy
  • brendan-waters/lbmpy
  • anirudh.jonnalagadda/lbmpy
  • jbadwaik/lbmpy
  • alexander.reinauer/lbmpy
  • itischler/lbmpy
  • he66coqe/lbmpy
  • ev81oxyl/lbmpy
  • Bindgen/lbmpy
  • da15siwa/lbmpy
  • holzer/lbmpy
  • RudolfWeeber/lbmpy
  • pycodegen/lbmpy
13 results
Select Git revision
  • GetterSetterAPI
  • HRR
  • HydroPressure
  • InplaceConfig
  • Outflow
  • PhaseField
  • Sparse
  • UBBVelocity
  • UpdateAPISparse
  • WallLaw
  • WetNodeBoundaries
  • csebug
  • feature/sparse
  • feature/try
  • improved_comm
  • install_requires
  • master
  • phaseField
  • relaxationrates
  • test_martin
  • release/0.2.1
  • release/0.2.10
  • release/0.2.11
  • release/0.2.12
  • release/0.2.13
  • release/0.2.14
  • release/0.2.15
  • release/0.2.2
  • release/0.2.3
  • release/0.2.4
  • release/0.2.5
  • release/0.2.6
  • release/0.2.7
  • release/0.2.8
  • release/0.2.9
  • release/0.3.0
  • release/0.3.1
  • release/0.3.2
  • release/0.3.3
  • release/0.3.4
  • release/0.4.0
  • release/0.4.1
  • release/0.4.2
  • release/0.4.3
  • release/0.4.4
  • release/1.0
  • release/1.0.1
  • release/1.1
  • release/1.1.1
  • release/1.2
  • release/1.3
  • release/1.3.1
  • release/1.3.2
  • release/1.3.3
  • release/1.3.4
  • release/1.3.5
  • release/1.3.6
57 results
Show changes
Showing
with 1759 additions and 155 deletions
import math
import pytest
from dataclasses import replace
import pystencils as ps
from pystencils.slicing import slice_from_direction
from lbmpy import pdf_initialization_assignments
from lbmpy.creationfunctions import create_lb_method, create_lb_update_rule
from lbmpy.creationfunctions import create_lb_method, create_lb_update_rule, LBMConfig, LBMOptimisation
from lbmpy.enums import Method, Stencil
from lbmpy.boundaries.boundaryhandling import LatticeBoltzmannBoundaryHandling
from lbmpy.boundaries.boundaryconditions import DiffusionDirichlet, NeumannByCopy
from lbmpy.geometry import add_box_boundary
from lbmpy.stencils import get_stencil
from lbmpy.stencils import LBStencil
from lbmpy.maxwellian_equilibrium import get_weights
import sympy as sp
......@@ -19,17 +21,19 @@ import numpy as np
def test_diffusion_boundary():
domain_size = (10, 10)
stencil = get_stencil('D2Q9')
stencil = LBStencil(Stencil.D2Q9)
weights = get_weights(stencil, c_s_sq=sp.Rational(1, 3))
concentration = 1.0
# Data Handling
dh = ps.create_data_handling(domain_size=domain_size)
dh.add_array('pdfs', values_per_cell=len(stencil))
dh.add_array('pdfs', values_per_cell=stencil.Q)
dh.fill("pdfs", 0.0, ghost_layers=True)
method = create_lb_method(stencil=stencil, method="srt", relaxation_rate=1.8, compressible=True)
lbm_config = LBMConfig(stencil=stencil, method=Method.SRT, relaxation_rate=1.8,
compressible=True, zero_centered=False)
method = create_lb_method(lbm_config=lbm_config)
# Boundary Handling
bh = LatticeBoltzmannBoundaryHandling(method, dh, 'pdfs', name="bh")
......@@ -72,55 +76,57 @@ def test_diffusion():
C(x,y) = 1 * erfc(y / sqrt(4Dx/u))
The hydrodynamic field is not simulated, instead a constant velocity is assumed.
"""
"""
pytest.importorskip("cupy")
# Parameters
domain_size = (1600, 160)
omega = 1.38
diffusion = (1 / omega - 0.5) / 3
velocity = 0.05
time_steps = 50000
stencil = get_stencil('D2Q9')
stencil = LBStencil(Stencil.D2Q9)
target = ps.Target.GPU
# Data Handling
dh = ps.create_data_handling(domain_size=domain_size)
dh = ps.create_data_handling(domain_size=domain_size, default_target=target)
vel_field = dh.add_array('vel_field', values_per_cell=dh.dim)
vel_field = dh.add_array('vel_field', values_per_cell=stencil.D)
dh.fill('vel_field', velocity, 0, ghost_layers=True)
dh.fill('vel_field', 0.0, 1, ghost_layers=True)
con_field = dh.add_array('con_field', values_per_cell=1)
dh.fill('con_field', 0.0, ghost_layers=True)
pdfs = dh.add_array('pdfs', values_per_cell=len(stencil))
pdfs_tmp = dh.add_array('pdfs_tmp', values_per_cell=len(stencil))
pdfs = dh.add_array('pdfs', values_per_cell=stencil.Q)
dh.fill('pdfs', 0.0, ghost_layers=True)
pdfs_tmp = dh.add_array('pdfs_tmp', values_per_cell=stencil.Q)
dh.fill('pdfs_tmp', 0.0, ghost_layers=True)
# Lattice Boltzmann method
params = {'stencil': stencil,
'method': 'mrt',
'relaxation_rates': [1, 1.5, 1, 1.5, 1],
'velocity_input': vel_field,
'output': {'density': con_field},
'compressible': True,
'weighted': True,
'optimization': {'symbolic_field': pdfs,
'symbolic_temporary_field': pdfs_tmp},
'kernel_type': 'stream_pull_collide'
}
method = create_lb_method(**params)
lbm_config = LBMConfig(stencil=stencil, method=Method.MRT, relaxation_rates=[1, 1.5, 1, 1.5, 1],
zero_centered=False,
velocity_input=vel_field, output={'density': con_field}, compressible=True,
weighted=True, kernel_type='stream_pull_collide')
lbm_opt = LBMOptimisation(symbolic_field=pdfs, symbolic_temporary_field=pdfs_tmp)
config = ps.CreateKernelConfig(target=dh.default_target, cpu_openmp=True)
method = create_lb_method(lbm_config=lbm_config)
method.set_conserved_moments_relaxation_rate(omega)
update_rule = create_lb_update_rule(lb_method=method, **params)
kernel = ps.create_kernel(update_rule).compile()
lbm_config = replace(lbm_config, lb_method=method)
update_rule = create_lb_update_rule(lbm_config=lbm_config, lbm_optimisation=lbm_opt)
kernel = ps.create_kernel(update_rule, config=config).compile()
# PDF initalization
init = pdf_initialization_assignments(method, con_field.center,
vel_field.center_vector, pdfs.center_vector)
dh.run_kernel(ps.create_kernel(init).compile())
dh.all_to_gpu()
# Boundary Handling
bh = LatticeBoltzmannBoundaryHandling(update_rule.method, dh, 'pdfs', name="bh")
bh = LatticeBoltzmannBoundaryHandling(update_rule.method, dh, 'pdfs', name="bh", target=dh.default_target)
add_box_boundary(bh, boundary=NeumannByCopy())
bh.set_boundary(DiffusionDirichlet(0), slice_from_direction('W', dh.dim))
bh.set_boundary(DiffusionDirichlet(1), slice_from_direction('S', dh.dim))
......@@ -131,6 +137,7 @@ def test_diffusion():
dh.run_kernel(kernel)
dh.swap("pdfs", "pdfs_tmp")
dh.all_to_cpu()
# Verification
x = np.arange(1, domain_size[0], 1)
y = np.arange(0, domain_size[1], 1)
......
import platform
import numpy as np
import sympy as sp
import pytest
from lbmpy.enums import ForceModel, Method
from lbmpy.scenarios import create_lid_driven_cavity
@pytest.mark.parametrize('method', [Method.TRT_KBC_N1, Method.TRT_KBC_N2, Method.TRT_KBC_N3, Method.TRT_KBC_N4])
def test_entropic_methods(method):
if platform.system().lower() == 'windows' and method == Method.TRT_KBC_N4:
pytest.skip("For some reason this test does not run on windows", allow_module_level=True)
sc_kbc = create_lid_driven_cavity((20, 20), method=method,
relaxation_rates=[1.9999, sp.Symbol("omega_free")],
entropic_newton_iterations=3, entropic=True, compressible=True,
zero_centered=False, force=(-1e-10, 0), force_model=ForceModel.LUO)
sc_kbc.run(1000)
assert np.isfinite(np.max(sc_kbc.velocity[:, :]))
from pystencils.stencil import inverse_direction
from lbmpy.stencils import get_stencil
from lbmpy.stencils import LBStencil
from lbmpy.advanced_streaming.utility import AccessPdfValues, get_timesteps
import pytest
import numpy as np
from pystencils import Target
from pystencils.datahandling import create_data_handling
from lbmpy.boundaries import LatticeBoltzmannBoundaryHandling, SimpleExtrapolationOutflow, ExtrapolationOutflow
from lbmpy.creationfunctions import create_lb_method
from lbmpy.creationfunctions import create_lb_method, LBMConfig
from lbmpy.enums import Method, Stencil
from lbmpy.advanced_streaming.utility import streaming_patterns
from pystencils.slicing import get_ghost_region_slice
from itertools import product
@pytest.mark.parametrize('stencil', ['D2Q9', 'D3Q27'])
@pytest.mark.parametrize('stencil_enum', [Stencil.D2Q9, Stencil.D3Q27])
@pytest.mark.parametrize('streaming_pattern', streaming_patterns)
def test_pdf_simple_extrapolation(stencil, streaming_pattern):
stencil = get_stencil(stencil)
dim = len(stencil[0])
values_per_cell = len(stencil)
def test_pdf_simple_extrapolation(stencil_enum, streaming_pattern):
stencil = LBStencil(stencil_enum)
# Field contains exactly one fluid cell
domain_size = (3,) * dim
domain_size = (3,) * stencil.D
for timestep in get_timesteps(streaming_pattern):
dh = create_data_handling(domain_size, default_target='cpu')
lb_method = create_lb_method(stencil=stencil)
pdf_field = dh.add_array('f', values_per_cell=values_per_cell)
dh = create_data_handling(domain_size, default_target=Target.CPU)
lb_method = create_lb_method(lbm_config=LBMConfig(stencil=stencil))
pdf_field = dh.add_array('f', values_per_cell=stencil.Q)
dh.fill(pdf_field.name, np.nan, ghost_layers=True)
bh = LatticeBoltzmannBoundaryHandling(lb_method, dh, pdf_field.name, streaming_pattern, target='cpu')
bh = LatticeBoltzmannBoundaryHandling(lb_method, dh, pdf_field.name, streaming_pattern, target=Target.CPU)
# Set up outflows in all directions
for normal_dir in stencil[1:]:
......@@ -41,8 +41,8 @@ def test_pdf_simple_extrapolation(stencil, streaming_pattern):
# Set up the domain with artificial PDF values
# center = (1,) * dim
out_access = AccessPdfValues(stencil, streaming_pattern, timestep, 'out')
for cell in product(*(range(1, 4) for _ in range(dim))):
for q in range(values_per_cell):
for cell in product(*(range(1, 4) for _ in range(stencil.D))):
for q in range(stencil.Q):
out_access.write_pdf(pdf_arr, cell, q, q)
# Do boundary handling
......@@ -52,15 +52,14 @@ def test_pdf_simple_extrapolation(stencil, streaming_pattern):
in_access = AccessPdfValues(stencil, streaming_pattern, timestep.next(), 'in')
# Inbound in center cell
for cell in product(*(range(1, 4) for _ in range(dim))):
for q in range(values_per_cell):
for cell in product(*(range(1, 4) for _ in range(stencil.D))):
for q in range(stencil.Q):
f = in_access.read_pdf(pdf_arr, cell, q)
assert f == q
def test_extrapolation_outflow_initialization_by_copy():
stencil = get_stencil('D2Q9')
values_per_cell = len(stencil)
stencil = LBStencil(Stencil.D2Q9)
domain_size = (1, 5)
streaming_pattern = 'esotwist'
......@@ -69,16 +68,16 @@ def test_extrapolation_outflow_initialization_by_copy():
pdf_acc = AccessPdfValues(stencil, streaming_pattern=streaming_pattern,
timestep=zeroth_timestep, streaming_dir='out')
dh = create_data_handling(domain_size, default_target='cpu')
dh = create_data_handling(domain_size, default_target=Target.CPU)
lb_method = create_lb_method(stencil=stencil)
pdf_field = dh.add_array('f', values_per_cell=values_per_cell)
pdf_field = dh.add_array('f', values_per_cell=stencil.Q)
dh.fill(pdf_field.name, np.nan, ghost_layers=True)
pdf_arr = dh.cpu_arrays[pdf_field.name]
bh = LatticeBoltzmannBoundaryHandling(lb_method, dh, pdf_field.name,
streaming_pattern=streaming_pattern, target='cpu')
streaming_pattern=streaming_pattern, target=Target.CPU)
for y in range(1, 6):
for j in range(values_per_cell):
for j in range(stencil.Q):
pdf_acc.write_pdf(pdf_arr, (1, y), j, j)
normal_dir = (1, 0)
......@@ -99,28 +98,25 @@ def test_extrapolation_outflow_initialization_by_copy():
def test_extrapolation_outflow_initialization_by_callback():
stencil = get_stencil('D2Q9')
values_per_cell = len(stencil)
stencil = LBStencil(Stencil.D2Q9)
domain_size = (1, 5)
streaming_pattern = 'esotwist'
zeroth_timestep = 'even'
dh = create_data_handling(domain_size, default_target='cpu')
dh = create_data_handling(domain_size, default_target=Target.CPU)
lb_method = create_lb_method(stencil=stencil)
pdf_field = dh.add_array('f', values_per_cell=values_per_cell)
pdf_field = dh.add_array('f', values_per_cell=stencil.Q)
dh.fill(pdf_field.name, np.nan, ghost_layers=True)
bh = LatticeBoltzmannBoundaryHandling(lb_method, dh, pdf_field.name,
streaming_pattern=streaming_pattern, target='cpu')
streaming_pattern=streaming_pattern, target=Target.CPU)
normal_dir = (1, 0)
density_callback = lambda x, y: 1
velocity_callback = lambda x, y: (0, 0)
outflow = ExtrapolationOutflow(normal_dir, lb_method,
outflow = ExtrapolationOutflow(normal_direction=normal_dir, lb_method=lb_method,
streaming_pattern=streaming_pattern,
zeroth_timestep=zeroth_timestep,
initial_density=density_callback,
initial_velocity=velocity_callback)
initial_density=lambda x, y: 1,
initial_velocity=lambda x, y: (0, 0))
boundary_slice = get_ghost_region_slice(normal_dir)
bh.set_boundary(outflow, boundary_slice)
bh.prepare()
......
......@@ -87,7 +87,7 @@
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
......@@ -101,7 +101,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.4"
"version": "3.9.7"
}
},
"nbformat": 4,
......
import pytest
import pystencils as ps
from lbmpy.creationfunctions import create_lb_function, LBMConfig
from lbmpy.enums import Method, Stencil
from lbmpy.scenarios import create_lid_driven_cavity
from lbmpy.stencils import LBStencil
from lbmpy._compat import IS_PYSTENCILS_2
@pytest.mark.parametrize("double_precision", [False, True])
@pytest.mark.parametrize(
"method_enum", [Method.SRT, Method.CENTRAL_MOMENT, Method.CUMULANT]
)
def test_creation(double_precision, method_enum):
"""Simple test that makes sure that only float variables are created"""
lbm_config = LBMConfig(method=method_enum, relaxation_rate=1.5, compressible=True)
if IS_PYSTENCILS_2:
config = ps.CreateKernelConfig(
default_dtype="float64" if double_precision else "float32"
)
else:
config = ps.CreateKernelConfig(
data_type="float64" if double_precision else "float32",
default_number_float="float64" if double_precision else "float32",
)
func = create_lb_function(lbm_config=lbm_config, config=config)
code = ps.get_code_str(func)
if double_precision:
assert "float" not in code
assert "double" in code
else:
assert "double" not in code
assert "float" in code
@pytest.mark.parametrize("numeric_type", ["float32", "float64"])
@pytest.mark.parametrize(
"method_enum", [Method.SRT, Method.CENTRAL_MOMENT, Method.CUMULANT]
)
def test_scenario(method_enum, numeric_type):
lbm_config = LBMConfig(
stencil=LBStencil(Stencil.D3Q27),
method=method_enum,
relaxation_rate=1.45,
compressible=True,
)
if IS_PYSTENCILS_2:
config = ps.CreateKernelConfig(
default_dtype=numeric_type
)
else:
config = ps.CreateKernelConfig(
data_type=numeric_type,
default_number_float=numeric_type
)
sc = create_lid_driven_cavity((16, 16, 8), lbm_config=lbm_config, config=config)
sc.run(1)
code = ps.get_code_str(sc.ast)
if numeric_type == "float64":
assert "float" not in code
assert "double" in code
else:
assert "double" not in code
assert "float" in code
"""Tests velocity and stress fluctuations for thermalized LB"""
import pytest
import pystencils as ps
from lbmpy._compat import IS_PYSTENCILS_2
from pystencils import get_code_str
if IS_PYSTENCILS_2:
pass
else:
from pystencils.backends.simd_instruction_sets import get_supported_instruction_sets, get_vector_instruction_set
from pystencils.cpu.cpujit import get_compiler_config
from pystencils.enums import Target
from lbmpy.creationfunctions import *
from lbmpy.forcemodels import Guo
from lbmpy.macroscopic_value_kernels import macroscopic_values_setter
import numpy as np
from lbmpy.enums import Stencil
from lbmpy.moments import is_bulk_moment, is_shear_moment, get_order
from lbmpy.stencils import LBStencil
if not IS_PYSTENCILS_2:
def _skip_instruction_sets_windows(instruction_sets):
if get_compiler_config()["os"] == "windows":
# skip instruction sets supported by the CPU but not by the compiler
if "avx" in instruction_sets and (
"/arch:avx2" not in get_compiler_config()["flags"].lower()
and "/arch:avx512" not in get_compiler_config()["flags"].lower()
):
instruction_sets.remove("avx")
if (
"avx512" in instruction_sets
and "/arch:avx512" not in get_compiler_config()["flags"].lower()
):
instruction_sets.remove("avx512")
return instruction_sets
INSTRUCTION_SETS = get_supported_instruction_sets()
else:
INSTRUCTION_SETS = Target.available_vector_cpu_targets()
def single_component_maxwell(x1, x2, kT, mass):
"""Integrate the probability density from x1 to x2 using the trapezoidal rule"""
x = np.linspace(x1, x2, 1000)
try:
trapezoid = np.trapezoid # since numpy 2.0
except AttributeError:
trapezoid = np.trapz
return trapezoid(np.exp(-mass * x**2 / (2.0 * kT)), x) / np.sqrt(
2.0 * np.pi * kT / mass
)
def rr_getter(moment_group):
"""Maps a group of moments to a relaxation rate (shear, bulk, even, odd)
in the 4 relaxation time thermalized LB model
"""
is_shear = [is_shear_moment(m, 3) for m in moment_group]
is_bulk = [is_bulk_moment(m, 3) for m in moment_group]
order = [get_order(m) for m in moment_group]
assert min(order) == max(order)
order = order[0]
if order < 2:
return [0] * len(moment_group)
elif any(is_bulk):
assert all(is_bulk)
return [sp.Symbol("omega_bulk")] * len(moment_group)
elif any(is_shear):
assert all(is_shear)
return [sp.Symbol("omega_shear")] * len(moment_group)
elif order % 2 == 0:
assert order > 2
return [sp.Symbol("omega_even")] * len(moment_group)
else:
return [sp.Symbol("omega_odd")] * len(moment_group)
def second_order_moment_tensor_assignments(function_values, stencil, output_field):
"""Assignments for calculating the pressure tensor"""
assert len(function_values) == len(stencil)
dim = len(stencil[0])
return [
ps.Assignment(
output_field(i, j),
sum(c[i] * c[j] * f for f, c in zip(function_values, stencil)),
)
for i in range(dim)
for j in range(dim)
]
def add_pressure_output_to_collision_rule(collision_rule, pressure_field):
pressure_ouput = second_order_moment_tensor_assignments(
collision_rule.method.pre_collision_pdf_symbols,
collision_rule.method.stencil,
pressure_field,
)
collision_rule.main_assignments = collision_rule.main_assignments + pressure_ouput
def get_fluctuating_lb(
size=None,
kT=None,
omega_shear=None,
omega_bulk=None,
omega_odd=None,
omega_even=None,
rho_0=None,
target=None,
zero_centered: bool = False,
):
# Parameters
stencil = LBStencil(Stencil.D3Q19)
# Setup data handling
dh = ps.create_data_handling(
(size,) * stencil.D, periodicity=True, default_target=target
)
src = dh.add_array("src", values_per_cell=stencil.Q, layout="f")
dst = dh.add_array_like("dst", "src")
rho = dh.add_array("rho", layout="f", latex_name="\\rho", values_per_cell=1)
u = dh.add_array("u", values_per_cell=dh.dim, layout="f")
pressure_field = dh.add_array(
"pressure", values_per_cell=(3, 3), layout="f", gpu=target == Target.GPU
)
force_field = dh.add_array(
"force", values_per_cell=stencil.D, layout="f", gpu=target == Target.GPU
)
# Method setup
lbm_config = LBMConfig(
stencil=stencil,
method=Method.MRT,
compressible=True,
weighted=True,
zero_centered=zero_centered,
relaxation_rates=rr_getter,
force_model=Guo(force=force_field.center_vector),
fluctuating={"temperature": kT},
kernel_type="collide_only",
)
lb_method = create_lb_method(lbm_config=lbm_config)
lbm_config.lb_method = lb_method
lbm_opt = LBMOptimisation(symbolic_field=src, cse_global=True)
collision_rule = create_lb_collision_rule(
lbm_config=lbm_config, lbm_optimisation=lbm_opt
)
# add_pressure_output_to_collision_rule(collision_rule, pressure_field)
collision = create_lb_update_rule(
collision_rule=collision_rule, lbm_config=lbm_config, lbm_optimisation=lbm_opt
)
stream = create_stream_pull_with_output_kernel(
collision.method,
src,
dst,
{"density": rho, "velocity": u, "moment2": pressure_field},
)
config = ps.CreateKernelConfig(cpu_openmp=False, target=dh.default_target)
# Compile kernels
stream_kernel = ps.create_kernel(stream, config=config).compile()
collision_kernel = ps.create_kernel(collision, config=config).compile()
sync_pdfs = dh.synchronization_function([src.name])
# Initialization
init = macroscopic_values_setter(
collision.method,
velocity=(0,) * dh.dim,
pdfs=src.center_vector,
density=rho_0
)
init_kernel = ps.create_kernel(init, ghost_layers=0).compile()
dh.fill(rho.name, rho_0)
dh.fill(u.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
dh.fill(u.name, 0)
dh.fill(force_field.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
dh.fill(force_field.name, 0)
dh.run_kernel(init_kernel)
# time loop
def time_loop(start, steps):
dh.all_to_gpu()
for i in range(start, start + steps):
dh.run_kernel(
collision_kernel,
omega_shear=omega_shear,
omega_bulk=omega_bulk,
omega_odd=omega_odd,
omega_even=omega_even,
seed=42,
time_step=i,
)
sync_pdfs()
dh.run_kernel(stream_kernel)
dh.swap(src.name, dst.name)
return start + steps
return dh, time_loop
@pytest.mark.parametrize(
"zero_centered", [False, True], ids=["regular-storage", "zero-centered"]
)
@pytest.mark.parametrize(
"domain_size", [8, 60]
)
def test_resting_fluid(zero_centered: bool, domain_size: int, target=Target.CPU):
rho_0 = 0.86
kT = 4e-4
L = [domain_size] * 3
dh, time_loop = get_fluctuating_lb(
size=L[0],
target=target,
rho_0=rho_0,
kT=kT,
omega_shear=0.8,
omega_bulk=0.5,
omega_even=0.04,
omega_odd=0.3,
zero_centered=zero_centered,
)
# Test
t = 0
# warm up
t = time_loop(t, 10)
# Measurement
for i in range(10):
t = time_loop(t, 5)
res_u = dh.gather_array("u").reshape((-1, 3))
res_rho = dh.gather_array("rho").reshape((-1,))
# mass conservationo
# density per cell fluctuates, but toal mass is conserved
np.testing.assert_allclose(np.mean(res_rho), rho_0, atol=3E-12)
# momentum conservation
momentum = np.dot(res_rho, res_u)
np.testing.assert_allclose(momentum, [0, 0, 0], atol=1e-10)
# temperature (fluctuates around pre-set kT)
kinetic_energy = 1 / 2 * np.dot(res_rho, res_u * res_u) / np.prod(L)
kT_tol = 0.075 *(16/domain_size)**(3/2)
np.testing.assert_allclose(kinetic_energy, [kT / 2] * 3, rtol=kT_tol)
# velocity distribution
v_hist, v_bins = np.histogram(
res_u, bins=11, range=(-0.075, 0.075), density=True
)
# Calculate expected values from single
v_expected = []
for j in range(len(v_hist)):
# Maxwell distribution
res = (
1.0
/ (v_bins[j + 1] - v_bins[j])
* single_component_maxwell(v_bins[j], v_bins[j + 1], kT, rho_0)
)
v_expected.append(res)
v_expected = np.array(v_expected)
hist_tol_all = 0.75 *(16/domain_size)**(3/2)
np.testing.assert_allclose(v_hist, v_expected, rtol=hist_tol_all)
hist_tol_center = hist_tol_all/10
remove = 3
np.testing.assert_allclose(
v_hist[remove:-remove], v_expected[remove:-remove], rtol=hist_tol_center
)
# pressure tensor against expressions from
# Duenweg, Schiller, Ladd, https://arxiv.org/abs/0707.1581
res_pressure = dh.gather_array("pressure").reshape((-1, 3, 3))
c_s = np.sqrt(1 / 3) # speed of sound
# average of pressure tensor
# Diagonal elements are rho c_s^22 +<u,u>. When the fluid is
# thermalized, the expectation value of <u,u> = kT due to the
# equi-partition theorem.
p_av_expected = np.diag([rho_0 * c_s**2 + kT] * 3)
pressure_atol = c_s**2 / 200 *(16/domain_size)**(3/2)
np.testing.assert_allclose(
np.mean(res_pressure, axis=0), p_av_expected, atol=pressure_atol)
@pytest.mark.parametrize(
"zero_centered", [False, True], ids=["regular-storage", "zero-centered"]
)
@pytest.mark.parametrize(
"domain_size", [8, 60]
)
def test_point_force(zero_centered: bool, domain_size: int, target=Target.CPU):
"""Test momentum balance for thermalized fluid with applied poitn forces"""
rho_0 = 0.86
kT = 4e-4
L = [domain_size] * 3
dh, time_loop = get_fluctuating_lb(
size=L[0],
target=target,
rho_0=rho_0,
kT=kT,
omega_shear=0.8,
omega_bulk=0.5,
omega_even=0.8,
omega_odd=0.8,
zero_centered=zero_centered
)
# Test
t = 0
# warm up
t = time_loop(t, 100)
introduced_momentum = np.zeros(3)
for i in range(100):
point_force = 1e-2/domain_size**(3/2) * (np.random.random(3) - 0.5)
introduced_momentum += point_force
# Note that ghost layers are included in the indexing
force_pos = np.random.randint(1, L[0] - 2, size=3)
dh.cpu_arrays["force"][force_pos[0], force_pos[1], force_pos[2]] = point_force
t = time_loop(t, 1)
res_u = dh.gather_array("u").reshape((-1, 3))
res_rho = dh.gather_array("rho").reshape((-1,))
# mass conservation
np.testing.assert_allclose(np.mean(res_rho), rho_0, atol=3E-12)
# momentum conservation
momentum = np.dot(res_rho, res_u)
np.testing.assert_allclose(
momentum, introduced_momentum + 0.5 * point_force, atol=1e-10
)
dh.cpu_arrays["force"][force_pos[0], force_pos[1], force_pos[2]] = np.zeros(3)
@pytest.mark.skipif(
not INSTRUCTION_SETS, reason="No vector instruction sets supported"
)
@pytest.mark.parametrize("data_type", ("float32", "float64"))
@pytest.mark.parametrize("assume_aligned", (True, False))
@pytest.mark.parametrize("assume_inner_stride_one", (True, False))
@pytest.mark.parametrize("assume_sufficient_line_padding", (True, False))
@pytest.mark.xfail(IS_PYSTENCILS_2, reason="Vectorization for RNGs not implemented yet")
def test_vectorization(
data_type, assume_aligned, assume_inner_stride_one, assume_sufficient_line_padding
):
stencil = LBStencil(Stencil.D3Q19)
pdfs, pdfs_tmp = ps.fields(
f"pdfs({stencil.Q}), pdfs_tmp({stencil.Q}): {data_type}[3D]", layout="fzyx"
)
method = create_mrt_orthogonal(
stencil=stencil, compressible=True, weighted=True, relaxation_rates=rr_getter
)
if IS_PYSTENCILS_2:
seed = ps.TypedSymbol("seed", np.uint32)
rng = ps.random.Philox("phil", data_type, seed)
fluct_options = {
"temperature": sp.Symbol("kT"),
"rng": rng,
}
else:
rng_node = (
ps.rng.PhiloxTwoDoubles if data_type == "float64" else ps.rng.PhiloxFourFloats
)
fluct_options = {
"temperature": sp.Symbol("kT"),
"rng_node": rng_node,
"block_offsets": tuple([0] * stencil.D),
}
lbm_config = LBMConfig(
lb_method=method,
fluctuating=fluct_options,
compressible=True,
zero_centered=False,
stencil=method.stencil,
kernel_type="collide_only",
)
lbm_opt = LBMOptimisation(
cse_global=True, symbolic_field=pdfs, symbolic_temporary_field=pdfs_tmp
)
collision = create_lb_update_rule(lbm_config=lbm_config, lbm_optimisation=lbm_opt)
if not IS_PYSTENCILS_2:
instruction_sets = _skip_instruction_sets_windows(INSTRUCTION_SETS)
else:
instruction_sets = INSTRUCTION_SETS
instruction_set = instruction_sets[-1]
if IS_PYSTENCILS_2:
config = ps.CreateKernelConfig()
config.target = instruction_set
config.default_dtype = data_type
config.cpu.vectorize.enable = True
config.cpu.vectorize.assume_aligned = assume_aligned
config.cpu.vectorize.assume_inner_stride_one = assume_inner_stride_one
else:
config = ps.CreateKernelConfig(
target=Target.CPU,
data_type=data_type,
default_number_float=data_type,
cpu_vectorize_info={
"instruction_set": instruction_set,
"assume_aligned": assume_aligned,
"assume_inner_stride_one": assume_inner_stride_one,
"assume_sufficient_line_padding": assume_sufficient_line_padding,
},
)
if not assume_inner_stride_one and "storeS" not in get_vector_instruction_set(
data_type, instruction_set
):
with pytest.warns(UserWarning) as pytest_warnings:
ast = ps.create_kernel(collision, config=config)
assert "Could not vectorize loop" in pytest_warnings[0].message.args[0]
else:
ast = ps.create_kernel(collision, config=config)
ast.compile()
code = get_code_str(ast)
print(code)
@pytest.mark.parametrize("data_type", ("float32", "float64"))
@pytest.mark.parametrize("assume_aligned", (True, False))
@pytest.mark.parametrize("assume_inner_stride_one", (True, False))
@pytest.mark.parametrize("assume_sufficient_line_padding", (True, False))
@pytest.mark.skipif(IS_PYSTENCILS_2, reason="waLBerla block offsets feature unavailable")
def test_fluctuating_lb_issue_188_wlb(
data_type, assume_aligned, assume_inner_stride_one, assume_sufficient_line_padding
):
stencil = LBStencil(Stencil.D3Q19)
temperature = sp.symbols("temperature")
pdfs, pdfs_tmp = ps.fields(
f"pdfs({stencil.Q}), pdfs_tmp({stencil.Q}): {data_type}[3D]", layout="fzyx"
)
rng_node = (
ps.rng.PhiloxTwoDoubles if data_type == "float64" else ps.rng.PhiloxFourFloats
)
fluctuating = {
"temperature": temperature,
"block_offsets": "walberla",
"rng_node": rng_node,
}
lbm_config = LBMConfig(
stencil=stencil,
method=Method.MRT,
compressible=True,
weighted=True,
zero_centered=False,
relaxation_rate=1.4,
fluctuating=fluctuating,
)
lbm_opt = LBMOptimisation(
symbolic_field=pdfs, symbolic_temporary_field=pdfs_tmp, cse_global=True
)
up = create_lb_update_rule(lbm_config=lbm_config, lbm_optimisation=lbm_opt)
cpu_vectorize_info = {
"instruction_set": "avx",
"assume_inner_stride_one": True,
"assume_aligned": True,
}
config = ps.CreateKernelConfig(
target=ps.Target.CPU,
data_type=data_type,
default_number_float=data_type,
cpu_vectorize_info=cpu_vectorize_info,
)
ast = create_kernel(up, config=config)
code = ps.get_code_str(ast)
# print(code)
if data_type == "float32":
assert "0.5f" in code
assert "_mm256_mul_ps" in code
assert "_mm256_sqrt_ps" in code
else:
assert "0.5f" not in code
assert "_mm256_mul_pd" in code
assert "_mm256_sqrt_pd" in code
import pytest
import numpy as np
from numpy.testing import assert_allclose
import sympy as sp
import pystencils as ps
from pystencils import Target
from lbmpy.creationfunctions import create_lb_method, create_lb_update_rule, LBMConfig, LBMOptimisation
from lbmpy.enums import Stencil, Method, ForceModel
from lbmpy.macroscopic_value_kernels import macroscopic_values_setter, macroscopic_values_getter
from lbmpy.moments import (is_bulk_moment, moments_up_to_component_order,
exponents_to_polynomial_representations, exponent_tuple_sort_key)
from lbmpy.stencils import LBStencil
# all force models available are defined in the ForceModel enum, but Cumulant is not a "real" force model
force_models = [f for f in ForceModel if f is not ForceModel.CENTRALMOMENT]
@pytest.mark.parametrize("method_enum", [Method.SRT, Method.TRT, Method.MRT, Method.CUMULANT])
@pytest.mark.parametrize("zero_centered", [False, True])
@pytest.mark.parametrize("force_model", force_models)
@pytest.mark.parametrize("omega", [0.5, 1.5])
def test_total_momentum(method_enum, zero_centered, force_model, omega):
if method_enum == Method.CUMULANT and \
force_model not in (ForceModel.SIMPLE, ForceModel.LUO, ForceModel.GUO, ForceModel.HE):
return
L = (16, 16)
stencil = LBStencil(Stencil.D2Q9)
F = (2e-4, -3e-4)
dh = ps.create_data_handling(L, periodicity=True, default_target=Target.CPU)
src = dh.add_array('src', values_per_cell=stencil.Q)
dst = dh.add_array_like('dst', 'src')
ρ = dh.add_array('rho', values_per_cell=1)
u = dh.add_array('u', values_per_cell=stencil.D)
lbm_config = LBMConfig(method=method_enum, stencil=stencil, relaxation_rate=omega,
compressible=True, zero_centered=zero_centered,
force_model=force_model, force=F, streaming_pattern='pull')
lbm_opt = LBMOptimisation(symbolic_field=src)
collision = create_lb_update_rule(lbm_config=lbm_config, lbm_optimisation=lbm_opt)
config = ps.CreateKernelConfig(cpu_openmp=False, target=dh.default_target)
collision_kernel = ps.create_kernel(collision, config=config).compile()
fluid_density = 1.1
def init():
dh.fill(ρ.name, fluid_density)
dh.fill(u.name, 0)
setter = macroscopic_values_setter(collision.method, velocity=(0,) * dh.dim,
pdfs=src, density=ρ.center,
set_pre_collision_pdfs=True)
kernel = ps.create_kernel(setter).compile()
dh.run_kernel(kernel)
sync_pdfs = dh.synchronization_function([src.name])
getter = macroscopic_values_getter(collision.method, ρ.center, u.center_vector, src, use_pre_collision_pdfs=True)
getter_kernel = ps.create_kernel(getter).compile()
def time_loop(steps):
dh.all_to_gpu()
for _ in range(steps):
dh.run_kernel(collision_kernel)
dh.swap(src.name, dst.name)
sync_pdfs()
dh.all_to_cpu()
t = 20
init()
time_loop(t)
dh.run_kernel(getter_kernel)
# Check that density did not change
assert_allclose(dh.gather_array(ρ.name), fluid_density)
# Check for correct velocity
total = np.sum(dh.gather_array(u.name), axis=(0, 1))
assert_allclose(total / np.prod(L) / F * fluid_density / t, 1)
@pytest.mark.parametrize("force_model", force_models)
@pytest.mark.parametrize("compressible", [True, False])
def test_modes(force_model, compressible):
"""check force terms in mode space"""
_check_modes(LBStencil(Stencil.D2Q9), force_model, compressible)
@pytest.mark.parametrize("stencil", [Stencil.D3Q15, Stencil.D3Q19, Stencil.D3Q27])
@pytest.mark.parametrize("force_model", force_models)
@pytest.mark.parametrize("compressible", [True, False])
@pytest.mark.longrun
def test_modes_longrun(stencil, force_model, compressible):
"""check force terms in mode space"""
_check_modes(LBStencil(stencil), force_model, compressible)
@pytest.mark.parametrize("force_model", force_models)
def test_momentum_density_shift(force_model):
target = Target.CPU
stencil = LBStencil(Stencil.D2Q9)
domain_size = (4, 4)
dh = ps.create_data_handling(domain_size=domain_size, default_target=target)
rho = dh.add_array('rho', values_per_cell=1)
dh.fill('rho', 0.0, ghost_layers=True)
momentum_density = dh.add_array('momentum_density', values_per_cell=dh.dim)
dh.fill('momentum_density', 0.0, ghost_layers=True)
src = dh.add_array('src', values_per_cell=len(stencil))
dh.fill('src', 0.0, ghost_layers=True)
lbm_config = LBMConfig(method=Method.SRT, compressible=True, force_model=force_model,
force=(1, 2))
method = create_lb_method(lbm_config=lbm_config)
cqc = method.conserved_quantity_computation
momentum_density_getter = cqc.output_equations_from_pdfs(src.center_vector,
{'density': rho.center,
'momentum_density': momentum_density.center_vector})
config = ps.CreateKernelConfig(target=dh.default_target)
momentum_density_ast = ps.create_kernel(momentum_density_getter, config=config)
momentum_density_kernel = momentum_density_ast.compile()
dh.run_kernel(momentum_density_kernel)
assert np.sum(dh.gather_array(momentum_density.name)[:, :, 0]) == np.prod(domain_size) / 2
assert np.sum(dh.gather_array(momentum_density.name)[:, :, 1]) == np.prod(domain_size)
@pytest.mark.parametrize('force_model', force_models)
def test_forcing_space_equivalences(force_model):
if force_model == ForceModel.HE:
# We don't expect equivalence for the He model since its
# moments are derived from the continuous maxwellian
return
stencil = LBStencil(Stencil.D3Q27)
force = sp.symbols(f"F_:{stencil.D}")
lbm_config = LBMConfig(stencil=stencil, method=Method.SRT, force=force, force_model=force_model)
fmodel = lbm_config.force_model
lb_method = create_lb_method(lbm_config=lbm_config)
inv_moment_matrix = lb_method.moment_matrix.inv()
force_pdfs = sp.Matrix(fmodel(lb_method))
force_moments = fmodel.moment_space_forcing(lb_method)
diff = (force_pdfs - (inv_moment_matrix * force_moments)).expand()
for i, d in enumerate(diff):
assert d == 0, f"Mismatch between population and moment space forcing " \
f"in force model {force_model}, population f_{i}"
@pytest.mark.parametrize("force_model", [ForceModel.GUO, ForceModel.BUICK, ForceModel.SHANCHEN])
@pytest.mark.parametrize("stencil", [Stencil.D2Q9, Stencil.D3Q19, Stencil.D3Q27])
@pytest.mark.parametrize("method", [Method.SRT, Method.TRT, Method.MRT])
def test_literature(force_model, stencil, method):
# Be aware that the choice of the conserved moments does not affect the forcing although omega is introduced
# in the forcing vector then. The reason is that:
# m_{100}^{\ast} = m_{100} + \omega ( m_{100}^{eq} - m_{100} ) + \left( 1 - \frac{\omega}{2} \right) F_x
# always simplifies to:
# m_{100}^{\ast} = m_{100} + F_x
# Thus the relaxation rate gets cancled again.
stencil = LBStencil(stencil)
omega_s = sp.Symbol("omega_s")
omega_b = sp.Symbol("omega_b")
omega_o = sp.Symbol("omega_o")
omega_e = sp.Symbol("omega_e")
if method == Method.SRT:
rrs = [omega_s]
omega_o = omega_b = omega_e = omega_s
elif method == Method.TRT:
rrs = [omega_e, omega_o]
omega_s = omega_b = omega_e
else:
rrs = [omega_s, omega_b, omega_o, omega_e, omega_o, omega_e]
F = sp.symbols(f"F_:{stencil.D}")
lbm_config = LBMConfig(method=method, weighted=True, stencil=stencil, relaxation_rates=rrs,
compressible=force_model != ForceModel.BUICK,
force_model=force_model, force=F)
lb_method = create_lb_method(lbm_config=lbm_config)
omega_momentum = list(set(lb_method.relaxation_rates[1:stencil.D + 1]))
assert len(omega_momentum) == 1
omega_momentum = omega_momentum[0]
subs_dict = lb_method.subs_dict_relaxation_rate
force_term = sp.simplify(lb_method.force_model(lb_method).subs(subs_dict))
u = sp.Matrix(lb_method.first_order_equilibrium_moment_symbols)
rho = lb_method.conserved_quantity_computation.density_symbol
# see silva2020 for nomenclature
F = sp.Matrix(F)
uf = sp.Matrix(u).dot(F)
F2 = sp.Matrix(F).dot(sp.Matrix(F))
Fq = sp.zeros(stencil.Q, 1)
uq = sp.zeros(stencil.Q, 1)
for i, cq in enumerate(stencil):
Fq[i] = sp.Matrix(cq).dot(sp.Matrix(F))
uq[i] = sp.Matrix(cq).dot(u)
common_plus = 3 * (1 - omega_e / 2)
common_minus = 3 * (1 - omega_momentum / 2)
result = []
if method == Method.MRT and force_model == ForceModel.GUO:
# check against eq. 4.68 from schiller2008thermal
uf = u.dot(F) * sp.eye(len(F))
G = (u * F.transpose() + F * u.transpose() - uf * sp.Rational(2, lb_method.dim)) * sp.Rational(1, 2) * \
(2 - omega_s) + uf * sp.Rational(1, lb_method.dim) * (2 - omega_b)
for direction, w_i in zip(lb_method.stencil, lb_method.weights):
direction = sp.Matrix(direction)
tr = sp.trace(G * (direction * direction.transpose() - sp.Rational(1, 3) * sp.eye(len(F))))
result.append(3 * w_i * (F.dot(direction) + sp.Rational(3, 2) * tr))
elif force_model == ForceModel.GUO:
# check against table 2 in silva2020 (correct for SRT and TRT), matches eq. 20 from guo2002discrete (for SRT)
Sq_plus = sp.zeros(stencil.Q, 1)
Sq_minus = sp.zeros(stencil.Q, 1)
for i, w_i in enumerate(lb_method.weights):
Sq_plus[i] = common_plus * w_i * (3 * uq[i] * Fq[i] - uf)
Sq_minus[i] = common_minus * w_i * Fq[i]
result = Sq_plus + Sq_minus
elif force_model == ForceModel.BUICK:
# check against table 2 in silva2020 (correct for all collision models due to the simplicity of Buick),
# matches eq. 18 from silva2010 (for SRT)
Sq_plus = sp.zeros(stencil.Q, 1)
Sq_minus = sp.zeros(stencil.Q, 1)
for i, w_i in enumerate(lb_method.weights):
Sq_plus[i] = 0
Sq_minus[i] = common_minus * w_i * Fq[i]
result = Sq_plus + Sq_minus
elif force_model == ForceModel.EDM:
# check against table 2 in silva2020
if method == Method.MRT:
# for mrt no literature terms are known at the time of writing this test case.
# However it is most likly correct since SRT and TRT are derived from the moment space representation
pytest.skip()
Sq_plus = sp.zeros(stencil.Q, 1)
Sq_minus = sp.zeros(stencil.Q, 1)
for i, w_i in enumerate(lb_method.weights):
Sq_plus[i] = common_plus * w_i * (3 * uq[i] * Fq[i] - uf) + ((w_i / (8 * rho)) * (3 * Fq[i] ** 2 - F2))
Sq_minus[i] = common_minus * w_i * Fq[i]
result = Sq_plus + Sq_minus
elif force_model == ForceModel.SHANCHEN:
# check against table 2 in silva2020
if method == Method.MRT:
# for mrt no literature terms are known at the time of writing this test case.
# However it is most likly correct since SRT and TRT are derived from the moment space representation
pytest.skip()
Sq_plus = sp.zeros(stencil.Q, 1)
Sq_minus = sp.zeros(stencil.Q, 1)
for i, w_i in enumerate(lb_method.weights):
Sq_plus[i] = common_plus * w_i * (3 * uq[i] * Fq[i] - uf) + common_plus ** 2 * (
(w_i / (2 * rho)) * (3 * Fq[i] ** 2 - F2))
Sq_minus[i] = common_minus * w_i * Fq[i]
result = Sq_plus + Sq_minus
assert list(sp.simplify(force_term - sp.Matrix(result))) == [0] * len(stencil)
@pytest.mark.parametrize("force_model", force_models)
@pytest.mark.parametrize("compressible", [True, False])
def test_modes_central_moment(force_model, compressible):
"""check force terms in mode space"""
stencil = LBStencil(Stencil.D2Q9)
omega_s = sp.Symbol("omega_s")
F = list(sp.symbols(f"F_:{stencil.D}"))
lbm_config = LBMConfig(method=Method.CENTRAL_MOMENT, stencil=stencil, relaxation_rate=omega_s,
compressible=compressible, force_model=force_model, force=tuple(F))
method = create_lb_method(lbm_config=lbm_config)
subs_dict = method.subs_dict_relaxation_rate
force_moments = method.force_model.central_moment_space_forcing(method)
force_moments = force_moments.subs(subs_dict)
# The mass mode should be zero
assert force_moments[0] == 0
# The momentum moments should contain the force
assert list(force_moments[1:stencil.D + 1]) == F
@pytest.mark.parametrize("force_model", force_models)
@pytest.mark.parametrize("compressible", [True, False])
def test_symmetric_forcing_equivalence(force_model, compressible):
stencil = LBStencil(Stencil.D2Q9)
omega_s = sp.Symbol("omega_s")
F = list(sp.symbols(f"F_:{stencil.D}"))
moments = moments_up_to_component_order(2, dim=2)
moments = sorted(moments, key=exponent_tuple_sort_key)
moment_polys = exponents_to_polynomial_representations(moments)
lbm_config = LBMConfig(method=Method.CENTRAL_MOMENT, stencil=stencil, relaxation_rate=omega_s,
nested_moments=moment_polys, compressible=True, force_model=force_model, force=tuple(F))
method = create_lb_method(lbm_config=lbm_config)
if not method.force_model.has_symmetric_central_moment_forcing:
return
subs_dict = method.subs_dict_relaxation_rate
force_moments = method.force_model.central_moment_space_forcing(method)
force_moments = force_moments.subs(subs_dict)
force_before, force_after = method.force_model.symmetric_central_moment_forcing(method, moments)
d = method.relaxation_matrix
eye = sp.eye(stencil.Q)
force_combined = (eye - d) @ force_before + force_after
assert (force_moments - force_combined).expand() == sp.Matrix([0] * stencil.Q)
@pytest.mark.parametrize("stencil", [Stencil.D3Q15, Stencil.D3Q19, Stencil.D3Q27])
@pytest.mark.parametrize("force_model", force_models)
@pytest.mark.parametrize("compressible", [True, False])
@pytest.mark.longrun
def test_modes_central_moment_longrun(stencil, force_model, compressible):
"""check force terms in mode space"""
stencil = LBStencil(stencil)
omega_s = sp.Symbol("omega_s")
F = list(sp.symbols(f"F_:{stencil.D}"))
lbm_config = LBMConfig(method=Method.CENTRAL_MOMENT, stencil=stencil, relaxation_rate=omega_s,
compressible=compressible, force_model=force_model, force=tuple(F))
method = create_lb_method(lbm_config=lbm_config)
subs_dict = method.subs_dict_relaxation_rate
force_moments = method.force_model.moment_space_forcing(method)
force_moments = force_moments.subs(subs_dict)
# The mass mode should be zero
assert force_moments[0] == 0
# The momentum moments should contain the force
assert list(force_moments[1:stencil.D + 1]) == F
def _check_modes(stencil, force_model, compressible):
omega_s = sp.Symbol("omega_s")
omega_b = sp.Symbol("omega_b")
omega_o = sp.Symbol("omega_o")
omega_e = sp.Symbol("omega_e")
F = list(sp.symbols(f"F_:{stencil.D}"))
lbm_config = LBMConfig(method=Method.MRT, stencil=stencil,
relaxation_rates=[omega_s, omega_b, omega_o, omega_e, omega_o, omega_e],
compressible=compressible, force_model=force_model, force=tuple(F))
method = create_lb_method(lbm_config=lbm_config)
subs_dict = method.subs_dict_relaxation_rate
force_moments = method.force_model.moment_space_forcing(method)
force_moments = force_moments.subs(subs_dict)
# The mass mode should be zero
assert force_moments[0] == 0
# The momentum moments should contain the force
assert list(force_moments[1:stencil.D + 1]) == F
if force_model == ForceModel.GUO:
num_stresses = (stencil.D * stencil.D - stencil.D) // 2 + stencil.D
lambda_s, lambda_b = -omega_s, -omega_b
# The stress moments should match eq. 47 from https://doi.org/10.1023/A:1010414013942
u = method.first_order_equilibrium_moment_symbols
def traceless(m):
tr = sp.simplify(sum([m[i, i] for i in range(stencil.D)]))
return m - tr / m.shape[0] * sp.eye(m.shape[0])
C = sp.Rational(1, 2) * (2 + lambda_s) * (traceless(sp.Matrix(u) * sp.Matrix(F).transpose()) +
traceless(sp.Matrix(F) * sp.Matrix(u).transpose())) + \
sp.Rational(1, method.dim) * (2 + lambda_b) * sp.Matrix(u).dot(F) * sp.eye(method.dim)
subs = {sp.Symbol(chr(ord("x") + i)) * sp.Symbol(chr(ord("x") + j)): C[i, j]
for i in range(stencil.D) for j in range(stencil.D)}
for force_moment, moment in zip(force_moments[stencil.D + 1:stencil.D + 1 + num_stresses],
method.moments[stencil.D + 1:stencil.D + 1 + num_stresses]):
ref = moment.subs(subs)
diff = sp.simplify(ref - force_moment)
if is_bulk_moment(moment, stencil.D):
assert diff == 0 or isinstance(diff, sp.Rational) # difference should be zero or a constant
else:
assert diff == 0 # difference should be zero
ff = method.moment_matrix.inv() * sp.Matrix(method.force_model.moment_space_forcing(method).subs(subs_dict))
# Check eq. 4.53a from schiller2008thermal
assert sp.simplify(sum(ff)) == 0
# Check eq. 4.53b from schiller2008thermal
assert [sp.simplify(sum(ff[i] * stencil[i][j] for i in range(len(stencil)))) for j in range(stencil.D)] == F
# Check eq. 4.61a from schiller2008thermal
ref = (2 + lambda_s) / 2 * (traceless(sp.Matrix(u) * sp.Matrix(F).transpose()) +
traceless(sp.Matrix(F) * sp.Matrix(u).transpose()))
s = sp.zeros(stencil.D)
for i in range(0, len(stencil)):
s += ff[i] * traceless(sp.Matrix(stencil[i]) * sp.Matrix(stencil[i]).transpose())
assert sp.simplify(s - ref) == sp.zeros(stencil.D)
# Check eq. 4.61b from schiller2008thermal
assert sp.simplify(sum(ff[i] * stencil[i][a] ** 2 for i in range(len(stencil)) for a in range(stencil.D))
- (2 + lambda_b) * sp.Matrix(u).dot(F)) == 0
# All other moments should be zero
assert list(force_moments[stencil.D + 1 + num_stresses:]) == [0] * \
(len(stencil) - (stencil.D + 1 + num_stresses))
elif force_model == ForceModel.SIMPLE:
# All other moments should be zero
assert list(force_moments[stencil.D + 1:]) == [0] * (len(stencil) - (stencil.D + 1))
import numpy as np
from lbmpy.boundaries import UBB, NoSlip
from lbmpy.enums import ForceModel
from lbmpy.scenarios import create_channel
from pystencils import make_slice
......@@ -26,10 +27,11 @@ def test_force_on_boundary():
for parallel in (False, True) if wLB else (False,):
for boundary_obj in boundaries:
print("Testing parallel %d, boundary %s" % (parallel, boundary_obj.name))
step = create_channel(domain_size, force=1e-5, relaxation_rate=1.5, parallel=parallel, force_model='buick')
print(f"Testing parallel {parallel}, boundary {boundary_obj.name}")
step = create_channel(domain_size, force=1e-5, relaxation_rate=1.5, parallel=parallel,
force_model=ForceModel.BUICK)
force = calculate_force(step, boundary_obj)
print(" -> force = ", force)
print(f" -> force = {force}")
results.append(force)
for res in results[1:]:
......
from dataclasses import replace
import pystencils as ps
import lbmpy as lp
from pystencils.slicing import slice_from_direction
from lbmpy.boundaries.boundaryhandling import LatticeBoltzmannBoundaryHandling
from lbmpy.boundaries.boundaryconditions import FreeSlip, NoSlip, ExtrapolationOutflow, UBB
from lbmpy.creationfunctions import create_lb_method, create_lb_update_rule
from lbmpy.macroscopic_value_kernels import pdf_initialization_assignments
import numpy as np
def velocity_info_callback(boundary_data, **_):
boundary_data['vel_1'] = 0
boundary_data['vel_2'] = 0
u_max = 0.1
x, y = boundary_data.link_positions(0), boundary_data.link_positions(1)
dist = (15 - y) / 15
boundary_data['vel_0'] = u_max * (1 - dist)
def test_free_slip():
stencil = lp.LBStencil(lp.Stencil.D3Q27)
domain_size = (30, 15, 30)
dim = len(domain_size)
dh = ps.create_data_handling(domain_size=domain_size)
src = dh.add_array('src', values_per_cell=stencil.Q)
dh.fill('src', 0.0, ghost_layers=True)
dst = dh.add_array('dst', values_per_cell=stencil.Q)
dh.fill('dst', 0.0, ghost_layers=True)
velField = dh.add_array('velField', values_per_cell=stencil.D)
dh.fill('velField', 0.0, ghost_layers=True)
lbm_config = lp.LBMConfig(stencil=stencil, method=lp.Method.SRT, relaxation_rate=1.8,
output={'velocity': velField}, kernel_type='stream_pull_collide')
method = create_lb_method(lbm_config=lbm_config)
init = pdf_initialization_assignments(method, 1.0, (0, 0, 0), src.center_vector)
config = ps.CreateKernelConfig(target=dh.default_target, cpu_openmp=False)
ast_init = ps.create_kernel(init, config=config)
kernel_init = ast_init.compile()
dh.run_kernel(kernel_init)
lbm_opt = lp.LBMOptimisation(symbolic_field=src, symbolic_temporary_field=dst)
lbm_config = replace(lbm_config, lb_method=method)
update = create_lb_update_rule(lbm_config=lbm_config, lbm_optimisation=lbm_opt)
ast_kernel = ps.create_kernel(update, config=config)
kernel = ast_kernel.compile()
bh = LatticeBoltzmannBoundaryHandling(method, dh, 'src', name="bh")
inflow = UBB(velocity_info_callback, dim=dim)
outflow = ExtrapolationOutflow(stencil[4], method)
wall = NoSlip("wall")
freeslip = FreeSlip(stencil, (0, -1, 0))
bh.set_boundary(inflow, slice_from_direction('W', dim))
bh.set_boundary(outflow, slice_from_direction('E', dim))
bh.set_boundary(wall, slice_from_direction('S', dim))
bh.set_boundary(wall, slice_from_direction('T', dim))
bh.set_boundary(wall, slice_from_direction('B', dim))
bh.set_boundary(freeslip, slice_from_direction('N', dim))
for i in range(2000):
bh()
dh.run_kernel(kernel)
dh.swap("src", "dst")
vel_profile = dh.gather_array('velField')[-2, :, domain_size[2] // 2, 0]
np.testing.assert_almost_equal(np.gradient(vel_profile)[-1], 0, decimal=3)
......@@ -4,8 +4,10 @@ import numpy as np
import pytest
from lbmpy.boundaries import NoSlip
from lbmpy.enums import Method
from lbmpy.geometry import add_black_and_white_image, add_pipe_walls
from lbmpy.lbstep import LatticeBoltzmannStep
import lbmpy.plot as plt
from pystencils.slicing import make_slice
......@@ -23,23 +25,20 @@ def test_pipe():
plot = False
for domain_size in [(30, 10, 10), (30, 10)]:
for diameter in [5, 10, diameter_callback]:
sc = LatticeBoltzmannStep(domain_size=domain_size, method='srt', relaxation_rate=1.9,
optimization={})
sc = LatticeBoltzmannStep(domain_size=domain_size, method=Method.SRT, relaxation_rate=1.9)
add_pipe_walls(sc.boundary_handling, diameter)
if plot:
import lbmpy.plot as plt
from pystencils.slicing import make_slice
if len(domain_size) == 2:
plt.boundary_handling(sc.boundary_handling)
plt.title("2D, diameter=%s" % (str(diameter,)))
plt.title(f"2D, diameter={str(diameter,)}")
plt.show()
elif len(domain_size) == 3:
plt.subplot(1, 2, 1)
plt.boundary_handling(sc.boundary_handling, make_slice[0.5, :, :])
plt.title("3D, diameter=%s" % (str(diameter,)))
plt.title(f"3D, diameter={str(diameter,)}")
plt.subplot(1, 2, 2)
plt.boundary_handling(sc.boundary_handling, make_slice[:, 0.5, :])
plt.title("3D, diameter=%s" % (str(diameter, )))
plt.title(f"3D, diameter={str(diameter, )}")
plt.show()
......@@ -51,14 +50,12 @@ def get_test_image_path():
def test_image():
pytest.importorskip('scipy.ndimage')
sc = LatticeBoltzmannStep(domain_size=(50, 40), method='srt', relaxation_rate=1.9,
optimization={})
sc = LatticeBoltzmannStep(domain_size=(50, 40), method=Method.SRT, relaxation_rate=1.9)
add_black_and_white_image(sc.boundary_handling, get_test_image_path(), keep_aspect_ratio=True)
def test_slice_mask_combination():
sc = LatticeBoltzmannStep(domain_size=(30, 30), method='srt', relaxation_rate=1.9,
optimization={})
sc = LatticeBoltzmannStep(domain_size=(30, 30), method=Method.SRT, relaxation_rate=1.9)
def callback(*coordinates):
x = coordinates[0]
......
from lbmpy.creationfunctions import create_lb_ast, LBMConfig
from lbmpy.enums import Method, Stencil
from lbmpy.stencils import LBStencil
from lbmpy._compat import IS_PYSTENCILS_2
import pytest
from pystencils import Target, CreateKernelConfig
@pytest.mark.xfail(IS_PYSTENCILS_2, reason="GPU block size limiting by registers is not available yet")
def test_gpu_block_size_limiting():
pytest.importorskip("cupy")
too_large = 2048*2048
lbm_config = LBMConfig(method=Method.CUMULANT, stencil=LBStencil(Stencil.D3Q19),
relaxation_rate=1.8, compressible=True)
config = CreateKernelConfig(target=Target.GPU,
gpu_indexing_params={'block_size': (too_large, too_large, too_large)})
ast = create_lb_ast(lbm_config=lbm_config, config=config)
limited_block_size = ast.indexing.call_parameters((1024, 1024, 1024))
kernel = ast.compile()
assert all(b < too_large for b in limited_block_size['block'])
bs = [too_large, too_large, too_large]
bs = ast.indexing.limit_block_size_by_register_restriction(bs, kernel.num_regs)
assert all(b < too_large for b in bs)
import pytest
from lbmpy.creationfunctions import create_lb_method
from lbmpy.creationfunctions import create_lb_method, LBMConfig
from lbmpy.enums import Stencil
from lbmpy.methods.creationfunctions import compare_moment_based_lb_methods
from lbmpy.moments import (
moment_equality_table, moment_equality_table_by_stencil, moments_up_to_component_order)
from lbmpy.stencils import get_stencil
from lbmpy.stencils import LBStencil
def test_moment_comparison_table():
pytest.importorskip('ipy_table')
new = create_lb_method(stencil='D3Q19', maxwellian_moments=True)
old = create_lb_method(stencil='D3Q19', maxwellian_moments=False)
lbm_config_new = LBMConfig(stencil=LBStencil(Stencil.D3Q19), continuous_equilibrium=True)
lbm_config_old = LBMConfig(stencil=LBStencil(Stencil.D3Q19), continuous_equilibrium=False)
new = create_lb_method(lbm_config=lbm_config_new)
old = create_lb_method(lbm_config=lbm_config_old)
assert old.zeroth_order_equilibrium_moment_symbol == new.zeroth_order_equilibrium_moment_symbol
......@@ -22,18 +27,18 @@ def test_moment_comparison_table():
res_all = compare_moment_based_lb_methods(old, new, show_deviations_only=False)
assert len(res_all.array) == 20
d3q27 = create_lb_method(stencil='D3Q27')
d3q27 = create_lb_method(LBMConfig(stencil=LBStencil(Stencil.D3Q27)))
compare_moment_based_lb_methods(d3q27, new, show_deviations_only=False)
compare_moment_based_lb_methods(new, d3q27, show_deviations_only=False)
def test_moment_equality_table():
pytest.importorskip('ipy_table')
d3q19 = get_stencil('D3Q19')
d3q19 = LBStencil(Stencil.D3Q19)
table1 = moment_equality_table(d3q19, max_order=3)
assert len(table1.array) == 5
table2 = moment_equality_table_by_stencil({'D3Q19': d3q19, 'D3Q27': get_stencil("D3Q27")},
table2 = moment_equality_table_by_stencil({'D3Q19': d3q19, 'D3Q27': LBStencil(Stencil.D3Q27)},
moments_up_to_component_order(2, dim=3))
assert len(table2.array) == 11
assert len(table2.array[0]) == 2 + 2
import pytest
import sympy as sp
from itertools import chain
from pystencils.sympyextensions import remove_higher_order_terms
from lbmpy.stencils import Stencil, LBStencil
from lbmpy.equilibrium import ContinuousHydrodynamicMaxwellian, DiscreteHydrodynamicMaxwellian
from lbmpy.moments import moments_up_to_component_order, moments_up_to_order
from lbmpy.maxwellian_equilibrium import get_equilibrium_values_of_maxwell_boltzmann_function
from lbmpy.methods.default_moment_sets import cascaded_moment_sets_literature, mrt_orthogonal_modes_literature
def test_compressible_raw_moment_values():
stencil = LBStencil("D3Q27")
equilibrium = ContinuousHydrodynamicMaxwellian(dim=stencil.D, compressible=True, deviation_only=False)
raw_moments = list(chain.from_iterable(mrt_orthogonal_modes_literature(stencil, False)))
values_a = equilibrium.moments(raw_moments)
values_b = get_equilibrium_values_of_maxwell_boltzmann_function(raw_moments, stencil.D, space="moment")
for m, a, b in zip(raw_moments, values_a, values_b):
assert (a - b).expand() == sp.Integer(0), f"Mismatch at moment {m}."
def test_compressible_central_moment_values():
stencil = LBStencil("D3Q27")
equilibrium = ContinuousHydrodynamicMaxwellian(dim=stencil.D, compressible=True, deviation_only=False)
central_moments = list(chain.from_iterable(cascaded_moment_sets_literature(stencil)))
values_a = equilibrium.central_moments(central_moments)
values_b = get_equilibrium_values_of_maxwell_boltzmann_function(central_moments, stencil.D, space="central moment")
for m, a, b in zip(central_moments, values_a, values_b):
assert (a - b).expand() == sp.Integer(0), f"Mismatch at moment {m}."
def test_compressible_cumulant_values():
stencil = LBStencil("D3Q27")
equilibrium = ContinuousHydrodynamicMaxwellian(dim=stencil.D, compressible=True, deviation_only=False)
cumulants = list(chain.from_iterable(cascaded_moment_sets_literature(stencil)))
values_a = equilibrium.cumulants(cumulants, rescale=False)
values_b = get_equilibrium_values_of_maxwell_boltzmann_function(cumulants, stencil.D, space="cumulant")
for m, a, b in zip(cumulants, values_a, values_b):
assert (a - b).expand() == sp.Integer(0), f"Mismatch at cumulant {m}."
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q15, Stencil.D3Q19, Stencil.D3Q27])
@pytest.mark.parametrize('compressible', [False, True])
@pytest.mark.parametrize('deviation_only', [False, True])
def test_continuous_discrete_moment_equivalence(stencil, compressible, deviation_only):
stencil = LBStencil(stencil)
c_s_sq = sp.Rational(1, 3)
moments = tuple(moments_up_to_order(3, dim=stencil.D, include_permutations=False))
cd = ContinuousHydrodynamicMaxwellian(dim=stencil.D, compressible=compressible, deviation_only=deviation_only,
order=2, c_s_sq=c_s_sq)
cm = sp.Matrix(cd.moments(moments))
dd = DiscreteHydrodynamicMaxwellian(stencil, compressible=compressible, deviation_only=deviation_only,
order=2, c_s_sq=c_s_sq)
dm = sp.Matrix(dd.moments(moments))
rho = cd.density
delta_rho = cd.density_deviation
rho_0 = cd.background_density
subs = { delta_rho : rho - rho_0 }
assert (cm - dm).subs(subs).expand() == sp.Matrix((0,) * len(moments))
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q15, Stencil.D3Q19, Stencil.D3Q27])
@pytest.mark.parametrize('compressible', [False, True])
@pytest.mark.parametrize('deviation_only', [False, True])
def test_continuous_discrete_central_moment_equivalence(stencil, compressible, deviation_only):
stencil = LBStencil(stencil)
c_s_sq = sp.Rational(1, 3)
moments = tuple(moments_up_to_order(3, dim=stencil.D, include_permutations=False))
cd = ContinuousHydrodynamicMaxwellian(dim=stencil.D, compressible=compressible, deviation_only=deviation_only,
order=2, c_s_sq=c_s_sq)
cm = sp.Matrix(cd.central_moments(moments))
dd = DiscreteHydrodynamicMaxwellian(stencil, compressible=compressible, deviation_only=deviation_only,
order=2, c_s_sq=c_s_sq)
dm = sp.Matrix(dd.central_moments(moments))
dm = sp.Matrix([remove_higher_order_terms(t, dd.velocity, order=2) for t in dm])
rho = cd.density
delta_rho = cd.density_deviation
rho_0 = cd.background_density
subs = { delta_rho : rho - rho_0 }
assert (cm - dm).subs(subs).expand() == sp.Matrix((0,) * len(moments))
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q15, Stencil.D3Q19, Stencil.D3Q27])
def test_continuous_discrete_cumulant_equivalence(stencil):
stencil = LBStencil(stencil)
c_s_sq = sp.Rational(1, 3)
compressible = True
deviation_only = False
moments = tuple(moments_up_to_order(3, dim=stencil.D, include_permutations=False))
cd = ContinuousHydrodynamicMaxwellian(dim=stencil.D, compressible=compressible, deviation_only=deviation_only,
order=2, c_s_sq=c_s_sq)
cm = sp.Matrix(cd.cumulants(moments))
dd = DiscreteHydrodynamicMaxwellian(stencil, compressible=compressible, deviation_only=deviation_only,
order=2, c_s_sq=c_s_sq)
dm = sp.Matrix(dd.cumulants(moments))
dm = sp.Matrix([remove_higher_order_terms(t, dd.velocity, order=2) for t in dm])
rho = cd.density
delta_rho = cd.density_deviation
rho_0 = cd.background_density
subs = { delta_rho : rho - rho_0 }
assert (cm - dm).subs(subs).expand() == sp.Matrix((0,) * len(moments))
import pytest
import numpy as np
from pystencils.slicing import slice_from_direction
from lbmpy import LBMConfig, LBStencil, Stencil, Method
from lbmpy.boundaries.boundaryconditions import NoSlip, NoSlipLinearBouzidi, QuadraticBounceBack, UBB
from lbmpy.lbstep import LatticeBoltzmannStep
def check_velocity(noslip_velocity, interpolated_velocity, wall_distance):
# First we check if the flow is fully developed
np.testing.assert_almost_equal(np.gradient(np.gradient(noslip_velocity)), 0, decimal=8)
np.testing.assert_almost_equal(np.gradient(np.gradient(interpolated_velocity)), 0, decimal=8)
# If the wall is closer to the first fluid cell we expect a lower velocity at the first fluid cell
if wall_distance < 0.5:
assert noslip_velocity[0] > interpolated_velocity[0]
# If the wall is further away from the first fluid cell we expect a higher velocity at the first fluid cell
if wall_distance > 0.5:
assert noslip_velocity[0] < interpolated_velocity[0]
# If the wall cuts the cell halfway the interpolation BC should approximately fall back to a noslip boundary
if wall_distance == 0.5:
np.testing.assert_almost_equal(noslip_velocity[0], interpolated_velocity[0], decimal=2)
def couette_flow(stencil, method_enum, zero_centered, wall_distance, compressible):
dim = stencil.D
if dim == 2:
domain_size = (50, 25)
wall_velocity = (0.01, 0)
periodicity = (True, False)
else:
domain_size = (50, 25, 25)
wall_velocity = (0.01, 0, 0)
periodicity = (True, False, True)
timesteps = 10000
omega = 1.1
lbm_config = LBMConfig(stencil=stencil,
method=method_enum,
relaxation_rate=omega,
zero_centered=zero_centered,
compressible=compressible,
weighted=True)
lb_step_noslip = LatticeBoltzmannStep(domain_size=domain_size, periodicity=periodicity,
lbm_config=lbm_config, compute_velocity_in_every_step=True)
lb_step_bouzidi = LatticeBoltzmannStep(domain_size=domain_size, periodicity=periodicity,
lbm_config=lbm_config, compute_velocity_in_every_step=True)
lb_step_quadratic_bb = LatticeBoltzmannStep(domain_size=domain_size, periodicity=periodicity,
lbm_config=lbm_config, compute_velocity_in_every_step=True)
def init_wall_distance(boundary_data, **_):
for cell in boundary_data.index_array:
cell['q'] = wall_distance
moving_wall = UBB(wall_velocity)
noslip = NoSlip("wall")
bouzidi = NoSlipLinearBouzidi("wall", init_wall_distance=init_wall_distance)
quadratic_bb = QuadraticBounceBack(omega, "wall", init_wall_distance=init_wall_distance)
lb_step_noslip.boundary_handling.set_boundary(noslip, slice_from_direction('S', dim))
lb_step_noslip.boundary_handling.set_boundary(moving_wall, slice_from_direction('N', dim))
lb_step_bouzidi.boundary_handling.set_boundary(bouzidi, slice_from_direction('S', dim))
lb_step_bouzidi.boundary_handling.set_boundary(moving_wall, slice_from_direction('N', dim))
lb_step_quadratic_bb.boundary_handling.set_boundary(quadratic_bb, slice_from_direction('S', dim))
lb_step_quadratic_bb.boundary_handling.set_boundary(moving_wall, slice_from_direction('N', dim))
lb_step_noslip.run(timesteps)
lb_step_bouzidi.run(timesteps)
lb_step_quadratic_bb.run(timesteps)
if dim == 2:
noslip_velocity = lb_step_noslip.velocity[domain_size[0] // 2, :, 0]
bouzidi_velocity = lb_step_bouzidi.velocity[domain_size[0] // 2, :, 0]
quadratic_bb_velocity = lb_step_quadratic_bb.velocity[domain_size[0] // 2, :, 0]
else:
noslip_velocity = lb_step_noslip.velocity[domain_size[0] // 2, :, domain_size[2] // 2, 0]
bouzidi_velocity = lb_step_bouzidi.velocity[domain_size[0] // 2, :, domain_size[2] // 2, 0]
quadratic_bb_velocity = lb_step_quadratic_bb.velocity[domain_size[0] // 2, :, domain_size[2] // 2, 0]
check_velocity(noslip_velocity, bouzidi_velocity, wall_distance)
check_velocity(noslip_velocity, quadratic_bb_velocity, wall_distance)
@pytest.mark.parametrize("zero_centered", [False, True])
@pytest.mark.parametrize("wall_distance", [0.1, 0.5, 0.9])
@pytest.mark.parametrize("compressible", [True, False])
def test_couette_flow_short(zero_centered, wall_distance, compressible):
stencil = LBStencil(Stencil.D2Q9)
couette_flow(stencil, Method.SRT, zero_centered, wall_distance, compressible)
@pytest.mark.parametrize("method_enum", [Method.MRT, Method.CENTRAL_MOMENT, Method.CUMULANT])
@pytest.mark.parametrize("zero_centered", [False, True])
@pytest.mark.parametrize("wall_distance", [0.1, 0.5, 0.9])
@pytest.mark.parametrize("compressible", [True, False])
@pytest.mark.longrun
def test_couette_flow_long(method_enum, zero_centered, wall_distance, compressible):
if method_enum is Method.CUMULANT and compressible is False:
pytest.skip("incompressible cumulant is not supported")
stencil = LBStencil(Stencil.D2Q9)
couette_flow(stencil, method_enum, zero_centered, wall_distance, compressible)
@pytest.mark.parametrize("method_enum", [Method.SRT, Method.MRT, Method.CENTRAL_MOMENT, Method.CUMULANT])
@pytest.mark.parametrize("wall_distance", [0.1, 0.5, 0.9])
@pytest.mark.parametrize("stencil", [Stencil.D3Q19, Stencil.D3Q27])
@pytest.mark.longrun
def test_couette_flow_d3d(method_enum, wall_distance, stencil):
stencil = LBStencil(stencil)
couette_flow(stencil, method_enum, True, wall_distance, True)
"""
Test the lbmpy-specific JSON encoder and serializer as used in the Database class.
"""
import tempfile
import pystencils as ps
from lbmpy import Stencil, Method, ForceModel, SubgridScaleModel
from lbmpy.advanced_streaming import Timestep
from lbmpy.creationfunctions import LBMConfig, LBMOptimisation, LBStencil
from lbmpy.fieldaccess import StreamPullTwoFieldsAccessor
from pystencils.runhelper import Database
from lbmpy.db import LbmpyJsonSerializer
def test_json_serializer():
stencil = LBStencil(Stencil.D3Q27)
q = stencil.Q
pdfs, pdfs_tmp = ps.fields(f"pdfs({q}), pdfs_tmp({q}): double[3D]", layout='fzyx')
density = ps.fields(f"rho: double[3D]", layout='fzyx')
from lbmpy.non_newtonian_models import CassonsParameters
cassons_params = CassonsParameters(0.2)
# create dummy lbmpy config
lbm_config = LBMConfig(stencil=LBStencil(Stencil.D3Q27), method=Method.CUMULANT, force_model=ForceModel.GUO,
compressible=True, relaxation_rate=1.999, subgrid_scale_model=SubgridScaleModel.SMAGORINSKY,
galilean_correction=True, cassons=cassons_params, density_input=density,
kernel_type=StreamPullTwoFieldsAccessor, timestep=Timestep.BOTH)
lbm_optimization = LBMOptimisation(cse_pdfs=False, cse_global=False, builtin_periodicity=(True, False, False),
symbolic_field=pdfs, symbolic_temporary_field=pdfs_tmp)
# create dummy database
temp_dir = tempfile.TemporaryDirectory()
db = Database(file=temp_dir.name, serializer_info=('lbmpy_serializer', LbmpyJsonSerializer))
db.save(params={'lbm_config': lbm_config, 'lbm_optimization': lbm_optimization}, result={'test': 'dummy'})
import numpy as np
import pytest
from types import MappingProxyType
from pystencils import Target, CreateKernelConfig
from lbmpy.scenarios import create_fully_periodic_flow, create_lid_driven_cavity
from lbmpy import SubgridScaleModel
try:
import pycuda.driver
import cupy
gpu_available = True
except ImportError:
gpu_available = False
......@@ -30,58 +33,24 @@ def test_data_handling_3d():
for gpu in [False, True] if gpu_available else [False]:
if parallel and gpu and not hasattr(wLB, 'cuda'):
continue
print("Testing parallel: %s\tgpu: %s" % (parallel, gpu))
opt_params = {'target': 'gpu' if gpu else 'cpu',
'gpu_indexing_params': {'block_size': (8, 4, 2)}}
print(f"Testing parallel: {parallel}\tgpu: {gpu}")
config = CreateKernelConfig(target=Target.GPU if gpu else Target.CPU,
gpu_indexing_params=MappingProxyType({'block_size': (8, 4, 2)}))
if parallel:
from pystencils.datahandling import ParallelDataHandling
blocks = wLB.createUniformBlockGrid(blocks=(2, 3, 4), cellsPerBlock=(5, 5, 5),
oneBlockPerProcess=False)
dh = ParallelDataHandling(blocks, dim=3)
rho = ldc_setup(data_handling=dh, optimization=opt_params)
rho = ldc_setup(data_handling=dh, config=config)
results.append(rho)
else:
rho = ldc_setup(domain_size=(10, 15, 20), parallel=False, optimization=opt_params)
rho = ldc_setup(domain_size=(10, 15, 20), parallel=False, config=config)
results.append(rho)
for i, arr in enumerate(results[1:]):
print("Testing equivalence version 0 with version %d" % (i + 1,))
np.testing.assert_almost_equal(results[0], arr)
def test_data_handling_2d_opencl():
pytest.importorskip('pyopencl')
import pystencils.opencl.opencljit
pystencils.opencl.opencljit.init_globally()
print("--- LDC 2D test ---")
results = []
# Since waLBerla has no OpenCL Backend yet, it is not possible to use the
# parallel Datahandling with OpenCL at the moment
# TODO: Activate parallel Datahandling if Backend is available
parallel = False
for gpu in [True, False] if gpu_available else [False]:
if parallel and gpu and not hasattr(wLB, 'cuda'):
continue
print("Testing parallel: %s\tgpu: %s" % (parallel, gpu))
opt_params = {'target': 'opencl' if gpu else 'cpu',
'gpu_indexing_params': {'block_size': (8, 4, 2)}}
if parallel:
from pystencils.datahandling import ParallelDataHandling
blocks = wLB.createUniformBlockGrid(blocks=(2, 3, 1), cellsPerBlock=(5, 5, 1),
oneBlockPerProcess=False)
dh = ParallelDataHandling(blocks, dim=2)
rho = ldc_setup(data_handling=dh, optimization=opt_params)
results.append(rho)
else:
rho = ldc_setup(domain_size=(10, 15), parallel=False, optimization=opt_params)
results.append(rho)
for i, arr in enumerate(results[1:]):
print("Testing equivalence version 0 with version %d" % (i + 1,))
np.testing.assert_almost_equal(results[0], arr)
def test_data_handling_2d():
print("--- LDC 2D test ---")
results = []
......@@ -90,26 +59,32 @@ def test_data_handling_2d():
if parallel and gpu and not hasattr(wLB, 'cuda'):
continue
print("Testing parallel: %s\tgpu: %s" % (parallel, gpu))
opt_params = {'target': 'gpu' if gpu else 'cpu',
'gpu_indexing_params': {'block_size': (8, 4, 2)}}
print(f"Testing parallel: {parallel}\tgpu: {gpu}")
config = CreateKernelConfig(target=Target.GPU if gpu else Target.CPU,
gpu_indexing_params=MappingProxyType({'block_size': (8, 4, 2)}))
if parallel:
from pystencils.datahandling import ParallelDataHandling
blocks = wLB.createUniformBlockGrid(blocks=(2, 3, 1), cellsPerBlock=(5, 5, 1),
oneBlockPerProcess=False)
dh = ParallelDataHandling(blocks, dim=2)
rho = ldc_setup(data_handling=dh, optimization=opt_params)
rho = ldc_setup(data_handling=dh, config=config)
results.append(rho)
else:
rho = ldc_setup(domain_size=(10, 15), parallel=False, optimization=opt_params)
rho = ldc_setup(domain_size=(10, 15), parallel=False, config=config)
results.append(rho)
for i, arr in enumerate(results[1:]):
print("Testing equivalence version 0 with version %d" % (i + 1,))
print(f"Testing equivalence version 0 with version {i + 1}")
np.testing.assert_almost_equal(results[0], arr)
def test_smagorinsky_setup():
step = create_lid_driven_cavity((30, 30), smagorinsky=0.16, relaxation_rate=1.99)
step = create_lid_driven_cavity((30, 30), subgrid_scale_model=(SubgridScaleModel.SMAGORINSKY, 0.16),
relaxation_rate=1.99)
step.run(10)
def test_qr_setup():
step = create_lid_driven_cavity((30, 30), subgrid_scale_model=(SubgridScaleModel.QR, 0.33),
relaxation_rate=1.99)
step.run(10)
......@@ -123,7 +98,7 @@ def test_advanced_initialization():
init_vel[:, height // 3: height // 3 * 2, 0] = -velocity_magnitude
# small random y velocity component
init_vel[:, :, 1] = 0.1 * velocity_magnitude * np.random.rand(width, height)
shear_flow_scenario = create_fully_periodic_flow(initial_velocity=init_vel, relaxation_rate=1.95)
shear_flow_scenario = create_fully_periodic_flow(initial_velocity=init_vel, relaxation_rate=1.99)
with pytest.raises(ValueError) as e:
shear_flow_scenario.run_iterative_initialization(max_steps=20000, check_residuum_after=500)
assert 'did not converge' in str(e.value)
......
from functools import partial
import pystencils as ps
from pystencils.astnodes import LoopOverCoordinate
from lbmpy._compat import get_loop_counter_symbol
from pystencils.slicing import get_periodic_boundary_functor
from lbmpy.creationfunctions import create_lb_update_rule
from lbmpy.stencils import get_stencil
from lbmpy.creationfunctions import create_lb_update_rule, LBMConfig, LBMOptimisation
from lbmpy.enums import Stencil, ForceModel
from lbmpy.stencils import LBStencil
from lbmpy.updatekernels import create_stream_pull_with_output_kernel
from lbmpy.macroscopic_value_kernels import macroscopic_values_setter
from lbmpy.relaxationrates import lattice_viscosity_from_relaxation_rate
......@@ -57,42 +58,37 @@ def test_lees_edwards():
shear_dir = 0 # direction of shear flow
shear_dir_normal = 1 # direction normal to shear plane, for interpolation
stencil = get_stencil("D2Q9")
stencil = LBStencil(Stencil.D2Q9)
dim = len(stencil[0])
dh = ps.create_data_handling(domain_size, periodicity=True, default_target='cpu')
dh = ps.create_data_handling(domain_size, periodicity=True, default_target=ps.Target.CPU)
src = dh.add_array('src', values_per_cell=len(stencil))
src = dh.add_array('src', values_per_cell=stencil.Q)
dh.fill('src', 1.0, ghost_layers=True)
dst = dh.add_array_like('dst', 'src')
dh.fill('dst', 0.0, ghost_layers=True)
force = dh.add_array('force', values_per_cell=dh.dim)
force = dh.add_array('force', values_per_cell=stencil.D)
dh.fill('force', 0.0, ghost_layers=True)
rho = dh.add_array('rho', values_per_cell=1)
dh.fill('rho', 1.0, ghost_layers=True)
u = dh.add_array('u', values_per_cell=dh.dim)
u = dh.add_array('u', values_per_cell=stencil.D)
dh.fill('u', 0.0, ghost_layers=True)
counters = [LoopOverCoordinate.get_loop_counter_symbol(i) for i in range(dim)]
counters = [get_loop_counter_symbol(i) for i in range(stencil.D)]
points_up = sp.Symbol('points_up')
points_down = sp.Symbol('points_down')
u_p = sp.Piecewise((1, sp.And(ps.data_types.type_all_numbers(counters[1] <= 1, 'int'), points_down)),
(-1, sp.And(ps.data_types.type_all_numbers(counters[1] >= src.shape[1] - 2, 'int'),
points_up)), (0, True)) * shear_velocity
u_p = sp.Piecewise((1, sp.And(counters[1] <= 1, points_down)),
(-1, sp.And(counters[1] >= src.shape[1] - 2, points_up)), (0, True)) * shear_velocity
collision = create_lb_update_rule(stencil=stencil,
relaxation_rate=omega,
compressible=True,
velocity_input=u.center_vector + sp.Matrix([u_p, 0]),
density_input=rho,
force_model='luo',
force=force.center_vector,
kernel_type='collide_only',
optimization={'symbolic_field': src})
lbm_config = LBMConfig(stencil=stencil, relaxation_rate=omega, compressible=True,
velocity_input=u.center_vector + sp.Matrix([u_p, 0]), density_input=rho,
force_model=ForceModel.LUO, force=force.center_vector,
kernel_type='collide_only')
lbm_opt = LBMOptimisation(symbolic_field=src)
collision = create_lb_update_rule(lbm_config=lbm_config, lbm_optimisation=lbm_opt)
to_insert = [s.lhs for s in collision.subexpressions
if collision.method.first_order_equilibrium_moment_symbols[shear_dir]
......@@ -115,8 +111,9 @@ def test_lees_edwards():
stream = create_stream_pull_with_output_kernel(collision.method, src, dst,
{'density': rho, 'velocity': u})
stream_kernel = ps.create_kernel(stream, target=dh.default_target).compile()
collision_kernel = ps.create_kernel(collision, target=dh.default_target).compile()
config = ps.CreateKernelConfig(target=dh.default_target)
stream_kernel = ps.create_kernel(stream, config=config).compile()
collision_kernel = ps.create_kernel(collision, config=config).compile()
init = macroscopic_values_setter(collision.method, velocity=(0, 0),
pdfs=src.center_vector, density=rho.center)
......
import pytest
import numpy as np
import sympy as sp
from lbmpy.creationfunctions import create_lb_method
from lbmpy.creationfunctions import create_lb_method, LBMConfig
from lbmpy.enums import Stencil, ForceModel, Method
from lbmpy.macroscopic_value_kernels import (
compile_macroscopic_values_getter, compile_macroscopic_values_setter)
compile_macroscopic_values_getter, compile_macroscopic_values_setter, strain_rate_tensor_getter)
from lbmpy.advanced_streaming.utility import streaming_patterns, Timestep
from lbmpy.stencils import LBStencil
import pytest
@pytest.mark.parametrize('stencil', ['D2Q9', 'D3Q19'])
@pytest.mark.parametrize('force_model', ['guo', 'luo', None])
@pytest.mark.parametrize('compressible', ['compressible', False])
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q19])
@pytest.mark.parametrize('force_model', [ForceModel.GUO, ForceModel.LUO, None])
@pytest.mark.parametrize('compressible', [True, False])
@pytest.mark.parametrize('zero_centered', [True, False])
@pytest.mark.parametrize('streaming_pattern', streaming_patterns)
@pytest.mark.parametrize('prev_timestep', [Timestep.EVEN, Timestep.ODD])
def test_set_get_density_velocity_with_fields(stencil, force_model, compressible, streaming_pattern, prev_timestep):
def test_set_get_density_velocity_with_fields(stencil, force_model, compressible, zero_centered, streaming_pattern, prev_timestep):
force = (0.1, 0.12, -0.17)
method = create_lb_method(stencil=stencil, force_model=force_model, method='trt',
compressible=compressible, force=force)
stencil = LBStencil(stencil)
lbm_config = LBMConfig(stencil=stencil, force_model=force_model, method=Method.TRT,
compressible=compressible, zero_centered=zero_centered, force=force)
method = create_lb_method(lbm_config=lbm_config)
ghost_layers = 1
inner_size = (3, 7, 4)[:method.dim]
total_size = tuple(s + 2 * ghost_layers for s in inner_size)
......@@ -34,37 +42,60 @@ def test_set_get_density_velocity_with_fields(stencil, force_model, compressible
ghost_layers=ghost_layers, streaming_pattern=streaming_pattern, previous_timestep=prev_timestep)
setter(pdf_arr)
getter = compile_macroscopic_values_getter(method, ['density', 'velocity'], pdf_arr=pdf_arr,
getter = compile_macroscopic_values_getter(method, ['density', 'density_deviation', 'velocity'], pdf_arr=pdf_arr,
ghost_layers=ghost_layers, streaming_pattern=streaming_pattern, previous_timestep=prev_timestep)
inner_part = (slice(ghost_layers, -ghost_layers), ) * stencil.D
density_output_field = np.zeros_like(density_input_field)
density_deviation_output_field = np.zeros_like(density_input_field)
velocity_output_field = np.zeros_like(velocity_input_field)
getter(pdfs=pdf_arr, density=density_output_field, velocity=velocity_output_field)
getter(pdfs=pdf_arr, density=density_output_field, density_deviation=density_deviation_output_field, velocity=velocity_output_field)
np.testing.assert_almost_equal(density_input_field, density_output_field)
np.testing.assert_almost_equal(density_input_field[inner_part] - 1.0, density_deviation_output_field[inner_part])
np.testing.assert_almost_equal(velocity_input_field, velocity_output_field)
def test_set_get_constant_velocity():
for stencil in ['D2Q9', 'D3Q19']:
for force_model in ['guo', 'luo', 'none']:
for compressible in [True, False]:
ref_velocity = [0.01, -0.2, 0.1]
force = (0.1, 0.12, -0.17)
method = create_lb_method(stencil=stencil, force_model=force_model, method='trt',
compressible=compressible, force=force)
size = (1, 1, 1)[:method.dim]
pdf_arr = np.zeros(size + (len(method.stencil),))
setter = compile_macroscopic_values_setter(method, pdf_arr=pdf_arr, ghost_layers=0,
quantities_to_set={'velocity': ref_velocity[:method.dim]}, )
setter(pdf_arr)
getter = compile_macroscopic_values_getter(method, ['velocity'], pdf_arr=pdf_arr, ghost_layers=0)
velocity_output_field = np.zeros(size + (method.dim, ))
getter(pdfs=pdf_arr, velocity=velocity_output_field)
if method.dim == 2:
computed_velocity = velocity_output_field[0, 0, :]
else:
computed_velocity = velocity_output_field[0, 0, 0, :]
np.testing.assert_almost_equal(np.array(ref_velocity[:method.dim]), computed_velocity)
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q19])
@pytest.mark.parametrize('force_model', [ForceModel.GUO, ForceModel.LUO, None])
@pytest.mark.parametrize('compressible', [True, False])
def test_set_get_constant_velocity(stencil, force_model, compressible):
ref_velocity = [0.01, -0.2, 0.1]
force = (0.1, 0.12, -0.17)
stencil = LBStencil(stencil)
lbm_config = LBMConfig(stencil=stencil, force_model=force_model, method=Method.TRT,
compressible=compressible, force=force)
method = create_lb_method(lbm_config=lbm_config)
size = (1, 1, 1)[:method.dim]
pdf_arr = np.zeros(size + (len(method.stencil),))
setter = compile_macroscopic_values_setter(method, pdf_arr=pdf_arr, ghost_layers=0,
quantities_to_set={'velocity': ref_velocity[:method.dim]}, )
setter(pdf_arr)
getter = compile_macroscopic_values_getter(method, ['velocity'], pdf_arr=pdf_arr, ghost_layers=0)
velocity_output_field = np.zeros(size + (method.dim, ))
getter(pdfs=pdf_arr, velocity=velocity_output_field)
if method.dim == 2:
computed_velocity = velocity_output_field[0, 0, :]
else:
computed_velocity = velocity_output_field[0, 0, 0, :]
np.testing.assert_almost_equal(np.array(ref_velocity[:method.dim]), computed_velocity)
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q19])
@pytest.mark.parametrize("method_enum", [Method.SRT, Method.TRT, Method.MRT, Method.CUMULANT])
def test_get_strain_rate_tensor(stencil, method_enum):
stencil = LBStencil(stencil)
lbm_config = LBMConfig(stencil=stencil, method=method_enum, compressible=True)
method = create_lb_method(lbm_config=lbm_config)
pdfs = sp.symbols(f"f_:{stencil.Q}")
strain_rate_tensor = sp.symbols(f"strain_rate_tensor_:{method.dim * method.dim}")
getter_assignments = strain_rate_tensor_getter(method, strain_rate_tensor, pdfs)
assert len(getter_assignments) == method.dim * method.dim
import pytest
from lbmpy.cumulants import raw_moment_as_function_of_cumulants
from lbmpy.enums import Stencil
from lbmpy.maxwellian_equilibrium import *
from lbmpy.moments import (
MOMENT_SYMBOLS, exponents_to_polynomial_representations, moment_matrix,
moments_up_to_component_order, moments_up_to_order)
from lbmpy.stencils import LBStencil
from pystencils.sympyextensions import remove_higher_order_terms
def test_maxwellian_moments():
"""Check moments of continuous Maxwellian"""
rho = sp.Symbol("rho")
u = sp.symbols("u_0 u_1 u_2")
c_s = sp.Symbol("c_s")
eq_moments = get_equilibrium_values_of_maxwell_boltzmann_function(((0, 0, 0), (0, 0, 1)),
dim=3, rho=rho, u=u, c_s_sq=c_s ** 2,
space="moment")
assert eq_moments[0] == rho
assert eq_moments[1] == rho * u[2]
x, y, z = MOMENT_SYMBOLS
one = sp.Rational(1, 1)
eq_moments = get_equilibrium_values_of_maxwell_boltzmann_function((one, x, x ** 2, x * y),
dim=2, rho=rho, u=u[:2], c_s_sq=c_s ** 2,
space="moment")
assert eq_moments[0] == rho
assert eq_moments[1] == rho * u[0]
assert eq_moments[2] == rho * (c_s ** 2 + u[0] ** 2)
assert eq_moments[3] == rho * u[0] * u[1]
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q15, Stencil.D3Q19, Stencil.D3Q27])
def test_continuous_discrete_moment_equivalence(stencil):
"""Check that moments up to order 3 agree with moments of the continuous Maxwellian"""
stencil = LBStencil(stencil)
c_s_sq = sp.Rational(1, 3)
moments = tuple(moments_up_to_order(3, dim=stencil.D, include_permutations=False))
cm = sp.Matrix(get_equilibrium_values_of_maxwell_boltzmann_function(moments, order=2, dim=stencil.D,
c_s_sq=c_s_sq, space="moment"))
dm = sp.Matrix(get_moments_of_discrete_maxwellian_equilibrium(stencil, moments, order=2,
compressible=True, c_s_sq=c_s_sq))
diff = sp.simplify(cm - dm)
for d in diff:
assert d == 0
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q27])
def test_moment_cumulant_continuous_equivalence(stencil):
"""Test that discrete equilibrium is the same up to order 3 when obtained with following methods
* eq1: take moments of continuous Maxwellian and transform back to pdf space
* eq2: take cumulants of continuous Maxwellian, transform to moments then transform to pdf space
* eq3: take discrete equilibrium from LBM literature
* eq4: same as eq1 but with built-in function
"""
stencil = LBStencil(stencil)
u = sp.symbols(f"u_:{stencil.D}")
indices = tuple(moments_up_to_component_order(2, dim=stencil.D))
c_s_sq = sp.Rational(1, 3)
eq_moments1 = get_equilibrium_values_of_maxwell_boltzmann_function(indices, dim=stencil.D, u=u, c_s_sq=c_s_sq,
space="moment")
eq_cumulants = get_equilibrium_values_of_maxwell_boltzmann_function(indices, dim=stencil.D, u=u, c_s_sq=c_s_sq,
space="cumulant")
eq_cumulants = {idx: c for idx, c in zip(indices, eq_cumulants)}
eq_moments2 = [raw_moment_as_function_of_cumulants(idx, eq_cumulants) for idx in indices]
pdfs_to_moments = moment_matrix(indices, stencil)
def normalize(expressions):
return [remove_higher_order_terms(e.expand(), symbols=u, order=3) for e in expressions]
eq1 = normalize(pdfs_to_moments.inv() * sp.Matrix(eq_moments1))
eq2 = normalize(pdfs_to_moments.inv() * sp.Matrix(eq_moments2))
eq3 = normalize(discrete_maxwellian_equilibrium(stencil, order=3, c_s_sq=c_s_sq, compressible=True))
eq4 = normalize(generate_equilibrium_by_matching_moments(stencil, indices, c_s_sq=c_s_sq))
assert eq1 == eq2
assert eq2 == eq3
assert eq3 == eq4
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q27])
def test_moment_cumulant_continuous_equivalence_polynomial_formulation(stencil):
"""Same as test above, but instead of index tuples, the polynomial formulation is used."""
stencil = LBStencil(stencil)
u = sp.symbols(f"u_:{stencil.D}")
index_tuples = tuple(moments_up_to_component_order(2, dim=stencil.D))
indices = exponents_to_polynomial_representations(index_tuples)
c_s_sq = sp.Rational(1, 3)
eq_moments1 = get_equilibrium_values_of_maxwell_boltzmann_function(indices, dim=stencil.D,
u=u, c_s_sq=c_s_sq, space="moment")
eq_cumulants = get_equilibrium_values_of_maxwell_boltzmann_function(indices, dim=stencil.D,
u=u, c_s_sq=c_s_sq, space="cumulant")
eq_cumulants = {idx: c for idx, c in zip(index_tuples, eq_cumulants)}
eq_moments2 = [raw_moment_as_function_of_cumulants(idx, eq_cumulants) for idx in index_tuples]
pdfs_to_moments = moment_matrix(indices, stencil)
def normalize(expressions):
return [remove_higher_order_terms(e.expand(), symbols=u, order=3) for e in expressions]
eq1 = normalize(pdfs_to_moments.inv() * sp.Matrix(eq_moments1))
eq2 = normalize(pdfs_to_moments.inv() * sp.Matrix(eq_moments2))
eq3 = normalize(discrete_maxwellian_equilibrium(stencil, order=3, c_s_sq=c_s_sq, compressible=True))
eq4 = normalize(generate_equilibrium_by_matching_moments(stencil, indices, c_s_sq=c_s_sq))
assert eq1 == eq2
assert eq2 == eq3
assert eq3 == eq4