Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
No results found
Show changes
Showing
with 210 additions and 2295 deletions
import numpy as np
from lbmpy.scenarios import create_channel
def test_fluctuating_generation_pipeline():
ch = create_channel((40, 10), method='mrt3', relaxation_rates=[1.5, 1, 1], force=1e-5,
fluctuating={'temperature': 1e-9},
kernel_params={'time_step': 1, 'seed': 312},
optimization={'cse_global': True})
ch.run(10)
assert np.max(ch.velocity[:, :]) < 0.1
import numpy as np
from lbmpy.creationfunctions import create_lb_method
from lbmpy.macroscopic_value_kernels import (
compile_macroscopic_values_getter, compile_macroscopic_values_setter)
def test_set_get_density_velocity_with_fields():
for stencil in ['D2Q9', 'D3Q19']:
for force_model in ['guo', 'luo', 'none']:
for compressible in [True, False]:
force = (0.1, 0.12, -0.17)
method = create_lb_method(stencil=stencil, force_model=force_model, method='trt',
compressible=compressible, force=force)
size = (3, 7, 4)[:method.dim]
pdf_arr = np.zeros(size + (len(method.stencil),))
density_input_field = 1 + 0.2 * (np.random.random_sample(size) - 0.5)
velocity_input_field = 0.1 * (np.random.random_sample(size + (method.dim, )) - 0.5)
setter = compile_macroscopic_values_setter(method, pdf_arr=pdf_arr,
quantities_to_set={'density': density_input_field,
'velocity': velocity_input_field}, )
setter(pdf_arr)
getter = compile_macroscopic_values_getter(method, ['density', 'velocity'], pdf_arr=pdf_arr)
density_output_field = np.empty_like(density_input_field)
velocity_output_field = np.empty_like(velocity_input_field)
getter(pdfs=pdf_arr, density=density_output_field, velocity=velocity_output_field)
np.testing.assert_almost_equal(density_input_field, density_output_field)
np.testing.assert_almost_equal(velocity_input_field, velocity_output_field)
def test_set_get_constant_velocity():
for stencil in ['D2Q9', 'D3Q19']:
for force_model in ['guo', 'luo', 'none']:
for compressible in [True, False]:
ref_velocity = [0.01, -0.2, 0.1]
force = (0.1, 0.12, -0.17)
method = create_lb_method(stencil=stencil, force_model=force_model, method='trt',
compressible=compressible, force=force)
size = (1, 1, 1)[:method.dim]
pdf_arr = np.zeros(size + (len(method.stencil),))
setter = compile_macroscopic_values_setter(method, pdf_arr=pdf_arr,
quantities_to_set={'velocity': ref_velocity[:method.dim]}, )
setter(pdf_arr)
getter = compile_macroscopic_values_getter(method, ['velocity'], pdf_arr=pdf_arr)
velocity_output_field = np.zeros(size + (method.dim, ))
getter(pdfs=pdf_arr, velocity=velocity_output_field)
if method.dim == 2:
computed_velocity = velocity_output_field[0, 0, :]
else:
computed_velocity = velocity_output_field[0, 0, 0, :]
np.testing.assert_almost_equal(np.array(ref_velocity[:method.dim]), computed_velocity)
from lbmpy.cumulants import raw_moment_as_function_of_cumulants
from lbmpy.maxwellian_equilibrium import *
from lbmpy.moments import (
MOMENT_SYMBOLS, exponents_to_polynomial_representations, moment_matrix,
moments_up_to_component_order, moments_up_to_order)
from lbmpy.stencils import get_stencil
from pystencils.sympyextensions import remove_higher_order_terms
def test_maxwellian_moments():
"""Check moments of continuous Maxwellian"""
rho = sp.Symbol("rho")
u = sp.symbols("u_0 u_1 u_2")
c_s = sp.Symbol("c_s")
eq_moments = get_moments_of_continuous_maxwellian_equilibrium(((0, 0, 0), (0, 0, 1)),
dim=3, rho=rho, u=u, c_s_sq=c_s ** 2)
assert eq_moments[0] == rho
assert eq_moments[1] == rho * u[2]
x, y, z = MOMENT_SYMBOLS
one = sp.Rational(1, 1)
eq_moments = get_moments_of_continuous_maxwellian_equilibrium((one, x, x ** 2, x * y),
dim=2, rho=rho, u=u[:2], c_s_sq=c_s ** 2)
assert eq_moments[0] == rho
assert eq_moments[1] == rho * u[0]
assert eq_moments[2] == rho * (c_s ** 2 + u[0] ** 2)
assert eq_moments[3] == rho * u[0] * u[1]
def test_continuous_discrete_moment_equivalence():
"""Check that moments up to order 3 agree with moments of the continuous Maxwellian"""
for stencil in [get_stencil(n) for n in ["D2Q9", "D3Q15", "D3Q19", "D3Q27"]]:
dim = len(stencil[0])
c_s_sq = sp.Rational(1, 3)
moments = tuple(moments_up_to_order(3, dim=dim, include_permutations=False))
cm = sp.Matrix(get_moments_of_continuous_maxwellian_equilibrium(moments, order=2, dim=dim, c_s_sq=c_s_sq))
dm = sp.Matrix(get_moments_of_discrete_maxwellian_equilibrium(stencil, moments, order=2,
compressible=True, c_s_sq=c_s_sq))
diff = sp.simplify(cm - dm)
for d in diff:
assert d == 0
def test_moment_cumulant_continuous_equivalence():
"""Test that discrete equilibrium is the same up to order 3 when obtained with following methods
* eq1: take moments of continuous Maxwellian and transform back to pdf space
* eq2: take cumulants of continuous Maxwellian, transform to moments then transform to pdf space
* eq3: take discrete equilibrium from LBM literature
* eq4: same as eq1 but with built-in function
"""
for stencil in [get_stencil('D2Q9'), get_stencil('D3Q27')]:
dim = len(stencil[0])
u = sp.symbols("u_:{dim}".format(dim=dim))
indices = tuple(moments_up_to_component_order(2, dim=dim))
c_s_sq = sp.Rational(1, 3)
eq_moments1 = get_moments_of_continuous_maxwellian_equilibrium(indices, dim=dim, u=u, c_s_sq=c_s_sq)
eq_cumulants = get_cumulants_of_continuous_maxwellian_equilibrium(indices, dim=dim, u=u, c_s_sq=c_s_sq)
eq_cumulants = {idx: c for idx, c in zip(indices, eq_cumulants)}
eq_moments2 = [raw_moment_as_function_of_cumulants(idx, eq_cumulants) for idx in indices]
pdfs_to_moments = moment_matrix(indices, stencil)
def normalize(expressions):
return [remove_higher_order_terms(e.expand(), symbols=u, order=3) for e in expressions]
eq1 = normalize(pdfs_to_moments.inv() * sp.Matrix(eq_moments1))
eq2 = normalize(pdfs_to_moments.inv() * sp.Matrix(eq_moments2))
eq3 = normalize(discrete_maxwellian_equilibrium(stencil, order=3, c_s_sq=c_s_sq, compressible=True))
eq4 = normalize(generate_equilibrium_by_matching_moments(stencil, indices, c_s_sq=c_s_sq))
assert eq1 == eq2
assert eq2 == eq3
assert eq3 == eq4
def test_moment_cumulant_continuous_equivalence_polynomial_formulation():
"""Same as test above, but instead of index tuples, the polynomial formulation is used."""
for stencil in [get_stencil('D2Q9'), get_stencil('D3Q27')]:
dim = len(stencil[0])
u = sp.symbols(f"u_:{dim}")
index_tuples = tuple(moments_up_to_component_order(2, dim=dim))
indices = exponents_to_polynomial_representations(index_tuples)
c_s_sq = sp.Rational(1, 3)
eq_moments1 = get_moments_of_continuous_maxwellian_equilibrium(indices, dim=dim, u=u, c_s_sq=c_s_sq)
eq_cumulants = get_cumulants_of_continuous_maxwellian_equilibrium(indices, dim=dim, u=u, c_s_sq=c_s_sq)
eq_cumulants = {idx: c for idx, c in zip(index_tuples, eq_cumulants)}
eq_moments2 = [raw_moment_as_function_of_cumulants(idx, eq_cumulants) for idx in index_tuples]
pdfs_to_moments = moment_matrix(indices, stencil)
def normalize(expressions):
return [remove_higher_order_terms(e.expand(), symbols=u, order=3) for e in expressions]
eq1 = normalize(pdfs_to_moments.inv() * sp.Matrix(eq_moments1))
eq2 = normalize(pdfs_to_moments.inv() * sp.Matrix(eq_moments2))
eq3 = normalize(discrete_maxwellian_equilibrium(stencil, order=3, c_s_sq=c_s_sq, compressible=True))
eq4 = normalize(generate_equilibrium_by_matching_moments(stencil, indices, c_s_sq=c_s_sq))
assert eq1 == eq2
assert eq2 == eq3
assert eq3 == eq4
"""
Moment-based methods are created by specifying moments and their equilibrium value.
This test checks if the equilibrium formula obtained by this method is the same as the explicitly
given discrete_maxwellian_equilibrium
"""
import sympy as sp
from lbmpy.creationfunctions import create_lb_method
from lbmpy.maxwellian_equilibrium import discrete_maxwellian_equilibrium
from lbmpy.methods import create_mrt_orthogonal, create_srt, create_trt
from lbmpy.relaxationrates import get_shear_relaxation_rate
from lbmpy.stencils import get_stencil
def check_for_matching_equilibrium(method_name, stencil, compressibility):
omega = sp.Symbol("omega")
if method_name == 'srt':
method = create_srt(stencil, omega, compressible=compressibility, equilibrium_order=2)
elif method_name == 'trt':
method = create_trt(stencil, omega, omega, compressible=compressibility, equilibrium_order=2)
elif method_name == 'mrt':
method = create_mrt_orthogonal(stencil, lambda v: omega, compressible=compressibility, equilibrium_order=2)
else:
raise ValueError("Unknown method")
reference_equilibrium = discrete_maxwellian_equilibrium(stencil, order=2,
c_s_sq=sp.Rational(1, 3), compressible=compressibility)
equilibrium = method.get_equilibrium()
equilibrium = equilibrium.new_without_subexpressions(subexpressions_to_keep=sp.symbols("rho u_0 u_1 u_2"))
diff = sp.Matrix(reference_equilibrium) - sp.Matrix([eq.rhs for eq in equilibrium.main_assignments])
diff = sp.simplify(diff)
assert diff.is_zero
def check_for_matching_equilibrium_for_stencil(stencil_name):
stencil = get_stencil(stencil_name)
for method in ['srt', 'trt', 'mrt']:
check_for_matching_equilibrium(method, stencil, True)
check_for_matching_equilibrium(method, stencil, False)
def test_d2_q9():
check_for_matching_equilibrium_for_stencil('D2Q9')
def test_d3_q27():
check_for_matching_equilibrium_for_stencil('D3Q27')
def test_d3_q19():
check_for_matching_equilibrium_for_stencil('D3Q19')
def test_d3_q15():
check_for_matching_equilibrium_for_stencil('D3Q15')
def test_relaxation_rate_setter():
o1, o2, o3 = sp.symbols("o1 o2 o3")
method = create_lb_method(method='srt', stencil='D2Q9', relaxation_rates=[o3])
method2 = create_lb_method(method='mrt3', stencil='D2Q9', relaxation_rates=[o3, o3, o3])
method.set_zeroth_moment_relaxation_rate(o1)
method.set_first_moment_relaxation_rate(o2)
assert get_shear_relaxation_rate(method) == o3
method.set_zeroth_moment_relaxation_rate(o3)
method.set_first_moment_relaxation_rate(o3)
assert method.collision_matrix == method2.collision_matrix
def test_mrt_orthogonal():
m = create_mrt_orthogonal(get_stencil("D2Q9"), maxwellian_moments=True)
assert (m.moment_matrix * m.moment_matrix.T).is_diagonal()
m = create_mrt_orthogonal(get_stencil("D3Q27"), maxwellian_moments=True)
assert (m.moment_matrix * m.moment_matrix.T).is_diagonal()
%% Cell type:code id: tags:
``` python
from lbmpy.session import *
from lbmpy.phasefield.n_phase_boyer import *
from lbmpy.phasefield.kerneleqs import *
from lbmpy.phasefield.contact_angle_circle_fitting import *
from scipy.ndimage.filters import gaussian_filter
from pystencils.simp import sympy_cse_on_assignment_list
one = sp.sympify(1)
import pyximport
pyximport.install(language_level=3)
from lbmpy.phasefield.simplex_projection import simplex_projection_2d # NOQA
```
%% Cell type:markdown id: tags:
# Simulation arbitrary surface tension case
%% Cell type:code id: tags:
``` python
n = 4
dx, dt = 1, 1
mobility = 2e-3
domain_size = (150, 150)
ε = one * 4
penalty_factor = 0
stabilization_factor = 10
κ = (one, one/2, one/3, one/4)
sigma_factor = one / 15
σ = sp.ImmutableDenseMatrix(n, n, lambda i,j: sigma_factor* (κ[i] + κ[j]) if i != j else 0 )
```
%% Cell type:code id: tags:
``` python
dh = create_data_handling(domain_size, periodicity=True, default_target='gpu')
c = dh.add_array('c', values_per_cell=n)
c_tmp = dh.add_array_like('c_tmp', 'c')
μ = dh.add_array('mu', values_per_cell=n)
cvec = c.center_vector
μvec = μ.center_vector
```
%% Cell type:code id: tags:
``` python
α, _ = diffusion_coefficients(σ)
f = lambda c: c**2 * ( 1 - c ) **2
a, b = compute_ab(f)
capital_f = capital_f0(cvec, σ) + correction_g(cvec, σ) + stabilization_factor * stabilization_term(cvec, α)
f_bulk = free_energy_bulk(capital_f, b, ε) + penalty_factor * (one - sum(cvec))
f_if = free_energy_interfacial(cvec, σ, a, ε)
f = f_bulk + f_if
```
%% Cell type:code id: tags:
``` python
#f_bulk
```
%% Cell type:code id: tags:
``` python
μ_assignments = mu_kernel(f, cvec, c, μ)
μ_assignments = [Assignment(a.lhs, a.rhs.doit()) for a in μ_assignments]
μ_assignments = sympy_cse_on_assignment_list(μ_assignments)
```
%% Cell type:code id: tags:
``` python
discretize = fd.Discretization2ndOrder(dx=dx, dt=dt)
```
%% Cell type:code id: tags:
``` python
def lapl(e):
return sum(ps.fd.diff(e, d, d) for d in range(dh.dim))
```
%% Cell type:code id: tags:
``` python
rhs = α * μvec
discretized_rhs = [discretize(fd.expand_diff_full( lapl(mobility * rhs_i) + fd.transient(cvec[i], idx=i), functions=μvec))
for i, rhs_i in enumerate(rhs)]
c_assignments = [Assignment(lhs, rhs)
for lhs, rhs in zip(c_tmp.center_vector, discretized_rhs)]
```
%% Cell type:code id: tags:
``` python
#c_assignments
```
%% Cell type:code id: tags:
``` python
μ_sync = dh.synchronization_function(μ.name)
c_sync = dh.synchronization_function(c.name)
optimization = {'cpu_openmp': 4, 'cpu_vectorize_info': None}
μ_kernel = create_kernel(μ_assignments, target=dh.default_target, **optimization).compile()
c_kernel = create_kernel(c_assignments, target=dh.default_target, **optimization).compile()
def set_c(slice_obj, values):
for block in dh.iterate(slice_obj):
arr = block[c.name]
arr[..., : ] = values
def smooth():
for block in dh.iterate(ghost_layers=True):
c_arr = block[c.name]
for i in range(n):
gaussian_filter(c_arr[..., i], sigma=2, output=c_arr[..., i])
def time_loop(steps):
dh.all_to_gpu()
for t in range(steps):
c_sync()
dh.run_kernel(μ_kernel)
μ_sync()
dh.run_kernel(c_kernel)
dh.swap(c.name, c_tmp.name)
#simplex_projection_2d(dh.cpu_arrays[c.name])
dh.all_to_cpu()
```
%% Cell type:code id: tags:
``` python
set_c(make_slice[:, :], [0, 0, 0, 0])
set_c(make_slice[:, 0.5:], [1, 0, 0, 0])
set_c(make_slice[:, :0.5], [0, 1, 0, 0])
set_c(make_slice[0.3:0.7, 0.3:0.7], [0, 0, 1, 0])
smooth()
```
%% Cell type:code id: tags:
``` python
#dh.load_all('n_phases_state_size200_stab10.npz')
```
%% Cell type:code id: tags:
``` python
plt.phase_plot(dh.gather_array(c.name))
```
%% Output
%% Cell type:code id: tags:
``` python
neumann_angles_from_surface_tensions(lambda i, j: float(σ[i, j]))
```
%% Output
$\displaystyle \left[ 146.44269023807928, \ 117.81813928465394, \ 95.73917047726678\right]$
[146.44269023807928, 117.81813928465394, 95.73917047726678]
%% Cell type:code id: tags:
``` python
import time
for i in range(10):
start = time.perf_counter()
time_loop(1_000)
end = time.perf_counter()
try:
print(i, end - start, liquid_lens_neumann_angles(dh.gather_array(c.name)))
except Exception:
print(i, end - start, "none found")
```
%% Output
0 0.30607624700132874 [83.97888710273061, 100.48794346625529, 175.5331694310141]
1 0.2600655169990205 [83.73094685534376, 100.65854574856168, 175.6105073960945]
2 0.2601136189987301 [83.49914818603683, 100.82173327601079, 175.67911853795226]
3 0.25987518599868054 [83.31519592224448, 100.94468140501989, 175.74012267273554]
4 0.2651959220020217 [83.14239972296966, 101.06100094405181, 175.79659933297853]
5 0.25910847799968906 [82.984481834461, 101.16731750637399, 175.8482006591651]
6 0.259863024999504 [82.84781128433397, 101.2570276449976, 175.89516107066834]
7 0.2606479199966998 [82.7456965110742, 101.31687551766585, 175.93742797125986]
8 0.25991897900166805 [82.67010885583116, 101.35099855297112, 175.97889259119805]
9 0.2590353729974595 [75.9000280154447, 108.9652166787719, 175.1347553057833]
%% Cell type:code id: tags:
``` python
plt.subplot(1,3,1)
t = dh.gather_array(c.name, make_slice[25, :]).squeeze()
plt.plot(t);
plt.subplot(1,3,2)
plt.phase_plot(dh.gather_array(c.name), linewidth=1)
plt.subplot(1,3,3)
plt.scalar_field(dh.gather_array(μ.name)[:, :, 2])
plt.colorbar();
```
%% Output
%% Cell type:code id: tags:
``` python
assert not np.isnan(dh.max(c.name))
```
%% Cell type:code id: tags:
``` python
t = dh.gather_array(c.name, make_slice[25, 55:90]).squeeze()
plt.hlines(0.5, 0, 30)
plt.plot(t);
```
%% Output
from lbmpy.phasefield.scenarios import *
from pystencils import make_slice
def test_setup():
domain_size = (30, 15)
scenarios = [
create_three_phase_model(domain_size=domain_size, include_rho=True),
#create_three_phase_model(domain_size=domain_size, include_rho=False),
create_n_phase_model_penalty_term(domain_size=domain_size, num_phases=4),
]
for i, sc in enumerate(scenarios):
print("Testing scenario", i)
sc.set_concentration(make_slice[:, :0.5], [1, 0, 0])
sc.set_concentration(make_slice[:, 0.5:], [0, 1, 0])
sc.set_concentration(make_slice[0.4:0.6, 0.4:0.6], [0, 0, 1])
sc.set_pdf_fields_from_macroscopic_values()
sc.run(10)
def test_fd_cahn_hilliard():
sc = create_n_phase_model_penalty_term(domain_size=(100, 50), num_phases=3,
solve_cahn_hilliard_with_finite_differences=True)
sc.set_concentration(make_slice[:, 0.5:], [1, 0, 0])
sc.set_concentration(make_slice[:, :0.5], [0, 1, 0])
sc.set_concentration(make_slice[0.3:0.7, 0.3:0.7], [0, 0, 1])
sc.set_pdf_fields_from_macroscopic_values()
sc.run(100)
assert np.isfinite(np.max(sc.concentration[:, :]))
import pytest
from lbmpy.creationfunctions import create_lb_method
from lbmpy.relaxationrates import get_shear_relaxation_rate
def test_relaxation_rate():
method = create_lb_method(stencil="D3Q19", method='mrt_raw',
relaxation_rates=[1 + i / 10 for i in range(19)])
with pytest.raises(ValueError) as e:
get_shear_relaxation_rate(method)
assert 'Shear moments are relaxed with different relaxation' in str(e.value)
method = create_lb_method(stencil="D2Q9", method='mrt_raw',
relaxation_rates=[1 + i / 10 for i in range(9)])
assert get_shear_relaxation_rate(method) == 1.5
import os
from types import MappingProxyType
import numpy as np
from lbmpy.scenarios import create_lid_driven_cavity as run_ldc_lbmpy
from pystencils import make_slice
def create_force_driven_channel(force=1e-6, domain_size=None, dim=2, radius=None, length=None,
optimization=MappingProxyType({}), lbm_kernel=None, kernel_params=MappingProxyType({}),
**kwargs):
from lbmpy.lbstep import LatticeBoltzmannStep
from lbmpy.boundaries import NoSlip
from pystencils.slicing import slice_from_direction
wall_boundary = NoSlip()
if domain_size is not None:
dim = len(domain_size)
else:
if dim is None or radius is None or length is None:
raise ValueError("Pass either 'domain_size' or 'dim', 'radius' and 'length'")
assert dim in (2, 3)
kwargs['force'] = tuple([force, 0, 0][:dim])
round_channel = False
if radius is not None:
assert length is not None
if dim == 3:
domain_size = (length, 2 * radius + 1, 2 * radius + 1)
round_channel = True
else:
if domain_size is None:
domain_size = (length, 2 * radius)
if 'force_model' not in kwargs:
kwargs['force_model'] = 'guo'
lb_step = LatticeBoltzmannStep(domain_size, optimization=optimization, lbm_kernel=lbm_kernel,
kernel_params=kernel_params, periodicity=(True, False, False), **kwargs)
boundary_handling = lb_step.boundary_handling
if dim == 2:
for direction in ('N', 'S'):
boundary_handling.set_boundary(wall_boundary, slice_from_direction(direction, dim))
elif dim == 3:
if round_channel:
def circle_mask_cb(_, y, z):
y_mid = np.max(y) // 2
z_mid = np.max(z) // 2
return (y - y_mid) ** 2 + (z - z_mid) ** 2 > radius ** 2
boundary_handling.set_boundary(wall_boundary, mask_callback=circle_mask_cb)
else:
for direction in ('N', 'S', 'T', 'B'):
boundary_handling.set_boundary(wall_boundary, slice_from_direction(direction, dim))
assert domain_size is not None
if 'force_model' not in kwargs:
kwargs['force_model'] = 'guo'
return lb_step
def plot_velocity_fields(vel1, vel2):
import lbmpy.plot as plt
diff = np.average(np.abs(vel2 - vel1))
has_diff = diff > 1e-12
num_plots = 3 if has_diff else 2
plt.subplot(1, num_plots, 1)
plt.title("lbmpy")
plt.vector_field(vel1)
plt.subplot(1, num_plots, 2)
plt.title("walberla ref")
plt.vector_field(vel2)
if has_diff:
plt.title("Difference (%f)" % diff)
plt.subplot(1, num_plots, 3)
plt.vector_field(vel2 - vel1)
plt.show()
def compare_scenario(lbmpy_scenario_creator, walberla_scenario_creator, optimization=MappingProxyType({}),
action='Testing', name='ss', plot="off", **kwargs):
if 'time_steps' in kwargs:
time_steps = kwargs['time_steps']
del kwargs['time_steps']
else:
time_steps = 100
ref_file_path = get_directory_reference_files()
if action == 'Testing':
reference = np.load(os.path.join(ref_file_path, name + ".npz"))
lbmpy_version = lbmpy_scenario_creator(optimization=optimization, **kwargs)
lbmpy_version.run(time_steps)
rho_lbmpy = lbmpy_version.density_slice(make_slice[:, :] if lbmpy_version.dim == 2 else make_slice[:, :, :])
vel_lbmpy = lbmpy_version.velocity_slice(make_slice[:, :] if lbmpy_version.dim == 2 else make_slice[:, :, :])
if plot == "on":
plot_velocity_fields(vel_lbmpy, reference['vel'])
np.testing.assert_almost_equal(reference['rho'], rho_lbmpy, err_msg="Density fields are different")
np.testing.assert_almost_equal(reference['vel'], vel_lbmpy, err_msg="Velocity fields are different")
else:
wlb_time_loop = walberla_scenario_creator(**kwargs)
pdfs_wlb, rho_wlb, vel_wlb = wlb_time_loop(time_steps)
if os.path.exists(ref_file_path + name + ".npz"):
os.remove(ref_file_path + name + ".npz")
np.savez_compressed(ref_file_path + name, pdfs=pdfs_wlb, rho=rho_wlb, vel=vel_wlb)
def get_directory_reference_files():
script_file = os.path.realpath(__file__)
script_dir = os.path.dirname(script_file)
return os.path.join(script_dir, "reference_files")
def compare_lid_driven_cavity(optimization=MappingProxyType({}), action='Testing', plot="off", **kwargs):
if kwargs['method'] == 'MRT':
name = "LidDrivenCavity_" + kwargs['method']
else:
name = "LidDrivenCavity_" + kwargs['method'] + "_" + kwargs['force_model']
if kwargs['compressible']:
name = name + "_compressible"
else:
name = name + "_incompressible"
try:
from lbmpy_tests.walberla_scenario_setup import create_lid_driven_cavity as run_lid_driven_cavity_walberla
except ImportError:
run_lid_driven_cavity_walberla = None
return compare_scenario(run_ldc_lbmpy, run_lid_driven_cavity_walberla, optimization, action, name, plot, **kwargs)
def compare_force_driven_channel(optimization=MappingProxyType({}), action='Testing', plot="off", **kwargs):
from functools import partial
lbmpy_func = partial(create_force_driven_channel, dim=2)
name = "ForceDrivenChannel_" + kwargs['force_model']
if kwargs['compressible']:
name = name + "_compressible"
else:
name = name + "_incompressible"
try:
from lbmpy_tests.walberla_scenario_setup import create_lid_driven_cavity as run_force_driven_channel_walberla
except ImportError:
run_force_driven_channel_walberla = None
return compare_scenario(lbmpy_func, run_force_driven_channel_walberla, optimization, action, name, plot, **kwargs)
def test_channel_srt(action='Testing', plot="off"):
params = {'force': 0.0001,
'radius': 40,
'length': 40,
'method': 'SRT',
'stencil': 'D2Q9',
'time_steps': 500,
'maxwellian_moments': False,
'relaxation_rates': [1.8]}
if action == 'Testing' or action == 'Regenerate':
for force_model_name in ('simple', 'luo', 'guo'):
for compressible in (False, True):
print("%s Channel SRT, Force Model %s, compressible %d" % (action, force_model_name, compressible))
compare_force_driven_channel(compressible=compressible, force_model=force_model_name,
action=action, plot=plot, **params)
else:
print("Possible Actions: Regenerate or Testing")
def test_ldc_srt(action='Testing', plot="off"):
force = (0.0001, -0.00002)
if action == 'Testing' or action == 'Regenerate':
for force_model_name in ('simple', 'luo', 'guo'):
for compressible in (False, True):
print("%s LidDrivenCavity SRT, Force Model %s, compressible %d" % (action, force_model_name,
compressible))
compare_lid_driven_cavity(domain_size=(16, 19), lid_velocity=0.005, stencil='D2Q9',
method='SRT', relaxation_rates=[1.8], compressible=compressible,
maxwellian_moments=False,
force=force, force_model=force_model_name, action=action, plot=plot)
else:
print("Possible Actions: Regenerate or Testing")
def test_ldc_trt(action='Testing', plot="off"):
f = (0.0001, -0.00002, 0.0000124)
if action == 'Testing' or action == 'Regenerate':
for force_modelName in ('luo',): # guo for multiple relaxation rates has to be implemented...
for compressible in (True, False):
print("%s LidDrivenCavity TRT, Force Model %s, compressible %d" % (action, force_modelName,
compressible))
# print("testing", force_modelName, compressible)
compare_lid_driven_cavity(domain_size=(16, 17, 18), lid_velocity=0.005, stencil='D3Q19',
method='TRT', relaxation_rates=[1.8, 1.3], compressible=compressible,
maxwellian_moments=False,
force=f, force_model=force_modelName, action=action, plot=plot)
else:
print("Possible Actions: Regenerate or Testing")
def test_ldc_mrt(action='Testing', plot="off"):
if action == 'Testing' or action == 'Regenerate':
print("%s LidDrivenCavity MRT, compressible 0" % action)
compare_lid_driven_cavity(domain_size=(16, 17, 18), lid_velocity=0.005, stencil='D3Q19',
method='MRT', compressible=False, maxwellian_moments=False,
relaxation_rates=[1, 1.3, 1.4, 1.5, 1.25, 1.36, 1.12], action=action, plot=plot)
else:
print("Possible Actions: Regenerate or Testing")
import numpy as np
from lbmpy.creationfunctions import create_lb_function
def test_srt():
src = np.zeros((3, 3, 9))
dst = np.zeros_like(src)
cuda = False
opt_params = {} if not cuda else {'target': 'gpu'}
func = create_lb_function(method='srt', stencil='D2Q9', relaxation_rates=[1.8], compressible=False,
optimization=opt_params)
if cuda:
import pycuda.gpuarray as gpuarray
gpu_src, gpu_dst = gpuarray.to_gpu(src), gpuarray.to_gpu(dst)
func(src=gpu_src, dst=gpu_dst)
gpu_src.get(src)
gpu_dst.get(dst)
else:
func(src=src, dst=dst)
np.testing.assert_allclose(np.sum(np.abs(dst)), 0.0, atol=1e-13)
%% Cell type:code id: tags:
``` python
from lbmpy.session import *
from lbmpy.sparse import *
from pystencils.field import FieldType
import pycuda.gpuarray as gpuarray
import pystencils as ps
```
%% Cell type:markdown id: tags:
# Sparse (list based) LBM
%% Cell type:code id: tags:
``` python
domain_size = (20, 20)
omega = 1.8
target = 'cpu'
ghost_layers = 1
arr_size = tuple(ds + 2 * ghost_layers for ds in domain_size)
lid_velocity = 0.01
force = 1e-6
channel = True
if channel:
kwargs={'force': (force, 0)}
else:
kwargs = {}
method = create_lb_method(stencil='D2Q9', relaxation_rate=omega, compressible=False, **kwargs)
```
%% Cell type:code id: tags:
``` python
ubb = UBB( (lid_velocity, 0) )
noslip = NoSlip()
flags = {
'fluid': 1,
noslip: 2,
ubb: 4,
}
flag_arr = np.zeros(arr_size, dtype=np.uint16)
flag_arr.fill(flags['fluid'])
if channel:
flag_arr[0, :] = 0
flag_arr[-1, :] = 0
flag_arr[:, 0] = flags[noslip]
flag_arr[:, -1] = flags[noslip]
else:
flag_arr[:, -1] = flags[ubb]
flag_arr[:, 0] = flags[noslip]
flag_arr[0, :] = flags[noslip]
flag_arr[-1, :] = flags[noslip]
plt.scalar_field(flag_arr)
plt.colorbar();
```
%% Output
%% Cell type:markdown id: tags:
### Mappings
%% Cell type:code id: tags:
``` python
mapping = SparseLbMapper(method.stencil, flag_arr, flags['fluid'], flags[noslip], flags[ubb])
index_arr = mapping.create_index_array(ghost_layers)
# Arrays
#index_arr = index_arr_linear.reshape([len(method.stencil), mapping.num_fluid_cells])
#index_arr = index_arr.swapaxes(0, 1)
pdf_arr = np.empty((len(mapping), len(method.stencil)), order='f')
pdf_arr_tmp = np.empty_like(pdf_arr)
vel_arr = np.ones([mapping.num_fluid_cells, method.dim], order='f')
```
%% Cell type:code id: tags:
``` python
pdf_field, pdf_field_tmp, vel_field = ps.fields("f(9), d(9), u(2): [1D]",
#f=pdf_arr[:mapping.num_fluid_cells],
#d=pdf_arr_tmp[:mapping.num_fluid_cells],
#u=vel_arr
)
pdf_field.field_type = FieldType.CUSTOM
pdf_field.pdf_field_tmp = FieldType.CUSTOM
```
%% Cell type:markdown id: tags:
### Macroscopic quantities
%% Cell type:code id: tags:
``` python
cqc = method.conserved_quantity_computation
inp_eqs = cqc.equilibrium_input_equations_from_init_values()
setter_eqs = method.get_equilibrium(conserved_quantity_equations=inp_eqs)
setter_eqs = setter_eqs.new_with_substitutions({sym: pdf_field(i)
for i, sym in enumerate(method.post_collision_pdf_symbols)})
kernel_initialize = ps.create_kernel(setter_eqs, ghost_layers=((0, 0),), ).compile()
def init():
kernel_initialize(f=pdf_arr[:mapping.num_fluid_cells])
init()
getter_eqs = cqc.output_equations_from_pdfs(pdf_field.center_vector,
{'velocity': vel_field})
kernel_compute_u = ps.create_kernel(getter_eqs, ghost_layers=((0,0),) ).compile()
def get_velocity(arr=pdf_arr):
fluid_cell_arr = mapping.coordinates
kernel_compute_u(f=pdf_arr[:mapping.num_fluid_cells], u=vel_arr)
full_velocity_arr = np.zeros(flag_arr.shape + (2,), dtype=np.float64)
full_velocity_arr.fill(np.nan)
arr = fluid_cell_arr[:mapping.num_fluid_cells]
full_velocity_arr[arr['x'], arr['y']] = vel_arr
return full_velocity_arr[1:-1, 1:-1]
```
%% Cell type:markdown id: tags:
### Stream Collide Kernel
%% Cell type:code id: tags:
``` python
#index_field = ps.Field.create_from_numpy_array("idx", index_arr, index_dimensions=1)
index_field = ps.Field.create_generic("idx", spatial_dimensions=1, index_dimensions=1, dtype=index_arr.dtype)
collision_rule = method.get_collision_rule()
Q = len(method.stencil)
symbol_subs = {sym: pdf_field.absolute_access((index_field(i-1),),())
for i, sym in enumerate(method.pre_collision_pdf_symbols)}
symbol_subs.update({sym: pdf_field_tmp(i) for i, sym in enumerate(method.post_collision_pdf_symbols)})
symbol_subs[method.pre_collision_pdf_symbols[0]] = pdf_field(0) # special case for center
symbol_subs
```
%% Output
$\displaystyle \left\{ d_{0} : {{d}_{0}}, \ d_{1} : {{d}_{0}^{1}}, \ d_{2} : {{d}_{0}^{2}}, \ d_{3} : {{d}_{0}^{3}}, \ d_{4} : {{d}_{0}^{4}}, \ d_{5} : {{d}_{0}^{5}}, \ d_{6} : {{d}_{0}^{6}}, \ d_{7} : {{d}_{0}^{7}}, \ d_{8} : {{d}_{0}^{8}}, \ f_{0} : {{f}_{0}}, \ f_{1} : {{f}_{\mathbf{{idx}_{0}}}}, \ f_{2} : {{f}_{\mathbf{{idx}_{0}^{1}}}}, \ f_{3} : {{f}_{\mathbf{{idx}_{0}^{2}}}}, \ f_{4} : {{f}_{\mathbf{{idx}_{0}^{3}}}}, \ f_{5} : {{f}_{\mathbf{{idx}_{0}^{4}}}}, \ f_{6} : {{f}_{\mathbf{{idx}_{0}^{5}}}}, \ f_{7} : {{f}_{\mathbf{{idx}_{0}^{6}}}}, \ f_{8} : {{f}_{\mathbf{{idx}_{0}^{7}}}}\right\}$
{d₀: d_C__0, d₁: d_C__1, d₂: d_C__2, d₃: d_C__3, d₄: d_C__4, d₅: d_C__5, d₆: d
_C__6, d₇: d_C__7, d₈: d_C__8, f₀: f_C__0, f₁: f_daad556c0781, f₂: f_14e3fe181
65b, f₃: f_cd08e7a24cfc, f₄: f_ac7562ff93b9, f₅: f_5b49411a8d3e, f₆: f_d924a61
a086a, f₇: f_716db6c2b896, f₈: f_89df4cd9f421}
%% Cell type:code id: tags:
``` python
collision_rule = method.get_collision_rule()
update_rule = collision_rule.new_with_substitutions(symbol_subs)
kernel_stream_collide = ps.create_kernel(update_rule, ghost_layers=[(0,0)], target=target).compile()
```
%% Cell type:markdown id: tags:
### Boundary Kernels
%% Cell type:code id: tags:
``` python
if not channel:
if target == 'gpu':
raise NotImplementedError("UBB on GPU not working yet")
ubb_mapper = SparseLbBoundaryMapper(ubb, method, pdf_field)
#TODO the following line is wrong: kernel contains accesses to index_field and pdf_field which have
#different size: a correct kernel comes out when by change the shape is taken from index field,
# when taken from pdf field, a wrong kernel is generated
ubb_kernel = ps.create_kernel(ubb_mapper.assignments(), ghost_layers=0).compile()
ubb_idx_arr = ubb_mapper.create_index_arr(mapping, flags[ubb])
ps.show_code(ubb_kernel.ast)
def handle_ubb():
ubb_kernel(indexField=ubb_idx_arr, f=pdf_arr[:mapping.num_fluid_cells])
else:
def handle_ubb():
pass
```
%% Cell type:markdown id: tags:
### Time Loop
%% Cell type:code id: tags:
``` python
def time_step():
global pdf_arr, pdf_arr_tmp, index_arr
handle_ubb()
if target == 'gpu':
gpu_pdf_arr = gpuarray.to_gpu(pdf_arr)
gpu_pdf_arr_tmp = gpuarray.to_gpu(pdf_arr_tmp)
gpu_index_arr = gpuarray.to_gpu(index_arr)
kernel_stream_collide(f=gpu_pdf_arr[:mapping.num_fluid_cells],
d=gpu_pdf_arr_tmp[:mapping.num_fluid_cells],
idx=gpu_index_arr)
pdf_arr = gpu_pdf_arr.get()
pdf_arr_tmp = gpu_pdf_arr_tmp.get()
index_arr = gpu_index_arr.get()
else:
kernel_stream_collide(f=pdf_arr[:mapping.num_fluid_cells],
d=pdf_arr_tmp[:mapping.num_fluid_cells],
idx=index_arr)
pdf_arr_tmp, pdf_arr = pdf_arr, pdf_arr_tmp
def run(steps=100):
for t in range(steps):
time_step()
return get_velocity()
```
%% Cell type:code id: tags:
``` python
init()
```
%% Cell type:code id: tags:
``` python
init()
result = run(100)
plt.vector_field(result, step=1);
```
%% Output
%% Cell type:markdown id: tags:
### Check against reference
%% Cell type:code id: tags:
``` python
if channel:
reference = create_channel(domain_size, force=force, lb_method=method)
else:
reference = create_lid_driven_cavity(domain_size, relaxation_rate=omega, lid_velocity=lid_velocity,
compressible=False)
reference.run(100)
```
%% Cell type:code id: tags:
``` python
np.testing.assert_almost_equal(reference.velocity[:, :], result)
```
import numpy as np
import pytest
from lbmpy.creationfunctions import create_lb_ast
from lbmpy.scenarios import create_lid_driven_cavity
from pystencils.sympyextensions import count_operations_in_ast
def test_split_number_of_operations():
# For the following configurations the number of operations for splitted and un-splitted version are
# exactly equal. This is not true for D3Q15 and D3Q27 because some sub-expressions are computed in multiple
# splitted, inner loops.
for stencil in ['D2Q9', 'D3Q19']:
for compressible in (True, False):
for method in ('srt', 'trt'):
common_params = {'stencil': stencil,
'method': method,
'compressible': compressible,
'force': (1e-6, 1e-5, 1e-7)
}
ast_with_splitting = create_lb_ast(optimization={'split': True}, **common_params)
ast_without_splitting = create_lb_ast(optimization={'split': False}, **common_params)
op_with_splitting = count_operations_in_ast(ast_with_splitting)
op_without_splitting = count_operations_in_ast(ast_without_splitting)
assert op_without_splitting['muls'] == op_with_splitting['muls']
assert op_without_splitting['adds'] == op_with_splitting['adds']
assert op_without_splitting['divs'] == op_with_splitting['divs']
@pytest.mark.longrun
def test_equivalence():
relaxation_rates = [1.8, 1.7, 1.0]
for stencil in ['D2Q9', 'D3Q15', 'D3Q19', 'D3Q27']:
for compressible in (True, False):
for method in ('srt', 'mrt3'):
for force in ((0, 0, 0), (1e-6, 1e-7, 2e-6)):
common_params = {'domain_size': (20, 30) if stencil.startswith('D2') else (10, 13, 7),
'stencil': stencil,
'method': method,
'compressible': compressible,
'force': force,
'relaxation_rates': relaxation_rates}
print("Running Scenario", common_params)
with_split = create_lid_driven_cavity(optimization={'split': True}, **common_params)
without_split = create_lid_driven_cavity(optimization={'split': False}, **common_params)
with_split.run(100)
without_split.run(100)
np.testing.assert_almost_equal(with_split.velocity_slice(), without_split.velocity_slice())
def test_equivalence_short():
relaxation_rates = [1.8, 1.7, 1.0]
for stencil, compressible, method, force in [('D2Q9', True, 'srt', 1e-7), ('D3Q19', False, 'mrt3', 0)]:
dim = int(stencil[1])
common_params = {'domain_size': (20, 30) if stencil.startswith('D2') else (10, 13, 7),
'stencil': stencil,
'method': method,
'compressible': compressible,
'force': (force, 0, 0)[:dim],
'relaxation_rates': relaxation_rates}
print("Running Scenario", common_params)
with_split = create_lid_driven_cavity(optimization={'split': True}, **common_params)
without_split = create_lid_driven_cavity(optimization={'split': False}, **common_params)
with_split.run(100)
without_split.run(100)
np.testing.assert_almost_equal(with_split.velocity_slice(), without_split.velocity_slice())
import itertools
import warnings
import pytest
import sympy as sp
import lbmpy.stencils as s
import pystencils as ps
from lbmpy.stencils import get_stencil
def get_3d_stencils():
return s.get_stencil('D3Q15'), s.get_stencil('D3Q19'), s.get_stencil('D3Q27')
def get_all_stencils():
return [
s.get_stencil('D2Q9', 'walberla'),
s.get_stencil('D3Q15', 'walberla'),
s.get_stencil('D3Q19', 'walberla'),
s.get_stencil('D3Q27', 'walberla'),
s.get_stencil('D2Q9', 'counterclockwise'),
s.get_stencil('D2Q9', 'braunschweig'),
s.get_stencil('D3Q19', 'braunschweig'),
s.get_stencil('D3Q27', 'premnath'),
s.get_stencil("D3Q27", "fakhari"),
]
def test_sizes():
assert len(s.get_stencil('D2Q9')) == 9
assert len(s.get_stencil('D3Q15')) == 15
assert len(s.get_stencil('D3Q19')) == 19
assert len(s.get_stencil('D3Q27')) == 27
def test_dimensionality():
for d in s.get_stencil('D2Q9'):
assert len(d) == 2
for d in itertools.chain(*get_3d_stencils()):
assert len(d) == 3
def test_uniqueness():
for stencil in get_3d_stencils():
direction_set = set(stencil)
assert len(direction_set) == len(stencil)
def test_run_self_check():
for st in get_all_stencils():
assert ps.stencil.is_valid(st, max_neighborhood=1)
assert ps.stencil.is_symmetric(st)
def test_inverse_direction():
assert ps.stencil.inverse_direction((1, 0, -1)), (-1, 0 == 1)
def test_free_functions():
assert not ps.stencil.is_symmetric([(1, 0), (0, 1)])
assert not ps.stencil.is_valid([(1, 0), (1, 1, 0)])
assert not ps.stencil.is_valid([(2, 0), (0, 1)], max_neighborhood=1)
with pytest.raises(ValueError) as e:
get_stencil("name_that_does_not_exist")
assert "No such stencil" in str(e.value)
def test_visualize():
import matplotlib.pyplot as plt
plt.clf()
plt.cla()
d2q9, d3q19 = get_stencil("D2Q9"), get_stencil("D3Q19")
figure = plt.gcf()
with warnings.catch_warnings():
warnings.simplefilter("ignore")
ps.stencil.plot(d2q9, figure=figure, data=[str(i) for i in range(9)])
ps.stencil.plot(d3q19, figure=figure, data=sp.symbols("a_:19"))
This source diff could not be displayed because it is too large. You can view the blob instead.
import numpy as np
import pytest
from lbmpy.scenarios import create_lid_driven_cavity
def test_lbm_vectorization_short():
print("Computing reference solutions")
size1 = (64, 32)
relaxation_rate = 1.8
ldc1_ref = create_lid_driven_cavity(size1, relaxation_rate=relaxation_rate)
ldc1_ref.run(10)
ldc1 = create_lid_driven_cavity(size1, relaxation_rate=relaxation_rate,
optimization={'vectorization': {'instruction_set': 'avx',
'assume_aligned': True,
'nontemporal': True,
'assume_inner_stride_one': False,
'assume_sufficient_line_padding': False,
}},
fixed_loop_sizes=False)
ldc1.run(10)
@pytest.mark.longrun
def test_lbm_vectorization():
vectorization_options = [{'instruction_set': instruction_set,
'assume_aligned': aa,
'nontemporal': nt,
'assume_inner_stride_one': True,
'assume_sufficient_line_padding': lp,
}
for instruction_set in ('sse', 'avx')
for aa, lp in ([False, False], [True, False], [True, True],)
for nt in (False, True)
]
time_steps = 100
size1 = (64, 32)
size2 = (666, 34)
relaxation_rate = 1.8
print("Computing reference solutions")
ldc1_ref = create_lid_driven_cavity(size1, relaxation_rate=relaxation_rate)
ldc1_ref.run(time_steps)
ldc2_ref = create_lid_driven_cavity(size2, relaxation_rate=relaxation_rate)
ldc2_ref.run(time_steps)
for double_precision in (False, True):
for vec_opt in vectorization_options:
for fixed_loop_sizes in (True, False):
optimization = {'double_precision': double_precision,
'vectorization': vec_opt,
'cse_global': True,
}
print("Vectorization test, double precision {}, vectorization {}, fixed loop sizes {}".format(
double_precision, vec_opt, fixed_loop_sizes))
ldc1 = create_lid_driven_cavity(size1, relaxation_rate=relaxation_rate, optimization=optimization,
fixed_loop_sizes=fixed_loop_sizes)
ldc1.run(time_steps)
np.testing.assert_almost_equal(ldc1_ref.velocity[:, :], ldc1.velocity[:, :])
optimization['split'] = True
ldc2 = create_lid_driven_cavity(size2, relaxation_rate=relaxation_rate, optimization=optimization,
fixed_loop_sizes=fixed_loop_sizes)
ldc2.run(time_steps)
np.testing.assert_almost_equal(ldc2_ref.velocity[:, :], ldc2.velocity[:, :])
if __name__ == '__main__':
test_lbm_vectorization()
import waLBerla.field as field
from waLBerla import createUniformBlockGrid, createUniformBufferedScheme, makeSlice
def create_walberla_lattice_model(stencil, method, relaxation_rates, compressible=False, order=2,
force_model='none', force=(0, 0, 0), **_):
from waLBerla import lbm
if method.lower() == 'srt':
collision_model = lbm.collisionModels.SRT(relaxation_rates[0])
elif method.lower() == 'trt':
collision_model = lbm.collisionModels.TRT(relaxation_rates[0], relaxation_rates[1])
elif method.lower() == 'mrt':
if stencil != 'D3Q19':
raise ValueError("MRT is available for D3Q19 only in walberla")
collision_model = lbm.collisionModels.D3Q19MRT(*relaxation_rates[1:7])
else:
raise ValueError("Unknown method: " + str(method))
if len(force) == 2:
force = (force[0], force[1], 0)
if force_model is None or force_model.lower() == 'none':
force_model = lbm.forceModels.NoForce()
elif force_model.lower() == 'simple':
force_model = lbm.forceModels.SimpleConstant(force)
elif force_model.lower() == 'luo':
force_model = lbm.forceModels.LuoConstant(force)
elif force_model.lower() == 'guo':
force_model = lbm.forceModels.GuoConstant(force)
else:
raise ValueError("Unknown force model")
return lbm.makeLatticeModel(stencil, collision_model, force_model, compressible, order)
def create_force_driven_channel_2d(force, radius, length, **kwargs):
from waLBerla import lbm
kwargs['force'] = tuple([force, 0, 0])
domain_size = (length, 2 * radius, 1)
lattice_model = create_walberla_lattice_model(**kwargs)
blocks = createUniformBlockGrid(cells=domain_size, periodic=(1, 0, 1))
# Adding fields
lbm.addPdfFieldToStorage(blocks, "pdfs", lattice_model, velocityAdaptor="vel", densityAdaptor="rho",
initialDensity=1.0)
field.addFlagFieldToStorage(blocks, 'flags')
lbm.addBoundaryHandlingToStorage(blocks, 'boundary', 'pdfs', 'flags')
# Communication
communication = createUniformBufferedScheme(blocks, lattice_model.communicationStencilName)
communication.addDataToCommunicate(field.createPackInfo(blocks, 'pdfs'))
# Setting boundaries
for block in blocks:
b = block['boundary']
if block.atDomainMaxBorder[1]: # N
b.forceBoundary('NoSlip', makeSlice[:, -1, :, 'g'])
if block.atDomainMinBorder[1]: # S
b.forceBoundary('NoSlip', makeSlice[:, 0, :, 'g'])
b.fillWithDomain()
sweep = lbm.makeCellwiseSweep(blocks, "pdfs", flagFieldID='flags', flagList=['fluid']).streamCollide
def time_loop(time_steps):
for t in range(time_steps):
communication()
for B in blocks:
B['boundary']()
for B in blocks:
sweep(B)
full_pdf_field = field.toArray(field.gather(blocks, 'pdfs', makeSlice[:, :, :]), withGhostLayers=False)
density = field.toArray(field.gather(blocks, 'rho', makeSlice[:, :, :]), withGhostLayers=False)
velocity = field.toArray(field.gather(blocks, 'vel', makeSlice[:, :, :]), withGhostLayers=False)
full_pdf_field = full_pdf_field[:, :, 0, :]
density = density[:, :, 0, 0]
velocity = velocity[:, :, 0, :2]
return full_pdf_field, density, velocity
return time_loop
def create_lid_driven_cavity(domain_size, lid_velocity=0.005, **kwargs):
from waLBerla import lbm
d = len(domain_size)
if 'stencil' not in kwargs:
kwargs['stencil'] = 'D2Q9' if d == 2 else 'D3Q27'
if d == 2:
domain_size = (domain_size[0], domain_size[1], 1)
lattice_model = create_walberla_lattice_model(**kwargs)
blocks = createUniformBlockGrid(cells=domain_size, periodic=(1, 1, 1))
# Adding fields
lbm.addPdfFieldToStorage(blocks, "pdfs", lattice_model, velocityAdaptor="vel", densityAdaptor="rho",
initialDensity=1.0)
field.addFlagFieldToStorage(blocks, 'flags')
lbm.addBoundaryHandlingToStorage(blocks, 'boundary', 'pdfs', 'flags')
# Communication
communication = createUniformBufferedScheme(blocks, lattice_model.communicationStencilName)
communication.addDataToCommunicate(field.createPackInfo(blocks, 'pdfs'))
# Setting boundaries
for block in blocks:
b = block['boundary']
if block.atDomainMaxBorder[1]: # N
b.forceBoundary('UBB', makeSlice[:, -1, :, 'g'], {'x': lid_velocity})
if block.atDomainMinBorder[1]: # S
b.forceBoundary('NoSlip', makeSlice[:, 0, :, 'g'])
if block.atDomainMinBorder[0]: # W
b.forceBoundary('NoSlip', makeSlice[0, :, :, 'g'])
if block.atDomainMaxBorder[0]: # E
b.forceBoundary('NoSlip', makeSlice[-1, :, :, 'g'])
if block.atDomainMinBorder[2] and d == 3: # T
b.forceBoundary('NoSlip', makeSlice[:, :, 0, 'g'])
if block.atDomainMaxBorder[2] and d == 3: # B
b.forceBoundary('NoSlip', makeSlice[:, :, -1, 'g'])
b.fillWithDomain()
sweep = lbm.makeCellwiseSweep(blocks, "pdfs", flagFieldID='flags', flagList=['fluid']).streamCollide
def time_loop(time_steps):
for t in range(time_steps):
communication()
for B in blocks:
B['boundary']()
for B in blocks:
sweep(B)
full_pdf_field = field.toArray(field.gather(blocks, 'pdfs', makeSlice[:, :, :]), withGhostLayers=False)
density = field.toArray(field.gather(blocks, 'rho', makeSlice[:, :, :]), withGhostLayers=False)
velocity = field.toArray(field.gather(blocks, 'vel', makeSlice[:, :, :]), withGhostLayers=False)
if d == 2:
full_pdf_field = full_pdf_field[:, :, 0, :]
density = density[:, :, 0, 0]
velocity = velocity[:, :, 0, :2]
elif d == 3:
density = density[:, :, :, 0]
return full_pdf_field, density, velocity
return time_loop
[project]
name = "lbmpy"
description = "Code Generation for Lattice Boltzmann Methods"
dynamic = ["version"]
readme = "README.md"
authors = [
{ name = "Martin Bauer" },
{ name = "Markus Holzer" },
{ name = "Frederik Hennig" },
{ email = "cs10-codegen@fau.de" },
]
license = { file = "COPYING.txt" }
requires-python = ">=3.10"
dependencies = ["pystencils>=1.3", "sympy>=1.9,<=1.12.1", "numpy>=1.8.0", "appdirs", "joblib"]
classifiers = [
"Development Status :: 4 - Beta",
"Framework :: Jupyter",
"Topic :: Software Development :: Code Generators",
"Topic :: Scientific/Engineering :: Physics",
"Intended Audience :: Developers",
"Intended Audience :: Science/Research",
"License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)",
]
[project.urls]
"Bug Tracker" = "https://i10git.cs.fau.de/pycodegen/lbmpy/-/issues"
"Documentation" = "https://pycodegen.pages.i10git.cs.fau.de/lbmpy/"
"Source Code" = "https://i10git.cs.fau.de/pycodegen/lbmpy"
[project.optional-dependencies]
gpu = ['cupy']
alltrafos = ['islpy', 'py-cpuinfo']
bench_db = ['blitzdb', 'pymongo', 'pandas']
interactive = [
'matplotlib',
'ipy_table',
'imageio',
'jupyter',
'pyevtk',
'rich',
'graphviz',
'scipy',
'scikit-image'
]
use_cython = [
'Cython'
]
doc = [
'sphinx',
'sphinx_rtd_theme',
'nbsphinx',
'sphinxcontrib-bibtex',
'sphinx_autodoc_typehints',
'pandoc',
]
tests = [
'pytest',
'pytest-cov',
'pytest-html',
'ansi2html',
'pytest-xdist',
'flake8',
'nbformat',
'nbconvert',
'ipython',
'randomgen>=1.18',
]
[build-system]
requires = [
"setuptools>=69",
"versioneer[toml]>=0.29",
]
build-backend = "setuptools.build_meta"
[tool.setuptools.package-data]
[tool.setuptools.packages.find]
where = ["src"]
include = ["lbmpy", "lbmpy.*"]
namespaces = false
[tool.versioneer]
# See the docstring in versioneer.py for instructions. Note that you must
# re-run 'versioneer.py setup' after changing this section, and commit the
# resulting files.
VCS = "git"
style = "pep440"
versionfile_source = "src/lbmpy/_version.py"
versionfile_build = "lbmpy/_version.py"
tag_prefix = "release/"
parentdir_prefix = "lbmpy-"
[pytest]
testpaths = src tests doc/notebooks
pythonpath = src
python_files = test_*.py *_test.py scenario_*.py
norecursedirs = *.egg-info .git .cache .ipynb_checkpoints htmlcov
addopts = --doctest-modules --durations=20 --cov-config pytest.ini
markers =
longrun: tests only run at night since they have large execution time
notebook: mark for notebooks
# these warnings all come from third party libraries.
filterwarnings =
ignore:the imp module is deprecated in favour of importlib:DeprecationWarning
ignore:'contextfilter' is renamed to 'pass_context':DeprecationWarning
ignore:Persisting input arguments took:UserWarning
[run]
branch = True
source = lbmpy
lbmpy_tests
source = src/lbmpy
tests
omit = doc/*
lbmpy_tests/*
tests/*
setup.py
conftest.py
versioneer.py
src/lbmpy/_version.py
venv/
[report]
exclude_lines =
......@@ -19,10 +33,12 @@ exclude_lines =
pragma: no cover
def __repr__
def _repr_html_
# Don't complain if tests don't hit defensive assertion code:
raise AssertionError
raise NotImplementedError
NotImplementedError()
#raise ValueError
# Don't complain if non-runnable code isn't run:
......@@ -31,7 +47,7 @@ exclude_lines =
if __name__ == .__main__.:
skip_covered = True
fail_under = 89
fail_under = 87
[html]
directory = coverage_report
#!/usr/bin/env python3
from contextlib import redirect_stdout
import io
from tests.test_quicktests import (
test_poiseuille_channel_quicktest,
test_entropic_methods,
test_cumulant_ldc
)
quick_tests = [
test_poiseuille_channel_quicktest,
test_entropic_methods,
test_cumulant_ldc,
]
if __name__ == "__main__":
print("Running lbmpy quicktests")
for qt in quick_tests:
print(f" -> {qt.__name__}")
with redirect_stdout(io.StringIO()):
qt()
import os
import sys
import io
from setuptools import setup, find_packages
import distutils
from contextlib import redirect_stdout
from importlib import import_module
sys.path.insert(0, os.path.abspath('doc'))
from version_from_git import version_number_from_git
from setuptools import setup, __version__ as setuptools_version
if int(setuptools_version.split('.')[0]) < 61:
raise Exception(
"[ERROR] lbmpy requires at least setuptools version 61 to install.\n"
"If this error occurs during an installation via pip, it is likely that there is a conflict between "
"versions of setuptools installed by pip and the system package manager. "
"In this case, it is recommended to install lbmpy into a virtual environment instead."
)
quick_tests = [
'test_serial_scenarios.test_ldc_mrt',
'test_serial_scenarios.test_channel_srt',
]
import versioneer
class SimpleTestRunner(distutils.cmd.Command):
"""A custom command to run selected tests"""
def get_cmdclass():
return versioneer.get_cmdclass()
description = 'run some quick tests'
user_options = []
@staticmethod
def _run_tests_in_module(test):
"""Short test runner function - to work also if py.test is not installed."""
test = 'lbmpy_tests.' + test
mod, function_name = test.rsplit('.', 1)
if isinstance(mod, str):
mod = import_module(mod)
func = getattr(mod, function_name)
print(" -> %s in %s" % (function_name, mod.__name__))
with redirect_stdout(io.StringIO()):
func()
def initialize_options(self):
pass
def finalize_options(self):
pass
def run(self):
"""Run command."""
for test in quick_tests:
self._run_tests_in_module(test)
setup(name='lbmpy',
version=version_number_from_git(),
description='Code Generation for Lattice Boltzmann Methods',
author='Martin Bauer',
license='AGPLv3',
author_email='martin.bauer@fau.de',
url='https://i10git.cs.fau.de/pycodegen/lbmpy/',
packages=['lbmpy'] + ['lbmpy.' + s for s in find_packages('lbmpy')],
install_requires=['pystencils'],
classifiers=[
'Development Status :: 4 - Beta',
'Framework :: Jupyter',
'Topic :: Software Development :: Code Generators',
'Topic :: Scientific/Engineering :: Physics',
'Intended Audience :: Developers',
'Intended Audience :: Science/Research',
'License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)',
],
python_requires=">=3.6",
extras_require={
'gpu': ['pycuda'],
'opencl': ['pyopencl'],
'alltrafos': ['islpy', 'py-cpuinfo'],
'interactive': ['scipy', 'scikit-image', 'cython', 'matplotlib',
'ipy_table', 'imageio', 'jupyter', 'pyevtk'],
'doc': ['sphinx', 'sphinx_rtd_theme', 'nbsphinx',
'sphinxcontrib-bibtex', 'sphinx_autodoc_typehints', 'pandoc'],
},
cmdclass={
'quicktest': SimpleTestRunner
}
)
setup(
version=versioneer.get_version(),
cmdclass=get_cmdclass(),
)
from .creationfunctions import (
create_lb_ast,
create_lb_collision_rule,
create_lb_function,
create_lb_method,
create_lb_update_rule,
LBMConfig,
LBMOptimisation,
)
from .enums import Stencil, Method, ForceModel, CollisionSpace, SubgridScaleModel
from .lbstep import LatticeBoltzmannStep
from .macroscopic_value_kernels import (
pdf_initialization_assignments,
macroscopic_values_getter,
strain_rate_tensor_getter,
compile_macroscopic_values_getter,
compile_macroscopic_values_setter,
create_advanced_velocity_setter_collision_rule,
)
from .maxwellian_equilibrium import get_weights
from .relaxationrates import (
relaxation_rate_from_lattice_viscosity,
lattice_viscosity_from_relaxation_rate,
relaxation_rate_from_magic_number,
)
from .scenarios import create_lid_driven_cavity, create_fully_periodic_flow
from .stencils import LBStencil
__all__ = [
"create_lb_ast",
"create_lb_collision_rule",
"create_lb_function",
"create_lb_method",
"create_lb_update_rule",
"LBMConfig",
"LBMOptimisation",
"Stencil",
"Method",
"ForceModel",
"CollisionSpace",
"SubgridScaleModel",
"LatticeBoltzmannStep",
"pdf_initialization_assignments",
"macroscopic_values_getter",
"strain_rate_tensor_getter",
"compile_macroscopic_values_getter",
"compile_macroscopic_values_setter",
"create_advanced_velocity_setter_collision_rule",
"get_weights",
"relaxation_rate_from_lattice_viscosity",
"lattice_viscosity_from_relaxation_rate",
"relaxation_rate_from_magic_number",
"create_lid_driven_cavity",
"create_fully_periodic_flow",
"LBStencil",
]
from . import _version
__version__ = _version.get_versions()['version']