Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • Sparse
  • WallLaw
  • improved_comm
  • master
  • release/0.2.1
  • release/0.2.10
  • release/0.2.11
  • release/0.2.12
  • release/0.2.13
  • release/0.2.14
  • release/0.2.15
  • release/0.2.2
  • release/0.2.3
  • release/0.2.4
  • release/0.2.5
  • release/0.2.6
  • release/0.2.7
  • release/0.2.8
  • release/0.2.9
  • release/0.3.0
  • release/0.3.1
  • release/0.3.2
  • release/0.3.3
  • release/0.3.4
  • release/0.4.0
  • release/0.4.1
  • release/0.4.2
  • release/0.4.3
  • release/0.4.4
  • release/1.0
  • release/1.0.1
  • release/1.1
  • release/1.1.1
  • release/1.2
  • release/1.3
  • release/1.3.1
  • release/1.3.2
  • release/1.3.3
  • release/1.3.4
  • release/1.3.5
  • release/1.3.6
  • release/1.3.7
42 results

Target

Select target project
  • ravi.k.ayyala/lbmpy
  • brendan-waters/lbmpy
  • anirudh.jonnalagadda/lbmpy
  • jbadwaik/lbmpy
  • alexander.reinauer/lbmpy
  • itischler/lbmpy
  • he66coqe/lbmpy
  • ev81oxyl/lbmpy
  • Bindgen/lbmpy
  • da15siwa/lbmpy
  • holzer/lbmpy
  • RudolfWeeber/lbmpy
  • pycodegen/lbmpy
13 results
Select Git revision
  • GetterSetterAPI
  • HRR
  • HydroPressure
  • InplaceConfig
  • Outflow
  • PhaseField
  • Sparse
  • UBBVelocity
  • UpdateAPISparse
  • WallLaw
  • WetNodeBoundaries
  • csebug
  • feature/sparse
  • feature/try
  • improved_comm
  • install_requires
  • master
  • phaseField
  • relaxationrates
  • test_martin
  • release/0.2.1
  • release/0.2.10
  • release/0.2.11
  • release/0.2.12
  • release/0.2.13
  • release/0.2.14
  • release/0.2.15
  • release/0.2.2
  • release/0.2.3
  • release/0.2.4
  • release/0.2.5
  • release/0.2.6
  • release/0.2.7
  • release/0.2.8
  • release/0.2.9
  • release/0.3.0
  • release/0.3.1
  • release/0.3.2
  • release/0.3.3
  • release/0.3.4
  • release/0.4.0
  • release/0.4.1
  • release/0.4.2
  • release/0.4.3
  • release/0.4.4
  • release/1.0
  • release/1.0.1
  • release/1.1
  • release/1.1.1
  • release/1.2
  • release/1.3
  • release/1.3.1
  • release/1.3.2
  • release/1.3.3
  • release/1.3.4
  • release/1.3.5
  • release/1.3.6
