Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • Sparse
  • WallLaw
  • improved_comm
  • master
  • release/0.2.1
  • release/0.2.10
  • release/0.2.11
  • release/0.2.12
  • release/0.2.13
  • release/0.2.14
  • release/0.2.15
  • release/0.2.2
  • release/0.2.3
  • release/0.2.4
  • release/0.2.5
  • release/0.2.6
  • release/0.2.7
  • release/0.2.8
  • release/0.2.9
  • release/0.3.0
  • release/0.3.1
  • release/0.3.2
  • release/0.3.3
  • release/0.3.4
  • release/0.4.0
  • release/0.4.1
  • release/0.4.2
  • release/0.4.3
  • release/0.4.4
  • release/1.0
  • release/1.0.1
  • release/1.1
  • release/1.1.1
  • release/1.2
  • release/1.3
  • release/1.3.1
  • release/1.3.2
  • release/1.3.3
  • release/1.3.4
  • release/1.3.5
  • release/1.3.6
  • release/1.3.7
42 results

Target

Select target project
  • ravi.k.ayyala/lbmpy
  • brendan-waters/lbmpy
  • anirudh.jonnalagadda/lbmpy
  • jbadwaik/lbmpy
  • alexander.reinauer/lbmpy
  • itischler/lbmpy
  • he66coqe/lbmpy
  • ev81oxyl/lbmpy
  • Bindgen/lbmpy
  • da15siwa/lbmpy
  • holzer/lbmpy
  • RudolfWeeber/lbmpy
  • pycodegen/lbmpy
13 results
Select Git revision
  • GetterSetterAPI
  • HRR
  • HydroPressure
  • InplaceConfig
  • Outflow
  • PhaseField
  • Sparse
  • UBBVelocity
  • UpdateAPISparse
  • WallLaw
  • WetNodeBoundaries
  • csebug
  • feature/sparse
  • feature/try
  • improved_comm
  • install_requires
  • master
  • phaseField
  • relaxationrates
  • test_martin
  • release/0.2.1
  • release/0.2.10
  • release/0.2.11
  • release/0.2.12
  • release/0.2.13
  • release/0.2.14
  • release/0.2.15
  • release/0.2.2
  • release/0.2.3
  • release/0.2.4
  • release/0.2.5
  • release/0.2.6
  • release/0.2.7
  • release/0.2.8
  • release/0.2.9
  • release/0.3.0
  • release/0.3.1
  • release/0.3.2
  • release/0.3.3
  • release/0.3.4
  • release/0.4.0
  • release/0.4.1
  • release/0.4.2
  • release/0.4.3
  • release/0.4.4
  • release/1.0
  • release/1.0.1
  • release/1.1
  • release/1.1.1
  • release/1.2
  • release/1.3
  • release/1.3.1
  • release/1.3.2
  • release/1.3.3
  • release/1.3.4
  • release/1.3.5
  • release/1.3.6
