Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
No results found
Show changes
This diff is collapsed.
......@@ -33,7 +33,13 @@ def _skip_instruction_sets_windows(instruction_sets):
def single_component_maxwell(x1, x2, kT, mass):
"""Integrate the probability density from x1 to x2 using the trapezoidal rule"""
x = np.linspace(x1, x2, 1000)
return np.trapz(np.exp(-mass * x ** 2 / (2. * kT)), x) / np.sqrt(2. * np.pi * kT / mass)
try:
trapezoid = np.trapezoid # since numpy 2.0
except AttributeError:
trapezoid = np.trapz
return trapezoid(np.exp(-mass * x ** 2 / (2. * kT)), x) / np.sqrt(2. * np.pi * kT / mass)
def rr_getter(moment_group):
......
import pytest
import numpy as np
import sympy as sp
from lbmpy.creationfunctions import create_lb_method, LBMConfig
from lbmpy.enums import Stencil, ForceModel, Method
from lbmpy.macroscopic_value_kernels import (
compile_macroscopic_values_getter, compile_macroscopic_values_setter)
compile_macroscopic_values_getter, compile_macroscopic_values_setter, strain_rate_tensor_getter)
from lbmpy.advanced_streaming.utility import streaming_patterns, Timestep
from lbmpy.stencils import LBStencil
......@@ -83,3 +84,18 @@ def test_set_get_constant_velocity(stencil, force_model, compressible):
computed_velocity = velocity_output_field[0, 0, 0, :]
np.testing.assert_almost_equal(np.array(ref_velocity[:method.dim]), computed_velocity)
@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q19])
@pytest.mark.parametrize("method_enum", [Method.SRT, Method.TRT, Method.MRT, Method.CUMULANT])
def test_get_strain_rate_tensor(stencil, method_enum):
stencil = LBStencil(stencil)
lbm_config = LBMConfig(stencil=stencil, method=method_enum, compressible=True)
method = create_lb_method(lbm_config=lbm_config)
pdfs = sp.symbols(f"f_:{stencil.Q}")
strain_rate_tensor = sp.symbols(f"strain_rate_tensor_:{method.dim * method.dim}")
getter_assignments = strain_rate_tensor_getter(method, strain_rate_tensor, pdfs)
assert len(getter_assignments) == method.dim * method.dim
import pytest
import numpy as np
from pystencils import fields, CreateKernelConfig, Target, create_kernel, get_code_str, create_data_handling
from lbmpy import LBMConfig, Stencil, Method, LBStencil, create_lb_method, create_lb_collision_rule, LBMOptimisation, \
create_lb_update_rule
from lbmpy.partially_saturated_cells import PSMConfig
@pytest.mark.parametrize("stencil", [Stencil.D2Q9, Stencil.D3Q19, Stencil.D3Q27])
@pytest.mark.parametrize("method_enum", [Method.SRT, Method.MRT, Method.CUMULANT])
def test_psm_integration(stencil, method_enum):
stencil = LBStencil(stencil)
fraction_field = fields("fraction_field: double[10, 10, 5]", layout=(2, 1, 0))
object_vel = fields("object_vel(3): double[10, 10, 5]", layout=(3, 2, 1, 0))
psm_config = PSMConfig(fraction_field=fraction_field, object_velocity_field=object_vel)
lbm_config = LBMConfig(stencil=stencil, method=method_enum, relaxation_rate=1.5, compressible=True,
psm_config=psm_config)
config = CreateKernelConfig(target=Target.CPU)#
collision_rule = create_lb_collision_rule(lbm_config=lbm_config)
ast = create_kernel(collision_rule, config=config)
code_str = get_code_str(ast)
def get_data_handling_for_psm(use_psm):
domain_size = (10, 10)
stencil = LBStencil(Stencil.D2Q9)
dh = create_data_handling(domain_size=domain_size, periodicity=(True, False))
f = dh.add_array('f', values_per_cell=len(stencil))
dh.fill(f.name, 0.0, ghost_layers=True)
f_tmp = dh.add_array('f_tmp', values_per_cell=len(stencil))
dh.fill(f_tmp.name, 0.0, ghost_layers=True)
psm_config = None
if use_psm:
fraction_field = dh.add_array('fraction_field', values_per_cell=1)
dh.fill(fraction_field.name, 0.0, ghost_layers=True)
object_vel = dh.add_array('object_vel', values_per_cell=dh.dim)
dh.fill(object_vel.name, 0.0, ghost_layers=True)
psm_config = PSMConfig(fraction_field=fraction_field, object_velocity_field=object_vel)
lbm_config = LBMConfig(stencil=stencil, method=Method.SRT, relaxation_rate=1.5, psm_config=psm_config)
lbm_optimisation = LBMOptimisation(symbolic_field=f, symbolic_temporary_field=f_tmp)
update_rule = create_lb_update_rule(lbm_config=lbm_config, lbm_optimisation=lbm_optimisation)
kernel_lb_step_with_psm = create_kernel(update_rule).compile()
return (dh, kernel_lb_step_with_psm)
def test_lbm_vs_psm():
psm_dh, psm_kernel = get_data_handling_for_psm(True)
lbm_dh, lbm_kernel = get_data_handling_for_psm(False)
for i in range(20):
psm_dh.run_kernel(psm_kernel)
psm_dh.swap('f', 'f_tmp')
lbm_dh.run_kernel(lbm_kernel)
lbm_dh.swap('f', 'f_tmp')
max_vel_error = np.max(np.abs(psm_dh.gather_array('f') - lbm_dh.gather_array('f')))
np.testing.assert_allclose(max_vel_error, 0, atol=1e-14)
import pytest
import numpy as np
import pystencils as ps
from lbmpy.flow_statistics import welford_assignments
@pytest.mark.parametrize('order', [1, 2, 3])
@pytest.mark.parametrize('dim', [1, 2, 3])
def test_welford(order, dim):
target = ps.Target.CPU
# create data handling and fields
dh = ps.create_data_handling((1, 1, 1), periodicity=False, default_target=target)
vector_field = dh.add_array('vector', values_per_cell=dim)
dh.fill(vector_field.name, 0.0, ghost_layers=False)
mean_field = dh.add_array('mean', values_per_cell=dim)
dh.fill(mean_field.name, 0.0, ghost_layers=False)
if order >= 2:
sos = dh.add_array('sos', values_per_cell=dim**2)
dh.fill(sos.name, 0.0, ghost_layers=False)
if order == 3:
soc = dh.add_array('soc', values_per_cell=dim**3)
dh.fill(soc.name, 0.0, ghost_layers=False)
else:
soc = None
else:
sos = None
soc = None
welford = welford_assignments(field=vector_field, mean_field=mean_field,
sum_of_squares_field=sos, sum_of_cubes_field=soc)
welford_kernel = ps.create_kernel(welford).compile()
# set random seed
np.random.seed(42)
n = int(1e4)
x = np.random.normal(size=n * dim).reshape(n, dim)
analytical_mean = np.zeros(dim)
analytical_covariance = np.zeros(dim**2)
analytical_third_order_moments = np.zeros(dim**3)
# calculate analytical mean
for i in range(dim):
analytical_mean[i] = np.mean(x[:, i])
# calculate analytical covariances
for i in range(dim):
for j in range(dim):
analytical_covariance[i * dim + j] \
= (np.sum((x[:, i] - analytical_mean[i]) * (x[:, j] - analytical_mean[j]))) / n
# calculate analytical third-order central moments
for i in range(dim):
for j in range(dim):
for k in range(dim):
analytical_third_order_moments[(i * dim + j) * dim + k] \
= (np.sum((x[:, i] - analytical_mean[i])
* (x[:, j] - analytical_mean[j])
* (x[:, k] - analytical_mean[k]))) / n
# Time loop
counter = 1
for i in range(n):
for d in range(dim):
new_value = x[i, d]
if dim > 1:
dh.fill(array_name=vector_field.name, val=new_value, value_idx=d, ghost_layers=False)
else:
dh.fill(array_name=vector_field.name, val=new_value, ghost_layers=False)
dh.run_kernel(welford_kernel, counter=counter)
counter += 1
welford_mean = dh.gather_array(mean_field.name).flatten()
np.testing.assert_allclose(welford_mean, analytical_mean)
if order >= 2:
welford_covariance = dh.gather_array(sos.name).flatten() / n
np.testing.assert_allclose(welford_covariance, analytical_covariance)
if order == 3:
welford_third_order_moments = dh.gather_array(soc.name).flatten() / n
np.testing.assert_allclose(welford_third_order_moments, analytical_third_order_moments)
if __name__ == '__main__':
test_welford(1, 1)