import pytest
import numpy as np
from pystencils.slicing import slice_from_direction
from lbmpy import LBMConfig, LBStencil, Stencil, Method
from lbmpy.boundaries.boundaryconditions import NoSlip, NoSlipLinearBouzidi, QuadraticBounceBack, UBB
from lbmpy.lbstep import LatticeBoltzmannStep
def check_velocity(noslip_velocity, interpolated_velocity, wall_distance):
# First we check if the flow is fully developed
np.testing.assert_almost_equal(np.gradient(np.gradient(noslip_velocity)), 0, decimal=8)
np.testing.assert_almost_equal(np.gradient(np.gradient(interpolated_velocity)), 0, decimal=8)
# If the wall is closer to the first fluid cell we expect a lower velocity at the first fluid cell
if wall_distance < 0.5:
assert noslip_velocity[0] > interpolated_velocity[0]
# If the wall is further away from the first fluid cell we expect a higher velocity at the first fluid cell
if wall_distance > 0.5:
assert noslip_velocity[0] < interpolated_velocity[0]
# If the wall cuts the cell halfway the interpolation BC should approximately fall back to a noslip boundary
if wall_distance == 0.5:
np.testing.assert_almost_equal(noslip_velocity[0], interpolated_velocity[0], decimal=2)
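As a brief aside, the fully-developed check above exploits the fact that a steady Couette profile is linear across the channel, so its discrete curvature vanishes. A minimal sketch of that property, with invented sample values that are not part of the original test:

def _fully_developed_profile_sketch():
    # Hypothetical linear velocity profile across 25 cells (illustration only).
    example_profile = np.linspace(0.0, 0.01, 25)
    # Double np.gradient gives the discrete curvature, ~0 for a linear profile,
    # which is exactly what check_velocity asserts for the simulated profiles.
    curvature = np.gradient(np.gradient(example_profile))
    np.testing.assert_almost_equal(curvature, 0, decimal=8)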
def couette_flow(stencil, method_enum, zero_centered, wall_distance, compressible):
dim = stencil.D
if dim == 2:
domain_size = (50, 25)
wall_velocity = (0.01, 0)
periodicity = (True, False)
else:
domain_size = (50, 25, 25)
wall_velocity = (0.01, 0, 0)
periodicity = (True, False, True)
timesteps = 10000
omega = 1.1
lbm_config = LBMConfig(stencil=stencil,
method=method_enum,
relaxation_rate=omega,
zero_centered=zero_centered,
compressible=compressible,
weighted=True)
lb_step_noslip = LatticeBoltzmannStep(domain_size=domain_size, periodicity=periodicity,
lbm_config=lbm_config, compute_velocity_in_every_step=True)
lb_step_bouzidi = LatticeBoltzmannStep(domain_size=domain_size, periodicity=periodicity,
lbm_config=lbm_config, compute_velocity_in_every_step=True)
lb_step_quadratic_bb = LatticeBoltzmannStep(domain_size=domain_size, periodicity=periodicity,
lbm_config=lbm_config, compute_velocity_in_every_step=True)
def init_wall_distance(boundary_data, **_):
for cell in boundary_data.index_array:
cell['q'] = wall_distance
moving_wall = UBB(wall_velocity)
noslip = NoSlip("wall")
bouzidi = NoSlipLinearBouzidi("wall", init_wall_distance=init_wall_distance)
quadratic_bb = QuadraticBounceBack(omega, "wall", init_wall_distance=init_wall_distance)
lb_step_noslip.boundary_handling.set_boundary(noslip, slice_from_direction('S', dim))
lb_step_noslip.boundary_handling.set_boundary(moving_wall, slice_from_direction('N', dim))
lb_step_bouzidi.boundary_handling.set_boundary(bouzidi, slice_from_direction('S', dim))
lb_step_bouzidi.boundary_handling.set_boundary(moving_wall, slice_from_direction('N', dim))
lb_step_quadratic_bb.boundary_handling.set_boundary(quadratic_bb, slice_from_direction('S', dim))
lb_step_quadratic_bb.boundary_handling.set_boundary(moving_wall, slice_from_direction('N', dim))
lb_step_noslip.run(timesteps)
lb_step_bouzidi.run(timesteps)
lb_step_quadratic_bb.run(timesteps)
if dim == 2:
noslip_velocity = lb_step_noslip.velocity[domain_size[0] // 2, :, 0]
bouzidi_velocity = lb_step_bouzidi.velocity[domain_size[0] // 2, :, 0]
quadratic_bb_velocity = lb_step_quadratic_bb.velocity[domain_size[0] // 2, :, 0]
else:
noslip_velocity = lb_step_noslip.velocity[domain_size[0] // 2, :, domain_size[2] // 2, 0]
bouzidi_velocity = lb_step_bouzidi.velocity[domain_size[0] // 2, :, domain_size[2] // 2, 0]
quadratic_bb_velocity = lb_step_quadratic_bb.velocity[domain_size[0] // 2, :, domain_size[2] // 2, 0]
check_velocity(noslip_velocity, bouzidi_velocity, wall_distance)
check_velocity(noslip_velocity, quadratic_bb_velocity, wall_distance)
@pytest.mark.parametrize("zero_centered", [False, True])
@pytest.mark.parametrize("wall_distance", [0.1, 0.5, 0.9])
@pytest.mark.parametrize("compressible", [True, False])
def test_couette_flow_short(zero_centered, wall_distance, compressible):
stencil = LBStencil(Stencil.D2Q9)
couette_flow(stencil, Method.SRT, zero_centered, wall_distance, compressible)
@pytest.mark.parametrize("method_enum", [Method.MRT, Method.CENTRAL_MOMENT, Method.CUMULANT])
@pytest.mark.parametrize("zero_centered", [False, True])
@pytest.mark.parametrize("wall_distance", [0.1, 0.5, 0.9])
@pytest.mark.parametrize("compressible", [True, False])
@pytest.mark.longrun
def test_couette_flow_long(method_enum, zero_centered, wall_distance, compressible):
if method_enum is Method.CUMULANT and compressible is False:
pytest.skip("incompressible cumulant is not supported")
stencil = LBStencil(Stencil.D2Q9)
couette_flow(stencil, method_enum, zero_centered, wall_distance, compressible)
@pytest.mark.parametrize("method_enum", [Method.SRT, Method.MRT, Method.CENTRAL_MOMENT, Method.CUMULANT])
@pytest.mark.parametrize("wall_distance", [0.1, 0.5, 0.9])
@pytest.mark.parametrize("stencil", [Stencil.D3Q19, Stencil.D3Q27])
@pytest.mark.longrun
def test_couette_flow_d3d(method_enum, wall_distance, stencil):
stencil = LBStencil(stencil)
couette_flow(stencil, method_enum, True, wall_distance, True)
"""
Test the lbmpy-specific JSON encoder and serializer as used in the Database class.
"""
import tempfile
import pystencils as ps
from lbmpy import Stencil, Method, ForceModel, SubgridScaleModel
from lbmpy.advanced_streaming import Timestep
from lbmpy.creationfunctions import LBMConfig, LBMOptimisation, LBStencil
from lbmpy.fieldaccess import StreamPullTwoFieldsAccessor
from pystencils.runhelper import Database
from lbmpy.db import LbmpyJsonSerializer
def test_json_serializer():
stencil = LBStencil(Stencil.D3Q27)
q = stencil.Q
pdfs, pdfs_tmp = ps.fields(f"pdfs({q}), pdfs_tmp({q}): double[3D]", layout='fzyx')
density = ps.fields("rho: double[3D]", layout='fzyx')
from lbmpy.non_newtonian_models import CassonsParameters
cassons_params = CassonsParameters(0.2)
# create dummy lbmpy config
lbm_config = LBMConfig(stencil=LBStencil(Stencil.D3Q27), method=Method.CUMULANT, force_model=ForceModel.GUO,
compressible=True, relaxation_rate=1.999, subgrid_scale_model=SubgridScaleModel.SMAGORINSKY,
galilean_correction=True, cassons=cassons_params, density_input=density,
kernel_type=StreamPullTwoFieldsAccessor, timestep=Timestep.BOTH)
lbm_optimization = LBMOptimisation(cse_pdfs=False, cse_global=False, builtin_periodicity=(True, False, False),
symbolic_field=pdfs, symbolic_temporary_field=pdfs_tmp)
# create dummy database
temp_dir = tempfile.TemporaryDirectory()
db = Database(file=temp_dir.name, serializer_info=('lbmpy_serializer', LbmpyJsonSerializer))
db.save(params={'lbm_config': lbm_config, 'lbm_optimization': lbm_optimization}, result={'test': 'dummy'})
import numpy as np
import pytest
from types import MappingProxyType
from pystencils import Target, CreateKernelConfig
from lbmpy.scenarios import create_fully_periodic_flow, create_lid_driven_cavity
from lbmpy import SubgridScaleModel
try:
import pycuda.driver
import cupy
gpu_available = True
except ImportError:
gpu_available = False
@@ -30,18 +33,18 @@ def test_data_handling_3d():
for gpu in [False, True] if gpu_available else [False]:
if parallel and gpu and not hasattr(wLB, 'cuda'):
continue
print("Testing parallel: %s\tgpu: %s" % (parallel, gpu))
opt_params = {'target': 'gpu' if gpu else 'cpu',
'gpu_indexing_params': {'block_size': (8, 4, 2)}}
print(f"Testing parallel: {parallel}\tgpu: {gpu}")
config = CreateKernelConfig(target=Target.GPU if gpu else Target.CPU,
gpu_indexing_params=MappingProxyType({'block_size': (8, 4, 2)}))
if parallel:
from pystencils.datahandling import ParallelDataHandling
blocks = wLB.createUniformBlockGrid(blocks=(2, 3, 4), cellsPerBlock=(5, 5, 5),
oneBlockPerProcess=False)
dh = ParallelDataHandling(blocks, dim=3)
rho = ldc_setup(data_handling=dh, config=config)
results.append(rho)
else:
rho = ldc_setup(domain_size=(10, 15, 20), parallel=False, config=config)
results.append(rho)
for i, arr in enumerate(results[1:]):
print("Testing equivalence version 0 with version %d" % (i + 1,))
@@ -56,26 +59,32 @@ def test_data_handling_2d():
if parallel and gpu and not hasattr(wLB, 'cuda'):
continue
print("Testing parallel: %s\tgpu: %s" % (parallel, gpu))
opt_params = {'target': 'gpu' if gpu else 'cpu',
'gpu_indexing_params': {'block_size': (8, 4, 2)}}
print(f"Testing parallel: {parallel}\tgpu: {gpu}")
config = CreateKernelConfig(target=Target.GPU if gpu else Target.CPU,
gpu_indexing_params=MappingProxyType({'block_size': (8, 4, 2)}))
if parallel:
from pystencils.datahandling import ParallelDataHandling
blocks = wLB.createUniformBlockGrid(blocks=(2, 3, 1), cellsPerBlock=(5, 5, 1),
oneBlockPerProcess=False)
dh = ParallelDataHandling(blocks, dim=2)
rho = ldc_setup(data_handling=dh, config=config)
results.append(rho)
else:
rho = ldc_setup(domain_size=(10, 15), parallel=False, config=config)
results.append(rho)
for i, arr in enumerate(results[1:]):
print("Testing equivalence version 0 with version %d" % (i + 1,))
print(f"Testing equivalence version 0 with version {i + 1}")
np.testing.assert_almost_equal(results[0], arr)
def test_smagorinsky_setup():
step = create_lid_driven_cavity((30, 30), subgrid_scale_model=(SubgridScaleModel.SMAGORINSKY, 0.16),
relaxation_rate=1.99)
step.run(10)
def test_qr_setup():
step = create_lid_driven_cavity((30, 30), subgrid_scale_model=(SubgridScaleModel.QR, 0.33),
relaxation_rate=1.99)
step.run(10)
@@ -89,7 +98,7 @@ def test_advanced_initialization():
init_vel[:, height // 3: height // 3 * 2, 0] = -velocity_magnitude
# small random y velocity component
init_vel[:, :, 1] = 0.1 * velocity_magnitude * np.random.rand(width, height)
shear_flow_scenario = create_fully_periodic_flow(initial_velocity=init_vel, relaxation_rate=1.99)
with pytest.raises(ValueError) as e:
shear_flow_scenario.run_iterative_initialization(max_steps=20000, check_residuum_after=500)
assert 'did not converge' in str(e.value)
from functools import partial
import pystencils as ps
from pystencils.astnodes import LoopOverCoordinate
from pystencils.slicing import get_periodic_boundary_functor
from lbmpy.creationfunctions import create_lb_update_rule, LBMConfig, LBMOptimisation
from lbmpy.enums import Stencil, ForceModel
from lbmpy.stencils import LBStencil
from lbmpy.updatekernels import create_stream_pull_with_output_kernel
from lbmpy.macroscopic_value_kernels import macroscopic_values_setter
from lbmpy.relaxationrates import lattice_viscosity_from_relaxation_rate
import sympy as sp
import numpy as np
def get_le_boundary_functor(neighbor_stencil, shear_offset, ghost_layers=1, thickness=None, n=64):
functor_2 = get_periodic_boundary_functor(neighbor_stencil, ghost_layers, thickness)
def functor(pdfs, **_):
functor_2(pdfs)
weight = np.fmod(shear_offset[0] + n, 1.)
# First, we interpolate the upper pdfs
for i in range(len(pdfs[:, ghost_layers, :])):
ind1 = int(np.floor(i - shear_offset[0]) % n)
ind2 = int(np.ceil(i - shear_offset[0]) % n)
res = (1 - weight) * pdfs[ind1, ghost_layers, :] + weight * pdfs[ind2, ghost_layers, :]
pdfs[i, -ghost_layers, :] = res
# Second, we interpolate the lower pdfs
for i in range(len(pdfs[:, -ghost_layers, :])):
ind1 = int(np.floor(i + shear_offset[0]) % n)
ind2 = int(np.ceil(i + shear_offset[0]) % n)
res = (1 - weight) * pdfs[ind1, -ghost_layers - 1, :] + weight * pdfs[ind2, -ghost_layers - 1, :]
pdfs[i, ghost_layers - 1, :] = res
return functor
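To make the interpolation above concrete, here is a small worked example of the weight and index arithmetic; the offset, domain width and column index are invented for illustration and do not appear in the test:

def _le_interpolation_sketch():
    # Hypothetical values: shear offset of 2.3 cells, 64 columns, ghost cell at column 5.
    shear_offset, n, i = 2.3, 64, 5
    weight = np.fmod(shear_offset + n, 1.)       # fractional part of the offset, ~0.3
    ind1 = int(np.floor(i - shear_offset) % n)   # = 2, left source column (periodic)
    ind2 = int(np.ceil(i - shear_offset) % n)    # = 3, right source column
    # The ghost-layer value is then (1 - weight) * pdfs[ind1, ...] + weight * pdfs[ind2, ...].
    assert (ind1, ind2) == (2, 3) and abs(weight - 0.3) < 1e-12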
def get_solution_navier_stokes(x, t, viscosity, velocity=1.0, height=1.0, max_iterations=10):
w = x / height - 0.5
for k in np.arange(1, max_iterations + 1):
w += 1.0 / (np.pi * k) * np.exp(-4 * np.pi ** 2 * viscosity * k ** 2 / height ** 2 * t) * np.sin(
2 * np.pi / height * k * x)
return velocity * w
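Written out, the series implemented above (a direct transcription of the loop, with U the shear velocity, h the channel height, ν the viscosity and K = max_iterations) reads:

u(x, t) = U \left[ \frac{x}{h} - \frac{1}{2} + \sum_{k=1}^{K} \frac{1}{\pi k}\, \exp\!\left(-\frac{4\pi^2 \nu k^2}{h^2}\, t\right) \sin\!\left(\frac{2\pi k x}{h}\right) \right]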
def test_lees_edwards():
domain_size = (64, 64)
omega = 1.0 # relaxation rate of first component
shear_velocity = 0.1 # shear velocity
shear_dir = 0 # direction of shear flow
shear_dir_normal = 1 # direction normal to shear plane, for interpolation
stencil = LBStencil(Stencil.D2Q9)
dh = ps.create_data_handling(domain_size, periodicity=True, default_target=ps.Target.CPU)
src = dh.add_array('src', values_per_cell=stencil.Q)
dh.fill('src', 1.0, ghost_layers=True)
dst = dh.add_array_like('dst', 'src')
dh.fill('dst', 0.0, ghost_layers=True)
force = dh.add_array('force', values_per_cell=stencil.D)
dh.fill('force', 0.0, ghost_layers=True)
rho = dh.add_array('rho', values_per_cell=1)
dh.fill('rho', 1.0, ghost_layers=True)
u = dh.add_array('u', values_per_cell=stencil.D)
dh.fill('u', 0.0, ghost_layers=True)
counters = [LoopOverCoordinate.get_loop_counter_symbol(i) for i in range(stencil.D)]
points_up = sp.Symbol('points_up')
points_down = sp.Symbol('points_down')
u_p = sp.Piecewise((1, sp.And(counters[1] <= 1, points_down)),
(-1, sp.And(counters[1] >= src.shape[1] - 2, points_up)), (0, True)) * shear_velocity
lbm_config = LBMConfig(stencil=stencil, relaxation_rate=omega, compressible=True,
velocity_input=u.center_vector + sp.Matrix([u_p, 0]), density_input=rho,
force_model=ForceModel.LUO, force=force.center_vector,
kernel_type='collide_only')
lbm_opt = LBMOptimisation(symbolic_field=src)
collision = create_lb_update_rule(lbm_config=lbm_config, lbm_optimisation=lbm_opt)
to_insert = [s.lhs for s in collision.subexpressions
if collision.method.first_order_equilibrium_moment_symbols[shear_dir]
in s.free_symbols]
for s in to_insert:
collision = collision.new_with_inserted_subexpression(s)
ma = []
for a, c in zip(collision.main_assignments, collision.method.stencil):
if c[shear_dir_normal] == -1:
b = (True, False)
elif c[shear_dir_normal] == 1:
b = (False, True)
else:
b = (False, False)
a = ps.Assignment(a.lhs, a.rhs.replace(points_down, b[0]))
a = ps.Assignment(a.lhs, a.rhs.replace(points_up, b[1]))
ma.append(a)
collision.main_assignments = ma
stream = create_stream_pull_with_output_kernel(collision.method, src, dst,
{'density': rho, 'velocity': u})
config = ps.CreateKernelConfig(target=dh.default_target)
stream_kernel = ps.create_kernel(stream, config=config).compile()
collision_kernel = ps.create_kernel(collision, config=config).compile()
init = macroscopic_values_setter(collision.method, velocity=(0, 0),
pdfs=src.center_vector, density=rho.center)
init_kernel = ps.create_kernel(init, ghost_layers=0).compile()
offset = [0.0]
sync_pdfs = dh.synchronization_function([src.name],
functor=partial(get_le_boundary_functor, shear_offset=offset))
dh.run_kernel(init_kernel)
time = 500
dh.all_to_gpu()
for i in range(time):
dh.run_kernel(collision_kernel)
sync_pdfs()
dh.run_kernel(stream_kernel)
dh.swap(src.name, dst.name)
offset[0] += shear_velocity
dh.all_to_cpu()
nu = lattice_viscosity_from_relaxation_rate(omega)
h = domain_size[0]
k_max = 100
analytical_solution = get_solution_navier_stokes(np.linspace(0.5, h - 0.5, h), time, nu, shear_velocity, h, k_max)
np.testing.assert_array_almost_equal(analytical_solution, dh.gather_array(u.name)[0, :, 0], decimal=5)
dh.fill(rho.name, 1.0, ghost_layers=True)
dh.run_kernel(init_kernel)
dh.fill(u.name, 0.0, ghost_layers=True)
dh.fill('force', 0.0, ghost_layers=True)
dh.cpu_arrays[force.name][64 // 3, 1, :] = [1e-2, -1e-1]
offset[0] = 0
time = 20
dh.all_to_gpu()
for i in range(time):
dh.run_kernel(collision_kernel)
sync_pdfs()
dh.run_kernel(stream_kernel)
dh.swap(src.name, dst.name)
dh.all_to_cpu()
vel_unshifted = np.array(dh.gather_array(u.name)[:, -3:-1, :])
dh.fill(rho.name, 1.0, ghost_layers=True)
dh.run_kernel(init_kernel)
dh.fill(u.name, 0.0, ghost_layers=True)
dh.fill('force', 0.0, ghost_layers=True)
dh.cpu_arrays[force.name][64 // 3, 1, :] = [1e-2, -1e-1]
offset[0] = 10
time = 20
dh.all_to_gpu()
for i in range(time):
dh.run_kernel(collision_kernel)
sync_pdfs()
dh.run_kernel(stream_kernel)
dh.swap(src.name, dst.name)
dh.all_to_cpu()
vel_shifted = np.array(dh.gather_array(u.name)[:, -3:-1, :])
vel_rolled = np.roll(vel_shifted, -offset[0], axis=0)
np.testing.assert_array_almost_equal(vel_unshifted, vel_rolled)
import pytest
import numpy as np
import sympy as sp
from lbmpy.creationfunctions import create_lb_method, LBMConfig
from lbmpy.enums import Stencil, ForceModel, Method
from lbmpy.macroscopic_value_kernels import (
compile_macroscopic_values_getter, compile_macroscopic_values_setter, strain_rate_tensor_getter)
from lbmpy.advanced_streaming.utility import streaming_patterns, Timestep
from lbmpy.stencils import LBStencil
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q19])
@pytest.mark.parametrize('force_model', [ForceModel.GUO, ForceModel.LUO, None])
@pytest.mark.parametrize('compressible', [True, False])
@pytest.mark.parametrize('zero_centered', [True, False])
@pytest.mark.parametrize('streaming_pattern', streaming_patterns)
@pytest.mark.parametrize('prev_timestep', [Timestep.EVEN, Timestep.ODD])
def test_set_get_density_velocity_with_fields(stencil, force_model, compressible, zero_centered, streaming_pattern, prev_timestep):
force = (0.1, 0.12, -0.17)
stencil = LBStencil(stencil)
lbm_config = LBMConfig(stencil=stencil, force_model=force_model, method=Method.TRT,
compressible=compressible, zero_centered=zero_centered, force=force)
method = create_lb_method(lbm_config=lbm_config)
ghost_layers = 1
inner_size = (3, 7, 4)[:method.dim]
total_size = tuple(s + 2 * ghost_layers for s in inner_size)
pdf_arr = np.zeros(total_size + (len(method.stencil),))
inner_slice = (slice(ghost_layers, -ghost_layers, 1), ) * method.dim
density_input_field = np.zeros(total_size)
velocity_input_field = np.zeros(total_size + (method.dim, ))
density_input_field[inner_slice] = 1 + 0.2 * (np.random.random_sample(inner_size) - 0.5)
velocity_input_field[inner_slice] = 0.1 * \
(np.random.random_sample(inner_size + (method.dim, )) - 0.5)
quantities_to_set = {'density': density_input_field, 'velocity': velocity_input_field}
setter = compile_macroscopic_values_setter(method, pdf_arr=pdf_arr, quantities_to_set=quantities_to_set,
ghost_layers=ghost_layers, streaming_pattern=streaming_pattern, previous_timestep=prev_timestep)
setter(pdf_arr)
getter = compile_macroscopic_values_getter(method, ['density', 'density_deviation', 'velocity'], pdf_arr=pdf_arr,
ghost_layers=ghost_layers, streaming_pattern=streaming_pattern, previous_timestep=prev_timestep)
inner_part = (slice(ghost_layers, -ghost_layers), ) * stencil.D
density_output_field = np.zeros_like(density_input_field)
density_deviation_output_field = np.zeros_like(density_input_field)
velocity_output_field = np.zeros_like(velocity_input_field)
getter(pdfs=pdf_arr, density=density_output_field, density_deviation=density_deviation_output_field, velocity=velocity_output_field)
np.testing.assert_almost_equal(density_input_field, density_output_field)
np.testing.assert_almost_equal(density_input_field[inner_part] - 1.0, density_deviation_output_field[inner_part])
np.testing.assert_almost_equal(velocity_input_field, velocity_output_field)
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q19])
@pytest.mark.parametrize('force_model', [ForceModel.GUO, ForceModel.LUO, None])
@pytest.mark.parametrize('compressible', [True, False])
def test_set_get_constant_velocity(stencil, force_model, compressible):
ref_velocity = [0.01, -0.2, 0.1]
force = (0.1, 0.12, -0.17)
stencil = LBStencil(stencil)
lbm_config = LBMConfig(stencil=stencil, force_model=force_model, method=Method.TRT,
compressible=compressible, force=force)
method = create_lb_method(lbm_config=lbm_config)
size = (1, 1, 1)[:method.dim]
pdf_arr = np.zeros(size + (len(method.stencil),))
setter = compile_macroscopic_values_setter(method, pdf_arr=pdf_arr, ghost_layers=0,
quantities_to_set={'velocity': ref_velocity[:method.dim]}, )
setter(pdf_arr)
getter = compile_macroscopic_values_getter(method, ['velocity'], pdf_arr=pdf_arr, ghost_layers=0)
velocity_output_field = np.zeros(size + (method.dim, ))
getter(pdfs=pdf_arr, velocity=velocity_output_field)
if method.dim == 2:
computed_velocity = velocity_output_field[0, 0, :]
else:
computed_velocity = velocity_output_field[0, 0, 0, :]
np.testing.assert_almost_equal(np.array(ref_velocity[:method.dim]), computed_velocity)
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q19])
@pytest.mark.parametrize("method_enum", [Method.SRT, Method.TRT, Method.MRT, Method.CUMULANT])
def test_get_strain_rate_tensor(stencil, method_enum):
stencil = LBStencil(stencil)
lbm_config = LBMConfig(stencil=stencil, method=method_enum, compressible=True)
method = create_lb_method(lbm_config=lbm_config)
pdfs = sp.symbols(f"f_:{stencil.Q}")
strain_rate_tensor = sp.symbols(f"strain_rate_tensor_:{method.dim * method.dim}")
getter_assignments = strain_rate_tensor_getter(method, strain_rate_tensor, pdfs)
assert len(getter_assignments) == method.dim * method.dim
import pytest
from lbmpy.cumulants import raw_moment_as_function_of_cumulants
from lbmpy.enums import Stencil
from lbmpy.maxwellian_equilibrium import *
from lbmpy.moments import (
MOMENT_SYMBOLS, exponents_to_polynomial_representations, moment_matrix,
moments_up_to_component_order, moments_up_to_order)
from lbmpy.stencils import LBStencil
from pystencils.sympyextensions import remove_higher_order_terms
def test_maxwellian_moments():
"""Check moments of continuous Maxwellian"""
rho = sp.Symbol("rho")
u = sp.symbols("u_0 u_1 u_2")
c_s = sp.Symbol("c_s")
eq_moments = get_equilibrium_values_of_maxwell_boltzmann_function(((0, 0, 0), (0, 0, 1)),
dim=3, rho=rho, u=u, c_s_sq=c_s ** 2,
space="moment")
assert eq_moments[0] == rho
assert eq_moments[1] == rho * u[2]
x, y, z = MOMENT_SYMBOLS
one = sp.Rational(1, 1)
eq_moments = get_equilibrium_values_of_maxwell_boltzmann_function((one, x, x ** 2, x * y),
dim=2, rho=rho, u=u[:2], c_s_sq=c_s ** 2,
space="moment")
assert eq_moments[0] == rho
assert eq_moments[1] == rho * u[0]
assert eq_moments[2] == rho * (c_s ** 2 + u[0] ** 2)
assert eq_moments[3] == rho * u[0] * u[1]
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q15, Stencil.D3Q19, Stencil.D3Q27])
def test_continuous_discrete_moment_equivalence(stencil):
"""Check that moments up to order 3 agree with moments of the continuous Maxwellian"""
stencil = LBStencil(stencil)
c_s_sq = sp.Rational(1, 3)
moments = tuple(moments_up_to_order(3, dim=stencil.D, include_permutations=False))
cm = sp.Matrix(get_equilibrium_values_of_maxwell_boltzmann_function(moments, order=2, dim=stencil.D,
c_s_sq=c_s_sq, space="moment"))
dm = sp.Matrix(get_moments_of_discrete_maxwellian_equilibrium(stencil, moments, order=2,
compressible=True, c_s_sq=c_s_sq))
diff = sp.simplify(cm - dm)
for d in diff:
assert d == 0
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q27])
def test_moment_cumulant_continuous_equivalence(stencil):
"""Test that discrete equilibrium is the same up to order 3 when obtained with following methods
* eq1: take moments of continuous Maxwellian and transform back to pdf space
* eq2: take cumulants of continuous Maxwellian, transform to moments then transform to pdf space
* eq3: take discrete equilibrium from LBM literature
* eq4: same as eq1 but with built-in function
"""
stencil = LBStencil(stencil)
u = sp.symbols(f"u_:{stencil.D}")
indices = tuple(moments_up_to_component_order(2, dim=stencil.D))
c_s_sq = sp.Rational(1, 3)
eq_moments1 = get_equilibrium_values_of_maxwell_boltzmann_function(indices, dim=stencil.D, u=u, c_s_sq=c_s_sq,
space="moment")
eq_cumulants = get_equilibrium_values_of_maxwell_boltzmann_function(indices, dim=stencil.D, u=u, c_s_sq=c_s_sq,
space="cumulant")
eq_cumulants = {idx: c for idx, c in zip(indices, eq_cumulants)}
eq_moments2 = [raw_moment_as_function_of_cumulants(idx, eq_cumulants) for idx in indices]
pdfs_to_moments = moment_matrix(indices, stencil)
def normalize(expressions):
return [remove_higher_order_terms(e.expand(), symbols=u, order=3) for e in expressions]
eq1 = normalize(pdfs_to_moments.inv() * sp.Matrix(eq_moments1))
eq2 = normalize(pdfs_to_moments.inv() * sp.Matrix(eq_moments2))
eq3 = normalize(discrete_maxwellian_equilibrium(stencil, order=3, c_s_sq=c_s_sq, compressible=True))
eq4 = normalize(generate_equilibrium_by_matching_moments(stencil, indices, c_s_sq=c_s_sq))
assert eq1 == eq2
assert eq2 == eq3
assert eq3 == eq4
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q27])
def test_moment_cumulant_continuous_equivalence_polynomial_formulation(stencil):
"""Same as test above, but instead of index tuples, the polynomial formulation is used."""
stencil = LBStencil(stencil)
u = sp.symbols(f"u_:{stencil.D}")
index_tuples = tuple(moments_up_to_component_order(2, dim=stencil.D))
indices = exponents_to_polynomial_representations(index_tuples)
c_s_sq = sp.Rational(1, 3)
eq_moments1 = get_equilibrium_values_of_maxwell_boltzmann_function(indices, dim=stencil.D,
u=u, c_s_sq=c_s_sq, space="moment")
eq_cumulants = get_equilibrium_values_of_maxwell_boltzmann_function(indices, dim=stencil.D,
u=u, c_s_sq=c_s_sq, space="cumulant")
eq_cumulants = {idx: c for idx, c in zip(index_tuples, eq_cumulants)}
eq_moments2 = [raw_moment_as_function_of_cumulants(idx, eq_cumulants) for idx in index_tuples]
pdfs_to_moments = moment_matrix(indices, stencil)
def normalize(expressions):
return [remove_higher_order_terms(e.expand(), symbols=u, order=3) for e in expressions]
eq1 = normalize(pdfs_to_moments.inv() * sp.Matrix(eq_moments1))
eq2 = normalize(pdfs_to_moments.inv() * sp.Matrix(eq_moments2))
eq3 = normalize(discrete_maxwellian_equilibrium(stencil, order=3, c_s_sq=c_s_sq, compressible=True))
eq4 = normalize(generate_equilibrium_by_matching_moments(stencil, indices, c_s_sq=c_s_sq))
assert eq1 == eq2
assert eq2 == eq3
assert eq3 == eq4
import pytest
import sympy as sp
from lbmpy.enums import Stencil
from lbmpy.stencils import LBStencil
from lbmpy.moments import get_default_moment_set_for_stencil
from lbmpy.moment_transforms import (
PdfsToMomentsByMatrixTransform, PdfsToMomentsByChimeraTransform,
PdfsToCentralMomentsByShiftMatrix, PdfsToCentralMomentsByMatrix, FastCentralMomentTransform
)
transforms = [
PdfsToMomentsByMatrixTransform, PdfsToMomentsByChimeraTransform,
PdfsToCentralMomentsByShiftMatrix, PdfsToCentralMomentsByMatrix, FastCentralMomentTransform
]
@pytest.mark.parametrize('transform_class', transforms)
def test_monomial_equations(transform_class):
stencil = LBStencil(Stencil.D2Q9)
rho = sp.symbols("rho")
u = sp.symbols(f"u_:{stencil.D}")
moment_polynomials = get_default_moment_set_for_stencil(stencil)
transform = transform_class(stencil, moment_polynomials, rho, u)
pdfs = sp.symbols(f"f_:{stencil.Q}")
fw_eqs = transform.forward_transform(pdfs, return_monomials=True)
bw_eqs = transform.backward_transform(pdfs, start_from_monomials=True)
mono_symbols_pre = set(transform.pre_collision_monomial_symbols)
mono_symbols_post = set(transform.post_collision_monomial_symbols)
assert mono_symbols_pre <= set(fw_eqs.defined_symbols)
assert mono_symbols_post <= set(bw_eqs.free_symbols)
symbols_pre = set(transform.pre_collision_symbols)
symbols_post = set(transform.post_collision_symbols)
assert symbols_pre.isdisjoint(set(fw_eqs.atoms(sp.Symbol)))
assert symbols_post.isdisjoint(set(bw_eqs.atoms(sp.Symbol)))
import pytest
import sympy as sp
from dataclasses import replace
from lbmpy.enums import CollisionSpace
from lbmpy.methods import CollisionSpaceInfo
from lbmpy.creationfunctions import create_lb_method, create_lb_collision_rule, LBMConfig, LBMOptimisation
from lbmpy.enums import Method, ForceModel, Stencil
from lbmpy.moments import (
extract_monomials, get_default_moment_set_for_stencil, non_aliased_polynomial_raw_moments,
exponent_tuple_sort_key)
from lbmpy.stencils import LBStencil
from lbmpy.methods.creationfunctions import create_srt
from lbmpy.methods.default_moment_sets import mrt_orthogonal_modes_literature
from lbmpy.maxwellian_equilibrium import get_weights
from lbmpy.moment_transforms import (
PdfsToMomentsByMatrixTransform, PdfsToMomentsByChimeraTransform
)
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q15, Stencil.D3Q19, Stencil.D3Q27])
@pytest.mark.parametrize('monomials', [False, True])
def test_moment_transform_equivalences(stencil, monomials):
stencil = LBStencil(stencil)
pdfs = sp.symbols(f"f_:{stencil.Q}")
rho = sp.Symbol('rho')
u = sp.symbols(f"u_:{stencil.D}")
moment_polynomials = get_default_moment_set_for_stencil(stencil)
if monomials:
polys_nonaliased = non_aliased_polynomial_raw_moments(moment_polynomials, stencil)
moment_exponents = sorted(extract_monomials(polys_nonaliased), key=exponent_tuple_sort_key)
moment_polynomials = None
else:
moment_exponents = None
matrix_transform = PdfsToMomentsByMatrixTransform(stencil, moment_polynomials, rho, u,
moment_exponents=moment_exponents)
chimera_transform = PdfsToMomentsByChimeraTransform(stencil, moment_polynomials, rho, u,
moment_exponents=moment_exponents)
f_to_m_matrix = matrix_transform.forward_transform(pdfs, return_monomials=monomials)
f_to_m_matrix = f_to_m_matrix.new_without_subexpressions().main_assignments_dict
f_to_m_chimera = chimera_transform.forward_transform(pdfs, return_monomials=monomials)
f_to_m_chimera = f_to_m_chimera.new_without_subexpressions().main_assignments_dict
m_to_f_matrix = matrix_transform.backward_transform(pdfs, start_from_monomials=monomials)
m_to_f_matrix = m_to_f_matrix.new_without_subexpressions().main_assignments_dict
m_to_f_chimera = chimera_transform.backward_transform(pdfs, start_from_monomials=monomials)
m_to_f_chimera = m_to_f_chimera.new_without_subexpressions().main_assignments_dict
m_pre_matrix = matrix_transform.pre_collision_monomial_symbols if monomials else matrix_transform.pre_collision_symbols
m_pre_chimera = chimera_transform.pre_collision_monomial_symbols if monomials else chimera_transform.pre_collision_symbols
for m1, m2 in zip(m_pre_matrix, m_pre_chimera):
rhs_matrix = f_to_m_matrix[m1]
rhs_chimera = f_to_m_chimera[m2]
diff = (rhs_matrix - rhs_chimera).expand()
assert diff == 0, f"Mismatch between matrix and chimera forward transform at {m1}, {m2}."
for f in pdfs:
rhs_matrix = m_to_f_matrix[f]
rhs_chimera = m_to_f_chimera[f]
diff = (rhs_matrix - rhs_chimera).expand()
assert diff == 0, f"Mismatch between matrix and chimera backward transform at {f}"
d3q15_literature = mrt_orthogonal_modes_literature(LBStencil(Stencil.D3Q15), True)
d3q19_literature = mrt_orthogonal_modes_literature(LBStencil(Stencil.D3Q19), True)
setups = [
(Stencil.D2Q9, Method.SRT, None, ForceModel.GUO),
(Stencil.D2Q9, Method.MRT, None, ForceModel.SIMPLE),
(Stencil.D3Q15, Method.MRT, None, ForceModel.SIMPLE),
(Stencil.D3Q15, Method.MRT, d3q15_literature, ForceModel.SIMPLE),
(Stencil.D3Q19, Method.TRT, None, ForceModel.SIMPLE),
(Stencil.D3Q19, Method.MRT, d3q19_literature, ForceModel.GUO),
(Stencil.D3Q27, Method.SRT, None, ForceModel.GUO)
]
@pytest.mark.parametrize('setup', setups)
def test_population_and_moment_space_equivalence(setup: tuple):
stencil = LBStencil(setup[0])
method = setup[1]
nested_moments = setup[2]
fmodel = setup[3]
force = sp.symbols(f'F_:{stencil.D}')
conserved_moments = 1 + stencil.D
rr = [*[0] * conserved_moments, *sp.symbols(f'omega_:{stencil.Q - conserved_moments}')]
lbm_config = LBMConfig(stencil=stencil, method=method, relaxation_rates=rr,
nested_moments=nested_moments, force_model=fmodel, force=force,
weighted=True, compressible=True, collision_space_info=CollisionSpaceInfo(CollisionSpace.RAW_MOMENTS))
lbm_opt = LBMOptimisation(cse_global=False, cse_pdfs=False, pre_simplification=True, simplification=False)
lb_method_moment_space = create_lb_method(lbm_config=lbm_config)
lbm_config = replace(lbm_config, collision_space_info=CollisionSpaceInfo(CollisionSpace.POPULATIONS))
lb_method_pdf_space = create_lb_method(lbm_config=lbm_config)
rho = lb_method_moment_space.zeroth_order_equilibrium_moment_symbol
u = lb_method_moment_space.first_order_equilibrium_moment_symbols
keep = set((rho,) + u)
cr_moment_space = create_lb_collision_rule(lb_method=lb_method_moment_space, lbm_optimisation=lbm_opt)
cr_moment_space = cr_moment_space.new_without_subexpressions(subexpressions_to_keep=keep)
lbm_opt = replace(lbm_opt, simplification='auto')
cr_pdf_space = create_lb_collision_rule(lb_method=lb_method_pdf_space, lbm_optimisation=lbm_opt)
cr_pdf_space = cr_pdf_space.new_without_subexpressions(subexpressions_to_keep=keep)
for a, b in zip(cr_moment_space.main_assignments, cr_pdf_space.main_assignments):
diff = (a.rhs - b.rhs).expand()
assert diff == 0, f"Mismatch between population- and moment-space equations in PDFs {a.lhs}, {b.lhs}"
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q19])
@pytest.mark.parametrize('compressible', [True, False])
def test_full_and_delta_equilibrium_equivalence(stencil, compressible):
stencil = LBStencil(stencil)
zero_centered = True
omega = sp.Symbol('omega')
rho = sp.Symbol("rho")
rho_background = sp.Integer(1)
delta_rho = sp.Symbol("delta_rho")
subs = { delta_rho : rho - rho_background }
eqs = []
for delta_eq in [False, True]:
method = create_srt(stencil, omega, continuous_equilibrium=True, compressible=compressible,
zero_centered=zero_centered, delta_equilibrium=delta_eq, equilibrium_order=None,
collision_space_info=CollisionSpaceInfo(CollisionSpace.RAW_MOMENTS))
eq = method.get_equilibrium_terms()
eqs.append(eq.subs(subs))
assert (eqs[0] - eqs[1]).expand() == sp.Matrix((0,) * stencil.Q)
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q19])
@pytest.mark.parametrize('compressible', [True, False])
@pytest.mark.longrun
def test_full_and_delta_population_space_equivalence(stencil, compressible):
stencil = LBStencil(stencil)
zero_centered = True
omega = sp.Symbol('omega')
rho = sp.Symbol("rho")
rho_background = sp.Integer(1)
delta_rho = sp.Symbol("delta_rho")
subs = { delta_rho : rho - rho_background }
eqs = []
for delta_eq in [False, True]:
method = create_srt(stencil, omega, continuous_equilibrium=True, compressible=compressible,
zero_centered=zero_centered, delta_equilibrium=delta_eq, equilibrium_order=None,
collision_space_info=CollisionSpaceInfo(CollisionSpace.RAW_MOMENTS))
eq = method.get_collision_rule().new_without_subexpressions()
eq = sp.Matrix([a.rhs for a in eq])
eqs.append(eq.subs(subs))
assert (eqs[0] - eqs[1]).expand() == sp.Matrix((0,) * stencil.Q)
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q19])
@pytest.mark.parametrize('compressible', [True, False])
@pytest.mark.parametrize('delta_eq', [True, False])
def test_zero_centering_equilibrium_equivalence(stencil, compressible, delta_eq):
stencil = LBStencil(stencil)
omega = sp.Symbol('omega')
weights = sp.Matrix(get_weights(stencil))
rho = sp.Symbol("rho")
rho_background = sp.Integer(1)
delta_rho = sp.Symbol("delta_rho")
subs = { delta_rho : rho - rho_background }
eqs = []
for zero_centered in [False, True]:
method = create_srt(stencil, omega, continuous_equilibrium=True, compressible=compressible,
zero_centered=zero_centered, delta_equilibrium=delta_eq and zero_centered,
equilibrium_order=None,
collision_space_info=CollisionSpaceInfo(CollisionSpace.RAW_MOMENTS))
eq = method.get_equilibrium_terms()
eqs.append(eq.subs(subs))
assert (eqs[0] - eqs[1]).expand() == weights
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q19])
@pytest.mark.parametrize('compressible', [True, False])
@pytest.mark.parametrize('delta_eq', [True, False])
@pytest.mark.longrun
def test_zero_centering_population_space_equivalence(stencil, compressible, delta_eq):
stencil = LBStencil(stencil)
omega = sp.Symbol('omega')
weights = sp.Matrix(get_weights(stencil))
rho = sp.Symbol("rho")
rho_background = sp.Integer(1)
delta_rho = sp.Symbol("delta_rho")
delta_f = sp.symbols(f"delta_f_:{stencil.Q}")
subs = { delta_rho : rho - rho_background }
eqs = []
for zero_centered in [False, True]:
method = create_srt(stencil, omega, continuous_equilibrium=True, compressible=compressible,
zero_centered=zero_centered, delta_equilibrium=delta_eq and zero_centered,
equilibrium_order=None,
collision_space_info=CollisionSpaceInfo(CollisionSpace.RAW_MOMENTS))
eq = method.get_collision_rule().new_without_subexpressions()
eq = sp.Matrix([a.rhs for a in eq])
if zero_centered:
eq = eq.subs({f : df for f, df in zip(method.pre_collision_pdf_symbols, delta_f)})
else:
eq = eq.subs({f : w + df for f, df, w in zip(method.pre_collision_pdf_symbols, delta_f, weights)})
eqs.append(eq.subs(subs))
assert (eqs[0] - eqs[1]).expand() == weights
"""
Moment-based methods are created by specifying moments and their equilibrium value.
This test checks if the equilibrium formula obtained by this method is the same as the explicitly
given discrete_maxwellian_equilibrium
"""
import pytest
import sympy as sp
from lbmpy.creationfunctions import create_lb_method, LBMConfig
from lbmpy.enums import Stencil, Method
from lbmpy.maxwellian_equilibrium import discrete_maxwellian_equilibrium
from lbmpy.methods import create_mrt_orthogonal, create_srt, create_trt, mrt_orthogonal_modes_literature
from lbmpy.moments import is_bulk_moment, is_shear_moment
from lbmpy.relaxationrates import get_shear_relaxation_rate
from lbmpy.stencils import LBStencil
def check_for_matching_equilibrium(method_name, stencil, compressibility):
omega = sp.Symbol("omega")
if method_name == Method.SRT:
method = create_srt(stencil, omega, compressible=compressibility,
continuous_equilibrium=False, equilibrium_order=2)
elif method_name == Method.TRT:
method = create_trt(stencil, omega, omega, compressible=compressibility,
continuous_equilibrium=False, equilibrium_order=2)
elif method_name == Method.MRT:
method = create_mrt_orthogonal(stencil, [omega] * stencil.Q, continuous_equilibrium=False,
weighted=False, compressible=compressibility, equilibrium_order=2)
else:
raise ValueError("Unknown method")
reference_equilibrium = discrete_maxwellian_equilibrium(stencil, order=2,
c_s_sq=sp.Rational(1, 3), compressible=compressibility)
equilibrium = method.get_equilibrium()
equilibrium = equilibrium.new_without_subexpressions(subexpressions_to_keep=sp.symbols("rho u_0 u_1 u_2"))
diff = sp.Matrix(reference_equilibrium) - sp.Matrix([eq.rhs for eq in equilibrium.main_assignments])
diff = sp.simplify(diff)
assert sum(diff).is_zero
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q15, Stencil.D3Q19, Stencil.D3Q27])
@pytest.mark.parametrize('method', [Method.SRT, Method.TRT, Method.MRT])
def test_for_matching_equilibrium_for_stencil(stencil, method):
stencil = LBStencil(stencil)
check_for_matching_equilibrium(method, stencil, True)
check_for_matching_equilibrium(method, stencil, False)
def test_relaxation_rate_setter():
o1, o2, o3 = sp.symbols("o1 o2 o3")
lbm_config_1 = LBMConfig(method=Method.SRT, stencil=LBStencil(Stencil.D2Q9), relaxation_rates=[o3])
lbm_config_2 = LBMConfig(method=Method.MRT, stencil=LBStencil(Stencil.D2Q9), relaxation_rates=[o3, o3, o3, o3])
lbm_config_3 = LBMConfig(method=Method.MRT, stencil=LBStencil(Stencil.D2Q9), zero_centered=False,
relaxation_rates=[o3] * 9, entropic=True)
method = create_lb_method(lbm_config=lbm_config_1)
method2 = create_lb_method(lbm_config=lbm_config_2)
method3 = create_lb_method(lbm_config=lbm_config_3)
method.set_zeroth_moment_relaxation_rate(o1)
method.set_first_moment_relaxation_rate(o2)
assert get_shear_relaxation_rate(method) == o3
method.set_zeroth_moment_relaxation_rate(o3)
method.set_first_moment_relaxation_rate(o3)
method2.set_conserved_moments_relaxation_rate(o3)
assert method.collision_matrix == method2.collision_matrix == method3.collision_matrix
def test_mrt_orthogonal():
m_ref = {}
moments = mrt_orthogonal_modes_literature(LBStencil(Stencil.D2Q9), True)
lbm_config = LBMConfig(method=Method.MRT, stencil=LBStencil(Stencil.D2Q9), continuous_equilibrium=True,
nested_moments=moments)
m = create_lb_method(lbm_config=lbm_config)
assert m.is_weighted_orthogonal
m_ref[(Stencil.D2Q9, True)] = m
moments = mrt_orthogonal_modes_literature(LBStencil(Stencil.D3Q15), True)
lbm_config = LBMConfig(method=Method.MRT, stencil=LBStencil(Stencil.D3Q15), continuous_equilibrium=True,
nested_moments=moments)
m = create_lb_method(lbm_config=lbm_config)
assert m.is_weighted_orthogonal
m_ref[(Stencil.D3Q15, True)] = m
moments = mrt_orthogonal_modes_literature(LBStencil(Stencil.D3Q19), True)
lbm_config = LBMConfig(method=Method.MRT, stencil=LBStencil(Stencil.D3Q19), continuous_equilibrium=True,
nested_moments=moments)
m = create_lb_method(lbm_config=lbm_config)
assert m.is_weighted_orthogonal
m_ref[(Stencil.D3Q19, True)] = m
moments = mrt_orthogonal_modes_literature(LBStencil(Stencil.D3Q27), False)
lbm_config = LBMConfig(method=Method.MRT, stencil=LBStencil(Stencil.D3Q27), continuous_equilibrium=True,
nested_moments=moments)
m = create_lb_method(lbm_config=lbm_config)
assert m.is_orthogonal
m_ref[(Stencil.D3Q27, False)] = m
for weighted in [True, False]:
for stencil in [Stencil.D2Q9, Stencil.D3Q15, Stencil.D3Q19, Stencil.D3Q27]:
lbm_config = LBMConfig(method=Method.MRT, stencil=LBStencil(stencil), continuous_equilibrium=True,
weighted=weighted)
m = create_lb_method(lbm_config=lbm_config)
if weighted:
assert m.is_weighted_orthogonal
else:
assert m.is_orthogonal
bulk_moments = set([mom for mom in m.moments if is_bulk_moment(mom, m.dim)])
shear_moments = set([mom for mom in m.moments if is_shear_moment(mom, m.dim)])
assert len(bulk_moments) == 1
assert len(shear_moments) == 1 + (m.dim - 2) + m.dim * (m.dim - 1) / 2
if (stencil, weighted) in m_ref:
ref = m_ref[(stencil, weighted)]
bulk_moments_lit = set([mom for mom in ref.moments if is_bulk_moment(mom, ref.dim)])
shear_moments_lit = set([mom for mom in ref.moments if is_shear_moment(mom, ref.dim)])
if stencil != Stencil.D3Q27: # this one uses a different linear combination in literature
assert shear_moments == shear_moments_lit
assert bulk_moments == bulk_moments_lit
from lbmpy.moments import *
from lbmpy.enums import Stencil
from lbmpy.stencils import LBStencil
def test_moment_permutation_multiplicity():
@@ -69,7 +70,34 @@ def test_gram_schmidt_orthogonalization():
moments = moments_up_to_component_order(2, 2)
assert len(moments) == 9
stencil = get_stencil("D2Q9")
stencil = LBStencil(Stencil.D2Q9)
orthogonal_moments = gram_schmidt(moments, stencil)
pdfs_to_moments = moment_matrix(orthogonal_moments, stencil)
assert (pdfs_to_moments * pdfs_to_moments.T).is_diagonal()
def test_is_bulk_moment():
x, y, z = MOMENT_SYMBOLS
assert not is_bulk_moment(x, 2)
assert not is_bulk_moment(x ** 3, 2)
assert not is_bulk_moment(x * y, 2)
assert not is_bulk_moment(x ** 2, 2)
assert not is_bulk_moment(x ** 2 + y ** 2, 3)
assert is_bulk_moment(x ** 2 + y ** 2, 2)
assert is_bulk_moment(x ** 2 + y ** 2 + z ** 2, 3)
assert is_bulk_moment(x ** 2 + y ** 2 + x, 2)
assert is_bulk_moment(x ** 2 + y ** 2 + 1, 2)
def test_is_shear_moment():
x, y, z = MOMENT_SYMBOLS
assert not is_shear_moment(x ** 3, 2)
assert not is_shear_moment(x, 2)
assert not is_shear_moment(x ** 2 + y ** 2, 2)
assert not is_shear_moment(x ** 2 + y ** 2 + z ** 2, 3)
assert is_shear_moment(x ** 2, 2)
assert is_shear_moment(x ** 2 - 1, 2)
assert is_shear_moment(x ** 2 - x, 2)
assert is_shear_moment(x * y, 2)
assert is_shear_moment(x * y - 1, 2)
assert is_shear_moment(x * y - x, 2)
import pytest
import sympy as sp
from pystencils.sympyextensions import is_constant
from lbmpy import Stencil, LBStencil, Method, create_lb_collision_rule, LBMConfig, LBMOptimisation
@pytest.mark.parametrize('method', [Method.MRT, Method.CENTRAL_MOMENT, Method.CUMULANT])
def test_mrt_simplifications(method: Method):
stencil = Stencil.D3Q19
lbm_config = LBMConfig(stencil=stencil, method=method, compressible=True)
lbm_opt = LBMOptimisation(simplification='auto')
cr = create_lb_collision_rule(lbm_config=lbm_config, lbm_optimisation=lbm_opt)
for subexp in cr.subexpressions:
rhs = subexp.rhs
# Check for aliases
assert not isinstance(rhs, sp.Symbol)
# Check for logarithms
assert not rhs.atoms(sp.log)
# Check for nonextracted constant summands or factors
exprs = rhs.atoms(sp.Add, sp.Mul)
for expr in exprs:
for arg in expr.args:
if isinstance(arg, sp.Number):
if arg not in {sp.Number(1), sp.Number(-1), sp.Float(1), sp.Float(-1)}:
assert False, f"Constant {arg} was not extracted from subexpression {subexp}"
# Check for divisions
if not (isinstance(rhs, sp.Pow) and rhs.args[1] < 0):
powers = rhs.atoms(sp.Pow)
for p in powers:
assert p.args[1] > 0
from lbmpy.updatekernels import create_stream_pull_with_output_kernel
from lbmpy import create_lb_update_rule, relaxation_rate_from_lattice_viscosity, ForceModel, Method, LBStencil
from lbmpy.macroscopic_value_kernels import macroscopic_values_setter
from pystencils.boundaries.boundaryhandling import BoundaryHandling
from pystencils.boundaries.boundaryconditions import Neumann, Dirichlet
from lbmpy.boundaries.boundaryhandling import LatticeBoltzmannBoundaryHandling
from lbmpy.boundaries import NoSlip
from lbmpy.oldroydb import *
import numpy as np
import sympy as sp
import pystencils as ps
import pytest
# # Lattice Boltzmann with Finite-Volume Oldroyd-B
# # taken from the electronic supplement of https://doi.org/10.1140/epje/s10189-020-00005-6,
# # available at https://doi.org/10.24416/UU01-2AFZSW
pytest.importorskip('scipy.optimize')
def test_oldroydb():
import scipy.optimize
# ## Definitions
L = (34, 34)
lambda_p = sp.Symbol("lambda_p")
eta_p = sp.Symbol("eta_p")
lb_stencil = LBStencil("D2Q9")
fv_stencil = LBStencil("D2Q9")
eta = 1 - eta_p
omega = relaxation_rate_from_lattice_viscosity(eta)
f_pre = 0.00001
# ## Data structures
dh = ps.create_data_handling(L, periodicity=(True, False), default_target=ps.Target.CPU)
opts = {'cpu_openmp': False,
'cpu_vectorize_info': None,
'target': dh.default_target}
src = dh.add_array('src', values_per_cell=len(lb_stencil), layout='c')
dst = dh.add_array_like('dst', 'src')
ρ = dh.add_array('rho', layout='c', latex_name='\\rho')
u = dh.add_array('u', values_per_cell=dh.dim, layout='c')
tauface = dh.add_array('tau_face', values_per_cell=(len(fv_stencil) // 2, dh.dim, dh.dim), latex_name='\\tau_f',
field_type=ps.FieldType.STAGGERED, layout='c')
tau = dh.add_array('tau', values_per_cell=(dh.dim, dh.dim), layout='c', latex_name='\\tau')
tauflux = dh.add_array('j_tau', values_per_cell=(len(fv_stencil) // 2, dh.dim, dh.dim), latex_name='j_\\tau',
field_type=ps.FieldType.STAGGERED_FLUX, layout='c')
F = dh.add_array('F', values_per_cell=dh.dim, layout='c')
fluxbh = BoundaryHandling(dh, tauflux.name, fv_stencil, name="flux_boundary_handling",
openmp=opts['cpu_openmp'], target=dh.default_target)
ubh = BoundaryHandling(dh, u.name, lb_stencil, name="velocity_boundary_handling",
openmp=opts['cpu_openmp'], target=dh.default_target)
taufacebh = BoundaryHandling(dh, tauface.name, fv_stencil, name="tauface_boundary_handling",
openmp=opts['cpu_openmp'], target=dh.default_target)
# ## Solver
collision = create_lb_update_rule(stencil=lb_stencil,
method=Method.TRT,
relaxation_rate=omega,
compressible=True,
force_model=ForceModel.GUO,
force=F.center_vector + sp.Matrix([f_pre, 0]),
kernel_type='collide_only',
optimization={'symbolic_field': src})
stream = create_stream_pull_with_output_kernel(collision.method, src, dst, {'density': ρ, 'velocity': u})
lbbh = LatticeBoltzmannBoundaryHandling(collision.method, dh, src.name,
openmp=opts['cpu_openmp'], target=dh.default_target)
stream_kernel = ps.create_kernel(stream, **opts).compile()
collision_kernel = ps.create_kernel(collision, **opts).compile()
ob = OldroydB(dh.dim, u, tau, F, tauflux, tauface, lambda_p, eta_p)
flux_kernel = ps.create_staggered_kernel(ob.flux(), **opts).compile()
tauface_kernel = ps.create_staggered_kernel(ob.tauface(), **opts).compile()
continuity_kernel = ps.create_kernel(ob.continuity(), **opts).compile()
force_kernel = ps.create_kernel(ob.force(), **opts).compile()
# ## Set up the simulation
init = macroscopic_values_setter(collision.method, velocity=(0,) * dh.dim,
pdfs=src.center_vector, density=ρ.center)
init_kernel = ps.create_kernel(init, ghost_layers=0).compile()
# no-slip for the fluid, no-flux for the stress
noslip = NoSlip()
noflux = Flux(fv_stencil)
nostressdiff = Flux(fv_stencil, tau.center_vector)
# put some good values into the boundaries so we can take derivatives
noforce = Neumann() # put the same stress into the boundary cells that is in the nearest fluid cell
noflow = Dirichlet((0,) * dh.dim) # put zero velocity into the boundary cells
lbbh.set_boundary(noslip, ps.make_slice[:, :4])
lbbh.set_boundary(noslip, ps.make_slice[:, -4:])
fluxbh.set_boundary(noflux, ps.make_slice[:, :4])
fluxbh.set_boundary(noflux, ps.make_slice[:, -4:])
ubh.set_boundary(noflow, ps.make_slice[:, :4])
ubh.set_boundary(noflow, ps.make_slice[:, -4:])
taufacebh.set_boundary(nostressdiff, ps.make_slice[:, :4])
taufacebh.set_boundary(nostressdiff, ps.make_slice[:, -4:])
for bh in lbbh, fluxbh, ubh, taufacebh:
assert len(bh._boundary_object_to_boundary_info) == 1, "Restart kernel to clear boundaries"
def init():
dh.fill(ρ.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
dh.fill(ρ.name, 1)
dh.fill(u.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
dh.fill(u.name, 0)
dh.fill(tau.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
dh.fill(tau.name, 0)
dh.fill(tauflux.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
dh.fill(tauface.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
dh.fill(F.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
dh.fill(F.name, 0) # needed for LB initialization
sync_tau() # force calculation inside the initialization needs neighbor taus
dh.run_kernel(init_kernel)
dh.fill(F.name, np.nan)
sync_pdfs = dh.synchronization_function([src.name]) # needed before stream, but after collision
sync_u = dh.synchronization_function([u.name]) # needed before continuity, but after stream
sync_tau = dh.synchronization_function([tau.name]) # needed before flux and tauface, but after continuity
def time_loop(steps, lambda_p_val, eta_p_val):
dh.all_to_gpu()
vmid = np.empty((2, steps // 10 + 1))
sync_tau()
sync_u()
ubh()
i = -1
for i in range(steps):
dh.run_kernel(flux_kernel)
fluxbh() # zero the fluxes into/out of boundaries
dh.run_kernel(continuity_kernel, **{lambda_p.name: lambda_p_val, eta_p.name: eta_p_val})
sync_tau()
dh.run_kernel(tauface_kernel) # needed for force
taufacebh()
dh.run_kernel(force_kernel)
dh.run_kernel(collision_kernel, **{eta_p.name: eta_p_val})
sync_pdfs()
lbbh() # bounce-back populations into boundaries
dh.run_kernel(stream_kernel)
sync_u()
ubh() # need neighboring us for flux and continuity
dh.swap(src.name, dst.name)
if i % 10 == 0:
if u.name in dh.gpu_arrays:
dh.to_cpu(u.name)
uu = dh.gather_array(u.name)
uu = uu[L[0] // 2 - 1:L[0] // 2 + 1, L[1] // 2 - 1:L[1] // 2 + 1, 0].mean()
if np.isnan(uu):
raise Exception(f"NaN encountered after {i} steps")
vmid[:, i // 10] = [i, uu]
sync_u()
dh.all_to_cpu()
return vmid[:, :i // 10 + 1]
# ## Analytical solution
#
# comes from Waters and King, Unsteady flow of an elastico-viscous liquid, Rheologica Acta 9, 345–355 (1970).
def N(n):
return (2 * n - 1) * np.pi
def Alpha_n(N, El, eta_p):
return 1 + (1 - eta_p) * El * N * N / 4
def Beta_n(alpha_n, N, El):
return np.sqrt(np.abs(alpha_n * alpha_n - El * N * N))
def Gamma_n(N, El, eta_p):
return 1 - (1 + eta_p) * El * N * N / 4
def G(alpha_n, beta_n, gamma_n, flag, T):
if (flag):
return ((1.0 - gamma_n / beta_n) * np.exp(-(alpha_n + beta_n) * T / 2) +
(1.0 + gamma_n / beta_n) * np.exp((beta_n - alpha_n) * T / 2))
else:
return 2 * np.exp(-alpha_n * T / 2) * (np.cos(beta_n * T / 2) + (gamma_n / beta_n) * np.sin(beta_n * T / 2))
def W(T, El, eta_p):
W_ = 1.5
for n in range(1, 1000):
N_ = N(n)
alpha_n = Alpha_n(N_, El, eta_p)
if alpha_n * alpha_n - El * N_ * N_ < 0:
flag_ = False
else:
flag_ = True
beta_n = Beta_n(alpha_n, N_, El)
gamma_n = Gamma_n(N_, El, eta_p)
G_ = G(alpha_n, beta_n, gamma_n, flag_, T)
W_ -= 24 * (np.sin(N_ / 2) / (N_ * N_ * N_)) * G_
return W_
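For reference, the Waters and King series that the helpers above implement, with T the time scaled by λ_p, El the elasticity number and η_p the polymer viscosity fraction (a transcription of the code above, not an independent derivation):

W(T) = \frac{3}{2} - 24 \sum_{n=1}^{\infty} \frac{\sin(N_n/2)}{N_n^3}\, G_n(T), \qquad N_n = (2n-1)\pi

\alpha_n = 1 + (1-\eta_p)\,\mathrm{El}\,\frac{N_n^2}{4}, \qquad \gamma_n = 1 - (1+\eta_p)\,\mathrm{El}\,\frac{N_n^2}{4}, \qquad \beta_n = \sqrt{\left|\alpha_n^2 - \mathrm{El}\, N_n^2\right|}

G_n(T) = \begin{cases} \left(1 - \frac{\gamma_n}{\beta_n}\right) e^{-(\alpha_n+\beta_n)T/2} + \left(1 + \frac{\gamma_n}{\beta_n}\right) e^{(\beta_n-\alpha_n)T/2}, & \alpha_n^2 \ge \mathrm{El}\, N_n^2 \\ 2\, e^{-\alpha_n T/2}\left(\cos\frac{\beta_n T}{2} + \frac{\gamma_n}{\beta_n}\sin\frac{\beta_n T}{2}\right), & \text{otherwise} \end{cases}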
# ## Run the simulation
lambda_p_val = 3000
eta_p_val = 0.9
init()
vmid = time_loop(lambda_p_val * 4, lambda_p_val, eta_p_val)
actual_width = sum(dh.gather_array(lbbh.flag_array_name)[L[0] // 2, :] == 1)
uref = float(f_pre * actual_width ** 2 / (8 * (eta + eta_p)))
Wi = lambda_p_val * uref / (actual_width / 2)
Re = uref * (actual_width / 2) / (eta + eta_p)
El = float(Wi / Re)
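The reference velocity and dimensionless groups computed above restate Poiseuille flow driven by f_pre in a channel of width L (the measured actual_width); the final equality is simple algebra and is what the measured_width fit below inverts:

u_\mathrm{ref} = \frac{f_\mathrm{pre}\, L^2}{8(\eta + \eta_p)}, \qquad \mathrm{Wi} = \frac{\lambda_p\, u_\mathrm{ref}}{L/2}, \qquad \mathrm{Re} = \frac{u_\mathrm{ref}\,(L/2)}{\eta + \eta_p}, \qquad \mathrm{El} = \frac{\mathrm{Wi}}{\mathrm{Re}} = \frac{\lambda_p (\eta + \eta_p)}{(L/2)^2}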
pref = 1 / W(vmid[0, -1] / lambda_p_val, El, eta_p_val)
El_measured, pref_measured = scipy.optimize.curve_fit(lambda a, b, c: W(a, b, eta_p_val) * c,
vmid[0, :] / lambda_p_val, vmid[1, :] / vmid[1, -1],
p0=(El, pref))[0]
measured_width = np.sqrt(4 * lambda_p_val * float(eta + eta_p) / El_measured)
print(f"El={El}, El_measured={El_measured}")
print(f"L={actual_width}, L_measured={measured_width}")
assert abs(measured_width - actual_width) < 1, "effective channel width differs significantly from defined width"
an = W(vmid[0, :] / lambda_p_val, El, eta_p_val) * pref
an_measured = W(vmid[0, :] / lambda_p_val, El_measured, eta_p_val) * pref_measured
diff = abs(vmid[1, :] / vmid[1, -1] - an_measured) / an_measured
assert diff[lambda_p_val // 5:].max() < 0.03, "maximum velocity deviation is too large"
# from pystencils import plot as plt
#
# plt.xlabel("$t$")
# plt.ylabel(r"$u_{max}/u_{max}^{Newtonian}$")
# plt.plot(vmid[0,:], vmid[1,:]/vmid[1,-1] if vmid[1,-1] != 0 else 0, label='FVM')
# plt.plot(vmid[0,:], np.ones_like(vmid[0,:]), 'k--', label='Newtonian')
#
# plt.plot(vmid[0,:], an, label="analytic")
# plt.plot(vmid[0,:], an_measured, label="analytic, fit width")
# plt.legend()
#
# if eta_p_val == 0.1:
# plt.ylim(0.9, 1.15)
# elif lambda_p_val == 9000:
# plt.ylim(0.8, 1.5)
# elif eta_p_val == 0.3:
# plt.ylim(0.8, 1.4)
# plt.show()
import os
from tempfile import TemporaryDirectory
import shutil
import pytest
import numpy as np
@@ -7,6 +10,7 @@ import lbmpy.plot as plt
from lbmpy.scenarios import create_lid_driven_cavity
@pytest.mark.skipif(shutil.which('ffmpeg') is None, reason="ffmpeg not available")
def test_animation():
ldc = create_lid_driven_cavity((10, 10), relaxation_rate=1.8)
"""
Test Poiseuille flow against analytical solution
"""
import pytest
from lbmpy.enums import Stencil, CollisionSpace
import pystencils as ps
from . import poiseuille
@pytest.mark.parametrize('target', (ps.Target.CPU, ps.Target.GPU))
@pytest.mark.parametrize('stencil_name', (Stencil.D2Q9, Stencil.D3Q19))
@pytest.mark.parametrize('zero_centered', (False, True))
@pytest.mark.parametrize('moment_space_collision', (False, True))
def test_poiseuille_channel(target, stencil_name, zero_centered, moment_space_collision):
# Cuda
if target == ps.Target.GPU:
import pytest
pytest.importorskip("cupy")
cspace_info = CollisionSpace.RAW_MOMENTS if moment_space_collision else CollisionSpace.POPULATIONS
poiseuille.poiseuille_channel(target=target,
stencil_name=stencil_name,
zero_centered=zero_centered,
collision_space_info=cspace_info)
import numpy as np
import pytest
from lbmpy.postprocessing import scalar_field_interpolator, vector_field_interpolator
def test_interpolation():
pytest.importorskip('scipy.ndimage')
scalar_arr = np.arange(0, 3*3).reshape(3, 3)
scalar_ip = scalar_field_interpolator(scalar_arr)
np.testing.assert_equal(scalar_ip([[1, 1.5], [0.5, 1]]), [2.5, 0.5])
import pytest
import numpy as np
from pystencils import fields, CreateKernelConfig, Target, create_kernel, get_code_str, create_data_handling
from lbmpy import LBMConfig, Stencil, Method, LBStencil, create_lb_method, create_lb_collision_rule, LBMOptimisation, \
create_lb_update_rule
from lbmpy.partially_saturated_cells import PSMConfig
@pytest.mark.parametrize("stencil", [Stencil.D2Q9, Stencil.D3Q19, Stencil.D3Q27])
@pytest.mark.parametrize("method_enum", [Method.SRT, Method.MRT, Method.CUMULANT])
def test_psm_integration(stencil, method_enum):
stencil = LBStencil(stencil)
fraction_field = fields("fraction_field: double[10, 10, 5]", layout=(2, 1, 0))
object_vel = fields("object_vel(3): double[10, 10, 5]", layout=(3, 2, 1, 0))
psm_config = PSMConfig(fraction_field=fraction_field, object_velocity_field=object_vel)
lbm_config = LBMConfig(stencil=stencil, method=method_enum, relaxation_rate=1.5, compressible=True,
psm_config=psm_config)
config = CreateKernelConfig(target=Target.CPU)
collision_rule = create_lb_collision_rule(lbm_config=lbm_config)
ast = create_kernel(collision_rule, config=config)
code_str = get_code_str(ast)
def get_data_handling_for_psm(use_psm):
domain_size = (10, 10)
stencil = LBStencil(Stencil.D2Q9)
dh = create_data_handling(domain_size=domain_size, periodicity=(True, False))
f = dh.add_array('f', values_per_cell=len(stencil))
dh.fill(f.name, 0.0, ghost_layers=True)
f_tmp = dh.add_array('f_tmp', values_per_cell=len(stencil))
dh.fill(f_tmp.name, 0.0, ghost_layers=True)
psm_config = None
if use_psm:
fraction_field = dh.add_array('fraction_field', values_per_cell=1)
dh.fill(fraction_field.name, 0.0, ghost_layers=True)
object_vel = dh.add_array('object_vel', values_per_cell=dh.dim)
dh.fill(object_vel.name, 0.0, ghost_layers=True)
psm_config = PSMConfig(fraction_field=fraction_field, object_velocity_field=object_vel)
lbm_config = LBMConfig(stencil=stencil, method=Method.SRT, relaxation_rate=1.5, psm_config=psm_config)
lbm_optimisation = LBMOptimisation(symbolic_field=f, symbolic_temporary_field=f_tmp)
update_rule = create_lb_update_rule(lbm_config=lbm_config, lbm_optimisation=lbm_optimisation)
kernel_lb_step_with_psm = create_kernel(update_rule).compile()
return (dh, kernel_lb_step_with_psm)
def test_lbm_vs_psm():
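    # With the solid fraction field set to zero everywhere, the partially saturated cells
    # method should reduce to the plain SRT collision, so both runs must yield identical
    # populations (up to round-off, see the tolerance of the final assertion).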
psm_dh, psm_kernel = get_data_handling_for_psm(True)
lbm_dh, lbm_kernel = get_data_handling_for_psm(False)
for i in range(20):
psm_dh.run_kernel(psm_kernel)
psm_dh.swap('f', 'f_tmp')
lbm_dh.run_kernel(lbm_kernel)
lbm_dh.swap('f', 'f_tmp')
max_vel_error = np.max(np.abs(psm_dh.gather_array('f') - lbm_dh.gather_array('f')))
np.testing.assert_allclose(max_vel_error, 0, atol=1e-14)
import numpy as np
import sympy as sp
from lbmpy.forcemodels import Guo
from lbmpy.methods.entropic_eq_srt import create_srt_entropic
import pystencils as ps
from lbmpy.enums import ForceModel, Method, Stencil
from lbmpy.scenarios import create_lid_driven_cavity
from lbmpy.stencils import get_stencil
from . import poiseuille
def test_poiseuille_channel_quicktest():
poiseuille.poiseuille_channel(target=ps.Target.CPU, stencil_name=Stencil.D2Q9)
def test_entropic_methods():
    sc_kbc = create_lid_driven_cavity((40, 40), method=Method.TRT_KBC_N4,
                                      relaxation_rates=[1.9999, sp.Symbol("omega_free")],
                                      entropic_newton_iterations=3, entropic=True, compressible=True,
                                      zero_centered=False, force=(-1e-10, 0), force_model=ForceModel.LUO)
    sc_srt = create_lid_driven_cavity((40, 40), relaxation_rate=1.9999, lid_velocity=0.05, compressible=True,
                                      force=(-1e-10, 0))
    sc_entropic = create_lid_driven_cavity((40, 40), method='entropic-srt', relaxation_rate=1.9999,
                                           lid_velocity=0.05, compressible=True,
                                           zero_centered=False, force=(-1e-10, 0), force_model=ForceModel.LUO)
sc_srt.run(1000)
sc_kbc.run(1000)
sc_entropic.run(1000)
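    # At a relaxation rate this close to 2 the plain SRT run is expected to become unstable
    # (NaN velocities), while the entropically stabilised variants remain finite.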
assert np.isnan(np.max(sc_srt.velocity[:, :]))
assert np.isfinite(np.max(sc_kbc.velocity[:, :]))
assert np.isfinite(np.max(sc_entropic.velocity[:, :]))
def test_entropic_srt():
stencil = get_stencil("D2Q9")
method = create_srt_entropic(stencil, 1.8, Guo((0, 1e-6)), True)
assert method.zeroth_order_equilibrium_moment_symbol == sp.symbols("rho")
assert method.first_order_equilibrium_moment_symbols == sp.symbols("u_:2")
def test_cumulant_ldc():
sc_cumulant = create_lid_driven_cavity((40, 40), method=Method.CUMULANT, relaxation_rate=1.999999,
compressible=True, force=(-1e-10, 0))
sc_cumulant.run(100)
assert np.isfinite(np.max(sc_cumulant.velocity[:, :]))
import pytest
import sympy as sp
from lbmpy.creationfunctions import create_lb_method, LBMConfig
from lbmpy.moments import is_shear_moment, get_order
from lbmpy.enums import Method, Stencil
from lbmpy.relaxationrates import get_shear_relaxation_rate
from lbmpy.stencils import LBStencil
def test_relaxation_rate():
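    # A raw-moment MRT with 19 distinct rates relaxes the shear moments with different
    # relaxation rates, so no unique shear relaxation rate exists and a ValueError is expected.
    # The standard MRT below assigns the same rate (omegas[0]) to all shear moments, so a
    # unique shear relaxation rate can be returned.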
lbm_config = LBMConfig(stencil=LBStencil(Stencil.D3Q19), method=Method.MRT_RAW,
relaxation_rates=[1 + i / 10 for i in range(19)])
method = create_lb_method(lbm_config=lbm_config)
with pytest.raises(ValueError) as e:
get_shear_relaxation_rate(method)
assert 'Shear moments are relaxed with different relaxation' in str(e.value)
omegas = sp.symbols("omega_:4")
lbm_config = LBMConfig(stencil=LBStencil(Stencil.D3Q19), method=Method.MRT,
relaxation_rates=omegas)
method = create_lb_method(lbm_config=lbm_config)
assert get_shear_relaxation_rate(method) == omegas[0]
@pytest.mark.parametrize("method", [Method.MRT, Method.CENTRAL_MOMENT, Method.CUMULANT])
def test_default_mrt_behaviour(method):
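    # Default rate assignment checked below: conserved moments (order <= 1) get relaxation
    # rate 0, shear moments are relaxed with the symbolic rate omega, and all remaining
    # moments with 1.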
lbm_config = LBMConfig(stencil=LBStencil(Stencil.D3Q19), method=method, compressible=True)
method = create_lb_method(lbm_config=lbm_config)
for moment, relax_info in method.relaxation_info_dict.items():
if get_order(moment) <= 1:
assert relax_info.relaxation_rate == 0
elif is_shear_moment(moment, method.dim):
assert relax_info.relaxation_rate == sp.Symbol('omega')
else:
assert relax_info.relaxation_rate == 1
"""
Test shear flow velocity and pressure against analytical solutions
"""
import numpy as np
import pytest
import sympy as sp
from lbmpy.boundaries import UBB
from lbmpy.boundaries.boundaryhandling import LatticeBoltzmannBoundaryHandling
from lbmpy.creationfunctions import create_lb_update_rule, create_stream_pull_with_output_kernel,\
LBMConfig, LBMOptimisation
from lbmpy.enums import Method, Stencil
from lbmpy.macroscopic_value_kernels import macroscopic_values_setter
from lbmpy.relaxationrates import relaxation_rate_from_lattice_viscosity
from lbmpy.stencils import LBStencil
import pystencils as ps
def shear_flow(x, t, nu, v, h, k_max):
"""
Analytical solution for driven shear flow between two plates.
Parameters
----------
x : :obj:`float`
Position from the left plane.
t : :obj:`float`
Time since start of the shearing.
nu : :obj:`float`
Kinematic viscosity.
v : :obj:`float`
        Shear velocity, i.e. the velocity difference between the two plates.
h : :obj:`float`
Distance between the plates.
k_max : :obj:`int`
Maximum considered wave number.
Returns
-------
:obj:`double` : Analytical velocity
"""
u = x / h - 0.5
for k in np.arange(1, k_max + 1):
u += 1.0 / (np.pi * k) * np.exp(
-4 * np.pi ** 2 * nu * k ** 2 / h ** 2 * t) * np.sin(2 * np.pi / h * k * x)
return -v * u
rho_0 = 2.2 # density
eta = 1.6 # kinematic viscosity
width = 40 # of box
wall_thickness = 2
actual_width = width - wall_thickness # subtract boundary layer from box width
shear_velocity = 0.2 # scale by width to keep stable
t_max = 2000
@pytest.mark.parametrize('target', (ps.Target.CPU, ps.Target.GPU))
@pytest.mark.parametrize('stencil_name', (Stencil.D2Q9, Stencil.D3Q19))
@pytest.mark.parametrize('zero_centered', [True, False])
def test_shear_flow(target, stencil_name, zero_centered):
    # GPU targets need cupy; skip the test if it is not installed
if target == ps.Target.GPU:
pytest.importorskip("cupy")
# LB parameters
stencil = LBStencil(stencil_name)
if stencil.D == 2:
L = (4, width)
elif stencil.D == 3:
L = (4, width, 4)
    else:
        raise ValueError(f"Stencils with dimension {stencil.D} are not supported in this test")
periodicity = [True, False] + [True] * (stencil.D - 2)
omega = relaxation_rate_from_lattice_viscosity(eta)
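    # Assuming the standard lattice relation nu = c_s^2 * (1/omega - 1/2) with c_s^2 = 1/3,
    # this amounts to omega = 2 / (6 * eta + 1), roughly 0.19 for eta = 1.6.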
# ## Data structures
dh = ps.create_data_handling(L, periodicity=periodicity, default_target=target)
src = dh.add_array('src', values_per_cell=stencil.Q)
dst = dh.add_array_like('dst', 'src')
ρ = dh.add_array('rho', latex_name='\\rho', values_per_cell=1)
u = dh.add_array('u', values_per_cell=stencil.D)
p = dh.add_array('p', values_per_cell=stencil.D**2)
# LB Setup
lbm_config = LBMConfig(stencil=stencil, relaxation_rate=omega, method=Method.TRT,
compressible=True, zero_centered=zero_centered,
kernel_type='collide_only')
lbm_opt = LBMOptimisation(symbolic_field=src)
collision = create_lb_update_rule(lbm_config=lbm_config, lbm_optimisation=lbm_opt)
stream = create_stream_pull_with_output_kernel(collision.method, src, dst, {'velocity': u})
config = ps.CreateKernelConfig(cpu_openmp=False, target=dh.default_target)
stream_kernel = ps.create_kernel(stream, config=config).compile()
collision_kernel = ps.create_kernel(collision, config=config).compile()
# Boundaries
lbbh = LatticeBoltzmannBoundaryHandling(collision.method, dh, src.name, target=dh.default_target)
# Second moment test setup
cqc = collision.method.conserved_quantity_computation
getter_eqs = cqc.output_equations_from_pdfs(src.center_vector,
{'moment2': p})
kernel_compute_p = ps.create_kernel(getter_eqs, config=config).compile()
# ## Set up the simulation
init = macroscopic_values_setter(collision.method, velocity=(0,) * dh.dim,
pdfs=src.center_vector, density=ρ.center)
init_kernel = ps.create_kernel(init, ghost_layers=0).compile()
vel_vec = sp.Matrix([0.5 * shear_velocity] + [0] * (stencil.D - 1))
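    # Each plate is driven with half the shear velocity in opposite directions, so the total
    # velocity difference across the channel equals shear_velocity.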
if stencil.D == 2:
lbbh.set_boundary(UBB(velocity=vel_vec), ps.make_slice[:, :wall_thickness])
lbbh.set_boundary(UBB(velocity=-vel_vec), ps.make_slice[:, -wall_thickness:])
elif stencil.D == 3:
lbbh.set_boundary(UBB(velocity=vel_vec), ps.make_slice[:, :wall_thickness, :])
lbbh.set_boundary(UBB(velocity=-vel_vec), ps.make_slice[:, -wall_thickness:, :])
    else:
        raise ValueError(f"Stencils with dimension {stencil.D} are not supported in this test")
    assert len(lbbh._boundary_object_to_boundary_info) == 2, "Restart kernel to clear boundaries"
def init():
dh.fill(ρ.name, rho_0)
dh.fill(u.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
dh.fill(u.name, 0)
dh.run_kernel(init_kernel)
sync_pdfs = dh.synchronization_function([src.name])
# Time loop
def time_loop(steps):
dh.all_to_gpu()
for i in range(steps):
dh.run_kernel(collision_kernel)
sync_pdfs()
lbbh()
dh.run_kernel(stream_kernel)
dh.run_kernel(kernel_compute_p)
dh.swap(src.name, dst.name)
if u.name in dh.gpu_arrays:
dh.to_cpu(u.name)
uu = dh.gather_array(u.name)
# average periodic directions
        if stencil.D == 3:  # don't swap the averaging order
uu = np.average(uu, axis=2)
uu = np.average(uu, axis=0)
if p.name in dh.gpu_arrays:
dh.to_cpu(p.name)
pp = dh.gather_array(p.name)
# average periodic directions
        if stencil.D == 3:  # don't swap the averaging order
pp = np.average(pp, axis=2)
pp = np.average(pp, axis=0)
# cut off wall regions
uu = uu[wall_thickness:-wall_thickness]
pp = pp[wall_thickness:-wall_thickness]
if stencil.D == 2:
pp = pp.reshape((len(pp), 2, 2))
if stencil.D == 3:
pp = pp.reshape((len(pp), 3, 3))
return uu, pp
init()
# Simulation
profile, pressure_profile = time_loop(t_max)
expected = shear_flow(x=(np.arange(0, actual_width) + .5),
t=t_max,
nu=eta / rho_0,
v=shear_velocity,
h=actual_width,
k_max=100)
if stencil.D == 2:
shear_direction = np.array((1, 0), dtype=float)
shear_plane_normal = np.array((0, 1), dtype=float)
if stencil.D == 3:
shear_direction = np.array((1, 0, 0), dtype=float)
shear_plane_normal = np.array((0, 1, 0), dtype=float)
shear_rate = shear_velocity / actual_width
dynamic_viscosity = eta * rho_0
correction_factor = eta / (eta - 1. / 6.)
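    # Expected pressure tensor: isotropic part rho_0 * c_s^2 (c_s^2 = 1/3 in lattice units)
    # plus the viscous shear stress mu * gamma_dot on the components coupling the shear
    # direction and the plane normal; the correction factor is assumed to compensate the
    # finite-relaxation-rate (lattice) contribution to the measured second moment.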
p_expected = rho_0 * np.identity(dh.dim) / 3.0 + dynamic_viscosity * shear_rate / correction_factor * (
np.outer(shear_plane_normal, shear_direction) + np.transpose(np.outer(shear_plane_normal, shear_direction)))
    # The measured second moment also contains the convective part rho * u_x * u_x;
    # subtract it to obtain the pressure tensor
pressure_profile[:, 0, 0] -= rho_0 * profile[:, 0]**2
np.testing.assert_allclose(profile[:, 0], expected[1:-1], atol=1E-9)
for i in range(actual_width - 2):
np.testing.assert_allclose(pressure_profile[i], p_expected, atol=1E-9, rtol=1E-3)