57 results
Show changes
Showing
with 2052 additions and 57 deletions
import numpy as np
import sympy as sp
from lbmpy.creationfunctions import create_lb_method, LBMConfig
from lbmpy.enums import Method, Stencil
from lbmpy.forcemodels import Luo
from lbmpy.maxwellian_equilibrium import get_weights
from lbmpy.moments import moment_matrix, set_up_shift_matrix
from lbmpy.methods.creationfunctions import cascaded_moment_sets_literature
from lbmpy.scenarios import create_lid_driven_cavity
from lbmpy.stencils import LBStencil
from lbmpy.moment_transforms import BinomialChimeraTransform
def test_central_moment_ldc():
sc_central_moment = create_lid_driven_cavity((20, 20), method=Method.CENTRAL_MOMENT,
relaxation_rate=1.8, equilibrium_order=4,
compressible=True, force=(-1e-10, 0))
sc_central_moment_3d = create_lid_driven_cavity((20, 20, 3), method=Method.CENTRAL_MOMENT,
relaxation_rate=1.8, equilibrium_order=4,
compressible=True, force=(-1e-10, 0, 0))
sc_central_moment.run(1000)
sc_central_moment_3d.run(1000)
assert np.isfinite(np.max(sc_central_moment.velocity[:, :]))
assert np.isfinite(np.max(sc_central_moment_3d.velocity[:, :, :]))
def test_central_moment_class():
stencil = LBStencil(Stencil.D2Q9)
lbm_config = LBMConfig(stencil=stencil, method=Method.CENTRAL_MOMENT, relaxation_rates=[1.2, 1.3, 1.4, 1.5],
equilibrium_order=4, compressible=True, zero_centered=True)
method = create_lb_method(lbm_config=lbm_config)
lbm_config = LBMConfig(stencil=stencil, method=Method.SRT, relaxation_rate=1.2,
equilibrium_order=4, compressible=True, zero_centered=True)
srt = create_lb_method(lbm_config=lbm_config)
rho = method.zeroth_order_equilibrium_moment_symbol
u = method.first_order_equilibrium_moment_symbols
cs_sq = sp.Rational(1, 3)
force_model = Luo(force=sp.symbols(f"F_:{2}"))
eq = (rho, 0, 0, 0, 0, 2 * rho * cs_sq, 0, 0, rho * cs_sq ** 2)
default_moments = cascaded_moment_sets_literature(stencil)
default_moments = [item for sublist in default_moments for item in sublist]
assert method.central_moment_transform_class == BinomialChimeraTransform
assert method.conserved_quantity_computation.density_symbol == rho
assert method.conserved_quantity_computation.velocity_symbols == u
assert method.moment_equilibrium_values == eq
assert method.force_model is None
method.set_force_model(force_model)
assert method.force_model == force_model
assert method.relaxation_matrix[0, 0] == 0
assert method.relaxation_matrix[1, 1] == 0
assert method.relaxation_matrix[2, 2] == 0
method.set_conserved_moments_relaxation_rate(1.9)
assert method.relaxation_matrix[0, 0] == 1.9
assert method.relaxation_matrix[1, 1] == 1.9
assert method.relaxation_matrix[2, 2] == 1.9
moments = list()
for i in method.relaxation_info_dict:
moments.append(i)
assert moments == default_moments
for i in range(len(stencil)):
assert method.relaxation_rates[i] == method.relaxation_matrix[i, i]
M = method.moment_matrix
N = method.shift_matrix
assert M == moment_matrix(moments, stencil=stencil)
assert N == set_up_shift_matrix(moments, stencil=stencil)
assert get_weights(stencil) == method.weights
cqc = method.conserved_quantity_computation
subs = {cqc.density_deviation_symbol : cqc.density_symbol - cqc.background_density}
eq_terms_central = method.get_equilibrium_terms()
eq_terms_srt = srt.get_equilibrium_terms()
assert (eq_terms_central - eq_terms_srt).subs(subs).expand() == sp.Matrix((0,) * stencil.Q)
method = create_lb_method(lbm_config=LBMConfig(stencil=LBStencil(Stencil.D2Q9), method=Method.CENTRAL_MOMENT,
relaxation_rates=[1.7, 1.8, 1.2, 1.3, 1.4], equilibrium_order=4,
compressible=True))
assert method.relaxation_matrix[0, 0] == 0
assert method.relaxation_matrix[1, 1] == 0
assert method.relaxation_matrix[2, 2] == 0
method = create_lb_method(lbm_config=LBMConfig(stencil=LBStencil(Stencil.D2Q9), method=Method.CENTRAL_MOMENT,
relaxation_rates=[1.3] * 9, equilibrium_order=4,
compressible=True))
np.testing.assert_almost_equal(sum(method.relaxation_rates), 1.3 * 9)
import pytest
import sympy as sp
from lbmpy.enums import Stencil
from lbmpy.stencils import LBStencil
from lbmpy.methods.creationfunctions import create_central_moment
from lbmpy.maxwellian_equilibrium import get_weights
from lbmpy.moment_transforms import PdfsToMomentsByChimeraTransform
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q19])
@pytest.mark.parametrize('compressible', [True, False])
def test_full_and_delta_equilibrium_equivalence(stencil, compressible):
stencil = LBStencil(stencil)
zero_centered = True
omega = sp.Symbol('omega')
r_rates = (omega,) * stencil.Q
rho = sp.Symbol("rho")
rho_background = sp.Integer(1)
delta_rho = sp.Symbol("delta_rho")
subs = {delta_rho: rho - rho_background}
eqs = []
for delta_eq in [False, True]:
method = create_central_moment(stencil, r_rates, continuous_equilibrium=True, compressible=compressible,
zero_centered=zero_centered, delta_equilibrium=delta_eq, equilibrium_order=None)
eq = method.get_equilibrium_terms()
eqs.append(eq.subs(subs))
assert (eqs[0] - eqs[1]).expand() == sp.Matrix((0,) * stencil.Q)
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q19])
@pytest.mark.parametrize('compressible', [True, False])
@pytest.mark.parametrize('delta_eq', [True, False])
def test_zero_centering_equilibrium_equivalence(stencil, compressible, delta_eq):
stencil = LBStencil(stencil)
omega = sp.Symbol('omega')
r_rates = (omega,) * stencil.Q
weights = sp.Matrix(get_weights(stencil))
rho = sp.Symbol("rho")
rho_background = sp.Integer(1)
delta_rho = sp.Symbol("delta_rho")
subs = {delta_rho: rho - rho_background}
eqs = []
for zero_centered in [False, True]:
method = create_central_moment(stencil, r_rates, continuous_equilibrium=True, compressible=compressible,
zero_centered=zero_centered, delta_equilibrium=delta_eq and zero_centered,
equilibrium_order=None)
eq = method.get_equilibrium_terms()
eqs.append(eq.subs(subs))
assert (eqs[0] - eqs[1]).expand() == weights
import sympy as sp
from pystencils.stencil import have_same_entries
from lbmpy.moments import get_default_moment_set_for_stencil
from lbmpy.methods.creationfunctions import cascaded_moment_sets_literature
import pytest
from lbmpy.enums import Stencil
from lbmpy.stencils import LBStencil
from lbmpy.moment_transforms import (
PdfsToCentralMomentsByMatrix, FastCentralMomentTransform,
BinomialChimeraTransform, PdfsToCentralMomentsByShiftMatrix)
@pytest.mark.parametrize('central_moments', ['monomial', 'polynomial'])
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q15, Stencil.D3Q19, Stencil.D3Q27])
@pytest.mark.parametrize('transform_class', [BinomialChimeraTransform, FastCentralMomentTransform, PdfsToCentralMomentsByShiftMatrix])
def test_forward_transform(central_moments, stencil, transform_class):
stencil = LBStencil(stencil)
if central_moments == 'monomial':
moment_polynomials = get_default_moment_set_for_stencil(stencil)
elif central_moments == 'polynomial':
moment_polynomials = [item for sublist in cascaded_moment_sets_literature(stencil)
for item in sublist]
pdfs = sp.symbols(f"f_:{stencil.Q}")
rho = sp.Symbol('rho')
u = sp.symbols(f"u_:{stencil.D}")
matrix_transform = PdfsToCentralMomentsByMatrix(stencil, moment_polynomials, rho, u)
test_transform = transform_class(stencil, moment_polynomials, rho, u)
if central_moments == 'monomial' and not have_same_entries(stencil, LBStencil(Stencil.D3Q15)):
assert test_transform.mono_to_poly_matrix == sp.eye(stencil.Q)
else:
assert not test_transform.mono_to_poly_matrix == sp.eye(stencil.Q)
f_to_k_matrix = matrix_transform.forward_transform(pdfs)
f_to_k_matrix = f_to_k_matrix.new_without_subexpressions().main_assignments_dict
f_to_k_test = test_transform.forward_transform(pdfs)
f_to_k_test = f_to_k_test.new_without_subexpressions().main_assignments_dict
cm_symbols = matrix_transform.pre_collision_symbols
for moment_symbol in cm_symbols:
rhs_matrix = f_to_k_matrix[moment_symbol].expand()
rhs_test = f_to_k_test[moment_symbol].expand()
assert (rhs_matrix - rhs_test) == 0, \
f"Mismatch between matrix transform and {transform_class.__name__} at {moment_symbol}."
@pytest.mark.parametrize('central_moments', ['monomial', 'polynomial'])
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q15, Stencil.D3Q19, Stencil.D3Q27])
@pytest.mark.parametrize('transform_class', [BinomialChimeraTransform, FastCentralMomentTransform, PdfsToCentralMomentsByShiftMatrix])
def test_backward_transform(central_moments, stencil, transform_class):
stencil = LBStencil(stencil)
if central_moments == 'monomial':
moment_polynomials = get_default_moment_set_for_stencil(stencil)
elif central_moments == 'polynomial':
moment_polynomials = [item for sublist in cascaded_moment_sets_literature(stencil)
for item in sublist]
pdfs = sp.symbols(f"f_:{stencil.Q}")
rho = sp.Symbol('rho')
u = sp.symbols(f"u_:{stencil.D}")
matrix_transform = PdfsToCentralMomentsByMatrix(stencil, moment_polynomials, rho, u)
test_transform = transform_class(stencil, moment_polynomials, rho, u)
k_to_f_matrix = matrix_transform.backward_transform(pdfs)
k_to_f_matrix = k_to_f_matrix.new_without_subexpressions().main_assignments_dict
k_to_f_test = test_transform.backward_transform(pdfs)
k_to_f_test = k_to_f_test.new_without_subexpressions().main_assignments_dict
for f in pdfs:
rhs_matrix = k_to_f_matrix[f].expand()
rhs_test = k_to_f_test[f].expand()
assert (rhs_matrix - rhs_test) == 0, \
f"Mismatch between matrix transform and {transform_class.__name__} at {f}."
......@@ -10,31 +10,36 @@ from lbmpy.chapman_enskog.chapman_enskog_higher_order import (
determine_higher_order_moments, get_solvability_conditions)
from lbmpy.chapman_enskog.chapman_enskog_steady_state import (
SteadyStateChapmanEnskogAnalysis, SteadyStateChapmanEnskogAnalysisSRT)
from lbmpy.creationfunctions import create_lb_method
from lbmpy.creationfunctions import create_lb_method, LBMConfig
from lbmpy.enums import Method, Stencil
from lbmpy.forcemodels import Guo
from lbmpy.relaxationrates import lattice_viscosity_from_relaxation_rate
from lbmpy.stencils import LBStencil
from pystencils.fd import Diff, normalize_diff_order
from pystencils.sympyextensions import multidimensional_sum
def test_srt():
for stencil in ['D2Q9', 'D3Q19', 'D3Q27']:
for continuous_eq in (False, True):
omega = sp.Symbol("omega")
print("Analysing %s, ContMaxwellianConstruction %d" % (stencil, continuous_eq))
method = create_lb_method(method='srt', stencil=stencil, compressible=True,
relaxation_rate=omega, maxwellian_moments=continuous_eq)
analysis = ChapmanEnskogAnalysis(method)
omega_value = analysis.relaxation_rate_from_kinematic_viscosity(1)[omega]
assert omega_value, sp.Rational(2 == 7)
@pytest.mark.parametrize('continuous_eq', [False, True])
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q15, Stencil.D3Q19, Stencil.D3Q27])
def test_srt(continuous_eq, stencil):
omega = sp.Symbol("omega")
print(f"Analysing {stencil}, ContMaxwellianConstruction {continuous_eq}")
lbm_config = LBMConfig(stencil=LBStencil(stencil), method=Method.SRT, compressible=True,
zero_centered=False, relaxation_rate=omega, continuous_equilibrium=continuous_eq)
method = create_lb_method(lbm_config=lbm_config)
analysis = ChapmanEnskogAnalysis(method)
omega_value = analysis.relaxation_rate_from_kinematic_viscosity(1)[omega]
assert omega_value, sp.Rational(2 == 7)
@pytest.mark.longrun
def test_steady_state_silva_paper_comparison():
eps, tau, lambda_plus, f = sp.symbols("epsilon tau Lambda f")
method = create_lb_method(stencil="D3Q19", compressible=False, relaxation_rate=1 / tau,
maxwellian_moments=False)
lbm_config = LBMConfig(stencil=LBStencil(Stencil.D3Q19), compressible=False, relaxation_rate=1 / tau,
continuous_equilibrium=False, zero_centered=False)
method = create_lb_method(lbm_config=lbm_config)
analysis = SteadyStateChapmanEnskogAnalysis(method)
dim = 3
......@@ -142,7 +147,8 @@ def test_steady_state_silva_paper_comparison():
def test_higher_order_moment_computation():
"""In chapman_enskog_higher_order.py there are some functions to generalize the std Chapman Enskog expansion
These are not used by the Chapman Enskog class yet."""
method = create_lb_method(stencil='D2Q9', method='trt', maxwellian_moments=False)
method = create_lb_method(lbm_config=LBMConfig(stencil=LBStencil(Stencil.D2Q9), zero_centered=False,
method=Method.TRT, compressible=False))
mom_comp = LbMethodEqMoments(method)
dim = method.dim
order = 2
......@@ -168,7 +174,8 @@ def test_higher_order_moment_computation():
def test_steady_state():
rr = sp.symbols("omega")
method = create_lb_method(stencil='D2Q9', method='srt', relaxation_rate=rr)
method = create_lb_method(lbm_config=LBMConfig(stencil=LBStencil(Stencil.D2Q9),
method=Method.SRT, relaxation_rate=rr))
a1 = SteadyStateChapmanEnskogAnalysis(method, order=2)
a2 = SteadyStateChapmanEnskogAnalysisSRT(method, order=2)
......
%% Cell type:code id: tags:
``` python
import pystencils as ps
from lbmpy.session import *
from lbmpy.boundaries.boundaries_in_kernel import update_rule_with_push_boundaries
from lbmpy.macroscopic_value_kernels import macroscopic_values_getter, macroscopic_values_setter
from collections import OrderedDict
from time import perf_counter
```
%% Cell type:markdown id: tags:
# Version 1: compile-in boundaries
%% Cell type:code id: tags:
``` python
domain_size = (32, 32, 32)
relaxation_rate = 1.8
time_steps = 100
lid_velocity = 0.05
stencil = LBStencil(Stencil.D3Q19)
```
%% Cell type:code id: tags:
``` python
dh = create_data_handling(domain_size, default_target=ps.Target.CPU)
pdfs = dh.add_array('pdfs', values_per_cell=stencil.Q)
u = dh.add_array('u', values_per_cell=stencil.D)
streaming_pattern = 'aa'
```
%% Cell type:code id: tags:
``` python
boundaries = OrderedDict((
((0, 1, 0), UBB([lid_velocity, 0, 0])),
((1, 0, 0), NoSlip()),
((-1, 0, 0), NoSlip()),
((0, -1, 0), NoSlip()),
((0, 0, 1), NoSlip()),
((0, 0, -1), NoSlip()),
))
lbm_opt = LBMOptimisation(symbolic_field=pdfs, cse_global=False, cse_pdfs=False)
lbm_config = LBMConfig(stencil=stencil, relaxation_rate=relaxation_rate, compressible=False)
cr_even = create_lb_collision_rule(lbm_config=lbm_config, lbm_optimisation=lbm_opt)
cr_odd = create_lb_collision_rule(lbm_config=lbm_config, lbm_optimisation=lbm_opt)
update_rule_aa_even = update_rule_with_push_boundaries(cr_even, pdfs, boundaries, streaming_pattern, Timestep.EVEN)
update_rule_aa_odd = update_rule_with_push_boundaries(cr_odd, pdfs, boundaries, streaming_pattern, Timestep.ODD)
getter_assignments = macroscopic_values_getter(update_rule_aa_even.method, velocity=u.center_vector,
pdfs=pdfs, density=None,
streaming_pattern=streaming_pattern,
previous_timestep=Timestep.EVEN)
config = ps.CreateKernelConfig(target=dh.default_target)
getter_kernel = ps.create_kernel(getter_assignments, config=config).compile()
even_kernel = ps.create_kernel(update_rule_aa_even, config=config).compile()
odd_kernel = ps.create_kernel(update_rule_aa_odd, config=config).compile()
```
%% Cell type:code id: tags:
``` python
def init():
dh.fill(pdfs.name, 0, ghost_layers=True)
def aa_time_loop(steps=100):
assert steps % 2 == 0, "Works only for an even number of time steps"
dh.all_to_gpu()
for i in range(steps // 2):
dh.run_kernel(odd_kernel)
dh.run_kernel(even_kernel)
dh.run_kernel(getter_kernel)
dh.all_to_cpu()
```
%% Cell type:code id: tags:
``` python
init()
aa_time_loop(time_steps)
vel_version1 = dh.gather_array(u.name, ghost_layers=False).copy()
plt.vector_field_magnitude(vel_version1[:, :, domain_size[2]//2, :])
plt.colorbar()
```
%% Output
<matplotlib.colorbar.Colorbar at 0x11b578710>
%% Cell type:markdown id: tags:
# Version 2: Normal boundary handling
%% Cell type:code id: tags:
``` python
ldc = create_lid_driven_cavity(domain_size, relaxation_rate=relaxation_rate, lid_velocity=lid_velocity)
ldc.run(time_steps)
vel_version2 = ldc.velocity[:, :, :, :]
plt.vector_field_magnitude(vel_version2[:, :, domain_size[2]//2, :])
plt.colorbar()
```
%% Output
<matplotlib.colorbar.Colorbar at 0x12e912a90>
%% Cell type:code id: tags:
``` python
np.testing.assert_almost_equal(vel_version1[1:-1, 1:-1, :], vel_version2[1:-1, 1:-1, :], decimal=2)
```
"""
The update equations should not change if a relaxation rate of a conserved quantity (density/velocity)
changes. This test checks that for moment-based methods
"""
from copy import copy
import pytest
import sympy as sp
import math
from lbmpy.enums import Stencil, Method
from lbmpy.methods import create_srt, create_trt, create_trt_kbc, \
create_with_default_polynomial_cumulants
from lbmpy.methods.momentbased.momentbasedmethod import MomentBasedLbMethod
from lbmpy.methods.cumulantbased import CumulantBasedLbMethod
from lbmpy.moments import MOMENT_SYMBOLS
from lbmpy.simplificationfactory import create_simplification_strategy
from lbmpy.stencils import LBStencil
def __change_relaxation_rate_of_conserved_moments(method, new_relaxation_rate=sp.Symbol("test_omega")):
conserved_moments = (sp.Rational(1, 1),) + MOMENT_SYMBOLS[:method.dim]
rr_dict = copy(method.relaxation_rate_dict)
for conserved_moment in conserved_moments:
rr_dict[conserved_moment] = new_relaxation_rate
if isinstance(method, MomentBasedLbMethod):
changed_method = MomentBasedLbMethod(method.stencil, method.equilibrium_distribution, rr_dict,
method.conserved_quantity_computation,
force_model=method.force_model)
elif isinstance(method, CumulantBasedLbMethod):
changed_method = CumulantBasedLbMethod(method.stencil, method.equilibrium_distribution, rr_dict,
method.conserved_quantity_computation,
force_model=method.force_model,
zero_centered=True)
else:
raise ValueError("Not a moment or cumulant-based method")
return changed_method
def check_for_collision_rule_equivalence(collision_rule1, collision_rule2, use_numeric_subs=False):
collision_rule1 = collision_rule1.new_without_subexpressions()
collision_rule2 = collision_rule2.new_without_subexpressions()
if use_numeric_subs:
free_symbols = collision_rule1.free_symbols
free_symbols.update(collision_rule2.free_symbols)
subs_dict = dict()
value = 10.0
for symbol in free_symbols:
subs_dict.update({symbol: value})
value += 1.1
collision_rule1 = collision_rule1.subs(subs_dict)
collision_rule2 = collision_rule2.subs(subs_dict)
for eq1, eq2 in zip(collision_rule1.main_assignments, collision_rule2.main_assignments):
diff = sp.cancel(sp.expand(eq1.rhs - eq2.rhs))
if use_numeric_subs:
assert math.isclose(diff, 0, rel_tol=0.0, abs_tol=1e-10)
else:
assert diff == 0
def check_method_equivalence(m1, m2, do_simplifications, use_numeric_subs=False):
cr1 = m1.get_collision_rule()
cr2 = m2.get_collision_rule()
if do_simplifications:
cr1 = create_simplification_strategy(m1)(cr1)
cr2 = create_simplification_strategy(m2)(cr2)
check_for_collision_rule_equivalence(cr1, cr2, use_numeric_subs)
@pytest.mark.longrun
def test_cumulant():
stencil = LBStencil(Stencil.D2Q9)
original_method = create_with_default_polynomial_cumulants(stencil, [sp.Symbol("omega")], zero_centered=True)
changed_method = __change_relaxation_rate_of_conserved_moments(original_method)
check_method_equivalence(original_method, changed_method, True, use_numeric_subs=True)
check_method_equivalence(original_method, changed_method, False, use_numeric_subs=True)
@pytest.mark.longrun
def test_srt():
stencil = LBStencil(Stencil.D3Q27)
original_method = create_srt(stencil, sp.Symbol("omega"), compressible=True,
continuous_equilibrium=True)
changed_method = __change_relaxation_rate_of_conserved_moments(original_method)
check_method_equivalence(original_method, changed_method, True, use_numeric_subs=True)
check_method_equivalence(original_method, changed_method, False, use_numeric_subs=True)
def test_srt_short():
stencil = LBStencil(Stencil.D2Q9)
original_method = create_srt(stencil, sp.Symbol("omega"), compressible=True,
continuous_equilibrium=True)
changed_method = __change_relaxation_rate_of_conserved_moments(original_method)
check_method_equivalence(original_method, changed_method, True, use_numeric_subs=False)
check_method_equivalence(original_method, changed_method, False, use_numeric_subs=False)
@pytest.mark.parametrize('stencil_name', [Stencil.D2Q9, Stencil.D3Q19, Stencil.D3Q27])
@pytest.mark.parametrize('continuous_moments', [False, True])
@pytest.mark.longrun
def test_trt(stencil_name, continuous_moments):
stencil = LBStencil(stencil_name)
original_method = create_trt(stencil, sp.Symbol("omega1"), sp.Symbol("omega2"),
continuous_equilibrium=continuous_moments)
changed_method = __change_relaxation_rate_of_conserved_moments(original_method)
check_method_equivalence(original_method, changed_method, True)
check_method_equivalence(original_method, changed_method, False)
@pytest.mark.parametrize('method_name', [Method.TRT_KBC_N1, Method.TRT_KBC_N2, Method.TRT_KBC_N3, Method.TRT_KBC_N4])
def test_trt_kbc(method_name):
dim = 2
method_nr = method_name.name[-1]
original_method = create_trt_kbc(dim, sp.Symbol("omega1"), sp.Symbol("omega2"),
method_name='KBC-N' + method_nr,
continuous_equilibrium=False)
changed_method = __change_relaxation_rate_of_conserved_moments(original_method)
check_method_equivalence(original_method, changed_method, True)
check_method_equivalence(original_method, changed_method, False)
@pytest.mark.parametrize('method_name', [Method.TRT_KBC_N1, Method.TRT_KBC_N2, Method.TRT_KBC_N3, Method.TRT_KBC_N4])
@pytest.mark.longrun
def test_trt_kbc_long(method_name):
dim = 3
method_nr = method_name.name[-1]
original_method = create_trt_kbc(dim, sp.Symbol("omega1"), sp.Symbol("omega2"),
method_name='KBC-N' + method_nr,
continuous_equilibrium=False)
changed_method = __change_relaxation_rate_of_conserved_moments(original_method)
check_method_equivalence(original_method, changed_method, True)
check_method_equivalence(original_method, changed_method, False)
from dataclasses import replace
import numpy as np
import pytest
from pystencils import Target, CreateKernelConfig
from lbmpy.creationfunctions import LBMConfig, LBMOptimisation
from lbmpy.enums import Method, Stencil
from lbmpy.scenarios import create_channel
from lbmpy.stencils import LBStencil
from lbmpy._compat import IS_PYSTENCILS_2
def run_equivalence_test(domain_size, lbm_config, lbm_opt, base_config, time_steps=13):
config = replace(base_config, target=Target.CPU)
cpu_scenario = create_channel(domain_size=domain_size, pressure_difference=0.001,
lbm_config=lbm_config, lbm_optimisation=lbm_opt, config=config)
config = replace(base_config, target=Target.GPU)
if not IS_PYSTENCILS_2:
from pystencils.enums import Backend
config = replace(config, backend=Backend.CUDA)
gpu_scenario = create_channel(domain_size=domain_size, pressure_difference=0.001,
lbm_config=lbm_config, lbm_optimisation=lbm_opt, config=config)
cpu_scenario.run(time_steps)
gpu_scenario.run(time_steps)
max_vel_error = np.max(np.abs(cpu_scenario.velocity_slice() - gpu_scenario.velocity_slice()))
max_rho_error = np.max(np.abs(cpu_scenario.density_slice() - gpu_scenario.density_slice()))
np.testing.assert_allclose(max_vel_error, 0, atol=1e-14)
np.testing.assert_allclose(max_rho_error, 0, atol=1e-14)
@pytest.mark.parametrize('scenario', [((17, 12), Method.SRT, False, (12, 4), 'reverse_numpy'),
((18, 20), Method.MRT, True, (4, 2), 'zyxf'),
((7, 11, 18), Method.TRT, False, False, 'numpy')])
def test_force_driven_channel_short(scenario):
pytest.importorskip("cupy")
ds = scenario[0]
stencil = LBStencil(Stencil.D2Q9) if len(ds) == 2 else LBStencil(Stencil.D3Q27)
method = scenario[1]
compressible = scenario[2]
block_size = scenario[3]
field_layout = scenario[4]
lbm_config = LBMConfig(stencil=stencil, method=method,
compressible=compressible, relaxation_rates=[1.95, 1.9, 1.92, 1.92])
lbm_opt = LBMOptimisation(field_layout=field_layout)
# Different methods
if block_size is not False:
config = CreateKernelConfig(gpu_indexing_params={'block_size': block_size})
else:
if IS_PYSTENCILS_2:
config = CreateKernelConfig()
config.gpu.indexing_scheme = "blockwise4d"
else:
config = CreateKernelConfig(gpu_indexing='line')
run_equivalence_test(domain_size=ds, lbm_config=lbm_config, lbm_opt=lbm_opt, base_config=config)
import pytest
import sympy as sp
from lbmpy.enums import Stencil
from lbmpy.stencils import LBStencil
from lbmpy.methods.creationfunctions import create_with_monomial_cumulants
from lbmpy.maxwellian_equilibrium import get_weights
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q19])
def test_zero_centering_equilibrium_equivalence(stencil):
stencil = LBStencil(stencil)
omega = sp.Symbol('omega')
r_rates = (omega,) * stencil.Q
weights = sp.Matrix(get_weights(stencil))
rho = sp.Symbol("rho")
rho_background = sp.Integer(1)
delta_rho = sp.Symbol("delta_rho")
subs = {delta_rho: rho - rho_background}
eqs = []
for zero_centered in [False, True]:
method = create_with_monomial_cumulants(stencil, r_rates, zero_centered=zero_centered)
eq = method.get_equilibrium_terms()
eqs.append(eq.subs(subs))
assert (eqs[0] - eqs[1]).expand() == weights
import pytest
from lbmpy.creationfunctions import create_lb_method, LBMConfig
from lbmpy.enums import Method, Stencil
from lbmpy.methods import create_srt
from lbmpy.stencils import LBStencil
from lbmpy.methods.creationfunctions import create_with_default_polynomial_cumulants
@pytest.mark.parametrize('stencil_name', [Stencil.D2Q9, Stencil.D3Q19, Stencil.D3Q27])
def test_weights(stencil_name):
stencil = LBStencil(stencil_name)
cumulant_method = create_with_default_polynomial_cumulants(stencil, [1])
moment_method = create_srt(stencil, 1, compressible=True, continuous_equilibrium=True)
assert cumulant_method.weights == moment_method.weights
import pytest
from itertools import chain
import sympy as sp
from pystencils import AssignmentCollection
from lbmpy.moment_transforms import (
CentralMomentsToCumulantsByGeneratingFunc, PRE_COLLISION_MONOMIAL_CENTRAL_MOMENT,
PRE_COLLISION_CUMULANT
)
from lbmpy.methods import cascaded_moment_sets_literature
from lbmpy.stencils import Stencil, LBStencil
@pytest.mark.parametrize("stencil", [Stencil.D2Q9, Stencil.D3Q19])
def test_identity(stencil):
stencil = LBStencil(stencil)
polys = list(chain.from_iterable(cascaded_moment_sets_literature(stencil)))
rho = sp.Symbol('rho')
u = sp.symbols('u_:2')
transform = CentralMomentsToCumulantsByGeneratingFunc(stencil, polys, rho, u,
post_collision_symbol_base=PRE_COLLISION_CUMULANT)
forward_eqs = transform.forward_transform()
backward_eqs = transform.backward_transform(central_moment_base=PRE_COLLISION_MONOMIAL_CENTRAL_MOMENT)
subexpressions = forward_eqs.all_assignments + backward_eqs.subexpressions
main_assignments = backward_eqs.main_assignments
ac = AssignmentCollection(main_assignments, subexpressions=subexpressions)
ac = ac.new_without_subexpressions()
for lhs, rhs in ac.main_assignments_dict.items():
assert (lhs - rhs).expand() == 0
from lbmpy.creationfunctions import create_lb_method
from lbmpy.cumulants import *
from lbmpy.moments import (
discrete_moment, exponent_to_polynomial_representation, exponents_to_polynomial_representations)
from lbmpy.stencils import get_stencil
from lbmpy.enums import Stencil
from lbmpy.stencils import LBStencil
def test_cumulants_from_pdfs():
"""
......@@ -11,11 +10,10 @@ def test_cumulants_from_pdfs():
- directly pdfs to cumulant
- indirect pdfs -> raw moments -> cumulants
"""
stencil = get_stencil("D2Q9")
dim = len(stencil[0])
indices = moments_up_to_component_order(2, dim=dim)
stencil = LBStencil(Stencil.D2Q9)
indices = moments_up_to_component_order(2, dim=stencil.D)
pdf_symbols = sp.symbols("f_:%d" % (len(stencil),))
pdf_symbols = sp.symbols(f"f_:{stencil.Q}")
direct_version = cumulants_from_pdfs(stencil, pdf_symbols=pdf_symbols, cumulant_indices=indices)
polynomial_moment_indices = exponents_to_polynomial_representations(indices)
direct_version2 = cumulants_from_pdfs(stencil, pdf_symbols=pdf_symbols, cumulant_indices=polynomial_moment_indices)
......@@ -32,11 +30,10 @@ def test_cumulants_from_pdfs():
def test_raw_moment_to_cumulant_transformation():
"""Transforms from raw moments to cumulants and back, then checks for identity"""
for stencil in [get_stencil("D2Q9"), get_stencil("D3Q27")]:
dim = len(stencil[0])
indices = moments_up_to_component_order(2, dim=dim)
for stencil in [LBStencil(Stencil.D2Q9), LBStencil(Stencil.D3Q27)]:
indices = moments_up_to_component_order(2, dim=stencil.D)
symbol_format = "m_%d_%d_%d" if dim == 3 else "m_%d_%d"
symbol_format = "m_%d_%d_%d" if stencil.D == 3 else "m_%d_%d"
moment_symbols = {idx: sp.Symbol(symbol_format % idx) for idx in indices}
forward = {idx: cumulant_as_function_of_raw_moments(idx, moments_dict=moment_symbols)
......@@ -48,11 +45,10 @@ def test_raw_moment_to_cumulant_transformation():
def test_central_moment_to_cumulant_transformation():
"""Transforms from central moments to cumulants and back, then checks for identity"""
for stencil in [get_stencil("D2Q9"), get_stencil("D3Q27")]:
dim = len(stencil[0])
indices = moments_up_to_component_order(2, dim=dim)
for stencil in [LBStencil(Stencil.D2Q9), LBStencil(Stencil.D3Q27)]:
indices = moments_up_to_component_order(2, dim=stencil.D)
symbol_format = "m_%d_%d_%d" if dim == 3 else "m_%d_%d"
symbol_format = "m_%d_%d_%d" if stencil.D == 3 else "m_%d_%d"
moment_symbols = {idx: sp.Symbol(symbol_format % idx) for idx in indices}
forward = {idx: cumulant_as_function_of_central_moments(idx, moments_dict=moment_symbols)
......@@ -65,29 +61,6 @@ def test_central_moment_to_cumulant_transformation():
assert backward[idx] == moment_symbols[idx]
def test_collision_rule():
cumulant_method = create_lb_method(stencil='D2Q9', compressible=True, cumulant=True, relaxation_rate=1.5)
cumulant_method.set_first_moment_relaxation_rate(1.5)
assert cumulant_method.force_model is None
assert cumulant_method.zeroth_order_equilibrium_moment_symbol == sp.symbols("rho")
assert cumulant_method.first_order_equilibrium_moment_symbols == sp.symbols("u_:2")
assert all(e.relaxation_rate == 1.5 for e in cumulant_method.relaxation_info_dict.values())
cumulant_method.get_equilibrium_terms()
cr1 = cumulant_method.get_collision_rule(moment_subexpressions=True, post_collision_subexpressions=True,
pre_collision_subexpressions=True)
cr2 = cumulant_method.get_collision_rule(moment_subexpressions=False, post_collision_subexpressions=False,
pre_collision_subexpressions=False)
cr1_inserted = cr1.new_without_subexpressions()
cr2_inserted = cr2.new_without_subexpressions()
t = cr1_inserted.main_assignments[8].rhs - cr2_inserted.main_assignments[8].rhs
assert t == 0
html = cumulant_method._repr_html_()
assert 'Cumulant' in html
def test_cumulants_from_pdf():
res = cumulants_from_pdfs(get_stencil("D2Q9"))
assert res[(0, 0)] == sp.log(sum(sp.symbols("f_:9")))
res = cumulants_from_pdfs(LBStencil(Stencil.D2Q9))
assert res[(0, 0)] == sp.log(sum(sp.symbols(f"f_:{9}")))
import pytest
import numpy as np
from numpy.testing import assert_allclose
import pystencils as ps
import sympy as sp
from lbmpy.enums import Stencil, Method
from lbmpy.stencils import LBStencil
from lbmpy.creationfunctions import LBMConfig, LBMOptimisation, create_lb_function
from lbmpy.macroscopic_value_kernels import macroscopic_values_setter, macroscopic_values_getter
@pytest.mark.parametrize('zero_centered', [False, True])
def test_density_velocity_input(zero_centered):
stencil = LBStencil(Stencil.D2Q9)
dh = ps.create_data_handling((5,5), default_target=ps.Target.CPU)
rho_in = dh.add_array("rho_in", 1)
rho_out = dh.add_array_like("rho_out", "rho_in")
u_in = dh.add_array("u_in", 2)
u_out = dh.add_array_like("u_out", "u_in")
pdfs = dh.add_array("pdfs", stencil.Q)
lb_config = LBMConfig(stencil=Stencil.D2Q9, method=Method.SRT, zero_centered=zero_centered,
relaxation_rate=sp.Integer(1),
density_input=rho_in.center, velocity_input=u_in.center_vector,
kernel_type="collide_only")
lb_opt = LBMOptimisation(symbolic_field=pdfs)
lb_func = create_lb_function(lbm_config=lb_config, lbm_optimisation=lb_opt)
setter = macroscopic_values_setter(lb_func.method, 1, (0, 0), pdfs.center_vector)
setter_kernel = ps.create_kernel(setter).compile()
getter = macroscopic_values_getter(lb_func.method, rho_out.center, u_out.center_vector, pdfs.center_vector)
getter_kernel = ps.create_kernel(getter).compile()
dh.run_kernel(setter_kernel)
dh.cpu_arrays[rho_in.name][1:-1, 1:-1] = 1.0 + 0.1 * np.random.random_sample((5, 5))
dh.cpu_arrays[u_in.name][1:-1, 1:-1] = 0.05 + 0.01 * np.random.random_sample((5, 5, 2))
dh.run_kernel(lb_func)
dh.run_kernel(getter_kernel)
assert_allclose(dh.cpu_arrays[rho_out.name][1:-1, 1:-1], dh.cpu_arrays[rho_in.name][1:-1, 1:-1])
assert_allclose(dh.cpu_arrays[u_out.name][1:-1, 1:-1], dh.cpu_arrays[u_in.name][1:-1, 1:-1])
import math
import pytest
from dataclasses import replace
import pystencils as ps
from pystencils.slicing import slice_from_direction
from lbmpy import pdf_initialization_assignments
from lbmpy.creationfunctions import create_lb_method, create_lb_update_rule, LBMConfig, LBMOptimisation
from lbmpy.enums import Method, Stencil
from lbmpy.boundaries.boundaryhandling import LatticeBoltzmannBoundaryHandling
from lbmpy.boundaries.boundaryconditions import DiffusionDirichlet, NeumannByCopy
from lbmpy.geometry import add_box_boundary
from lbmpy.stencils import LBStencil
from lbmpy.maxwellian_equilibrium import get_weights
import sympy as sp
import numpy as np
def test_diffusion_boundary():
domain_size = (10, 10)
stencil = LBStencil(Stencil.D2Q9)
weights = get_weights(stencil, c_s_sq=sp.Rational(1, 3))
concentration = 1.0
# Data Handling
dh = ps.create_data_handling(domain_size=domain_size)
dh.add_array('pdfs', values_per_cell=stencil.Q)
dh.fill("pdfs", 0.0, ghost_layers=True)
lbm_config = LBMConfig(stencil=stencil, method=Method.SRT, relaxation_rate=1.8,
compressible=True, zero_centered=False)
method = create_lb_method(lbm_config=lbm_config)
# Boundary Handling
bh = LatticeBoltzmannBoundaryHandling(method, dh, 'pdfs', name="bh")
add_box_boundary(bh, boundary=DiffusionDirichlet(concentration))
bh()
assert all(np.abs(dh.gather_array("pdfs", ghost_layers=True)[0, 1:-2, 4] - 2 * weights[4]) < 1e-14)
assert all(np.abs(dh.gather_array("pdfs", ghost_layers=True)[0, 1:-2, 6] - 2 * weights[6]) < 1e-14)
assert all(np.abs(dh.gather_array("pdfs", ghost_layers=True)[0, 2:-1, 8] - 2 * weights[8]) < 1e-14)
assert all(np.abs(dh.gather_array("pdfs", ghost_layers=True)[-1, 1:-2, 3] - 2 * weights[3]) < 1e-14)
assert all(np.abs(dh.gather_array("pdfs", ghost_layers=True)[-1, 1:-2, 5] - 2 * weights[5]) < 1e-14)
assert all(np.abs(dh.gather_array("pdfs", ghost_layers=True)[-1, 2:-1, 7] - 2 * weights[7]) < 1e-14)
assert all(np.abs(dh.gather_array("pdfs", ghost_layers=True)[1:-2, 0, 1] - 2 * weights[1]) < 1e-14)
assert all(np.abs(dh.gather_array("pdfs", ghost_layers=True)[2:, 0, 5] - 2 * weights[5]) < 1e-14)
assert all(np.abs(dh.gather_array("pdfs", ghost_layers=True)[:-2, 0, 6] - 2 * weights[6]) < 1e-14)
assert all(np.abs(dh.gather_array("pdfs", ghost_layers=True)[1:-2, -1, 2] - 2 * weights[2]) < 1e-14)
assert all(np.abs(dh.gather_array("pdfs", ghost_layers=True)[2:, -1, 7] - 2 * weights[7]) < 1e-14)
assert all(np.abs(dh.gather_array("pdfs", ghost_layers=True)[:-2, -1, 8] - 2 * weights[8]) < 1e-14)
@pytest.mark.longrun
def test_diffusion():
"""
Runs the "Diffusion from Plate in Uniform Flow" benchmark as it is described
in [ch. 8.6.3, The Lattice Boltzmann Method, Krüger et al.].
dC/dy = 0
┌───────────────┐
│ → → → │
C = 0 │ → u → │ dC/dx = 0
│ → → → │
└───────────────┘
C = 1
The analytical solution is given by:
C(x,y) = 1 * erfc(y / sqrt(4Dx/u))
The hydrodynamic field is not simulated, instead a constant velocity is assumed.
"""
pytest.importorskip("cupy")
# Parameters
domain_size = (1600, 160)
omega = 1.38
diffusion = (1 / omega - 0.5) / 3
velocity = 0.05
time_steps = 50000
stencil = LBStencil(Stencil.D2Q9)
target = ps.Target.GPU
# Data Handling
dh = ps.create_data_handling(domain_size=domain_size, default_target=target)
vel_field = dh.add_array('vel_field', values_per_cell=stencil.D)
dh.fill('vel_field', velocity, 0, ghost_layers=True)
dh.fill('vel_field', 0.0, 1, ghost_layers=True)
con_field = dh.add_array('con_field', values_per_cell=1)
dh.fill('con_field', 0.0, ghost_layers=True)
pdfs = dh.add_array('pdfs', values_per_cell=stencil.Q)
dh.fill('pdfs', 0.0, ghost_layers=True)
pdfs_tmp = dh.add_array('pdfs_tmp', values_per_cell=stencil.Q)
dh.fill('pdfs_tmp', 0.0, ghost_layers=True)
# Lattice Boltzmann method
lbm_config = LBMConfig(stencil=stencil, method=Method.MRT, relaxation_rates=[1, 1.5, 1, 1.5, 1],
zero_centered=False,
velocity_input=vel_field, output={'density': con_field}, compressible=True,
weighted=True, kernel_type='stream_pull_collide')
lbm_opt = LBMOptimisation(symbolic_field=pdfs, symbolic_temporary_field=pdfs_tmp)
config = ps.CreateKernelConfig(target=dh.default_target, cpu_openmp=True)
method = create_lb_method(lbm_config=lbm_config)
method.set_conserved_moments_relaxation_rate(omega)
lbm_config = replace(lbm_config, lb_method=method)
update_rule = create_lb_update_rule(lbm_config=lbm_config, lbm_optimisation=lbm_opt)
kernel = ps.create_kernel(update_rule, config=config).compile()
# PDF initalization
init = pdf_initialization_assignments(method, con_field.center,
vel_field.center_vector, pdfs.center_vector)
dh.run_kernel(ps.create_kernel(init).compile())
dh.all_to_gpu()
# Boundary Handling
bh = LatticeBoltzmannBoundaryHandling(update_rule.method, dh, 'pdfs', name="bh", target=dh.default_target)
add_box_boundary(bh, boundary=NeumannByCopy())
bh.set_boundary(DiffusionDirichlet(0), slice_from_direction('W', dh.dim))
bh.set_boundary(DiffusionDirichlet(1), slice_from_direction('S', dh.dim))
# Timeloop
for i in range(time_steps):
bh()
dh.run_kernel(kernel)
dh.swap("pdfs", "pdfs_tmp")
dh.all_to_cpu()
# Verification
x = np.arange(1, domain_size[0], 1)
y = np.arange(0, domain_size[1], 1)
X, Y = np.meshgrid(x, y)
analytical = np.zeros(domain_size)
analytical[1:, :] = np.vectorize(math.erfc)(Y / np.vectorize(math.sqrt)(4 * diffusion * X / velocity)).transpose()
simulated = dh.gather_array('con_field', ghost_layers=False)
residual = 0
for i in x:
for j in y:
residual += (simulated[i, j] - analytical[i, j]) ** 2
residual = math.sqrt(residual / (domain_size[0] * domain_size[1]))
assert residual < 1e-2
import platform
import numpy as np
import sympy as sp
import pytest
from lbmpy.enums import ForceModel, Method
from lbmpy.scenarios import create_lid_driven_cavity
@pytest.mark.parametrize('method', [Method.TRT_KBC_N1, Method.TRT_KBC_N2, Method.TRT_KBC_N3, Method.TRT_KBC_N4])
def test_entropic_methods(method):
if platform.system().lower() == 'windows' and method == Method.TRT_KBC_N4:
pytest.skip("For some reason this test does not run on windows", allow_module_level=True)
sc_kbc = create_lid_driven_cavity((20, 20), method=method,
relaxation_rates=[1.9999, sp.Symbol("omega_free")],
entropic_newton_iterations=3, entropic=True, compressible=True,
zero_centered=False, force=(-1e-10, 0), force_model=ForceModel.LUO)
sc_kbc.run(1000)
assert np.isfinite(np.max(sc_kbc.velocity[:, :]))
from pystencils.stencil import inverse_direction
from lbmpy.stencils import LBStencil
from lbmpy.advanced_streaming.utility import AccessPdfValues, get_timesteps
import pytest
import numpy as np
from pystencils import Target
from pystencils.datahandling import create_data_handling
from lbmpy.boundaries import LatticeBoltzmannBoundaryHandling, SimpleExtrapolationOutflow, ExtrapolationOutflow
from lbmpy.creationfunctions import create_lb_method, LBMConfig
from lbmpy.enums import Method, Stencil
from lbmpy.advanced_streaming.utility import streaming_patterns
from pystencils.slicing import get_ghost_region_slice
from itertools import product
@pytest.mark.parametrize('stencil_enum', [Stencil.D2Q9, Stencil.D3Q27])
@pytest.mark.parametrize('streaming_pattern', streaming_patterns)
def test_pdf_simple_extrapolation(stencil_enum, streaming_pattern):
stencil = LBStencil(stencil_enum)
# Field contains exactly one fluid cell
domain_size = (3,) * stencil.D
for timestep in get_timesteps(streaming_pattern):
dh = create_data_handling(domain_size, default_target=Target.CPU)
lb_method = create_lb_method(lbm_config=LBMConfig(stencil=stencil))
pdf_field = dh.add_array('f', values_per_cell=stencil.Q)
dh.fill(pdf_field.name, np.nan, ghost_layers=True)
bh = LatticeBoltzmannBoundaryHandling(lb_method, dh, pdf_field.name, streaming_pattern, target=Target.CPU)
# Set up outflows in all directions
for normal_dir in stencil[1:]:
boundary_obj = SimpleExtrapolationOutflow(normal_dir, stencil)
boundary_slice = get_ghost_region_slice(normal_dir)
bh.set_boundary(boundary_obj, boundary_slice)
pdf_arr = dh.cpu_arrays[pdf_field.name]
# Set up the domain with artificial PDF values
# center = (1,) * dim
out_access = AccessPdfValues(stencil, streaming_pattern, timestep, 'out')
for cell in product(*(range(1, 4) for _ in range(stencil.D))):
for q in range(stencil.Q):
out_access.write_pdf(pdf_arr, cell, q, q)
# Do boundary handling
bh(prev_timestep=timestep)
# Check PDF values
in_access = AccessPdfValues(stencil, streaming_pattern, timestep.next(), 'in')
# Inbound in center cell
for cell in product(*(range(1, 4) for _ in range(stencil.D))):
for q in range(stencil.Q):
f = in_access.read_pdf(pdf_arr, cell, q)
assert f == q
def test_extrapolation_outflow_initialization_by_copy():
stencil = LBStencil(Stencil.D2Q9)
domain_size = (1, 5)
streaming_pattern = 'esotwist'
zeroth_timestep = 'even'
pdf_acc = AccessPdfValues(stencil, streaming_pattern=streaming_pattern,
timestep=zeroth_timestep, streaming_dir='out')
dh = create_data_handling(domain_size, default_target=Target.CPU)
lb_method = create_lb_method(stencil=stencil)
pdf_field = dh.add_array('f', values_per_cell=stencil.Q)
dh.fill(pdf_field.name, np.nan, ghost_layers=True)
pdf_arr = dh.cpu_arrays[pdf_field.name]
bh = LatticeBoltzmannBoundaryHandling(lb_method, dh, pdf_field.name,
streaming_pattern=streaming_pattern, target=Target.CPU)
for y in range(1, 6):
for j in range(stencil.Q):
pdf_acc.write_pdf(pdf_arr, (1, y), j, j)
normal_dir = (1, 0)
outflow = ExtrapolationOutflow(normal_dir, lb_method, streaming_pattern=streaming_pattern,
zeroth_timestep=zeroth_timestep)
boundary_slice = get_ghost_region_slice(normal_dir)
bh.set_boundary(outflow, boundary_slice)
bh.prepare()
blocks = list(dh.iterate())
index_list = blocks[0][bh._index_array_name].boundary_object_to_index_list[outflow]
assert len(index_list) == 13
for entry in index_list:
direction = stencil[entry["dir"]]
inv_dir = stencil.index(inverse_direction(direction))
assert entry[f'pdf'] == inv_dir
assert entry[f'pdf_nd'] == inv_dir
def test_extrapolation_outflow_initialization_by_callback():
stencil = LBStencil(Stencil.D2Q9)
domain_size = (1, 5)
streaming_pattern = 'esotwist'
zeroth_timestep = 'even'
dh = create_data_handling(domain_size, default_target=Target.CPU)
lb_method = create_lb_method(stencil=stencil)
pdf_field = dh.add_array('f', values_per_cell=stencil.Q)
dh.fill(pdf_field.name, np.nan, ghost_layers=True)
bh = LatticeBoltzmannBoundaryHandling(lb_method, dh, pdf_field.name,
streaming_pattern=streaming_pattern, target=Target.CPU)
normal_dir = (1, 0)
outflow = ExtrapolationOutflow(normal_direction=normal_dir, lb_method=lb_method,
streaming_pattern=streaming_pattern,
zeroth_timestep=zeroth_timestep,
initial_density=lambda x, y: 1,
initial_velocity=lambda x, y: (0, 0))
boundary_slice = get_ghost_region_slice(normal_dir)
bh.set_boundary(outflow, boundary_slice)
bh.prepare()
weights = [w.evalf() for w in lb_method.weights]
blocks = list(dh.iterate())
index_list = blocks[0][bh._index_array_name].boundary_object_to_index_list[outflow]
assert len(index_list) == 13
for entry in index_list:
direction = stencil[entry["dir"]]
inv_dir = stencil.index(inverse_direction(direction))
assert entry[f'pdf_nd'] == weights[inv_dir]
......@@ -87,7 +87,7 @@
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
......@@ -101,7 +101,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.4"
"version": "3.9.7"
}
},
"nbformat": 4,
......
import pytest
import pystencils as ps
from lbmpy.creationfunctions import create_lb_function, LBMConfig
from lbmpy.enums import Method, Stencil
from lbmpy.scenarios import create_lid_driven_cavity
from lbmpy.stencils import LBStencil
from lbmpy._compat import IS_PYSTENCILS_2
@pytest.mark.parametrize("double_precision", [False, True])
@pytest.mark.parametrize(
"method_enum", [Method.SRT, Method.CENTRAL_MOMENT, Method.CUMULANT]
)
def test_creation(double_precision, method_enum):
"""Simple test that makes sure that only float variables are created"""
lbm_config = LBMConfig(method=method_enum, relaxation_rate=1.5, compressible=True)
if IS_PYSTENCILS_2:
config = ps.CreateKernelConfig(
default_dtype="float64" if double_precision else "float32"
)
else:
config = ps.CreateKernelConfig(
data_type="float64" if double_precision else "float32",
default_number_float="float64" if double_precision else "float32",
)
func = create_lb_function(lbm_config=lbm_config, config=config)
code = ps.get_code_str(func)
if double_precision:
assert "float" not in code
assert "double" in code
else:
assert "double" not in code
assert "float" in code
@pytest.mark.parametrize("numeric_type", ["float32", "float64"])
@pytest.mark.parametrize(
"method_enum", [Method.SRT, Method.CENTRAL_MOMENT, Method.CUMULANT]
)
def test_scenario(method_enum, numeric_type):
lbm_config = LBMConfig(
stencil=LBStencil(Stencil.D3Q27),
method=method_enum,
relaxation_rate=1.45,
compressible=True,
)
if IS_PYSTENCILS_2:
config = ps.CreateKernelConfig(
default_dtype=numeric_type
)
else:
config = ps.CreateKernelConfig(
data_type=numeric_type,
default_number_float=numeric_type
)
sc = create_lid_driven_cavity((16, 16, 8), lbm_config=lbm_config, config=config)
sc.run(1)
code = ps.get_code_str(sc.ast)
if numeric_type == "float64":
assert "float" not in code
assert "double" in code
else:
assert "double" not in code
assert "float" in code
"""Tests velocity and stress fluctuations for thermalized LB"""
import pytest
import pystencils as ps
from lbmpy._compat import IS_PYSTENCILS_2
from pystencils import get_code_str
if IS_PYSTENCILS_2:
pass
else:
from pystencils.backends.simd_instruction_sets import get_supported_instruction_sets, get_vector_instruction_set
from pystencils.cpu.cpujit import get_compiler_config
from pystencils.enums import Target
from lbmpy.creationfunctions import *
from lbmpy.forcemodels import Guo
from lbmpy.macroscopic_value_kernels import macroscopic_values_setter
import numpy as np
from lbmpy.enums import Stencil
from lbmpy.moments import is_bulk_moment, is_shear_moment, get_order
from lbmpy.stencils import LBStencil
if not IS_PYSTENCILS_2:
def _skip_instruction_sets_windows(instruction_sets):
if get_compiler_config()["os"] == "windows":
# skip instruction sets supported by the CPU but not by the compiler
if "avx" in instruction_sets and (
"/arch:avx2" not in get_compiler_config()["flags"].lower()
and "/arch:avx512" not in get_compiler_config()["flags"].lower()
):
instruction_sets.remove("avx")
if (
"avx512" in instruction_sets
and "/arch:avx512" not in get_compiler_config()["flags"].lower()
):
instruction_sets.remove("avx512")
return instruction_sets
INSTRUCTION_SETS = get_supported_instruction_sets()
else:
INSTRUCTION_SETS = Target.available_vector_cpu_targets()
def single_component_maxwell(x1, x2, kT, mass):
"""Integrate the probability density from x1 to x2 using the trapezoidal rule"""
x = np.linspace(x1, x2, 1000)
try:
trapezoid = np.trapezoid # since numpy 2.0
except AttributeError:
trapezoid = np.trapz
return trapezoid(np.exp(-mass * x**2 / (2.0 * kT)), x) / np.sqrt(
2.0 * np.pi * kT / mass
)
def rr_getter(moment_group):
"""Maps a group of moments to a relaxation rate (shear, bulk, even, odd)
in the 4 relaxation time thermalized LB model
"""
is_shear = [is_shear_moment(m, 3) for m in moment_group]
is_bulk = [is_bulk_moment(m, 3) for m in moment_group]
order = [get_order(m) for m in moment_group]
assert min(order) == max(order)
order = order[0]
if order < 2:
return [0] * len(moment_group)
elif any(is_bulk):
assert all(is_bulk)
return [sp.Symbol("omega_bulk")] * len(moment_group)
elif any(is_shear):
assert all(is_shear)
return [sp.Symbol("omega_shear")] * len(moment_group)
elif order % 2 == 0:
assert order > 2
return [sp.Symbol("omega_even")] * len(moment_group)
else:
return [sp.Symbol("omega_odd")] * len(moment_group)
def second_order_moment_tensor_assignments(function_values, stencil, output_field):
"""Assignments for calculating the pressure tensor"""
assert len(function_values) == len(stencil)
dim = len(stencil[0])
return [
ps.Assignment(
output_field(i, j),
sum(c[i] * c[j] * f for f, c in zip(function_values, stencil)),
)
for i in range(dim)
for j in range(dim)
]
def add_pressure_output_to_collision_rule(collision_rule, pressure_field):
pressure_ouput = second_order_moment_tensor_assignments(
collision_rule.method.pre_collision_pdf_symbols,
collision_rule.method.stencil,
pressure_field,
)
collision_rule.main_assignments = collision_rule.main_assignments + pressure_ouput
def get_fluctuating_lb(
size=None,
kT=None,
omega_shear=None,
omega_bulk=None,
omega_odd=None,
omega_even=None,
rho_0=None,
target=None,
zero_centered: bool = False,
):
# Parameters
stencil = LBStencil(Stencil.D3Q19)
# Setup data handling
dh = ps.create_data_handling(
(size,) * stencil.D, periodicity=True, default_target=target
)
src = dh.add_array("src", values_per_cell=stencil.Q, layout="f")
dst = dh.add_array_like("dst", "src")
rho = dh.add_array("rho", layout="f", latex_name="\\rho", values_per_cell=1)
u = dh.add_array("u", values_per_cell=dh.dim, layout="f")
pressure_field = dh.add_array(
"pressure", values_per_cell=(3, 3), layout="f", gpu=target == Target.GPU
)
force_field = dh.add_array(
"force", values_per_cell=stencil.D, layout="f", gpu=target == Target.GPU
)
# Method setup
lbm_config = LBMConfig(
stencil=stencil,
method=Method.MRT,
compressible=True,
weighted=True,
zero_centered=zero_centered,
relaxation_rates=rr_getter,
force_model=Guo(force=force_field.center_vector),
fluctuating={"temperature": kT},
kernel_type="collide_only",
)
lb_method = create_lb_method(lbm_config=lbm_config)
lbm_config.lb_method = lb_method
lbm_opt = LBMOptimisation(symbolic_field=src, cse_global=True)
collision_rule = create_lb_collision_rule(
lbm_config=lbm_config, lbm_optimisation=lbm_opt
)
# add_pressure_output_to_collision_rule(collision_rule, pressure_field)
collision = create_lb_update_rule(
collision_rule=collision_rule, lbm_config=lbm_config, lbm_optimisation=lbm_opt
)
stream = create_stream_pull_with_output_kernel(
collision.method,
src,
dst,
{"density": rho, "velocity": u, "moment2": pressure_field},
)
config = ps.CreateKernelConfig(cpu_openmp=False, target=dh.default_target)
# Compile kernels
stream_kernel = ps.create_kernel(stream, config=config).compile()
collision_kernel = ps.create_kernel(collision, config=config).compile()
sync_pdfs = dh.synchronization_function([src.name])
# Initialization
init = macroscopic_values_setter(
collision.method,
velocity=(0,) * dh.dim,
pdfs=src.center_vector,
density=rho_0
)
init_kernel = ps.create_kernel(init, ghost_layers=0).compile()
dh.fill(rho.name, rho_0)
dh.fill(u.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
dh.fill(u.name, 0)
dh.fill(force_field.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
dh.fill(force_field.name, 0)
dh.run_kernel(init_kernel)
# time loop
def time_loop(start, steps):
dh.all_to_gpu()
for i in range(start, start + steps):
dh.run_kernel(
collision_kernel,
omega_shear=omega_shear,
omega_bulk=omega_bulk,
omega_odd=omega_odd,
omega_even=omega_even,
seed=42,
time_step=i,
)
sync_pdfs()
dh.run_kernel(stream_kernel)
dh.swap(src.name, dst.name)
return start + steps
return dh, time_loop
@pytest.mark.parametrize(
"zero_centered", [False, True], ids=["regular-storage", "zero-centered"]
)
@pytest.mark.parametrize(
"domain_size", [8, 60]
)
def test_resting_fluid(zero_centered: bool, domain_size: int, target=Target.CPU):
rho_0 = 0.86
kT = 4e-4
L = [domain_size] * 3
dh, time_loop = get_fluctuating_lb(
size=L[0],
target=target,
rho_0=rho_0,
kT=kT,
omega_shear=0.8,
omega_bulk=0.5,
omega_even=0.04,
omega_odd=0.3,
zero_centered=zero_centered,
)
# Test
t = 0
# warm up
t = time_loop(t, 10)
# Measurement
for i in range(10):
t = time_loop(t, 5)
res_u = dh.gather_array("u").reshape((-1, 3))
res_rho = dh.gather_array("rho").reshape((-1,))
# mass conservationo
# density per cell fluctuates, but toal mass is conserved
np.testing.assert_allclose(np.mean(res_rho), rho_0, atol=3E-12)
# momentum conservation
momentum = np.dot(res_rho, res_u)
np.testing.assert_allclose(momentum, [0, 0, 0], atol=1e-10)
# temperature (fluctuates around pre-set kT)
kinetic_energy = 1 / 2 * np.dot(res_rho, res_u * res_u) / np.prod(L)
kT_tol = 0.075 *(16/domain_size)**(3/2)
np.testing.assert_allclose(kinetic_energy, [kT / 2] * 3, rtol=kT_tol)
# velocity distribution
v_hist, v_bins = np.histogram(
res_u, bins=11, range=(-0.075, 0.075), density=True
)
# Calculate expected values from single
v_expected = []
for j in range(len(v_hist)):
# Maxwell distribution
res = (
1.0
/ (v_bins[j + 1] - v_bins[j])
* single_component_maxwell(v_bins[j], v_bins[j + 1], kT, rho_0)
)
v_expected.append(res)
v_expected = np.array(v_expected)
hist_tol_all = 0.75 *(16/domain_size)**(3/2)
np.testing.assert_allclose(v_hist, v_expected, rtol=hist_tol_all)
hist_tol_center = hist_tol_all/10
remove = 3
np.testing.assert_allclose(
v_hist[remove:-remove], v_expected[remove:-remove], rtol=hist_tol_center
)
# pressure tensor against expressions from
# Duenweg, Schiller, Ladd, https://arxiv.org/abs/0707.1581
res_pressure = dh.gather_array("pressure").reshape((-1, 3, 3))
c_s = np.sqrt(1 / 3) # speed of sound
# average of pressure tensor
# Diagonal elements are rho c_s^22 +<u,u>. When the fluid is
# thermalized, the expectation value of <u,u> = kT due to the
# equi-partition theorem.
p_av_expected = np.diag([rho_0 * c_s**2 + kT] * 3)
pressure_atol = c_s**2 / 200 *(16/domain_size)**(3/2)
np.testing.assert_allclose(
np.mean(res_pressure, axis=0), p_av_expected, atol=pressure_atol)
@pytest.mark.parametrize(
"zero_centered", [False, True], ids=["regular-storage", "zero-centered"]
)
@pytest.mark.parametrize(
"domain_size", [8, 60]
)
def test_point_force(zero_centered: bool, domain_size: int, target=Target.CPU):
"""Test momentum balance for thermalized fluid with applied poitn forces"""
rho_0 = 0.86
kT = 4e-4
L = [domain_size] * 3
dh, time_loop = get_fluctuating_lb(
size=L[0],
target=target,
rho_0=rho_0,
kT=kT,
omega_shear=0.8,
omega_bulk=0.5,
omega_even=0.8,
omega_odd=0.8,
zero_centered=zero_centered
)
# Test
t = 0
# warm up
t = time_loop(t, 100)
introduced_momentum = np.zeros(3)
for i in range(100):
point_force = 1e-2/domain_size**(3/2) * (np.random.random(3) - 0.5)
introduced_momentum += point_force
# Note that ghost layers are included in the indexing
force_pos = np.random.randint(1, L[0] - 2, size=3)
dh.cpu_arrays["force"][force_pos[0], force_pos[1], force_pos[2]] = point_force
t = time_loop(t, 1)
res_u = dh.gather_array("u").reshape((-1, 3))
res_rho = dh.gather_array("rho").reshape((-1,))
# mass conservation
np.testing.assert_allclose(np.mean(res_rho), rho_0, atol=3E-12)
# momentum conservation
momentum = np.dot(res_rho, res_u)
np.testing.assert_allclose(
momentum, introduced_momentum + 0.5 * point_force, atol=1e-10
)
dh.cpu_arrays["force"][force_pos[0], force_pos[1], force_pos[2]] = np.zeros(3)
@pytest.mark.skipif(
not INSTRUCTION_SETS, reason="No vector instruction sets supported"
)
@pytest.mark.parametrize("data_type", ("float32", "float64"))
@pytest.mark.parametrize("assume_aligned", (True, False))
@pytest.mark.parametrize("assume_inner_stride_one", (True, False))
@pytest.mark.parametrize("assume_sufficient_line_padding", (True, False))
@pytest.mark.xfail(IS_PYSTENCILS_2, reason="Vectorization for RNGs not implemented yet")
def test_vectorization(
data_type, assume_aligned, assume_inner_stride_one, assume_sufficient_line_padding
):
stencil = LBStencil(Stencil.D3Q19)
pdfs, pdfs_tmp = ps.fields(
f"pdfs({stencil.Q}), pdfs_tmp({stencil.Q}): {data_type}[3D]", layout="fzyx"
)
method = create_mrt_orthogonal(
stencil=stencil, compressible=True, weighted=True, relaxation_rates=rr_getter
)
if IS_PYSTENCILS_2:
seed = ps.TypedSymbol("seed", np.uint32)
rng = ps.random.Philox("phil", data_type, seed)
fluct_options = {
"temperature": sp.Symbol("kT"),
"rng": rng,
}
else:
rng_node = (
ps.rng.PhiloxTwoDoubles if data_type == "float64" else ps.rng.PhiloxFourFloats
)
fluct_options = {
"temperature": sp.Symbol("kT"),
"rng_node": rng_node,
"block_offsets": tuple([0] * stencil.D),
}
lbm_config = LBMConfig(
lb_method=method,
fluctuating=fluct_options,
compressible=True,
zero_centered=False,
stencil=method.stencil,
kernel_type="collide_only",
)
lbm_opt = LBMOptimisation(
cse_global=True, symbolic_field=pdfs, symbolic_temporary_field=pdfs_tmp
)
collision = create_lb_update_rule(lbm_config=lbm_config, lbm_optimisation=lbm_opt)
if not IS_PYSTENCILS_2:
instruction_sets = _skip_instruction_sets_windows(INSTRUCTION_SETS)
else:
instruction_sets = INSTRUCTION_SETS
instruction_set = instruction_sets[-1]
if IS_PYSTENCILS_2:
config = ps.CreateKernelConfig()
config.target = instruction_set
config.default_dtype = data_type
config.cpu.vectorize.enable = True
config.cpu.vectorize.assume_aligned = assume_aligned
config.cpu.vectorize.assume_inner_stride_one = assume_inner_stride_one
else:
config = ps.CreateKernelConfig(
target=Target.CPU,
data_type=data_type,
default_number_float=data_type,
cpu_vectorize_info={
"instruction_set": instruction_set,
"assume_aligned": assume_aligned,
"assume_inner_stride_one": assume_inner_stride_one,
"assume_sufficient_line_padding": assume_sufficient_line_padding,
},
)
if not assume_inner_stride_one and "storeS" not in get_vector_instruction_set(
data_type, instruction_set
):
with pytest.warns(UserWarning) as pytest_warnings:
ast = ps.create_kernel(collision, config=config)
assert "Could not vectorize loop" in pytest_warnings[0].message.args[0]
else:
ast = ps.create_kernel(collision, config=config)
ast.compile()
code = get_code_str(ast)
print(code)
@pytest.mark.parametrize("data_type", ("float32", "float64"))
@pytest.mark.parametrize("assume_aligned", (True, False))
@pytest.mark.parametrize("assume_inner_stride_one", (True, False))
@pytest.mark.parametrize("assume_sufficient_line_padding", (True, False))
@pytest.mark.skipif(IS_PYSTENCILS_2, reason="waLBerla block offsets feature unavailable")
def test_fluctuating_lb_issue_188_wlb(
data_type, assume_aligned, assume_inner_stride_one, assume_sufficient_line_padding
):
stencil = LBStencil(Stencil.D3Q19)
temperature = sp.symbols("temperature")
pdfs, pdfs_tmp = ps.fields(
f"pdfs({stencil.Q}), pdfs_tmp({stencil.Q}): {data_type}[3D]", layout="fzyx"
)
rng_node = (
ps.rng.PhiloxTwoDoubles if data_type == "float64" else ps.rng.PhiloxFourFloats
)
fluctuating = {
"temperature": temperature,
"block_offsets": "walberla",
"rng_node": rng_node,
}
lbm_config = LBMConfig(
stencil=stencil,
method=Method.MRT,
compressible=True,
weighted=True,
zero_centered=False,
relaxation_rate=1.4,
fluctuating=fluctuating,
)
lbm_opt = LBMOptimisation(
symbolic_field=pdfs, symbolic_temporary_field=pdfs_tmp, cse_global=True
)
up = create_lb_update_rule(lbm_config=lbm_config, lbm_optimisation=lbm_opt)
cpu_vectorize_info = {
"instruction_set": "avx",
"assume_inner_stride_one": True,
"assume_aligned": True,
}
config = ps.CreateKernelConfig(
target=ps.Target.CPU,
data_type=data_type,
default_number_float=data_type,
cpu_vectorize_info=cpu_vectorize_info,
)
ast = create_kernel(up, config=config)
code = ps.get_code_str(ast)
# print(code)
if data_type == "float32":
assert "0.5f" in code
assert "_mm256_mul_ps" in code
assert "_mm256_sqrt_ps" in code
else:
assert "0.5f" not in code
assert "_mm256_mul_pd" in code
assert "_mm256_sqrt_pd" in code
import pytest
import numpy as np
from numpy.testing import assert_allclose
import sympy as sp
import pystencils as ps
from pystencils import Target
from lbmpy.creationfunctions import create_lb_method, create_lb_update_rule, LBMConfig, LBMOptimisation
from lbmpy.enums import Stencil, Method, ForceModel
from lbmpy.macroscopic_value_kernels import macroscopic_values_setter, macroscopic_values_getter
from lbmpy.moments import (is_bulk_moment, moments_up_to_component_order,
exponents_to_polynomial_representations, exponent_tuple_sort_key)
from lbmpy.stencils import LBStencil
# all force models available are defined in the ForceModel enum, but Cumulant is not a "real" force model
force_models = [f for f in ForceModel if f is not ForceModel.CENTRALMOMENT]
@pytest.mark.parametrize("method_enum", [Method.SRT, Method.TRT, Method.MRT, Method.CUMULANT])
@pytest.mark.parametrize("zero_centered", [False, True])
@pytest.mark.parametrize("force_model", force_models)
@pytest.mark.parametrize("omega", [0.5, 1.5])
def test_total_momentum(method_enum, zero_centered, force_model, omega):
if method_enum == Method.CUMULANT and \
force_model not in (ForceModel.SIMPLE, ForceModel.LUO, ForceModel.GUO, ForceModel.HE):
return
L = (16, 16)
stencil = LBStencil(Stencil.D2Q9)
F = (2e-4, -3e-4)
dh = ps.create_data_handling(L, periodicity=True, default_target=Target.CPU)
src = dh.add_array('src', values_per_cell=stencil.Q)
dst = dh.add_array_like('dst', 'src')
ρ = dh.add_array('rho', values_per_cell=1)
u = dh.add_array('u', values_per_cell=stencil.D)
lbm_config = LBMConfig(method=method_enum, stencil=stencil, relaxation_rate=omega,
compressible=True, zero_centered=zero_centered,
force_model=force_model, force=F, streaming_pattern='pull')
lbm_opt = LBMOptimisation(symbolic_field=src)
collision = create_lb_update_rule(lbm_config=lbm_config, lbm_optimisation=lbm_opt)
config = ps.CreateKernelConfig(cpu_openmp=False, target=dh.default_target)
collision_kernel = ps.create_kernel(collision, config=config).compile()
fluid_density = 1.1
def init():
dh.fill(ρ.name, fluid_density)
dh.fill(u.name, 0)
setter = macroscopic_values_setter(collision.method, velocity=(0,) * dh.dim,
pdfs=src, density=ρ.center,
set_pre_collision_pdfs=True)
kernel = ps.create_kernel(setter).compile()
dh.run_kernel(kernel)
sync_pdfs = dh.synchronization_function([src.name])
getter = macroscopic_values_getter(collision.method, ρ.center, u.center_vector, src, use_pre_collision_pdfs=True)
getter_kernel = ps.create_kernel(getter).compile()
def time_loop(steps):
dh.all_to_gpu()
for _ in range(steps):
dh.run_kernel(collision_kernel)
dh.swap(src.name, dst.name)
sync_pdfs()
dh.all_to_cpu()
t = 20
init()
time_loop(t)
dh.run_kernel(getter_kernel)
# Check that density did not change
assert_allclose(dh.gather_array(ρ.name), fluid_density)
# Check for correct velocity
total = np.sum(dh.gather_array(u.name), axis=(0, 1))
assert_allclose(total / np.prod(L) / F * fluid_density / t, 1)
@pytest.mark.parametrize("force_model", force_models)
@pytest.mark.parametrize("compressible", [True, False])
def test_modes(force_model, compressible):
"""check force terms in mode space"""
_check_modes(LBStencil(Stencil.D2Q9), force_model, compressible)
@pytest.mark.parametrize("stencil", [Stencil.D3Q15, Stencil.D3Q19, Stencil.D3Q27])
@pytest.mark.parametrize("force_model", force_models)
@pytest.mark.parametrize("compressible", [True, False])
@pytest.mark.longrun
def test_modes_longrun(stencil, force_model, compressible):
"""check force terms in mode space"""
_check_modes(LBStencil(stencil), force_model, compressible)
@pytest.mark.parametrize("force_model", force_models)
def test_momentum_density_shift(force_model):
target = Target.CPU
stencil = LBStencil(Stencil.D2Q9)
domain_size = (4, 4)
dh = ps.create_data_handling(domain_size=domain_size, default_target=target)
rho = dh.add_array('rho', values_per_cell=1)
dh.fill('rho', 0.0, ghost_layers=True)
momentum_density = dh.add_array('momentum_density', values_per_cell=dh.dim)
dh.fill('momentum_density', 0.0, ghost_layers=True)
src = dh.add_array('src', values_per_cell=len(stencil))
dh.fill('src', 0.0, ghost_layers=True)
lbm_config = LBMConfig(method=Method.SRT, compressible=True, force_model=force_model,
force=(1, 2))
method = create_lb_method(lbm_config=lbm_config)
cqc = method.conserved_quantity_computation
momentum_density_getter = cqc.output_equations_from_pdfs(src.center_vector,
{'density': rho.center,
'momentum_density': momentum_density.center_vector})
config = ps.CreateKernelConfig(target=dh.default_target)
momentum_density_ast = ps.create_kernel(momentum_density_getter, config=config)
momentum_density_kernel = momentum_density_ast.compile()
dh.run_kernel(momentum_density_kernel)
assert np.sum(dh.gather_array(momentum_density.name)[:, :, 0]) == np.prod(domain_size) / 2
assert np.sum(dh.gather_array(momentum_density.name)[:, :, 1]) == np.prod(domain_size)
@pytest.mark.parametrize('force_model', force_models)
def test_forcing_space_equivalences(force_model):
if force_model == ForceModel.HE:
# We don't expect equivalence for the He model since its
# moments are derived from the continuous maxwellian
return
stencil = LBStencil(Stencil.D3Q27)
force = sp.symbols(f"F_:{stencil.D}")
lbm_config = LBMConfig(stencil=stencil, method=Method.SRT, force=force, force_model=force_model)
fmodel = lbm_config.force_model
lb_method = create_lb_method(lbm_config=lbm_config)
inv_moment_matrix = lb_method.moment_matrix.inv()
force_pdfs = sp.Matrix(fmodel(lb_method))
force_moments = fmodel.moment_space_forcing(lb_method)
diff = (force_pdfs - (inv_moment_matrix * force_moments)).expand()
for i, d in enumerate(diff):
assert d == 0, f"Mismatch between population and moment space forcing " \
f"in force model {force_model}, population f_{i}"
@pytest.mark.parametrize("force_model", [ForceModel.GUO, ForceModel.BUICK, ForceModel.SHANCHEN])
@pytest.mark.parametrize("stencil", [Stencil.D2Q9, Stencil.D3Q19, Stencil.D3Q27])
@pytest.mark.parametrize("method", [Method.SRT, Method.TRT, Method.MRT])
def test_literature(force_model, stencil, method):
# Be aware that the choice of the conserved moments does not affect the forcing although omega is introduced
# in the forcing vector then. The reason is that:
# m_{100}^{\ast} = m_{100} + \omega ( m_{100}^{eq} - m_{100} ) + \left( 1 - \frac{\omega}{2} \right) F_x
# always simplifies to:
# m_{100}^{\ast} = m_{100} + F_x
# Thus the relaxation rate gets cancled again.
stencil = LBStencil(stencil)
omega_s = sp.Symbol("omega_s")
omega_b = sp.Symbol("omega_b")
omega_o = sp.Symbol("omega_o")
omega_e = sp.Symbol("omega_e")
if method == Method.SRT:
rrs = [omega_s]
omega_o = omega_b = omega_e = omega_s
elif method == Method.TRT:
rrs = [omega_e, omega_o]
omega_s = omega_b = omega_e
else:
rrs = [omega_s, omega_b, omega_o, omega_e, omega_o, omega_e]
F = sp.symbols(f"F_:{stencil.D}")
lbm_config = LBMConfig(method=method, weighted=True, stencil=stencil, relaxation_rates=rrs,
compressible=force_model != ForceModel.BUICK,
force_model=force_model, force=F)
lb_method = create_lb_method(lbm_config=lbm_config)
omega_momentum = list(set(lb_method.relaxation_rates[1:stencil.D + 1]))
assert len(omega_momentum) == 1
omega_momentum = omega_momentum[0]
subs_dict = lb_method.subs_dict_relaxation_rate
force_term = sp.simplify(lb_method.force_model(lb_method).subs(subs_dict))
u = sp.Matrix(lb_method.first_order_equilibrium_moment_symbols)
rho = lb_method.conserved_quantity_computation.density_symbol
# see silva2020 for nomenclature
F = sp.Matrix(F)
uf = sp.Matrix(u).dot(F)
F2 = sp.Matrix(F).dot(sp.Matrix(F))
Fq = sp.zeros(stencil.Q, 1)
uq = sp.zeros(stencil.Q, 1)
for i, cq in enumerate(stencil):
Fq[i] = sp.Matrix(cq).dot(sp.Matrix(F))
uq[i] = sp.Matrix(cq).dot(u)
common_plus = 3 * (1 - omega_e / 2)
common_minus = 3 * (1 - omega_momentum / 2)
result = []
if method == Method.MRT and force_model == ForceModel.GUO:
# check against eq. 4.68 from schiller2008thermal
uf = u.dot(F) * sp.eye(len(F))
G = (u * F.transpose() + F * u.transpose() - uf * sp.Rational(2, lb_method.dim)) * sp.Rational(1, 2) * \
(2 - omega_s) + uf * sp.Rational(1, lb_method.dim) * (2 - omega_b)
for direction, w_i in zip(lb_method.stencil, lb_method.weights):
direction = sp.Matrix(direction)
tr = sp.trace(G * (direction * direction.transpose() - sp.Rational(1, 3) * sp.eye(len(F))))
result.append(3 * w_i * (F.dot(direction) + sp.Rational(3, 2) * tr))
elif force_model == ForceModel.GUO:
# check against table 2 in silva2020 (correct for SRT and TRT), matches eq. 20 from guo2002discrete (for SRT)
Sq_plus = sp.zeros(stencil.Q, 1)
Sq_minus = sp.zeros(stencil.Q, 1)
for i, w_i in enumerate(lb_method.weights):
Sq_plus[i] = common_plus * w_i * (3 * uq[i] * Fq[i] - uf)
Sq_minus[i] = common_minus * w_i * Fq[i]
result = Sq_plus + Sq_minus
elif force_model == ForceModel.BUICK:
# check against table 2 in silva2020 (correct for all collision models due to the simplicity of Buick),
# matches eq. 18 from silva2010 (for SRT)
Sq_plus = sp.zeros(stencil.Q, 1)
Sq_minus = sp.zeros(stencil.Q, 1)
for i, w_i in enumerate(lb_method.weights):
Sq_plus[i] = 0
Sq_minus[i] = common_minus * w_i * Fq[i]
result = Sq_plus + Sq_minus
elif force_model == ForceModel.EDM:
# check against table 2 in silva2020
if method == Method.MRT:
# for mrt no literature terms are known at the time of writing this test case.
# However it is most likly correct since SRT and TRT are derived from the moment space representation
pytest.skip()
Sq_plus = sp.zeros(stencil.Q, 1)
Sq_minus = sp.zeros(stencil.Q, 1)
for i, w_i in enumerate(lb_method.weights):
Sq_plus[i] = common_plus * w_i * (3 * uq[i] * Fq[i] - uf) + ((w_i / (8 * rho)) * (3 * Fq[i] ** 2 - F2))
Sq_minus[i] = common_minus * w_i * Fq[i]
result = Sq_plus + Sq_minus
elif force_model == ForceModel.SHANCHEN:
# check against table 2 in silva2020
if method == Method.MRT:
# for mrt no literature terms are known at the time of writing this test case.
# However it is most likly correct since SRT and TRT are derived from the moment space representation
pytest.skip()
Sq_plus = sp.zeros(stencil.Q, 1)
Sq_minus = sp.zeros(stencil.Q, 1)
for i, w_i in enumerate(lb_method.weights):
Sq_plus[i] = common_plus * w_i * (3 * uq[i] * Fq[i] - uf) + common_plus ** 2 * (
(w_i / (2 * rho)) * (3 * Fq[i] ** 2 - F2))
Sq_minus[i] = common_minus * w_i * Fq[i]
result = Sq_plus + Sq_minus
assert list(sp.simplify(force_term - sp.Matrix(result))) == [0] * len(stencil)
@pytest.mark.parametrize("force_model", force_models)
@pytest.mark.parametrize("compressible", [True, False])
def test_modes_central_moment(force_model, compressible):
"""check force terms in mode space"""
stencil = LBStencil(Stencil.D2Q9)
omega_s = sp.Symbol("omega_s")
F = list(sp.symbols(f"F_:{stencil.D}"))
lbm_config = LBMConfig(method=Method.CENTRAL_MOMENT, stencil=stencil, relaxation_rate=omega_s,
compressible=compressible, force_model=force_model, force=tuple(F))
method = create_lb_method(lbm_config=lbm_config)
subs_dict = method.subs_dict_relaxation_rate
force_moments = method.force_model.central_moment_space_forcing(method)
force_moments = force_moments.subs(subs_dict)
# The mass mode should be zero
assert force_moments[0] == 0
# The momentum moments should contain the force
assert list(force_moments[1:stencil.D + 1]) == F
@pytest.mark.parametrize("force_model", force_models)
@pytest.mark.parametrize("compressible", [True, False])
def test_symmetric_forcing_equivalence(force_model, compressible):
stencil = LBStencil(Stencil.D2Q9)
omega_s = sp.Symbol("omega_s")
F = list(sp.symbols(f"F_:{stencil.D}"))
moments = moments_up_to_component_order(2, dim=2)
moments = sorted(moments, key=exponent_tuple_sort_key)
moment_polys = exponents_to_polynomial_representations(moments)
lbm_config = LBMConfig(method=Method.CENTRAL_MOMENT, stencil=stencil, relaxation_rate=omega_s,
nested_moments=moment_polys, compressible=True, force_model=force_model, force=tuple(F))
method = create_lb_method(lbm_config=lbm_config)
if not method.force_model.has_symmetric_central_moment_forcing:
return
subs_dict = method.subs_dict_relaxation_rate
force_moments = method.force_model.central_moment_space_forcing(method)
force_moments = force_moments.subs(subs_dict)
force_before, force_after = method.force_model.symmetric_central_moment_forcing(method, moments)
d = method.relaxation_matrix
eye = sp.eye(stencil.Q)
force_combined = (eye - d) @ force_before + force_after
assert (force_moments - force_combined).expand() == sp.Matrix([0] * stencil.Q)
@pytest.mark.parametrize("stencil", [Stencil.D3Q15, Stencil.D3Q19, Stencil.D3Q27])
@pytest.mark.parametrize("force_model", force_models)
@pytest.mark.parametrize("compressible", [True, False])
@pytest.mark.longrun
def test_modes_central_moment_longrun(stencil, force_model, compressible):
"""check force terms in mode space"""
stencil = LBStencil(stencil)
omega_s = sp.Symbol("omega_s")
F = list(sp.symbols(f"F_:{stencil.D}"))
lbm_config = LBMConfig(method=Method.CENTRAL_MOMENT, stencil=stencil, relaxation_rate=omega_s,
compressible=compressible, force_model=force_model, force=tuple(F))
method = create_lb_method(lbm_config=lbm_config)
subs_dict = method.subs_dict_relaxation_rate
force_moments = method.force_model.moment_space_forcing(method)
force_moments = force_moments.subs(subs_dict)
# The mass mode should be zero
assert force_moments[0] == 0
# The momentum moments should contain the force
assert list(force_moments[1:stencil.D + 1]) == F
def _check_modes(stencil, force_model, compressible):
omega_s = sp.Symbol("omega_s")
omega_b = sp.Symbol("omega_b")
omega_o = sp.Symbol("omega_o")
omega_e = sp.Symbol("omega_e")
F = list(sp.symbols(f"F_:{stencil.D}"))
lbm_config = LBMConfig(method=Method.MRT, stencil=stencil,
relaxation_rates=[omega_s, omega_b, omega_o, omega_e, omega_o, omega_e],
compressible=compressible, force_model=force_model, force=tuple(F))
method = create_lb_method(lbm_config=lbm_config)
subs_dict = method.subs_dict_relaxation_rate
force_moments = method.force_model.moment_space_forcing(method)
force_moments = force_moments.subs(subs_dict)
# The mass mode should be zero
assert force_moments[0] == 0
# The momentum moments should contain the force
assert list(force_moments[1:stencil.D + 1]) == F
if force_model == ForceModel.GUO:
num_stresses = (stencil.D * stencil.D - stencil.D) // 2 + stencil.D
lambda_s, lambda_b = -omega_s, -omega_b
# The stress moments should match eq. 47 from https://doi.org/10.1023/A:1010414013942
u = method.first_order_equilibrium_moment_symbols
def traceless(m):
tr = sp.simplify(sum([m[i, i] for i in range(stencil.D)]))
return m - tr / m.shape[0] * sp.eye(m.shape[0])
C = sp.Rational(1, 2) * (2 + lambda_s) * (traceless(sp.Matrix(u) * sp.Matrix(F).transpose()) +
traceless(sp.Matrix(F) * sp.Matrix(u).transpose())) + \
sp.Rational(1, method.dim) * (2 + lambda_b) * sp.Matrix(u).dot(F) * sp.eye(method.dim)
subs = {sp.Symbol(chr(ord("x") + i)) * sp.Symbol(chr(ord("x") + j)): C[i, j]
for i in range(stencil.D) for j in range(stencil.D)}
for force_moment, moment in zip(force_moments[stencil.D + 1:stencil.D + 1 + num_stresses],
method.moments[stencil.D + 1:stencil.D + 1 + num_stresses]):
ref = moment.subs(subs)
diff = sp.simplify(ref - force_moment)
if is_bulk_moment(moment, stencil.D):
assert diff == 0 or isinstance(diff, sp.Rational) # difference should be zero or a constant
else:
assert diff == 0 # difference should be zero
ff = method.moment_matrix.inv() * sp.Matrix(method.force_model.moment_space_forcing(method).subs(subs_dict))
# Check eq. 4.53a from schiller2008thermal
assert sp.simplify(sum(ff)) == 0
# Check eq. 4.53b from schiller2008thermal
assert [sp.simplify(sum(ff[i] * stencil[i][j] for i in range(len(stencil)))) for j in range(stencil.D)] == F
# Check eq. 4.61a from schiller2008thermal
ref = (2 + lambda_s) / 2 * (traceless(sp.Matrix(u) * sp.Matrix(F).transpose()) +
traceless(sp.Matrix(F) * sp.Matrix(u).transpose()))
s = sp.zeros(stencil.D)
for i in range(0, len(stencil)):
s += ff[i] * traceless(sp.Matrix(stencil[i]) * sp.Matrix(stencil[i]).transpose())
assert sp.simplify(s - ref) == sp.zeros(stencil.D)
# Check eq. 4.61b from schiller2008thermal
assert sp.simplify(sum(ff[i] * stencil[i][a] ** 2 for i in range(len(stencil)) for a in range(stencil.D))
- (2 + lambda_b) * sp.Matrix(u).dot(F)) == 0
# All other moments should be zero
assert list(force_moments[stencil.D + 1 + num_stresses:]) == [0] * \
(len(stencil) - (stencil.D + 1 + num_stresses))
elif force_model == ForceModel.SIMPLE:
# All other moments should be zero
assert list(force_moments[stencil.D + 1:]) == [0] * (len(stencil) - (stencil.D + 1))