Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision

Target

Select target project
  • ravi.k.ayyala/lbmpy
  • brendan-waters/lbmpy
  • anirudh.jonnalagadda/lbmpy
  • jbadwaik/lbmpy
  • alexander.reinauer/lbmpy
  • itischler/lbmpy
  • he66coqe/lbmpy
  • ev81oxyl/lbmpy
  • Bindgen/lbmpy
  • da15siwa/lbmpy
  • holzer/lbmpy
  • RudolfWeeber/lbmpy
  • pycodegen/lbmpy
13 results
Select Git revision
Show changes
Showing
with 1795 additions and 80 deletions
......@@ -10,31 +10,36 @@ from lbmpy.chapman_enskog.chapman_enskog_higher_order import (
determine_higher_order_moments, get_solvability_conditions)
from lbmpy.chapman_enskog.chapman_enskog_steady_state import (
SteadyStateChapmanEnskogAnalysis, SteadyStateChapmanEnskogAnalysisSRT)
from lbmpy.creationfunctions import create_lb_method
from lbmpy.creationfunctions import create_lb_method, LBMConfig
from lbmpy.enums import Method, Stencil
from lbmpy.forcemodels import Guo
from lbmpy.relaxationrates import lattice_viscosity_from_relaxation_rate
from lbmpy.stencils import LBStencil
from pystencils.fd import Diff, normalize_diff_order
from pystencils.sympyextensions import multidimensional_sum
def test_srt():
for stencil in ['D2Q9', 'D3Q19', 'D3Q27']:
for continuous_eq in (False, True):
omega = sp.Symbol("omega")
print("Analysing %s, ContMaxwellianConstruction %d" % (stencil, continuous_eq))
method = create_lb_method(method='srt', stencil=stencil, compressible=True,
relaxation_rate=omega, maxwellian_moments=continuous_eq)
analysis = ChapmanEnskogAnalysis(method)
omega_value = analysis.relaxation_rate_from_kinematic_viscosity(1)[omega]
assert omega_value, sp.Rational(2 == 7)
@pytest.mark.parametrize('continuous_eq', [False, True])
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q15, Stencil.D3Q19, Stencil.D3Q27])
def test_srt(continuous_eq, stencil):
omega = sp.Symbol("omega")
print(f"Analysing {stencil}, ContMaxwellianConstruction {continuous_eq}")
lbm_config = LBMConfig(stencil=LBStencil(stencil), method=Method.SRT, compressible=True,
zero_centered=False, relaxation_rate=omega, continuous_equilibrium=continuous_eq)
method = create_lb_method(lbm_config=lbm_config)
analysis = ChapmanEnskogAnalysis(method)
omega_value = analysis.relaxation_rate_from_kinematic_viscosity(1)[omega]
assert omega_value, sp.Rational(2 == 7)
@pytest.mark.longrun
def test_steady_state_silva_paper_comparison():
eps, tau, lambda_plus, f = sp.symbols("epsilon tau Lambda f")
method = create_lb_method(stencil="D3Q19", compressible=False, relaxation_rate=1 / tau,
maxwellian_moments=False)
lbm_config = LBMConfig(stencil=LBStencil(Stencil.D3Q19), compressible=False, relaxation_rate=1 / tau,
continuous_equilibrium=False, zero_centered=False)
method = create_lb_method(lbm_config=lbm_config)
analysis = SteadyStateChapmanEnskogAnalysis(method)
dim = 3
......@@ -142,7 +147,8 @@ def test_steady_state_silva_paper_comparison():
def test_higher_order_moment_computation():
"""In chapman_enskog_higher_order.py there are some functions to generalize the std Chapman Enskog expansion
These are not used by the Chapman Enskog class yet."""
method = create_lb_method(stencil='D2Q9', method='trt', maxwellian_moments=False)
method = create_lb_method(lbm_config=LBMConfig(stencil=LBStencil(Stencil.D2Q9), zero_centered=False,
method=Method.TRT, compressible=False))
mom_comp = LbMethodEqMoments(method)
dim = method.dim
order = 2
......@@ -168,7 +174,8 @@ def test_higher_order_moment_computation():
def test_steady_state():
rr = sp.symbols("omega")
method = create_lb_method(stencil='D2Q9', method='srt', relaxation_rate=rr)
method = create_lb_method(lbm_config=LBMConfig(stencil=LBStencil(Stencil.D2Q9),
method=Method.SRT, relaxation_rate=rr))
a1 = SteadyStateChapmanEnskogAnalysis(method, order=2)
a2 = SteadyStateChapmanEnskogAnalysisSRT(method, order=2)
......
%% Cell type:code id: tags:
``` python
import pystencils as ps
from lbmpy.session import *
from lbmpy.boundaries.boundaries_in_kernel import update_rule_with_push_boundaries
from lbmpy.macroscopic_value_kernels import macroscopic_values_getter, macroscopic_values_setter
from collections import OrderedDict
from time import perf_counter
```
%% Cell type:markdown id: tags:
# Version 1: compile-in boundaries
%% Cell type:code id: tags:
``` python
domain_size = (32, 32, 32)
relaxation_rate = 1.8
time_steps = 100
lid_velocity = 0.05
stencil = LBStencil(Stencil.D3Q19)
```
%% Cell type:code id: tags:
``` python
dh = create_data_handling(domain_size, default_target=ps.Target.CPU)
pdfs = dh.add_array('pdfs', values_per_cell=stencil.Q)
u = dh.add_array('u', values_per_cell=stencil.D)
streaming_pattern = 'aa'
```
%% Cell type:code id: tags:
``` python
boundaries = OrderedDict((
((0, 1, 0), UBB([lid_velocity, 0, 0])),
((1, 0, 0), NoSlip()),
((-1, 0, 0), NoSlip()),
((0, -1, 0), NoSlip()),
((0, 0, 1), NoSlip()),
((0, 0, -1), NoSlip()),
))
lbm_opt = LBMOptimisation(symbolic_field=pdfs, cse_global=False, cse_pdfs=False)
lbm_config = LBMConfig(stencil=stencil, relaxation_rate=relaxation_rate, compressible=False)
cr_even = create_lb_collision_rule(lbm_config=lbm_config, lbm_optimisation=lbm_opt)
cr_odd = create_lb_collision_rule(lbm_config=lbm_config, lbm_optimisation=lbm_opt)
update_rule_aa_even = update_rule_with_push_boundaries(cr_even, pdfs, boundaries, streaming_pattern, Timestep.EVEN)
update_rule_aa_odd = update_rule_with_push_boundaries(cr_odd, pdfs, boundaries, streaming_pattern, Timestep.ODD)
getter_assignments = macroscopic_values_getter(update_rule_aa_even.method, velocity=u.center_vector,
pdfs=pdfs, density=None,
streaming_pattern=streaming_pattern,
previous_timestep=Timestep.EVEN)
config = ps.CreateKernelConfig(target=dh.default_target)
getter_kernel = ps.create_kernel(getter_assignments, config=config).compile()
even_kernel = ps.create_kernel(update_rule_aa_even, config=config).compile()
odd_kernel = ps.create_kernel(update_rule_aa_odd, config=config).compile()
```
%% Cell type:code id: tags:
``` python
def init():
dh.fill(pdfs.name, 0, ghost_layers=True)
def aa_time_loop(steps=100):
assert steps % 2 == 0, "Works only for an even number of time steps"
dh.all_to_gpu()
for i in range(steps // 2):
dh.run_kernel(odd_kernel)
dh.run_kernel(even_kernel)
dh.run_kernel(getter_kernel)
dh.all_to_cpu()
```
%% Cell type:code id: tags:
``` python
init()
aa_time_loop(time_steps)
vel_version1 = dh.gather_array(u.name, ghost_layers=False).copy()
plt.vector_field_magnitude(vel_version1[:, :, domain_size[2]//2, :])
plt.colorbar()
```
%% Output
<matplotlib.colorbar.Colorbar at 0x11b578710>
%% Cell type:markdown id: tags:
# Version 2: Normal boundary handling
%% Cell type:code id: tags:
``` python
ldc = create_lid_driven_cavity(domain_size, relaxation_rate=relaxation_rate, lid_velocity=lid_velocity)
ldc.run(time_steps)
vel_version2 = ldc.velocity[:, :, :, :]
plt.vector_field_magnitude(vel_version2[:, :, domain_size[2]//2, :])
plt.colorbar()
```
%% Output
<matplotlib.colorbar.Colorbar at 0x12e912a90>
%% Cell type:code id: tags:
``` python
np.testing.assert_almost_equal(vel_version1[1:-1, 1:-1, :], vel_version2[1:-1, 1:-1, :], decimal=2)
```
......@@ -6,114 +6,138 @@ from copy import copy
import pytest
import sympy as sp
import math
from lbmpy.methods.creationfunctions import RelaxationInfo, create_srt, create_trt, create_trt_kbc, \
from lbmpy.enums import Stencil, Method
from lbmpy.methods import create_srt, create_trt, create_trt_kbc, \
create_with_default_polynomial_cumulants
from lbmpy.methods.momentbased.momentbasedmethod import MomentBasedLbMethod
from lbmpy.methods.centeredcumulant.centeredcumulantmethod import CenteredCumulantBasedLbMethod
from lbmpy.methods.cumulantbased import CumulantBasedLbMethod
from lbmpy.moments import MOMENT_SYMBOLS
from lbmpy.simplificationfactory import create_simplification_strategy
from lbmpy.stencils import get_stencil
from lbmpy.stencils import LBStencil
def __change_relaxation_rate_of_conserved_moments(method, new_relaxation_rate=sp.Symbol("test_omega")):
conserved_moments = (sp.Rational(1, 1),) + MOMENT_SYMBOLS[:method.dim]
rr_dict = copy(method.relaxation_info_dict)
rr_dict = copy(method.relaxation_rate_dict)
for conserved_moment in conserved_moments:
prev = rr_dict[conserved_moment]
rr_dict[conserved_moment] = RelaxationInfo(prev.equilibrium_value, new_relaxation_rate)
rr_dict[conserved_moment] = new_relaxation_rate
if isinstance(method, MomentBasedLbMethod):
changed_method = MomentBasedLbMethod(method.stencil, rr_dict, method.conserved_quantity_computation,
changed_method = MomentBasedLbMethod(method.stencil, method.equilibrium_distribution, rr_dict,
method.conserved_quantity_computation,
force_model=method.force_model)
elif isinstance(method, CenteredCumulantBasedLbMethod):
changed_method = CenteredCumulantBasedLbMethod(method.stencil, rr_dict, method.conserved_quantity_computation,
force_model=method.force_model)
elif isinstance(method, CumulantBasedLbMethod):
changed_method = CumulantBasedLbMethod(method.stencil, method.equilibrium_distribution, rr_dict,
method.conserved_quantity_computation,
force_model=method.force_model,
zero_centered=True)
else:
raise ValueError("Not a moment or cumulant-based method")
return changed_method
def check_for_collision_rule_equivalence(collision_rule1, collision_rule2):
def check_for_collision_rule_equivalence(collision_rule1, collision_rule2, use_numeric_subs=False):
collision_rule1 = collision_rule1.new_without_subexpressions()
collision_rule2 = collision_rule2.new_without_subexpressions()
if use_numeric_subs:
free_symbols = collision_rule1.free_symbols
free_symbols.update(collision_rule2.free_symbols)
subs_dict = dict()
value = 10.0
for symbol in free_symbols:
subs_dict.update({symbol: value})
value += 1.1
collision_rule1 = collision_rule1.subs(subs_dict)
collision_rule2 = collision_rule2.subs(subs_dict)
for eq1, eq2 in zip(collision_rule1.main_assignments, collision_rule2.main_assignments):
diff = sp.cancel(sp.expand(eq1.rhs - eq2.rhs))
assert diff == 0
if use_numeric_subs:
assert math.isclose(diff, 0, rel_tol=0.0, abs_tol=1e-10)
else:
assert diff == 0
def check_method_equivalence(m1, m2, do_simplifications):
def check_method_equivalence(m1, m2, do_simplifications, use_numeric_subs=False):
cr1 = m1.get_collision_rule()
cr2 = m2.get_collision_rule()
if do_simplifications:
cr1 = create_simplification_strategy(m1)(cr1)
cr2 = create_simplification_strategy(m2)(cr2)
check_for_collision_rule_equivalence(cr1, cr2)
check_for_collision_rule_equivalence(cr1, cr2, use_numeric_subs)
@pytest.mark.longrun
def test_cumulant():
for stencil_name in ('D2Q9',):
stencil = get_stencil(stencil_name)
original_method = create_with_default_polynomial_cumulants(stencil, [sp.Symbol("omega")])
changed_method = __change_relaxation_rate_of_conserved_moments(original_method)
stencil = LBStencil(Stencil.D2Q9)
original_method = create_with_default_polynomial_cumulants(stencil, [sp.Symbol("omega")], zero_centered=True)
changed_method = __change_relaxation_rate_of_conserved_moments(original_method)
check_method_equivalence(original_method, changed_method, True)
check_method_equivalence(original_method, changed_method, False)
check_method_equivalence(original_method, changed_method, True, use_numeric_subs=True)
check_method_equivalence(original_method, changed_method, False, use_numeric_subs=True)
@pytest.mark.longrun
def test_srt():
for stencil_name in ('D2Q9',):
stencil = get_stencil(stencil_name)
original_method = create_srt(stencil, sp.Symbol("omega"), compressible=True,
maxwellian_moments=True)
changed_method = __change_relaxation_rate_of_conserved_moments(original_method)
stencil = LBStencil(Stencil.D3Q27)
original_method = create_srt(stencil, sp.Symbol("omega"), compressible=True,
continuous_equilibrium=True)
changed_method = __change_relaxation_rate_of_conserved_moments(original_method)
check_method_equivalence(original_method, changed_method, True)
check_method_equivalence(original_method, changed_method, False)
check_method_equivalence(original_method, changed_method, True, use_numeric_subs=True)
check_method_equivalence(original_method, changed_method, False, use_numeric_subs=True)
def test_srt_short():
stencil = get_stencil("D2Q9")
stencil = LBStencil(Stencil.D2Q9)
original_method = create_srt(stencil, sp.Symbol("omega"), compressible=True,
maxwellian_moments=True)
continuous_equilibrium=True)
changed_method = __change_relaxation_rate_of_conserved_moments(original_method)
check_method_equivalence(original_method, changed_method, True)
check_method_equivalence(original_method, changed_method, False)
check_method_equivalence(original_method, changed_method, True, use_numeric_subs=False)
check_method_equivalence(original_method, changed_method, False, use_numeric_subs=False)
@pytest.mark.parametrize('stencil_name', [Stencil.D2Q9, Stencil.D3Q19, Stencil.D3Q27])
@pytest.mark.parametrize('continuous_moments', [False, True])
@pytest.mark.longrun
def test_trt():
for stencil_name in ("D2Q9", "D3Q19", "D3Q27"):
for continuous_moments in (False, True):
stencil = get_stencil(stencil_name)
original_method = create_trt(stencil, sp.Symbol("omega1"), sp.Symbol("omega2"),
maxwellian_moments=continuous_moments)
changed_method = __change_relaxation_rate_of_conserved_moments(original_method)
def test_trt(stencil_name, continuous_moments):
stencil = LBStencil(stencil_name)
original_method = create_trt(stencil, sp.Symbol("omega1"), sp.Symbol("omega2"),
continuous_equilibrium=continuous_moments)
changed_method = __change_relaxation_rate_of_conserved_moments(original_method)
check_method_equivalence(original_method, changed_method, True)
check_method_equivalence(original_method, changed_method, False)
check_method_equivalence(original_method, changed_method, True)
check_method_equivalence(original_method, changed_method, False)
@pytest.mark.parametrize('method_name', [Method.TRT_KBC_N1, Method.TRT_KBC_N2, Method.TRT_KBC_N3, Method.TRT_KBC_N4])
def test_trt_kbc(method_name):
dim = 2
method_nr = method_name.name[-1]
original_method = create_trt_kbc(dim, sp.Symbol("omega1"), sp.Symbol("omega2"),
method_name='KBC-N' + method_nr,
continuous_equilibrium=False)
changed_method = __change_relaxation_rate_of_conserved_moments(original_method)
check_method_equivalence(original_method, changed_method, True)
check_method_equivalence(original_method, changed_method, False)
@pytest.mark.parametrize('method_name', [Method.TRT_KBC_N1, Method.TRT_KBC_N2, Method.TRT_KBC_N3, Method.TRT_KBC_N4])
@pytest.mark.longrun
def test_trt_kbc_long():
for dim in (2, 3):
for method_name in ("KBC-N1", "KBC-N2", "KBC-N3", "KBC-N4"):
original_method = create_trt_kbc(dim, sp.Symbol("omega1"), sp.Symbol("omega2"), method_name=method_name,
maxwellian_moments=False)
changed_method = __change_relaxation_rate_of_conserved_moments(original_method)
check_method_equivalence(original_method, changed_method, True)
check_method_equivalence(original_method, changed_method, False)
def test_trt_kbc_short():
for dim, method_name in [(2, "KBC-N2")]:
original_method = create_trt_kbc(dim, sp.Symbol("omega1"), sp.Symbol("omega2"), method_name=method_name,
maxwellian_moments=False)
changed_method = __change_relaxation_rate_of_conserved_moments(original_method)
check_method_equivalence(original_method, changed_method, True)
check_method_equivalence(original_method, changed_method, False)
def test_trt_kbc_long(method_name):
dim = 3
method_nr = method_name.name[-1]
original_method = create_trt_kbc(dim, sp.Symbol("omega1"), sp.Symbol("omega2"),
method_name='KBC-N' + method_nr,
continuous_equilibrium=False)
changed_method = __change_relaxation_rate_of_conserved_moments(original_method)
check_method_equivalence(original_method, changed_method, True)
check_method_equivalence(original_method, changed_method, False)
from dataclasses import replace
import numpy as np
import pytest
from pystencils import Target, CreateKernelConfig
from lbmpy.creationfunctions import LBMConfig, LBMOptimisation
from lbmpy.enums import Method, Stencil
from lbmpy.scenarios import create_channel
from lbmpy.stencils import LBStencil
from lbmpy._compat import IS_PYSTENCILS_2
def run_equivalence_test(domain_size, lbm_config, lbm_opt, base_config, time_steps=13):
config = replace(base_config, target=Target.CPU)
cpu_scenario = create_channel(domain_size=domain_size, pressure_difference=0.001,
lbm_config=lbm_config, lbm_optimisation=lbm_opt, config=config)
config = replace(base_config, target=Target.GPU)
if not IS_PYSTENCILS_2:
from pystencils.enums import Backend
config = replace(config, backend=Backend.CUDA)
gpu_scenario = create_channel(domain_size=domain_size, pressure_difference=0.001,
lbm_config=lbm_config, lbm_optimisation=lbm_opt, config=config)
cpu_scenario.run(time_steps)
gpu_scenario.run(time_steps)
max_vel_error = np.max(np.abs(cpu_scenario.velocity_slice() - gpu_scenario.velocity_slice()))
max_rho_error = np.max(np.abs(cpu_scenario.density_slice() - gpu_scenario.density_slice()))
np.testing.assert_allclose(max_vel_error, 0, atol=1e-14)
np.testing.assert_allclose(max_rho_error, 0, atol=1e-14)
@pytest.mark.parametrize('scenario', [((17, 12), Method.SRT, False, (12, 4), 'reverse_numpy'),
((18, 20), Method.MRT, True, (4, 2), 'zyxf'),
((7, 11, 18), Method.TRT, False, False, 'numpy')])
def test_force_driven_channel_short(scenario):
pytest.importorskip("cupy")
ds = scenario[0]
stencil = LBStencil(Stencil.D2Q9) if len(ds) == 2 else LBStencil(Stencil.D3Q27)
method = scenario[1]
compressible = scenario[2]
block_size = scenario[3]
field_layout = scenario[4]
lbm_config = LBMConfig(stencil=stencil, method=method,
compressible=compressible, relaxation_rates=[1.95, 1.9, 1.92, 1.92])
lbm_opt = LBMOptimisation(field_layout=field_layout)
# Different methods
if block_size is not False:
config = CreateKernelConfig(gpu_indexing_params={'block_size': block_size})
else:
if IS_PYSTENCILS_2:
config = CreateKernelConfig()
config.gpu.indexing_scheme = "blockwise4d"
else:
config = CreateKernelConfig(gpu_indexing='line')
run_equivalence_test(domain_size=ds, lbm_config=lbm_config, lbm_opt=lbm_opt, base_config=config)
import pytest
import sympy as sp
from lbmpy.enums import Stencil
from lbmpy.stencils import LBStencil
from lbmpy.methods.creationfunctions import create_with_monomial_cumulants
from lbmpy.maxwellian_equilibrium import get_weights
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q19])
def test_zero_centering_equilibrium_equivalence(stencil):
stencil = LBStencil(stencil)
omega = sp.Symbol('omega')
r_rates = (omega,) * stencil.Q
weights = sp.Matrix(get_weights(stencil))
rho = sp.Symbol("rho")
rho_background = sp.Integer(1)
delta_rho = sp.Symbol("delta_rho")
subs = {delta_rho: rho - rho_background}
eqs = []
for zero_centered in [False, True]:
method = create_with_monomial_cumulants(stencil, r_rates, zero_centered=zero_centered)
eq = method.get_equilibrium_terms()
eqs.append(eq.subs(subs))
assert (eqs[0] - eqs[1]).expand() == weights
import pytest
from lbmpy.creationfunctions import create_lb_method, LBMConfig
from lbmpy.enums import Method, Stencil
from lbmpy.methods import create_srt
from lbmpy.stencils import get_stencil
from lbmpy.stencils import LBStencil
from lbmpy.methods.creationfunctions import create_with_default_polynomial_cumulants
@pytest.mark.parametrize('stencil_name', ['D2Q9', 'D3Q19', 'D3Q27'])
@pytest.mark.parametrize('stencil_name', [Stencil.D2Q9, Stencil.D3Q19, Stencil.D3Q27])
def test_weights(stencil_name):
stencil = get_stencil(stencil_name)
stencil = LBStencil(stencil_name)
cumulant_method = create_with_default_polynomial_cumulants(stencil, [1])
moment_method = create_srt(stencil, 1, cumulant=False, compressible=True, maxwellian_moments=True)
moment_method = create_srt(stencil, 1, compressible=True, continuous_equilibrium=True)
assert cumulant_method.weights == moment_method.weights
import pytest
from itertools import chain
import sympy as sp
from pystencils import AssignmentCollection
from lbmpy.moment_transforms import (
CentralMomentsToCumulantsByGeneratingFunc, PRE_COLLISION_MONOMIAL_CENTRAL_MOMENT,
PRE_COLLISION_CUMULANT
)
from lbmpy.methods import cascaded_moment_sets_literature
from lbmpy.stencils import Stencil, LBStencil
@pytest.mark.parametrize("stencil", [Stencil.D2Q9, Stencil.D3Q19])
def test_identity(stencil):
stencil = LBStencil(stencil)
polys = list(chain.from_iterable(cascaded_moment_sets_literature(stencil)))
rho = sp.Symbol('rho')
u = sp.symbols('u_:2')
transform = CentralMomentsToCumulantsByGeneratingFunc(stencil, polys, rho, u,
post_collision_symbol_base=PRE_COLLISION_CUMULANT)
forward_eqs = transform.forward_transform()
backward_eqs = transform.backward_transform(central_moment_base=PRE_COLLISION_MONOMIAL_CENTRAL_MOMENT)
subexpressions = forward_eqs.all_assignments + backward_eqs.subexpressions
main_assignments = backward_eqs.main_assignments
ac = AssignmentCollection(main_assignments, subexpressions=subexpressions)
ac = ac.new_without_subexpressions()
for lhs, rhs in ac.main_assignments_dict.items():
assert (lhs - rhs).expand() == 0
from lbmpy.creationfunctions import create_lb_method
from lbmpy.cumulants import *
from lbmpy.moments import (
discrete_moment, exponent_to_polynomial_representation, exponents_to_polynomial_representations)
from lbmpy.stencils import get_stencil
from lbmpy.enums import Stencil
from lbmpy.stencils import LBStencil
def test_cumulants_from_pdfs():
"""
......@@ -11,11 +10,10 @@ def test_cumulants_from_pdfs():
- directly pdfs to cumulant
- indirect pdfs -> raw moments -> cumulants
"""
stencil = get_stencil("D2Q9")
dim = len(stencil[0])
indices = moments_up_to_component_order(2, dim=dim)
stencil = LBStencil(Stencil.D2Q9)
indices = moments_up_to_component_order(2, dim=stencil.D)
pdf_symbols = sp.symbols("f_:%d" % (len(stencil),))
pdf_symbols = sp.symbols(f"f_:{stencil.Q}")
direct_version = cumulants_from_pdfs(stencil, pdf_symbols=pdf_symbols, cumulant_indices=indices)
polynomial_moment_indices = exponents_to_polynomial_representations(indices)
direct_version2 = cumulants_from_pdfs(stencil, pdf_symbols=pdf_symbols, cumulant_indices=polynomial_moment_indices)
......@@ -32,11 +30,10 @@ def test_cumulants_from_pdfs():
def test_raw_moment_to_cumulant_transformation():
"""Transforms from raw moments to cumulants and back, then checks for identity"""
for stencil in [get_stencil("D2Q9"), get_stencil("D3Q27")]:
dim = len(stencil[0])
indices = moments_up_to_component_order(2, dim=dim)
for stencil in [LBStencil(Stencil.D2Q9), LBStencil(Stencil.D3Q27)]:
indices = moments_up_to_component_order(2, dim=stencil.D)
symbol_format = "m_%d_%d_%d" if dim == 3 else "m_%d_%d"
symbol_format = "m_%d_%d_%d" if stencil.D == 3 else "m_%d_%d"
moment_symbols = {idx: sp.Symbol(symbol_format % idx) for idx in indices}
forward = {idx: cumulant_as_function_of_raw_moments(idx, moments_dict=moment_symbols)
......@@ -48,11 +45,10 @@ def test_raw_moment_to_cumulant_transformation():
def test_central_moment_to_cumulant_transformation():
"""Transforms from central moments to cumulants and back, then checks for identity"""
for stencil in [get_stencil("D2Q9"), get_stencil("D3Q27")]:
dim = len(stencil[0])
indices = moments_up_to_component_order(2, dim=dim)
for stencil in [LBStencil(Stencil.D2Q9), LBStencil(Stencil.D3Q27)]:
indices = moments_up_to_component_order(2, dim=stencil.D)
symbol_format = "m_%d_%d_%d" if dim == 3 else "m_%d_%d"
symbol_format = "m_%d_%d_%d" if stencil.D == 3 else "m_%d_%d"
moment_symbols = {idx: sp.Symbol(symbol_format % idx) for idx in indices}
forward = {idx: cumulant_as_function_of_central_moments(idx, moments_dict=moment_symbols)
......@@ -66,5 +62,5 @@ def test_central_moment_to_cumulant_transformation():
def test_cumulants_from_pdf():
res = cumulants_from_pdfs(get_stencil("D2Q9"))
assert res[(0, 0)] == sp.log(sum(sp.symbols("f_:9")))
res = cumulants_from_pdfs(LBStencil(Stencil.D2Q9))
assert res[(0, 0)] == sp.log(sum(sp.symbols(f"f_:{9}")))
import pytest
import numpy as np
from numpy.testing import assert_allclose
import pystencils as ps
import sympy as sp
from lbmpy.enums import Stencil, Method
from lbmpy.stencils import LBStencil
from lbmpy.creationfunctions import LBMConfig, LBMOptimisation, create_lb_function
from lbmpy.macroscopic_value_kernels import macroscopic_values_setter, macroscopic_values_getter
@pytest.mark.parametrize('zero_centered', [False, True])
def test_density_velocity_input(zero_centered):
stencil = LBStencil(Stencil.D2Q9)
dh = ps.create_data_handling((5,5), default_target=ps.Target.CPU)
rho_in = dh.add_array("rho_in", 1)
rho_out = dh.add_array_like("rho_out", "rho_in")
u_in = dh.add_array("u_in", 2)
u_out = dh.add_array_like("u_out", "u_in")
pdfs = dh.add_array("pdfs", stencil.Q)
lb_config = LBMConfig(stencil=Stencil.D2Q9, method=Method.SRT, zero_centered=zero_centered,
relaxation_rate=sp.Integer(1),
density_input=rho_in.center, velocity_input=u_in.center_vector,
kernel_type="collide_only")
lb_opt = LBMOptimisation(symbolic_field=pdfs)
lb_func = create_lb_function(lbm_config=lb_config, lbm_optimisation=lb_opt)
setter = macroscopic_values_setter(lb_func.method, 1, (0, 0), pdfs.center_vector)
setter_kernel = ps.create_kernel(setter).compile()
getter = macroscopic_values_getter(lb_func.method, rho_out.center, u_out.center_vector, pdfs.center_vector)
getter_kernel = ps.create_kernel(getter).compile()
dh.run_kernel(setter_kernel)
dh.cpu_arrays[rho_in.name][1:-1, 1:-1] = 1.0 + 0.1 * np.random.random_sample((5, 5))
dh.cpu_arrays[u_in.name][1:-1, 1:-1] = 0.05 + 0.01 * np.random.random_sample((5, 5, 2))
dh.run_kernel(lb_func)
dh.run_kernel(getter_kernel)
assert_allclose(dh.cpu_arrays[rho_out.name][1:-1, 1:-1], dh.cpu_arrays[rho_in.name][1:-1, 1:-1])
assert_allclose(dh.cpu_arrays[u_out.name][1:-1, 1:-1], dh.cpu_arrays[u_in.name][1:-1, 1:-1])
import math
import pytest
from dataclasses import replace
import pystencils as ps
from pystencils.slicing import slice_from_direction
from lbmpy import pdf_initialization_assignments
from lbmpy.creationfunctions import create_lb_method, create_lb_update_rule, LBMConfig, LBMOptimisation
from lbmpy.enums import Method, Stencil
from lbmpy.boundaries.boundaryhandling import LatticeBoltzmannBoundaryHandling
from lbmpy.boundaries.boundaryconditions import DiffusionDirichlet, NeumannByCopy
from lbmpy.geometry import add_box_boundary
from lbmpy.stencils import LBStencil
from lbmpy.maxwellian_equilibrium import get_weights
import sympy as sp
import numpy as np
def test_diffusion_boundary():
domain_size = (10, 10)
stencil = LBStencil(Stencil.D2Q9)
weights = get_weights(stencil, c_s_sq=sp.Rational(1, 3))
concentration = 1.0
# Data Handling
dh = ps.create_data_handling(domain_size=domain_size)
dh.add_array('pdfs', values_per_cell=stencil.Q)
dh.fill("pdfs", 0.0, ghost_layers=True)
lbm_config = LBMConfig(stencil=stencil, method=Method.SRT, relaxation_rate=1.8,
compressible=True, zero_centered=False)
method = create_lb_method(lbm_config=lbm_config)
# Boundary Handling
bh = LatticeBoltzmannBoundaryHandling(method, dh, 'pdfs', name="bh")
add_box_boundary(bh, boundary=DiffusionDirichlet(concentration))
bh()
assert all(np.abs(dh.gather_array("pdfs", ghost_layers=True)[0, 1:-2, 4] - 2 * weights[4]) < 1e-14)
assert all(np.abs(dh.gather_array("pdfs", ghost_layers=True)[0, 1:-2, 6] - 2 * weights[6]) < 1e-14)
assert all(np.abs(dh.gather_array("pdfs", ghost_layers=True)[0, 2:-1, 8] - 2 * weights[8]) < 1e-14)
assert all(np.abs(dh.gather_array("pdfs", ghost_layers=True)[-1, 1:-2, 3] - 2 * weights[3]) < 1e-14)
assert all(np.abs(dh.gather_array("pdfs", ghost_layers=True)[-1, 1:-2, 5] - 2 * weights[5]) < 1e-14)
assert all(np.abs(dh.gather_array("pdfs", ghost_layers=True)[-1, 2:-1, 7] - 2 * weights[7]) < 1e-14)
assert all(np.abs(dh.gather_array("pdfs", ghost_layers=True)[1:-2, 0, 1] - 2 * weights[1]) < 1e-14)
assert all(np.abs(dh.gather_array("pdfs", ghost_layers=True)[2:, 0, 5] - 2 * weights[5]) < 1e-14)
assert all(np.abs(dh.gather_array("pdfs", ghost_layers=True)[:-2, 0, 6] - 2 * weights[6]) < 1e-14)
assert all(np.abs(dh.gather_array("pdfs", ghost_layers=True)[1:-2, -1, 2] - 2 * weights[2]) < 1e-14)
assert all(np.abs(dh.gather_array("pdfs", ghost_layers=True)[2:, -1, 7] - 2 * weights[7]) < 1e-14)
assert all(np.abs(dh.gather_array("pdfs", ghost_layers=True)[:-2, -1, 8] - 2 * weights[8]) < 1e-14)
@pytest.mark.longrun
def test_diffusion():
"""
Runs the "Diffusion from Plate in Uniform Flow" benchmark as it is described
in [ch. 8.6.3, The Lattice Boltzmann Method, Krüger et al.].
dC/dy = 0
┌───────────────┐
│ → → → │
C = 0 │ → u → │ dC/dx = 0
│ → → → │
└───────────────┘
C = 1
The analytical solution is given by:
C(x,y) = 1 * erfc(y / sqrt(4Dx/u))
The hydrodynamic field is not simulated, instead a constant velocity is assumed.
"""
pytest.importorskip("cupy")
# Parameters
domain_size = (1600, 160)
omega = 1.38
diffusion = (1 / omega - 0.5) / 3
velocity = 0.05
time_steps = 50000
stencil = LBStencil(Stencil.D2Q9)
target = ps.Target.GPU
# Data Handling
dh = ps.create_data_handling(domain_size=domain_size, default_target=target)
vel_field = dh.add_array('vel_field', values_per_cell=stencil.D)
dh.fill('vel_field', velocity, 0, ghost_layers=True)
dh.fill('vel_field', 0.0, 1, ghost_layers=True)
con_field = dh.add_array('con_field', values_per_cell=1)
dh.fill('con_field', 0.0, ghost_layers=True)
pdfs = dh.add_array('pdfs', values_per_cell=stencil.Q)
dh.fill('pdfs', 0.0, ghost_layers=True)
pdfs_tmp = dh.add_array('pdfs_tmp', values_per_cell=stencil.Q)
dh.fill('pdfs_tmp', 0.0, ghost_layers=True)
# Lattice Boltzmann method
lbm_config = LBMConfig(stencil=stencil, method=Method.MRT, relaxation_rates=[1, 1.5, 1, 1.5, 1],
zero_centered=False,
velocity_input=vel_field, output={'density': con_field}, compressible=True,
weighted=True, kernel_type='stream_pull_collide')
lbm_opt = LBMOptimisation(symbolic_field=pdfs, symbolic_temporary_field=pdfs_tmp)
config = ps.CreateKernelConfig(target=dh.default_target, cpu_openmp=True)
method = create_lb_method(lbm_config=lbm_config)
method.set_conserved_moments_relaxation_rate(omega)
lbm_config = replace(lbm_config, lb_method=method)
update_rule = create_lb_update_rule(lbm_config=lbm_config, lbm_optimisation=lbm_opt)
kernel = ps.create_kernel(update_rule, config=config).compile()
# PDF initalization
init = pdf_initialization_assignments(method, con_field.center,
vel_field.center_vector, pdfs.center_vector)
dh.run_kernel(ps.create_kernel(init).compile())
dh.all_to_gpu()
# Boundary Handling
bh = LatticeBoltzmannBoundaryHandling(update_rule.method, dh, 'pdfs', name="bh", target=dh.default_target)
add_box_boundary(bh, boundary=NeumannByCopy())
bh.set_boundary(DiffusionDirichlet(0), slice_from_direction('W', dh.dim))
bh.set_boundary(DiffusionDirichlet(1), slice_from_direction('S', dh.dim))
# Timeloop
for i in range(time_steps):
bh()
dh.run_kernel(kernel)
dh.swap("pdfs", "pdfs_tmp")
dh.all_to_cpu()
# Verification
x = np.arange(1, domain_size[0], 1)
y = np.arange(0, domain_size[1], 1)
X, Y = np.meshgrid(x, y)
analytical = np.zeros(domain_size)
analytical[1:, :] = np.vectorize(math.erfc)(Y / np.vectorize(math.sqrt)(4 * diffusion * X / velocity)).transpose()
simulated = dh.gather_array('con_field', ghost_layers=False)
residual = 0
for i in x:
for j in y:
residual += (simulated[i, j] - analytical[i, j]) ** 2
residual = math.sqrt(residual / (domain_size[0] * domain_size[1]))
assert residual < 1e-2
import platform
import numpy as np
import sympy as sp
import pytest
from lbmpy.enums import ForceModel, Method
from lbmpy.scenarios import create_lid_driven_cavity
@pytest.mark.parametrize('method', [Method.TRT_KBC_N1, Method.TRT_KBC_N2, Method.TRT_KBC_N3, Method.TRT_KBC_N4])
def test_entropic_methods(method):
if platform.system().lower() == 'windows' and method == Method.TRT_KBC_N4:
pytest.skip("For some reason this test does not run on windows", allow_module_level=True)
sc_kbc = create_lid_driven_cavity((20, 20), method=method,
relaxation_rates=[1.9999, sp.Symbol("omega_free")],
entropic_newton_iterations=3, entropic=True, compressible=True,
zero_centered=False, force=(-1e-10, 0), force_model=ForceModel.LUO)
sc_kbc.run(1000)
assert np.isfinite(np.max(sc_kbc.velocity[:, :]))
from pystencils.stencil import inverse_direction
from lbmpy.stencils import get_stencil
from lbmpy.stencils import LBStencil
from lbmpy.advanced_streaming.utility import AccessPdfValues, get_timesteps
import pytest
import numpy as np
from pystencils import Target
from pystencils.datahandling import create_data_handling
from lbmpy.boundaries import LatticeBoltzmannBoundaryHandling, SimpleExtrapolationOutflow, ExtrapolationOutflow
from lbmpy.creationfunctions import create_lb_method
from lbmpy.creationfunctions import create_lb_method, LBMConfig
from lbmpy.enums import Method, Stencil
from lbmpy.advanced_streaming.utility import streaming_patterns
from pystencils.slicing import get_ghost_region_slice
from itertools import product
@pytest.mark.parametrize('stencil', ['D2Q9', 'D3Q27'])
@pytest.mark.parametrize('stencil_enum', [Stencil.D2Q9, Stencil.D3Q27])
@pytest.mark.parametrize('streaming_pattern', streaming_patterns)
def test_pdf_simple_extrapolation(stencil, streaming_pattern):
stencil = get_stencil(stencil)
dim = len(stencil[0])
values_per_cell = len(stencil)
def test_pdf_simple_extrapolation(stencil_enum, streaming_pattern):
stencil = LBStencil(stencil_enum)
# Field contains exactly one fluid cell
domain_size = (3,) * dim
domain_size = (3,) * stencil.D
for timestep in get_timesteps(streaming_pattern):
dh = create_data_handling(domain_size, default_target='cpu')
lb_method = create_lb_method(stencil=stencil)
pdf_field = dh.add_array('f', values_per_cell=values_per_cell)
dh = create_data_handling(domain_size, default_target=Target.CPU)
lb_method = create_lb_method(lbm_config=LBMConfig(stencil=stencil))
pdf_field = dh.add_array('f', values_per_cell=stencil.Q)
dh.fill(pdf_field.name, np.nan, ghost_layers=True)
bh = LatticeBoltzmannBoundaryHandling(lb_method, dh, pdf_field.name, streaming_pattern, target='cpu')
bh = LatticeBoltzmannBoundaryHandling(lb_method, dh, pdf_field.name, streaming_pattern, target=Target.CPU)
# Set up outflows in all directions
for normal_dir in stencil[1:]:
......@@ -41,8 +41,8 @@ def test_pdf_simple_extrapolation(stencil, streaming_pattern):
# Set up the domain with artificial PDF values
# center = (1,) * dim
out_access = AccessPdfValues(stencil, streaming_pattern, timestep, 'out')
for cell in product(*(range(1, 4) for _ in range(dim))):
for q in range(values_per_cell):
for cell in product(*(range(1, 4) for _ in range(stencil.D))):
for q in range(stencil.Q):
out_access.write_pdf(pdf_arr, cell, q, q)
# Do boundary handling
......@@ -52,15 +52,14 @@ def test_pdf_simple_extrapolation(stencil, streaming_pattern):
in_access = AccessPdfValues(stencil, streaming_pattern, timestep.next(), 'in')
# Inbound in center cell
for cell in product(*(range(1, 4) for _ in range(dim))):
for q in range(values_per_cell):
for cell in product(*(range(1, 4) for _ in range(stencil.D))):
for q in range(stencil.Q):
f = in_access.read_pdf(pdf_arr, cell, q)
assert f == q
def test_extrapolation_outflow_initialization_by_copy():
stencil = get_stencil('D2Q9')
values_per_cell = len(stencil)
stencil = LBStencil(Stencil.D2Q9)
domain_size = (1, 5)
streaming_pattern = 'esotwist'
......@@ -69,16 +68,16 @@ def test_extrapolation_outflow_initialization_by_copy():
pdf_acc = AccessPdfValues(stencil, streaming_pattern=streaming_pattern,
timestep=zeroth_timestep, streaming_dir='out')
dh = create_data_handling(domain_size, default_target='cpu')
dh = create_data_handling(domain_size, default_target=Target.CPU)
lb_method = create_lb_method(stencil=stencil)
pdf_field = dh.add_array('f', values_per_cell=values_per_cell)
pdf_field = dh.add_array('f', values_per_cell=stencil.Q)
dh.fill(pdf_field.name, np.nan, ghost_layers=True)
pdf_arr = dh.cpu_arrays[pdf_field.name]
bh = LatticeBoltzmannBoundaryHandling(lb_method, dh, pdf_field.name,
streaming_pattern=streaming_pattern, target='cpu')
streaming_pattern=streaming_pattern, target=Target.CPU)
for y in range(1, 6):
for j in range(values_per_cell):
for j in range(stencil.Q):
pdf_acc.write_pdf(pdf_arr, (1, y), j, j)
normal_dir = (1, 0)
......@@ -99,28 +98,25 @@ def test_extrapolation_outflow_initialization_by_copy():
def test_extrapolation_outflow_initialization_by_callback():
stencil = get_stencil('D2Q9')
values_per_cell = len(stencil)
stencil = LBStencil(Stencil.D2Q9)
domain_size = (1, 5)
streaming_pattern = 'esotwist'
zeroth_timestep = 'even'
dh = create_data_handling(domain_size, default_target='cpu')
dh = create_data_handling(domain_size, default_target=Target.CPU)
lb_method = create_lb_method(stencil=stencil)
pdf_field = dh.add_array('f', values_per_cell=values_per_cell)
pdf_field = dh.add_array('f', values_per_cell=stencil.Q)
dh.fill(pdf_field.name, np.nan, ghost_layers=True)
bh = LatticeBoltzmannBoundaryHandling(lb_method, dh, pdf_field.name,
streaming_pattern=streaming_pattern, target='cpu')
streaming_pattern=streaming_pattern, target=Target.CPU)
normal_dir = (1, 0)
density_callback = lambda x, y: 1
velocity_callback = lambda x, y: (0, 0)
outflow = ExtrapolationOutflow(normal_dir, lb_method,
outflow = ExtrapolationOutflow(normal_direction=normal_dir, lb_method=lb_method,
streaming_pattern=streaming_pattern,
zeroth_timestep=zeroth_timestep,
initial_density=density_callback,
initial_velocity=velocity_callback)
initial_density=lambda x, y: 1,
initial_velocity=lambda x, y: (0, 0))
boundary_slice = get_ghost_region_slice(normal_dir)
bh.set_boundary(outflow, boundary_slice)
bh.prepare()
......
......@@ -87,7 +87,7 @@
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
......@@ -101,7 +101,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.4"
"version": "3.9.7"
}
},
"nbformat": 4,
......
import pytest
import pystencils as ps
from lbmpy.creationfunctions import create_lb_function, LBMConfig
from lbmpy.enums import Method, Stencil
from lbmpy.scenarios import create_lid_driven_cavity
from lbmpy.stencils import LBStencil
from lbmpy._compat import IS_PYSTENCILS_2
@pytest.mark.parametrize("double_precision", [False, True])
@pytest.mark.parametrize(
"method_enum", [Method.SRT, Method.CENTRAL_MOMENT, Method.CUMULANT]
)
def test_creation(double_precision, method_enum):
"""Simple test that makes sure that only float variables are created"""
lbm_config = LBMConfig(method=method_enum, relaxation_rate=1.5, compressible=True)
if IS_PYSTENCILS_2:
config = ps.CreateKernelConfig(
default_dtype="float64" if double_precision else "float32"
)
else:
config = ps.CreateKernelConfig(
data_type="float64" if double_precision else "float32",
default_number_float="float64" if double_precision else "float32",
)
func = create_lb_function(lbm_config=lbm_config, config=config)
code = ps.get_code_str(func)
if double_precision:
assert "float" not in code
assert "double" in code
else:
assert "double" not in code
assert "float" in code
@pytest.mark.parametrize("numeric_type", ["float32", "float64"])
@pytest.mark.parametrize(
"method_enum", [Method.SRT, Method.CENTRAL_MOMENT, Method.CUMULANT]
)
def test_scenario(method_enum, numeric_type):
lbm_config = LBMConfig(
stencil=LBStencil(Stencil.D3Q27),
method=method_enum,
relaxation_rate=1.45,
compressible=True,
)
if IS_PYSTENCILS_2:
config = ps.CreateKernelConfig(
default_dtype=numeric_type
)
else:
config = ps.CreateKernelConfig(
data_type=numeric_type,
default_number_float=numeric_type
)
sc = create_lid_driven_cavity((16, 16, 8), lbm_config=lbm_config, config=config)
sc.run(1)
code = ps.get_code_str(sc.ast)
if numeric_type == "float64":
assert "float" not in code
assert "double" in code
else:
assert "double" not in code
assert "float" in code
"""Tests velocity and stress fluctuations for thermalized LB"""
import pytest
import pystencils as ps
from lbmpy._compat import IS_PYSTENCILS_2
from pystencils import get_code_str
if IS_PYSTENCILS_2:
pass
else:
from pystencils.backends.simd_instruction_sets import get_supported_instruction_sets, get_vector_instruction_set
from pystencils.cpu.cpujit import get_compiler_config
from pystencils.enums import Target
from lbmpy.creationfunctions import *
from lbmpy.forcemodels import Guo
from lbmpy.macroscopic_value_kernels import macroscopic_values_setter
import numpy as np
from lbmpy.enums import Stencil
from lbmpy.moments import is_bulk_moment, is_shear_moment, get_order
from lbmpy.stencils import LBStencil
if not IS_PYSTENCILS_2:
def _skip_instruction_sets_windows(instruction_sets):
if get_compiler_config()["os"] == "windows":
# skip instruction sets supported by the CPU but not by the compiler
if "avx" in instruction_sets and (
"/arch:avx2" not in get_compiler_config()["flags"].lower()
and "/arch:avx512" not in get_compiler_config()["flags"].lower()
):
instruction_sets.remove("avx")
if (
"avx512" in instruction_sets
and "/arch:avx512" not in get_compiler_config()["flags"].lower()
):
instruction_sets.remove("avx512")
return instruction_sets
INSTRUCTION_SETS = get_supported_instruction_sets()
else:
INSTRUCTION_SETS = Target.available_vector_cpu_targets()
def single_component_maxwell(x1, x2, kT, mass):
"""Integrate the probability density from x1 to x2 using the trapezoidal rule"""
x = np.linspace(x1, x2, 1000)
try:
trapezoid = np.trapezoid # since numpy 2.0
except AttributeError:
trapezoid = np.trapz
return trapezoid(np.exp(-mass * x**2 / (2.0 * kT)), x) / np.sqrt(
2.0 * np.pi * kT / mass
)
def rr_getter(moment_group):
"""Maps a group of moments to a relaxation rate (shear, bulk, even, odd)
in the 4 relaxation time thermalized LB model
"""
is_shear = [is_shear_moment(m, 3) for m in moment_group]
is_bulk = [is_bulk_moment(m, 3) for m in moment_group]
order = [get_order(m) for m in moment_group]
assert min(order) == max(order)
order = order[0]
if order < 2:
return [0] * len(moment_group)
elif any(is_bulk):
assert all(is_bulk)
return [sp.Symbol("omega_bulk")] * len(moment_group)
elif any(is_shear):
assert all(is_shear)
return [sp.Symbol("omega_shear")] * len(moment_group)
elif order % 2 == 0:
assert order > 2
return [sp.Symbol("omega_even")] * len(moment_group)
else:
return [sp.Symbol("omega_odd")] * len(moment_group)
def second_order_moment_tensor_assignments(function_values, stencil, output_field):
"""Assignments for calculating the pressure tensor"""
assert len(function_values) == len(stencil)
dim = len(stencil[0])
return [
ps.Assignment(
output_field(i, j),
sum(c[i] * c[j] * f for f, c in zip(function_values, stencil)),
)
for i in range(dim)
for j in range(dim)
]
def add_pressure_output_to_collision_rule(collision_rule, pressure_field):
pressure_ouput = second_order_moment_tensor_assignments(
collision_rule.method.pre_collision_pdf_symbols,
collision_rule.method.stencil,
pressure_field,
)
collision_rule.main_assignments = collision_rule.main_assignments + pressure_ouput
def get_fluctuating_lb(
size=None,
kT=None,
omega_shear=None,
omega_bulk=None,
omega_odd=None,
omega_even=None,
rho_0=None,
target=None,
zero_centered: bool = False,
):
# Parameters
stencil = LBStencil(Stencil.D3Q19)
# Setup data handling
dh = ps.create_data_handling(
(size,) * stencil.D, periodicity=True, default_target=target
)
src = dh.add_array("src", values_per_cell=stencil.Q, layout="f")
dst = dh.add_array_like("dst", "src")
rho = dh.add_array("rho", layout="f", latex_name="\\rho", values_per_cell=1)
u = dh.add_array("u", values_per_cell=dh.dim, layout="f")
pressure_field = dh.add_array(
"pressure", values_per_cell=(3, 3), layout="f", gpu=target == Target.GPU
)
force_field = dh.add_array(
"force", values_per_cell=stencil.D, layout="f", gpu=target == Target.GPU
)
# Method setup
lbm_config = LBMConfig(
stencil=stencil,
method=Method.MRT,
compressible=True,
weighted=True,
zero_centered=zero_centered,
relaxation_rates=rr_getter,
force_model=Guo(force=force_field.center_vector),
fluctuating={"temperature": kT},
kernel_type="collide_only",
)
lb_method = create_lb_method(lbm_config=lbm_config)
lbm_config.lb_method = lb_method
lbm_opt = LBMOptimisation(symbolic_field=src, cse_global=True)
collision_rule = create_lb_collision_rule(
lbm_config=lbm_config, lbm_optimisation=lbm_opt
)
# add_pressure_output_to_collision_rule(collision_rule, pressure_field)
collision = create_lb_update_rule(
collision_rule=collision_rule, lbm_config=lbm_config, lbm_optimisation=lbm_opt
)
stream = create_stream_pull_with_output_kernel(
collision.method,
src,
dst,
{"density": rho, "velocity": u, "moment2": pressure_field},
)
config = ps.CreateKernelConfig(cpu_openmp=False, target=dh.default_target)
# Compile kernels
stream_kernel = ps.create_kernel(stream, config=config).compile()
collision_kernel = ps.create_kernel(collision, config=config).compile()
sync_pdfs = dh.synchronization_function([src.name])
# Initialization
init = macroscopic_values_setter(
collision.method,
velocity=(0,) * dh.dim,
pdfs=src.center_vector,
density=rho_0
)
init_kernel = ps.create_kernel(init, ghost_layers=0).compile()
dh.fill(rho.name, rho_0)
dh.fill(u.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
dh.fill(u.name, 0)
dh.fill(force_field.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
dh.fill(force_field.name, 0)
dh.run_kernel(init_kernel)
# time loop
def time_loop(start, steps):
dh.all_to_gpu()
for i in range(start, start + steps):
dh.run_kernel(
collision_kernel,
omega_shear=omega_shear,
omega_bulk=omega_bulk,
omega_odd=omega_odd,
omega_even=omega_even,
seed=42,
time_step=i,
)
sync_pdfs()
dh.run_kernel(stream_kernel)
dh.swap(src.name, dst.name)
return start + steps
return dh, time_loop
@pytest.mark.parametrize(
"zero_centered", [False, True], ids=["regular-storage", "zero-centered"]
)
@pytest.mark.parametrize(
"domain_size", [8, 60]
)
def test_resting_fluid(zero_centered: bool, domain_size: int, target=Target.CPU):
rho_0 = 0.86
kT = 4e-4
L = [domain_size] * 3
dh, time_loop = get_fluctuating_lb(
size=L[0],
target=target,
rho_0=rho_0,
kT=kT,
omega_shear=0.8,
omega_bulk=0.5,
omega_even=0.04,
omega_odd=0.3,
zero_centered=zero_centered,
)
# Test
t = 0
# warm up
t = time_loop(t, 10)
# Measurement
for i in range(10):
t = time_loop(t, 5)
res_u = dh.gather_array("u").reshape((-1, 3))
res_rho = dh.gather_array("rho").reshape((-1,))
# mass conservationo
# density per cell fluctuates, but toal mass is conserved
np.testing.assert_allclose(np.mean(res_rho), rho_0, atol=3E-12)
# momentum conservation
momentum = np.dot(res_rho, res_u)
np.testing.assert_allclose(momentum, [0, 0, 0], atol=1e-10)
# temperature (fluctuates around pre-set kT)
kinetic_energy = 1 / 2 * np.dot(res_rho, res_u * res_u) / np.prod(L)
kT_tol = 0.075 *(16/domain_size)**(3/2)
np.testing.assert_allclose(kinetic_energy, [kT / 2] * 3, rtol=kT_tol)
# velocity distribution
v_hist, v_bins = np.histogram(
res_u, bins=11, range=(-0.075, 0.075), density=True
)
# Calculate expected values from single
v_expected = []
for j in range(len(v_hist)):
# Maxwell distribution
res = (
1.0
/ (v_bins[j + 1] - v_bins[j])
* single_component_maxwell(v_bins[j], v_bins[j + 1], kT, rho_0)
)
v_expected.append(res)
v_expected = np.array(v_expected)
hist_tol_all = 0.75 *(16/domain_size)**(3/2)
np.testing.assert_allclose(v_hist, v_expected, rtol=hist_tol_all)
hist_tol_center = hist_tol_all/10
remove = 3
np.testing.assert_allclose(
v_hist[remove:-remove], v_expected[remove:-remove], rtol=hist_tol_center
)
# pressure tensor against expressions from
# Duenweg, Schiller, Ladd, https://arxiv.org/abs/0707.1581
res_pressure = dh.gather_array("pressure").reshape((-1, 3, 3))
c_s = np.sqrt(1 / 3) # speed of sound
# average of pressure tensor
# Diagonal elements are rho c_s^22 +<u,u>. When the fluid is
# thermalized, the expectation value of <u,u> = kT due to the
# equi-partition theorem.
p_av_expected = np.diag([rho_0 * c_s**2 + kT] * 3)
pressure_atol = c_s**2 / 200 *(16/domain_size)**(3/2)
np.testing.assert_allclose(
np.mean(res_pressure, axis=0), p_av_expected, atol=pressure_atol)
@pytest.mark.parametrize(
"zero_centered", [False, True], ids=["regular-storage", "zero-centered"]
)
@pytest.mark.parametrize(
"domain_size", [8, 60]
)
def test_point_force(zero_centered: bool, domain_size: int, target=Target.CPU):
"""Test momentum balance for thermalized fluid with applied poitn forces"""
rho_0 = 0.86
kT = 4e-4
L = [domain_size] * 3
dh, time_loop = get_fluctuating_lb(
size=L[0],
target=target,
rho_0=rho_0,
kT=kT,
omega_shear=0.8,
omega_bulk=0.5,
omega_even=0.8,
omega_odd=0.8,
zero_centered=zero_centered
)
# Test
t = 0
# warm up
t = time_loop(t, 100)
introduced_momentum = np.zeros(3)
for i in range(100):
point_force = 1e-2/domain_size**(3/2) * (np.random.random(3) - 0.5)
introduced_momentum += point_force
# Note that ghost layers are included in the indexing
force_pos = np.random.randint(1, L[0] - 2, size=3)
dh.cpu_arrays["force"][force_pos[0], force_pos[1], force_pos[2]] = point_force
t = time_loop(t, 1)
res_u = dh.gather_array("u").reshape((-1, 3))
res_rho = dh.gather_array("rho").reshape((-1,))
# mass conservation
np.testing.assert_allclose(np.mean(res_rho), rho_0, atol=3E-12)
# momentum conservation
momentum = np.dot(res_rho, res_u)
np.testing.assert_allclose(
momentum, introduced_momentum + 0.5 * point_force, atol=1e-10
)
dh.cpu_arrays["force"][force_pos[0], force_pos[1], force_pos[2]] = np.zeros(3)
@pytest.mark.skipif(
not INSTRUCTION_SETS, reason="No vector instruction sets supported"
)
@pytest.mark.parametrize("data_type", ("float32", "float64"))
@pytest.mark.parametrize("assume_aligned", (True, False))
@pytest.mark.parametrize("assume_inner_stride_one", (True, False))
@pytest.mark.parametrize("assume_sufficient_line_padding", (True, False))
@pytest.mark.xfail(IS_PYSTENCILS_2, reason="Vectorization for RNGs not implemented yet")
def test_vectorization(
data_type, assume_aligned, assume_inner_stride_one, assume_sufficient_line_padding
):
stencil = LBStencil(Stencil.D3Q19)
pdfs, pdfs_tmp = ps.fields(
f"pdfs({stencil.Q}), pdfs_tmp({stencil.Q}): {data_type}[3D]", layout="fzyx"
)
method = create_mrt_orthogonal(
stencil=stencil, compressible=True, weighted=True, relaxation_rates=rr_getter
)
if IS_PYSTENCILS_2:
seed = ps.TypedSymbol("seed", np.uint32)
rng = ps.random.Philox("phil", data_type, seed)
fluct_options = {
"temperature": sp.Symbol("kT"),
"rng": rng,
}
else:
rng_node = (
ps.rng.PhiloxTwoDoubles if data_type == "float64" else ps.rng.PhiloxFourFloats
)
fluct_options = {
"temperature": sp.Symbol("kT"),
"rng_node": rng_node,
"block_offsets": tuple([0] * stencil.D),
}
lbm_config = LBMConfig(
lb_method=method,
fluctuating=fluct_options,
compressible=True,
zero_centered=False,
stencil=method.stencil,
kernel_type="collide_only",
)
lbm_opt = LBMOptimisation(
cse_global=True, symbolic_field=pdfs, symbolic_temporary_field=pdfs_tmp
)
collision = create_lb_update_rule(lbm_config=lbm_config, lbm_optimisation=lbm_opt)
if not IS_PYSTENCILS_2:
instruction_sets = _skip_instruction_sets_windows(INSTRUCTION_SETS)
else:
instruction_sets = INSTRUCTION_SETS
instruction_set = instruction_sets[-1]
if IS_PYSTENCILS_2:
config = ps.CreateKernelConfig()
config.target = instruction_set
config.default_dtype = data_type
config.cpu.vectorize.enable = True
config.cpu.vectorize.assume_aligned = assume_aligned
config.cpu.vectorize.assume_inner_stride_one = assume_inner_stride_one
else:
config = ps.CreateKernelConfig(
target=Target.CPU,
data_type=data_type,
default_number_float=data_type,
cpu_vectorize_info={
"instruction_set": instruction_set,
"assume_aligned": assume_aligned,
"assume_inner_stride_one": assume_inner_stride_one,
"assume_sufficient_line_padding": assume_sufficient_line_padding,
},
)
if not assume_inner_stride_one and "storeS" not in get_vector_instruction_set(
data_type, instruction_set
):
with pytest.warns(UserWarning) as pytest_warnings:
ast = ps.create_kernel(collision, config=config)
assert "Could not vectorize loop" in pytest_warnings[0].message.args[0]
else:
ast = ps.create_kernel(collision, config=config)
ast.compile()
code = get_code_str(ast)
print(code)
@pytest.mark.parametrize("data_type", ("float32", "float64"))
@pytest.mark.parametrize("assume_aligned", (True, False))
@pytest.mark.parametrize("assume_inner_stride_one", (True, False))
@pytest.mark.parametrize("assume_sufficient_line_padding", (True, False))
@pytest.mark.skipif(IS_PYSTENCILS_2, reason="waLBerla block offsets feature unavailable")
def test_fluctuating_lb_issue_188_wlb(
data_type, assume_aligned, assume_inner_stride_one, assume_sufficient_line_padding
):
stencil = LBStencil(Stencil.D3Q19)
temperature = sp.symbols("temperature")
pdfs, pdfs_tmp = ps.fields(
f"pdfs({stencil.Q}), pdfs_tmp({stencil.Q}): {data_type}[3D]", layout="fzyx"
)
rng_node = (
ps.rng.PhiloxTwoDoubles if data_type == "float64" else ps.rng.PhiloxFourFloats
)
fluctuating = {
"temperature": temperature,
"block_offsets": "walberla",
"rng_node": rng_node,
}
lbm_config = LBMConfig(
stencil=stencil,
method=Method.MRT,
compressible=True,
weighted=True,
zero_centered=False,
relaxation_rate=1.4,
fluctuating=fluctuating,
)
lbm_opt = LBMOptimisation(
symbolic_field=pdfs, symbolic_temporary_field=pdfs_tmp, cse_global=True
)
up = create_lb_update_rule(lbm_config=lbm_config, lbm_optimisation=lbm_opt)
cpu_vectorize_info = {
"instruction_set": "avx",
"assume_inner_stride_one": True,
"assume_aligned": True,
}
config = ps.CreateKernelConfig(
target=ps.Target.CPU,
data_type=data_type,
default_number_float=data_type,
cpu_vectorize_info=cpu_vectorize_info,
)
ast = create_kernel(up, config=config)
code = ps.get_code_str(ast)
# print(code)
if data_type == "float32":
assert "0.5f" in code
assert "_mm256_mul_ps" in code
assert "_mm256_sqrt_ps" in code
else:
assert "0.5f" not in code
assert "_mm256_mul_pd" in code
assert "_mm256_sqrt_pd" in code
import pytest
import numpy as np
from numpy.testing import assert_allclose
import sympy as sp
import pystencils as ps
from pystencils import Target
from lbmpy.creationfunctions import create_lb_method, create_lb_update_rule, LBMConfig, LBMOptimisation
from lbmpy.enums import Stencil, Method, ForceModel
from lbmpy.macroscopic_value_kernels import macroscopic_values_setter, macroscopic_values_getter
from lbmpy.moments import (is_bulk_moment, moments_up_to_component_order,
exponents_to_polynomial_representations, exponent_tuple_sort_key)
from lbmpy.stencils import LBStencil
# all force models available are defined in the ForceModel enum, but Cumulant is not a "real" force model
force_models = [f for f in ForceModel if f is not ForceModel.CENTRALMOMENT]
@pytest.mark.parametrize("method_enum", [Method.SRT, Method.TRT, Method.MRT, Method.CUMULANT])
@pytest.mark.parametrize("zero_centered", [False, True])
@pytest.mark.parametrize("force_model", force_models)
@pytest.mark.parametrize("omega", [0.5, 1.5])
def test_total_momentum(method_enum, zero_centered, force_model, omega):
if method_enum == Method.CUMULANT and \
force_model not in (ForceModel.SIMPLE, ForceModel.LUO, ForceModel.GUO, ForceModel.HE):
return
L = (16, 16)
stencil = LBStencil(Stencil.D2Q9)
F = (2e-4, -3e-4)
dh = ps.create_data_handling(L, periodicity=True, default_target=Target.CPU)
src = dh.add_array('src', values_per_cell=stencil.Q)
dst = dh.add_array_like('dst', 'src')
ρ = dh.add_array('rho', values_per_cell=1)
u = dh.add_array('u', values_per_cell=stencil.D)
lbm_config = LBMConfig(method=method_enum, stencil=stencil, relaxation_rate=omega,
compressible=True, zero_centered=zero_centered,
force_model=force_model, force=F, streaming_pattern='pull')
lbm_opt = LBMOptimisation(symbolic_field=src)
collision = create_lb_update_rule(lbm_config=lbm_config, lbm_optimisation=lbm_opt)
config = ps.CreateKernelConfig(cpu_openmp=False, target=dh.default_target)
collision_kernel = ps.create_kernel(collision, config=config).compile()
fluid_density = 1.1
def init():
dh.fill(ρ.name, fluid_density)
dh.fill(u.name, 0)
setter = macroscopic_values_setter(collision.method, velocity=(0,) * dh.dim,
pdfs=src, density=ρ.center,
set_pre_collision_pdfs=True)
kernel = ps.create_kernel(setter).compile()
dh.run_kernel(kernel)
sync_pdfs = dh.synchronization_function([src.name])
getter = macroscopic_values_getter(collision.method, ρ.center, u.center_vector, src, use_pre_collision_pdfs=True)
getter_kernel = ps.create_kernel(getter).compile()
def time_loop(steps):
dh.all_to_gpu()
for _ in range(steps):
dh.run_kernel(collision_kernel)
dh.swap(src.name, dst.name)
sync_pdfs()
dh.all_to_cpu()
t = 20
init()
time_loop(t)
dh.run_kernel(getter_kernel)
# Check that density did not change
assert_allclose(dh.gather_array(ρ.name), fluid_density)
# Check for correct velocity
total = np.sum(dh.gather_array(u.name), axis=(0, 1))
assert_allclose(total / np.prod(L) / F * fluid_density / t, 1)
@pytest.mark.parametrize("force_model", force_models)
@pytest.mark.parametrize("compressible", [True, False])
def test_modes(force_model, compressible):
"""check force terms in mode space"""
_check_modes(LBStencil(Stencil.D2Q9), force_model, compressible)
@pytest.mark.parametrize("stencil", [Stencil.D3Q15, Stencil.D3Q19, Stencil.D3Q27])
@pytest.mark.parametrize("force_model", force_models)
@pytest.mark.parametrize("compressible", [True, False])
@pytest.mark.longrun
def test_modes_longrun(stencil, force_model, compressible):
"""check force terms in mode space"""
_check_modes(LBStencil(stencil), force_model, compressible)
@pytest.mark.parametrize("force_model", force_models)
def test_momentum_density_shift(force_model):
target = Target.CPU
stencil = LBStencil(Stencil.D2Q9)
domain_size = (4, 4)
dh = ps.create_data_handling(domain_size=domain_size, default_target=target)
rho = dh.add_array('rho', values_per_cell=1)
dh.fill('rho', 0.0, ghost_layers=True)
momentum_density = dh.add_array('momentum_density', values_per_cell=dh.dim)
dh.fill('momentum_density', 0.0, ghost_layers=True)
src = dh.add_array('src', values_per_cell=len(stencil))
dh.fill('src', 0.0, ghost_layers=True)
lbm_config = LBMConfig(method=Method.SRT, compressible=True, force_model=force_model,
force=(1, 2))
method = create_lb_method(lbm_config=lbm_config)
cqc = method.conserved_quantity_computation
momentum_density_getter = cqc.output_equations_from_pdfs(src.center_vector,
{'density': rho.center,
'momentum_density': momentum_density.center_vector})
config = ps.CreateKernelConfig(target=dh.default_target)
momentum_density_ast = ps.create_kernel(momentum_density_getter, config=config)
momentum_density_kernel = momentum_density_ast.compile()
dh.run_kernel(momentum_density_kernel)
assert np.sum(dh.gather_array(momentum_density.name)[:, :, 0]) == np.prod(domain_size) / 2
assert np.sum(dh.gather_array(momentum_density.name)[:, :, 1]) == np.prod(domain_size)
@pytest.mark.parametrize('force_model', force_models)
def test_forcing_space_equivalences(force_model):
if force_model == ForceModel.HE:
# We don't expect equivalence for the He model since its
# moments are derived from the continuous maxwellian
return
stencil = LBStencil(Stencil.D3Q27)
force = sp.symbols(f"F_:{stencil.D}")
lbm_config = LBMConfig(stencil=stencil, method=Method.SRT, force=force, force_model=force_model)
fmodel = lbm_config.force_model
lb_method = create_lb_method(lbm_config=lbm_config)
inv_moment_matrix = lb_method.moment_matrix.inv()
force_pdfs = sp.Matrix(fmodel(lb_method))
force_moments = fmodel.moment_space_forcing(lb_method)
diff = (force_pdfs - (inv_moment_matrix * force_moments)).expand()
for i, d in enumerate(diff):
assert d == 0, f"Mismatch between population and moment space forcing " \
f"in force model {force_model}, population f_{i}"
@pytest.mark.parametrize("force_model", [ForceModel.GUO, ForceModel.BUICK, ForceModel.SHANCHEN])
@pytest.mark.parametrize("stencil", [Stencil.D2Q9, Stencil.D3Q19, Stencil.D3Q27])
@pytest.mark.parametrize("method", [Method.SRT, Method.TRT, Method.MRT])
def test_literature(force_model, stencil, method):
# Be aware that the choice of the conserved moments does not affect the forcing although omega is introduced
# in the forcing vector then. The reason is that:
# m_{100}^{\ast} = m_{100} + \omega ( m_{100}^{eq} - m_{100} ) + \left( 1 - \frac{\omega}{2} \right) F_x
# always simplifies to:
# m_{100}^{\ast} = m_{100} + F_x
# Thus the relaxation rate gets cancled again.
stencil = LBStencil(stencil)
omega_s = sp.Symbol("omega_s")
omega_b = sp.Symbol("omega_b")
omega_o = sp.Symbol("omega_o")
omega_e = sp.Symbol("omega_e")
if method == Method.SRT:
rrs = [omega_s]
omega_o = omega_b = omega_e = omega_s
elif method == Method.TRT:
rrs = [omega_e, omega_o]
omega_s = omega_b = omega_e
else:
rrs = [omega_s, omega_b, omega_o, omega_e, omega_o, omega_e]
F = sp.symbols(f"F_:{stencil.D}")
lbm_config = LBMConfig(method=method, weighted=True, stencil=stencil, relaxation_rates=rrs,
compressible=force_model != ForceModel.BUICK,
force_model=force_model, force=F)
lb_method = create_lb_method(lbm_config=lbm_config)
omega_momentum = list(set(lb_method.relaxation_rates[1:stencil.D + 1]))
assert len(omega_momentum) == 1
omega_momentum = omega_momentum[0]
subs_dict = lb_method.subs_dict_relaxation_rate
force_term = sp.simplify(lb_method.force_model(lb_method).subs(subs_dict))
u = sp.Matrix(lb_method.first_order_equilibrium_moment_symbols)
rho = lb_method.conserved_quantity_computation.density_symbol
# see silva2020 for nomenclature
F = sp.Matrix(F)
uf = sp.Matrix(u).dot(F)
F2 = sp.Matrix(F).dot(sp.Matrix(F))
Fq = sp.zeros(stencil.Q, 1)
uq = sp.zeros(stencil.Q, 1)
for i, cq in enumerate(stencil):
Fq[i] = sp.Matrix(cq).dot(sp.Matrix(F))
uq[i] = sp.Matrix(cq).dot(u)
common_plus = 3 * (1 - omega_e / 2)
common_minus = 3 * (1 - omega_momentum / 2)
result = []
if method == Method.MRT and force_model == ForceModel.GUO:
# check against eq. 4.68 from schiller2008thermal
uf = u.dot(F) * sp.eye(len(F))
G = (u * F.transpose() + F * u.transpose() - uf * sp.Rational(2, lb_method.dim)) * sp.Rational(1, 2) * \
(2 - omega_s) + uf * sp.Rational(1, lb_method.dim) * (2 - omega_b)
for direction, w_i in zip(lb_method.stencil, lb_method.weights):
direction = sp.Matrix(direction)
tr = sp.trace(G * (direction * direction.transpose() - sp.Rational(1, 3) * sp.eye(len(F))))
result.append(3 * w_i * (F.dot(direction) + sp.Rational(3, 2) * tr))
elif force_model == ForceModel.GUO:
# check against table 2 in silva2020 (correct for SRT and TRT), matches eq. 20 from guo2002discrete (for SRT)
Sq_plus = sp.zeros(stencil.Q, 1)
Sq_minus = sp.zeros(stencil.Q, 1)
for i, w_i in enumerate(lb_method.weights):
Sq_plus[i] = common_plus * w_i * (3 * uq[i] * Fq[i] - uf)
Sq_minus[i] = common_minus * w_i * Fq[i]
result = Sq_plus + Sq_minus
elif force_model == ForceModel.BUICK:
# check against table 2 in silva2020 (correct for all collision models due to the simplicity of Buick),
# matches eq. 18 from silva2010 (for SRT)
Sq_plus = sp.zeros(stencil.Q, 1)
Sq_minus = sp.zeros(stencil.Q, 1)
for i, w_i in enumerate(lb_method.weights):
Sq_plus[i] = 0
Sq_minus[i] = common_minus * w_i * Fq[i]
result = Sq_plus + Sq_minus
elif force_model == ForceModel.EDM:
# check against table 2 in silva2020
if method == Method.MRT:
# for mrt no literature terms are known at the time of writing this test case.
# However it is most likly correct since SRT and TRT are derived from the moment space representation
pytest.skip()
Sq_plus = sp.zeros(stencil.Q, 1)
Sq_minus = sp.zeros(stencil.Q, 1)
for i, w_i in enumerate(lb_method.weights):
Sq_plus[i] = common_plus * w_i * (3 * uq[i] * Fq[i] - uf) + ((w_i / (8 * rho)) * (3 * Fq[i] ** 2 - F2))
Sq_minus[i] = common_minus * w_i * Fq[i]
result = Sq_plus + Sq_minus
elif force_model == ForceModel.SHANCHEN:
# check against table 2 in silva2020
if method == Method.MRT:
# for mrt no literature terms are known at the time of writing this test case.
# However it is most likly correct since SRT and TRT are derived from the moment space representation
pytest.skip()
Sq_plus = sp.zeros(stencil.Q, 1)
Sq_minus = sp.zeros(stencil.Q, 1)
for i, w_i in enumerate(lb_method.weights):
Sq_plus[i] = common_plus * w_i * (3 * uq[i] * Fq[i] - uf) + common_plus ** 2 * (
(w_i / (2 * rho)) * (3 * Fq[i] ** 2 - F2))
Sq_minus[i] = common_minus * w_i * Fq[i]
result = Sq_plus + Sq_minus
assert list(sp.simplify(force_term - sp.Matrix(result))) == [0] * len(stencil)
@pytest.mark.parametrize("force_model", force_models)
@pytest.mark.parametrize("compressible", [True, False])
def test_modes_central_moment(force_model, compressible):
"""check force terms in mode space"""
stencil = LBStencil(Stencil.D2Q9)
omega_s = sp.Symbol("omega_s")
F = list(sp.symbols(f"F_:{stencil.D}"))
lbm_config = LBMConfig(method=Method.CENTRAL_MOMENT, stencil=stencil, relaxation_rate=omega_s,
compressible=compressible, force_model=force_model, force=tuple(F))
method = create_lb_method(lbm_config=lbm_config)
subs_dict = method.subs_dict_relaxation_rate
force_moments = method.force_model.central_moment_space_forcing(method)
force_moments = force_moments.subs(subs_dict)
# The mass mode should be zero
assert force_moments[0] == 0
# The momentum moments should contain the force
assert list(force_moments[1:stencil.D + 1]) == F
@pytest.mark.parametrize("force_model", force_models)
@pytest.mark.parametrize("compressible", [True, False])
def test_symmetric_forcing_equivalence(force_model, compressible):
stencil = LBStencil(Stencil.D2Q9)
omega_s = sp.Symbol("omega_s")
F = list(sp.symbols(f"F_:{stencil.D}"))
moments = moments_up_to_component_order(2, dim=2)
moments = sorted(moments, key=exponent_tuple_sort_key)
moment_polys = exponents_to_polynomial_representations(moments)
lbm_config = LBMConfig(method=Method.CENTRAL_MOMENT, stencil=stencil, relaxation_rate=omega_s,
nested_moments=moment_polys, compressible=True, force_model=force_model, force=tuple(F))
method = create_lb_method(lbm_config=lbm_config)
if not method.force_model.has_symmetric_central_moment_forcing:
return
subs_dict = method.subs_dict_relaxation_rate
force_moments = method.force_model.central_moment_space_forcing(method)
force_moments = force_moments.subs(subs_dict)
force_before, force_after = method.force_model.symmetric_central_moment_forcing(method, moments)
d = method.relaxation_matrix
eye = sp.eye(stencil.Q)
force_combined = (eye - d) @ force_before + force_after
assert (force_moments - force_combined).expand() == sp.Matrix([0] * stencil.Q)
@pytest.mark.parametrize("stencil", [Stencil.D3Q15, Stencil.D3Q19, Stencil.D3Q27])
@pytest.mark.parametrize("force_model", force_models)
@pytest.mark.parametrize("compressible", [True, False])
@pytest.mark.longrun
def test_modes_central_moment_longrun(stencil, force_model, compressible):
"""check force terms in mode space"""
stencil = LBStencil(stencil)
omega_s = sp.Symbol("omega_s")
F = list(sp.symbols(f"F_:{stencil.D}"))
lbm_config = LBMConfig(method=Method.CENTRAL_MOMENT, stencil=stencil, relaxation_rate=omega_s,
compressible=compressible, force_model=force_model, force=tuple(F))
method = create_lb_method(lbm_config=lbm_config)
subs_dict = method.subs_dict_relaxation_rate
force_moments = method.force_model.moment_space_forcing(method)
force_moments = force_moments.subs(subs_dict)
# The mass mode should be zero
assert force_moments[0] == 0
# The momentum moments should contain the force
assert list(force_moments[1:stencil.D + 1]) == F
def _check_modes(stencil, force_model, compressible):
omega_s = sp.Symbol("omega_s")
omega_b = sp.Symbol("omega_b")
omega_o = sp.Symbol("omega_o")
omega_e = sp.Symbol("omega_e")
F = list(sp.symbols(f"F_:{stencil.D}"))
lbm_config = LBMConfig(method=Method.MRT, stencil=stencil,
relaxation_rates=[omega_s, omega_b, omega_o, omega_e, omega_o, omega_e],
compressible=compressible, force_model=force_model, force=tuple(F))
method = create_lb_method(lbm_config=lbm_config)
subs_dict = method.subs_dict_relaxation_rate
force_moments = method.force_model.moment_space_forcing(method)
force_moments = force_moments.subs(subs_dict)
# The mass mode should be zero
assert force_moments[0] == 0
# The momentum moments should contain the force
assert list(force_moments[1:stencil.D + 1]) == F
if force_model == ForceModel.GUO:
num_stresses = (stencil.D * stencil.D - stencil.D) // 2 + stencil.D
lambda_s, lambda_b = -omega_s, -omega_b
# The stress moments should match eq. 47 from https://doi.org/10.1023/A:1010414013942
u = method.first_order_equilibrium_moment_symbols
def traceless(m):
tr = sp.simplify(sum([m[i, i] for i in range(stencil.D)]))
return m - tr / m.shape[0] * sp.eye(m.shape[0])
C = sp.Rational(1, 2) * (2 + lambda_s) * (traceless(sp.Matrix(u) * sp.Matrix(F).transpose()) +
traceless(sp.Matrix(F) * sp.Matrix(u).transpose())) + \
sp.Rational(1, method.dim) * (2 + lambda_b) * sp.Matrix(u).dot(F) * sp.eye(method.dim)
subs = {sp.Symbol(chr(ord("x") + i)) * sp.Symbol(chr(ord("x") + j)): C[i, j]
for i in range(stencil.D) for j in range(stencil.D)}
for force_moment, moment in zip(force_moments[stencil.D + 1:stencil.D + 1 + num_stresses],
method.moments[stencil.D + 1:stencil.D + 1 + num_stresses]):
ref = moment.subs(subs)
diff = sp.simplify(ref - force_moment)
if is_bulk_moment(moment, stencil.D):
assert diff == 0 or isinstance(diff, sp.Rational) # difference should be zero or a constant
else:
assert diff == 0 # difference should be zero
ff = method.moment_matrix.inv() * sp.Matrix(method.force_model.moment_space_forcing(method).subs(subs_dict))
# Check eq. 4.53a from schiller2008thermal
assert sp.simplify(sum(ff)) == 0
# Check eq. 4.53b from schiller2008thermal
assert [sp.simplify(sum(ff[i] * stencil[i][j] for i in range(len(stencil)))) for j in range(stencil.D)] == F
# Check eq. 4.61a from schiller2008thermal
ref = (2 + lambda_s) / 2 * (traceless(sp.Matrix(u) * sp.Matrix(F).transpose()) +
traceless(sp.Matrix(F) * sp.Matrix(u).transpose()))
s = sp.zeros(stencil.D)
for i in range(0, len(stencil)):
s += ff[i] * traceless(sp.Matrix(stencil[i]) * sp.Matrix(stencil[i]).transpose())
assert sp.simplify(s - ref) == sp.zeros(stencil.D)
# Check eq. 4.61b from schiller2008thermal
assert sp.simplify(sum(ff[i] * stencil[i][a] ** 2 for i in range(len(stencil)) for a in range(stencil.D))
- (2 + lambda_b) * sp.Matrix(u).dot(F)) == 0
# All other moments should be zero
assert list(force_moments[stencil.D + 1 + num_stresses:]) == [0] * \
(len(stencil) - (stencil.D + 1 + num_stresses))
elif force_model == ForceModel.SIMPLE:
# All other moments should be zero
assert list(force_moments[stencil.D + 1:]) == [0] * (len(stencil) - (stencil.D + 1))
import numpy as np
from lbmpy.boundaries import UBB, NoSlip
from lbmpy.enums import ForceModel
from lbmpy.scenarios import create_channel
from pystencils import make_slice
......@@ -26,10 +27,11 @@ def test_force_on_boundary():
for parallel in (False, True) if wLB else (False,):
for boundary_obj in boundaries:
print("Testing parallel %d, boundary %s" % (parallel, boundary_obj.name))
step = create_channel(domain_size, force=1e-5, relaxation_rate=1.5, parallel=parallel, force_model='buick')
print(f"Testing parallel {parallel}, boundary {boundary_obj.name}")
step = create_channel(domain_size, force=1e-5, relaxation_rate=1.5, parallel=parallel,
force_model=ForceModel.BUICK)
force = calculate_force(step, boundary_obj)
print(" -> force = ", force)
print(f" -> force = {force}")
results.append(force)
for res in results[1:]:
......
from dataclasses import replace
import pystencils as ps
import lbmpy as lp
from pystencils.slicing import slice_from_direction
from lbmpy.boundaries.boundaryhandling import LatticeBoltzmannBoundaryHandling
from lbmpy.boundaries.boundaryconditions import FreeSlip, NoSlip, ExtrapolationOutflow, UBB
from lbmpy.creationfunctions import create_lb_method, create_lb_update_rule
from lbmpy.macroscopic_value_kernels import pdf_initialization_assignments
import numpy as np
def velocity_info_callback(boundary_data, **_):
boundary_data['vel_1'] = 0
boundary_data['vel_2'] = 0
u_max = 0.1
x, y = boundary_data.link_positions(0), boundary_data.link_positions(1)
dist = (15 - y) / 15
boundary_data['vel_0'] = u_max * (1 - dist)
def test_free_slip():
stencil = lp.LBStencil(lp.Stencil.D3Q27)
domain_size = (30, 15, 30)
dim = len(domain_size)
dh = ps.create_data_handling(domain_size=domain_size)
src = dh.add_array('src', values_per_cell=stencil.Q)
dh.fill('src', 0.0, ghost_layers=True)
dst = dh.add_array('dst', values_per_cell=stencil.Q)
dh.fill('dst', 0.0, ghost_layers=True)
velField = dh.add_array('velField', values_per_cell=stencil.D)
dh.fill('velField', 0.0, ghost_layers=True)
lbm_config = lp.LBMConfig(stencil=stencil, method=lp.Method.SRT, relaxation_rate=1.8,
output={'velocity': velField}, kernel_type='stream_pull_collide')
method = create_lb_method(lbm_config=lbm_config)
init = pdf_initialization_assignments(method, 1.0, (0, 0, 0), src.center_vector)
config = ps.CreateKernelConfig(target=dh.default_target, cpu_openmp=False)
ast_init = ps.create_kernel(init, config=config)
kernel_init = ast_init.compile()
dh.run_kernel(kernel_init)
lbm_opt = lp.LBMOptimisation(symbolic_field=src, symbolic_temporary_field=dst)
lbm_config = replace(lbm_config, lb_method=method)
update = create_lb_update_rule(lbm_config=lbm_config, lbm_optimisation=lbm_opt)
ast_kernel = ps.create_kernel(update, config=config)
kernel = ast_kernel.compile()
bh = LatticeBoltzmannBoundaryHandling(method, dh, 'src', name="bh")
inflow = UBB(velocity_info_callback, dim=dim)
outflow = ExtrapolationOutflow(stencil[4], method)
wall = NoSlip("wall")
freeslip = FreeSlip(stencil, (0, -1, 0))
bh.set_boundary(inflow, slice_from_direction('W', dim))
bh.set_boundary(outflow, slice_from_direction('E', dim))
bh.set_boundary(wall, slice_from_direction('S', dim))
bh.set_boundary(wall, slice_from_direction('T', dim))
bh.set_boundary(wall, slice_from_direction('B', dim))
bh.set_boundary(freeslip, slice_from_direction('N', dim))
for i in range(2000):
bh()
dh.run_kernel(kernel)
dh.swap("src", "dst")
vel_profile = dh.gather_array('velField')[-2, :, domain_size[2] // 2, 0]
np.testing.assert_almost_equal(np.gradient(vel_profile)[-1], 0, decimal=3)
import os
import numpy as np
import pytest
from lbmpy.boundaries import NoSlip
from lbmpy.enums import Method
from lbmpy.geometry import add_black_and_white_image, add_pipe_walls
from lbmpy.lbstep import LatticeBoltzmannStep
import lbmpy.plot as plt
from pystencils.slicing import make_slice
......@@ -22,23 +25,20 @@ def test_pipe():
plot = False
for domain_size in [(30, 10, 10), (30, 10)]:
for diameter in [5, 10, diameter_callback]:
sc = LatticeBoltzmannStep(domain_size=domain_size, method='srt', relaxation_rate=1.9,
optimization={})
sc = LatticeBoltzmannStep(domain_size=domain_size, method=Method.SRT, relaxation_rate=1.9)
add_pipe_walls(sc.boundary_handling, diameter)
if plot:
import lbmpy.plot as plt
from pystencils.slicing import make_slice
if len(domain_size) == 2:
plt.boundary_handling(sc.boundary_handling)
plt.title("2D, diameter=%s" % (str(diameter,)))
plt.title(f"2D, diameter={str(diameter,)}")
plt.show()
elif len(domain_size) == 3:
plt.subplot(1, 2, 1)
plt.boundary_handling(sc.boundary_handling, make_slice[0.5, :, :])
plt.title("3D, diameter=%s" % (str(diameter,)))
plt.title(f"3D, diameter={str(diameter,)}")
plt.subplot(1, 2, 2)
plt.boundary_handling(sc.boundary_handling, make_slice[:, 0.5, :])
plt.title("3D, diameter=%s" % (str(diameter, )))
plt.title(f"3D, diameter={str(diameter, )}")
plt.show()
......@@ -49,14 +49,13 @@ def get_test_image_path():
def test_image():
sc = LatticeBoltzmannStep(domain_size=(50, 40), method='srt', relaxation_rate=1.9,
optimization={})
pytest.importorskip('scipy.ndimage')
sc = LatticeBoltzmannStep(domain_size=(50, 40), method=Method.SRT, relaxation_rate=1.9)
add_black_and_white_image(sc.boundary_handling, get_test_image_path(), keep_aspect_ratio=True)
def test_slice_mask_combination():
sc = LatticeBoltzmannStep(domain_size=(30, 30), method='srt', relaxation_rate=1.9,
optimization={})
sc = LatticeBoltzmannStep(domain_size=(30, 30), method=Method.SRT, relaxation_rate=1.9)
def callback(*coordinates):
x = coordinates[0]
......