Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • Sparse
  • WallLaw
  • fhennig/pystencils2.0-compat
  • improved_comm
  • master
  • suffa/psm_optimization
  • release/0.2.1
  • release/0.2.10
  • release/0.2.11
  • release/0.2.12
  • release/0.2.13
  • release/0.2.14
  • release/0.2.15
  • release/0.2.2
  • release/0.2.3
  • release/0.2.4
  • release/0.2.5
  • release/0.2.6
  • release/0.2.7
  • release/0.2.8
  • release/0.2.9
  • release/0.3.0
  • release/0.3.1
  • release/0.3.2
  • release/0.3.3
  • release/0.3.4
  • release/0.4.0
  • release/0.4.1
  • release/0.4.2
  • release/0.4.3
  • release/0.4.4
  • release/1.0
  • release/1.0.1
  • release/1.1
  • release/1.1.1
  • release/1.2
  • release/1.3
  • release/1.3.1
  • release/1.3.2
  • release/1.3.3
  • release/1.3.4
  • release/1.3.5
  • release/1.3.6
  • release/1.3.7
44 results

Target

Select target project
  • ravi.k.ayyala/lbmpy
  • brendan-waters/lbmpy
  • anirudh.jonnalagadda/lbmpy
  • jbadwaik/lbmpy
  • alexander.reinauer/lbmpy
  • itischler/lbmpy
  • he66coqe/lbmpy
  • ev81oxyl/lbmpy
  • Bindgen/lbmpy
  • da15siwa/lbmpy
  • holzer/lbmpy
  • RudolfWeeber/lbmpy
  • pycodegen/lbmpy
13 results
Select Git revision
  • GetterSetterAPI
  • HRR
  • HydroPressure
  • InplaceConfig
  • Outflow
  • PhaseField
  • Sparse
  • UBBVelocity
  • UpdateAPISparse
  • WallLaw
  • WetNodeBoundaries
  • csebug
  • feature/sparse
  • feature/try
  • improved_comm
  • install_requires
  • master
  • phaseField
  • relaxationrates
  • test_martin
  • release/0.2.1
  • release/0.2.10
  • release/0.2.11
  • release/0.2.12
  • release/0.2.13
  • release/0.2.14
  • release/0.2.15
  • release/0.2.2
  • release/0.2.3
  • release/0.2.4
  • release/0.2.5
  • release/0.2.6
  • release/0.2.7
  • release/0.2.8
  • release/0.2.9
  • release/0.3.0
  • release/0.3.1
  • release/0.3.2
  • release/0.3.3
  • release/0.3.4
  • release/0.4.0
  • release/0.4.1
  • release/0.4.2
  • release/0.4.3
  • release/0.4.4
  • release/1.0
  • release/1.0.1
  • release/1.1
  • release/1.1.1
  • release/1.2
  • release/1.3
  • release/1.3.1
  • release/1.3.2
  • release/1.3.3
  • release/1.3.4
  • release/1.3.5
  • release/1.3.6