57 results
Show changes
Showing
with 1615 additions and 42 deletions
import numpy as np
from lbmpy.boundaries import UBB, NoSlip
from lbmpy.enums import ForceModel
from lbmpy.scenarios import create_channel
from pystencils import make_slice
try:
import waLBerla as wLB
except ImportError:
wLB = None
# try:
# import waLBerla as wLB
# except ImportError:
wLB = None
def calculate_force(step, obstacle):
......@@ -26,10 +27,11 @@ def test_force_on_boundary():
for parallel in (False, True) if wLB else (False,):
for boundary_obj in boundaries:
print("Testing parallel %d, boundary %s" % (parallel, boundary_obj.name))
step = create_channel(domain_size, force=1e-5, relaxation_rate=1.5, parallel=parallel, force_model='buick')
print(f"Testing parallel {parallel}, boundary {boundary_obj.name}")
step = create_channel(domain_size, force=1e-5, relaxation_rate=1.5, parallel=parallel,
force_model=ForceModel.BUICK)
force = calculate_force(step, boundary_obj)
print(" -> force = ", force)
print(f" -> force = {force}")
results.append(force)
for res in results[1:]:
......
from dataclasses import replace
import pystencils as ps
import lbmpy as lp
from pystencils.slicing import slice_from_direction
from lbmpy.boundaries.boundaryhandling import LatticeBoltzmannBoundaryHandling
from lbmpy.boundaries.boundaryconditions import FreeSlip, NoSlip, ExtrapolationOutflow, UBB
from lbmpy.creationfunctions import create_lb_method, create_lb_update_rule
from lbmpy.macroscopic_value_kernels import pdf_initialization_assignments
import numpy as np
def velocity_info_callback(boundary_data, **_):
boundary_data['vel_1'] = 0
boundary_data['vel_2'] = 0
u_max = 0.1
x, y = boundary_data.link_positions(0), boundary_data.link_positions(1)
dist = (15 - y) / 15
boundary_data['vel_0'] = u_max * (1 - dist)
def test_free_slip():
stencil = lp.LBStencil(lp.Stencil.D3Q27)
domain_size = (30, 15, 30)
dim = len(domain_size)
dh = ps.create_data_handling(domain_size=domain_size)
src = dh.add_array('src', values_per_cell=stencil.Q)
dh.fill('src', 0.0, ghost_layers=True)
dst = dh.add_array('dst', values_per_cell=stencil.Q)
dh.fill('dst', 0.0, ghost_layers=True)
velField = dh.add_array('velField', values_per_cell=stencil.D)
dh.fill('velField', 0.0, ghost_layers=True)
lbm_config = lp.LBMConfig(stencil=stencil, method=lp.Method.SRT, relaxation_rate=1.8,
output={'velocity': velField}, kernel_type='stream_pull_collide')
method = create_lb_method(lbm_config=lbm_config)
init = pdf_initialization_assignments(method, 1.0, (0, 0, 0), src.center_vector)
config = ps.CreateKernelConfig(target=dh.default_target, cpu_openmp=False)
ast_init = ps.create_kernel(init, config=config)
kernel_init = ast_init.compile()
dh.run_kernel(kernel_init)
lbm_opt = lp.LBMOptimisation(symbolic_field=src, symbolic_temporary_field=dst)
lbm_config = replace(lbm_config, lb_method=method)
update = create_lb_update_rule(lbm_config=lbm_config, lbm_optimisation=lbm_opt)
ast_kernel = ps.create_kernel(update, config=config)
kernel = ast_kernel.compile()
bh = LatticeBoltzmannBoundaryHandling(method, dh, 'src', name="bh")
inflow = UBB(velocity_info_callback, dim=dim)
outflow = ExtrapolationOutflow(stencil[4], method)
wall = NoSlip("wall")
freeslip = FreeSlip(stencil, (0, -1, 0))
bh.set_boundary(inflow, slice_from_direction('W', dim))
bh.set_boundary(outflow, slice_from_direction('E', dim))
bh.set_boundary(wall, slice_from_direction('S', dim))
bh.set_boundary(wall, slice_from_direction('T', dim))
bh.set_boundary(wall, slice_from_direction('B', dim))
bh.set_boundary(freeslip, slice_from_direction('N', dim))
for i in range(2000):
bh()
dh.run_kernel(kernel)
dh.swap("src", "dst")
vel_profile = dh.gather_array('velField')[-2, :, domain_size[2] // 2, 0]
np.testing.assert_almost_equal(np.gradient(vel_profile)[-1], 0, decimal=3)
import os
import numpy as np
import pytest
from lbmpy.boundaries import NoSlip
from lbmpy.enums import Method
from lbmpy.geometry import add_black_and_white_image, add_pipe_walls
from lbmpy.lbstep import LatticeBoltzmannStep
import lbmpy.plot as plt
from pystencils.slicing import make_slice
......@@ -22,23 +25,20 @@ def test_pipe():
plot = False
for domain_size in [(30, 10, 10), (30, 10)]:
for diameter in [5, 10, diameter_callback]:
sc = LatticeBoltzmannStep(domain_size=domain_size, method='srt', relaxation_rate=1.9,
optimization={})
sc = LatticeBoltzmannStep(domain_size=domain_size, method=Method.SRT, relaxation_rate=1.9)
add_pipe_walls(sc.boundary_handling, diameter)
if plot:
import lbmpy.plot as plt
from pystencils.slicing import make_slice
if len(domain_size) == 2:
plt.boundary_handling(sc.boundary_handling)
plt.title("2D, diameter=%s" % (str(diameter,)))
plt.title(f"2D, diameter={str(diameter,)}")
plt.show()
elif len(domain_size) == 3:
plt.subplot(1, 2, 1)
plt.boundary_handling(sc.boundary_handling, make_slice[0.5, :, :])
plt.title("3D, diameter=%s" % (str(diameter,)))
plt.title(f"3D, diameter={str(diameter,)}")
plt.subplot(1, 2, 2)
plt.boundary_handling(sc.boundary_handling, make_slice[:, 0.5, :])
plt.title("3D, diameter=%s" % (str(diameter, )))
plt.title(f"3D, diameter={str(diameter, )}")
plt.show()
......@@ -49,20 +49,19 @@ def get_test_image_path():
def test_image():
sc = LatticeBoltzmannStep(domain_size=(50, 40), method='srt', relaxation_rate=1.9,
optimization={})
pytest.importorskip('scipy.ndimage')
sc = LatticeBoltzmannStep(domain_size=(50, 40), method=Method.SRT, relaxation_rate=1.9)
add_black_and_white_image(sc.boundary_handling, get_test_image_path(), keep_aspect_ratio=True)
def test_slice_mask_combination():
sc = LatticeBoltzmannStep(domain_size=(30, 30), method='srt', relaxation_rate=1.9,
optimization={})
sc = LatticeBoltzmannStep(domain_size=(30, 30), method=Method.SRT, relaxation_rate=1.9)
def callback(*coordinates):
x = coordinates[0]
print("x", coordinates[0][:, 0])
print("y", coordinates[1][0, :])
print(x.shape)
return np.ones_like(x, dtype=np.bool)
return np.ones_like(x, dtype=bool)
sc.boundary_handling.set_boundary(NoSlip(), make_slice[6:7, -1], callback)
from lbmpy.creationfunctions import create_lb_ast, LBMConfig
from lbmpy.enums import Method, Stencil
from lbmpy.stencils import LBStencil
from lbmpy._compat import IS_PYSTENCILS_2
import pytest
from pystencils import Target, CreateKernelConfig
@pytest.mark.xfail(IS_PYSTENCILS_2, reason="GPU block size limiting by registers is not available yet")
def test_gpu_block_size_limiting():
pytest.importorskip("cupy")
too_large = 2048*2048
lbm_config = LBMConfig(method=Method.CUMULANT, stencil=LBStencil(Stencil.D3Q19),
relaxation_rate=1.8, compressible=True)
config = CreateKernelConfig(target=Target.GPU,
gpu_indexing_params={'block_size': (too_large, too_large, too_large)})
ast = create_lb_ast(lbm_config=lbm_config, config=config)
limited_block_size = ast.indexing.call_parameters((1024, 1024, 1024))
kernel = ast.compile()
assert all(b < too_large for b in limited_block_size['block'])
bs = [too_large, too_large, too_large]
bs = ast.indexing.limit_block_size_by_register_restriction(bs, kernel.num_regs)
assert all(b < too_large for b in bs)
from lbmpy.creationfunctions import create_lb_method
import pytest
from lbmpy.creationfunctions import create_lb_method, LBMConfig
from lbmpy.enums import Stencil
from lbmpy.methods.creationfunctions import compare_moment_based_lb_methods
from lbmpy.moments import (
moment_equality_table, moment_equality_table_by_stencil, moments_up_to_component_order)
from lbmpy.stencils import get_stencil
from lbmpy.stencils import LBStencil
def test_moment_comparison_table():
new = create_lb_method(stencil='D3Q19', maxwellian_moments=True)
old = create_lb_method(stencil='D3Q19', maxwellian_moments=False)
pytest.importorskip('ipy_table')
lbm_config_new = LBMConfig(stencil=LBStencil(Stencil.D3Q19), continuous_equilibrium=True)
lbm_config_old = LBMConfig(stencil=LBStencil(Stencil.D3Q19), continuous_equilibrium=False)
new = create_lb_method(lbm_config=lbm_config_new)
old = create_lb_method(lbm_config=lbm_config_old)
assert old.zeroth_order_equilibrium_moment_symbol == new.zeroth_order_equilibrium_moment_symbol
......@@ -19,17 +27,18 @@ def test_moment_comparison_table():
res_all = compare_moment_based_lb_methods(old, new, show_deviations_only=False)
assert len(res_all.array) == 20
d3q27 = create_lb_method(stencil='D3Q27')
d3q27 = create_lb_method(LBMConfig(stencil=LBStencil(Stencil.D3Q27)))
compare_moment_based_lb_methods(d3q27, new, show_deviations_only=False)
compare_moment_based_lb_methods(new, d3q27, show_deviations_only=False)
def test_moment_equality_table():
d3q19 = get_stencil('D3Q19')
pytest.importorskip('ipy_table')
d3q19 = LBStencil(Stencil.D3Q19)
table1 = moment_equality_table(d3q19, max_order=3)
assert len(table1.array) == 5
table2 = moment_equality_table_by_stencil({'D3Q19': d3q19, 'D3Q27': get_stencil("D3Q27")},
table2 = moment_equality_table_by_stencil({'D3Q19': d3q19, 'D3Q27': LBStencil(Stencil.D3Q27)},
moments_up_to_component_order(2, dim=3))
assert len(table2.array) == 11
assert len(table2.array[0]) == 2 + 2
import pytest
import sympy as sp
from itertools import chain
from pystencils.sympyextensions import remove_higher_order_terms
from lbmpy.stencils import Stencil, LBStencil
from lbmpy.equilibrium import ContinuousHydrodynamicMaxwellian, DiscreteHydrodynamicMaxwellian
from lbmpy.moments import moments_up_to_component_order, moments_up_to_order
from lbmpy.maxwellian_equilibrium import get_equilibrium_values_of_maxwell_boltzmann_function
from lbmpy.methods.default_moment_sets import cascaded_moment_sets_literature, mrt_orthogonal_modes_literature
def test_compressible_raw_moment_values():
stencil = LBStencil("D3Q27")
equilibrium = ContinuousHydrodynamicMaxwellian(dim=stencil.D, compressible=True, deviation_only=False)
raw_moments = list(chain.from_iterable(mrt_orthogonal_modes_literature(stencil, False)))
values_a = equilibrium.moments(raw_moments)
values_b = get_equilibrium_values_of_maxwell_boltzmann_function(raw_moments, stencil.D, space="moment")
for m, a, b in zip(raw_moments, values_a, values_b):
assert (a - b).expand() == sp.Integer(0), f"Mismatch at moment {m}."
def test_compressible_central_moment_values():
stencil = LBStencil("D3Q27")
equilibrium = ContinuousHydrodynamicMaxwellian(dim=stencil.D, compressible=True, deviation_only=False)
central_moments = list(chain.from_iterable(cascaded_moment_sets_literature(stencil)))
values_a = equilibrium.central_moments(central_moments)
values_b = get_equilibrium_values_of_maxwell_boltzmann_function(central_moments, stencil.D, space="central moment")
for m, a, b in zip(central_moments, values_a, values_b):
assert (a - b).expand() == sp.Integer(0), f"Mismatch at moment {m}."
def test_compressible_cumulant_values():
stencil = LBStencil("D3Q27")
equilibrium = ContinuousHydrodynamicMaxwellian(dim=stencil.D, compressible=True, deviation_only=False)
cumulants = list(chain.from_iterable(cascaded_moment_sets_literature(stencil)))
values_a = equilibrium.cumulants(cumulants, rescale=False)
values_b = get_equilibrium_values_of_maxwell_boltzmann_function(cumulants, stencil.D, space="cumulant")
for m, a, b in zip(cumulants, values_a, values_b):
assert (a - b).expand() == sp.Integer(0), f"Mismatch at cumulant {m}."
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q15, Stencil.D3Q19, Stencil.D3Q27])
@pytest.mark.parametrize('compressible', [False, True])
@pytest.mark.parametrize('deviation_only', [False, True])
def test_continuous_discrete_moment_equivalence(stencil, compressible, deviation_only):
stencil = LBStencil(stencil)
c_s_sq = sp.Rational(1, 3)
moments = tuple(moments_up_to_order(3, dim=stencil.D, include_permutations=False))
cd = ContinuousHydrodynamicMaxwellian(dim=stencil.D, compressible=compressible, deviation_only=deviation_only,
order=2, c_s_sq=c_s_sq)
cm = sp.Matrix(cd.moments(moments))
dd = DiscreteHydrodynamicMaxwellian(stencil, compressible=compressible, deviation_only=deviation_only,
order=2, c_s_sq=c_s_sq)
dm = sp.Matrix(dd.moments(moments))
rho = cd.density
delta_rho = cd.density_deviation
rho_0 = cd.background_density
subs = { delta_rho : rho - rho_0 }
assert (cm - dm).subs(subs).expand() == sp.Matrix((0,) * len(moments))
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q15, Stencil.D3Q19, Stencil.D3Q27])
@pytest.mark.parametrize('compressible', [False, True])
@pytest.mark.parametrize('deviation_only', [False, True])
def test_continuous_discrete_central_moment_equivalence(stencil, compressible, deviation_only):
stencil = LBStencil(stencil)
c_s_sq = sp.Rational(1, 3)
moments = tuple(moments_up_to_order(3, dim=stencil.D, include_permutations=False))
cd = ContinuousHydrodynamicMaxwellian(dim=stencil.D, compressible=compressible, deviation_only=deviation_only,
order=2, c_s_sq=c_s_sq)
cm = sp.Matrix(cd.central_moments(moments))
dd = DiscreteHydrodynamicMaxwellian(stencil, compressible=compressible, deviation_only=deviation_only,
order=2, c_s_sq=c_s_sq)
dm = sp.Matrix(dd.central_moments(moments))
dm = sp.Matrix([remove_higher_order_terms(t, dd.velocity, order=2) for t in dm])
rho = cd.density
delta_rho = cd.density_deviation
rho_0 = cd.background_density
subs = { delta_rho : rho - rho_0 }
assert (cm - dm).subs(subs).expand() == sp.Matrix((0,) * len(moments))
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q15, Stencil.D3Q19, Stencil.D3Q27])
def test_continuous_discrete_cumulant_equivalence(stencil):
stencil = LBStencil(stencil)
c_s_sq = sp.Rational(1, 3)
compressible = True
deviation_only = False
moments = tuple(moments_up_to_order(3, dim=stencil.D, include_permutations=False))
cd = ContinuousHydrodynamicMaxwellian(dim=stencil.D, compressible=compressible, deviation_only=deviation_only,
order=2, c_s_sq=c_s_sq)
cm = sp.Matrix(cd.cumulants(moments))
dd = DiscreteHydrodynamicMaxwellian(stencil, compressible=compressible, deviation_only=deviation_only,
order=2, c_s_sq=c_s_sq)
dm = sp.Matrix(dd.cumulants(moments))
dm = sp.Matrix([remove_higher_order_terms(t, dd.velocity, order=2) for t in dm])
rho = cd.density
delta_rho = cd.density_deviation
rho_0 = cd.background_density
subs = { delta_rho : rho - rho_0 }
assert (cm - dm).subs(subs).expand() == sp.Matrix((0,) * len(moments))
import pytest
import numpy as np
from pystencils.slicing import slice_from_direction
from lbmpy import LBMConfig, LBStencil, Stencil, Method
from lbmpy.boundaries.boundaryconditions import NoSlip, NoSlipLinearBouzidi, QuadraticBounceBack, UBB
from lbmpy.lbstep import LatticeBoltzmannStep
def check_velocity(noslip_velocity, interpolated_velocity, wall_distance):
# First we check if the flow is fully developed
np.testing.assert_almost_equal(np.gradient(np.gradient(noslip_velocity)), 0, decimal=8)
np.testing.assert_almost_equal(np.gradient(np.gradient(interpolated_velocity)), 0, decimal=8)
# If the wall is closer to the first fluid cell we expect a lower velocity at the first fluid cell
if wall_distance < 0.5:
assert noslip_velocity[0] > interpolated_velocity[0]
# If the wall is further away from the first fluid cell we expect a higher velocity at the first fluid cell
if wall_distance > 0.5:
assert noslip_velocity[0] < interpolated_velocity[0]
# If the wall cuts the cell halfway the interpolation BC should approximately fall back to a noslip boundary
if wall_distance == 0.5:
np.testing.assert_almost_equal(noslip_velocity[0], interpolated_velocity[0], decimal=2)
def couette_flow(stencil, method_enum, zero_centered, wall_distance, compressible):
dim = stencil.D
if dim == 2:
domain_size = (50, 25)
wall_velocity = (0.01, 0)
periodicity = (True, False)
else:
domain_size = (50, 25, 25)
wall_velocity = (0.01, 0, 0)
periodicity = (True, False, True)
timesteps = 10000
omega = 1.1
lbm_config = LBMConfig(stencil=stencil,
method=method_enum,
relaxation_rate=omega,
zero_centered=zero_centered,
compressible=compressible,
weighted=True)
lb_step_noslip = LatticeBoltzmannStep(domain_size=domain_size, periodicity=periodicity,
lbm_config=lbm_config, compute_velocity_in_every_step=True)
lb_step_bouzidi = LatticeBoltzmannStep(domain_size=domain_size, periodicity=periodicity,
lbm_config=lbm_config, compute_velocity_in_every_step=True)
lb_step_quadratic_bb = LatticeBoltzmannStep(domain_size=domain_size, periodicity=periodicity,
lbm_config=lbm_config, compute_velocity_in_every_step=True)
def init_wall_distance(boundary_data, **_):
for cell in boundary_data.index_array:
cell['q'] = wall_distance
moving_wall = UBB(wall_velocity)
noslip = NoSlip("wall")
bouzidi = NoSlipLinearBouzidi("wall", init_wall_distance=init_wall_distance)
quadratic_bb = QuadraticBounceBack(omega, "wall", init_wall_distance=init_wall_distance)
lb_step_noslip.boundary_handling.set_boundary(noslip, slice_from_direction('S', dim))
lb_step_noslip.boundary_handling.set_boundary(moving_wall, slice_from_direction('N', dim))
lb_step_bouzidi.boundary_handling.set_boundary(bouzidi, slice_from_direction('S', dim))
lb_step_bouzidi.boundary_handling.set_boundary(moving_wall, slice_from_direction('N', dim))
lb_step_quadratic_bb.boundary_handling.set_boundary(quadratic_bb, slice_from_direction('S', dim))
lb_step_quadratic_bb.boundary_handling.set_boundary(moving_wall, slice_from_direction('N', dim))
lb_step_noslip.run(timesteps)
lb_step_bouzidi.run(timesteps)
lb_step_quadratic_bb.run(timesteps)
if dim == 2:
noslip_velocity = lb_step_noslip.velocity[domain_size[0] // 2, :, 0]
bouzidi_velocity = lb_step_bouzidi.velocity[domain_size[0] // 2, :, 0]
quadratic_bb_velocity = lb_step_quadratic_bb.velocity[domain_size[0] // 2, :, 0]
else:
noslip_velocity = lb_step_noslip.velocity[domain_size[0] // 2, :, domain_size[2] // 2, 0]
bouzidi_velocity = lb_step_bouzidi.velocity[domain_size[0] // 2, :, domain_size[2] // 2, 0]
quadratic_bb_velocity = lb_step_quadratic_bb.velocity[domain_size[0] // 2, :, domain_size[2] // 2, 0]
check_velocity(noslip_velocity, bouzidi_velocity, wall_distance)
check_velocity(noslip_velocity, quadratic_bb_velocity, wall_distance)
@pytest.mark.parametrize("zero_centered", [False, True])
@pytest.mark.parametrize("wall_distance", [0.1, 0.5, 0.9])
@pytest.mark.parametrize("compressible", [True, False])
def test_couette_flow_short(zero_centered, wall_distance, compressible):
stencil = LBStencil(Stencil.D2Q9)
couette_flow(stencil, Method.SRT, zero_centered, wall_distance, compressible)
@pytest.mark.parametrize("method_enum", [Method.MRT, Method.CENTRAL_MOMENT, Method.CUMULANT])
@pytest.mark.parametrize("zero_centered", [False, True])
@pytest.mark.parametrize("wall_distance", [0.1, 0.5, 0.9])
@pytest.mark.parametrize("compressible", [True, False])
@pytest.mark.longrun
def test_couette_flow_long(method_enum, zero_centered, wall_distance, compressible):
if method_enum is Method.CUMULANT and compressible is False:
pytest.skip("incompressible cumulant is not supported")
stencil = LBStencil(Stencil.D2Q9)
couette_flow(stencil, method_enum, zero_centered, wall_distance, compressible)
@pytest.mark.parametrize("method_enum", [Method.SRT, Method.MRT, Method.CENTRAL_MOMENT, Method.CUMULANT])
@pytest.mark.parametrize("wall_distance", [0.1, 0.5, 0.9])
@pytest.mark.parametrize("stencil", [Stencil.D3Q19, Stencil.D3Q27])
@pytest.mark.longrun
def test_couette_flow_d3d(method_enum, wall_distance, stencil):
stencil = LBStencil(stencil)
couette_flow(stencil, method_enum, True, wall_distance, True)
"""
Test the lbmpy-specific JSON encoder and serializer as used in the Database class.
"""
import tempfile
import pystencils as ps
from lbmpy import Stencil, Method, ForceModel, SubgridScaleModel
from lbmpy.advanced_streaming import Timestep
from lbmpy.creationfunctions import LBMConfig, LBMOptimisation, LBStencil
from lbmpy.fieldaccess import StreamPullTwoFieldsAccessor
from pystencils.runhelper import Database
from lbmpy.db import LbmpyJsonSerializer
def test_json_serializer():
stencil = LBStencil(Stencil.D3Q27)
q = stencil.Q
pdfs, pdfs_tmp = ps.fields(f"pdfs({q}), pdfs_tmp({q}): double[3D]", layout='fzyx')
density = ps.fields(f"rho: double[3D]", layout='fzyx')
from lbmpy.non_newtonian_models import CassonsParameters
cassons_params = CassonsParameters(0.2)
# create dummy lbmpy config
lbm_config = LBMConfig(stencil=LBStencil(Stencil.D3Q27), method=Method.CUMULANT, force_model=ForceModel.GUO,
compressible=True, relaxation_rate=1.999, subgrid_scale_model=SubgridScaleModel.SMAGORINSKY,
galilean_correction=True, cassons=cassons_params, density_input=density,
kernel_type=StreamPullTwoFieldsAccessor, timestep=Timestep.BOTH)
lbm_optimization = LBMOptimisation(cse_pdfs=False, cse_global=False, builtin_periodicity=(True, False, False),
symbolic_field=pdfs, symbolic_temporary_field=pdfs_tmp)
# create dummy database
temp_dir = tempfile.TemporaryDirectory()
db = Database(file=temp_dir.name, serializer_info=('lbmpy_serializer', LbmpyJsonSerializer))
db.save(params={'lbm_config': lbm_config, 'lbm_optimization': lbm_optimization}, result={'test': 'dummy'})
import numpy as np
import pytest
from types import MappingProxyType
from pystencils import Target, CreateKernelConfig
from lbmpy.scenarios import create_fully_periodic_flow, create_lid_driven_cavity
from lbmpy import SubgridScaleModel
try:
import pycuda.driver
import cupy
gpu_available = True
except ImportError:
gpu_available = False
......@@ -30,18 +33,18 @@ def test_data_handling_3d():
for gpu in [False, True] if gpu_available else [False]:
if parallel and gpu and not hasattr(wLB, 'cuda'):
continue
print("Testing parallel: %s\tgpu: %s" % (parallel, gpu))
opt_params = {'target': 'gpu' if gpu else 'cpu',
'gpu_indexing_params': {'block_size': (8, 4, 2)}}
print(f"Testing parallel: {parallel}\tgpu: {gpu}")
config = CreateKernelConfig(target=Target.GPU if gpu else Target.CPU,
gpu_indexing_params=MappingProxyType({'block_size': (8, 4, 2)}))
if parallel:
from pystencils.datahandling import ParallelDataHandling
blocks = wLB.createUniformBlockGrid(blocks=(2, 3, 4), cellsPerBlock=(5, 5, 5),
oneBlockPerProcess=False)
dh = ParallelDataHandling(blocks, dim=3)
rho = ldc_setup(data_handling=dh, optimization=opt_params)
rho = ldc_setup(data_handling=dh, config=config)
results.append(rho)
else:
rho = ldc_setup(domain_size=(10, 15, 20), parallel=False, optimization=opt_params)
rho = ldc_setup(domain_size=(10, 15, 20), parallel=False, config=config)
results.append(rho)
for i, arr in enumerate(results[1:]):
print("Testing equivalence version 0 with version %d" % (i + 1,))
......@@ -56,26 +59,32 @@ def test_data_handling_2d():
if parallel and gpu and not hasattr(wLB, 'cuda'):
continue
print("Testing parallel: %s\tgpu: %s" % (parallel, gpu))
opt_params = {'target': 'gpu' if gpu else 'cpu',
'gpu_indexing_params': {'block_size': (8, 4, 2)}}
print(f"Testing parallel: {parallel}\tgpu: {gpu}")
config = CreateKernelConfig(target=Target.GPU if gpu else Target.CPU,
gpu_indexing_params=MappingProxyType({'block_size': (8, 4, 2)}))
if parallel:
from pystencils.datahandling import ParallelDataHandling
blocks = wLB.createUniformBlockGrid(blocks=(2, 3, 1), cellsPerBlock=(5, 5, 1),
oneBlockPerProcess=False)
dh = ParallelDataHandling(blocks, dim=2)
rho = ldc_setup(data_handling=dh, optimization=opt_params)
rho = ldc_setup(data_handling=dh, config=config)
results.append(rho)
else:
rho = ldc_setup(domain_size=(10, 15), parallel=False, optimization=opt_params)
rho = ldc_setup(domain_size=(10, 15), parallel=False, config=config)
results.append(rho)
for i, arr in enumerate(results[1:]):
print("Testing equivalence version 0 with version %d" % (i + 1,))
print(f"Testing equivalence version 0 with version {i + 1}")
np.testing.assert_almost_equal(results[0], arr)
def test_smagorinsky_setup():
step = create_lid_driven_cavity((30, 30), smagorinsky=0.16, relaxation_rate=1.99)
step = create_lid_driven_cavity((30, 30), subgrid_scale_model=(SubgridScaleModel.SMAGORINSKY, 0.16),
relaxation_rate=1.99)
step.run(10)
def test_qr_setup():
step = create_lid_driven_cavity((30, 30), subgrid_scale_model=(SubgridScaleModel.QR, 0.33),
relaxation_rate=1.99)
step.run(10)
......@@ -89,7 +98,7 @@ def test_advanced_initialization():
init_vel[:, height // 3: height // 3 * 2, 0] = -velocity_magnitude
# small random y velocity component
init_vel[:, :, 1] = 0.1 * velocity_magnitude * np.random.rand(width, height)
shear_flow_scenario = create_fully_periodic_flow(initial_velocity=init_vel, relaxation_rate=1.95)
shear_flow_scenario = create_fully_periodic_flow(initial_velocity=init_vel, relaxation_rate=1.99)
with pytest.raises(ValueError) as e:
shear_flow_scenario.run_iterative_initialization(max_steps=20000, check_residuum_after=500)
assert 'did not converge' in str(e.value)
......
from functools import partial
import pystencils as ps
from lbmpy._compat import get_loop_counter_symbol
from pystencils.slicing import get_periodic_boundary_functor
from lbmpy.creationfunctions import create_lb_update_rule, LBMConfig, LBMOptimisation
from lbmpy.enums import Stencil, ForceModel
from lbmpy.stencils import LBStencil
from lbmpy.updatekernels import create_stream_pull_with_output_kernel
from lbmpy.macroscopic_value_kernels import macroscopic_values_setter
from lbmpy.relaxationrates import lattice_viscosity_from_relaxation_rate
import sympy as sp
import numpy as np
def get_le_boundary_functor(neighbor_stencil, shear_offset, ghost_layers=1, thickness=None, n=64):
functor_2 = get_periodic_boundary_functor(neighbor_stencil, ghost_layers, thickness)
def functor(pdfs, **_):
functor_2(pdfs)
weight = np.fmod(shear_offset[0] + n, 1.)
# First, we interpolate the upper pdfs
for i in range(len(pdfs[:, ghost_layers, :])):
ind1 = int(np.floor(i - shear_offset[0]) % n)
ind2 = int(np.ceil(i - shear_offset[0]) % n)
res = (1 - weight) * pdfs[ind1, ghost_layers, :] + weight * pdfs[ind2, ghost_layers, :]
pdfs[i, -ghost_layers, :] = res
# Second, we interpolate the lower pdfs
for i in range(len(pdfs[:, -ghost_layers, :])):
ind1 = int(np.floor(i + shear_offset[0]) % n)
ind2 = int(np.ceil(i + shear_offset[0]) % n)
res = (1 - weight) * pdfs[ind1, -ghost_layers - 1, :] + weight * pdfs[ind2, -ghost_layers - 1, :]
pdfs[i, ghost_layers - 1, :] = res
return functor
def get_solution_navier_stokes(x, t, viscosity, velocity=1.0, height=1.0, max_iterations=10):
w = x / height - 0.5
for k in np.arange(1, max_iterations + 1):
w += 1.0 / (np.pi * k) * np.exp(-4 * np.pi ** 2 * viscosity * k ** 2 / height ** 2 * t) * np.sin(
2 * np.pi / height * k * x)
return velocity * w
def test_lees_edwards():
domain_size = (64, 64)
omega = 1.0 # relaxation rate of first component
shear_velocity = 0.1 # shear velocity
shear_dir = 0 # direction of shear flow
shear_dir_normal = 1 # direction normal to shear plane, for interpolation
stencil = LBStencil(Stencil.D2Q9)
dh = ps.create_data_handling(domain_size, periodicity=True, default_target=ps.Target.CPU)
src = dh.add_array('src', values_per_cell=stencil.Q)
dh.fill('src', 1.0, ghost_layers=True)
dst = dh.add_array_like('dst', 'src')
dh.fill('dst', 0.0, ghost_layers=True)
force = dh.add_array('force', values_per_cell=stencil.D)
dh.fill('force', 0.0, ghost_layers=True)
rho = dh.add_array('rho', values_per_cell=1)
dh.fill('rho', 1.0, ghost_layers=True)
u = dh.add_array('u', values_per_cell=stencil.D)
dh.fill('u', 0.0, ghost_layers=True)
counters = [get_loop_counter_symbol(i) for i in range(stencil.D)]
points_up = sp.Symbol('points_up')
points_down = sp.Symbol('points_down')
u_p = sp.Piecewise((1, sp.And(counters[1] <= 1, points_down)),
(-1, sp.And(counters[1] >= src.shape[1] - 2, points_up)), (0, True)) * shear_velocity
lbm_config = LBMConfig(stencil=stencil, relaxation_rate=omega, compressible=True,
velocity_input=u.center_vector + sp.Matrix([u_p, 0]), density_input=rho,
force_model=ForceModel.LUO, force=force.center_vector,
kernel_type='collide_only')
lbm_opt = LBMOptimisation(symbolic_field=src)
collision = create_lb_update_rule(lbm_config=lbm_config, lbm_optimisation=lbm_opt)
to_insert = [s.lhs for s in collision.subexpressions
if collision.method.first_order_equilibrium_moment_symbols[shear_dir]
in s.free_symbols]
for s in to_insert:
collision = collision.new_with_inserted_subexpression(s)
ma = []
for a, c in zip(collision.main_assignments, collision.method.stencil):
if c[shear_dir_normal] == -1:
b = (True, False)
elif c[shear_dir_normal] == 1:
b = (False, True)
else:
b = (False, False)
a = ps.Assignment(a.lhs, a.rhs.replace(points_down, b[0]))
a = ps.Assignment(a.lhs, a.rhs.replace(points_up, b[1]))
ma.append(a)
collision.main_assignments = ma
stream = create_stream_pull_with_output_kernel(collision.method, src, dst,
{'density': rho, 'velocity': u})
config = ps.CreateKernelConfig(target=dh.default_target)
stream_kernel = ps.create_kernel(stream, config=config).compile()
collision_kernel = ps.create_kernel(collision, config=config).compile()
init = macroscopic_values_setter(collision.method, velocity=(0, 0),
pdfs=src.center_vector, density=rho.center)
init_kernel = ps.create_kernel(init, ghost_layers=0).compile()
offset = [0.0]
sync_pdfs = dh.synchronization_function([src.name],
functor=partial(get_le_boundary_functor, shear_offset=offset))
dh.run_kernel(init_kernel)
time = 500
dh.all_to_gpu()
for i in range(time):
dh.run_kernel(collision_kernel)
sync_pdfs()
dh.run_kernel(stream_kernel)
dh.swap(src.name, dst.name)
offset[0] += shear_velocity
dh.all_to_cpu()
nu = lattice_viscosity_from_relaxation_rate(omega)
h = domain_size[0]
k_max = 100
analytical_solution = get_solution_navier_stokes(np.linspace(0.5, h - 0.5, h), time, nu, shear_velocity, h, k_max)
np.testing.assert_array_almost_equal(analytical_solution, dh.gather_array(u.name)[0, :, 0], decimal=5)
dh.fill(rho.name, 1.0, ghost_layers=True)
dh.run_kernel(init_kernel)
dh.fill(u.name, 0.0, ghost_layers=True)
dh.fill('force', 0.0, ghost_layers=True)
dh.cpu_arrays[force.name][64 // 3, 1, :] = [1e-2, -1e-1]
offset[0] = 0
time = 20
dh.all_to_gpu()
for i in range(time):
dh.run_kernel(collision_kernel)
sync_pdfs()
dh.run_kernel(stream_kernel)
dh.swap(src.name, dst.name)
dh.all_to_cpu()
vel_unshifted = np.array(dh.gather_array(u.name)[:, -3:-1, :])
dh.fill(rho.name, 1.0, ghost_layers=True)
dh.run_kernel(init_kernel)
dh.fill(u.name, 0.0, ghost_layers=True)
dh.fill('force', 0.0, ghost_layers=True)
dh.cpu_arrays[force.name][64 // 3, 1, :] = [1e-2, -1e-1]
offset[0] = 10
time = 20
dh.all_to_gpu()
for i in range(time):
dh.run_kernel(collision_kernel)
sync_pdfs()
dh.run_kernel(stream_kernel)
dh.swap(src.name, dst.name)
dh.all_to_cpu()
vel_shifted = np.array(dh.gather_array(u.name)[:, -3:-1, :])
vel_rolled = np.roll(vel_shifted, -offset[0], axis=0)
np.testing.assert_array_almost_equal(vel_unshifted, vel_rolled)
import pytest
import numpy as np
import sympy as sp
from lbmpy.creationfunctions import create_lb_method, LBMConfig
from lbmpy.enums import Stencil, ForceModel, Method
from lbmpy.macroscopic_value_kernels import (
compile_macroscopic_values_getter, compile_macroscopic_values_setter, strain_rate_tensor_getter)
from lbmpy.advanced_streaming.utility import streaming_patterns, Timestep
from lbmpy.stencils import LBStencil
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q19])
@pytest.mark.parametrize('force_model', [ForceModel.GUO, ForceModel.LUO, None])
@pytest.mark.parametrize('compressible', [True, False])
@pytest.mark.parametrize('zero_centered', [True, False])
@pytest.mark.parametrize('streaming_pattern', streaming_patterns)
@pytest.mark.parametrize('prev_timestep', [Timestep.EVEN, Timestep.ODD])
def test_set_get_density_velocity_with_fields(stencil, force_model, compressible, zero_centered, streaming_pattern, prev_timestep):
force = (0.1, 0.12, -0.17)
stencil = LBStencil(stencil)
lbm_config = LBMConfig(stencil=stencil, force_model=force_model, method=Method.TRT,
compressible=compressible, zero_centered=zero_centered, force=force)
method = create_lb_method(lbm_config=lbm_config)
ghost_layers = 1
inner_size = (3, 7, 4)[:method.dim]
total_size = tuple(s + 2 * ghost_layers for s in inner_size)
pdf_arr = np.zeros(total_size + (len(method.stencil),))
inner_slice = (slice(ghost_layers, -ghost_layers, 1), ) * method.dim
density_input_field = np.zeros(total_size)
velocity_input_field = np.zeros(total_size + (method.dim, ))
density_input_field[inner_slice] = 1 + 0.2 * (np.random.random_sample(inner_size) - 0.5)
velocity_input_field[inner_slice] = 0.1 * \
(np.random.random_sample(inner_size + (method.dim, )) - 0.5)
quantities_to_set = {'density': density_input_field, 'velocity': velocity_input_field}
setter = compile_macroscopic_values_setter(method, pdf_arr=pdf_arr, quantities_to_set=quantities_to_set,
ghost_layers=ghost_layers, streaming_pattern=streaming_pattern, previous_timestep=prev_timestep)
setter(pdf_arr)
getter = compile_macroscopic_values_getter(method, ['density', 'density_deviation', 'velocity'], pdf_arr=pdf_arr,
ghost_layers=ghost_layers, streaming_pattern=streaming_pattern, previous_timestep=prev_timestep)
inner_part = (slice(ghost_layers, -ghost_layers), ) * stencil.D
density_output_field = np.zeros_like(density_input_field)
density_deviation_output_field = np.zeros_like(density_input_field)
velocity_output_field = np.zeros_like(velocity_input_field)
getter(pdfs=pdf_arr, density=density_output_field, density_deviation=density_deviation_output_field, velocity=velocity_output_field)
np.testing.assert_almost_equal(density_input_field, density_output_field)
np.testing.assert_almost_equal(density_input_field[inner_part] - 1.0, density_deviation_output_field[inner_part])
np.testing.assert_almost_equal(velocity_input_field, velocity_output_field)
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q19])
@pytest.mark.parametrize('force_model', [ForceModel.GUO, ForceModel.LUO, None])
@pytest.mark.parametrize('compressible', [True, False])
def test_set_get_constant_velocity(stencil, force_model, compressible):
ref_velocity = [0.01, -0.2, 0.1]
force = (0.1, 0.12, -0.17)
stencil = LBStencil(stencil)
lbm_config = LBMConfig(stencil=stencil, force_model=force_model, method=Method.TRT,
compressible=compressible, force=force)
method = create_lb_method(lbm_config=lbm_config)
size = (1, 1, 1)[:method.dim]
pdf_arr = np.zeros(size + (len(method.stencil),))
setter = compile_macroscopic_values_setter(method, pdf_arr=pdf_arr, ghost_layers=0,
quantities_to_set={'velocity': ref_velocity[:method.dim]}, )
setter(pdf_arr)
getter = compile_macroscopic_values_getter(method, ['velocity'], pdf_arr=pdf_arr, ghost_layers=0)
velocity_output_field = np.zeros(size + (method.dim, ))
getter(pdfs=pdf_arr, velocity=velocity_output_field)
if method.dim == 2:
computed_velocity = velocity_output_field[0, 0, :]
else:
computed_velocity = velocity_output_field[0, 0, 0, :]
np.testing.assert_almost_equal(np.array(ref_velocity[:method.dim]), computed_velocity)
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q19])
@pytest.mark.parametrize("method_enum", [Method.SRT, Method.TRT, Method.MRT, Method.CUMULANT])
def test_get_strain_rate_tensor(stencil, method_enum):
stencil = LBStencil(stencil)
lbm_config = LBMConfig(stencil=stencil, method=method_enum, compressible=True)
method = create_lb_method(lbm_config=lbm_config)
pdfs = sp.symbols(f"f_:{stencil.Q}")
strain_rate_tensor = sp.symbols(f"strain_rate_tensor_:{method.dim * method.dim}")
getter_assignments = strain_rate_tensor_getter(method, strain_rate_tensor, pdfs)
assert len(getter_assignments) == method.dim * method.dim
import pytest
from lbmpy.cumulants import raw_moment_as_function_of_cumulants
from lbmpy.enums import Stencil
from lbmpy.maxwellian_equilibrium import *
from lbmpy.moments import (
MOMENT_SYMBOLS, exponents_to_polynomial_representations, moment_matrix,
moments_up_to_component_order, moments_up_to_order)
from lbmpy.stencils import LBStencil
from pystencils.sympyextensions import remove_higher_order_terms
def test_maxwellian_moments():
"""Check moments of continuous Maxwellian"""
rho = sp.Symbol("rho")
u = sp.symbols("u_0 u_1 u_2")
c_s = sp.Symbol("c_s")
eq_moments = get_equilibrium_values_of_maxwell_boltzmann_function(((0, 0, 0), (0, 0, 1)),
dim=3, rho=rho, u=u, c_s_sq=c_s ** 2,
space="moment")
assert eq_moments[0] == rho
assert eq_moments[1] == rho * u[2]
x, y, z = MOMENT_SYMBOLS
one = sp.Rational(1, 1)
eq_moments = get_equilibrium_values_of_maxwell_boltzmann_function((one, x, x ** 2, x * y),
dim=2, rho=rho, u=u[:2], c_s_sq=c_s ** 2,
space="moment")
assert eq_moments[0] == rho
assert eq_moments[1] == rho * u[0]
assert eq_moments[2] == rho * (c_s ** 2 + u[0] ** 2)
assert eq_moments[3] == rho * u[0] * u[1]
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q15, Stencil.D3Q19, Stencil.D3Q27])
def test_continuous_discrete_moment_equivalence(stencil):
"""Check that moments up to order 3 agree with moments of the continuous Maxwellian"""
stencil = LBStencil(stencil)
c_s_sq = sp.Rational(1, 3)
moments = tuple(moments_up_to_order(3, dim=stencil.D, include_permutations=False))
cm = sp.Matrix(get_equilibrium_values_of_maxwell_boltzmann_function(moments, order=2, dim=stencil.D,
c_s_sq=c_s_sq, space="moment"))
dm = sp.Matrix(get_moments_of_discrete_maxwellian_equilibrium(stencil, moments, order=2,
compressible=True, c_s_sq=c_s_sq))
diff = sp.simplify(cm - dm)
for d in diff:
assert d == 0
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q27])
def test_moment_cumulant_continuous_equivalence(stencil):
"""Test that discrete equilibrium is the same up to order 3 when obtained with following methods
* eq1: take moments of continuous Maxwellian and transform back to pdf space
* eq2: take cumulants of continuous Maxwellian, transform to moments then transform to pdf space
* eq3: take discrete equilibrium from LBM literature
* eq4: same as eq1 but with built-in function
"""
stencil = LBStencil(stencil)
u = sp.symbols(f"u_:{stencil.D}")
indices = tuple(moments_up_to_component_order(2, dim=stencil.D))
c_s_sq = sp.Rational(1, 3)
eq_moments1 = get_equilibrium_values_of_maxwell_boltzmann_function(indices, dim=stencil.D, u=u, c_s_sq=c_s_sq,
space="moment")
eq_cumulants = get_equilibrium_values_of_maxwell_boltzmann_function(indices, dim=stencil.D, u=u, c_s_sq=c_s_sq,
space="cumulant")
eq_cumulants = {idx: c for idx, c in zip(indices, eq_cumulants)}
eq_moments2 = [raw_moment_as_function_of_cumulants(idx, eq_cumulants) for idx in indices]
pdfs_to_moments = moment_matrix(indices, stencil)
def normalize(expressions):
return [remove_higher_order_terms(e.expand(), symbols=u, order=3) for e in expressions]
eq1 = normalize(pdfs_to_moments.inv() * sp.Matrix(eq_moments1))
eq2 = normalize(pdfs_to_moments.inv() * sp.Matrix(eq_moments2))
eq3 = normalize(discrete_maxwellian_equilibrium(stencil, order=3, c_s_sq=c_s_sq, compressible=True))
eq4 = normalize(generate_equilibrium_by_matching_moments(stencil, indices, c_s_sq=c_s_sq))
assert eq1 == eq2
assert eq2 == eq3
assert eq3 == eq4
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q27])
def test_moment_cumulant_continuous_equivalence_polynomial_formulation(stencil):
"""Same as test above, but instead of index tuples, the polynomial formulation is used."""
stencil = LBStencil(stencil)
u = sp.symbols(f"u_:{stencil.D}")
index_tuples = tuple(moments_up_to_component_order(2, dim=stencil.D))
indices = exponents_to_polynomial_representations(index_tuples)
c_s_sq = sp.Rational(1, 3)
eq_moments1 = get_equilibrium_values_of_maxwell_boltzmann_function(indices, dim=stencil.D,
u=u, c_s_sq=c_s_sq, space="moment")
eq_cumulants = get_equilibrium_values_of_maxwell_boltzmann_function(indices, dim=stencil.D,
u=u, c_s_sq=c_s_sq, space="cumulant")
eq_cumulants = {idx: c for idx, c in zip(index_tuples, eq_cumulants)}
eq_moments2 = [raw_moment_as_function_of_cumulants(idx, eq_cumulants) for idx in index_tuples]
pdfs_to_moments = moment_matrix(indices, stencil)
def normalize(expressions):
return [remove_higher_order_terms(e.expand(), symbols=u, order=3) for e in expressions]
eq1 = normalize(pdfs_to_moments.inv() * sp.Matrix(eq_moments1))
eq2 = normalize(pdfs_to_moments.inv() * sp.Matrix(eq_moments2))
eq3 = normalize(discrete_maxwellian_equilibrium(stencil, order=3, c_s_sq=c_s_sq, compressible=True))
eq4 = normalize(generate_equilibrium_by_matching_moments(stencil, indices, c_s_sq=c_s_sq))
assert eq1 == eq2
assert eq2 == eq3
assert eq3 == eq4
import pytest
import sympy as sp
from lbmpy.enums import Stencil
from lbmpy.stencils import LBStencil
from lbmpy.moments import get_default_moment_set_for_stencil
from lbmpy.moment_transforms import (
PdfsToMomentsByMatrixTransform, PdfsToMomentsByChimeraTransform,
PdfsToCentralMomentsByShiftMatrix, PdfsToCentralMomentsByMatrix, FastCentralMomentTransform
)
transforms = [
PdfsToMomentsByMatrixTransform, PdfsToMomentsByChimeraTransform,
PdfsToCentralMomentsByShiftMatrix, PdfsToCentralMomentsByMatrix, FastCentralMomentTransform
]
@pytest.mark.parametrize('transform_class', transforms)
def test_monomial_equations(transform_class):
stencil = LBStencil(Stencil.D2Q9)
rho = sp.symbols("rho")
u = sp.symbols(f"u_:{stencil.D}")
moment_polynomials = get_default_moment_set_for_stencil(stencil)
transform = transform_class(stencil, moment_polynomials, rho, u)
pdfs = sp.symbols(f"f_:{stencil.Q}")
fw_eqs = transform.forward_transform(pdfs, return_monomials=True)
bw_eqs = transform.backward_transform(pdfs, start_from_monomials=True)
mono_symbols_pre = set(transform.pre_collision_monomial_symbols)
mono_symbols_post = set(transform.post_collision_monomial_symbols)
assert mono_symbols_pre <= set(fw_eqs.defined_symbols)
assert mono_symbols_post <= set(bw_eqs.free_symbols)
symbols_pre = set(transform.pre_collision_symbols)
symbols_post = set(transform.post_collision_symbols)
assert symbols_pre.isdisjoint(set(fw_eqs.atoms(sp.Symbol)))
assert symbols_post.isdisjoint(set(bw_eqs.atoms(sp.Symbol)))
import pytest
import sympy as sp
from dataclasses import replace
from lbmpy.enums import CollisionSpace
from lbmpy.methods import CollisionSpaceInfo
from lbmpy.creationfunctions import create_lb_method, create_lb_collision_rule, LBMConfig, LBMOptimisation
from lbmpy.enums import Method, ForceModel, Stencil
from lbmpy.moments import (
extract_monomials, get_default_moment_set_for_stencil, non_aliased_polynomial_raw_moments,
exponent_tuple_sort_key)
from lbmpy.stencils import LBStencil
from lbmpy.methods.creationfunctions import create_srt
from lbmpy.methods.default_moment_sets import mrt_orthogonal_modes_literature
from lbmpy.maxwellian_equilibrium import get_weights
from lbmpy.moment_transforms import (
PdfsToMomentsByMatrixTransform, PdfsToMomentsByChimeraTransform
)
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q15, Stencil.D3Q19, Stencil.D3Q27])
@pytest.mark.parametrize('monomials', [False, True])
def test_moment_transform_equivalences(stencil, monomials):
stencil = LBStencil(stencil)
pdfs = sp.symbols(f"f_:{stencil.Q}")
rho = sp.Symbol('rho')
u = sp.symbols(f"u_:{stencil.D}")
moment_polynomials = get_default_moment_set_for_stencil(stencil)
if monomials:
polys_nonaliased = non_aliased_polynomial_raw_moments(moment_polynomials, stencil)
moment_exponents = sorted(extract_monomials(polys_nonaliased), key=exponent_tuple_sort_key)
moment_polynomials = None
else:
moment_exponents = None
matrix_transform = PdfsToMomentsByMatrixTransform(stencil, moment_polynomials, rho, u,
moment_exponents=moment_exponents)
chimera_transform = PdfsToMomentsByChimeraTransform(stencil, moment_polynomials, rho, u,
moment_exponents=moment_exponents)
f_to_m_matrix = matrix_transform.forward_transform(pdfs, return_monomials=monomials)
f_to_m_matrix = f_to_m_matrix.new_without_subexpressions().main_assignments_dict
f_to_m_chimera = chimera_transform.forward_transform(pdfs, return_monomials=monomials)
f_to_m_chimera = f_to_m_chimera.new_without_subexpressions().main_assignments_dict
m_to_f_matrix = matrix_transform.backward_transform(pdfs, start_from_monomials=monomials)
m_to_f_matrix = m_to_f_matrix.new_without_subexpressions().main_assignments_dict
m_to_f_chimera = chimera_transform.backward_transform(pdfs, start_from_monomials=monomials)
m_to_f_chimera = m_to_f_chimera.new_without_subexpressions().main_assignments_dict
m_pre_matrix = matrix_transform.pre_collision_monomial_symbols if monomials else matrix_transform.pre_collision_symbols
m_pre_chimera = chimera_transform.pre_collision_monomial_symbols if monomials else chimera_transform.pre_collision_symbols
for m1, m2 in zip(m_pre_matrix, m_pre_chimera):
rhs_matrix = f_to_m_matrix[m1]
rhs_chimera = f_to_m_chimera[m2]
diff = (rhs_matrix - rhs_chimera).expand()
assert diff == 0, f"Mismatch between matrix and chimera forward transform at {m1}, {m2}."
for f in pdfs:
rhs_matrix = m_to_f_matrix[f]
rhs_chimera = m_to_f_chimera[f]
diff = (rhs_matrix - rhs_chimera).expand()
assert diff == 0, f"Mismatch between matrix and chimera backward transform at {f}"
d3q15_literature = mrt_orthogonal_modes_literature(LBStencil(Stencil.D3Q15), True)
d3q19_literature = mrt_orthogonal_modes_literature(LBStencil(Stencil.D3Q19), True)
setups = [
(Stencil.D2Q9, Method.SRT, None, ForceModel.GUO),
(Stencil.D2Q9, Method.MRT, None, ForceModel.SIMPLE),
(Stencil.D3Q15, Method.MRT, None, ForceModel.SIMPLE),
(Stencil.D3Q15, Method.MRT, d3q15_literature, ForceModel.SIMPLE),
(Stencil.D3Q19, Method.TRT, None, ForceModel.SIMPLE),
(Stencil.D3Q19, Method.MRT, d3q19_literature, ForceModel.GUO),
(Stencil.D3Q27, Method.SRT, None, ForceModel.GUO)
]
@pytest.mark.parametrize('setup', setups)
def test_population_and_moment_space_equivalence(setup: tuple):
stencil = LBStencil(setup[0])
method = setup[1]
nested_moments = setup[2]
fmodel = setup[3]
force = sp.symbols(f'F_:{stencil.D}')
conserved_moments = 1 + stencil.D
rr = [*[0] * conserved_moments, *sp.symbols(f'omega_:{stencil.Q - conserved_moments}')]
lbm_config = LBMConfig(stencil=stencil, method=method, relaxation_rates=rr,
nested_moments=nested_moments, force_model=fmodel, force=force,
weighted=True, compressible=True, collision_space_info=CollisionSpaceInfo(CollisionSpace.RAW_MOMENTS))
lbm_opt = LBMOptimisation(cse_global=False, cse_pdfs=False, pre_simplification=True, simplification=False)
lb_method_moment_space = create_lb_method(lbm_config=lbm_config)
lbm_config = replace(lbm_config, collision_space_info=CollisionSpaceInfo(CollisionSpace.POPULATIONS))
lb_method_pdf_space = create_lb_method(lbm_config=lbm_config)
rho = lb_method_moment_space.zeroth_order_equilibrium_moment_symbol
u = lb_method_moment_space.first_order_equilibrium_moment_symbols
keep = set((rho,) + u)
cr_moment_space = create_lb_collision_rule(lb_method=lb_method_moment_space, lbm_optimisation=lbm_opt)
cr_moment_space = cr_moment_space.new_without_subexpressions(subexpressions_to_keep=keep)
lbm_opt = replace(lbm_opt, simplification='auto')
cr_pdf_space = create_lb_collision_rule(lb_method=lb_method_pdf_space, lbm_optimisation=lbm_opt)
cr_pdf_space = cr_pdf_space.new_without_subexpressions(subexpressions_to_keep=keep)
for a, b in zip(cr_moment_space.main_assignments, cr_pdf_space.main_assignments):
diff = (a.rhs - b.rhs).expand()
assert diff == 0, f"Mismatch between population- and moment-space equations in PDFs {a.lhs}, {b.lhs}"
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q19])
@pytest.mark.parametrize('compressible', [True, False])
def test_full_and_delta_equilibrium_equivalence(stencil, compressible):
stencil = LBStencil(stencil)
zero_centered = True
omega = sp.Symbol('omega')
rho = sp.Symbol("rho")
rho_background = sp.Integer(1)
delta_rho = sp.Symbol("delta_rho")
subs = { delta_rho : rho - rho_background }
eqs = []
for delta_eq in [False, True]:
method = create_srt(stencil, omega, continuous_equilibrium=True, compressible=compressible,
zero_centered=zero_centered, delta_equilibrium=delta_eq, equilibrium_order=None,
collision_space_info=CollisionSpaceInfo(CollisionSpace.RAW_MOMENTS))
eq = method.get_equilibrium_terms()
eqs.append(eq.subs(subs))
assert (eqs[0] - eqs[1]).expand() == sp.Matrix((0,) * stencil.Q)
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q19])
@pytest.mark.parametrize('compressible', [True, False])
@pytest.mark.longrun
def test_full_and_delta_population_space_equivalence(stencil, compressible):
stencil = LBStencil(stencil)
zero_centered = True
omega = sp.Symbol('omega')
rho = sp.Symbol("rho")
rho_background = sp.Integer(1)
delta_rho = sp.Symbol("delta_rho")
subs = { delta_rho : rho - rho_background }
eqs = []
for delta_eq in [False, True]:
method = create_srt(stencil, omega, continuous_equilibrium=True, compressible=compressible,
zero_centered=zero_centered, delta_equilibrium=delta_eq, equilibrium_order=None,
collision_space_info=CollisionSpaceInfo(CollisionSpace.RAW_MOMENTS))
eq = method.get_collision_rule().new_without_subexpressions()
eq = sp.Matrix([a.rhs for a in eq])
eqs.append(eq.subs(subs))
assert (eqs[0] - eqs[1]).expand() == sp.Matrix((0,) * stencil.Q)
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q19])
@pytest.mark.parametrize('compressible', [True, False])
@pytest.mark.parametrize('delta_eq', [True, False])
def test_zero_centering_equilibrium_equivalence(stencil, compressible, delta_eq):
stencil = LBStencil(stencil)
omega = sp.Symbol('omega')
weights = sp.Matrix(get_weights(stencil))
rho = sp.Symbol("rho")
rho_background = sp.Integer(1)
delta_rho = sp.Symbol("delta_rho")
subs = { delta_rho : rho - rho_background }
eqs = []
for zero_centered in [False, True]:
method = create_srt(stencil, omega, continuous_equilibrium=True, compressible=compressible,
zero_centered=zero_centered, delta_equilibrium=delta_eq and zero_centered,
equilibrium_order=None,
collision_space_info=CollisionSpaceInfo(CollisionSpace.RAW_MOMENTS))
eq = method.get_equilibrium_terms()
eqs.append(eq.subs(subs))
assert (eqs[0] - eqs[1]).expand() == weights
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q19])
@pytest.mark.parametrize('compressible', [True, False])
@pytest.mark.parametrize('delta_eq', [True, False])
@pytest.mark.longrun
def test_zero_centering_population_space_equivalence(stencil, compressible, delta_eq):
stencil = LBStencil(stencil)
omega = sp.Symbol('omega')
weights = sp.Matrix(get_weights(stencil))
rho = sp.Symbol("rho")
rho_background = sp.Integer(1)
delta_rho = sp.Symbol("delta_rho")
delta_f = sp.symbols(f"delta_f_:{stencil.Q}")
subs = { delta_rho : rho - rho_background }
eqs = []
for zero_centered in [False, True]:
method = create_srt(stencil, omega, continuous_equilibrium=True, compressible=compressible,
zero_centered=zero_centered, delta_equilibrium=delta_eq and zero_centered,
equilibrium_order=None,
collision_space_info=CollisionSpaceInfo(CollisionSpace.RAW_MOMENTS))
eq = method.get_collision_rule().new_without_subexpressions()
eq = sp.Matrix([a.rhs for a in eq])
if zero_centered:
eq = eq.subs({f : df for f, df in zip(method.pre_collision_pdf_symbols, delta_f)})
else:
eq = eq.subs({f : w + df for f, df, w in zip(method.pre_collision_pdf_symbols, delta_f, weights)})
eqs.append(eq.subs(subs))
assert (eqs[0] - eqs[1]).expand() == weights
"""
Moment-based methods are created by specifying moments and their equilibrium value.
This test checks if the equilibrium formula obtained by this method is the same as the explicitly
given discrete_maxwellian_equilibrium
"""
import pytest
import sympy as sp
from lbmpy.creationfunctions import create_lb_method, LBMConfig
from lbmpy.enums import Stencil, Method
from lbmpy.maxwellian_equilibrium import discrete_maxwellian_equilibrium
from lbmpy.methods import create_mrt_orthogonal, create_srt, create_trt, mrt_orthogonal_modes_literature
from lbmpy.moments import is_bulk_moment, is_shear_moment
from lbmpy.relaxationrates import get_shear_relaxation_rate
from lbmpy.stencils import LBStencil
def check_for_matching_equilibrium(method_name, stencil, compressibility):
omega = sp.Symbol("omega")
if method_name == Method.SRT:
method = create_srt(stencil, omega, compressible=compressibility,
continuous_equilibrium=False, equilibrium_order=2)
elif method_name == Method.TRT:
method = create_trt(stencil, omega, omega, compressible=compressibility,
continuous_equilibrium=False, equilibrium_order=2)
elif method_name == Method.MRT:
method = create_mrt_orthogonal(stencil, [omega] * stencil.Q, continuous_equilibrium=False,
weighted=False, compressible=compressibility, equilibrium_order=2)
else:
raise ValueError("Unknown method")
reference_equilibrium = discrete_maxwellian_equilibrium(stencil, order=2,
c_s_sq=sp.Rational(1, 3), compressible=compressibility)
equilibrium = method.get_equilibrium()
equilibrium = equilibrium.new_without_subexpressions(subexpressions_to_keep=sp.symbols("rho u_0 u_1 u_2"))
diff = sp.Matrix(reference_equilibrium) - sp.Matrix([eq.rhs for eq in equilibrium.main_assignments])
diff = sp.simplify(diff)
assert sum(diff).is_zero
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q15, Stencil.D3Q19, Stencil.D3Q27])
@pytest.mark.parametrize('method', [Method.SRT, Method.TRT, Method.MRT])
def test_for_matching_equilibrium_for_stencil(stencil, method):
stencil = LBStencil(stencil)
check_for_matching_equilibrium(method, stencil, True)
check_for_matching_equilibrium(method, stencil, False)
def test_relaxation_rate_setter():
o1, o2, o3 = sp.symbols("o1 o2 o3")
lbm_config_1 = LBMConfig(method=Method.SRT, stencil=LBStencil(Stencil.D2Q9), relaxation_rates=[o3])
lbm_config_2 = LBMConfig(method=Method.MRT, stencil=LBStencil(Stencil.D2Q9), relaxation_rates=[o3, o3, o3, o3])
lbm_config_3 = LBMConfig(method=Method.MRT, stencil=LBStencil(Stencil.D2Q9), zero_centered=False,
relaxation_rates=[o3] * 9, entropic=True)
method = create_lb_method(lbm_config=lbm_config_1)
method2 = create_lb_method(lbm_config=lbm_config_2)
method3 = create_lb_method(lbm_config=lbm_config_3)
method.set_zeroth_moment_relaxation_rate(o1)
method.set_first_moment_relaxation_rate(o2)
assert get_shear_relaxation_rate(method) == o3
method.set_zeroth_moment_relaxation_rate(o3)
method.set_first_moment_relaxation_rate(o3)
method2.set_conserved_moments_relaxation_rate(o3)
assert method.collision_matrix == method2.collision_matrix == method3.collision_matrix
def test_mrt_orthogonal():
m_ref = {}
moments = mrt_orthogonal_modes_literature(LBStencil(Stencil.D2Q9), True)
lbm_config = LBMConfig(method=Method.MRT, stencil=LBStencil(Stencil.D2Q9), continuous_equilibrium=True,
nested_moments=moments)
m = create_lb_method(lbm_config=lbm_config)
assert m.is_weighted_orthogonal
m_ref[(Stencil.D2Q9, True)] = m
moments = mrt_orthogonal_modes_literature(LBStencil(Stencil.D3Q15), True)
lbm_config = LBMConfig(method=Method.MRT, stencil=LBStencil(Stencil.D3Q15), continuous_equilibrium=True,
nested_moments=moments)
m = create_lb_method(lbm_config=lbm_config)
assert m.is_weighted_orthogonal
m_ref[(Stencil.D3Q15, True)] = m
moments = mrt_orthogonal_modes_literature(LBStencil(Stencil.D3Q19), True)
lbm_config = LBMConfig(method=Method.MRT, stencil=LBStencil(Stencil.D3Q19), continuous_equilibrium=True,
nested_moments=moments)
m = create_lb_method(lbm_config=lbm_config)
assert m.is_weighted_orthogonal
m_ref[(Stencil.D3Q19, True)] = m
moments = mrt_orthogonal_modes_literature(LBStencil(Stencil.D3Q27), False)
lbm_config = LBMConfig(method=Method.MRT, stencil=LBStencil(Stencil.D3Q27), continuous_equilibrium=True,
nested_moments=moments)
m = create_lb_method(lbm_config=lbm_config)
assert m.is_orthogonal
m_ref[(Stencil.D3Q27, False)] = m
for weighted in [True, False]:
for stencil in [Stencil.D2Q9, Stencil.D3Q15, Stencil.D3Q19, Stencil.D3Q27]:
lbm_config = LBMConfig(method=Method.MRT, stencil=LBStencil(stencil), continuous_equilibrium=True,
weighted=weighted)
m = create_lb_method(lbm_config=lbm_config)
if weighted:
assert m.is_weighted_orthogonal
else:
assert m.is_orthogonal
bulk_moments = set([mom for mom in m.moments if is_bulk_moment(mom, m.dim)])
shear_moments = set([mom for mom in m.moments if is_shear_moment(mom, m.dim)])
assert len(bulk_moments) == 1
assert len(shear_moments) == 1 + (m.dim - 2) + m.dim * (m.dim - 1) / 2
if (stencil, weighted) in m_ref:
ref = m_ref[(stencil, weighted)]
bulk_moments_lit = set([mom for mom in ref.moments if is_bulk_moment(mom, ref.dim)])
shear_moments_lit = set([mom for mom in ref.moments if is_shear_moment(mom, ref.dim)])
if stencil != Stencil.D3Q27: # this one uses a different linear combination in literature
assert shear_moments == shear_moments_lit
assert bulk_moments == bulk_moments_lit
from lbmpy.moments import *
from lbmpy.stencils import get_stencil
from lbmpy.enums import Stencil
from lbmpy.stencils import LBStencil
def test_moment_permutation_multiplicity():
......@@ -69,7 +70,34 @@ def test_gram_schmidt_orthogonalization():
moments = moments_up_to_component_order(2, 2)
assert len(moments) == 9
stencil = get_stencil("D2Q9")
stencil = LBStencil(Stencil.D2Q9)
orthogonal_moments = gram_schmidt(moments, stencil)
pdfs_to_moments = moment_matrix(orthogonal_moments, stencil)
assert (pdfs_to_moments * pdfs_to_moments.T).is_diagonal()
def test_is_bulk_moment():
x, y, z = MOMENT_SYMBOLS
assert not is_bulk_moment(x, 2)
assert not is_bulk_moment(x ** 3, 2)
assert not is_bulk_moment(x * y, 2)
assert not is_bulk_moment(x ** 2, 2)
assert not is_bulk_moment(x ** 2 + y ** 2, 3)
assert is_bulk_moment(x ** 2 + y ** 2, 2)
assert is_bulk_moment(x ** 2 + y ** 2 + z ** 2, 3)
assert is_bulk_moment(x ** 2 + y ** 2 + x, 2)
assert is_bulk_moment(x ** 2 + y ** 2 + 1, 2)
def test_is_shear_moment():
x, y, z = MOMENT_SYMBOLS
assert not is_shear_moment(x ** 3, 2)
assert not is_shear_moment(x, 2)
assert not is_shear_moment(x ** 2 + y ** 2, 2)
assert not is_shear_moment(x ** 2 + y ** 2 + z ** 2, 3)
assert is_shear_moment(x ** 2, 2)
assert is_shear_moment(x ** 2 - 1, 2)
assert is_shear_moment(x ** 2 - x, 2)
assert is_shear_moment(x * y, 2)
assert is_shear_moment(x * y - 1, 2)
assert is_shear_moment(x * y - x, 2)
import pytest
import sympy as sp
from pystencils.sympyextensions import is_constant
from lbmpy import Stencil, LBStencil, Method, create_lb_collision_rule, LBMConfig, LBMOptimisation
@pytest.mark.parametrize('method', [Method.MRT, Method.CENTRAL_MOMENT, Method.CUMULANT])
def test_mrt_simplifications(method: Method):
stencil = Stencil.D3Q19
lbm_config = LBMConfig(stencil=stencil, method=method, compressible=True)
lbm_opt = LBMOptimisation(simplification='auto')
cr = create_lb_collision_rule(lbm_config=lbm_config, lbm_optimisation=lbm_opt)
for subexp in cr.subexpressions:
rhs = subexp.rhs
# Check for aliases
assert not isinstance(rhs, sp.Symbol)
# Check for logarithms
assert not rhs.atoms(sp.log)
# Check for nonextracted constant summands or factors
exprs = rhs.atoms(sp.Add, sp.Mul)
for expr in exprs:
for arg in expr.args:
if isinstance(arg, sp.Number):
if arg not in {sp.Number(1), sp.Number(-1), sp.Float(1), sp.Float(-1)}:
breakpoint()
# Check for divisions
if not (isinstance(rhs, sp.Pow) and rhs.args[1] < 0):
powers = rhs.atoms(sp.Pow)
for p in powers:
assert p.args[1] > 0
from lbmpy.updatekernels import create_stream_pull_with_output_kernel
from lbmpy import create_lb_update_rule, relaxation_rate_from_lattice_viscosity, ForceModel, Method, LBStencil
from lbmpy.macroscopic_value_kernels import macroscopic_values_setter
from pystencils.boundaries.boundaryhandling import BoundaryHandling
from pystencils.boundaries.boundaryconditions import Neumann, Dirichlet
from lbmpy.boundaries.boundaryhandling import LatticeBoltzmannBoundaryHandling
from lbmpy.boundaries import NoSlip
from lbmpy._compat import IS_PYSTENCILS_2
from lbmpy.oldroydb import *
import pytest
# # Lattice Boltzmann with Finite-Volume Oldroyd-B
# # taken from the electronic supplement of https://doi.org/10.1140/epje/s10189-020-00005-6,
# # available at https://doi.org/10.24416/UU01-2AFZSW
pytest.importorskip('scipy.optimize')
@pytest.mark.xfail(IS_PYSTENCILS_2, reason="Staggered kernels are unavailable")
def test_oldroydb():
import scipy.optimize
# ## Definitions
L = (34, 34)
lambda_p = sp.Symbol("lambda_p")
eta_p = sp.Symbol("eta_p")
lb_stencil = LBStencil("D2Q9")
fv_stencil = LBStencil("D2Q9")
eta = 1 - eta_p
omega = relaxation_rate_from_lattice_viscosity(eta)
f_pre = 0.00001
# ## Data structures
dh = ps.create_data_handling(L, periodicity=(True, False), default_target=ps.Target.CPU)
opts = {'cpu_openmp': False,
'cpu_vectorize_info': None,
'target': dh.default_target}
src = dh.add_array('src', values_per_cell=len(lb_stencil), layout='c')
dst = dh.add_array_like('dst', 'src')
ρ = dh.add_array('rho', layout='c', latex_name='\\rho')
u = dh.add_array('u', values_per_cell=dh.dim, layout='c')
tauface = dh.add_array('tau_face', values_per_cell=(len(fv_stencil) // 2, dh.dim, dh.dim), latex_name='\\tau_f',
field_type=ps.FieldType.STAGGERED, layout='c')
tau = dh.add_array('tau', values_per_cell=(dh.dim, dh.dim), layout='c', latex_name='\\tau')
tauflux = dh.add_array('j_tau', values_per_cell=(len(fv_stencil) // 2, dh.dim, dh.dim), latex_name='j_\\tau',
field_type=ps.FieldType.STAGGERED_FLUX, layout='c')
F = dh.add_array('F', values_per_cell=dh.dim, layout='c')
fluxbh = BoundaryHandling(dh, tauflux.name, fv_stencil, name="flux_boundary_handling",
openmp=opts['cpu_openmp'], target=dh.default_target)
ubh = BoundaryHandling(dh, u.name, lb_stencil, name="velocity_boundary_handling",
openmp=opts['cpu_openmp'], target=dh.default_target)
taufacebh = BoundaryHandling(dh, tauface.name, fv_stencil, name="tauface_boundary_handling",
openmp=opts['cpu_openmp'], target=dh.default_target)
# ## Solver
collision = create_lb_update_rule(stencil=lb_stencil,
method=Method.TRT,
relaxation_rate=omega,
compressible=True,
force_model=ForceModel.GUO,
force=F.center_vector + sp.Matrix([f_pre, 0]),
kernel_type='collide_only',
optimization={'symbolic_field': src})
stream = create_stream_pull_with_output_kernel(collision.method, src, dst, {'density': ρ, 'velocity': u})
lbbh = LatticeBoltzmannBoundaryHandling(collision.method, dh, src.name,
openmp=opts['cpu_openmp'], target=dh.default_target)
stream_kernel = ps.create_kernel(stream, **opts).compile()
collision_kernel = ps.create_kernel(collision, **opts).compile()
ob = OldroydB(dh.dim, u, tau, F, tauflux, tauface, lambda_p, eta_p)
flux_kernel = ps.create_staggered_kernel(ob.flux(), **opts).compile()
tauface_kernel = ps.create_staggered_kernel(ob.tauface(), **opts).compile()
continuity_kernel = ps.create_kernel(ob.continuity(), **opts).compile()
force_kernel = ps.create_kernel(ob.force(), **opts).compile()
# ## Set up the simulation
init = macroscopic_values_setter(collision.method, velocity=(0,) * dh.dim,
pdfs=src.center_vector, density=ρ.center)
init_kernel = ps.create_kernel(init, ghost_layers=0).compile()
# no-slip for the fluid, no-flux for the stress
noslip = NoSlip()
noflux = Flux(fv_stencil)
nostressdiff = Flux(fv_stencil, tau.center_vector)
# put some good values into the boundaries so we can take derivatives
noforce = Neumann() # put the same stress into the boundary cells that is in the nearest fluid cell
noflow = Dirichlet((0,) * dh.dim) # put zero velocity into the boundary cells
lbbh.set_boundary(noslip, ps.make_slice[:, :4])
lbbh.set_boundary(noslip, ps.make_slice[:, -4:])
fluxbh.set_boundary(noflux, ps.make_slice[:, :4])
fluxbh.set_boundary(noflux, ps.make_slice[:, -4:])
ubh.set_boundary(noflow, ps.make_slice[:, :4])
ubh.set_boundary(noflow, ps.make_slice[:, -4:])
taufacebh.set_boundary(nostressdiff, ps.make_slice[:, :4])
taufacebh.set_boundary(nostressdiff, ps.make_slice[:, -4:])
for bh in lbbh, fluxbh, ubh, taufacebh:
assert len(bh._boundary_object_to_boundary_info) == 1, "Restart kernel to clear boundaries"
def init():
dh.fill(ρ.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
dh.fill(ρ.name, 1)
dh.fill(u.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
dh.fill(u.name, 0)
dh.fill(tau.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
dh.fill(tau.name, 0)
dh.fill(tauflux.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
dh.fill(tauface.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
dh.fill(F.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
dh.fill(F.name, 0) # needed for LB initialization
sync_tau() # force calculation inside the initialization needs neighbor taus
dh.run_kernel(init_kernel)
dh.fill(F.name, np.nan)
sync_pdfs = dh.synchronization_function([src.name]) # needed before stream, but after collision
sync_u = dh.synchronization_function([u.name]) # needed before continuity, but after stream
sync_tau = dh.synchronization_function([tau.name]) # needed before flux and tauface, but after continuity
def time_loop(steps, lambda_p_val, eta_p_val):
dh.all_to_gpu()
vmid = np.empty((2, steps // 10 + 1))
sync_tau()
sync_u()
ubh()
i = -1
for i in range(steps):
dh.run_kernel(flux_kernel)
fluxbh() # zero the fluxes into/out of boundaries
dh.run_kernel(continuity_kernel, **{lambda_p.name: lambda_p_val, eta_p.name: eta_p_val})
sync_tau()
dh.run_kernel(tauface_kernel) # needed for force
taufacebh()
dh.run_kernel(force_kernel)
dh.run_kernel(collision_kernel, **{eta_p.name: eta_p_val})
sync_pdfs()
lbbh() # bounce-back populations into boundaries
dh.run_kernel(stream_kernel)
sync_u()
ubh() # need neighboring us for flux and continuity
dh.swap(src.name, dst.name)
if i % 10 == 0:
if u.name in dh.gpu_arrays:
dh.to_cpu(u.name)
uu = dh.gather_array(u.name)
uu = uu[L[0] // 2 - 1:L[0] // 2 + 1, L[1] // 2 - 1:L[1] // 2 + 1, 0].mean()
if np.isnan(uu):
raise Exception(f"NaN encountered after {i} steps")
vmid[:, i // 10] = [i, uu]
sync_u()
dh.all_to_cpu()
return vmid[:, :i // 10 + 1]
# ## Analytical solution
#
# comes from Waters and King, Unsteady flow of an elastico-viscous liquid, Rheologica Acta 9, 345–355 (1970).
def N(n):
return (2 * n - 1) * np.pi
def Alpha_n(N, El, eta_p):
return 1 + (1 - eta_p) * El * N * N / 4
def Beta_n(alpha_n, N, El):
return np.sqrt(np.abs(alpha_n * alpha_n - El * N * N))
def Gamma_n(N, El, eta_p):
return 1 - (1 + eta_p) * El * N * N / 4
def G(alpha_n, beta_n, gamma_n, flag, T):
if (flag):
return ((1.0 - gamma_n / beta_n) * np.exp(-(alpha_n + beta_n) * T / 2) +
(1.0 + gamma_n / beta_n) * np.exp((beta_n - alpha_n) * T / 2))
else:
return 2 * np.exp(-alpha_n * T / 2) * (np.cos(beta_n * T / 2) + (gamma_n / beta_n) * np.sin(beta_n * T / 2))
def W(T, El, eta_p):
W_ = 1.5
for n in range(1, 1000):
N_ = N(n)
alpha_n = Alpha_n(N_, El, eta_p)
if alpha_n * alpha_n - El * N_ * N_ < 0:
flag_ = False
else:
flag_ = True
beta_n = Beta_n(alpha_n, N_, El)
gamma_n = Gamma_n(N_, El, eta_p)
G_ = G(alpha_n, beta_n, gamma_n, flag_, T)
W_ -= 24 * (np.sin(N_ / 2) / (N_ * N_ * N_)) * G_
return W_
# ## Run the simulation
lambda_p_val = 3000
eta_p_val = 0.9
init()
vmid = time_loop(lambda_p_val * 4, lambda_p_val, eta_p_val)
actual_width = sum(dh.gather_array(lbbh.flag_array_name)[L[0] // 2, :] == 1)
uref = float(f_pre * actual_width ** 2 / (8 * (eta + eta_p)))
Wi = lambda_p_val * uref / (actual_width / 2)
Re = uref * (actual_width / 2) / (eta + eta_p)
El = float(Wi / Re)
pref = 1 / W(vmid[0, -1] / lambda_p_val, El, eta_p_val)
El_measured, pref_measured = scipy.optimize.curve_fit(lambda a, b, c: W(a, b, eta_p_val) * c,
vmid[0, :] / lambda_p_val, vmid[1, :] / vmid[1, -1],
p0=(El, pref))[0]
measured_width = np.sqrt(4 * lambda_p_val * float(eta + eta_p) / El_measured)
print(f"El={El}, El_measured={El_measured}")
print(f"L={actual_width}, L_measured={measured_width}")
assert abs(measured_width - actual_width) < 1, "effective channel width differs significantly from defined width"
an = W(vmid[0, :] / lambda_p_val, El, eta_p_val) * pref
an_measured = W(vmid[0, :] / lambda_p_val, El_measured, eta_p_val) * pref_measured
diff = abs(vmid[1, :] / vmid[1, -1] - an_measured) / an_measured
assert diff[lambda_p_val // 5:].max() < 0.03, "maximum velocity deviation is too large"
# from pystencils import plot as plt
#
# plt.xlabel("$t$")
# plt.ylabel(r"$u_{max}/u_{max}^{Newtonian}$")
# plt.plot(vmid[0,:], vmid[1,:]/vmid[1,-1] if vmid[1,-1] != 0 else 0, label='FVM')
# plt.plot(vmid[0,:], np.ones_like(vmid[0,:]), 'k--', label='Newtonian')
#
# plt.plot(vmid[0,:], an, label="analytic")
# plt.plot(vmid[0,:], an_measured, label="analytic, fit width")
# plt.legend()
#
# if eta_p_val == 0.1:
# plt.ylim(0.9, 1.15)
# elif lambda_p_val == 9000:
# plt.ylim(0.8, 1.5)
# elif eta_p_val == 0.3:
# plt.ylim(0.8, 1.4)
# plt.show()
import os
from tempfile import TemporaryDirectory
import shutil
import pytest
import numpy as np
......@@ -7,6 +10,7 @@ import lbmpy.plot as plt
from lbmpy.scenarios import create_lid_driven_cavity
@pytest.mark.skipif(shutil.which('ffmpeg') is None, reason="ffmpeg not available")
def test_animation():
ldc = create_lid_driven_cavity((10, 10), relaxation_rate=1.8)
......
"""
Test Poiseuille flow against analytical solution
"""
import pytest
from lbmpy.enums import Stencil, CollisionSpace
import pystencils as ps
from . import poiseuille
@pytest.mark.parametrize('target', (ps.Target.CPU, ps.Target.GPU))
@pytest.mark.parametrize('stencil_name', (Stencil.D2Q9, Stencil.D3Q19))
@pytest.mark.parametrize('zero_centered', (False, True))
@pytest.mark.parametrize('moment_space_collision', (False, True))
def test_poiseuille_channel(target, stencil_name, zero_centered, moment_space_collision):
# Cuda
if target == ps.Target.GPU:
import pytest
pytest.importorskip("cupy")
cspace_info = CollisionSpace.RAW_MOMENTS if moment_space_collision else CollisionSpace.POPULATIONS
poiseuille.poiseuille_channel(target=target,
stencil_name=stencil_name,
zero_centered=zero_centered,
collision_space_info=cspace_info)