Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision

Target

Select target project
  • ravi.k.ayyala/lbmpy
  • brendan-waters/lbmpy
  • anirudh.jonnalagadda/lbmpy
  • jbadwaik/lbmpy
  • alexander.reinauer/lbmpy
  • itischler/lbmpy
  • he66coqe/lbmpy
  • ev81oxyl/lbmpy
  • Bindgen/lbmpy
  • da15siwa/lbmpy
  • holzer/lbmpy
  • RudolfWeeber/lbmpy
  • pycodegen/lbmpy
13 results
Select Git revision
Show changes
Showing
with 834 additions and 23 deletions
...@@ -31,7 +31,8 @@ def test_diffusion_boundary(): ...@@ -31,7 +31,8 @@ def test_diffusion_boundary():
dh.add_array('pdfs', values_per_cell=stencil.Q) dh.add_array('pdfs', values_per_cell=stencil.Q)
dh.fill("pdfs", 0.0, ghost_layers=True) dh.fill("pdfs", 0.0, ghost_layers=True)
lbm_config = LBMConfig(stencil=stencil, method=Method.SRT, relaxation_rate=1.8, compressible=True) lbm_config = LBMConfig(stencil=stencil, method=Method.SRT, relaxation_rate=1.8,
compressible=True, zero_centered=False)
method = create_lb_method(lbm_config=lbm_config) method = create_lb_method(lbm_config=lbm_config)
# Boundary Handling # Boundary Handling
...@@ -76,7 +77,7 @@ def test_diffusion(): ...@@ -76,7 +77,7 @@ def test_diffusion():
The hydrodynamic field is not simulated, instead a constant velocity is assumed. The hydrodynamic field is not simulated, instead a constant velocity is assumed.
""" """
pytest.importorskip("pycuda") pytest.importorskip("cupy")
# Parameters # Parameters
domain_size = (1600, 160) domain_size = (1600, 160)
omega = 1.38 omega = 1.38
...@@ -103,6 +104,7 @@ def test_diffusion(): ...@@ -103,6 +104,7 @@ def test_diffusion():
# Lattice Boltzmann method # Lattice Boltzmann method
lbm_config = LBMConfig(stencil=stencil, method=Method.MRT, relaxation_rates=[1, 1.5, 1, 1.5, 1], lbm_config = LBMConfig(stencil=stencil, method=Method.MRT, relaxation_rates=[1, 1.5, 1, 1.5, 1],
zero_centered=False,
velocity_input=vel_field, output={'density': con_field}, compressible=True, velocity_input=vel_field, output={'density': con_field}, compressible=True,
weighted=True, kernel_type='stream_pull_collide') weighted=True, kernel_type='stream_pull_collide')
......
File moved
import pytest
import pystencils as ps
from lbmpy.creationfunctions import create_lb_function, LBMConfig
from lbmpy.enums import Method, Stencil
from lbmpy.scenarios import create_lid_driven_cavity
from lbmpy.stencils import LBStencil
from lbmpy._compat import IS_PYSTENCILS_2
@pytest.mark.parametrize("double_precision", [False, True])
@pytest.mark.parametrize(
"method_enum", [Method.SRT, Method.CENTRAL_MOMENT, Method.CUMULANT]
)
def test_creation(double_precision, method_enum):
"""Simple test that makes sure that only float variables are created"""
lbm_config = LBMConfig(method=method_enum, relaxation_rate=1.5, compressible=True)
if IS_PYSTENCILS_2:
config = ps.CreateKernelConfig(
default_dtype="float64" if double_precision else "float32"
)
else:
config = ps.CreateKernelConfig(
data_type="float64" if double_precision else "float32",
default_number_float="float64" if double_precision else "float32",
)
func = create_lb_function(lbm_config=lbm_config, config=config)
code = ps.get_code_str(func)
if double_precision:
assert "float" not in code
assert "double" in code
else:
assert "double" not in code
assert "float" in code
@pytest.mark.parametrize("numeric_type", ["float32", "float64"])
@pytest.mark.parametrize(
"method_enum", [Method.SRT, Method.CENTRAL_MOMENT, Method.CUMULANT]
)
def test_scenario(method_enum, numeric_type):
lbm_config = LBMConfig(
stencil=LBStencil(Stencil.D3Q27),
method=method_enum,
relaxation_rate=1.45,
compressible=True,
)
if IS_PYSTENCILS_2:
config = ps.CreateKernelConfig(
default_dtype=numeric_type
)
else:
config = ps.CreateKernelConfig(
data_type=numeric_type,
default_number_float=numeric_type
)
sc = create_lid_driven_cavity((16, 16, 8), lbm_config=lbm_config, config=config)
sc.run(1)
code = ps.get_code_str(sc.ast)
if numeric_type == "float64":
assert "float" not in code
assert "double" in code
else:
assert "double" not in code
assert "float" in code
"""Tests velocity and stress fluctuations for thermalized LB""" """Tests velocity and stress fluctuations for thermalized LB"""
import pytest
import pystencils as ps import pystencils as ps
from lbmpy._compat import IS_PYSTENCILS_2
from pystencils import get_code_str
if IS_PYSTENCILS_2:
pass
else:
from pystencils.backends.simd_instruction_sets import get_supported_instruction_sets, get_vector_instruction_set
from pystencils.cpu.cpujit import get_compiler_config
from pystencils.enums import Target
from lbmpy.creationfunctions import * from lbmpy.creationfunctions import *
from lbmpy.forcemodels import Guo from lbmpy.forcemodels import Guo
from lbmpy.macroscopic_value_kernels import macroscopic_values_setter from lbmpy.macroscopic_value_kernels import macroscopic_values_setter
import numpy as np import numpy as np
from lbmpy.enums import Stencil from lbmpy.enums import Stencil
from lbmpy.moments import is_bulk_moment, is_shear_moment, get_order from lbmpy.moments import is_bulk_moment, is_shear_moment, get_order
from lbmpy.stencils import LBStencil from lbmpy.stencils import LBStencil
from pystencils.rng import PhiloxTwoDoubles
import pytest
from pystencils.backends.simd_instruction_sets import get_supported_instruction_sets, get_vector_instruction_set if not IS_PYSTENCILS_2:
from pystencils.cpu.cpujit import get_compiler_config def _skip_instruction_sets_windows(instruction_sets):
from pystencils.enums import Target if get_compiler_config()["os"] == "windows":
# skip instruction sets supported by the CPU but not by the compiler
if "avx" in instruction_sets and (
"/arch:avx2" not in get_compiler_config()["flags"].lower()
and "/arch:avx512" not in get_compiler_config()["flags"].lower()
):
instruction_sets.remove("avx")
if (
"avx512" in instruction_sets
and "/arch:avx512" not in get_compiler_config()["flags"].lower()
):
instruction_sets.remove("avx512")
return instruction_sets
INSTRUCTION_SETS = get_supported_instruction_sets()
else:
INSTRUCTION_SETS = Target.available_vector_cpu_targets()
def single_component_maxwell(x1, x2, kT, mass): def single_component_maxwell(x1, x2, kT, mass):
"""Integrate the probability density from x1 to x2 using the trapezoidal rule""" """Integrate the probability density from x1 to x2 using the trapezoidal rule"""
x = np.linspace(x1, x2, 1000) x = np.linspace(x1, x2, 1000)
return np.trapz(np.exp(-mass * x ** 2 / (2. * kT)), x) / np.sqrt(2. * np.pi * kT / mass)
try:
trapezoid = np.trapezoid # since numpy 2.0
except AttributeError:
trapezoid = np.trapz
return trapezoid(np.exp(-mass * x**2 / (2.0 * kT)), x) / np.sqrt(
2.0 * np.pi * kT / mass
)
def rr_getter(moment_group): def rr_getter(moment_group):
...@@ -51,53 +88,86 @@ def second_order_moment_tensor_assignments(function_values, stencil, output_fiel ...@@ -51,53 +88,86 @@ def second_order_moment_tensor_assignments(function_values, stencil, output_fiel
"""Assignments for calculating the pressure tensor""" """Assignments for calculating the pressure tensor"""
assert len(function_values) == len(stencil) assert len(function_values) == len(stencil)
dim = len(stencil[0]) dim = len(stencil[0])
return [ps.Assignment(output_field(i, j), return [
sum(c[i] * c[j] * f for f, c in zip(function_values, stencil))) ps.Assignment(
for i in range(dim) for j in range(dim)] output_field(i, j),
sum(c[i] * c[j] * f for f, c in zip(function_values, stencil)),
)
for i in range(dim)
for j in range(dim)
]
def add_pressure_output_to_collision_rule(collision_rule, pressure_field): def add_pressure_output_to_collision_rule(collision_rule, pressure_field):
pressure_ouput = second_order_moment_tensor_assignments(collision_rule.method.pre_collision_pdf_symbols, pressure_ouput = second_order_moment_tensor_assignments(
collision_rule.method.stencil, pressure_field) collision_rule.method.pre_collision_pdf_symbols,
collision_rule.method.stencil,
pressure_field,
)
collision_rule.main_assignments = collision_rule.main_assignments + pressure_ouput collision_rule.main_assignments = collision_rule.main_assignments + pressure_ouput
def get_fluctuating_lb(size=None, kT=None, def get_fluctuating_lb(
omega_shear=None, omega_bulk=None, omega_odd=None, omega_even=None, size=None,
rho_0=None, target=None): kT=None,
omega_shear=None,
omega_bulk=None,
omega_odd=None,
omega_even=None,
rho_0=None,
target=None,
zero_centered: bool = False,
):
# Parameters # Parameters
stencil = LBStencil(Stencil.D3Q19) stencil = LBStencil(Stencil.D3Q19)
# Setup data handling # Setup data handling
dh = ps.create_data_handling((size,) * stencil.D, periodicity=True, default_target=target) dh = ps.create_data_handling(
src = dh.add_array('src', values_per_cell=stencil.Q, layout='f') (size,) * stencil.D, periodicity=True, default_target=target
dst = dh.add_array_like('dst', 'src') )
rho = dh.add_array('rho', layout='f', latex_name='\\rho', values_per_cell=1) src = dh.add_array("src", values_per_cell=stencil.Q, layout="f")
u = dh.add_array('u', values_per_cell=dh.dim, layout='f') dst = dh.add_array_like("dst", "src")
pressure_field = dh.add_array('pressure', values_per_cell=( rho = dh.add_array("rho", layout="f", latex_name="\\rho", values_per_cell=1)
3, 3), layout='f', gpu=target == Target.GPU) u = dh.add_array("u", values_per_cell=dh.dim, layout="f")
pressure_field = dh.add_array(
"pressure", values_per_cell=(3, 3), layout="f", gpu=target == Target.GPU
)
force_field = dh.add_array( force_field = dh.add_array(
'force', values_per_cell=stencil.D, layout='f', gpu=target == Target.GPU) "force", values_per_cell=stencil.D, layout="f", gpu=target == Target.GPU
)
# Method setup # Method setup
lbm_config = LBMConfig(stencil=stencil, method=Method.MRT, compressible=True, lbm_config = LBMConfig(
weighted=True, zero_centered=False, relaxation_rates=rr_getter, stencil=stencil,
force_model=Guo(force=force_field.center_vector), method=Method.MRT,
fluctuating={'temperature': kT}, compressible=True,
kernel_type='collide_only') weighted=True,
zero_centered=zero_centered,
relaxation_rates=rr_getter,
force_model=Guo(force=force_field.center_vector),
fluctuating={"temperature": kT},
kernel_type="collide_only",
)
lb_method = create_lb_method(lbm_config=lbm_config) lb_method = create_lb_method(lbm_config=lbm_config)
lbm_config.lb_method = lb_method lbm_config.lb_method = lb_method
lbm_opt = LBMOptimisation(symbolic_field=src, cse_global=True)
collision_rule = create_lb_collision_rule(lbm_config=lbm_config, lbm_optimisation=lbm_opt)
add_pressure_output_to_collision_rule(collision_rule, pressure_field)
collision = create_lb_update_rule(collision_rule=collision_rule, lbm_opt = LBMOptimisation(symbolic_field=src, cse_global=True)
lbm_config=lbm_config, lbm_optimisation=lbm_opt) collision_rule = create_lb_collision_rule(
stream = create_stream_pull_with_output_kernel(collision.method, src, dst, lbm_config=lbm_config, lbm_optimisation=lbm_opt
{'density': rho, 'velocity': u}) )
# add_pressure_output_to_collision_rule(collision_rule, pressure_field)
collision = create_lb_update_rule(
collision_rule=collision_rule, lbm_config=lbm_config, lbm_optimisation=lbm_opt
)
stream = create_stream_pull_with_output_kernel(
collision.method,
src,
dst,
{"density": rho, "velocity": u, "moment2": pressure_field},
)
config = ps.CreateKernelConfig(cpu_openmp=False, target=dh.default_target) config = ps.CreateKernelConfig(cpu_openmp=False, target=dh.default_target)
...@@ -108,15 +178,18 @@ def get_fluctuating_lb(size=None, kT=None, ...@@ -108,15 +178,18 @@ def get_fluctuating_lb(size=None, kT=None,
sync_pdfs = dh.synchronization_function([src.name]) sync_pdfs = dh.synchronization_function([src.name])
# Initialization # Initialization
init = macroscopic_values_setter(collision.method, velocity=(0,) * dh.dim, init = macroscopic_values_setter(
pdfs=src.center_vector, density=rho.center) collision.method,
velocity=(0,) * dh.dim,
pdfs=src.center_vector,
density=rho_0
)
init_kernel = ps.create_kernel(init, ghost_layers=0).compile() init_kernel = ps.create_kernel(init, ghost_layers=0).compile()
dh.fill(rho.name, rho_0) dh.fill(rho.name, rho_0)
dh.fill(u.name, np.nan, ghost_layers=True, inner_ghost_layers=True) dh.fill(u.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
dh.fill(u.name, 0) dh.fill(u.name, 0)
dh.fill(force_field.name, np.nan, dh.fill(force_field.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
ghost_layers=True, inner_ghost_layers=True)
dh.fill(force_field.name, 0) dh.fill(force_field.name, 0)
dh.run_kernel(init_kernel) dh.run_kernel(init_kernel)
...@@ -124,8 +197,15 @@ def get_fluctuating_lb(size=None, kT=None, ...@@ -124,8 +197,15 @@ def get_fluctuating_lb(size=None, kT=None,
def time_loop(start, steps): def time_loop(start, steps):
dh.all_to_gpu() dh.all_to_gpu()
for i in range(start, start + steps): for i in range(start, start + steps):
dh.run_kernel(collision_kernel, omega_shear=omega_shear, omega_bulk=omega_bulk, dh.run_kernel(
omega_odd=omega_odd, omega_even=omega_even, seed=42, time_step=i) collision_kernel,
omega_shear=omega_shear,
omega_bulk=omega_bulk,
omega_odd=omega_odd,
omega_even=omega_even,
seed=42,
time_step=i,
)
sync_pdfs() sync_pdfs()
dh.run_kernel(stream_kernel) dh.run_kernel(stream_kernel)
...@@ -136,13 +216,27 @@ def get_fluctuating_lb(size=None, kT=None, ...@@ -136,13 +216,27 @@ def get_fluctuating_lb(size=None, kT=None,
return dh, time_loop return dh, time_loop
def test_resting_fluid(target=Target.CPU): @pytest.mark.parametrize(
rho_0 = 0.86 "zero_centered", [False, True], ids=["regular-storage", "zero-centered"]
kT = 4E-4 )
L = [60] * 3 @pytest.mark.parametrize(
dh, time_loop = get_fluctuating_lb(size=L[0], target=target, "domain_size", [8, 60]
rho_0=rho_0, kT=kT, )
omega_shear=0.8, omega_bulk=0.5, omega_even=.04, omega_odd=0.3) def test_resting_fluid(zero_centered: bool, domain_size: int, target=Target.CPU):
rho_0 = 0.86
kT = 4e-4
L = [domain_size] * 3
dh, time_loop = get_fluctuating_lb(
size=L[0],
target=target,
rho_0=rho_0,
kT=kT,
omega_shear=0.8,
omega_bulk=0.5,
omega_even=0.04,
omega_odd=0.3,
zero_centered=zero_centered,
)
# Test # Test
t = 0 t = 0
...@@ -156,38 +250,43 @@ def test_resting_fluid(target=Target.CPU): ...@@ -156,38 +250,43 @@ def test_resting_fluid(target=Target.CPU):
res_u = dh.gather_array("u").reshape((-1, 3)) res_u = dh.gather_array("u").reshape((-1, 3))
res_rho = dh.gather_array("rho").reshape((-1,)) res_rho = dh.gather_array("rho").reshape((-1,))
# mass conservation # mass conservationo
# density per cell fluctuates, but toal mass is conserved
np.testing.assert_allclose(np.mean(res_rho), rho_0, atol=3E-12) np.testing.assert_allclose(np.mean(res_rho), rho_0, atol=3E-12)
# momentum conservation # momentum conservation
momentum = np.dot(res_rho, res_u) momentum = np.dot(res_rho, res_u)
np.testing.assert_allclose(momentum, [0, 0, 0], atol=1E-10) np.testing.assert_allclose(momentum, [0, 0, 0], atol=1e-10)
# temperature # temperature (fluctuates around pre-set kT)
kinetic_energy = 1 / 2 * np.dot(res_rho, res_u * res_u) / np.product(L) kinetic_energy = 1 / 2 * np.dot(res_rho, res_u * res_u) / np.prod(L)
np.testing.assert_allclose( kT_tol = 0.075 *(16/domain_size)**(3/2)
kinetic_energy, [kT / 2] * 3, atol=kT * 0.01) np.testing.assert_allclose(kinetic_energy, [kT / 2] * 3, rtol=kT_tol)
# velocity distribution # velocity distribution
v_hist, v_bins = np.histogram( v_hist, v_bins = np.histogram(
res_u, bins=11, range=(-.075, .075), density=True) res_u, bins=11, range=(-0.075, 0.075), density=True
)
# Calculate expected values from single # Calculate expected values from single
v_expected = [] v_expected = []
for j in range(len(v_hist)): for j in range(len(v_hist)):
# Maxwell distribution # Maxwell distribution
res = 1. / (v_bins[j + 1] - v_bins[j]) * \ res = (
single_component_maxwell( 1.0
v_bins[j], v_bins[j + 1], kT, rho_0) / (v_bins[j + 1] - v_bins[j])
* single_component_maxwell(v_bins[j], v_bins[j + 1], kT, rho_0)
)
v_expected.append(res) v_expected.append(res)
v_expected = np.array(v_expected) v_expected = np.array(v_expected)
# 10% accuracy on the entire histogram hist_tol_all = 0.75 *(16/domain_size)**(3/2)
np.testing.assert_allclose(v_hist, v_expected, rtol=0.1) np.testing.assert_allclose(v_hist, v_expected, rtol=hist_tol_all)
# 1% accuracy on the middle part hist_tol_center = hist_tol_all/10
remove = 3 remove = 3
np.testing.assert_allclose( np.testing.assert_allclose(
v_hist[remove:-remove], v_expected[remove:-remove], rtol=0.01) v_hist[remove:-remove], v_expected[remove:-remove], rtol=hist_tol_center
)
# pressure tensor against expressions from # pressure tensor against expressions from
# Duenweg, Schiller, Ladd, https://arxiv.org/abs/0707.1581 # Duenweg, Schiller, Ladd, https://arxiv.org/abs/0707.1581
...@@ -200,19 +299,35 @@ def test_resting_fluid(target=Target.CPU): ...@@ -200,19 +299,35 @@ def test_resting_fluid(target=Target.CPU):
# Diagonal elements are rho c_s^22 +<u,u>. When the fluid is # Diagonal elements are rho c_s^22 +<u,u>. When the fluid is
# thermalized, the expectation value of <u,u> = kT due to the # thermalized, the expectation value of <u,u> = kT due to the
# equi-partition theorem. # equi-partition theorem.
p_av_expected = np.diag([rho_0 * c_s ** 2 + kT] * 3) p_av_expected = np.diag([rho_0 * c_s**2 + kT] * 3)
pressure_atol = c_s**2 / 200 *(16/domain_size)**(3/2)
np.testing.assert_allclose( np.testing.assert_allclose(
np.mean(res_pressure, axis=0), p_av_expected, atol=c_s ** 2 / 2000) np.mean(res_pressure, axis=0), p_av_expected, atol=pressure_atol)
def test_point_force(target=Target.CPU): @pytest.mark.parametrize(
"zero_centered", [False, True], ids=["regular-storage", "zero-centered"]
)
@pytest.mark.parametrize(
"domain_size", [8, 60]
)
def test_point_force(zero_centered: bool, domain_size: int, target=Target.CPU):
"""Test momentum balance for thermalized fluid with applied poitn forces""" """Test momentum balance for thermalized fluid with applied poitn forces"""
rho_0 = 0.86 rho_0 = 0.86
kT = 4E-4 kT = 4e-4
L = [8] * 3 L = [domain_size] * 3
dh, time_loop = get_fluctuating_lb(size=L[0], target=target, dh, time_loop = get_fluctuating_lb(
rho_0=rho_0, kT=kT, size=L[0],
omega_shear=0.8, omega_bulk=0.5, omega_even=.04, omega_odd=0.3) target=target,
rho_0=rho_0,
kT=kT,
omega_shear=0.8,
omega_bulk=0.5,
omega_even=0.8,
omega_odd=0.8,
zero_centered=zero_centered
)
# Test # Test
t = 0 t = 0
...@@ -221,17 +336,17 @@ def test_point_force(target=Target.CPU): ...@@ -221,17 +336,17 @@ def test_point_force(target=Target.CPU):
introduced_momentum = np.zeros(3) introduced_momentum = np.zeros(3)
for i in range(100): for i in range(100):
point_force = 1E-5 * (np.random.random(3) - .5) point_force = 1e-2/domain_size**(3/2) * (np.random.random(3) - 0.5)
introduced_momentum += point_force introduced_momentum += point_force
# Note that ghost layers are included in the indexing # Note that ghost layers are included in the indexing
force_pos = np.random.randint(1, L[0] - 2, size=3) force_pos = np.random.randint(1, L[0] - 2, size=3)
dh.cpu_arrays["force"][force_pos[0], dh.cpu_arrays["force"][force_pos[0], force_pos[1], force_pos[2]] = point_force
force_pos[1], force_pos[2]] = point_force
t = time_loop(t, 1) t = time_loop(t, 1)
res_u = dh.gather_array("u").reshape((-1, 3)) res_u = dh.gather_array("u").reshape((-1, 3))
res_rho = dh.gather_array("rho").reshape((-1,)) res_rho = dh.gather_array("rho").reshape((-1,))
# mass conservation # mass conservation
np.testing.assert_allclose(np.mean(res_rho), rho_0, atol=3E-12) np.testing.assert_allclose(np.mean(res_rho), rho_0, atol=3E-12)
...@@ -239,58 +354,162 @@ def test_point_force(target=Target.CPU): ...@@ -239,58 +354,162 @@ def test_point_force(target=Target.CPU):
# momentum conservation # momentum conservation
momentum = np.dot(res_rho, res_u) momentum = np.dot(res_rho, res_u)
np.testing.assert_allclose( np.testing.assert_allclose(
momentum, introduced_momentum + 0.5 * point_force, atol=1E-10) momentum, introduced_momentum + 0.5 * point_force, atol=1e-10
dh.cpu_arrays["force"][force_pos[0], )
force_pos[1], force_pos[2]] = np.zeros(3) dh.cpu_arrays["force"][force_pos[0], force_pos[1], force_pos[2]] = np.zeros(3)
# @pytest.mark.skipif(not get_supported_instruction_sets(), reason="No vector instruction sets supported") @pytest.mark.skipif(
# @pytest.mark.parametrize('assume_aligned', (True, False)) not INSTRUCTION_SETS, reason="No vector instruction sets supported"
# @pytest.mark.parametrize('assume_inner_stride_one', (True, False)) )
# @pytest.mark.parametrize('assume_sufficient_line_padding', (True, False)) @pytest.mark.parametrize("data_type", ("float32", "float64"))
def test_vectorization(): @pytest.mark.parametrize("assume_aligned", (True, False))
assume_aligned = True @pytest.mark.parametrize("assume_inner_stride_one", (True, False))
assume_inner_stride_one = True @pytest.mark.parametrize("assume_sufficient_line_padding", (True, False))
assume_sufficient_line_padding = True @pytest.mark.xfail(IS_PYSTENCILS_2, reason="Vectorization for RNGs not implemented yet")
def test_vectorization(
data_type, assume_aligned, assume_inner_stride_one, assume_sufficient_line_padding
):
stencil = LBStencil(Stencil.D3Q19)
pdfs, pdfs_tmp = ps.fields(
f"pdfs({stencil.Q}), pdfs_tmp({stencil.Q}): {data_type}[3D]", layout="fzyx"
)
method = create_mrt_orthogonal( method = create_mrt_orthogonal(
stencil=LBStencil(Stencil.D2Q9), stencil=stencil, compressible=True, weighted=True, relaxation_rates=rr_getter
)
if IS_PYSTENCILS_2:
seed = ps.TypedSymbol("seed", np.uint32)
rng = ps.random.Philox("phil", data_type, seed)
fluct_options = {
"temperature": sp.Symbol("kT"),
"rng": rng,
}
else:
rng_node = (
ps.rng.PhiloxTwoDoubles if data_type == "float64" else ps.rng.PhiloxFourFloats
)
fluct_options = {
"temperature": sp.Symbol("kT"),
"rng_node": rng_node,
"block_offsets": tuple([0] * stencil.D),
}
lbm_config = LBMConfig(
lb_method=method,
fluctuating=fluct_options,
compressible=True, compressible=True,
weighted=True, zero_centered=False,
relaxation_rates=rr_getter) stencil=method.stencil,
lbm_config = LBMConfig(lb_method=method, fluctuating={'temperature': sp.Symbol("kT"), kernel_type="collide_only",
'rng_node': PhiloxTwoDoubles, )
'block_offsets': (0, 0)}, lbm_opt = LBMOptimisation(
compressible=True, zero_centered=False, cse_global=True, symbolic_field=pdfs, symbolic_temporary_field=pdfs_tmp
stencil=method.stencil, kernel_type='collide_only') )
lbm_opt = LBMOptimisation(cse_global=True)
collision = create_lb_update_rule(lbm_config=lbm_config, lbm_optimisation=lbm_opt) collision = create_lb_update_rule(lbm_config=lbm_config, lbm_optimisation=lbm_opt)
instruction_sets = get_supported_instruction_sets() if not IS_PYSTENCILS_2:
if get_compiler_config()['os'] == 'windows': instruction_sets = _skip_instruction_sets_windows(INSTRUCTION_SETS)
# skip instruction sets supported by the CPU but not by the compiler else:
if 'avx' in instruction_sets and ('/arch:avx2' not in get_compiler_config()['flags'].lower() instruction_sets = INSTRUCTION_SETS
and '/arch:avx512' not in get_compiler_config()['flags'].lower()):
instruction_sets.remove('avx')
if 'avx512' in instruction_sets and '/arch:avx512' not in get_compiler_config()['flags'].lower():
instruction_sets.remove('avx512')
instruction_set = instruction_sets[-1] instruction_set = instruction_sets[-1]
config = ps.CreateKernelConfig(cpu_openmp=False, if IS_PYSTENCILS_2:
cpu_vectorize_info={'instruction_set': instruction_set, config = ps.CreateKernelConfig()
'assume_aligned': assume_aligned, config.target = instruction_set
'assume_inner_stride_one': assume_inner_stride_one, config.default_dtype = data_type
'assume_sufficient_line_padding': assume_sufficient_line_padding, config.cpu.vectorize.enable = True
}, config.cpu.vectorize.assume_aligned = assume_aligned
target=Target.CPU) config.cpu.vectorize.assume_inner_stride_one = assume_inner_stride_one
else:
if not assume_inner_stride_one and 'storeS' not in get_vector_instruction_set('double', instruction_set): config = ps.CreateKernelConfig(
with pytest.warns(UserWarning) as warn: target=Target.CPU,
code = ps.create_kernel(collision, config=config) data_type=data_type,
assert 'Could not vectorize loop' in warn[0].message.args[0] default_number_float=data_type,
cpu_vectorize_info={
"instruction_set": instruction_set,
"assume_aligned": assume_aligned,
"assume_inner_stride_one": assume_inner_stride_one,
"assume_sufficient_line_padding": assume_sufficient_line_padding,
},
)
if not assume_inner_stride_one and "storeS" not in get_vector_instruction_set(
data_type, instruction_set
):
with pytest.warns(UserWarning) as pytest_warnings:
ast = ps.create_kernel(collision, config=config)
assert "Could not vectorize loop" in pytest_warnings[0].message.args[0]
else:
ast = ps.create_kernel(collision, config=config)
ast.compile()
code = get_code_str(ast)
print(code)
@pytest.mark.parametrize("data_type", ("float32", "float64"))
@pytest.mark.parametrize("assume_aligned", (True, False))
@pytest.mark.parametrize("assume_inner_stride_one", (True, False))
@pytest.mark.parametrize("assume_sufficient_line_padding", (True, False))
@pytest.mark.skipif(IS_PYSTENCILS_2, reason="waLBerla block offsets feature unavailable")
def test_fluctuating_lb_issue_188_wlb(
data_type, assume_aligned, assume_inner_stride_one, assume_sufficient_line_padding
):
stencil = LBStencil(Stencil.D3Q19)
temperature = sp.symbols("temperature")
pdfs, pdfs_tmp = ps.fields(
f"pdfs({stencil.Q}), pdfs_tmp({stencil.Q}): {data_type}[3D]", layout="fzyx"
)
rng_node = (
ps.rng.PhiloxTwoDoubles if data_type == "float64" else ps.rng.PhiloxFourFloats
)
fluctuating = {
"temperature": temperature,
"block_offsets": "walberla",
"rng_node": rng_node,
}
lbm_config = LBMConfig(
stencil=stencil,
method=Method.MRT,
compressible=True,
weighted=True,
zero_centered=False,
relaxation_rate=1.4,
fluctuating=fluctuating,
)
lbm_opt = LBMOptimisation(
symbolic_field=pdfs, symbolic_temporary_field=pdfs_tmp, cse_global=True
)
up = create_lb_update_rule(lbm_config=lbm_config, lbm_optimisation=lbm_opt)
cpu_vectorize_info = {
"instruction_set": "avx",
"assume_inner_stride_one": True,
"assume_aligned": True,
}
config = ps.CreateKernelConfig(
target=ps.Target.CPU,
data_type=data_type,
default_number_float=data_type,
cpu_vectorize_info=cpu_vectorize_info,
)
ast = create_kernel(up, config=config)
code = ps.get_code_str(ast)
# print(code)
if data_type == "float32":
assert "0.5f" in code
assert "_mm256_mul_ps" in code
assert "_mm256_sqrt_ps" in code
else: else:
with pytest.warns(None) as warn: assert "0.5f" not in code
code = ps.create_kernel(collision, config=config) assert "_mm256_mul_pd" in code
assert len(warn) == 0 assert "_mm256_sqrt_pd" in code
code.compile()
...@@ -9,19 +9,23 @@ from pystencils import Target ...@@ -9,19 +9,23 @@ from pystencils import Target
from lbmpy.creationfunctions import create_lb_method, create_lb_update_rule, LBMConfig, LBMOptimisation from lbmpy.creationfunctions import create_lb_method, create_lb_update_rule, LBMConfig, LBMOptimisation
from lbmpy.enums import Stencil, Method, ForceModel from lbmpy.enums import Stencil, Method, ForceModel
from lbmpy.macroscopic_value_kernels import macroscopic_values_setter, macroscopic_values_getter from lbmpy.macroscopic_value_kernels import macroscopic_values_setter, macroscopic_values_getter
from lbmpy.moments import is_bulk_moment from lbmpy.moments import (is_bulk_moment, moments_up_to_component_order,
exponents_to_polynomial_representations, exponent_tuple_sort_key)
from lbmpy.stencils import LBStencil from lbmpy.stencils import LBStencil
from lbmpy.updatekernels import create_stream_pull_with_output_kernel
# all force models available are defined in the ForceModel enum, but Cumulant is not a "real" force model # all force models available are defined in the ForceModel enum, but Cumulant is not a "real" force model
force_models = [f for f in ForceModel if f is not ForceModel.CUMULANT] force_models = [f for f in ForceModel if f is not ForceModel.CENTRALMOMENT]
@pytest.mark.parametrize("method_enum", [Method.SRT, Method.TRT, Method.MRT]) @pytest.mark.parametrize("method_enum", [Method.SRT, Method.TRT, Method.MRT, Method.CUMULANT])
@pytest.mark.parametrize("zero_centered", [False, True]) @pytest.mark.parametrize("zero_centered", [False, True])
@pytest.mark.parametrize("force_model", force_models) @pytest.mark.parametrize("force_model", force_models)
@pytest.mark.parametrize("omega", [0.5, 1.5]) @pytest.mark.parametrize("omega", [0.5, 1.5])
def test_total_momentum(method_enum, zero_centered, force_model, omega): def test_total_momentum(method_enum, zero_centered, force_model, omega):
if method_enum == Method.CUMULANT and \
force_model not in (ForceModel.SIMPLE, ForceModel.LUO, ForceModel.GUO, ForceModel.HE):
return
L = (16, 16) L = (16, 16)
stencil = LBStencil(Stencil.D2Q9) stencil = LBStencil(Stencil.D2Q9)
F = (2e-4, -3e-4) F = (2e-4, -3e-4)
...@@ -43,8 +47,10 @@ def test_total_momentum(method_enum, zero_centered, force_model, omega): ...@@ -43,8 +47,10 @@ def test_total_momentum(method_enum, zero_centered, force_model, omega):
collision_kernel = ps.create_kernel(collision, config=config).compile() collision_kernel = ps.create_kernel(collision, config=config).compile()
fluid_density = 1.1
def init(): def init():
dh.fill(ρ.name, 1) dh.fill(ρ.name, fluid_density)
dh.fill(u.name, 0) dh.fill(u.name, 0)
setter = macroscopic_values_setter(collision.method, velocity=(0,) * dh.dim, setter = macroscopic_values_setter(collision.method, velocity=(0,) * dh.dim,
...@@ -70,8 +76,13 @@ def test_total_momentum(method_enum, zero_centered, force_model, omega): ...@@ -70,8 +76,13 @@ def test_total_momentum(method_enum, zero_centered, force_model, omega):
init() init()
time_loop(t) time_loop(t)
dh.run_kernel(getter_kernel) dh.run_kernel(getter_kernel)
# Check that density did not change
assert_allclose(dh.gather_array(ρ.name), fluid_density)
# Check for correct velocity
total = np.sum(dh.gather_array(u.name), axis=(0, 1)) total = np.sum(dh.gather_array(u.name), axis=(0, 1))
assert_allclose(total / np.prod(L) / F / t, 1) assert_allclose(total / np.prod(L) / F * fluid_density / t, 1)
@pytest.mark.parametrize("force_model", force_models) @pytest.mark.parametrize("force_model", force_models)
...@@ -186,7 +197,7 @@ def test_literature(force_model, stencil, method): ...@@ -186,7 +197,7 @@ def test_literature(force_model, stencil, method):
assert len(omega_momentum) == 1 assert len(omega_momentum) == 1
omega_momentum = omega_momentum[0] omega_momentum = omega_momentum[0]
subs_dict = lb_method.subs_dict_relxation_rate subs_dict = lb_method.subs_dict_relaxation_rate
force_term = sp.simplify(lb_method.force_model(lb_method).subs(subs_dict)) force_term = sp.simplify(lb_method.force_model(lb_method).subs(subs_dict))
u = sp.Matrix(lb_method.first_order_equilibrium_moment_symbols) u = sp.Matrix(lb_method.first_order_equilibrium_moment_symbols)
rho = lb_method.conserved_quantity_computation.density_symbol rho = lb_method.conserved_quantity_computation.density_symbol
...@@ -268,11 +279,11 @@ def test_modes_central_moment(force_model, compressible): ...@@ -268,11 +279,11 @@ def test_modes_central_moment(force_model, compressible):
F = list(sp.symbols(f"F_:{stencil.D}")) F = list(sp.symbols(f"F_:{stencil.D}"))
lbm_config = LBMConfig(method=Method.CENTRAL_MOMENT, stencil=stencil, relaxation_rate=omega_s, lbm_config = LBMConfig(method=Method.CENTRAL_MOMENT, stencil=stencil, relaxation_rate=omega_s,
compressible=True, force_model=force_model, force=tuple(F)) compressible=compressible, force_model=force_model, force=tuple(F))
method = create_lb_method(lbm_config=lbm_config) method = create_lb_method(lbm_config=lbm_config)
subs_dict = method.subs_dict_relxation_rate subs_dict = method.subs_dict_relaxation_rate
force_moments = method.force_model.moment_space_forcing(method) force_moments = method.force_model.central_moment_space_forcing(method)
force_moments = force_moments.subs(subs_dict) force_moments = force_moments.subs(subs_dict)
# The mass mode should be zero # The mass mode should be zero
...@@ -282,6 +293,34 @@ def test_modes_central_moment(force_model, compressible): ...@@ -282,6 +293,34 @@ def test_modes_central_moment(force_model, compressible):
assert list(force_moments[1:stencil.D + 1]) == F assert list(force_moments[1:stencil.D + 1]) == F
@pytest.mark.parametrize("force_model", force_models)
@pytest.mark.parametrize("compressible", [True, False])
def test_symmetric_forcing_equivalence(force_model, compressible):
stencil = LBStencil(Stencil.D2Q9)
omega_s = sp.Symbol("omega_s")
F = list(sp.symbols(f"F_:{stencil.D}"))
moments = moments_up_to_component_order(2, dim=2)
moments = sorted(moments, key=exponent_tuple_sort_key)
moment_polys = exponents_to_polynomial_representations(moments)
lbm_config = LBMConfig(method=Method.CENTRAL_MOMENT, stencil=stencil, relaxation_rate=omega_s,
nested_moments=moment_polys, compressible=True, force_model=force_model, force=tuple(F))
method = create_lb_method(lbm_config=lbm_config)
if not method.force_model.has_symmetric_central_moment_forcing:
return
subs_dict = method.subs_dict_relaxation_rate
force_moments = method.force_model.central_moment_space_forcing(method)
force_moments = force_moments.subs(subs_dict)
force_before, force_after = method.force_model.symmetric_central_moment_forcing(method, moments)
d = method.relaxation_matrix
eye = sp.eye(stencil.Q)
force_combined = (eye - d) @ force_before + force_after
assert (force_moments - force_combined).expand() == sp.Matrix([0] * stencil.Q)
@pytest.mark.parametrize("stencil", [Stencil.D3Q15, Stencil.D3Q19, Stencil.D3Q27]) @pytest.mark.parametrize("stencil", [Stencil.D3Q15, Stencil.D3Q19, Stencil.D3Q27])
@pytest.mark.parametrize("force_model", force_models) @pytest.mark.parametrize("force_model", force_models)
@pytest.mark.parametrize("compressible", [True, False]) @pytest.mark.parametrize("compressible", [True, False])
...@@ -296,7 +335,7 @@ def test_modes_central_moment_longrun(stencil, force_model, compressible): ...@@ -296,7 +335,7 @@ def test_modes_central_moment_longrun(stencil, force_model, compressible):
compressible=compressible, force_model=force_model, force=tuple(F)) compressible=compressible, force_model=force_model, force=tuple(F))
method = create_lb_method(lbm_config=lbm_config) method = create_lb_method(lbm_config=lbm_config)
subs_dict = method.subs_dict_relxation_rate subs_dict = method.subs_dict_relaxation_rate
force_moments = method.force_model.moment_space_forcing(method) force_moments = method.force_model.moment_space_forcing(method)
force_moments = force_moments.subs(subs_dict) force_moments = force_moments.subs(subs_dict)
...@@ -320,7 +359,7 @@ def _check_modes(stencil, force_model, compressible): ...@@ -320,7 +359,7 @@ def _check_modes(stencil, force_model, compressible):
compressible=compressible, force_model=force_model, force=tuple(F)) compressible=compressible, force_model=force_model, force=tuple(F))
method = create_lb_method(lbm_config=lbm_config) method = create_lb_method(lbm_config=lbm_config)
subs_dict = method.subs_dict_relxation_rate subs_dict = method.subs_dict_relaxation_rate
force_moments = method.force_model.moment_space_forcing(method) force_moments = method.force_model.moment_space_forcing(method)
force_moments = force_moments.subs(subs_dict) force_moments = force_moments.subs(subs_dict)
...@@ -373,7 +412,8 @@ def _check_modes(stencil, force_model, compressible): ...@@ -373,7 +412,8 @@ def _check_modes(stencil, force_model, compressible):
- (2 + lambda_b) * sp.Matrix(u).dot(F)) == 0 - (2 + lambda_b) * sp.Matrix(u).dot(F)) == 0
# All other moments should be zero # All other moments should be zero
assert list(force_moments[stencil.D + 1 + num_stresses:]) == [0] * (len(stencil) - (stencil.D + 1 + num_stresses)) assert list(force_moments[stencil.D + 1 + num_stresses:]) == [0] * \
(len(stencil) - (stencil.D + 1 + num_stresses))
elif force_model == ForceModel.SIMPLE: elif force_model == ForceModel.SIMPLE:
# All other moments should be zero # All other moments should be zero
assert list(force_moments[stencil.D + 1:]) == [0] * (len(stencil) - (stencil.D + 1)) assert list(force_moments[stencil.D + 1:]) == [0] * (len(stencil) - (stencil.D + 1))
File moved
from lbmpy.creationfunctions import create_lb_ast, LBMConfig from lbmpy.creationfunctions import create_lb_ast, LBMConfig
from lbmpy.enums import Method, Stencil from lbmpy.enums import Method, Stencil
from lbmpy.stencils import LBStencil from lbmpy.stencils import LBStencil
from lbmpy._compat import IS_PYSTENCILS_2
import pytest import pytest
from pystencils import Target, CreateKernelConfig from pystencils import Target, CreateKernelConfig
def test_gpu_block_size_limiting(): @pytest.mark.xfail(IS_PYSTENCILS_2, reason="GPU block size limiting by registers is not available yet")
pytest.importorskip("pycuda") def test_gpu_block_size_limiting():
pytest.importorskip("cupy")
too_large = 2048*2048 too_large = 2048*2048
lbm_config = LBMConfig(method=Method.CUMULANT, stencil=LBStencil(Stencil.D3Q19), lbm_config = LBMConfig(method=Method.CUMULANT, stencil=LBStencil(Stencil.D3Q19),
relaxation_rate=1.8, compressible=True) relaxation_rate=1.8, compressible=True)
...@@ -17,5 +19,5 @@ def test_gpu_block_size_limiting(): ...@@ -17,5 +19,5 @@ def test_gpu_block_size_limiting():
kernel = ast.compile() kernel = ast.compile()
assert all(b < too_large for b in limited_block_size['block']) assert all(b < too_large for b in limited_block_size['block'])
bs = [too_large, too_large, too_large] bs = [too_large, too_large, too_large]
ast.indexing.limit_block_size_by_register_restriction(bs, kernel.num_regs) bs = ast.indexing.limit_block_size_by_register_restriction(bs, kernel.num_regs)
assert all(b < too_large for b in bs) assert all(b < too_large for b in bs)
import pytest
import numpy as np
from pystencils.slicing import slice_from_direction
from lbmpy import LBMConfig, LBStencil, Stencil, Method
from lbmpy.boundaries.boundaryconditions import NoSlip, NoSlipLinearBouzidi, QuadraticBounceBack, UBB
from lbmpy.lbstep import LatticeBoltzmannStep
def check_velocity(noslip_velocity, interpolated_velocity, wall_distance):
# First we check if the flow is fully developed
np.testing.assert_almost_equal(np.gradient(np.gradient(noslip_velocity)), 0, decimal=8)
np.testing.assert_almost_equal(np.gradient(np.gradient(interpolated_velocity)), 0, decimal=8)
# If the wall is closer to the first fluid cell we expect a lower velocity at the first fluid cell
if wall_distance < 0.5:
assert noslip_velocity[0] > interpolated_velocity[0]
# If the wall is further away from the first fluid cell we expect a higher velocity at the first fluid cell
if wall_distance > 0.5:
assert noslip_velocity[0] < interpolated_velocity[0]
# If the wall cuts the cell halfway the interpolation BC should approximately fall back to a noslip boundary
if wall_distance == 0.5:
np.testing.assert_almost_equal(noslip_velocity[0], interpolated_velocity[0], decimal=2)
def couette_flow(stencil, method_enum, zero_centered, wall_distance, compressible):
dim = stencil.D
if dim == 2:
domain_size = (50, 25)
wall_velocity = (0.01, 0)
periodicity = (True, False)
else:
domain_size = (50, 25, 25)
wall_velocity = (0.01, 0, 0)
periodicity = (True, False, True)
timesteps = 10000
omega = 1.1
lbm_config = LBMConfig(stencil=stencil,
method=method_enum,
relaxation_rate=omega,
zero_centered=zero_centered,
compressible=compressible,
weighted=True)
lb_step_noslip = LatticeBoltzmannStep(domain_size=domain_size, periodicity=periodicity,
lbm_config=lbm_config, compute_velocity_in_every_step=True)
lb_step_bouzidi = LatticeBoltzmannStep(domain_size=domain_size, periodicity=periodicity,
lbm_config=lbm_config, compute_velocity_in_every_step=True)
lb_step_quadratic_bb = LatticeBoltzmannStep(domain_size=domain_size, periodicity=periodicity,
lbm_config=lbm_config, compute_velocity_in_every_step=True)
def init_wall_distance(boundary_data, **_):
for cell in boundary_data.index_array:
cell['q'] = wall_distance
moving_wall = UBB(wall_velocity)
noslip = NoSlip("wall")
bouzidi = NoSlipLinearBouzidi("wall", init_wall_distance=init_wall_distance)
quadratic_bb = QuadraticBounceBack(omega, "wall", init_wall_distance=init_wall_distance)
lb_step_noslip.boundary_handling.set_boundary(noslip, slice_from_direction('S', dim))
lb_step_noslip.boundary_handling.set_boundary(moving_wall, slice_from_direction('N', dim))
lb_step_bouzidi.boundary_handling.set_boundary(bouzidi, slice_from_direction('S', dim))
lb_step_bouzidi.boundary_handling.set_boundary(moving_wall, slice_from_direction('N', dim))
lb_step_quadratic_bb.boundary_handling.set_boundary(quadratic_bb, slice_from_direction('S', dim))
lb_step_quadratic_bb.boundary_handling.set_boundary(moving_wall, slice_from_direction('N', dim))
lb_step_noslip.run(timesteps)
lb_step_bouzidi.run(timesteps)
lb_step_quadratic_bb.run(timesteps)
if dim == 2:
noslip_velocity = lb_step_noslip.velocity[domain_size[0] // 2, :, 0]
bouzidi_velocity = lb_step_bouzidi.velocity[domain_size[0] // 2, :, 0]
quadratic_bb_velocity = lb_step_quadratic_bb.velocity[domain_size[0] // 2, :, 0]
else:
noslip_velocity = lb_step_noslip.velocity[domain_size[0] // 2, :, domain_size[2] // 2, 0]
bouzidi_velocity = lb_step_bouzidi.velocity[domain_size[0] // 2, :, domain_size[2] // 2, 0]
quadratic_bb_velocity = lb_step_quadratic_bb.velocity[domain_size[0] // 2, :, domain_size[2] // 2, 0]
check_velocity(noslip_velocity, bouzidi_velocity, wall_distance)
check_velocity(noslip_velocity, quadratic_bb_velocity, wall_distance)
@pytest.mark.parametrize("zero_centered", [False, True])
@pytest.mark.parametrize("wall_distance", [0.1, 0.5, 0.9])
@pytest.mark.parametrize("compressible", [True, False])
def test_couette_flow_short(zero_centered, wall_distance, compressible):
stencil = LBStencil(Stencil.D2Q9)
couette_flow(stencil, Method.SRT, zero_centered, wall_distance, compressible)
@pytest.mark.parametrize("method_enum", [Method.MRT, Method.CENTRAL_MOMENT, Method.CUMULANT])
@pytest.mark.parametrize("zero_centered", [False, True])
@pytest.mark.parametrize("wall_distance", [0.1, 0.5, 0.9])
@pytest.mark.parametrize("compressible", [True, False])
@pytest.mark.longrun
def test_couette_flow_long(method_enum, zero_centered, wall_distance, compressible):
if method_enum is Method.CUMULANT and compressible is False:
pytest.skip("incompressible cumulant is not supported")
stencil = LBStencil(Stencil.D2Q9)
couette_flow(stencil, method_enum, zero_centered, wall_distance, compressible)
@pytest.mark.parametrize("method_enum", [Method.SRT, Method.MRT, Method.CENTRAL_MOMENT, Method.CUMULANT])
@pytest.mark.parametrize("wall_distance", [0.1, 0.5, 0.9])
@pytest.mark.parametrize("stencil", [Stencil.D3Q19, Stencil.D3Q27])
@pytest.mark.longrun
def test_couette_flow_d3d(method_enum, wall_distance, stencil):
stencil = LBStencil(stencil)
couette_flow(stencil, method_enum, True, wall_distance, True)
"""
Test the lbmpy-specific JSON encoder and serializer as used in the Database class.
"""
import tempfile
import pystencils as ps
from lbmpy import Stencil, Method, ForceModel, SubgridScaleModel
from lbmpy.advanced_streaming import Timestep
from lbmpy.creationfunctions import LBMConfig, LBMOptimisation, LBStencil
from lbmpy.fieldaccess import StreamPullTwoFieldsAccessor
from pystencils.runhelper import Database
from lbmpy.db import LbmpyJsonSerializer
def test_json_serializer():
stencil = LBStencil(Stencil.D3Q27)
q = stencil.Q
pdfs, pdfs_tmp = ps.fields(f"pdfs({q}), pdfs_tmp({q}): double[3D]", layout='fzyx')
density = ps.fields(f"rho: double[3D]", layout='fzyx')
from lbmpy.non_newtonian_models import CassonsParameters
cassons_params = CassonsParameters(0.2)
# create dummy lbmpy config
lbm_config = LBMConfig(stencil=LBStencil(Stencil.D3Q27), method=Method.CUMULANT, force_model=ForceModel.GUO,
compressible=True, relaxation_rate=1.999, subgrid_scale_model=SubgridScaleModel.SMAGORINSKY,
galilean_correction=True, cassons=cassons_params, density_input=density,
kernel_type=StreamPullTwoFieldsAccessor, timestep=Timestep.BOTH)
lbm_optimization = LBMOptimisation(cse_pdfs=False, cse_global=False, builtin_periodicity=(True, False, False),
symbolic_field=pdfs, symbolic_temporary_field=pdfs_tmp)
# create dummy database
temp_dir = tempfile.TemporaryDirectory()
db = Database(file=temp_dir.name, serializer_info=('lbmpy_serializer', LbmpyJsonSerializer))
db.save(params={'lbm_config': lbm_config, 'lbm_optimization': lbm_optimization}, result={'test': 'dummy'})
...@@ -4,9 +4,10 @@ from types import MappingProxyType ...@@ -4,9 +4,10 @@ from types import MappingProxyType
from pystencils import Target, CreateKernelConfig from pystencils import Target, CreateKernelConfig
from lbmpy.scenarios import create_fully_periodic_flow, create_lid_driven_cavity from lbmpy.scenarios import create_fully_periodic_flow, create_lid_driven_cavity
from lbmpy import SubgridScaleModel
try: try:
import pycuda.driver import cupy
gpu_available = True gpu_available = True
except ImportError: except ImportError:
gpu_available = False gpu_available = False
...@@ -77,7 +78,13 @@ def test_data_handling_2d(): ...@@ -77,7 +78,13 @@ def test_data_handling_2d():
def test_smagorinsky_setup(): def test_smagorinsky_setup():
step = create_lid_driven_cavity((30, 30), smagorinsky=0.16, relaxation_rate=1.99) step = create_lid_driven_cavity((30, 30), subgrid_scale_model=(SubgridScaleModel.SMAGORINSKY, 0.16),
relaxation_rate=1.99)
step.run(10)
def test_qr_setup():
step = create_lid_driven_cavity((30, 30), subgrid_scale_model=(SubgridScaleModel.QR, 0.33),
relaxation_rate=1.99)
step.run(10) step.run(10)
......
from functools import partial from functools import partial
import pystencils as ps import pystencils as ps
from pystencils.astnodes import LoopOverCoordinate from lbmpy._compat import get_loop_counter_symbol
from pystencils.slicing import get_periodic_boundary_functor from pystencils.slicing import get_periodic_boundary_functor
from lbmpy.creationfunctions import create_lb_update_rule, LBMConfig, LBMOptimisation from lbmpy.creationfunctions import create_lb_update_rule, LBMConfig, LBMOptimisation
...@@ -76,7 +76,7 @@ def test_lees_edwards(): ...@@ -76,7 +76,7 @@ def test_lees_edwards():
u = dh.add_array('u', values_per_cell=stencil.D) u = dh.add_array('u', values_per_cell=stencil.D)
dh.fill('u', 0.0, ghost_layers=True) dh.fill('u', 0.0, ghost_layers=True)
counters = [LoopOverCoordinate.get_loop_counter_symbol(i) for i in range(stencil.D)] counters = [get_loop_counter_symbol(i) for i in range(stencil.D)]
points_up = sp.Symbol('points_up') points_up = sp.Symbol('points_up')
points_down = sp.Symbol('points_down') points_down = sp.Symbol('points_down')
......
import pytest import pytest
import numpy as np import numpy as np
import sympy as sp
from lbmpy.creationfunctions import create_lb_method, LBMConfig from lbmpy.creationfunctions import create_lb_method, LBMConfig
from lbmpy.enums import Stencil, ForceModel, Method from lbmpy.enums import Stencil, ForceModel, Method
from lbmpy.macroscopic_value_kernels import ( from lbmpy.macroscopic_value_kernels import (
compile_macroscopic_values_getter, compile_macroscopic_values_setter) compile_macroscopic_values_getter, compile_macroscopic_values_setter, strain_rate_tensor_getter)
from lbmpy.advanced_streaming.utility import streaming_patterns, Timestep from lbmpy.advanced_streaming.utility import streaming_patterns, Timestep
from lbmpy.stencils import LBStencil from lbmpy.stencils import LBStencil
...@@ -83,3 +84,18 @@ def test_set_get_constant_velocity(stencil, force_model, compressible): ...@@ -83,3 +84,18 @@ def test_set_get_constant_velocity(stencil, force_model, compressible):
computed_velocity = velocity_output_field[0, 0, 0, :] computed_velocity = velocity_output_field[0, 0, 0, :]
np.testing.assert_almost_equal(np.array(ref_velocity[:method.dim]), computed_velocity) np.testing.assert_almost_equal(np.array(ref_velocity[:method.dim]), computed_velocity)
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q19])
@pytest.mark.parametrize("method_enum", [Method.SRT, Method.TRT, Method.MRT, Method.CUMULANT])
def test_get_strain_rate_tensor(stencil, method_enum):
stencil = LBStencil(stencil)
lbm_config = LBMConfig(stencil=stencil, method=method_enum, compressible=True)
method = create_lb_method(lbm_config=lbm_config)
pdfs = sp.symbols(f"f_:{stencil.Q}")
strain_rate_tensor = sp.symbols(f"strain_rate_tensor_:{method.dim * method.dim}")
getter_assignments = strain_rate_tensor_getter(method, strain_rate_tensor, pdfs)
assert len(getter_assignments) == method.dim * method.dim