%% Cell type:code id: tags:
``` python
import pystencils as ps
from lbmpy.session import *
from lbmpy.boundaries.boundaries_in_kernel import update_rule_with_push_boundaries
from lbmpy.macroscopic_value_kernels import macroscopic_values_getter, macroscopic_values_setter
from collections import OrderedDict
from time import perf_counter
```
%% Cell type:markdown id: tags:
# Version 1: compile-in boundaries
%% Cell type:code id: tags:
``` python
domain_size = (32, 32, 32)
relaxation_rate = 1.8
time_steps = 100
lid_velocity = 0.05
stencil = LBStencil(Stencil.D3Q19)
```
%% Cell type:code id: tags:
``` python
dh = create_data_handling(domain_size, default_target=ps.Target.CPU)
pdfs = dh.add_array('pdfs', values_per_cell=stencil.Q)
u = dh.add_array('u', values_per_cell=stencil.D)
streaming_pattern = 'aa'
```
%% Cell type:code id: tags:
``` python
boundaries = OrderedDict((
    ((0, 1, 0), UBB([lid_velocity, 0, 0])),
    ((1, 0, 0), NoSlip()),
    ((-1, 0, 0), NoSlip()),
    ((0, -1, 0), NoSlip()),
    ((0, 0, 1), NoSlip()),
    ((0, 0, -1), NoSlip()),
))
lbm_opt = LBMOptimisation(symbolic_field=pdfs, cse_global=False, cse_pdfs=False)
lbm_config = LBMConfig(stencil=stencil, relaxation_rate=relaxation_rate, compressible=False)
cr_even = create_lb_collision_rule(lbm_config=lbm_config, lbm_optimisation=lbm_opt)
cr_odd = create_lb_collision_rule(lbm_config=lbm_config, lbm_optimisation=lbm_opt)
update_rule_aa_even = update_rule_with_push_boundaries(cr_even, pdfs, boundaries, streaming_pattern, Timestep.EVEN)
update_rule_aa_odd = update_rule_with_push_boundaries(cr_odd, pdfs, boundaries, streaming_pattern, Timestep.ODD)
getter_assignments = macroscopic_values_getter(update_rule_aa_even.method, velocity=u.center_vector,
                                               pdfs=pdfs, density=None,
                                               streaming_pattern=streaming_pattern,
                                               previous_timestep=Timestep.EVEN)
config = ps.CreateKernelConfig(target=dh.default_target)
getter_kernel = ps.create_kernel(getter_assignments, config=config).compile()
even_kernel = ps.create_kernel(update_rule_aa_even, config=config).compile()
odd_kernel = ps.create_kernel(update_rule_aa_odd, config=config).compile()
```
%% Output
WARNING:root:Using Nodes is experimental and not fully tested. Double check your generated code!
WARNING:root:Using Nodes is experimental and not fully tested. Double check your generated code!
WARNING:root:Using Nodes is experimental and not fully tested. Double check your generated code!
%% Cell type:code id: tags:
``` python
def init():
    dh.fill(pdfs.name, 0, ghost_layers=True)


def aa_time_loop(steps=100):
    assert steps % 2 == 0, "Works only for an even number of time steps"
    dh.all_to_gpu()
    for i in range(steps // 2):
        dh.run_kernel(odd_kernel)
        dh.run_kernel(even_kernel)
    dh.run_kernel(getter_kernel)
    dh.all_to_cpu()
%% Cell type:code id: tags:
``` python
init()
aa_time_loop(time_steps)
vel_version1 = dh.gather_array(u.name, ghost_layers=False).copy()
plt.vector_field_magnitude(vel_version1[:, :, domain_size[2]//2, :])
plt.colorbar()
```
%% Output
<matplotlib.colorbar.Colorbar at 0x13b12c820>
%% Cell type:markdown id: tags:
# Version 2: Normal boundary handling
%% Cell type:code id: tags:
``` python
ldc = create_lid_driven_cavity(domain_size, relaxation_rate=relaxation_rate, lid_velocity=lid_velocity)
ldc.run(time_steps)
vel_version2 = ldc.velocity[:, :, :, :]
plt.vector_field_magnitude(vel_version2[:, :, domain_size[2]//2, :])
plt.colorbar()
```
%% Output
WARNING:root:Using Nodes is experimental and not fully tested. Double check your generated code!
WARNING:root:Using Nodes is experimental and not fully tested. Double check your generated code!
WARNING:root:Using Nodes is experimental and not fully tested. Double check your generated code!
WARNING:root:Lhs"dir of type "int64_t" is assigned with a different datatype rhs: "indexField[0](dir)" of type "int32_t".
WARNING:root:Lhs"dir of type "int64_t" is assigned with a different datatype rhs: "indexField[0](dir)" of type "int32_t".
<matplotlib.colorbar.Colorbar at 0x149a4a190>
%% Cell type:code id: tags:
``` python
np.testing.assert_almost_equal(vel_version1[1:-1, 1:-1, :], vel_version2[1:-1, 1:-1, :], decimal=2)
```
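%% Cell type:markdown id: tags:
As a quick illustrative check, the largest absolute velocity difference between the two versions can also be inspected directly (the cell below is a sketch, not part of the validation above):
%% Cell type:code id: tags:
``` python
# maximum absolute difference between compile-in and normal boundary handling
np.max(np.abs(vel_version1[1:-1, 1:-1, :] - vel_version2[1:-1, 1:-1, :]))
```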
@@ -33,7 +33,13 @@ def _skip_instruction_sets_windows(instruction_sets):
 def single_component_maxwell(x1, x2, kT, mass):
     """Integrate the probability density from x1 to x2 using the trapezoidal rule"""
     x = np.linspace(x1, x2, 1000)
-    return np.trapz(np.exp(-mass * x ** 2 / (2. * kT)), x) / np.sqrt(2. * np.pi * kT / mass)
+    try:
+        trapezoid = np.trapezoid  # since numpy 2.0
+    except AttributeError:
+        trapezoid = np.trapz
+    return trapezoid(np.exp(-mass * x ** 2 / (2. * kT)), x) / np.sqrt(2. * np.pi * kT / mass)


 def rr_getter(moment_group):
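The change above prefers `np.trapezoid`, the name introduced in NumPy 2.0, and falls back to `np.trapz` on older NumPy releases. A minimal sanity check of the helper, as an illustrative sketch (the interval and tolerance are arbitrary choices, not part of the test suite):

``` python
import numpy as np

# The Maxwell-Boltzmann velocity density should integrate to ~1 over a wide enough interval.
kT, mass = 1.0, 1.0
sigma = (kT / mass) ** 0.5
assert abs(single_component_maxwell(-10 * sigma, 10 * sigma, kT, mass) - 1.0) < 1e-4
```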
......
import pytest
import numpy as np
import sympy as sp
from lbmpy.creationfunctions import create_lb_method, LBMConfig
from lbmpy.enums import Stencil, ForceModel, Method
from lbmpy.macroscopic_value_kernels import (
-    compile_macroscopic_values_getter, compile_macroscopic_values_setter)
+    compile_macroscopic_values_getter, compile_macroscopic_values_setter, strain_rate_tensor_getter)
from lbmpy.advanced_streaming.utility import streaming_patterns, Timestep
from lbmpy.stencils import LBStencil
@@ -83,3 +84,18 @@ def test_set_get_constant_velocity(stencil, force_model, compressible):
    computed_velocity = velocity_output_field[0, 0, 0, :]
    np.testing.assert_almost_equal(np.array(ref_velocity[:method.dim]), computed_velocity)


@pytest.mark.parametrize('stencil', [Stencil.D2Q9, Stencil.D3Q19])
@pytest.mark.parametrize("method_enum", [Method.SRT, Method.TRT, Method.MRT, Method.CUMULANT])
def test_get_strain_rate_tensor(stencil, method_enum):
    stencil = LBStencil(stencil)
    lbm_config = LBMConfig(stencil=stencil, method=method_enum, compressible=True)
    method = create_lb_method(lbm_config=lbm_config)

    pdfs = sp.symbols(f"f_:{stencil.Q}")
    strain_rate_tensor = sp.symbols(f"strain_rate_tensor_:{method.dim * method.dim}")
    getter_assignments = strain_rate_tensor_getter(method, strain_rate_tensor, pdfs)
    assert len(getter_assignments) == method.dim * method.dim
import pytest
import numpy as np

from pystencils import fields, CreateKernelConfig, Target, create_kernel, get_code_str, create_data_handling

from lbmpy import LBMConfig, Stencil, Method, LBStencil, create_lb_method, create_lb_collision_rule, LBMOptimisation, \
    create_lb_update_rule
from lbmpy.partially_saturated_cells import PSMConfig


@pytest.mark.parametrize("stencil", [Stencil.D2Q9, Stencil.D3Q19, Stencil.D3Q27])
@pytest.mark.parametrize("method_enum", [Method.SRT, Method.MRT, Method.CUMULANT])
def test_psm_integration(stencil, method_enum):
    stencil = LBStencil(stencil)
    fraction_field = fields("fraction_field: double[10, 10, 5]", layout=(2, 1, 0))
    object_vel = fields("object_vel(3): double[10, 10, 5]", layout=(3, 2, 1, 0))
    psm_config = PSMConfig(fraction_field=fraction_field, object_velocity_field=object_vel)

    lbm_config = LBMConfig(stencil=stencil, method=method_enum, relaxation_rate=1.5, compressible=True,
                           psm_config=psm_config)
    config = CreateKernelConfig(target=Target.CPU)

    collision_rule = create_lb_collision_rule(lbm_config=lbm_config)
    ast = create_kernel(collision_rule, config=config)
    code_str = get_code_str(ast)


def get_data_handling_for_psm(use_psm):
    domain_size = (10, 10)
    stencil = LBStencil(Stencil.D2Q9)
    dh = create_data_handling(domain_size=domain_size, periodicity=(True, False))

    f = dh.add_array('f', values_per_cell=len(stencil))
    dh.fill(f.name, 0.0, ghost_layers=True)
    f_tmp = dh.add_array('f_tmp', values_per_cell=len(stencil))
    dh.fill(f_tmp.name, 0.0, ghost_layers=True)

    psm_config = None
    if use_psm:
        fraction_field = dh.add_array('fraction_field', values_per_cell=1)
        dh.fill(fraction_field.name, 0.0, ghost_layers=True)
        object_vel = dh.add_array('object_vel', values_per_cell=dh.dim)
        dh.fill(object_vel.name, 0.0, ghost_layers=True)
        psm_config = PSMConfig(fraction_field=fraction_field, object_velocity_field=object_vel)

    lbm_config = LBMConfig(stencil=stencil, method=Method.SRT, relaxation_rate=1.5, psm_config=psm_config)
    lbm_optimisation = LBMOptimisation(symbolic_field=f, symbolic_temporary_field=f_tmp)
    update_rule = create_lb_update_rule(lbm_config=lbm_config, lbm_optimisation=lbm_optimisation)
    kernel_lb_step_with_psm = create_kernel(update_rule).compile()
    return (dh, kernel_lb_step_with_psm)


def test_lbm_vs_psm():
    psm_dh, psm_kernel = get_data_handling_for_psm(True)
    lbm_dh, lbm_kernel = get_data_handling_for_psm(False)

    for i in range(20):
        psm_dh.run_kernel(psm_kernel)
        psm_dh.swap('f', 'f_tmp')
        lbm_dh.run_kernel(lbm_kernel)
        lbm_dh.swap('f', 'f_tmp')

    max_vel_error = np.max(np.abs(psm_dh.gather_array('f') - lbm_dh.gather_array('f')))
    np.testing.assert_allclose(max_vel_error, 0, atol=1e-14)
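Both data handlings above initialise the fraction field to zero everywhere, so the PSM update degenerates to the plain LBM update; that is exactly what `test_lbm_vs_psm` asserts. As a purely illustrative variation (the uniform value 0.3 is an arbitrary assumption, not part of the test), a non-zero solid fraction can be set with the same `fill` call before stepping:

``` python
# Illustrative only: give every cell a uniform partial solid fraction before running the PSM kernel.
dh_psm, psm_kernel = get_data_handling_for_psm(True)
dh_psm.fill('fraction_field', 0.3, ghost_layers=True)
dh_psm.run_kernel(psm_kernel)
dh_psm.swap('f', 'f_tmp')
```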
import pytest
import numpy as np

import pystencils as ps
from lbmpy.flow_statistics import welford_assignments


@pytest.mark.parametrize('order', [1, 2, 3])
@pytest.mark.parametrize('dim', [1, 2, 3])
def test_welford(order, dim):
    target = ps.Target.CPU

    # create data handling and fields
    dh = ps.create_data_handling((1, 1, 1), periodicity=False, default_target=target)

    vector_field = dh.add_array('vector', values_per_cell=dim)
    dh.fill(vector_field.name, 0.0, ghost_layers=False)
    mean_field = dh.add_array('mean', values_per_cell=dim)
    dh.fill(mean_field.name, 0.0, ghost_layers=False)

    if order >= 2:
        sos = dh.add_array('sos', values_per_cell=dim**2)
        dh.fill(sos.name, 0.0, ghost_layers=False)

        if order == 3:
            soc = dh.add_array('soc', values_per_cell=dim**3)
            dh.fill(soc.name, 0.0, ghost_layers=False)
        else:
            soc = None
    else:
        sos = None
        soc = None

    welford = welford_assignments(field=vector_field, mean_field=mean_field,
                                  sum_of_squares_field=sos, sum_of_cubes_field=soc)
    welford_kernel = ps.create_kernel(welford).compile()

    # set random seed
    np.random.seed(42)
    n = int(1e4)
    x = np.random.normal(size=n * dim).reshape(n, dim)

    analytical_mean = np.zeros(dim)
    analytical_covariance = np.zeros(dim**2)
    analytical_third_order_moments = np.zeros(dim**3)

    # calculate analytical mean
    for i in range(dim):
        analytical_mean[i] = np.mean(x[:, i])

    # calculate analytical covariances
    for i in range(dim):
        for j in range(dim):
            analytical_covariance[i * dim + j] \
                = (np.sum((x[:, i] - analytical_mean[i]) * (x[:, j] - analytical_mean[j]))) / n

    # calculate analytical third-order central moments
    for i in range(dim):
        for j in range(dim):
            for k in range(dim):
                analytical_third_order_moments[(i * dim + j) * dim + k] \
                    = (np.sum((x[:, i] - analytical_mean[i])
                              * (x[:, j] - analytical_mean[j])
                              * (x[:, k] - analytical_mean[k]))) / n

    # time loop
    counter = 1
    for i in range(n):
        for d in range(dim):
            new_value = x[i, d]
            if dim > 1:
                dh.fill(array_name=vector_field.name, val=new_value, value_idx=d, ghost_layers=False)
            else:
                dh.fill(array_name=vector_field.name, val=new_value, ghost_layers=False)
        dh.run_kernel(welford_kernel, counter=counter)
        counter += 1

    welford_mean = dh.gather_array(mean_field.name).flatten()
    np.testing.assert_allclose(welford_mean, analytical_mean)

    if order >= 2:
        welford_covariance = dh.gather_array(sos.name).flatten() / n
        np.testing.assert_allclose(welford_covariance, analytical_covariance)

    if order == 3:
        welford_third_order_moments = dh.gather_array(soc.name).flatten() / n
        np.testing.assert_allclose(welford_third_order_moments, analytical_third_order_moments)


if __name__ == '__main__':
    test_welford(1, 1)
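For reference, the `counter` parameter passed to the kernel is the 1-based sample index of Welford's online recurrence, which is why it starts at 1 and is incremented once per sample in the time loop above. A minimal plain-Python sketch of the corresponding running-mean update (illustrative only, not lbmpy code):

``` python
def welford_mean_update(mean, new_value, counter):
    """One Welford step: mean_k = mean_{k-1} + (x_k - mean_{k-1}) / k, with counter = k."""
    return mean + (new_value - mean) / counter


# Usage sketch: reproduces the arithmetic mean up to floating-point round-off.
samples = [0.5, -1.0, 2.0, 0.25]
mean = 0.0
for k, value in enumerate(samples, start=1):
    mean = welford_mean_update(mean, value, k)
```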