57 results
Show changes
Showing
with 0 additions and 3284 deletions
%% Cell type:code id: tags:
``` python
import pytest
pytest.importorskip('skimage')
```
%% Cell type:code id: tags:
``` python
from lbmpy.session import *
from lbmpy.phasefield.phasefieldstep_direct import PhaseFieldStepDirect
from lbmpy.phasefield.contact_angle_circle_fitting import liquid_lens_neumann_angles
from pystencils.fd import Diff
```
%% Cell type:markdown id: tags:
# Test of phase field with 4th order FD
%% Cell type:markdown id: tags:
### Free energy definition:
%% Cell type:code id: tags:
``` python
num_phases = 3
kappa = [0.01, 0.01, 0.01]
penalty_factor = 0.01
domain_size = (40, 40)
```
%% Cell type:code id: tags:
``` python
c = sp.symbols("c_:{}".format(num_phases))
def f(c):
return c**2 * (1 - c)**2
free_energy = sum((kappa[i] / 2) * f( c[i] ) + kappa[i]/2 * (Diff(c[i]))**2
for i in range(num_phases))
free_energy += penalty_factor*(1-sum(c[i] for i in range(num_phases)))**2
free_energy
```
%% Output
$\displaystyle 0.005 c_{0}^{2} \left(1 - c_{0}\right)^{2} + 0.005 c_{1}^{2} \left(1 - c_{1}\right)^{2} + 0.005 c_{2}^{2} \left(1 - c_{2}\right)^{2} + 0.01 \left(- c_{0} - c_{1} - c_{2} + 1\right)^{2} + 0.005 {\partial c_{0}}^{2} + 0.005 {\partial c_{1}}^{2} + 0.005 {\partial c_{2}}^{2}$
2 2 2 2 2 2
0.005⋅c₀ ⋅(1 - c₀) + 0.005⋅c₁ ⋅(1 - c₁) + 0.005⋅c₂ ⋅(1 - c₂) + 0.01⋅(-c₀ -
2 2 2 2
c₁ - c₂ + 1) + 0.005⋅D(c_0) + 0.005⋅D(c_1) + 0.005⋅D(c_2)
%% Cell type:markdown id: tags:
### Simulation:
%% Cell type:code id: tags:
``` python
step = PhaseFieldStepDirect(free_energy, c, domain_size)
# geometric setup
step.set_concentration(make_slice[:, :], [0, 0, 0])
step.set_single_concentration(make_slice[:, 0.5:], phase_idx=0)
step.set_single_concentration(make_slice[:, :0.5], phase_idx=1)
step.set_single_concentration(make_slice[0.25:0.75, 0.25:0.75], phase_idx=2)
step.set_pdf_fields_from_macroscopic_values()
```
%% Cell type:code id: tags:
``` python
for i in range(1500):
step.time_step()
```
%% Cell type:code id: tags:
``` python
plt.phase_plot(step.phi[:, :])
```
%% Output
%% Cell type:code id: tags:
``` python
angles = liquid_lens_neumann_angles(step.phi[:, :])
assert angles[0] > 107
angles
```
%% Output
$\displaystyle \left[ 110.18261758183783, \ 104.07972456756643, \ 145.73765785059575\right]$
[110.18261758183783, 104.07972456756643, 145.73765785059575]
%% Cell type:code id: tags:
``` python
from lbmpy.session import *
from lbmpy.phasefield.phasefieldstep_direct import PhaseFieldStepDirect
from lbmpy.phasefield.n_phase_boyer import *
from lbmpy.phasefield.contact_angle_circle_fitting import liquid_lens_neumann_angles
from pystencils.fd import Diff
one = sp.sympify(1)
import pyximport
pyximport.install(language_level=3)
from lbmpy.phasefield.simplex_projection import simplex_projection_2d # NOQA
```
%% Cell type:markdown id: tags:
# Test of phase field with 4th order FD
%% Cell type:markdown id: tags:
### Free energy definition:
%% Cell type:code id: tags:
``` python
n = 3
domain_size = (150, 150)
ε = one * 4
penalty_factor = 0.01
stabilization_factor = 10
#κ = (one, one/2, one/3, one/4)
κ = (one, one, one, one)
sigma_factor = one / 4 * 67 * 100
σ = sp.ImmutableDenseMatrix(n, n, lambda i,j: sigma_factor* (κ[i] + κ[j]) if i != j else 0 )
c = sp.symbols("c_:{}".format(n))
```
%% Cell type:code id: tags:
``` python
α, _ = diffusion_coefficients(σ)
f = lambda c: c**2 * ( 1 - c ) **2
a, b = compute_ab(f)
capital_f = capital_f0(c, σ) + correction_g(c, σ) + stabilization_factor * stabilization_term(c, α)
f_bulk = free_energy_bulk(capital_f, b, ε)
f_if = free_energy_interfacial(c, σ, a, ε)
free_energy = (f_bulk + f_if) / 20000 / 100 + penalty_factor * (one - sum(c))**2
```
%% Cell type:code id: tags:
``` python
free_energy.evalf()
```
%% Output
$$1.5 \cdot 10^{-5} c_{0}^{2} c_{1}^{2} c_{2}^{2} + 0.005025 c_{0}^{2} \left(- c_{0} + 1.0\right)^{2} + 0.005025 c_{1}^{2} \left(- c_{1} + 1.0\right)^{2} + 0.005025 c_{2}^{2} \left(- c_{2} + 1.0\right)^{2} - 0.0025125 \left(c_{0} + c_{1}\right)^{2} \left(- c_{0} - c_{1} + 1.0\right)^{2} - 0.0025125 \left(c_{0} + c_{2}\right)^{2} \left(- c_{0} - c_{2} + 1.0\right)^{2} - 0.0025125 \left(c_{1} + c_{2}\right)^{2} \left(- c_{1} - c_{2} + 1.0\right)^{2} + 0.01 \left(- c_{0} - c_{1} - c_{2} + 1.0\right)^{2} - 0.005025 {\partial c_{0}} {\partial c_{1}} - 0.005025 {\partial c_{0}} {\partial c_{2}} - 0.005025 {\partial c_{1}} {\partial c_{2}}$$
2 2 2 2 2 2 2
1.5e-5⋅c₀ ⋅c₁ ⋅c₂ + 0.005025⋅c₀ ⋅(-c₀ + 1.0) + 0.005025⋅c₁ ⋅(-c₁ + 1.0) + 0
2 2 2 2
.005025⋅c₂ ⋅(-c₂ + 1.0) - 0.0025125⋅(c₀ + c₁) ⋅(-c₀ - c₁ + 1.0) - 0.0025125⋅
2 2 2 2
(c₀ + c₂) ⋅(-c₀ - c₂ + 1.0) - 0.0025125⋅(c₁ + c₂) ⋅(-c₁ - c₂ + 1.0) + 0.01⋅(
2
-c₀ - c₁ - c₂ + 1.0) - 0.005025⋅D(c_0)⋅D(c_1) - 0.005025⋅D(c_0)⋅D(c_2) - 0.00
5025⋅D(c_1)⋅D(c_2)
%% Cell type:markdown id: tags:
### Simulation:
%% Cell type:code id: tags:
``` python
step = PhaseFieldStepDirect(free_energy, c, domain_size)
```
%% Cell type:code id: tags:
``` python
# geometric setup
step.set_concentration(make_slice[:, :], [0, 0, 0])
step.set_single_concentration(make_slice[:, :], phase_idx=0)
step.set_single_concentration(make_slice[:, :0.5], phase_idx=1)
step.set_single_concentration(make_slice[0.25:0.75, 0.25:0.75], phase_idx=2)
#step.smooth(4)
step.set_pdf_fields_from_macroscopic_values()
plt.phase_plot(step.phi[:, :])
```
%% Output
%% Cell type:code id: tags:
``` python
for i in range(10):
step.time_step()
#simplex_projection_2d(step.data_handling.cpu_arrays[step.phi_field.name])
plt.phase_plot(step.phi[:, :])
```
%% Output
%% Cell type:code id: tags:
``` python
plt.plot(step.phi[25, :])
```
%% Output
[<matplotlib.lines.Line2D at 0x7f1106c0eda0>,
<matplotlib.lines.Line2D at 0x7f1106c0eef0>,
<matplotlib.lines.Line2D at 0x7f1106b98080>]
%% Cell type:code id: tags:
``` python
plt.plot(step.phi[15, :])
```
%% Output
[<matplotlib.lines.Line2D at 0x7f1106c3bef0>,
<matplotlib.lines.Line2D at 0x7f11077832e8>,
<matplotlib.lines.Line2D at 0x7f1106c27048>]
import numpy as np
from lbmpy.creationfunctions import create_lb_method, create_lb_update_rule
from lbmpy.phasefield_allen_cahn.analytical import analytic_rising_speed
from lbmpy.phasefield_allen_cahn.force_model import MultiphaseForceModel
from lbmpy.phasefield_allen_cahn.kernel_equations import (
get_collision_assignments_hydro, hydrodynamic_force, initializer_kernel_hydro_lb, initializer_kernel_phase_field_lb,
interface_tracking_force)
from lbmpy.phasefield_allen_cahn.parameter_calculation import (
calculate_dimensionless_rising_bubble, calculate_parameters_rti)
from lbmpy.stencils import get_stencil
from pystencils import AssignmentCollection, fields
def test_codegen_3d():
stencil_phase = get_stencil("D3Q15")
stencil_hydro = get_stencil("D3Q27")
assert (len(stencil_phase[0]) == len(stencil_hydro[0]))
dimensions = len(stencil_hydro[0])
parameters = calculate_dimensionless_rising_bubble(reference_time=18000,
density_heavy=1.0,
bubble_radius=16,
bond_number=30,
reynolds_number=420,
density_ratio=1000,
viscosity_ratio=100)
np.isclose(parameters["density_light"], 0.001, rtol=1e-05, atol=1e-08, equal_nan=False)
np.isclose(parameters["gravitational_acceleration"], -9.876543209876543e-08, rtol=1e-05, atol=1e-08, equal_nan=False)
parameters = calculate_parameters_rti(reference_length=128,
reference_time=18000,
density_heavy=1.0,
capillary_number=9.1,
reynolds_number=128,
atwood_number=1.0,
peclet_number=744,
density_ratio=3,
viscosity_ratio=3)
np.isclose(parameters["density_light"], 1/3, rtol=1e-05, atol=1e-08, equal_nan=False)
np.isclose(parameters["gravitational_acceleration"], -3.9506172839506174e-07, rtol=1e-05, atol=1e-08, equal_nan=False)
np.isclose(parameters["mobility"], 0.0012234169653524492, rtol=1e-05, atol=1e-08, equal_nan=False)
rs = analytic_rising_speed(1-6, 20, 0.01)
np.isclose(rs, 16666.666666666668, rtol=1e-05, atol=1e-08, equal_nan=False)
density_liquid = 1.0
density_gas = 0.001
surface_tension = 0.0001
M = 0.02
# phase-field parameter
drho3 = (density_liquid - density_gas) / 3
# interface thickness
W = 5
# coefficient related to surface tension
beta = 12.0 * (surface_tension / W)
# coefficient related to surface tension
kappa = 1.5 * surface_tension * W
# relaxation rate allen cahn (h)
w_c = 1.0 / (0.5 + (3.0 * M))
# fields
u = fields("vel_field(" + str(dimensions) + "): [" + str(dimensions) + "D]", layout='fzyx')
C = fields("phase_field: [" + str(dimensions) + "D]", layout='fzyx')
h = fields("lb_phase_field(" + str(len(stencil_phase)) + "): [" + str(dimensions) + "D]", layout='fzyx')
h_tmp = fields("lb_phase_field_tmp(" + str(len(stencil_phase)) + "): [" + str(dimensions) + "D]", layout='fzyx')
g = fields("lb_velocity_field(" + str(len(stencil_hydro)) + "): [" + str(dimensions) + "D]", layout='fzyx')
g_tmp = fields("lb_velocity_field_tmp(" + str(len(stencil_hydro)) + "): [" + str(dimensions) + "D]", layout='fzyx')
# calculate the relaxation rate for the hydro lb as well as the body force
density = density_gas + C.center * (density_liquid - density_gas)
# force acting on all phases of the model
body_force = np.array([0, 1e-6, 0])
relaxation_time = 0.03 + 0.5
relaxation_rate = 1.0 / relaxation_time
method_phase = create_lb_method(stencil=stencil_phase, method='srt', relaxation_rate=w_c, compressible=True)
method_hydro = create_lb_method(stencil=stencil_hydro, method="mrt", weighted=True,
relaxation_rates=[relaxation_rate, 1, 1, 1, 1, 1],
maxwellian_moments=True, entropic=False)
# create the kernels for the initialization of the g and h field
h_updates = initializer_kernel_phase_field_lb(h, C, u, method_phase, W)
g_updates = initializer_kernel_hydro_lb(g, u, method_hydro)
force_h = [f / 3 for f in interface_tracking_force(C, stencil_phase, W)]
force_model_h = MultiphaseForceModel(force=force_h)
force_g = hydrodynamic_force(g, C, method_hydro,
relaxation_time, density_liquid, density_gas, kappa, beta, body_force)
force_model_g = MultiphaseForceModel(force=force_g, rho=density)
h_tmp_symbol_list = [h_tmp.center(i) for i, _ in enumerate(stencil_phase)]
sum_h = np.sum(h_tmp_symbol_list[:])
method_phase.set_force_model(force_model_h)
allen_cahn_lb = create_lb_update_rule(lb_method=method_phase,
velocity_input=u,
compressible=True,
optimization={"symbolic_field": h,
"symbolic_temporary_field": h_tmp},
kernel_type='stream_pull_collide')
allen_cahn_lb.set_main_assignments_from_dict({**allen_cahn_lb.main_assignments_dict, **{C.center: sum_h}})
allen_cahn_update_rule = AssignmentCollection(main_assignments=allen_cahn_lb.main_assignments,
subexpressions=allen_cahn_lb.subexpressions)
# ---------------------------------------------------------------------------------------------------------
hydro_lb_update_rule_normal = get_collision_assignments_hydro(lb_method=method_hydro,
density=density,
velocity_input=u,
force=force_g,
sub_iterations=2,
symbolic_fields={"symbolic_field": g,
"symbolic_temporary_field": g_tmp},
kernel_type='collide_only')
hydro_lb_update_rule_push = get_collision_assignments_hydro(lb_method=method_hydro,
density=density,
velocity_input=u,
force=force_g,
sub_iterations=2,
symbolic_fields={"symbolic_field": g,
"symbolic_temporary_field": g_tmp},
kernel_type='collide_stream_push')
%% Cell type:code id: tags:
``` python
import math
from lbmpy.session import *
from pystencils.session import *
from lbmpy.moments import MOMENT_SYMBOLS
from collections import OrderedDict
from lbmpy.methods.creationfunctions import create_with_discrete_maxwellian_eq_moments
from lbmpy.phasefield_allen_cahn.parameter_calculation import calculate_parameters_rti
from lbmpy.phasefield_allen_cahn.kernel_equations import *
from lbmpy.phasefield_allen_cahn.force_model import MultiphaseForceModel
```
%% Cell type:code id: tags:
``` python
try:
import pycuda
except ImportError:
pycuda = None
gpu = False
target = 'cpu'
print('No pycuda installed')
if pycuda:
gpu = True
target = 'gpu'
```
%% Cell type:code id: tags:
``` python
stencil_phase = get_stencil("D2Q9")
stencil_hydro = get_stencil("D2Q9")
assert(len(stencil_phase[0]) == len(stencil_hydro[0]))
dimensions = len(stencil_phase[0])
```
%% Cell type:code id: tags:
``` python
# domain
L0 = 256
domain_size = (L0, 4 * L0)
# time step
timesteps = 1000
# reference time
reference_time = 8000
# density of the heavier fluid
rho_H = 1.0
# calculate the parameters for the RTI
parameters = calculate_parameters_rti(reference_length=L0,
reference_time=reference_time,
density_heavy=rho_H,
capillary_number=0.44,
reynolds_number=3000,
atwood_number=0.998,
peclet_number=1000,
density_ratio=1000,
viscosity_ratio=100)
# get the parameters
rho_L = parameters.get("density_light")
mu_H = parameters.get("dynamic_viscosity_heavy")
mu_L = parameters.get("dynamic_viscosity_light")
tau_H = parameters.get("relaxation_time_heavy")
tau_L = parameters.get("relaxation_time_light")
sigma = parameters.get("surface_tension")
M = parameters.get("mobility")
gravitational_acceleration = parameters.get("gravitational_acceleration")
drho3 = (rho_H - rho_L)/3
# interface thickness
W = 5
# coeffcient related to surface tension
beta = 12.0 * (sigma/W)
# coeffcient related to surface tension
kappa = 1.5 * sigma*W
# relaxation rate allen cahn (h)
w_c = 1.0/(0.5 + (3.0 * M))
```
%% Cell type:code id: tags:
``` python
dh = ps.create_data_handling((domain_size), periodicity = (True, False), parallel=False, default_target=target)
g = dh.add_array("g", values_per_cell=len(stencil_hydro))
dh.fill("g", 0.0, ghost_layers=True)
h = dh.add_array("h",values_per_cell=len(stencil_phase))
dh.fill("h", 0.0, ghost_layers=True)
g_tmp = dh.add_array("g_tmp", values_per_cell=len(stencil_hydro))
dh.fill("g_tmp", 0.0, ghost_layers=True)
h_tmp = dh.add_array("h_tmp",values_per_cell=len(stencil_phase))
dh.fill("h_tmp", 0.0, ghost_layers=True)
u = dh.add_array("u", values_per_cell=dh.dim)
dh.fill("u", 0.0, ghost_layers=True)
C = dh.add_array("C")
dh.fill("C", 0.0, ghost_layers=True)
```
%% Cell type:code id: tags:
``` python
tau = 0.5 + tau_L + (C.center) * (tau_H - tau_L)
s8 = 1/(tau)
rho = rho_L + (C.center) * (rho_H - rho_L)
body_force = [0, 0, 0]
body_force[1] = gravitational_acceleration * rho
```
%% Cell type:code id: tags:
``` python
method_phase = create_lb_method(stencil=stencil_phase, method='srt', relaxation_rate=w_c, compressible = True)
method_hydro = create_lb_method(stencil=stencil_hydro, method="mrt", weighted=True,
relaxation_rates=[s8, 1, 1, 1, 1, 1], maxwellian_moments=True, entropic=False)
```
%% Cell type:code id: tags:
``` python
# initialize the domain
def Initialize_distributions():
Nx = domain_size[0]
Ny = domain_size[1]
for block in dh.iterate(ghost_layers=True, inner_ghost_layers=False):
x = np.zeros_like(block.midpoint_arrays[0])
x[:, :] = block.midpoint_arrays[0]
y = np.zeros_like(block.midpoint_arrays[1])
y[:, :] = block.midpoint_arrays[1]
y -= 2 * L0
tmp = 0.1 * Nx * np.cos((2 * math.pi * x) / Nx)
init_values = 0.5 + 0.5 * np.tanh((y - tmp) / (W / 2))
block["C"][:, :] = init_values
if gpu:
dh.all_to_gpu()
dh.run_kernel(h_init)
dh.run_kernel(g_init)
```
%% Cell type:code id: tags:
``` python
h_updates = initializer_kernel_phase_field_lb(h, C, u, method_phase, W)
g_updates = initializer_kernel_hydro_lb(g, u, method_hydro)
h_init = ps.create_kernel(h_updates, target=dh.default_target, cpu_openmp=True).compile()
g_init = ps.create_kernel(g_updates, target=dh.default_target, cpu_openmp=True).compile()
```
%% Cell type:code id: tags:
``` python
force_h = [f / 3 for f in interface_tracking_force(C, stencil_phase, W)]
force_model_h = MultiphaseForceModel(force=force_h)
force_g = hydrodynamic_force(g, C, method_hydro, tau, rho_H, rho_L, kappa, beta, body_force)
```
%% Cell type:code id: tags:
``` python
h_tmp_symbol_list = [h_tmp.center(i) for i, _ in enumerate(stencil_phase)]
sum_h = np.sum(h_tmp_symbol_list[:])
method_phase.set_force_model(force_model_h)
allen_cahn_lb = create_lb_update_rule(lb_method=method_phase,
velocity_input = u,
compressible = True,
optimization = {"symbolic_field": h,
"symbolic_temporary_field": h_tmp},
kernel_type = 'stream_pull_collide')
allen_cahn_lb.set_main_assignments_from_dict({**allen_cahn_lb.main_assignments_dict, **{C.center: sum_h}})
allen_cahn_lb = sympy_cse(allen_cahn_lb)
ast_allen_cahn_lb = ps.create_kernel(allen_cahn_lb, target=dh.default_target, cpu_openmp=True)
kernel_allen_cahn_lb = ast_allen_cahn_lb.compile()
```
%% Cell type:code id: tags:
``` python
hydro_lb_update_rule = get_collision_assignments_hydro(lb_method=method_hydro,
density=rho,
velocity_input=u,
force = force_g,
sub_iterations=2,
symbolic_fields={"symbolic_field": g,
"symbolic_temporary_field": g_tmp},
kernel_type='collide_only')
hydro_lb_update_rule = sympy_cse(hydro_lb_update_rule)
ast_hydro_lb = ps.create_kernel(hydro_lb_update_rule, target=dh.default_target, cpu_openmp=True)
kernel_hydro_lb = ast_hydro_lb.compile()
```
%% Cell type:code id: tags:
``` python
# periodic Boundarys for g, h and C
periodic_BC_g = dh.synchronization_function(g.name, target=dh.default_target, optimization = {"openmp": True})
periodic_BC_h = dh.synchronization_function(h.name, target=dh.default_target, optimization = {"openmp": True})
periodic_BC_C = dh.synchronization_function(C.name, target=dh.default_target, optimization = {"openmp": True})
# No slip boundary for the phasefield lbm
bh_allen_cahn = LatticeBoltzmannBoundaryHandling(method_phase, dh, 'h',
target=dh.default_target, name='boundary_handling_h')
# No slip boundary for the velocityfield lbm
bh_hydro = LatticeBoltzmannBoundaryHandling(method_hydro, dh, 'g' ,
target=dh.default_target, name='boundary_handling_g')
wall = NoSlip()
if dimensions == 2:
bh_allen_cahn.set_boundary(wall, make_slice[:, 0])
bh_allen_cahn.set_boundary(wall, make_slice[:, -1])
bh_hydro.set_boundary(wall, make_slice[:, 0])
bh_hydro.set_boundary(wall, make_slice[:, -1])
else:
bh_allen_cahn.set_boundary(wall, make_slice[:, 0, :])
bh_allen_cahn.set_boundary(wall, make_slice[:, -1, :])
bh_hydro.set_boundary(wall, make_slice[:, 0, :])
bh_hydro.set_boundary(wall, make_slice[:, -1, :])
bh_allen_cahn.prepare()
bh_hydro.prepare()
```
%% Cell type:code id: tags:
``` python
ac_g = create_lb_update_rule(stencil = stencil_hydro,
optimization={"symbolic_field": g,
"symbolic_temporary_field": g_tmp},
kernel_type='stream_pull_only')
ast_g = ps.create_kernel(ac_g, target=dh.default_target, cpu_openmp=True)
stream_g = ast_g.compile()
```
%% Cell type:code id: tags:
``` python
# definition of the timestep for the immiscible fluids model
def Improved_PhaseField_h_g():
bh_allen_cahn()
periodic_BC_h()
dh.run_kernel(kernel_allen_cahn_lb)
periodic_BC_C()
dh.run_kernel(kernel_hydro_lb)
bh_hydro()
periodic_BC_g()
dh.run_kernel(stream_g)
dh.swap("h", "h_tmp")
dh.swap("g", "g_tmp")
```
%% Cell type:code id: tags:
``` python
Initialize_distributions()
for i in range(0, timesteps):
Improved_PhaseField_h_g()
```
%% Cell type:code id: tags:
``` python
if gpu:
dh.to_cpu("C")
Ny = domain_size[1]
pos_liquid_front = (np.argmax(dh.cpu_arrays["C"][L0//2, :] > 0.5) - Ny//2)/L0
pos_bubble_front = (np.argmax(dh.cpu_arrays["C"][0, :] > 0.5) - Ny//2)/L0
```
%% Cell type:code id: tags:
``` python
assert(np.isclose(pos_liquid_front, -1e-1, atol=0.01))
assert(np.isclose(pos_bubble_front, 9e-2, atol=0.01))
```
import numpy as np
import pytest
from lbmpy.boundaries import NoSlip, UBB, SimpleExtrapolationOutflow, ExtrapolationOutflow,\
FixedDensity, DiffusionDirichlet, NeumannByCopy, StreamInConstant
from lbmpy.boundaries.boundaryhandling import LatticeBoltzmannBoundaryHandling
from lbmpy.creationfunctions import create_lb_function, create_lb_method
from lbmpy.geometry import add_box_boundary
from lbmpy.lbstep import LatticeBoltzmannStep
from lbmpy.stencils import get_stencil
from pystencils import create_data_handling, make_slice
@pytest.mark.parametrize("target", ['cpu', 'gpu', 'opencl'])
def test_simple(target):
if target == 'gpu':
import pytest
pytest.importorskip('pycuda')
elif target == 'opencl':
import pytest
pytest.importorskip('pyopencl')
import pystencils.opencl.autoinit
dh = create_data_handling((4, 4), parallel=False, default_target=target)
dh.add_array('pdfs', values_per_cell=9, cpu=True, gpu=target != 'cpu')
for i in range(9):
dh.fill("pdfs", i, value_idx=i, ghost_layers=True)
if target == 'gpu' or target == 'opencl':
dh.all_to_gpu()
lb_func = create_lb_function(stencil='D2Q9',
compressible=False,
relaxation_rate=1.8,
optimization={'target': target})
bh = LatticeBoltzmannBoundaryHandling(lb_func.method, dh, 'pdfs', target=target)
wall = NoSlip()
moving_wall = UBB((1, 0))
bh.set_boundary(wall, make_slice[0, :])
bh.set_boundary(wall, make_slice[-1, :])
bh.set_boundary(wall, make_slice[:, 0])
bh.set_boundary(moving_wall, make_slice[:, -1])
bh.prepare()
bh()
if target == 'gpu' or target == 'opencl':
dh.all_to_cpu()
# left lower corner
assert (dh.cpu_arrays['pdfs'][0, 0, 6] == 7)
assert (dh.cpu_arrays['pdfs'][0, 1, 4] == 3)
assert (dh.cpu_arrays['pdfs'][0, 1, 6] == 7)
assert (dh.cpu_arrays['pdfs'][1, 0, 1] == 2)
assert (dh.cpu_arrays['pdfs'][1, 0, 6] == 7)
# left side
assert (all(dh.cpu_arrays['pdfs'][0, 2:4, 4] == 3))
assert (all(dh.cpu_arrays['pdfs'][0, 2:4, 6] == 7))
assert (all(dh.cpu_arrays['pdfs'][0, 2:4, 5] == 5))
# left upper corner
assert (dh.cpu_arrays['pdfs'][0, 4, 4] == 3)
assert (dh.cpu_arrays['pdfs'][0, 4, 8] == 5)
assert (dh.cpu_arrays['pdfs'][0, 5, 8] == 5 + 6 / 36)
assert (dh.cpu_arrays['pdfs'][1, 5, 8] == 5 + 6 / 36)
assert (dh.cpu_arrays['pdfs'][1, 5, 2] == 1)
# top side
assert (all(dh.cpu_arrays['pdfs'][2:4, 5, 2] == 1))
assert (all(dh.cpu_arrays['pdfs'][2:4, 5, 7] == 6 - 6 / 36))
assert (all(dh.cpu_arrays['pdfs'][2:4, 5, 8] == 5 + 6 / 36))
# right upper corner
assert (dh.cpu_arrays['pdfs'][4, 5, 2] == 1)
assert (dh.cpu_arrays['pdfs'][4, 5, 7] == 6 - 6 / 36)
assert (dh.cpu_arrays['pdfs'][5, 5, 7] == 6 - 6 / 36)
assert (dh.cpu_arrays['pdfs'][5, 4, 3] == 4)
assert (dh.cpu_arrays['pdfs'][5, 4, 7] == 6)
# right side
assert (all(dh.cpu_arrays['pdfs'][5, 2:4, 3] == 4))
assert (all(dh.cpu_arrays['pdfs'][5, 2:4, 5] == 8))
assert (all(dh.cpu_arrays['pdfs'][5, 2:4, 7] == 6))
# right lower corner
assert (dh.cpu_arrays['pdfs'][5, 1, 3] == 4)
assert (dh.cpu_arrays['pdfs'][5, 1, 5] == 8)
assert (dh.cpu_arrays['pdfs'][5, 0, 5] == 8)
assert (dh.cpu_arrays['pdfs'][4, 0, 1] == 2)
assert (dh.cpu_arrays['pdfs'][4, 0, 5] == 8)
# lower side
assert (all(dh.cpu_arrays['pdfs'][0, 2:4, 4] == 3))
assert (all(dh.cpu_arrays['pdfs'][0, 2:4, 6] == 7))
assert (all(dh.cpu_arrays['pdfs'][0, 2:4, 8] == 5))
def test_exotic_boundaries():
step = LatticeBoltzmannStep((50, 50), relaxation_rate=1.8, compressible=False, periodicity=False)
add_box_boundary(step.boundary_handling, NeumannByCopy())
step.boundary_handling.set_boundary(StreamInConstant(0), make_slice[0, :])
step.run(100)
assert np.max(step.velocity[:, :, :]) < 1e-13
def test_boundary_utility_functions():
stencil = get_stencil("D2Q9")
method = create_lb_method(stencil=stencil)
noslip = NoSlip("noslip")
assert noslip == NoSlip("noslip")
assert not noslip == NoSlip("test")
assert not noslip == UBB((0, 0), name="ubb")
assert noslip.name == "noslip"
noslip.name = "test name setter"
assert noslip.name == "test name setter"
ubb = UBB((0, 0), name="ubb")
assert ubb == UBB((0, 0), name="ubb")
assert not noslip == UBB((0, 0), name="test")
assert not ubb == NoSlip("noslip")
simple_extrapolation = SimpleExtrapolationOutflow(normal_direction=stencil[4], stencil=stencil, name="simple")
assert simple_extrapolation == SimpleExtrapolationOutflow(normal_direction=stencil[4],
stencil=stencil, name="simple")
assert not simple_extrapolation == SimpleExtrapolationOutflow(normal_direction=stencil[4],
stencil=stencil, name="test")
assert not simple_extrapolation == NoSlip("noslip")
outflow = ExtrapolationOutflow(normal_direction=stencil[4], lb_method=method, name="outflow")
assert outflow == ExtrapolationOutflow(normal_direction=stencil[4], lb_method=method, name="outflow")
assert not outflow == ExtrapolationOutflow(normal_direction=stencil[4], lb_method=method, name="test")
assert not outflow == simple_extrapolation
density = FixedDensity(density=1.0, name="fixedDensity")
assert density == FixedDensity(density=1.0, name="fixedDensity")
assert not density == FixedDensity(density=1.0, name="test")
assert not density == UBB((0, 0), name="ubb")
diffusion = DiffusionDirichlet(concentration=1.0, name="diffusion")
assert diffusion == DiffusionDirichlet(concentration=1.0, name="diffusion")
assert not diffusion == DiffusionDirichlet(concentration=1.0, name="test")
assert not diffusion == density
neumann = NeumannByCopy(name="Neumann")
assert neumann == NeumannByCopy(name="Neumann")
assert not neumann == NeumannByCopy(name="test")
assert not neumann == diffusion
stream = StreamInConstant(constant=1.0, name="stream")
assert stream == StreamInConstant(constant=1.0, name="stream")
assert not stream == StreamInConstant(constant=1.0, name="test")
assert not stream == noslip
from hashlib import sha256
from lbmpy.creationfunctions import create_lb_ast
def test_hash_equivalence_llvm():
import pytest
pytest.importorskip("llvmlite")
from pystencils.llvm.llvmjit import generate_llvm
ref_value = "6db6ed9e2cbd05edae8fcaeb8168e3178dd578c2681133f3ae9228b23d2be432"
ast = create_lb_ast(stencil='D3Q19', method='srt', optimization={'target': 'llvm'})
code = generate_llvm(ast)
hash_value = sha256(str(code).encode()).hexdigest()
assert hash_value == ref_value
%% Cell type:code id: tags:
``` python
from lbmpy.session import *
from lbmpy.boundaries.boundaries_in_kernel import update_rule_with_push_boundaries
from lbmpy.macroscopic_value_kernels import macroscopic_values_getter, macroscopic_values_setter
from collections import OrderedDict
from time import perf_counter
```
%% Cell type:markdown id: tags:
# Version 1: compile-in boundaries
%% Cell type:code id: tags:
``` python
domain_size = (32, 32, 32)
relaxation_rate = 1.8
time_steps = 100
lid_velocity = 0.05
```
%% Cell type:code id: tags:
``` python
dh = create_data_handling(domain_size, default_target='cpu')
pdfs = dh.add_array('pdfs', values_per_cell=19)
u = dh.add_array('u', values_per_cell=len(domain_size))
streaming_pattern = 'aa'
```
%% Cell type:code id: tags:
``` python
boundaries = OrderedDict((
((0, 1, 0), UBB([lid_velocity, 0, 0])),
((1, 0, 0), NoSlip()),
((-1, 0, 0), NoSlip()),
((0, -1, 0), NoSlip()),
((0, 0, 1), NoSlip()),
((0, 0, -1), NoSlip()),
))
opt = {'symbolic_field': pdfs, 'cse_global': False, 'cse_pdfs': True}
cr_even = create_lb_collision_rule(stencil="D3Q19", relaxation_rate=relaxation_rate, compressible=False, optimization=opt)
cr_odd = create_lb_collision_rule(stencil="D3Q19", relaxation_rate=relaxation_rate, compressible=False, optimization=opt)
update_rule_aa_even = update_rule_with_push_boundaries(cr_even, pdfs, boundaries, streaming_pattern, Timestep.EVEN)
update_rule_aa_odd = update_rule_with_push_boundaries(cr_odd, pdfs, boundaries, streaming_pattern, Timestep.ODD)
getter_assignments = macroscopic_values_getter(update_rule_aa_even.method, velocity=u.center_vector,
pdfs=pdfs, density=None,
streaming_pattern=streaming_pattern,
previous_timestep=Timestep.EVEN)
getter_kernel = ps.create_kernel(getter_assignments, target=dh.default_target).compile()
even_kernel = ps.create_kernel(update_rule_aa_even, target=dh.default_target, ghost_layers=1).compile()
odd_kernel = ps.create_kernel(update_rule_aa_odd, target=dh.default_target, ghost_layers=1).compile()
```
%% Cell type:code id: tags:
``` python
def init():
dh.fill(pdfs.name, 0, ghost_layers=True)
def aa_time_loop(steps=100):
assert steps % 2 == 0, "Works only for an even number of time steps"
dh.all_to_gpu()
for i in range(steps // 2):
dh.run_kernel(odd_kernel)
dh.run_kernel(even_kernel)
dh.run_kernel(getter_kernel)
dh.all_to_cpu()
```
%% Cell type:code id: tags:
``` python
init()
aa_time_loop(time_steps)
vel_version1 = dh.gather_array(u.name, ghost_layers=False).copy()
plt.vector_field_magnitude(vel_version1[:, :, domain_size[2]//2, :])
plt.colorbar()
```
%% Output
<matplotlib.colorbar.Colorbar at 0x7f9d260364c0>
%% Cell type:markdown id: tags:
# Version 2: Normal boundary handling
%% Cell type:code id: tags:
``` python
ldc = create_lid_driven_cavity(domain_size, relaxation_rate=relaxation_rate, lid_velocity=lid_velocity)
ldc.run(time_steps)
vel_version2 = ldc.velocity[:, :, :, :]
plt.vector_field_magnitude(vel_version2[:, :, domain_size[2]//2, :])
plt.colorbar()
```
%% Output
<matplotlib.colorbar.Colorbar at 0x7f9d2158a520>
from copy import deepcopy
import numpy as np
import pytest
from lbmpy.scenarios import create_channel
def run_equivalence_test(scenario_creator, time_steps=13, **params):
print("Scenario", params)
params['optimization']['target'] = 'cpu'
cpu_scenario = scenario_creator(**params)
params['optimization']['target'] = 'gpu'
gpu_scenario = scenario_creator(**params)
cpu_scenario.run(time_steps)
gpu_scenario.run(time_steps)
max_vel_error = np.max(np.abs(cpu_scenario.velocity_slice() - gpu_scenario.velocity_slice()))
max_rho_error = np.max(np.abs(cpu_scenario.density_slice() - gpu_scenario.density_slice()))
np.testing.assert_allclose(max_vel_error, 0, atol=1e-14)
np.testing.assert_allclose(max_rho_error, 0, atol=1e-14)
def test_force_driven_channel_short():
default = {
'scenario_creator': create_channel,
'domain_size': (32, 32),
'relaxation_rates': [1.95, 1.9, 1.92, 1.92],
'method': 'mrt3',
'pressure_difference': 0.001,
'optimization': {},
}
scenarios = []
# Different methods
for ds, method, compressible, block_size, field_layout in [((17, 12), 'srt', False, (12, 4), 'reverse_numpy'),
((18, 20), 'mrt', True, (4, 2), 'zyxf'),
((7, 11, 18), 'trt', False, False, 'numpy')]:
params = deepcopy(default)
if block_size is not False:
params['optimization'].update({
'gpu_indexing_params': {'block_size': block_size}
})
else:
params['optimization']['gpu_indexing'] = 'line'
params['domain_size'] = ds
params['method'] = method
params['compressible'] = compressible
params['optimization']['field_layout'] = field_layout
scenarios.append(params)
for scenario in scenarios:
run_equivalence_test(**scenario)
@pytest.mark.longrun
def test_force_driven_channel():
default = {
'scenario_creator': create_channel,
'domain_size': (32, 32),
'relaxation_rates': [1.95, 1.9, 1.92, 1.92],
'method': 'mrt',
'pressure_difference': 0.001,
'optimization': {},
}
scenarios = []
# Different methods
for method in ('srt', 'mrt'):
for compressible in (True, False):
params = deepcopy(default)
params['optimization'].update({
'gpu_indexing_params': {'block_size': (16, 16)}
})
params['method'] = method
params['compressible'] = compressible
scenarios.append(params)
# Blocked indexing with different block sizes
for block_size in ((16, 16), (8, 16), (4, 2)):
params = deepcopy(default)
params['method'] = 'mrt'
params['compressible'] = True
params['optimization'].update({
'gpu_indexing': 'block',
'gpu_indexing_params': {'block_size': block_size}
})
scenarios.append(params)
# Line wise indexing
params = deepcopy(default)
params['optimization']['gpu_indexing'] = 'line'
scenarios.append(params)
# Different field layouts
for field_layout in ('numpy', 'reverse_numpy', 'zyxf'):
for fixed_size in (False, True):
params = deepcopy(default)
params['optimization'].update({
'gpu_indexing_params': {'block_size': (16, 16)}
})
if fixed_size:
params['optimization']['field_size'] = params['domain_size']
else:
params['optimization']['field_size'] = None
params['optimization']['field_layout'] = field_layout
scenarios.append(params)
print("Testing %d scenarios" % (len(scenarios),))
for scenario in scenarios:
run_equivalence_test(**scenario)
import pytest
import numpy as np
from lbmpy.creationfunctions import create_lb_method, create_lb_method_from_existing
from lbmpy.methods import create_srt
from lbmpy.stencils import get_stencil
from lbmpy.methods.creationfunctions import create_with_default_polynomial_cumulants
from lbmpy.scenarios import create_lid_driven_cavity
@pytest.mark.parametrize('stencil_name', ['D2Q9', 'D3Q19', 'D3Q27'])
def test_weights(stencil_name):
stencil = get_stencil(stencil_name)
cumulant_method = create_with_default_polynomial_cumulants(stencil, [1])
moment_method = create_srt(stencil, 1, compressible=True, maxwellian_moments=True)
assert cumulant_method.weights == moment_method.weights
def test_cumulant_ldc():
sc_cumulant = create_lid_driven_cavity((20, 20), method='cumulant', relaxation_rate=1.999999,
compressible=True, force=(-1e-10, 0))
sc_cumulant_3D = create_lid_driven_cavity((20, 20, 3), method='cumulant', relaxation_rate=1.999999,
compressible=True, force=(-1e-10, 0, 0),
galilean_correction=True)
sc_cumulant.run(1000)
sc_cumulant_3D.run(1000)
assert np.isfinite(np.max(sc_cumulant.velocity[:, :]))
assert np.isfinite(np.max(sc_cumulant_3D.velocity[:, :, :]))
def test_create_cumulant_method_from_existing():
method = create_lb_method(stencil='D2Q9', method='cumulant', relaxation_rate=1.5)
old_relaxation_info_dict = method.relaxation_info_dict
def modification_func(cumulant, eq, rate):
if rate == 0:
return cumulant, eq, 1.0
return cumulant, eq, rate
new_method = create_lb_method_from_existing(method, modification_func)
new_relaxation_info_dict = new_method.relaxation_info_dict
for i, (o, n) in enumerate(zip(old_relaxation_info_dict.items(), new_relaxation_info_dict.items())):
assert o[0] == n[0]
assert o[1].equilibrium_value == n[1].equilibrium_value
if o[1].relaxation_rate == 0:
assert n[1].relaxation_rate == 1.0
else:
assert o[1].relaxation_rate == n[1].relaxation_rate
from lbmpy.creationfunctions import create_lb_function
from lbmpy.scenarios import create_lid_driven_cavity
from pystencils import show_code
def test_creation():
"""Simple test that makes sure that only float variables are created"""
func = create_lb_function(method='srt', relaxation_rate=1.5,
optimization={'double_precision': False})
code = str(show_code(func.ast))
assert 'double' not in code
def test_scenario():
sc = create_lid_driven_cavity((16, 16, 8), relaxation_rate=1.5,
optimization={'double_precision': False})
sc.run(1)
code_str = str(show_code(sc.ast))
assert 'double' not in code_str
"""Tests velocity and stress fluctuations for thermalized LB"""
import pystencils as ps
from lbmpy.creationfunctions import *
from lbmpy.macroscopic_value_kernels import macroscopic_values_setter
import numpy as np
from lbmpy.moments import is_bulk_moment, is_shear_moment, get_order
from pystencils.rng import PhiloxTwoDoubles
import pytest
from pystencils.backends.simd_instruction_sets import get_supported_instruction_sets, get_vector_instruction_set
from pystencils.cpu.cpujit import get_compiler_config
def single_component_maxwell(x1, x2, kT, mass):
"""Integrate the probability density from x1 to x2 using the trapezoidal rule"""
x = np.linspace(x1, x2, 1000)
return np.trapz(np.exp(-mass * x**2 / (2. * kT)), x) / np.sqrt(2. * np.pi * kT/mass)
def rr_getter(moment_group):
"""Maps a group of moments to a relaxation rate (shear, bulk, even, odd)
in the 4 relaxation time thermalized LB model
"""
is_shear = [is_shear_moment(m, 3) for m in moment_group]
is_bulk = [is_bulk_moment(m, 3) for m in moment_group]
order = [get_order(m) for m in moment_group]
assert min(order) == max(order)
order = order[0]
if order < 2:
return 0
elif any(is_bulk):
assert all(is_bulk)
return sp.Symbol("omega_bulk")
elif any(is_shear):
assert all(is_shear)
return sp.Symbol("omega_shear")
elif order % 2 == 0:
assert order > 2
return sp.Symbol("omega_even")
else:
return sp.Symbol("omega_odd")
def second_order_moment_tensor_assignments(function_values, stencil, output_field):
"""Assignments for calculating the pressure tensor"""
assert len(function_values) == len(stencil)
dim = len(stencil[0])
return [ps.Assignment(output_field(i, j),
sum(c[i] * c[j] * f for f, c in zip(function_values, stencil)))
for i in range(dim) for j in range(dim)]
def add_pressure_output_to_collision_rule(collision_rule, pressure_field):
pressure_ouput = second_order_moment_tensor_assignments(collision_rule.method.pre_collision_pdf_symbols,
collision_rule.method.stencil, pressure_field)
collision_rule.main_assignments = collision_rule.main_assignments + pressure_ouput
def get_fluctuating_lb(size=None, kT=None, omega_shear=None, omega_bulk=None, omega_odd=None, omega_even=None, rho_0=None, target=None):
# Parameters
stencil = get_stencil('D3Q19')
# Setup data handling
dh = ps.create_data_handling(
[size]*3, periodicity=True, default_target=target)
src = dh.add_array('src', values_per_cell=len(stencil), layout='f')
dst = dh.add_array_like('dst', 'src')
rho = dh.add_array('rho', layout='f', latex_name='\\rho')
u = dh.add_array('u', values_per_cell=dh.dim, layout='f')
pressure_field = dh.add_array('pressure', values_per_cell=(
3, 3), layout='f', gpu=target == 'gpu')
force_field = dh.add_array(
'force', values_per_cell=3, layout='f', gpu=target == 'gpu')
# Method setup
method = create_mrt_orthogonal(
stencil=get_stencil('D3Q19'),
compressible=True,
weighted=True,
relaxation_rate_getter=rr_getter,
force_model=force_model_from_string('schiller', force_field.center_vector))
collision_rule = create_lb_collision_rule(
method,
fluctuating={
'temperature': kT
},
optimization={'cse_global': True}
)
add_pressure_output_to_collision_rule(collision_rule, pressure_field)
collision = create_lb_update_rule(collision_rule=collision_rule,
stencil=stencil,
method=method,
compressible=True,
kernel_type='collide_only',
optimization={'symbolic_field': src})
stream = create_stream_pull_with_output_kernel(collision.method, src, dst,
{'density': rho, 'velocity': u})
opts = {'cpu_openmp': True,
'cpu_vectorize_info': None,
'target': dh.default_target}
# Compile kernels
stream_kernel = ps.create_kernel(stream, **opts).compile()
collision_kernel = ps.create_kernel(collision, **opts).compile()
sync_pdfs = dh.synchronization_function([src.name])
# Initialization
init = macroscopic_values_setter(collision.method, velocity=(0,)*dh.dim,
pdfs=src.center_vector, density=rho.center)
init_kernel = ps.create_kernel(init, ghost_layers=0).compile()
dh.fill(rho.name, rho_0)
dh.fill(u.name, np.nan, ghost_layers=True, inner_ghost_layers=True)
dh.fill(u.name, 0)
dh.fill(force_field.name, np.nan,
ghost_layers=True, inner_ghost_layers=True)
dh.fill(force_field.name, 0)
dh.run_kernel(init_kernel)
# time loop
def time_loop(start, steps):
dh.all_to_gpu()
for i in range(start, start+steps):
dh.run_kernel(collision_kernel, omega_shear=omega_shear, omega_bulk=omega_bulk,
omega_odd=omega_odd, omega_even=omega_even, seed=42, time_step=i)
sync_pdfs()
dh.run_kernel(stream_kernel)
dh.swap(src.name, dst.name)
return start+steps
return dh, time_loop
def test_resting_fluid(target="cpu"):
rho_0 = 0.86
kT = 4E-4
L = [60]*3
dh, time_loop = get_fluctuating_lb(size=L[0], target=target,
rho_0=rho_0, kT=kT,
omega_shear=0.8, omega_bulk=0.5, omega_even=.04, omega_odd=0.3)
# Test
t = 0
# warm up
t = time_loop(t, 10)
# Measurement
for i in range(10):
t = time_loop(t, 5)
res_u = dh.gather_array("u").reshape((-1, 3))
res_rho = dh.gather_array("rho").reshape((-1,))
# mass conservation
np.testing.assert_allclose(np.mean(res_rho), rho_0, atol=3E-12)
# momentum conservation
momentum = np.dot(res_rho, res_u)
np.testing.assert_allclose(momentum, [0, 0, 0], atol=1E-10)
# temperature
kinetic_energy = 1/2*np.dot(res_rho, res_u*res_u)/np.product(L)
np.testing.assert_allclose(
kinetic_energy, [kT/2]*3, atol=kT*0.01)
# velocity distribution
v_hist, v_bins = np.histogram(
res_u, bins=11, range=(-.075, .075), density=True)
# Calculate expected values from single
v_expected = []
for j in range(len(v_hist)):
# Maxwell distribution
res = 1./(v_bins[j+1]-v_bins[j]) * \
single_component_maxwell(
v_bins[j], v_bins[j+1], kT, rho_0)
v_expected.append(res)
v_expected = np.array(v_expected)
# 10% accuracy on the entire histogram
np.testing.assert_allclose(v_hist, v_expected, rtol=0.1)
# 1% accuracy on the middle part
remove = 3
np.testing.assert_allclose(
v_hist[remove:-remove], v_expected[remove:-remove], rtol=0.01)
# pressure tensor against expressions from
# Duenweg, Schiller, Ladd, https://arxiv.org/abs/0707.1581
res_pressure = dh.gather_array("pressure").reshape((-1, 3, 3))
c_s = np.sqrt(1/3) # speed of sound
# average of pressure tensor
# Diagonal elements are rho c_s^22 +<u,u>. When the fluid is
# thermalized, the expectation value of <u,u> = kT due to the
# equi-partition theorem.
p_av_expected = np.diag([rho_0*c_s**2 + kT]*3)
np.testing.assert_allclose(
np.mean(res_pressure, axis=0), p_av_expected, atol=c_s**2/2000)
def test_point_force(target="cpu"):
"""Test momentum balance for thermalized fluid with applied poitn forces"""
rho_0 = 0.86
kT = 4E-4
L = [8]*3
dh, time_loop = get_fluctuating_lb(size=L[0], target=target,
rho_0=rho_0, kT=kT,
omega_shear=0.8, omega_bulk=0.5, omega_even=.04, omega_odd=0.3)
# Test
t = 0
# warm up
t = time_loop(t, 100)
introduced_momentum = np.zeros(3)
for i in range(100):
point_force = 1E-5*(np.random.random(3) - .5)
introduced_momentum += point_force
# Note that ghost layers are included in the indexing
force_pos = np.random.randint(1, L[0]-2, size=3)
dh.cpu_arrays["force"][force_pos[0],
force_pos[1], force_pos[2]] = point_force
t = time_loop(t, 1)
res_u = dh.gather_array("u").reshape((-1, 3))
res_rho = dh.gather_array("rho").reshape((-1,))
# mass conservation
np.testing.assert_allclose(np.mean(res_rho), rho_0, atol=3E-12)
# momentum conservation
momentum = np.dot(res_rho, res_u)
np.testing.assert_allclose(
momentum, introduced_momentum + 0.5 * point_force, atol=1E-10)
dh.cpu_arrays["force"][force_pos[0],
force_pos[1], force_pos[2]] = np.zeros(3)
@pytest.mark.skipif(not get_supported_instruction_sets(), reason="No vector instruction sets supported")
@pytest.mark.parametrize('assume_aligned', (True, False))
@pytest.mark.parametrize('assume_inner_stride_one', (True, False))
@pytest.mark.parametrize('assume_sufficient_line_padding', (True, False))
def test_vectorization(assume_aligned, assume_inner_stride_one, assume_sufficient_line_padding):
method = create_mrt_orthogonal(
stencil=get_stencil('D2Q9'),
compressible=True,
weighted=True,
relaxation_rate_getter=rr_getter)
collision_rule = create_lb_collision_rule(
method,
fluctuating={
'temperature': sp.Symbol("kT"),
'rng_node': PhiloxTwoDoubles,
'block_offsets': (0, 0),
},
optimization={'cse_global': True}
)
collision = create_lb_update_rule(collision_rule=collision_rule,
stencil=method.stencil,
method=method,
compressible=True,
kernel_type='collide_only')
instruction_sets = get_supported_instruction_sets()
if get_compiler_config()['os'] == 'windows':
# skip instruction sets supported by the CPU but not by the compiler
if 'avx' in instruction_sets and ('/arch:avx2' not in get_compiler_config()['flags'].lower()
and '/arch:avx512' not in get_compiler_config()['flags'].lower()):
instruction_sets.remove('avx')
if 'avx512' in instruction_sets and '/arch:avx512' not in get_compiler_config()['flags'].lower():
instruction_sets.remove('avx512')
instruction_set = instruction_sets[-1]
opts = {'cpu_openmp': False,
'cpu_vectorize_info': {
'instruction_set': instruction_set,
'assume_aligned': assume_aligned,
'assume_inner_stride_one': assume_inner_stride_one,
'assume_sufficient_line_padding': assume_sufficient_line_padding,
},
'target': 'cpu'}
if not assume_inner_stride_one and 'storeS' not in get_vector_instruction_set('double', instruction_set):
with pytest.warns(UserWarning) as warn:
code = ps.create_kernel(collision, **opts)
assert 'Could not vectorize loop' in warn[0].message.args[0]
else:
with pytest.warns(None) as warn:
code = ps.create_kernel(collision, **opts)
assert len(warn) == 0
code.compile()
from pystencils.session import *
from lbmpy.session import *
from lbmpy.macroscopic_value_kernels import macroscopic_values_setter
import lbmpy.forcemodels
from lbmpy.moments import is_bulk_moment
import pytest
from contextlib import ExitStack as does_not_raise
force_models = [fm.lower() for fm in dir(lbmpy.forcemodels) if fm[0].isupper()]
def test_force_models_available():
assert 'guo' in force_models
assert 'luo' in force_models
@pytest.mark.parametrize("method", ["srt", "trt"])
@pytest.mark.parametrize("force_model", force_models)
@pytest.mark.parametrize("omega", [0.5, 1.5])
def test_total_momentum(method, force_model, omega):
L = (16, 16)
stencil = get_stencil("D2Q9")
F = [2e-4, -3e-4]
dh = ps.create_data_handling(L, periodicity=True, default_target='cpu')
src = dh.add_array('src', values_per_cell=len(stencil))
dst = dh.add_array_like('dst', 'src')
ρ = dh.add_array('rho')
u = dh.add_array('u', values_per_cell=dh.dim)
expectation = does_not_raise()
skip = False
if force_model in ['guo', 'buick'] and method != 'srt':
expectation = pytest.raises(AssertionError)
skip = True
with expectation:
collision = create_lb_update_rule(method=method,
stencil=stencil,
relaxation_rate=omega,
compressible=True,
force_model=force_model,
force=F,
kernel_type='collide_only',
optimization={'symbolic_field': src})
if skip:
return
stream = create_stream_pull_with_output_kernel(collision.method, src, dst,
{'density': ρ, 'velocity': u})
opts = {'cpu_openmp': True,
'cpu_vectorize_info': None,
'target': dh.default_target}
stream_kernel = ps.create_kernel(stream, **opts).compile()
collision_kernel = ps.create_kernel(collision, **opts).compile()
def init():
dh.fill(ρ.name, 1)
dh.fill(u.name, 0)
setter = macroscopic_values_setter(collision.method, velocity=(0,)*dh.dim,
pdfs=src.center_vector, density=ρ.center)
kernel = ps.create_kernel(setter, ghost_layers=0).compile()
dh.run_kernel(kernel)
sync_pdfs = dh.synchronization_function([src.name])
def time_loop(steps):
dh.all_to_gpu()
for i in range(steps):
dh.run_kernel(collision_kernel)
sync_pdfs()
dh.run_kernel(stream_kernel)
dh.swap(src.name, dst.name)
dh.all_to_cpu()
t = 20
init()
time_loop(t)
total = np.sum(dh.gather_array(u.name), axis=(0,1))
assert np.allclose(total/np.prod(L)/F/t, 1)
@pytest.mark.parametrize("stencil", ["D2Q9", "D3Q15", "D3Q19", "D3Q27"])
@pytest.mark.parametrize("force_model", ["simple", "schiller"])
def test_modes(stencil, force_model):
"""check Schiller's force term in mode space"""
stencil = get_stencil(stencil)
dim = len(stencil[0])
omega_s = sp.Symbol("omega_s")
omega_b = sp.Symbol("omega_b")
omega_o = sp.Symbol("omega_o")
omega_e = sp.Symbol("omega_e")
F = [sp.Symbol(f"F_{i}") for i in range(dim)]
method = create_lb_method(method="mrt", weighted=True,
stencil=stencil,
relaxation_rates=[omega_s, omega_b, omega_o, omega_e, omega_o, omega_e],
compressible=True,
force_model=force_model,
force=F)
force_moments = sp.simplify(method.moment_matrix * sp.Matrix(method.force_model(method)))
# The mass mode should be zero
assert force_moments[0] == 0
# The momentum moments should contain the force
assert list(force_moments[1:dim+1]) == F
if force_model == "schiller":
num_stresses = (dim*dim-dim)//2+dim
lambda_s, lambda_b = -omega_s, -omega_b
# The stress moments should match eq. 47 from https://doi.org/10.1023/A:1010414013942
u = method.first_order_equilibrium_moment_symbols
def traceless(m):
tr = sp.simplify(sum([m[i,i] for i in range(dim)]))
return m - tr/m.shape[0]*sp.eye(m.shape[0])
C = sp.Rational(1,2) * (2 + lambda_s) * (traceless(sp.Matrix(u) * sp.Matrix(F).transpose()) + \
traceless(sp.Matrix(F) * sp.Matrix(u).transpose())) + \
sp.Rational(1,method.dim) * (2 + lambda_b) * sp.Matrix(u).dot(F) * sp.eye(method.dim)
subs = {sp.Symbol(chr(ord("x")+i)) * sp.Symbol(chr(ord("x")+j)) : C[i,j]
for i in range(dim) for j in range(dim)}
for force_moment, moment in zip(force_moments[dim+1:dim+1+num_stresses],
method.moments[dim+1:dim+1+num_stresses]):
ref = moment.subs(subs)
diff = sp.simplify(ref - force_moment)
if is_bulk_moment(moment, dim):
assert diff == 0 or isinstance(diff, sp.Rational) # difference should be zero or a constant
else:
assert diff == 0 # difference should be zero
ff = sp.Matrix(method.force_model(method))
# Check eq. 4.53a from schiller2008thermal
assert sp.simplify(sum(ff)) == 0
# Check eq. 4.53b from schiller2008thermal
assert [sp.simplify(sum(ff[i] * stencil[i][j] for i in range(len(stencil)))) for j in range(dim)] == F
# Check eq. 4.61a from schiller2008thermal
ref = (2 + lambda_s)/2 * (traceless(sp.Matrix(u) * sp.Matrix(F).transpose()) + \
traceless(sp.Matrix(F) * sp.Matrix(u).transpose()))
s = sp.zeros(dim)
for i in range(0, len(stencil)):
s += ff[i] * traceless(sp.Matrix(stencil[i]) * sp.Matrix(stencil[i]).transpose())
assert sp.simplify(s-ref) == sp.zeros(dim)
# Check eq. 4.61b from schiller2008thermal
assert sp.simplify(sum(ff[i] * stencil[i][a]**2 for i in range(len(stencil)) for a in range(dim))
- (2+lambda_b)*sp.Matrix(u).dot(F)) == 0
# All other moments should be zero
assert list(force_moments[dim+1+num_stresses:]) == [0] * (len(stencil)-(dim+1+num_stresses))
elif force_model == "simple":
# All other moments should be zero
assert list(force_moments[dim+1:]) == [0] * (len(stencil)-(dim+1))
@pytest.mark.parametrize("force_model", force_models)
def test_momentum_density_shift(force_model):
target = 'cpu'
stencil = get_stencil('D2Q9')
domain_size = (4, 4)
dh = ps.create_data_handling(domain_size=domain_size, default_target=target)
rho = dh.add_array('rho', values_per_cell=1)
dh.fill('rho', 0.0, ghost_layers=True)
velField = dh.add_array('velField', values_per_cell=dh.dim)
dh.fill('velField', 0.0, ghost_layers=True)
momentum_density = dh.add_array('momentum_density', values_per_cell=dh.dim)
dh.fill('momentum_density', 0.0, ghost_layers=True)
src = dh.add_array('src', values_per_cell=len(stencil))
dh.fill('src', 0.0, ghost_layers=True)
method = create_lb_method(method="srt", compressible=True, force_model=force_model, force=[1, 2])
momentum_density_symbols = sp.symbols("md_:2")
cqc = method.conserved_quantity_computation
momentum_density_getter = cqc.output_equations_from_pdfs(src.center_vector,
{'density': rho.center,
'momentum_density': momentum_density.center_vector})
momentum_density_ast = ps.create_kernel(momentum_density_getter, target=dh.default_target)
momentum_density_kernel = momentum_density_ast.compile()
dh.run_kernel(momentum_density_kernel)
assert np.sum(dh.gather_array(momentum_density.name)[:, :, 0]) == np.prod(domain_size) / 2
assert np.sum(dh.gather_array(momentum_density.name)[:, :, 1]) == np.prod(domain_size)
from lbmpy.cumulants import raw_moment_as_function_of_cumulants
from lbmpy.maxwellian_equilibrium import *
from lbmpy.moments import (
MOMENT_SYMBOLS, exponents_to_polynomial_representations, moment_matrix,
moments_up_to_component_order, moments_up_to_order)
from lbmpy.stencils import get_stencil
from pystencils.sympyextensions import remove_higher_order_terms
def test_maxwellian_moments():
"""Check moments of continuous Maxwellian"""
rho = sp.Symbol("rho")
u = sp.symbols("u_0 u_1 u_2")
c_s = sp.Symbol("c_s")
eq_moments = get_moments_of_continuous_maxwellian_equilibrium(((0, 0, 0), (0, 0, 1)),
dim=3, rho=rho, u=u, c_s_sq=c_s ** 2)
assert eq_moments[0] == rho
assert eq_moments[1] == rho * u[2]
x, y, z = MOMENT_SYMBOLS
one = sp.Rational(1, 1)
eq_moments = get_moments_of_continuous_maxwellian_equilibrium((one, x, x ** 2, x * y),
dim=2, rho=rho, u=u[:2], c_s_sq=c_s ** 2)
assert eq_moments[0] == rho
assert eq_moments[1] == rho * u[0]
assert eq_moments[2] == rho * (c_s ** 2 + u[0] ** 2)
assert eq_moments[3] == rho * u[0] * u[1]
def test_continuous_discrete_moment_equivalence():
"""Check that moments up to order 3 agree with moments of the continuous Maxwellian"""
for stencil in [get_stencil(n) for n in ["D2Q9", "D3Q15", "D3Q19", "D3Q27"]]:
dim = len(stencil[0])
c_s_sq = sp.Rational(1, 3)
moments = tuple(moments_up_to_order(3, dim=dim, include_permutations=False))
cm = sp.Matrix(get_moments_of_continuous_maxwellian_equilibrium(moments, order=2, dim=dim, c_s_sq=c_s_sq))
dm = sp.Matrix(get_moments_of_discrete_maxwellian_equilibrium(stencil, moments, order=2,
compressible=True, c_s_sq=c_s_sq))
diff = sp.simplify(cm - dm)
for d in diff:
assert d == 0
def test_moment_cumulant_continuous_equivalence():
"""Test that discrete equilibrium is the same up to order 3 when obtained with following methods
* eq1: take moments of continuous Maxwellian and transform back to pdf space
* eq2: take cumulants of continuous Maxwellian, transform to moments then transform to pdf space
* eq3: take discrete equilibrium from LBM literature
* eq4: same as eq1 but with built-in function
"""
for stencil in [get_stencil('D2Q9'), get_stencil('D3Q27')]:
dim = len(stencil[0])
u = sp.symbols("u_:{dim}".format(dim=dim))
indices = tuple(moments_up_to_component_order(2, dim=dim))
c_s_sq = sp.Rational(1, 3)
eq_moments1 = get_moments_of_continuous_maxwellian_equilibrium(indices, dim=dim, u=u, c_s_sq=c_s_sq)
eq_cumulants = get_cumulants_of_continuous_maxwellian_equilibrium(indices, dim=dim, u=u, c_s_sq=c_s_sq)
eq_cumulants = {idx: c for idx, c in zip(indices, eq_cumulants)}
eq_moments2 = [raw_moment_as_function_of_cumulants(idx, eq_cumulants) for idx in indices]
pdfs_to_moments = moment_matrix(indices, stencil)
def normalize(expressions):
return [remove_higher_order_terms(e.expand(), symbols=u, order=3) for e in expressions]
eq1 = normalize(pdfs_to_moments.inv() * sp.Matrix(eq_moments1))
eq2 = normalize(pdfs_to_moments.inv() * sp.Matrix(eq_moments2))
eq3 = normalize(discrete_maxwellian_equilibrium(stencil, order=3, c_s_sq=c_s_sq, compressible=True))
eq4 = normalize(generate_equilibrium_by_matching_moments(stencil, indices, c_s_sq=c_s_sq))
assert eq1 == eq2
assert eq2 == eq3
assert eq3 == eq4
def test_moment_cumulant_continuous_equivalence_polynomial_formulation():
"""Same as test above, but instead of index tuples, the polynomial formulation is used."""
for stencil in [get_stencil('D2Q9'), get_stencil('D3Q27')]:
dim = len(stencil[0])
u = sp.symbols(f"u_:{dim}")
index_tuples = tuple(moments_up_to_component_order(2, dim=dim))
indices = exponents_to_polynomial_representations(index_tuples)
c_s_sq = sp.Rational(1, 3)
eq_moments1 = get_moments_of_continuous_maxwellian_equilibrium(indices, dim=dim, u=u, c_s_sq=c_s_sq)
eq_cumulants = get_cumulants_of_continuous_maxwellian_equilibrium(indices, dim=dim, u=u, c_s_sq=c_s_sq)
eq_cumulants = {idx: c for idx, c in zip(index_tuples, eq_cumulants)}
eq_moments2 = [raw_moment_as_function_of_cumulants(idx, eq_cumulants) for idx in index_tuples]
pdfs_to_moments = moment_matrix(indices, stencil)
def normalize(expressions):
return [remove_higher_order_terms(e.expand(), symbols=u, order=3) for e in expressions]
eq1 = normalize(pdfs_to_moments.inv() * sp.Matrix(eq_moments1))
eq2 = normalize(pdfs_to_moments.inv() * sp.Matrix(eq_moments2))
eq3 = normalize(discrete_maxwellian_equilibrium(stencil, order=3, c_s_sq=c_s_sq, compressible=True))
eq4 = normalize(generate_equilibrium_by_matching_moments(stencil, indices, c_s_sq=c_s_sq))
assert eq1 == eq2
assert eq2 == eq3
assert eq3 == eq4
import pytest
import sympy as sp
from lbmpy.creationfunctions import create_lb_method
from lbmpy.relaxationrates import get_shear_relaxation_rate
def test_relaxation_rate():
method = create_lb_method(stencil="D3Q19", method='mrt_raw',
relaxation_rates=[1 + i / 10 for i in range(19)])
with pytest.raises(ValueError) as e:
get_shear_relaxation_rate(method)
assert 'Shear moments are relaxed with different relaxation' in str(e.value)
omegas = sp.symbols("omega_:4")
method = create_lb_method(stencil="D2Q9", method='mrt', relaxation_rates=omegas)
assert get_shear_relaxation_rate(method) == omegas[0]
import os
from types import MappingProxyType
import numpy as np
from lbmpy.scenarios import create_lid_driven_cavity as run_ldc_lbmpy
from pystencils import make_slice
def create_force_driven_channel(force=1e-6, domain_size=None, dim=2, radius=None, length=None,
optimization=MappingProxyType({}), lbm_kernel=None, kernel_params=MappingProxyType({}),
**kwargs):
from lbmpy.lbstep import LatticeBoltzmannStep
from lbmpy.boundaries import NoSlip
from pystencils.slicing import slice_from_direction
wall_boundary = NoSlip()
if domain_size is not None:
dim = len(domain_size)
else:
if dim is None or radius is None or length is None:
raise ValueError("Pass either 'domain_size' or 'dim', 'radius' and 'length'")
assert dim in (2, 3)
kwargs['force'] = tuple([force, 0, 0][:dim])
round_channel = False
if radius is not None:
assert length is not None
if dim == 3:
domain_size = (length, 2 * radius + 1, 2 * radius + 1)
round_channel = True
else:
if domain_size is None:
domain_size = (length, 2 * radius)
if 'force_model' not in kwargs:
kwargs['force_model'] = 'guo'
lb_step = LatticeBoltzmannStep(domain_size, optimization=optimization, lbm_kernel=lbm_kernel,
kernel_params=kernel_params, periodicity=(True, False, False), **kwargs)
boundary_handling = lb_step.boundary_handling
if dim == 2:
for direction in ('N', 'S'):
boundary_handling.set_boundary(wall_boundary, slice_from_direction(direction, dim))
elif dim == 3:
if round_channel:
def circle_mask_cb(_, y, z):
y_mid = np.max(y) // 2
z_mid = np.max(z) // 2
return (y - y_mid) ** 2 + (z - z_mid) ** 2 > radius ** 2
boundary_handling.set_boundary(wall_boundary, mask_callback=circle_mask_cb)
else:
for direction in ('N', 'S', 'T', 'B'):
boundary_handling.set_boundary(wall_boundary, slice_from_direction(direction, dim))
assert domain_size is not None
if 'force_model' not in kwargs:
kwargs['force_model'] = 'guo'
return lb_step
def plot_velocity_fields(vel1, vel2):
import lbmpy.plot as plt
diff = np.average(np.abs(vel2 - vel1))
has_diff = diff > 1e-12
num_plots = 3 if has_diff else 2
plt.subplot(1, num_plots, 1)
plt.title("lbmpy")
plt.vector_field(vel1)
plt.subplot(1, num_plots, 2)
plt.title("walberla ref")
plt.vector_field(vel2)
if has_diff:
plt.title("Difference (%f)" % diff)
plt.subplot(1, num_plots, 3)
plt.vector_field(vel2 - vel1)
plt.show()
def compare_scenario(lbmpy_scenario_creator, walberla_scenario_creator, optimization=MappingProxyType({}),
action='Testing', name='ss', plot="off", **kwargs):
if 'time_steps' in kwargs:
time_steps = kwargs['time_steps']
del kwargs['time_steps']
else:
time_steps = 100
ref_file_path = get_directory_reference_files()
if action == 'Testing':
reference = np.load(os.path.join(ref_file_path, name + ".npz"))
lbmpy_version = lbmpy_scenario_creator(optimization=optimization, **kwargs)
lbmpy_version.run(time_steps)
rho_lbmpy = lbmpy_version.density_slice(make_slice[:, :] if lbmpy_version.dim == 2 else make_slice[:, :, :])
vel_lbmpy = lbmpy_version.velocity_slice(make_slice[:, :] if lbmpy_version.dim == 2 else make_slice[:, :, :])
if plot == "on":
plot_velocity_fields(vel_lbmpy, reference['vel'])
np.testing.assert_almost_equal(reference['rho'], rho_lbmpy, err_msg="Density fields are different")
np.testing.assert_almost_equal(reference['vel'], vel_lbmpy, err_msg="Velocity fields are different")
else:
wlb_time_loop = walberla_scenario_creator(**kwargs)
pdfs_wlb, rho_wlb, vel_wlb = wlb_time_loop(time_steps)
if os.path.exists(ref_file_path + name + ".npz"):
os.remove(ref_file_path + name + ".npz")
np.savez_compressed(ref_file_path + name, pdfs=pdfs_wlb, rho=rho_wlb, vel=vel_wlb)
def get_directory_reference_files():
script_file = os.path.realpath(__file__)
script_dir = os.path.dirname(script_file)
return os.path.join(script_dir, "reference_files")
def compare_lid_driven_cavity(optimization=MappingProxyType({}), action='Testing', plot="off", **kwargs):
if kwargs['method'] == 'MRT':
name = "LidDrivenCavity_" + kwargs['method']
else:
name = "LidDrivenCavity_" + kwargs['method'] + "_" + kwargs['force_model']
if kwargs['compressible']:
name = name + "_compressible"
else:
name = name + "_incompressible"
try:
import waLBerla.field
from lbmpy_tests.walberla_scenario_setup import create_lid_driven_cavity as run_lid_driven_cavity_walberla
except ImportError:
run_lid_driven_cavity_walberla = None
return compare_scenario(run_ldc_lbmpy, run_lid_driven_cavity_walberla, optimization, action, name, plot, **kwargs)
def compare_force_driven_channel(optimization=MappingProxyType({}), action='Testing', plot="off", **kwargs):
from functools import partial
lbmpy_func = partial(create_force_driven_channel, dim=2)
name = "ForceDrivenChannel_" + kwargs['force_model']
if kwargs['compressible']:
name = name + "_compressible"
else:
name = name + "_incompressible"
try:
import waLBerla.field
from lbmpy_tests.walberla_scenario_setup import create_lid_driven_cavity as run_force_driven_channel_walberla
except ImportError:
run_force_driven_channel_walberla = None
return compare_scenario(lbmpy_func, run_force_driven_channel_walberla, optimization, action, name, plot, **kwargs)
def test_channel_srt(action='Testing', plot="off"):
params = {'force': 0.0001,
'radius': 40,
'length': 40,
'method': 'SRT',
'stencil': 'D2Q9',
'time_steps': 500,
'maxwellian_moments': False,
'relaxation_rates': [1.8]}
if action == 'Testing' or action == 'Regenerate':
for force_model_name in ('simple', 'luo', 'guo'):
for compressible in (False, True):
print("%s Channel SRT, Force Model %s, compressible %d" % (action, force_model_name, compressible))
compare_force_driven_channel(compressible=compressible, force_model=force_model_name,
action=action, plot=plot, **params)
else:
print("Possible Actions: Regenerate or Testing")
def test_ldc_srt(action='Testing', plot="off"):
force = (0.0001, -0.00002)
if action == 'Testing' or action == 'Regenerate':
for force_model_name in ('simple', 'luo', 'guo'):
for compressible in (False, True):
print("%s LidDrivenCavity SRT, Force Model %s, compressible %d" % (action, force_model_name,
compressible))
compare_lid_driven_cavity(domain_size=(16, 19), lid_velocity=0.005, stencil='D2Q9',
method='SRT', relaxation_rates=[1.8], compressible=compressible,
maxwellian_moments=False,
force=force, force_model=force_model_name, action=action, plot=plot)
else:
print("Possible Actions: Regenerate or Testing")
def test_ldc_trt(action='Testing', plot="off"):
f = (0.0001, -0.00002, 0.0000124)
if action == 'Testing' or action == 'Regenerate':
for force_modelName in ('luo',): # guo for multiple relaxation rates has to be implemented...
for compressible in (True, False):
print("%s LidDrivenCavity TRT, Force Model %s, compressible %d" % (action, force_modelName,
compressible))
# print("testing", force_modelName, compressible)
compare_lid_driven_cavity(domain_size=(16, 17, 18), lid_velocity=0.005, stencil='D3Q19',
method='TRT', relaxation_rates=[1.8, 1.3], compressible=compressible,
maxwellian_moments=False,
force=f, force_model=force_modelName, action=action, plot=plot)
else:
print("Possible Actions: Regenerate or Testing")
def test_ldc_mrt(action='Testing', plot="off"):
from lbmpy.methods import mrt_orthogonal_modes_literature
from lbmpy.stencils import get_stencil
if action == 'Testing' or action == 'Regenerate':
print("%s LidDrivenCavity MRT, compressible 0" % action)
moments = mrt_orthogonal_modes_literature(get_stencil("D3Q19"), True)
compare_lid_driven_cavity(domain_size=(16, 17, 18), lid_velocity=0.005, stencil='D3Q19',
method='MRT', nested_moments=moments, compressible=False, maxwellian_moments=False,
relaxation_rates=[1.3, 1.4, 1.5, 1.25, 1.36, 1.12], action=action, plot=plot)
else:
print("Possible Actions: Regenerate or Testing")
import numpy as np
from lbmpy.creationfunctions import create_lb_function
def test_srt():
src = np.zeros((3, 3, 9))
dst = np.zeros_like(src)
cuda = False
opt_params = {} if not cuda else {'target': 'gpu'}
func = create_lb_function(method='srt', stencil='D2Q9', relaxation_rates=[1.8], compressible=False,
optimization=opt_params)
if cuda:
import pycuda.gpuarray as gpuarray
gpu_src, gpu_dst = gpuarray.to_gpu(src), gpuarray.to_gpu(dst)
func(src=gpu_src, dst=gpu_dst)
gpu_src.get(src)
gpu_dst.get(dst)
else:
func(src=src, dst=dst)
np.testing.assert_allclose(np.sum(np.abs(dst)), 0.0, atol=1e-13)
%% Cell type:code id: tags:
``` python
import pytest
pytest.importorskip('pycuda')
```
%% Output
<module 'pycuda' from '/home/markus/miniconda3/envs/pystencils/lib/python3.8/site-packages/pycuda/__init__.py'>
%% Cell type:code id: tags:
``` python
from lbmpy.session import *
from lbmpy.sparse import *
from pystencils.field import FieldType
import pycuda.gpuarray as gpuarray
import pystencils as ps
```
%% Cell type:markdown id: tags:
# Sparse (list based) LBM
%% Cell type:code id: tags:
``` python
domain_size = (20, 20)
omega = 1.8
target = 'cpu'
ghost_layers = 1
arr_size = tuple(ds + 2 * ghost_layers for ds in domain_size)
lid_velocity = 0.01
force = 1e-6
channel = True
if channel:
kwargs={'force': (force, 0)}
else:
kwargs = {}
method = create_lb_method(stencil='D2Q9', relaxation_rate=omega, compressible=False, **kwargs)
```
%% Cell type:code id: tags:
``` python
ubb = UBB( (lid_velocity, 0) )
noslip = NoSlip()
flags = {
'fluid': 1,
noslip: 2,
ubb: 4,
}
flag_arr = np.zeros(arr_size, dtype=np.uint16)
flag_arr.fill(flags['fluid'])
if channel:
flag_arr[0, :] = 0
flag_arr[-1, :] = 0
flag_arr[:, 0] = flags[noslip]
flag_arr[:, -1] = flags[noslip]
else:
flag_arr[:, -1] = flags[ubb]
flag_arr[:, 0] = flags[noslip]
flag_arr[0, :] = flags[noslip]
flag_arr[-1, :] = flags[noslip]
plt.scalar_field(flag_arr)
plt.colorbar();
```
%% Output
%% Cell type:markdown id: tags:
### Mappings
%% Cell type:code id: tags:
``` python
mapping = SparseLbMapper(method.stencil, flag_arr, flags['fluid'], flags[noslip], flags[ubb])
index_arr = mapping.create_index_array(ghost_layers)
# Arrays
#index_arr = index_arr_linear.reshape([len(method.stencil), mapping.num_fluid_cells])
#index_arr = index_arr.swapaxes(0, 1)
pdf_arr = np.empty((len(mapping), len(method.stencil)), order='f')
pdf_arr_tmp = np.empty_like(pdf_arr)
vel_arr = np.ones([mapping.num_fluid_cells, method.dim], order='f')
```
%% Cell type:code id: tags:
``` python
pdf_field, pdf_field_tmp, vel_field = ps.fields("f(9), d(9), u(2): [1D]",
#f=pdf_arr[:mapping.num_fluid_cells],
#d=pdf_arr_tmp[:mapping.num_fluid_cells],
#u=vel_arr
)
pdf_field.field_type = FieldType.CUSTOM
pdf_field.pdf_field_tmp = FieldType.CUSTOM
```
%% Cell type:markdown id: tags:
### Macroscopic quantities
%% Cell type:code id: tags:
``` python
cqc = method.conserved_quantity_computation
inp_eqs = cqc.equilibrium_input_equations_from_init_values()
setter_eqs = method.get_equilibrium(conserved_quantity_equations=inp_eqs)
setter_eqs = setter_eqs.new_with_substitutions({sym: pdf_field(i)
for i, sym in enumerate(method.post_collision_pdf_symbols)})
kernel_initialize = ps.create_kernel(setter_eqs, ghost_layers=((0, 0),), ).compile()
def init():
kernel_initialize(f=pdf_arr[:mapping.num_fluid_cells])
init()
getter_eqs = cqc.output_equations_from_pdfs(pdf_field.center_vector,
{'velocity': vel_field})
kernel_compute_u = ps.create_kernel(getter_eqs, ghost_layers=((0,0),) ).compile()
def get_velocity(arr=pdf_arr):
fluid_cell_arr = mapping.coordinates
kernel_compute_u(f=pdf_arr[:mapping.num_fluid_cells], u=vel_arr)
full_velocity_arr = np.zeros(flag_arr.shape + (2,), dtype=np.float64)
full_velocity_arr.fill(np.nan)
arr = fluid_cell_arr[:mapping.num_fluid_cells]
full_velocity_arr[arr['x'], arr['y']] = vel_arr
return full_velocity_arr[1:-1, 1:-1]
```
%% Cell type:markdown id: tags:
### Stream Collide Kernel
%% Cell type:code id: tags:
``` python
#index_field = ps.Field.create_from_numpy_array("idx", index_arr, index_dimensions=1)
index_field = ps.Field.create_generic("idx", spatial_dimensions=1, index_dimensions=1, dtype=index_arr.dtype)
collision_rule = method.get_collision_rule()
Q = len(method.stencil)
symbol_subs = {sym: pdf_field.absolute_access((index_field(i-1),),())
for i, sym in enumerate(method.pre_collision_pdf_symbols)}
symbol_subs.update({sym: pdf_field_tmp(i) for i, sym in enumerate(method.post_collision_pdf_symbols)})
symbol_subs[method.pre_collision_pdf_symbols[0]] = pdf_field(0) # special case for center
symbol_subs
```
%% Output
$\displaystyle \left\{ d_{0} : {{d}_{0}^{0}}, \ d_{1} : {{d}_{0}^{1}}, \ d_{2} : {{d}_{0}^{2}}, \ d_{3} : {{d}_{0}^{3}}, \ d_{4} : {{d}_{0}^{4}}, \ d_{5} : {{d}_{0}^{5}}, \ d_{6} : {{d}_{0}^{6}}, \ d_{7} : {{d}_{0}^{7}}, \ d_{8} : {{d}_{0}^{8}}, \ f_{0} : {{f}_{0}^{0}}, \ f_{1} : {{f}_{\mathbf{{idx}_{0}^{0}}}^{0}}, \ f_{2} : {{f}_{\mathbf{{idx}_{0}^{1}}}^{0}}, \ f_{3} : {{f}_{\mathbf{{idx}_{0}^{2}}}^{0}}, \ f_{4} : {{f}_{\mathbf{{idx}_{0}^{3}}}^{0}}, \ f_{5} : {{f}_{\mathbf{{idx}_{0}^{4}}}^{0}}, \ f_{6} : {{f}_{\mathbf{{idx}_{0}^{5}}}^{0}}, \ f_{7} : {{f}_{\mathbf{{idx}_{0}^{6}}}^{0}}, \ f_{8} : {{f}_{\mathbf{{idx}_{0}^{7}}}^{0}}\right\}$
{d₀: d_C__0, d₁: d_C__1, d₂: d_C__2, d₃: d_C__3, d₄: d_C__4, d₅: d_C__5, d₆: d
_C__6, d₇: d_C__7, d₈: d_C__8, f₀: f_C__0, f₁: f_b59661980ed8, f₂: f_c4b733d80
293, f₃: f_46a6e6d3a3b5, f₄: f_31387ae7897f, f₅: f_24d70d6a047a, f₆: f_c9de861
6062f, f₇: f_c16814a33bbc, f₈: f_52ab73425808}
%% Cell type:code id: tags:
``` python
collision_rule = method.get_collision_rule()
update_rule = collision_rule.new_with_substitutions(symbol_subs)
kernel_stream_collide = ps.create_kernel(update_rule, ghost_layers=[(0,0)], target=target).compile()
```
%% Cell type:markdown id: tags:
### Boundary Kernels
%% Cell type:code id: tags:
``` python
if not channel:
if target == 'gpu':
raise NotImplementedError("UBB on GPU not working yet")
ubb_mapper = SparseLbBoundaryMapper(ubb, method, pdf_field)
#TODO the following line is wrong: kernel contains accesses to index_field and pdf_field which have
#different size: a correct kernel comes out when by change the shape is taken from index field,
# when taken from pdf field, a wrong kernel is generated
ubb_kernel = ps.create_kernel(ubb_mapper.assignments(), ghost_layers=0).compile()
ubb_idx_arr = ubb_mapper.create_index_arr(mapping, flags[ubb])
ps.show_code(ubb_kernel.ast)
def handle_ubb():
ubb_kernel(indexField=ubb_idx_arr, f=pdf_arr[:mapping.num_fluid_cells])
else:
def handle_ubb():
pass
```
%% Cell type:markdown id: tags:
### Time Loop
%% Cell type:code id: tags:
``` python
def time_step():
global pdf_arr, pdf_arr_tmp, index_arr
handle_ubb()
if target == 'gpu':
gpu_pdf_arr = gpuarray.to_gpu(pdf_arr)
gpu_pdf_arr_tmp = gpuarray.to_gpu(pdf_arr_tmp)
gpu_index_arr = gpuarray.to_gpu(index_arr)
kernel_stream_collide(f=gpu_pdf_arr[:mapping.num_fluid_cells],
d=gpu_pdf_arr_tmp[:mapping.num_fluid_cells],
idx=gpu_index_arr)
pdf_arr = gpu_pdf_arr.get()
pdf_arr_tmp = gpu_pdf_arr_tmp.get()
index_arr = gpu_index_arr.get()
else:
kernel_stream_collide(f=pdf_arr[:mapping.num_fluid_cells],
d=pdf_arr_tmp[:mapping.num_fluid_cells],
idx=index_arr)
pdf_arr_tmp, pdf_arr = pdf_arr, pdf_arr_tmp
def run(steps=100):
for t in range(steps):
time_step()
return get_velocity()
```
%% Cell type:code id: tags:
``` python
init()
```
%% Cell type:code id: tags:
``` python
init()
result = run(100)
plt.vector_field(result, step=1);
```
%% Output
%% Cell type:markdown id: tags:
### Check against reference
%% Cell type:code id: tags:
``` python
if channel:
reference = create_channel(domain_size, force=force, lb_method=method)
else:
reference = create_lid_driven_cavity(domain_size, relaxation_rate=omega, lid_velocity=lid_velocity,
compressible=False)
reference.run(100)
```
%% Cell type:code id: tags:
``` python
np.testing.assert_almost_equal(reference.velocity[:, :], result)
```
import numpy as np
import pytest
from lbmpy.creationfunctions import create_lb_ast
from lbmpy.scenarios import create_lid_driven_cavity
from pystencils.sympyextensions import count_operations_in_ast
from sympy.core.cache import clear_cache
def test_split_number_of_operations():
# For the following configurations the number of operations for splitted and un-splitted version are
# exactly equal. This is not true for D3Q15 and D3Q27 because some sub-expressions are computed in multiple
# splitted, inner loops.
for stencil in ['D2Q9', 'D3Q19']:
for compressible in (True, False):
for method in ('srt', 'trt'):
common_params = {'stencil': stencil,
'method': method,
'compressible': compressible,
'force_model': 'luo',
'force': (1e-6, 1e-5, 1e-7)
}
ast_with_splitting = create_lb_ast(optimization={'split': True}, **common_params)
ast_without_splitting = create_lb_ast(optimization={'split': False}, **common_params)
op_with_splitting = count_operations_in_ast(ast_with_splitting)
op_without_splitting = count_operations_in_ast(ast_without_splitting)
assert op_without_splitting['muls'] == op_with_splitting['muls']
assert op_without_splitting['adds'] == op_with_splitting['adds']
assert op_without_splitting['divs'] == op_with_splitting['divs']
@pytest.mark.parametrize('stencil', ['D2Q9', 'D3Q15', 'D3Q19', 'D3Q27'])
@pytest.mark.parametrize('compressible', [True, False])
@pytest.mark.parametrize('method', ['srt', 'mrt'])
@pytest.mark.parametrize('force', [(0, 0, 0), (1e-6, 1e-7, 2e-6)])
@pytest.mark.longrun
def test_equivalence(stencil, compressible, method, force):
relaxation_rates = [1.8, 1.7, 1.0, 1.0, 1.0, 1.0]
clear_cache()
common_params = {'domain_size': (10, 20) if stencil.startswith('D2') else (5, 10, 7),
'stencil': stencil,
'method': method,
'weighted': True,
'compressible': compressible,
'force': force,
'force_model': 'schiller',
'relaxation_rates': relaxation_rates}
print("Running Scenario", common_params)
with_split = create_lid_driven_cavity(optimization={'split': True}, **common_params)
without_split = create_lid_driven_cavity(optimization={'split': False}, **common_params)
with_split.run(100)
without_split.run(100)
np.testing.assert_almost_equal(with_split.velocity_slice(), without_split.velocity_slice())
def test_equivalence_short():
relaxation_rates = [1.8, 1.7, 1.0, 1.0, 1.0, 1.0]
for stencil, compressible, method, force in [('D2Q9', True, 'srt', 1e-7), ('D3Q19', False, 'mrt', 0)]:
dim = int(stencil[1])
common_params = {'domain_size': (20, 30) if stencil.startswith('D2') else (10, 13, 7),
'stencil': stencil,
'method': method,
'weighted': True,
'compressible': compressible,
'force': (force, 0, 0)[:dim],
'relaxation_rates': relaxation_rates}
print("Running Scenario", common_params)
with_split = create_lid_driven_cavity(optimization={'split': True}, **common_params)
without_split = create_lid_driven_cavity(optimization={'split': False}, **common_params)
with_split.run(100)
without_split.run(100)
np.testing.assert_almost_equal(with_split.velocity_slice(), without_split.velocity_slice())
import itertools
import warnings
import pytest
import sympy as sp
import lbmpy.stencils as s
import pystencils as ps
from lbmpy.stencils import get_stencil
def get_3d_stencils():
return s.get_stencil('D3Q15'), s.get_stencil('D3Q19'), s.get_stencil('D3Q27')
def get_all_stencils():
return [
s.get_stencil('D2Q9', 'walberla'),
s.get_stencil('D3Q15', 'walberla'),
s.get_stencil('D3Q19', 'walberla'),
s.get_stencil('D3Q27', 'walberla'),
s.get_stencil('D2Q9', 'counterclockwise'),
s.get_stencil('D2Q9', 'braunschweig'),
s.get_stencil('D3Q19', 'braunschweig'),
s.get_stencil('D3Q27', 'premnath'),
s.get_stencil("D3Q27", "fakhari"),
]
def test_sizes():
assert len(s.get_stencil('D2Q9')) == 9
assert len(s.get_stencil('D3Q15')) == 15
assert len(s.get_stencil('D3Q19')) == 19
assert len(s.get_stencil('D3Q27')) == 27
def test_dimensionality():
for d in s.get_stencil('D2Q9'):
assert len(d) == 2
for d in itertools.chain(*get_3d_stencils()):
assert len(d) == 3
def test_uniqueness():
for stencil in get_3d_stencils():
direction_set = set(stencil)
assert len(direction_set) == len(stencil)
def test_run_self_check():
for st in get_all_stencils():
assert ps.stencil.is_valid(st, max_neighborhood=1)
assert ps.stencil.is_symmetric(st)
def test_inverse_direction():
assert ps.stencil.inverse_direction((1, 0, -1)), (-1, 0 == 1)
def test_free_functions():
assert not ps.stencil.is_symmetric([(1, 0), (0, 1)])
assert not ps.stencil.is_valid([(1, 0), (1, 1, 0)])
assert not ps.stencil.is_valid([(2, 0), (0, 1)], max_neighborhood=1)
with pytest.raises(ValueError) as e:
get_stencil("name_that_does_not_exist")
assert "No such stencil" in str(e.value)
def test_visualize():
import matplotlib.pyplot as plt
plt.clf()
plt.cla()
d2q9, d3q19 = get_stencil("D2Q9"), get_stencil("D3Q19")
figure = plt.gcf()
with warnings.catch_warnings():
warnings.simplefilter("ignore")
ps.stencil.plot(d2q9, figure=figure, data=[str(i) for i in range(9)])
ps.stencil.plot(d3q19, figure=figure, data=sp.symbols("a_:19"))
Source diff could not be displayed: it is too large. Options to address this: view the blob.