Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
No results found
Show changes
Showing
with 2181 additions and 406 deletions
from .creationfunctions import (
create_lb_ast,
create_lb_collision_rule,
create_lb_function,
create_lb_method,
create_lb_update_rule,
LBMConfig,
LBMOptimisation,
)
from .enums import Stencil, Method, ForceModel, CollisionSpace, SubgridScaleModel
from .lbstep import LatticeBoltzmannStep
from .macroscopic_value_kernels import (
pdf_initialization_assignments,
macroscopic_values_getter,
strain_rate_tensor_getter,
compile_macroscopic_values_getter,
compile_macroscopic_values_setter,
create_advanced_velocity_setter_collision_rule,
)
from .maxwellian_equilibrium import get_weights
from .relaxationrates import (
relaxation_rate_from_lattice_viscosity,
lattice_viscosity_from_relaxation_rate,
relaxation_rate_from_magic_number,
)
from .scenarios import create_lid_driven_cavity, create_fully_periodic_flow
from .stencils import LBStencil
__all__ = [
"create_lb_ast",
"create_lb_collision_rule",
"create_lb_function",
"create_lb_method",
"create_lb_update_rule",
"LBMConfig",
"LBMOptimisation",
"Stencil",
"Method",
"ForceModel",
"CollisionSpace",
"SubgridScaleModel",
"LatticeBoltzmannStep",
"pdf_initialization_assignments",
"macroscopic_values_getter",
"strain_rate_tensor_getter",
"compile_macroscopic_values_getter",
"compile_macroscopic_values_setter",
"create_advanced_velocity_setter_collision_rule",
"get_weights",
"relaxation_rate_from_lattice_viscosity",
"lattice_viscosity_from_relaxation_rate",
"relaxation_rate_from_magic_number",
"create_lid_driven_cavity",
"create_fully_periodic_flow",
"LBStencil",
]
from . import _version
__version__ = _version.get_versions()['version']
......@@ -5,8 +5,9 @@
# directories (produced by setup.py build) will contain a much shorter file
# that just contains the computed version number.
# This file is released into the public domain. Generated by
# versioneer-0.19 (https://github.com/python-versioneer/python-versioneer)
# This file is released into the public domain.
# Generated by versioneer-0.29
# https://github.com/python-versioneer/python-versioneer
"""Git implementation of _version.py."""
......@@ -15,9 +16,11 @@ import os
import re
import subprocess
import sys
from typing import Any, Callable, Dict, List, Optional, Tuple
import functools
def get_keywords():
def get_keywords() -> Dict[str, str]:
"""Get the keywords needed to look up the version information."""
# these strings will be replaced by git during git-archive.
# setup.py/versioneer.py will grep for the variable names, so they must
......@@ -33,8 +36,15 @@ def get_keywords():
class VersioneerConfig:
"""Container for Versioneer configuration parameters."""
VCS: str
style: str
tag_prefix: str
parentdir_prefix: str
versionfile_source: str
verbose: bool
def get_config():
def get_config() -> VersioneerConfig:
"""Create, populate and return the VersioneerConfig() object."""
# these strings are filled in when 'setup.py versioneer' creates
# _version.py
......@@ -43,7 +53,7 @@ def get_config():
cfg.style = "pep440"
cfg.tag_prefix = "release/"
cfg.parentdir_prefix = "lbmpy-"
cfg.versionfile_source = "lbmpy/_version.py"
cfg.versionfile_source = "src/lbmpy/_version.py"
cfg.verbose = False
return cfg
......@@ -52,13 +62,13 @@ class NotThisMethod(Exception):
"""Exception raised if a method is not valid for the current scenario."""
LONG_VERSION_PY = {}
HANDLERS = {}
LONG_VERSION_PY: Dict[str, str] = {}
HANDLERS: Dict[str, Dict[str, Callable]] = {}
def register_vcs_handler(vcs, method): # decorator
def register_vcs_handler(vcs: str, method: str) -> Callable: # decorator
"""Create decorator to mark a method as the handler of a VCS."""
def decorate(f):
def decorate(f: Callable) -> Callable:
"""Store f in HANDLERS[vcs][method]."""
if vcs not in HANDLERS:
HANDLERS[vcs] = {}
......@@ -67,22 +77,35 @@ def register_vcs_handler(vcs, method): # decorator
return decorate
def run_command(commands, args, cwd=None, verbose=False, hide_stderr=False,
env=None):
def run_command(
commands: List[str],
args: List[str],
cwd: Optional[str] = None,
verbose: bool = False,
hide_stderr: bool = False,
env: Optional[Dict[str, str]] = None,
) -> Tuple[Optional[str], Optional[int]]:
"""Call the given command(s)."""
assert isinstance(commands, list)
p = None
for c in commands:
process = None
popen_kwargs: Dict[str, Any] = {}
if sys.platform == "win32":
# This hides the console window if pythonw.exe is used
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
popen_kwargs["startupinfo"] = startupinfo
for command in commands:
try:
dispcmd = str([c] + args)
dispcmd = str([command] + args)
# remember shell=False, so use git.cmd on windows, not just git
p = subprocess.Popen([c] + args, cwd=cwd, env=env,
stdout=subprocess.PIPE,
stderr=(subprocess.PIPE if hide_stderr
else None))
process = subprocess.Popen([command] + args, cwd=cwd, env=env,
stdout=subprocess.PIPE,
stderr=(subprocess.PIPE if hide_stderr
else None), **popen_kwargs)
break
except EnvironmentError:
e = sys.exc_info()[1]
except OSError as e:
if e.errno == errno.ENOENT:
continue
if verbose:
......@@ -93,16 +116,20 @@ def run_command(commands, args, cwd=None, verbose=False, hide_stderr=False,
if verbose:
print("unable to find command, tried %s" % (commands,))
return None, None
stdout = p.communicate()[0].strip().decode()
if p.returncode != 0:
stdout = process.communicate()[0].strip().decode()
if process.returncode != 0:
if verbose:
print("unable to run %s (error)" % dispcmd)
print("stdout was %s" % stdout)
return None, p.returncode
return stdout, p.returncode
return None, process.returncode
return stdout, process.returncode
def versions_from_parentdir(parentdir_prefix, root, verbose):
def versions_from_parentdir(
parentdir_prefix: str,
root: str,
verbose: bool,
) -> Dict[str, Any]:
"""Try to determine the version from the parent directory name.
Source tarballs conventionally unpack into a directory that includes both
......@@ -111,15 +138,14 @@ def versions_from_parentdir(parentdir_prefix, root, verbose):
"""
rootdirs = []
for i in range(3):
for _ in range(3):
dirname = os.path.basename(root)
if dirname.startswith(parentdir_prefix):
return {"version": dirname[len(parentdir_prefix):],
"full-revisionid": None,
"dirty": False, "error": None, "date": None}
else:
rootdirs.append(root)
root = os.path.dirname(root) # up a level
rootdirs.append(root)
root = os.path.dirname(root) # up a level
if verbose:
print("Tried directories %s but none started with prefix %s" %
......@@ -128,39 +154,42 @@ def versions_from_parentdir(parentdir_prefix, root, verbose):
@register_vcs_handler("git", "get_keywords")
def git_get_keywords(versionfile_abs):
def git_get_keywords(versionfile_abs: str) -> Dict[str, str]:
"""Extract version information from the given file."""
# the code embedded in _version.py can just fetch the value of these
# keywords. When used from setup.py, we don't want to import _version.py,
# so we do it with a regexp instead. This function is not used from
# _version.py.
keywords = {}
keywords: Dict[str, str] = {}
try:
f = open(versionfile_abs, "r")
for line in f.readlines():
if line.strip().startswith("git_refnames ="):
mo = re.search(r'=\s*"(.*)"', line)
if mo:
keywords["refnames"] = mo.group(1)
if line.strip().startswith("git_full ="):
mo = re.search(r'=\s*"(.*)"', line)
if mo:
keywords["full"] = mo.group(1)
if line.strip().startswith("git_date ="):
mo = re.search(r'=\s*"(.*)"', line)
if mo:
keywords["date"] = mo.group(1)
f.close()
except EnvironmentError:
with open(versionfile_abs, "r") as fobj:
for line in fobj:
if line.strip().startswith("git_refnames ="):
mo = re.search(r'=\s*"(.*)"', line)
if mo:
keywords["refnames"] = mo.group(1)
if line.strip().startswith("git_full ="):
mo = re.search(r'=\s*"(.*)"', line)
if mo:
keywords["full"] = mo.group(1)
if line.strip().startswith("git_date ="):
mo = re.search(r'=\s*"(.*)"', line)
if mo:
keywords["date"] = mo.group(1)
except OSError:
pass
return keywords
@register_vcs_handler("git", "keywords")
def git_versions_from_keywords(keywords, tag_prefix, verbose):
def git_versions_from_keywords(
keywords: Dict[str, str],
tag_prefix: str,
verbose: bool,
) -> Dict[str, Any]:
"""Get version information from git keywords."""
if not keywords:
raise NotThisMethod("no keywords at all, weird")
if "refnames" not in keywords:
raise NotThisMethod("Short version file found")
date = keywords.get("date")
if date is not None:
# Use only the last line. Previous lines may contain GPG signature
......@@ -179,11 +208,11 @@ def git_versions_from_keywords(keywords, tag_prefix, verbose):
if verbose:
print("keywords are unexpanded, not using")
raise NotThisMethod("unexpanded keywords, not a git-archive tarball")
refs = set([r.strip() for r in refnames.strip("()").split(",")])
refs = {r.strip() for r in refnames.strip("()").split(",")}
# starting in git-1.8.3, tags are listed as "tag: foo-1.0" instead of
# just "foo-1.0". If we see a "tag: " prefix, prefer those.
TAG = "tag: "
tags = set([r[len(TAG):] for r in refs if r.startswith(TAG)])
tags = {r[len(TAG):] for r in refs if r.startswith(TAG)}
if not tags:
# Either we're using git < 1.8.3, or there really are no tags. We use
# a heuristic: assume all version tags have a digit. The old git %d
......@@ -192,7 +221,7 @@ def git_versions_from_keywords(keywords, tag_prefix, verbose):
# between branches and tags. By ignoring refnames without digits, we
# filter out many common branch names like "release" and
# "stabilization", as well as "HEAD" and "master".
tags = set([r for r in refs if re.search(r'\d', r)])
tags = {r for r in refs if re.search(r'\d', r)}
if verbose:
print("discarding '%s', no digits" % ",".join(refs - tags))
if verbose:
......@@ -201,6 +230,11 @@ def git_versions_from_keywords(keywords, tag_prefix, verbose):
# sorting will prefer e.g. "2.0" over "2.0rc1"
if ref.startswith(tag_prefix):
r = ref[len(tag_prefix):]
# Filter out refs that exactly match prefix or that don't start
# with a number once the prefix is stripped (mostly a concern
# when prefix is '')
if not re.match(r'\d', r):
continue
if verbose:
print("picking %s" % r)
return {"version": r,
......@@ -216,7 +250,12 @@ def git_versions_from_keywords(keywords, tag_prefix, verbose):
@register_vcs_handler("git", "pieces_from_vcs")
def git_pieces_from_vcs(tag_prefix, root, verbose, run_command=run_command):
def git_pieces_from_vcs(
tag_prefix: str,
root: str,
verbose: bool,
runner: Callable = run_command
) -> Dict[str, Any]:
"""Get version from 'git describe' in the root of the source tree.
This only gets called if the git-archive 'subst' keywords were *not*
......@@ -227,8 +266,15 @@ def git_pieces_from_vcs(tag_prefix, root, verbose, run_command=run_command):
if sys.platform == "win32":
GITS = ["git.cmd", "git.exe"]
out, rc = run_command(GITS, ["rev-parse", "--git-dir"], cwd=root,
hide_stderr=True)
# GIT_DIR can interfere with correct operation of Versioneer.
# It may be intended to be passed to the Versioneer-versioned project,
# but that should not change where we get our version from.
env = os.environ.copy()
env.pop("GIT_DIR", None)
runner = functools.partial(runner, env=env)
_, rc = runner(GITS, ["rev-parse", "--git-dir"], cwd=root,
hide_stderr=not verbose)
if rc != 0:
if verbose:
print("Directory %s not under git control" % root)
......@@ -236,24 +282,57 @@ def git_pieces_from_vcs(tag_prefix, root, verbose, run_command=run_command):
# if there is a tag matching tag_prefix, this yields TAG-NUM-gHEX[-dirty]
# if there isn't one, this yields HEX[-dirty] (no NUM)
describe_out, rc = run_command(GITS, ["describe", "--tags", "--dirty",
"--always", "--long",
"--match", "%s*" % tag_prefix],
cwd=root)
describe_out, rc = runner(GITS, [
"describe", "--tags", "--dirty", "--always", "--long",
"--match", f"{tag_prefix}[[:digit:]]*"
], cwd=root)
# --long was added in git-1.5.5
if describe_out is None:
raise NotThisMethod("'git describe' failed")
describe_out = describe_out.strip()
full_out, rc = run_command(GITS, ["rev-parse", "HEAD"], cwd=root)
full_out, rc = runner(GITS, ["rev-parse", "HEAD"], cwd=root)
if full_out is None:
raise NotThisMethod("'git rev-parse' failed")
full_out = full_out.strip()
pieces = {}
pieces: Dict[str, Any] = {}
pieces["long"] = full_out
pieces["short"] = full_out[:7] # maybe improved later
pieces["error"] = None
branch_name, rc = runner(GITS, ["rev-parse", "--abbrev-ref", "HEAD"],
cwd=root)
# --abbrev-ref was added in git-1.6.3
if rc != 0 or branch_name is None:
raise NotThisMethod("'git rev-parse --abbrev-ref' returned error")
branch_name = branch_name.strip()
if branch_name == "HEAD":
# If we aren't exactly on a branch, pick a branch which represents
# the current commit. If all else fails, we are on a branchless
# commit.
branches, rc = runner(GITS, ["branch", "--contains"], cwd=root)
# --contains was added in git-1.5.4
if rc != 0 or branches is None:
raise NotThisMethod("'git branch --contains' returned error")
branches = branches.split("\n")
# Remove the first line if we're running detached
if "(" in branches[0]:
branches.pop(0)
# Strip off the leading "* " from the list of branches.
branches = [branch[2:] for branch in branches]
if "master" in branches:
branch_name = "master"
elif not branches:
branch_name = None
else:
# Pick the first branch that is returned. Good or bad.
branch_name = branches[0]
pieces["branch"] = branch_name
# parse describe_out. It will be like TAG-NUM-gHEX[-dirty] or HEX[-dirty]
# TAG might have hyphens.
git_describe = describe_out
......@@ -270,7 +349,7 @@ def git_pieces_from_vcs(tag_prefix, root, verbose, run_command=run_command):
# TAG-NUM-gHEX
mo = re.search(r'^(.+)-(\d+)-g([0-9a-f]+)$', git_describe)
if not mo:
# unparseable. Maybe git-describe is misbehaving?
# unparsable. Maybe git-describe is misbehaving?
pieces["error"] = ("unable to parse git-describe output: '%s'"
% describe_out)
return pieces
......@@ -295,13 +374,11 @@ def git_pieces_from_vcs(tag_prefix, root, verbose, run_command=run_command):
else:
# HEX: no tags
pieces["closest-tag"] = None
count_out, rc = run_command(GITS, ["rev-list", "HEAD", "--count"],
cwd=root)
pieces["distance"] = int(count_out) # total number of commits
out, rc = runner(GITS, ["rev-list", "HEAD", "--left-right"], cwd=root)
pieces["distance"] = len(out.split()) # total number of commits
# commit date: see ISO-8601 comment in git_versions_from_keywords()
date = run_command(GITS, ["show", "-s", "--format=%ci", "HEAD"],
cwd=root)[0].strip()
date = runner(GITS, ["show", "-s", "--format=%ci", "HEAD"], cwd=root)[0].strip()
# Use only the last line. Previous lines may contain GPG signature
# information.
date = date.splitlines()[-1]
......@@ -310,14 +387,14 @@ def git_pieces_from_vcs(tag_prefix, root, verbose, run_command=run_command):
return pieces
def plus_or_dot(pieces):
def plus_or_dot(pieces: Dict[str, Any]) -> str:
"""Return a + if we don't already have one, else return a ."""
if "+" in pieces.get("closest-tag", ""):
return "."
return "+"
def render_pep440(pieces):
def render_pep440(pieces: Dict[str, Any]) -> str:
"""Build up version string, with post-release "local version identifier".
Our goal: TAG[+DISTANCE.gHEX[.dirty]] . Note that if you
......@@ -342,23 +419,71 @@ def render_pep440(pieces):
return rendered
def render_pep440_pre(pieces):
"""TAG[.post0.devDISTANCE] -- No -dirty.
def render_pep440_branch(pieces: Dict[str, Any]) -> str:
"""TAG[[.dev0]+DISTANCE.gHEX[.dirty]] .
The ".dev0" means not master branch. Note that .dev0 sorts backwards
(a feature branch will appear "older" than the master branch).
Exceptions:
1: no tags. 0.post0.devDISTANCE
1: no tags. 0[.dev0]+untagged.DISTANCE.gHEX[.dirty]
"""
if pieces["closest-tag"]:
rendered = pieces["closest-tag"]
if pieces["distance"] or pieces["dirty"]:
if pieces["branch"] != "master":
rendered += ".dev0"
rendered += plus_or_dot(pieces)
rendered += "%d.g%s" % (pieces["distance"], pieces["short"])
if pieces["dirty"]:
rendered += ".dirty"
else:
# exception #1
rendered = "0"
if pieces["branch"] != "master":
rendered += ".dev0"
rendered += "+untagged.%d.g%s" % (pieces["distance"],
pieces["short"])
if pieces["dirty"]:
rendered += ".dirty"
return rendered
def pep440_split_post(ver: str) -> Tuple[str, Optional[int]]:
"""Split pep440 version string at the post-release segment.
Returns the release segments before the post-release and the
post-release version number (or -1 if no post-release segment is present).
"""
vc = str.split(ver, ".post")
return vc[0], int(vc[1] or 0) if len(vc) == 2 else None
def render_pep440_pre(pieces: Dict[str, Any]) -> str:
"""TAG[.postN.devDISTANCE] -- No -dirty.
Exceptions:
1: no tags. 0.post0.devDISTANCE
"""
if pieces["closest-tag"]:
if pieces["distance"]:
rendered += ".post0.dev%d" % pieces["distance"]
# update the post release segment
tag_version, post_version = pep440_split_post(pieces["closest-tag"])
rendered = tag_version
if post_version is not None:
rendered += ".post%d.dev%d" % (post_version + 1, pieces["distance"])
else:
rendered += ".post0.dev%d" % (pieces["distance"])
else:
# no commits, use the tag as the version
rendered = pieces["closest-tag"]
else:
# exception #1
rendered = "0.post0.dev%d" % pieces["distance"]
return rendered
def render_pep440_post(pieces):
def render_pep440_post(pieces: Dict[str, Any]) -> str:
"""TAG[.postDISTANCE[.dev0]+gHEX] .
The ".dev0" means dirty. Note that .dev0 sorts backwards
......@@ -385,7 +510,36 @@ def render_pep440_post(pieces):
return rendered
def render_pep440_old(pieces):
def render_pep440_post_branch(pieces: Dict[str, Any]) -> str:
"""TAG[.postDISTANCE[.dev0]+gHEX[.dirty]] .
The ".dev0" means not master branch.
Exceptions:
1: no tags. 0.postDISTANCE[.dev0]+gHEX[.dirty]
"""
if pieces["closest-tag"]:
rendered = pieces["closest-tag"]
if pieces["distance"] or pieces["dirty"]:
rendered += ".post%d" % pieces["distance"]
if pieces["branch"] != "master":
rendered += ".dev0"
rendered += plus_or_dot(pieces)
rendered += "g%s" % pieces["short"]
if pieces["dirty"]:
rendered += ".dirty"
else:
# exception #1
rendered = "0.post%d" % pieces["distance"]
if pieces["branch"] != "master":
rendered += ".dev0"
rendered += "+g%s" % pieces["short"]
if pieces["dirty"]:
rendered += ".dirty"
return rendered
def render_pep440_old(pieces: Dict[str, Any]) -> str:
"""TAG[.postDISTANCE[.dev0]] .
The ".dev0" means dirty.
......@@ -407,7 +561,7 @@ def render_pep440_old(pieces):
return rendered
def render_git_describe(pieces):
def render_git_describe(pieces: Dict[str, Any]) -> str:
"""TAG[-DISTANCE-gHEX][-dirty].
Like 'git describe --tags --dirty --always'.
......@@ -427,7 +581,7 @@ def render_git_describe(pieces):
return rendered
def render_git_describe_long(pieces):
def render_git_describe_long(pieces: Dict[str, Any]) -> str:
"""TAG-DISTANCE-gHEX[-dirty].
Like 'git describe --tags --dirty --always -long'.
......@@ -447,7 +601,7 @@ def render_git_describe_long(pieces):
return rendered
def render(pieces, style):
def render(pieces: Dict[str, Any], style: str) -> Dict[str, Any]:
"""Render the given version pieces into the requested style."""
if pieces["error"]:
return {"version": "unknown",
......@@ -461,10 +615,14 @@ def render(pieces, style):
if style == "pep440":
rendered = render_pep440(pieces)
elif style == "pep440-branch":
rendered = render_pep440_branch(pieces)
elif style == "pep440-pre":
rendered = render_pep440_pre(pieces)
elif style == "pep440-post":
rendered = render_pep440_post(pieces)
elif style == "pep440-post-branch":
rendered = render_pep440_post_branch(pieces)
elif style == "pep440-old":
rendered = render_pep440_old(pieces)
elif style == "git-describe":
......@@ -479,7 +637,7 @@ def render(pieces, style):
"date": pieces.get("date")}
def get_versions():
def get_versions() -> Dict[str, Any]:
"""Get version information or return default if unable to do so."""
# I am in _version.py, which lives at ROOT/VERSIONFILE_SOURCE. If we have
# __file__, we can work backwards from there to the root. Some
......@@ -500,7 +658,7 @@ def get_versions():
# versionfile_source is the relative path from the top of the source
# tree (where the .git directory might live) to this file. Invert
# this to find the root from __file__.
for i in cfg.versionfile_source.split('/'):
for _ in cfg.versionfile_source.split('/'):
root = os.path.dirname(root)
except NameError:
return {"version": "0+unknown", "full-revisionid": None,
......
from .indexing import BetweenTimestepsIndexing, NeighbourOffsetArrays
from .indexing import BetweenTimestepsIndexing
from .communication import get_communication_slices, LBMPeriodicityHandling
from .utility import Timestep, get_accessor, is_inplace, get_timesteps, \
numeric_index, numeric_offsets, inverse_dir_index, AccessPdfValues
__all__ = ['BetweenTimestepsIndexing', 'NeighbourOffsetArrays',
__all__ = ['BetweenTimestepsIndexing',
'get_communication_slices', 'LBMPeriodicityHandling',
'Timestep', 'get_accessor', 'is_inplace', 'get_timesteps',
'numeric_index', 'numeric_offsets', 'inverse_dir_index', 'AccessPdfValues']
import itertools
from pystencils import Field, Assignment
from pystencils.slicing import shift_slice, get_slice_before_ghost_layer, normalize_slice
from lbmpy.advanced_streaming.utility import is_inplace, get_accessor, numeric_index, \
numeric_offsets, Timestep, get_timesteps
from pystencils import CreateKernelConfig, Field, Assignment, AssignmentCollection
from pystencils.slicing import (
shift_slice,
get_slice_before_ghost_layer,
normalize_slice,
)
from lbmpy.advanced_streaming.utility import (
is_inplace,
get_accessor,
numeric_index,
Timestep,
get_timesteps,
numeric_offsets,
)
from pystencils.datahandling import SerialDataHandling
from pystencils.enums import Target
from itertools import chain
def _trim_slice_in_direction(slices, direction):
assert len(slices) == len(direction)
class LBMPeriodicityHandling:
result = []
for s, d in zip(slices, direction):
if isinstance(s, int):
result.append(s)
continue
start = s.start + 1 if d == -1 else s.start
stop = s.stop - 1 if d == 1 else s.stop
result.append(slice(start, stop, s.step))
def __init__(
self,
stencil,
data_handling,
pdf_field_name,
streaming_pattern="pull",
ghost_layers=1,
cupy_direct_copy=True,
):
"""
Periodicity Handling for Lattice Boltzmann Streaming.
**On the usage with cuda:**
- cupy allows the copying of sliced arrays within device memory using the numpy syntax,
e.g. `dst[:,0] = src[:,-1]`. In this implementation, this is the default for periodicity
handling. Alternatively, if you set `cupy_direct_copy=False`, GPU kernels are generated and
compiled. The compiled kernels are almost twice as fast in execution as cupy array copying,
but especially for large stencils like D3Q27, their compilation can take up to 20 seconds.
Choose your weapon depending on your use case.
"""
if not isinstance(data_handling, SerialDataHandling):
raise ValueError("Only serial data handling is supported!")
return tuple(result)
self.stencil = stencil
self.dim = stencil.D
self.dh = data_handling
assert data_handling.default_target in [Target.CPU, Target.GPU]
self.target = data_handling.default_target
def _extend_dir(direction):
if len(direction) == 0:
yield tuple()
elif direction[0] == 0:
for d in [-1, 0, 1]:
for rest in _extend_dir(direction[1:]):
yield (d, ) + rest
else:
for rest in _extend_dir(direction[1:]):
yield (direction[0], ) + rest
self.pdf_field_name = pdf_field_name
self.ghost_layers = ghost_layers
self.periodicity = data_handling.periodicity
self.inplace_pattern = is_inplace(streaming_pattern)
self.cpu = self.target == Target.CPU
self.cupy_direct_copy = self.target == Target.GPU and cupy_direct_copy
def _get_neighbour_transform(direction, ghost_layers):
return tuple(d * (ghost_layers + 1) for d in direction)
def is_copy_direction(direction):
s = 0
for d, p in zip(direction, self.periodicity):
s += abs(d)
if d != 0 and not p:
return False
return s != 0
def _fix_length_one_slices(slices):
"""Slices of length one are replaced by their start value for correct periodic shifting"""
if isinstance(slices, int):
return slices
elif isinstance(slices, slice):
if slices.stop is not None and abs(slices.start - slices.stop) == 1:
return slices.start
elif slices.stop is None and slices.start == -1:
return -1 # [-1:] also has length one
full_stencil = itertools.product(*([-1, 0, 1] for _ in range(self.dim)))
copy_directions = tuple(filter(is_copy_direction, full_stencil))
self.comm_slices = []
timesteps = get_timesteps(streaming_pattern)
for timestep in timesteps:
slices_per_comm_dir = get_communication_slices(
stencil=stencil,
comm_stencil=copy_directions,
streaming_pattern=streaming_pattern,
prev_timestep=timestep,
ghost_layers=ghost_layers,
)
self.comm_slices.append(
list(chain.from_iterable(v for k, v in slices_per_comm_dir.items()))
)
if self.target == Target.GPU and not cupy_direct_copy:
self.device_copy_kernels = list()
for timestep in timesteps:
self.device_copy_kernels.append(self._compile_copy_kernels(timestep))
def __call__(self, prev_timestep=Timestep.BOTH):
if self.cpu:
self._periodicity_handling_cpu(prev_timestep)
else:
return slices
else:
return tuple(_fix_length_one_slices(s) for s in slices)
self._periodicity_handling_gpu(prev_timestep)
def _periodicity_handling_cpu(self, prev_timestep):
arr = self.dh.cpu_arrays[self.pdf_field_name]
comm_slices = self.comm_slices[prev_timestep.idx]
for src, dst in comm_slices:
arr[dst] = arr[src]
def _compile_copy_kernels(self, timestep):
assert self.target == Target.GPU
pdf_field = self.dh.fields[self.pdf_field_name]
kernels = []
for src, dst in self.comm_slices[timestep.idx]:
kernels.append(periodic_pdf_gpu_copy_kernel(pdf_field, src, dst))
return kernels
def _periodicity_handling_gpu(self, prev_timestep):
arr = self.dh.gpu_arrays[self.pdf_field_name]
if self.cupy_direct_copy:
for src, dst in self.comm_slices[prev_timestep.idx]:
arr[dst] = arr[src]
else:
kernel_args = {self.pdf_field_name: arr}
for kernel in self.device_copy_kernels[prev_timestep.idx]:
kernel(**kernel_args)
def get_communication_slices(
stencil, comm_stencil=None, streaming_pattern='pull', prev_timestep=Timestep.BOTH, ghost_layers=1):
stencil,
comm_stencil=None,
streaming_pattern="pull",
prev_timestep=Timestep.BOTH,
ghost_layers=1,
):
"""
Return the source and destination slices for periodicity handling or communication between blocks.
:param stencil: The stencil used by the LB method.
:param comm_stencil: The stencil defining the communication directions. If None, it will be set to the
:param comm_stencil: The stencil defining the communication directions. If None, it will be set to the
full stencil (D2Q9 in 2D, D3Q27 in 3D, etc.).
:param streaming_pattern: The streaming pattern.
:param prev_timestep: Timestep after which communication is run.
......@@ -71,7 +141,9 @@ def get_communication_slices(
if comm_stencil is None:
comm_stencil = itertools.product(*([-1, 0, 1] for _ in range(stencil.D)))
pdfs = Field.create_generic('pdfs', spatial_dimensions=len(stencil[0]), index_shape=(stencil.Q,))
pdfs = Field.create_generic(
"pdfs", spatial_dimensions=len(stencil[0]), index_shape=(stencil.Q,)
)
write_accesses = get_accessor(streaming_pattern, prev_timestep).write(pdfs, stencil)
slices_per_comm_direction = dict()
......@@ -83,19 +155,27 @@ def get_communication_slices(
for streaming_dir in set(_extend_dir(comm_dir)) & set(stencil):
d = stencil.index(streaming_dir)
write_offsets = numeric_offsets(write_accesses[d])
write_index = numeric_index(write_accesses[d])[0]
origin_slice = get_slice_before_ghost_layer(
comm_dir, ghost_layers=ghost_layers, thickness=1
)
src_slice = _fix_length_one_slices(origin_slice)
write_offsets = numeric_offsets(write_accesses[d])
tangential_dir = tuple(s - c for s, c in zip(streaming_dir, comm_dir))
origin_slice = get_slice_before_ghost_layer(comm_dir, ghost_layers=ghost_layers, thickness=1)
origin_slice = _fix_length_one_slices(origin_slice)
src_slice = shift_slice(_trim_slice_in_direction(origin_slice, tangential_dir), write_offsets)
# TODO: this is just a hotfix. _trim_slice_in_direction breaks FreeSlip BC with adjacent periodic side
if streaming_pattern != "pull":
src_slice = shift_slice(
_trim_slice_in_direction(src_slice, tangential_dir), write_offsets
)
neighbour_transform = _get_neighbour_transform(comm_dir, ghost_layers)
dst_slice = shift_slice(src_slice, neighbour_transform)
src_slice = src_slice + (write_index, )
dst_slice = dst_slice + (write_index, )
src_slice = src_slice + (write_index,)
dst_slice = dst_slice + (write_index,)
slices_for_dir.append((src_slice, dst_slice))
......@@ -103,10 +183,10 @@ def get_communication_slices(
return slices_per_comm_direction
def periodic_pdf_copy_kernel(pdf_field, src_slice, dst_slice,
domain_size=None, target=Target.GPU):
"""Copies a rectangular array slice onto another non-overlapping array slice"""
from pystencils.gpucuda.kernelcreation import create_cuda_kernel
def periodic_pdf_gpu_copy_kernel(pdf_field, src_slice, dst_slice, domain_size=None):
"""Generate a GPU kernel which copies all values from one slice of a field
to another non-overlapping slice."""
from pystencils import create_kernel
pdf_idx = src_slice[-1]
assert isinstance(pdf_idx, int), "PDF index needs to be an integer constant"
......@@ -114,6 +194,7 @@ def periodic_pdf_copy_kernel(pdf_field, src_slice, dst_slice,
src_slice = src_slice[:-1]
dst_slice = dst_slice[:-1]
# TODO this is the domain_size with GL
if domain_size is None:
domain_size = pdf_field.spatial_shape
......@@ -126,105 +207,71 @@ def periodic_pdf_copy_kernel(pdf_field, src_slice, dst_slice,
def _stop(s):
return s.stop if isinstance(s, slice) else s
offset = [_start(s1) - _start(s2) for s1, s2 in zip(normalized_from_slice, normalized_to_slice)]
assert offset == [_stop(s1) - _stop(s2) for s1, s2 in zip(normalized_from_slice, normalized_to_slice)], \
"Slices have to have same size"
offset = [
_start(s1) - _start(s2)
for s1, s2 in zip(normalized_from_slice, normalized_to_slice)
]
assert offset == [
_stop(s1) - _stop(s2)
for s1, s2 in zip(normalized_from_slice, normalized_to_slice)
], "Slices have to have same size"
copy_eq = AssignmentCollection(
main_assignments=[
Assignment(pdf_field(pdf_idx), pdf_field[tuple(offset)](pdf_idx))
]
)
config = CreateKernelConfig(
iteration_slice=dst_slice,
skip_independence_check=True,
target=Target.GPU,
)
ast = create_kernel(copy_eq, config=config)
return ast.compile()
copy_eq = Assignment(pdf_field(pdf_idx), pdf_field[tuple(offset)](pdf_idx))
ast = create_cuda_kernel([copy_eq], iteration_slice=dst_slice, skip_independence_check=True)
if target == Target.GPU:
from pystencils.gpucuda import make_python_function
return make_python_function(ast)
else:
raise ValueError('Invalid target:', target)
class LBMPeriodicityHandling:
def __init__(self, stencil, data_handling, pdf_field_name,
streaming_pattern='pull', ghost_layers=1,
pycuda_direct_copy=True):
"""
Periodicity Handling for Lattice Boltzmann Streaming.
**On the usage with cuda:**
- pycuda allows the copying of sliced arrays within device memory using the numpy syntax,
e.g. `dst[:,0] = src[:,-1]`. In this implementation, this is the default for periodicity
handling. Alternatively, if you set `pycuda_direct_copy=False`, GPU kernels are generated and
compiled. The compiled kernels are almost twice as fast in execution as pycuda array copying,
but especially for large stencils like D3Q27, their compilation can take up to 20 seconds.
Choose your weapon depending on your use case.
"""
if not isinstance(data_handling, SerialDataHandling):
raise ValueError('Only serial data handling is supported!')
self.stencil = stencil
self.dim = stencil.D
self.dh = data_handling
target = data_handling.default_target
assert target in [Target.CPU, Target.GPU]
self.pdf_field_name = pdf_field_name
self.ghost_layers = ghost_layers
periodicity = data_handling.periodicity
self.inplace_pattern = is_inplace(streaming_pattern)
self.target = target
self.cpu = target == Target.CPU
self.pycuda_direct_copy = target == Target.GPU and pycuda_direct_copy
def _extend_dir(direction):
if len(direction) == 0:
yield tuple()
elif direction[0] == 0:
for d in [-1, 0, 1]:
for rest in _extend_dir(direction[1:]):
yield (d,) + rest
else:
for rest in _extend_dir(direction[1:]):
yield (direction[0],) + rest
def is_copy_direction(direction):
s = 0
for d, p in zip(direction, periodicity):
s += abs(d)
if d != 0 and not p:
return False
return s != 0
def _get_neighbour_transform(direction, ghost_layers):
return tuple(d * (ghost_layers + 1) for d in direction)
full_stencil = itertools.product(*([-1, 0, 1] for _ in range(self.dim)))
copy_directions = tuple(filter(is_copy_direction, full_stencil))
self.comm_slices = []
timesteps = get_timesteps(streaming_pattern)
for timestep in timesteps:
slices_per_comm_dir = get_communication_slices(stencil=stencil,
comm_stencil=copy_directions,
streaming_pattern=streaming_pattern,
prev_timestep=timestep,
ghost_layers=ghost_layers)
self.comm_slices.append(list(chain.from_iterable(v for k, v in slices_per_comm_dir.items())))
if target == Target.GPU and not pycuda_direct_copy:
self.device_copy_kernels = []
for timestep in timesteps:
self.device_copy_kernels.append(self._compile_copy_kernels(timestep))
def __call__(self, prev_timestep=Timestep.BOTH):
if self.cpu:
self._periodicity_handling_cpu(prev_timestep)
def _fix_length_one_slices(slices):
"""Slices of length one are replaced by their start value for correct periodic shifting"""
if isinstance(slices, int):
return slices
elif isinstance(slices, slice):
if slices.stop is not None and abs(slices.start - slices.stop) == 1:
return slices.start
elif slices.stop is None and slices.start == -1:
return -1 # [-1:] also has length one
else:
self._periodicity_handling_gpu(prev_timestep)
return slices
else:
return tuple(_fix_length_one_slices(s) for s in slices)
def _periodicity_handling_cpu(self, prev_timestep):
arr = self.dh.cpu_arrays[self.pdf_field_name]
comm_slices = self.comm_slices[prev_timestep.idx]
for src, dst in comm_slices:
arr[dst] = arr[src]
def _compile_copy_kernels(self, timestep):
pdf_field = self.dh.fields[self.pdf_field_name]
kernels = []
for src, dst in self.comm_slices[timestep.idx]:
kernels.append(
periodic_pdf_copy_kernel(pdf_field, src, dst, target=self.target))
return kernels
def _trim_slice_in_direction(slices, direction):
assert len(slices) == len(direction)
def _periodicity_handling_gpu(self, prev_timestep):
arr = self.dh.gpu_arrays[self.pdf_field_name]
if self.pycuda_direct_copy:
for src, dst in self.comm_slices[prev_timestep.idx]:
arr[dst] = arr[src]
else:
kernel_args = {self.pdf_field_name: arr}
for kernel in self.device_copy_kernels[prev_timestep.idx]:
kernel(**kernel_args)
result = []
for s, d in zip(slices, direction):
if isinstance(s, int):
result.append(s)
continue
start = s.start + 1 if d == -1 else s.start
stop = s.stop - 1 if d == 1 else s.stop
result.append(slice(start, stop, s.step))
return tuple(result)
......@@ -3,17 +3,12 @@ import sympy as sp
import pystencils as ps
from pystencils.typing import TypedSymbol, create_type
from pystencils.backends.cbackend import CustomCodeNode
from lbmpy.advanced_streaming.utility import get_accessor, inverse_dir_index, is_inplace, Timestep
from lbmpy.custom_code_nodes import TranslationArraysNode
from itertools import product
def _array_pattern(dtype, name, content):
return f"const {str(dtype)} {name} [] = {{ {','.join(str(c) for c in content)} }}; \n"
class BetweenTimestepsIndexing:
# ==============================================
......@@ -30,7 +25,7 @@ class BetweenTimestepsIndexing:
@property
def inverse_dir_symbol(self):
"""Symbol denoting the inversion of a PDF field index.
"""Symbol denoting the inversion of a PDF field index.
Use only at top-level of index to f_out or f_in, otherwise it can't be correctly replaced."""
return sp.IndexedBase('invdir')
......@@ -168,90 +163,21 @@ class BetweenTimestepsIndexing:
return trivial_index_translations, trivial_offset_translations
def create_code_node(self):
return BetweenTimestepsIndexing.TranslationArraysNode(self)
class TranslationArraysNode(CustomCodeNode):
def __init__(self, indexing):
code = ''
symbols_defined = set()
for f_dir, inv in indexing._required_index_arrays:
indices, offsets = indexing._get_translated_indices_and_offsets(f_dir, inv)
index_array_symbol = indexing._index_array_symbol(f_dir, inv)
symbols_defined.add(index_array_symbol)
code += _array_pattern(indexing._index_dtype, index_array_symbol.name, indices)
for f_dir, inv in indexing._required_offset_arrays:
indices, offsets = indexing._get_translated_indices_and_offsets(f_dir, inv)
offset_array_symbols = indexing._offset_array_symbols(f_dir, inv)
symbols_defined |= set(offset_array_symbols)
for d, arrsymb in enumerate(offset_array_symbols):
code += _array_pattern(indexing._offsets_dtype, arrsymb.name, offsets[d])
super(BetweenTimestepsIndexing.TranslationArraysNode, self).__init__(
code, symbols_read=set(), symbols_defined=symbols_defined)
array_content = list()
symbols_defined = set()
for f_dir, inv in self._required_index_arrays:
indices, offsets = self._get_translated_indices_and_offsets(f_dir, inv)
index_array_symbol = self._index_array_symbol(f_dir, inv)
symbols_defined.add(index_array_symbol)
array_content.append((self._index_dtype, index_array_symbol.name, indices))
def __str__(self):
return "Variable PDF Access Translation Arrays"
for f_dir, inv in self._required_offset_arrays:
indices, offsets = self._get_translated_indices_and_offsets(f_dir, inv)
offset_array_symbols = self._offset_array_symbols(f_dir, inv)
symbols_defined |= set(offset_array_symbols)
for d, arrsymb in enumerate(offset_array_symbols):
array_content.append((self._offsets_dtype, arrsymb.name, offsets[d]))
def __repr__(self):
return "Variable PDF Access Translation Arrays"
return TranslationArraysNode(array_content, symbols_defined)
# end class AdvancedStreamingIndexing
class NeighbourOffsetArrays(CustomCodeNode):
@staticmethod
def neighbour_offset(dir_idx, stencil):
if isinstance(sp.sympify(dir_idx), sp.Integer):
return stencil[dir_idx]
else:
return tuple([sp.IndexedBase(symbol, shape=(1,))[dir_idx]
for symbol in NeighbourOffsetArrays._offset_symbols(len(stencil[0]))])
@staticmethod
def _offset_symbols(dim):
return [TypedSymbol(f"neighbour_offset_{d}", create_type(np.int32)) for d in ['x', 'y', 'z'][:dim]]
def __init__(self, stencil, offsets_dtype=np.int32):
offsets_dtype = create_type(offsets_dtype)
dim = len(stencil[0])
array_symbols = NeighbourOffsetArrays._offset_symbols(dim)
code = "\n"
for i, arrsymb in enumerate(array_symbols):
code += _array_pattern(offsets_dtype, arrsymb.name, (d[i] for d in stencil))
offset_symbols = NeighbourOffsetArrays._offset_symbols(dim)
super(NeighbourOffsetArrays, self).__init__(code, symbols_read=set(),
symbols_defined=set(offset_symbols))
class MirroredStencilDirections(CustomCodeNode):
@staticmethod
def mirror_stencil(direction, mirror_axis):
assert mirror_axis <= len(direction), f"only {len(direction)} axis available for mirage"
direction = list(direction)
direction[mirror_axis] = -direction[mirror_axis]
return tuple(direction)
@staticmethod
def _mirrored_symbol(mirror_axis):
axis = ['x', 'y', 'z']
return TypedSymbol(f"{axis[mirror_axis]}_axis_mirrored_stencil_dir", create_type(np.int32))
def __init__(self, stencil, mirror_axis, dtype=np.int32):
offsets_dtype = create_type(dtype)
mirrored_stencil_symbol = MirroredStencilDirections._mirrored_symbol(mirror_axis)
mirrored_directions = [stencil.index(MirroredStencilDirections.mirror_stencil(direction, mirror_axis))
for direction in stencil]
code = "\n"
code += _array_pattern(offsets_dtype, mirrored_stencil_symbol.name, mirrored_directions)
super(MirroredStencilDirections, self).__init__(code, symbols_read=set(),
symbols_defined={mirrored_stencil_symbol})
......@@ -58,24 +58,27 @@ odd_accessors = {
}
def is_inplace(streaming_pattern):
if streaming_pattern not in streaming_patterns:
raise ValueError('Invalid streaming pattern', streaming_pattern)
return streaming_pattern in ['aa', 'esotwist', 'esopull', 'esopush']
def get_accessor(streaming_pattern: str, timestep: Timestep) -> PdfFieldAccessor:
if streaming_pattern not in streaming_patterns:
raise ValueError(
"Invalid value of parameter 'streaming_pattern'.", streaming_pattern)
if is_inplace(streaming_pattern) and (timestep == Timestep.BOTH):
raise ValueError(f"Invalid timestep for streaming pattern {streaming_pattern}: {str(timestep)}")
if timestep == Timestep.EVEN:
return even_accessors[streaming_pattern]
else:
return odd_accessors[streaming_pattern]
def is_inplace(streaming_pattern):
if streaming_pattern not in streaming_patterns:
raise ValueError('Invalid streaming pattern', streaming_pattern)
return streaming_pattern in ['aa', 'esotwist', 'esopull', 'esopush']
def get_timesteps(streaming_pattern):
return (Timestep.EVEN, Timestep.ODD) if is_inplace(streaming_pattern) else (Timestep.BOTH, )
......
from typing import Union
from numpy.typing import NDArray
def poiseuille_flow(middle_distance: Union[float, NDArray], height,
ext_force_density: float, dyn_visc: float) -> Union[float, NDArray]:
"""
Analytical solution for plane Poiseuille flow.
Args:
middle_distance: Distance to the middle plane of the channel.
height: Distance between the boundaries.
ext_force_density: Force density on the fluid normal to the boundaries.
dyn_visc: dyn_visc
Returns:
A numpy array of the poiseuille profile if middle_distance is given as array otherwise of velocity of
the position given with middle_distance
"""
return ext_force_density * 1. / (2 * dyn_visc) * (height**2.0 / 4.0 - middle_distance**2.0)
from lbmpy.boundaries.boundaryconditions import (
UBB, FixedDensity, DiffusionDirichlet, SimpleExtrapolationOutflow, WallFunctionBounce,
ExtrapolationOutflow, NeumannByCopy, NoSlip, NoSlipLinearBouzidi, QuadraticBounceBack, StreamInConstant, FreeSlip)
from lbmpy.boundaries.boundaryhandling import LatticeBoltzmannBoundaryHandling
from lbmpy.boundaries.wall_function_models import MoninObukhovSimilarityTheory, LogLaw, MuskerLaw, SpaldingsLaw
__all__ = ['NoSlip', 'NoSlipLinearBouzidi', 'QuadraticBounceBack', 'FreeSlip', 'WallFunctionBounce',
'UBB', 'FixedDensity',
'SimpleExtrapolationOutflow', 'ExtrapolationOutflow',
'DiffusionDirichlet', 'NeumannByCopy', 'StreamInConstant',
'LatticeBoltzmannBoundaryHandling',
'MoninObukhovSimilarityTheory', 'LogLaw', 'MuskerLaw', 'SpaldingsLaw']
import sympy as sp
from lbmpy.boundaries.boundaryhandling import LbmWeightInfo
from lbmpy.advanced_streaming.indexing import BetweenTimestepsIndexing
from lbmpy.advanced_streaming.utility import Timestep, get_accessor
from lbmpy.custom_code_nodes import LbmWeightInfo
from pystencils.boundaries.boundaryhandling import BoundaryOffsetInfo
from pystencils.assignment import Assignment
from pystencils.astnodes import Block, Conditional, LoopOverCoordinate, SympyAssignment
......@@ -67,7 +67,7 @@ def boundary_conditional(boundary, direction, streaming_pattern, prev_timestep,
assignments = []
for direction_idx in dir_indices:
rule = boundary(f_out, f_in, direction_idx, inv_dir, lb_method, index_field=None)
rule = boundary(f_out, f_in, direction_idx, inv_dir, lb_method, index_field=None, force_vector=None)
# rhs: replace f_out by post collision symbols.
rhs_substitutions = {f_out(i): sym for i, sym in enumerate(lb_method.post_collision_pdf_symbols)}
......
import abc
from enum import Enum, auto
from warnings import warn
from lbmpy.advanced_streaming.utility import AccessPdfValues, Timestep
from pystencils.simp.assignment_collection import AssignmentCollection
from pystencils import Assignment, Field
from lbmpy.boundaries.boundaryhandling import LbmWeightInfo
from pystencils.typing import create_type
from pystencils.sympyextensions import get_symmetric_part
from lbmpy.simplificationfactory import create_simplification_strategy
from lbmpy.advanced_streaming.indexing import NeighbourOffsetArrays, MirroredStencilDirections
from pystencils.simp.assignment_collection import AssignmentCollection
from pystencils.stencil import offset_to_direction_string, direction_string_to_offset, inverse_direction
from pystencils.sympyextensions import get_symmetric_part, simplify_by_equality, scalar_product
from pystencils.typing import create_type, TypedSymbol
from lbmpy.advanced_streaming.utility import AccessPdfValues, Timestep
from lbmpy.custom_code_nodes import (NeighbourOffsetArrays, MirroredStencilDirections, LbmWeightInfo,
TranslationArraysNode)
from lbmpy.maxwellian_equilibrium import discrete_equilibrium
from lbmpy.simplificationfactory import create_simplification_strategy
import sympy as sp
import numpy as np
class LbBoundary(abc.ABC):
......@@ -24,10 +28,11 @@ class LbBoundary(abc.ABC):
inner_or_boundary = True
single_link = False
def __init__(self, name=None):
def __init__(self, name=None, calculate_force_on_boundary=False):
self._name = name
self.calculate_force_on_boundary = calculate_force_on_boundary
def __call__(self, f_out, f_in, dir_symbol, inv_dir, lb_method, index_field):
def __call__(self, f_out, f_in, dir_symbol, inv_dir, lb_method, index_field, force_vector):
"""
This function defines the boundary behavior and must therefore be implemented by all boundaries.
The boundary is defined through a list of sympy equations from which a boundary kernel is generated.
......@@ -44,6 +49,8 @@ class LbBoundary(abc.ABC):
lb_method: an instance of the LB method used. Use this to adapt the boundary to the method
(e.g. compressibility)
index_field: the boundary index field that can be used to retrieve and update boundary data
force_vector: vector to store the force on the boundary. Has the same size as the index field and
D-entries per cell
Returns:
list of pystencils assignments, or pystencils.AssignmentCollection
......@@ -95,23 +102,260 @@ class LbBoundary(abc.ABC):
class NoSlip(LbBoundary):
"""
r"""
No-Slip, (half-way) simple bounce back boundary condition, enforcing zero velocity at obstacle.
Extended for use with any streaming pattern.
Populations leaving the boundary node :math:`\mathbf{x}_b` at time :math:`t` are reflected
back with :math:`\mathbf{c}_{\overline{i}} = -\mathbf{c}_{i}`
.. math ::
f_{\overline{i}}(\mathbf{x}_b, t + \Delta t) = f^{\star}_{i}(\mathbf{x}_b, t)
Args:
name: optional name of the boundary.
calculate_force_on_boundary: stores the force for each PDF at the boundary in a force vector
"""
def __init__(self, name=None):
def __init__(self, name=None, calculate_force_on_boundary=False):
"""Set an optional name here, to mark boundaries, for example for force evaluations"""
super(NoSlip, self).__init__(name)
super(NoSlip, self).__init__(name, calculate_force_on_boundary)
def get_additional_code_nodes(self, lb_method):
if self.calculate_force_on_boundary:
return [NeighbourOffsetArrays(lb_method.stencil)]
else:
return []
def __call__(self, f_out, f_in, dir_symbol, inv_dir, lb_method, index_field, force_vector):
if self.calculate_force_on_boundary:
force = sp.Symbol("f")
subexpressions = [Assignment(force, sp.Float(2.0) * f_out(dir_symbol))]
offset = NeighbourOffsetArrays.neighbour_offset(dir_symbol, lb_method.stencil)
for i in range(lb_method.stencil.D):
subexpressions.append(Assignment(force_vector[0](f'F_{i}'), force * offset[i]))
else:
subexpressions = []
boundary_assignments = [Assignment(f_in(inv_dir[dir_symbol]), f_out(dir_symbol))]
return AssignmentCollection(boundary_assignments, subexpressions=subexpressions)
class NoSlipLinearBouzidi(LbBoundary):
"""
No-Slip, (half-way) simple bounce back boundary condition with interpolation
to increase accuracy: :cite:`BouzidiBC`. In order to make the boundary condition work properly a
Python callback function needs to be provided to calculate the distance from the wall for each cell near to the
boundary. If this is not done the boundary condition will fall back to a simple NoSlip boundary.
Furthermore, for this boundary condition a second fluid cell away from the wall is needed. If the second fluid
cell is not available (e.g. because it is marked as boundary as well), the boundary condition should fall back to
a NoSlip boundary as well.
Args:
name: optional name of the boundary.
init_wall_distance: Python callback function to calculate the wall distance for each cell near to the boundary
data_type: data type of the wall distance q
"""
def __init__(self, name=None, init_wall_distance=None, data_type='double', calculate_force_on_boundary=False):
self.data_type = data_type
self.init_wall_distance = init_wall_distance
super(NoSlipLinearBouzidi, self).__init__(name, calculate_force_on_boundary)
@property
def additional_data(self):
"""Used internally only. For the NoSlipLinearBouzidi boundary the distance to the obstacle of every
direction is needed. This information is stored in the index vector."""
return [('q', create_type(self.data_type))]
def __call__(self, f_out, f_in, dir_symbol, inv_dir, lb_method, index_field):
return Assignment(f_in(inv_dir[dir_symbol]), f_out(dir_symbol))
def get_additional_code_nodes(self, lb_method):
if self.calculate_force_on_boundary:
return [NeighbourOffsetArrays(lb_method.stencil)]
else:
return []
@property
def additional_data_init_callback(self):
def default_callback(boundary_data, **_):
for cell in boundary_data.index_array:
cell['q'] = -1
if self.init_wall_distance:
return self.init_wall_distance
else:
warn("No callback function provided to initialise the wall distance for each cell "
"(init_wall_distance=None). The boundary condition will fall back to a simple NoSlip BC")
return default_callback
def __call__(self, f_out, f_in, dir_symbol, inv_dir, lb_method, index_field, force_vector):
f_xf = sp.Symbol("f_xf")
f_xf_inv = sp.Symbol("f_xf_inv")
d_x2f = sp.Symbol("d_x2f")
q = sp.Symbol("q")
one = sp.Float(1.0)
two = sp.Float(2.0)
half = sp.Rational(1, 2)
subexpressions = [Assignment(f_xf, f_out(dir_symbol)),
Assignment(f_xf_inv, f_out(inv_dir[dir_symbol])),
Assignment(d_x2f, f_in(dir_symbol)),
Assignment(q, index_field[0]('q'))]
case_one = (half * (f_xf + f_xf_inv * (two * q - one))) / q
case_two = two * q * f_xf + (one - two * q) * d_x2f
case_three = f_xf
rhs = sp.Piecewise((case_one, sp.Ge(q, 0.5)),
(case_two, sp.And(sp.Gt(q, 0), sp.Lt(q, 0.5))),
(case_three, True))
if self.calculate_force_on_boundary:
force = sp.Symbol("f")
subexpressions.append(Assignment(force, f_xf + rhs))
offset = NeighbourOffsetArrays.neighbour_offset(dir_symbol, lb_method.stencil)
for i in range(lb_method.stencil.D):
subexpressions.append(Assignment(force_vector[0](f'F_{i}'), force * offset[i]))
boundary_assignments = [Assignment(f_in(inv_dir[dir_symbol]), rhs)]
return AssignmentCollection(boundary_assignments, subexpressions=subexpressions)
# end class NoSlip
# end class NoSlipLinearBouzidi
class QuadraticBounceBack(LbBoundary):
"""
Second order accurate bounce back boundary condition. Implementation details are provided in a demo notebook here:
https://pycodegen.pages.i10git.cs.fau.de/lbmpy/notebooks/demo_interpolation_boundary_conditions.html
Args:
relaxation_rate: relaxation rate to realise a BGK scheme for recovering the pre collision PDF value.
name: optional name of the boundary.
init_wall_distance: Python callback function to calculate the wall distance for each cell near to the boundary
data_type: data type of the wall distance q
"""
def __init__(self, relaxation_rate, name=None, init_wall_distance=None, data_type='double',
calculate_force_on_boundary=False):
self.relaxation_rate = relaxation_rate
self.data_type = data_type
self.init_wall_distance = init_wall_distance
self.equilibrium_values_name = "f_eq"
self.inv_dir_symbol = TypedSymbol("inv_dir", create_type("int32"))
super(QuadraticBounceBack, self).__init__(name, calculate_force_on_boundary)
@property
def additional_data(self):
"""Used internally only. For the NoSlipLinearBouzidi boundary the distance to the obstacle of every
direction is needed. This information is stored in the index vector."""
return [('q', create_type(self.data_type))]
@property
def additional_data_init_callback(self):
def default_callback(boundary_data, **_):
for cell in boundary_data.index_array:
cell['q'] = 0.5
if self.init_wall_distance:
return self.init_wall_distance
else:
warn("No callback function provided to initialise the wall distance for each cell "
"(init_wall_distance=None). The boundary condition will fall back to a simple NoSlip BC")
return default_callback
def get_additional_code_nodes(self, lb_method):
"""Return a list of code nodes that will be added in the generated code before the index field loop.
Args:
lb_method: Lattice Boltzmann method. See :func:`lbmpy.creationfunctions.create_lb_method`
Returns:
list containing LbmWeightInfo
"""
stencil = lb_method.stencil
inv_directions = [str(stencil.index(inverse_direction(direction))) for direction in stencil]
dtype = self.inv_dir_symbol.dtype
name = self.inv_dir_symbol.name
inverse_dir_node = TranslationArraysNode([(dtype, name, inv_directions), ], {self.inv_dir_symbol})
return [LbmWeightInfo(lb_method, self.data_type), inverse_dir_node, NeighbourOffsetArrays(lb_method.stencil)]
@staticmethod
def get_equilibrium(v, u, rho, drho, weight, compressible, zero_centered):
rho_background = sp.Integer(1)
result = discrete_equilibrium(v, u, rho, weight,
order=2, c_s_sq=sp.Rational(1, 3), compressible=compressible)
if zero_centered:
shift = discrete_equilibrium(v, [0] * len(u), rho_background, weight,
order=0, c_s_sq=sp.Rational(1, 3), compressible=False)
result = simplify_by_equality(result - shift, rho, drho, rho_background)
return result
def __call__(self, f_out, f_in, dir_symbol, inv_dir, lb_method, index_field, force_vector):
omega = self.relaxation_rate
inv = sp.IndexedBase(self.inv_dir_symbol, shape=(1,))[dir_symbol]
weight_info = LbmWeightInfo(lb_method, data_type=self.data_type)
weight_of_direction = weight_info.weight_of_direction
pdf_field_accesses = [f_out(i) for i in range(len(lb_method.stencil))]
pdf_symbols = [sp.Symbol(f"pdf_{i}") for i in range(len(lb_method.stencil))]
f_xf = sp.Symbol("f_xf")
f_xf_inv = sp.Symbol("f_xf_inv")
q = sp.Symbol("q")
feq = sp.Symbol("f_eq")
weight = sp.Symbol("w")
weight_inv = sp.Symbol("w_inv")
v = [TypedSymbol(f"c_{i}", self.data_type) for i in range(lb_method.stencil.D)]
v_inv = [TypedSymbol(f"c_inv_{i}", self.data_type) for i in range(lb_method.stencil.D)]
one = sp.Float(1.0)
half = sp.Rational(1, 2)
subexpressions = [Assignment(pdf_symbols[i], pdf) for i, pdf in enumerate(pdf_field_accesses)]
subexpressions.append(Assignment(f_xf, f_out(dir_symbol)))
subexpressions.append(Assignment(f_xf_inv, f_out(inv_dir[dir_symbol])))
subexpressions.append(Assignment(q, index_field[0]('q')))
subexpressions.append(Assignment(weight, weight_of_direction(dir_symbol, lb_method)))
subexpressions.append(Assignment(weight_inv, weight_of_direction(inv, lb_method)))
for i in range(lb_method.stencil.D):
offset = NeighbourOffsetArrays.neighbour_offset(dir_symbol, lb_method.stencil)
subexpressions.append(Assignment(v[i], offset[i]))
for i in range(lb_method.stencil.D):
offset = NeighbourOffsetArrays.neighbour_offset(inv, lb_method.stencil)
subexpressions.append(Assignment(v_inv[i], offset[i]))
cqc = lb_method.conserved_quantity_computation
rho = cqc.density_symbol
drho = cqc.density_deviation_symbol
u = sp.Matrix(cqc.velocity_symbols)
compressible = cqc.compressible
zero_centered = cqc.zero_centered_pdfs
cqe = cqc.equilibrium_input_equations_from_pdfs(pdf_symbols, False)
subexpressions.append(cqe.all_assignments)
eq_dir = self.get_equilibrium(v, u, rho, drho, weight, compressible, zero_centered)
eq_inv = self.get_equilibrium(v_inv, u, rho, drho, weight_inv, compressible, zero_centered)
subexpressions.append(Assignment(feq, eq_dir + eq_inv))
t1 = (f_xf - f_xf_inv + (f_xf + f_xf_inv - feq * omega) / (one - omega))
t2 = (q * (f_xf + f_xf_inv)) / (one + q)
result = (one - q) / (one + q) * t1 * half + t2
if self.calculate_force_on_boundary:
force = sp.Symbol("f")
subexpressions.append(Assignment(force, f_xf + result))
offset = NeighbourOffsetArrays.neighbour_offset(dir_symbol, lb_method.stencil)
for i in range(lb_method.stencil.D):
subexpressions.append(Assignment(force_vector[0](f'F_{i}'), force * offset[i]))
boundary_assignments = [Assignment(f_in(inv_dir[dir_symbol]), result)]
return AssignmentCollection(boundary_assignments, subexpressions=subexpressions)
# end class QuadraticBounceBack
class FreeSlip(LbBoundary):
"""
......@@ -151,7 +395,7 @@ class FreeSlip(LbBoundary):
if name is None and normal_direction:
name = f"Free Slip : {offset_to_direction_string([-x for x in normal_direction])}"
super(FreeSlip, self).__init__(name)
super(FreeSlip, self).__init__(name, calculate_force_on_boundary=False)
def init_callback(self, boundary_data, **_):
if len(boundary_data.index_array) > 1e6:
......@@ -221,7 +465,7 @@ class FreeSlip(LbBoundary):
else:
return [NeighbourOffsetArrays(lb_method.stencil)]
def __call__(self, f_out, f_in, dir_symbol, inv_dir, lb_method, index_field):
def __call__(self, f_out, f_in, dir_symbol, inv_dir, lb_method, index_field, force_vector):
neighbor_offset = NeighbourOffsetArrays.neighbour_offset(dir_symbol, lb_method.stencil)
if self.normal_direction:
tangential_offset = tuple(offset + normal for offset, normal in zip(neighbor_offset, self.normal_direction))
......@@ -242,13 +486,257 @@ class FreeSlip(LbBoundary):
# end class FreeSlip
class WallFunctionBounce(LbBoundary):
"""
Wall function based on the bounce back idea, cf. :cite:`Han2021`. Its implementation is extended to the D3Q27
stencil, whereas different weights of the drag distribution are proposed.
Args:
lb_method: LB method which is used for the simulation
pdfs: Symbolic representation of the particle distribution functions.
normal_direction: Normal direction of the wall. Currently, only straight and axis-aligned walls are supported.
wall_function_model: Wall function that is used to retrieve the wall stress :math:`tau_w` during the simulation.
See :class:`lbmpy.boundaries.wall_treatment.WallFunctionModel` for more details
mean_velocity: Optional field or field access for the mean velocity. As wall functions are typically defined
in terms of the mean velocity, it is recommended to provide this variable. Per default, the
instantaneous velocity obtained from pdfs is used for the wall function.
sampling_shift: Optional sampling shift for the velocity sampling. Can be provided as symbolic variable or
integer. In both cases, the user must assure that the sampling shift is at least 1, as sampling
in boundary cells is not physical. Per default, a sampling shift of 1 is employed which
corresponds to a sampling in the first fluid cell normal to the wall. For lower friction
Reynolds numbers, choosing a sampling shift >1 has shown to improve the results for higher
resolutions.
Mutually exclusive with the Maronga sampling shift.
maronga_sampling_shift: Optionally, apply a correction factor to the wall shear stress proposed by Maronga et
al. :cite:`Maronga2020`. Has only been tested and validated for the MOST wall function.
No guarantee is given that it also works with other wall functions.
Mutually exclusive with the standard sampling shift.
dt: time discretisation. Usually one in LB units
dy: space discretisation. Usually one in LB units
y: distance from the wall
target_friction_velocity: A target friction velocity can be given if an estimate is known a priori. This target
friction velocity will be used as initial guess for implicit wall functions to ensure
convergence of the Newton algorithm.
weight_method: The extension of the WFB to a D3Q27 stencil is non-unique. Different weights can be chosen to
define the drag distribution onto the pdfs. Per default, weights corresponding to the weights
in the D3Q27 stencil are chosen.
name: Optional name of the boundary.
data_type: Floating-point precision. Per default, double.
"""
class WeightMethod(Enum):
LATTICE_WEIGHT = auto(),
GEOMETRIC_WEIGHT = auto()
def __init__(self, lb_method, pdfs, normal_direction, wall_function_model,
mean_velocity=None, sampling_shift=1, maronga_sampling_shift=None,
dt=1, dy=1, y=0.5,
target_friction_velocity=None,
weight_method=WeightMethod.LATTICE_WEIGHT,
name=None, data_type='double'):
"""Set an optional name here, to mark boundaries, for example for force evaluations"""
self.stencil = lb_method.stencil
if not (self.stencil.Q == 19 or self.stencil.Q == 27):
raise ValueError("WFB boundary is currently only defined for D3Q19 and D3Q27 stencils.")
self.pdfs = pdfs
self.wall_function_model = wall_function_model
if mean_velocity:
if isinstance(mean_velocity, Field):
self.mean_velocity = mean_velocity.center_vector
elif isinstance(mean_velocity, Field.Access):
self.mean_velocity = mean_velocity.field.neighbor_vector(mean_velocity.offsets)
else:
raise ValueError("Mean velocity field has to be a pystencils Field or Field.Access")
else:
self.mean_velocity = None
if not isinstance(sampling_shift, int):
self.sampling_shift = TypedSymbol(sampling_shift.name, np.uint32)
else:
assert sampling_shift >= 1, "The sampling shift must be greater than 1."
self.sampling_shift = sampling_shift
if maronga_sampling_shift:
assert self.mean_velocity, "Mean velocity field must be provided when using the Maronga correction"
if not isinstance(maronga_sampling_shift, int):
self.maronga_sampling_shift = TypedSymbol(maronga_sampling_shift.name, np.uint32)
else:
assert maronga_sampling_shift >= 1, "The Maronga sampling shift must be greater than 1."
self.maronga_sampling_shift = maronga_sampling_shift
else:
self.maronga_sampling_shift = None
if (self.sampling_shift != 1) and self.maronga_sampling_shift:
raise ValueError("Both sampling shift and Maronga offset are set. This is currently not supported.")
self.dt = dt
self.dy = dy
self.y = y
self.data_type = data_type
self.target_friction_velocity = target_friction_velocity
self.weight_method = weight_method
if len(normal_direction) - normal_direction.count(0) != 1:
raise ValueError("Only normal directions for straight walls are supported for example (0, 1, 0) for "
"a WallFunctionBounce applied to the southern boundary of the domain")
self.mirror_axis = normal_direction.index(*[direction for direction in normal_direction if direction != 0])
self.normal_direction = normal_direction
assert all([n in [-1, 0, 1] for n in self.normal_direction]), \
"Only -1, 0 and 1 allowed for defining the normal direction"
tangential_component = [int(not n) for n in self.normal_direction]
self.normal_axis = tangential_component.index(0)
self.tangential_axis = [0, 1, 2]
self.tangential_axis.remove(self.normal_axis)
self.dim = self.stencil.D
if name is None:
name = f"WFB : {offset_to_direction_string([-x for x in normal_direction])}"
super(WallFunctionBounce, self).__init__(name, calculate_force_on_boundary=False)
def get_additional_code_nodes(self, lb_method):
return [MirroredStencilDirections(self.stencil, self.mirror_axis),
NeighbourOffsetArrays(lb_method.stencil)]
def __call__(self, f_out, f_in, dir_symbol, inv_dir, lb_method, index_field, force_vector):
# needed symbols for offsets and indices
# neighbour offset symbols are basically the stencil directions defined in stencils.py:L130ff.
neighbor_offset = NeighbourOffsetArrays.neighbour_offset(dir_symbol, lb_method.stencil)
tangential_offset = tuple(offset + normal for offset, normal in zip(neighbor_offset, self.normal_direction))
mirrored_stencil_symbol = MirroredStencilDirections._mirrored_symbol(self.mirror_axis)
mirrored_direction = inv_dir[sp.IndexedBase(mirrored_stencil_symbol, shape=(1,))[dir_symbol]]
name_base = "f_in_inv_offsets_"
offset_array_symbols = [TypedSymbol(name_base + d, mirrored_stencil_symbol.dtype) for d in ['x', 'y', 'z']]
mirrored_offset = sp.IndexedBase(mirrored_stencil_symbol, shape=(1,))[dir_symbol]
offsets = tuple(sp.IndexedBase(s, shape=(1,))[mirrored_offset] for s in offset_array_symbols)
# needed symbols in the Assignments
u_m = sp.Symbol("u_m")
tau_w = sp.Symbol("tau_w")
wall_stress = sp.symbols("tau_w_x tau_w_y tau_w_z")
# if the mean velocity field is not given, or the Maronga correction is applied, density and velocity values
# will be calculated from pdfs
cqc = lb_method.conserved_quantity_computation
result = []
if (not self.mean_velocity) or self.maronga_sampling_shift:
pdf_center_vector = sp.Matrix([0] * self.stencil.Q)
for i in range(self.stencil.Q):
pdf_center_vector[i] = self.pdfs[offsets[0] + self.normal_direction[0],
offsets[1] + self.normal_direction[1],
offsets[2] + self.normal_direction[2]](i)
eq_equations = cqc.equilibrium_input_equations_from_pdfs(pdf_center_vector)
result.append(eq_equations.all_assignments)
# sample velocity which will be used in the wall stress calculation
if self.mean_velocity:
if self.maronga_sampling_shift:
u_for_tau_wall = tuple(u_mean_i.get_shifted(
self.maronga_sampling_shift * self.normal_direction[0],
self.maronga_sampling_shift * self.normal_direction[1],
self.maronga_sampling_shift * self.normal_direction[2]
) for u_mean_i in self.mean_velocity)
else:
u_for_tau_wall = tuple(u_mean_i.get_shifted(
self.sampling_shift * self.normal_direction[0],
self.sampling_shift * self.normal_direction[1],
self.sampling_shift * self.normal_direction[2]
) for u_mean_i in self.mean_velocity)
rho_for_tau_wall = sp.Float(1)
else:
rho_for_tau_wall = cqc.density_symbol
u_for_tau_wall = cqc.velocity_symbols
# calculate Maronga factor in case of correction
maronga_fix = sp.Symbol("maronga_fix")
if self.maronga_sampling_shift:
inst_first_cell_vel = cqc.velocity_symbols
mean_first_cell_vel = tuple(u_mean_i.get_shifted(*self.normal_direction) for u_mean_i in self.mean_velocity)
mag_inst_vel_first_cell = sp.sqrt(sum([inst_first_cell_vel[i] ** 2 for i in self.tangential_axis]))
mag_mean_vel_first_cell = sp.sqrt(sum([mean_first_cell_vel[i] ** 2 for i in self.tangential_axis]))
result.append(Assignment(maronga_fix, mag_inst_vel_first_cell / mag_mean_vel_first_cell))
else:
maronga_fix = 1
# store which direction is tangential component (only those are used for the wall shear stress)
red_u_mag = sp.sqrt(sum([u_for_tau_wall[i]**2 for i in self.tangential_axis]))
u_mag = Assignment(u_m, red_u_mag)
result.append(u_mag)
wall_distance = self.maronga_sampling_shift if self.maronga_sampling_shift else self.sampling_shift
# using wall function model
wall_law_assignments = self.wall_function_model.shear_stress_assignments(
density_symbol=rho_for_tau_wall, velocity_symbol=u_m, shear_stress_symbol=tau_w,
wall_distance=(wall_distance - sp.Rational(1, 2) * self.dy),
u_tau_target=self.target_friction_velocity)
result.append(wall_law_assignments)
# calculate wall stress components and use them to calculate the drag
for i in self.tangential_axis:
result.append(Assignment(wall_stress[i], - u_for_tau_wall[i] / u_m * tau_w * maronga_fix))
weight, inv_weight_sq = sp.symbols("wfb_weight inverse_weight_squared")
if self.stencil.Q == 19:
result.append(Assignment(weight, sp.Rational(1, 2)))
elif self.stencil.Q == 27:
result.append(Assignment(inv_weight_sq, sum([neighbor_offset[i]**2 for i in self.tangential_axis])))
a, b = sp.symbols("wfb_a wfb_b")
if self.weight_method == self.WeightMethod.LATTICE_WEIGHT:
res_ab = sp.solve([2 * a + 4 * b - 1, a - 4 * b], [a, b]) # lattice weight scaling
elif self.weight_method == self.WeightMethod.GEOMETRIC_WEIGHT:
res_ab = sp.solve([2 * a + 4 * b - 1, a - sp.sqrt(2) * b], [a, b]) # geometric scaling
else:
raise ValueError("Unknown weighting method for the WFB D3Q27 extension. Currently, only lattice "
"weights and geometric weights are supported.")
result.append(Assignment(weight, sp.Piecewise((sp.Float(0), sp.Equality(inv_weight_sq, 0)),
(res_ab[a], sp.Equality(inv_weight_sq, 1)),
(res_ab[b], True))))
factor = self.dt / self.dy * weight
drag = sum([neighbor_offset[i] * factor * wall_stress[i] for i in self.tangential_axis])
result.append(Assignment(f_in.center(inv_dir[dir_symbol]), f_out[tangential_offset](mirrored_direction) - drag))
return result
# end class WallFunctionBounce
class UBB(LbBoundary):
"""Velocity bounce back boundary condition, enforcing specified velocity at obstacle
r"""Velocity bounce back boundary condition, enforcing specified velocity at obstacle. Furthermore, a density
at the wall can be implied. The boundary condition is implemented with the following formula:
.. math ::
f_{\overline{i}}(\mathbf{x}_b, t + \Delta t) = f^{\star}_{i}(\mathbf{x}_b, t) -
2 w_{i} \rho_{w} \frac{\mathbf{c}_i \cdot \mathbf{u}_w}{c_s^2}
Args:
velocity: can either be a constant, an access into a field, or a callback function.
The callback functions gets a numpy record array with members, 'x','y','z', 'dir' (direction)
and 'velocity' which has to be set to the desired velocity of the corresponding link
velocity: Prescribe the fluid velocity :math:`\mathbf{u}_w` at the wall.
Can either be a constant, an access into a field, or a callback function.
The callback functions gets a numpy record array with members, ``x``, ``y``, ``z``, ``dir``
(direction) and ``velocity`` which has to be set to the desired velocity of the corresponding link
density: Prescribe the fluid density :math:`\rho_{w}` at the wall. If not prescribed the density is
calculated from the PDFs at the wall. The density can only be set constant.
adapt_velocity_to_force: adapts the velocity to the correct equilibrium when the lattice Boltzmann method holds
a forcing term. If no forcing term is set and adapt_velocity_to_force is set to True
it has no effect.
......@@ -256,8 +744,9 @@ class UBB(LbBoundary):
name: optional name of the boundary.
"""
def __init__(self, velocity, adapt_velocity_to_force=False, dim=None, name=None, data_type='double'):
def __init__(self, velocity, density=None, adapt_velocity_to_force=False, dim=None, name=None, data_type='double'):
self._velocity = velocity
self._density = density
self._adaptVelocityToForce = adapt_velocity_to_force
if callable(self._velocity) and not dim:
raise ValueError("When using a velocity callback the dimension has to be specified with the dim parameter")
......@@ -266,7 +755,7 @@ class UBB(LbBoundary):
self.dim = dim
self.data_type = data_type
super(UBB, self).__init__(name)
super(UBB, self).__init__(name, calculate_force_on_boundary=False)
@property
def additional_data(self):
......@@ -302,7 +791,7 @@ class UBB(LbBoundary):
This is useful if the inflow velocity should have a certain profile for instance"""
return callable(self._velocity)
def __call__(self, f_out, f_in, dir_symbol, inv_dir, lb_method, index_field):
def __call__(self, f_out, f_in, dir_symbol, inv_dir, lb_method, index_field, force_vector):
vel_from_idx_field = callable(self._velocity)
vel = [index_field(f'vel_{i}') for i in range(self.dim)] if vel_from_idx_field else self._velocity
......@@ -337,7 +826,10 @@ class UBB(LbBoundary):
pdf_field_accesses = [f_out(i) for i in range(len(lb_method.stencil))]
density_equations = cqc.output_equations_from_pdfs(pdf_field_accesses, {'density': density_symbol})
density_symbol = lb_method.conserved_quantity_computation.density_symbol
result = density_equations.all_assignments
if self._density:
result = [Assignment(density_symbol, self._density)]
else:
result = density_equations.all_assignments
result += [Assignment(f_in(inv_dir[dir_symbol]),
f_out(dir_symbol) - vel_term * density_symbol)]
return result
......@@ -375,7 +867,7 @@ class SimpleExtrapolationOutflow(LbBoundary):
self.normal_direction = tuple([int(n) for n in normal_direction])
assert all([n in [-1, 0, 1] for n in self.normal_direction]), \
"Only -1, 0 and 1 allowed for defining the normal direction"
super(SimpleExtrapolationOutflow, self).__init__(name)
super(SimpleExtrapolationOutflow, self).__init__(name, calculate_force_on_boundary=False)
def get_additional_code_nodes(self, lb_method):
"""Return a list of code nodes that will be added in the generated code before the index field loop.
......@@ -389,7 +881,7 @@ class SimpleExtrapolationOutflow(LbBoundary):
"""
return [NeighbourOffsetArrays(lb_method.stencil)]
def __call__(self, f_out, f_in, dir_symbol, inv_dir, lb_method, index_field):
def __call__(self, f_out, f_in, dir_symbol, inv_dir, lb_method, index_field, force_vector):
neighbor_offset = NeighbourOffsetArrays.neighbour_offset(dir_symbol, lb_method.stencil)
tangential_offset = tuple(offset - normal for offset, normal in zip(neighbor_offset, self.normal_direction))
......@@ -415,7 +907,7 @@ class ExtrapolationOutflow(LbBoundary):
Args:
normal_direction: direction vector normal to the outflow
lb_method: the lattice boltzman method to be used in the simulation
lb_method: the lattice Boltzmann method to be used in the simulation
dt: lattice time step size
dx: lattice spacing distance
name: optional name of the boundary.
......@@ -468,7 +960,7 @@ class ExtrapolationOutflow(LbBoundary):
self.equilibrium_calculation = calc_eq_pdfs
super(ExtrapolationOutflow, self).__init__(name)
super(ExtrapolationOutflow, self).__init__(name, calculate_force_on_boundary=False)
def init_callback(self, boundary_data, **_):
dim = boundary_data.dim
......@@ -521,7 +1013,7 @@ class ExtrapolationOutflow(LbBoundary):
"""
return [NeighbourOffsetArrays(lb_method.stencil)]
def __call__(self, f_out, f_in, dir_symbol, inv_dir, lb_method, index_field):
def __call__(self, f_out, f_in, dir_symbol, inv_dir, lb_method, index_field, force_vector):
subexpressions = []
boundary_assignments = []
dtdx = sp.Rational(self.dt, self.dx)
......@@ -550,7 +1042,12 @@ class ExtrapolationOutflow(LbBoundary):
class FixedDensity(LbBoundary):
"""Boundary condition that fixes the density/pressure at the obstacle.
r"""Boundary condition for prescribing a density at the wall. Through :math:`p = c_s^2 \rho` this boundary condition
can also function as a pressure boundary condition.
.. math ::
f_{\overline{i}}(\mathbf{x}_b, t + \Delta t) = - f^{\star}_{i}(\mathbf{x}_b, t) +
2 w_{i} \rho_{w} (1 + \frac{(\mathbf{c}_i \cdot \mathbf{u}_w)^2}{2c_s^4} + \frac{\mathbf{u}_w^2}{2c_s^2})
Args:
density: value of the density which should be set.
......@@ -562,9 +1059,9 @@ class FixedDensity(LbBoundary):
name = "Fixed Density " + str(density)
self.density = density
super(FixedDensity, self).__init__(name)
super(FixedDensity, self).__init__(name, calculate_force_on_boundary=False)
def __call__(self, f_out, f_in, dir_symbol, inv_dir, lb_method, index_field):
def __call__(self, f_out, f_in, dir_symbol, inv_dir, lb_method, index_field, force_vector):
def remove_asymmetric_part_of_main_assignments(assignment_collection, degrees_of_freedom):
new_main_assignments = [Assignment(a.lhs, get_symmetric_part(a.rhs, degrees_of_freedom))
for a in assignment_collection.main_assignments]
......@@ -604,22 +1101,46 @@ class FixedDensity(LbBoundary):
# end class FixedDensity
class DiffusionDirichlet(LbBoundary):
"""Boundary condition for advection-diffusion problems that fixes the concentration at the obstacle.
"""Concentration boundary which is used for concentration or thermal boundary conditions of convection-diffusion
equation Base on https://doi.org/10.1103/PhysRevE.85.016701.
Args:
concentration: value of the concentration which should be set.
concentration: can either be a constant, an access into a field, or a callback function.
The callback functions gets a numpy record array with members, ``x``, ``y``, ``z``, ``dir``
(direction) and ``concentration`` which has to be set to the desired
velocity of the corresponding link
velocity_field: if velocity field is given the boundary value is approximated by using the discrete equilibrium.
name: optional name of the boundary.
data_type: data type of the concentration value. default is double
"""
def __init__(self, concentration, name=None, data_type='double'):
def __init__(self, concentration, velocity_field=None, name=None, data_type='double'):
if name is None:
name = "Diffusion Dirichlet " + str(concentration)
name = "DiffusionDirichlet"
self.concentration = concentration
self._data_type = data_type
self.concentration_is_callable = callable(self.concentration)
self.velocity_field = velocity_field
super(DiffusionDirichlet, self).__init__(name)
super(DiffusionDirichlet, self).__init__(name, calculate_force_on_boundary=False)
@property
def additional_data(self):
""" In case of the UBB boundary additional data is a velocity vector. This vector is added to each cell to
realize velocity profiles for the inlet."""
if self.concentration_is_callable:
return [('concentration', create_type(self._data_type))]
else:
return []
@property
def additional_data_init_callback(self):
"""Initialise additional data of the boundary. For an example see
`tutorial 02 <https://pycodegen.pages.i10git.cs.fau.de/lbmpy/notebooks/02_tutorial_boundary_setup.html>`_
or lbmpy.geometry.add_pipe_inflow_boundary"""
if self.concentration_is_callable:
return self.concentration
def get_additional_code_nodes(self, lb_method):
"""Return a list of code nodes that will be added in the generated code before the index field loop.
......@@ -630,15 +1151,34 @@ class DiffusionDirichlet(LbBoundary):
Returns:
list containing LbmWeightInfo
"""
return [LbmWeightInfo(lb_method, self._data_type)]
if self.velocity_field:
return [LbmWeightInfo(lb_method, self._data_type), NeighbourOffsetArrays(lb_method.stencil)]
else:
return [LbmWeightInfo(lb_method, self._data_type)]
def __call__(self, f_out, f_in, dir_symbol, inv_dir, lb_method, index_field):
def __call__(self, f_out, f_in, dir_symbol, inv_dir, lb_method, index_field, force_vector):
assert lb_method.conserved_quantity_computation.zero_centered_pdfs is False, \
"DiffusionDirichlet only works for methods with normal pdfs storage -> set zero_centered=False"
weight_info = LbmWeightInfo(lb_method, self._data_type)
w_dir = weight_info.weight_of_direction(dir_symbol, lb_method)
return [Assignment(f_in(inv_dir[dir_symbol]),
2 * w_dir * self.concentration - f_out(dir_symbol))]
if self.concentration_is_callable:
concentration = index_field[0]('concentration')
else:
concentration = self.concentration
if self.velocity_field:
neighbour_offset = NeighbourOffsetArrays.neighbour_offset(dir_symbol, lb_method.stencil)
u = self.velocity_field
cs = sp.Rational(1, 3)
equilibrium = (1 + scalar_product(neighbour_offset, u.center_vector)**2 / (2 * cs**4)
- scalar_product(u.center_vector, u.center_vector) / (2 * cs**2))
else:
equilibrium = sp.Rational(1, 1)
result = [Assignment(f_in(inv_dir[dir_symbol]), 2.0 * w_dir * concentration * equilibrium - f_out(dir_symbol))]
return result
# end class DiffusionDirichlet
......@@ -659,7 +1199,7 @@ class NeumannByCopy(LbBoundary):
"""
return [NeighbourOffsetArrays(lb_method.stencil)]
def __call__(self, f_out, f_in, dir_symbol, inv_dir, lb_method, index_field):
def __call__(self, f_out, f_in, dir_symbol, inv_dir, lb_method, index_field, force_vector):
neighbour_offset = NeighbourOffsetArrays.neighbour_offset(dir_symbol, lb_method.stencil)
return [Assignment(f_in(inv_dir[dir_symbol]), f_out(inv_dir[dir_symbol])),
Assignment(f_out[neighbour_offset](dir_symbol), f_out(dir_symbol))]
......@@ -678,7 +1218,7 @@ class StreamInConstant(LbBoundary):
"""
def __init__(self, constant, name=None):
super(StreamInConstant, self).__init__(name)
super(StreamInConstant, self).__init__(name, calculate_force_on_boundary=False)
self.constant = constant
def get_additional_code_nodes(self, lb_method):
......@@ -692,7 +1232,7 @@ class StreamInConstant(LbBoundary):
"""
return [NeighbourOffsetArrays(lb_method.stencil)]
def __call__(self, f_out, f_in, dir_symbol, inv_dir, lb_method, index_field):
def __call__(self, f_out, f_in, dir_symbol, inv_dir, lb_method, index_field, force_vector):
neighbour_offset = NeighbourOffsetArrays.neighbour_offset(dir_symbol, lb_method.stencil)
return [Assignment(f_in(inv_dir[dir_symbol]), self.constant),
Assignment(f_out[neighbour_offset](dir_symbol), self.constant)]
......
from dataclasses import replace
import numpy as np
import sympy as sp
from lbmpy.advanced_streaming.indexing import BetweenTimestepsIndexing
from lbmpy.advanced_streaming.utility import is_inplace, Timestep, AccessPdfValues
from pystencils import Field, Assignment, TypedSymbol, create_kernel
from pystencils.stencil import inverse_direction
from pystencils import CreateKernelConfig, Target
from pystencils import Assignment, CreateKernelConfig, create_kernel, Field, Target
from pystencils.boundaries import BoundaryHandling
from pystencils.boundaries.createindexlist import numpy_data_type_for_boundary_object
from pystencils.backends.cbackend import CustomCodeNode
from pystencils.field import FieldType
from pystencils.simp import add_subexpressions_for_field_reads
from pystencils.stencil import inverse_direction
from lbmpy.advanced_streaming.indexing import BetweenTimestepsIndexing
from lbmpy.advanced_streaming.utility import is_inplace, Timestep, AccessPdfValues
class LatticeBoltzmannBoundaryHandling(BoundaryHandling):
"""
Enables boundary handling for LBM simulations with advanced streaming patterns.
For the in-place patterns AA and EsoTwist, two kernels are generated for a boundary
Enables boundary handling for LBM simulations with advanced streaming patterns.
For the in-place patterns AA and EsoTwist, two kernels are generated for a boundary
object and the right one selected depending on the time step.
"""
......@@ -154,53 +156,39 @@ class LatticeBoltzmannBoundaryHandling(BoundaryHandling):
# end class LatticeBoltzmannBoundaryHandling
class LbmWeightInfo(CustomCodeNode):
def __init__(self, lb_method, data_type='double'):
self.weights_symbol = TypedSymbol("weights", data_type)
data_type_string = "double" if self.weights_symbol.dtype.numpy_dtype == np.float64 else "float"
weights = [str(w.evalf(17)) for w in lb_method.weights]
if data_type_string == "float":
weights = "f, ".join(weights)
weights += "f" # suffix for the last element
else:
weights = ", ".join(weights)
w_sym = self.weights_symbol
code = f"const {data_type_string} {w_sym.name} [] = {{{ weights }}};\n"
super(LbmWeightInfo, self).__init__(code, symbols_read=set(), symbols_defined={w_sym})
def weight_of_direction(self, dir_idx, lb_method=None):
if isinstance(sp.sympify(dir_idx), sp.Integer):
return lb_method.weights[dir_idx].evalf(17)
else:
return sp.IndexedBase(self.weights_symbol, shape=(1,))[dir_idx]
# end class LbmWeightInfo
def create_lattice_boltzmann_boundary_kernel(pdf_field, index_field, lb_method, boundary_functor,
prev_timestep=Timestep.BOTH, streaming_pattern='pull',
target=Target.CPU, **kernel_creation_args):
target=Target.CPU, force_vector=None, **kernel_creation_args):
indexing = BetweenTimestepsIndexing(
pdf_field, lb_method.stencil, prev_timestep, streaming_pattern, np.int32, np.int32)
dim = lb_method.stencil.D
f_out, f_in = indexing.proxy_fields
dir_symbol = indexing.dir_symbol
inv_dir = indexing.inverse_dir_symbol
boundary_assignments = boundary_functor(f_out, f_in, dir_symbol, inv_dir, lb_method, index_field)
config = CreateKernelConfig(target=target, default_number_int="int32",
skip_independence_check=True, **kernel_creation_args)
default_data_type = config.data_type.default_factory()
if force_vector is None:
force_vector_type = np.dtype([(f"F_{i}", default_data_type.c_name) for i in range(dim)], align=True)
force_vector = Field.create_generic('force_vector', spatial_dimensions=1,
dtype=force_vector_type, field_type=FieldType.INDEXED)
config = replace(config, index_fields=[index_field, force_vector])
boundary_assignments = boundary_functor(f_out, f_in, dir_symbol, inv_dir, lb_method, index_field, force_vector)
boundary_assignments = indexing.substitute_proxies(boundary_assignments)
# Code Elements inside the loop
if pdf_field.dtype != default_data_type:
boundary_assignments = add_subexpressions_for_field_reads(boundary_assignments, data_type=default_data_type)
elements = [Assignment(dir_symbol, index_field[0]('dir'))]
elements += boundary_assignments.all_assignments
config = CreateKernelConfig(index_fields=[index_field], target=target, default_number_int="int32",
skip_independence_check=True, **kernel_creation_args)
kernel = create_kernel(elements, config=config)
# Code Elements ahead of the loop
......
import sympy as sp
from abc import ABC, abstractmethod
from pystencils import Assignment
class WallFunctionModel(ABC):
def __init__(self, name):
self._name = name
@abstractmethod
def shear_stress_assignments(self, density_symbol: sp.Symbol, shear_stress_symbol: sp.Symbol,
velocity_symbol: sp.Symbol, wall_distance, u_tau_target):
"""
Computes a symbolic representation for the log law.
Args:
density_symbol: symbol density, should be provided by the LB method's conserved quantity computation
shear_stress_symbol: symbolic wall shear stress to which the calculated shear stress will be assigned
velocity_symbol: symbolic velocity that is taken as a reference in the wall functions
wall_distance: distance to the wall, equals to 0.5 in standard cell-centered LBM
u_tau_target: in implicit wall functions, a target friction velocity can be provided which will be used as
initial guess in the Newton iteration. This target friction velocity can be obtained, e.g.,
from the target friction Reynolds number
"""
pass
# end class WallFunctionModel
class ExplicitWallFunctionModel(WallFunctionModel, ABC):
"""
Abstract base class for explicit wall functions that can be solved directly for the wall shear stress.
"""
def __init__(self, name):
super(ExplicitWallFunctionModel, self).__init__(name=name)
class MoninObukhovSimilarityTheory(ExplicitWallFunctionModel):
def __init__(self, z0, kappa=0.41, phi=0, name="MOST"):
self.z0 = z0
self.kappa = kappa
self.phi = phi
super(MoninObukhovSimilarityTheory, self).__init__(name=name)
def shear_stress_assignments(self, density_symbol: sp.Symbol, shear_stress_symbol: sp.Symbol,
velocity_symbol: sp.Symbol, wall_distance, u_tau_target=None):
u_tau = velocity_symbol * self.kappa / sp.ln(wall_distance / self.z0 + self.phi)
return [Assignment(shear_stress_symbol, u_tau ** 2 * density_symbol)]
class ImplicitWallFunctionModel(WallFunctionModel, ABC):
"""
Abstract base class for implicit wall functions that require a Newton procedure to solve for the wall shear stress.
"""
def __init__(self, name, newton_steps, viscosity):
self.newton_steps = newton_steps
self.u_tau = sp.symbols(f"wall_function_u_tau_:{self.newton_steps + 1}")
self.delta = sp.symbols(f"wall_function_delta_:{self.newton_steps}")
self.viscosity = viscosity
super(ImplicitWallFunctionModel, self).__init__(name=name)
def newton_iteration(self, wall_law):
m = -wall_law / wall_law.diff(self.u_tau[0])
assignments = []
for i in range(self.newton_steps):
assignments.append(Assignment(self.delta[i], m.subs({self.u_tau[0]: self.u_tau[i]})))
assignments.append(Assignment(self.u_tau[i + 1], self.u_tau[i] + self.delta[i]))
return assignments
class LogLaw(ImplicitWallFunctionModel):
"""
Analytical model for the velocity profile inside the boundary layer, obtained from the mean velocity gradient.
Only valid in the log-law region.
"""
def __init__(self, viscosity, newton_steps=5, kappa=0.41, b=5.2, name="LogLaw"):
self.kappa = kappa
self.b = b
super(LogLaw, self).__init__(name=name, newton_steps=newton_steps, viscosity=viscosity)
def shear_stress_assignments(self, density_symbol: sp.Symbol, shear_stress_symbol: sp.Symbol,
velocity_symbol: sp.Symbol, wall_distance, u_tau_target=None):
def law(u_p, y_p):
return 1 / self.kappa * sp.ln(y_p) + self.b - u_p
u_plus = velocity_symbol / self.u_tau[0]
y_plus = (wall_distance * self.u_tau[0]) / self.viscosity
u_tau_init = u_tau_target if u_tau_target else velocity_symbol / sp.Float(100)
wall_law = law(u_plus, y_plus)
assignments = [Assignment(self.u_tau[0], u_tau_init), # initial guess
*self.newton_iteration(wall_law), # newton iterations
Assignment(shear_stress_symbol, self.u_tau[-1] ** 2 * density_symbol)] # final result
return assignments
class SpaldingsLaw(ImplicitWallFunctionModel):
"""
Single formula for the velocity profile inside the boundary layer, proposed by Spalding :cite:`spalding1961`.
Valid in the inner and the outer layer.
"""
def __init__(self, viscosity, newton_steps=5, kappa=0.41, b=5.5, name="Spalding"):
self.kappa = kappa
self.b = b
super(SpaldingsLaw, self).__init__(name=name, newton_steps=newton_steps, viscosity=viscosity)
def shear_stress_assignments(self, density_symbol: sp.Symbol, shear_stress_symbol: sp.Symbol,
velocity_symbol: sp.Symbol, wall_distance, u_tau_target=None):
def law(u_p, y_p):
k_times_u = self.kappa * u_p
frac_1 = (k_times_u ** 2) / sp.Float(2)
frac_2 = (k_times_u ** 3) / sp.Float(6)
return (u_p + sp.exp(-self.kappa * self.b) * (sp.exp(k_times_u) - sp.Float(1) - k_times_u - frac_1 - frac_2)
- y_p)
u_plus = velocity_symbol / self.u_tau[0]
y_plus = (wall_distance * self.u_tau[0]) / self.viscosity
u_tau_init = u_tau_target if u_tau_target else velocity_symbol / sp.Float(100)
wall_law = law(u_plus, y_plus)
assignments = [Assignment(self.u_tau[0], u_tau_init), # initial guess
*self.newton_iteration(wall_law), # newton iterations
Assignment(shear_stress_symbol, self.u_tau[-1] ** 2 * density_symbol)] # final result
return assignments
class MuskerLaw(ImplicitWallFunctionModel):
"""
Quasi-analytical model for the velocity profile inside the boundary layer, proposed by Musker. Valid in the inner
and the outer layer.
Formulation taken from :cite:`malaspinas2015`, Equation (59).
"""
def __init__(self, viscosity, newton_steps=5, name="Musker"):
super(MuskerLaw, self).__init__(name=name, newton_steps=newton_steps, viscosity=viscosity)
def shear_stress_assignments(self, density_symbol: sp.Symbol, shear_stress_symbol: sp.Symbol,
velocity_symbol: sp.Symbol, wall_distance, u_tau_target=None):
def law(u_p, y_p):
arctan = sp.Float(5.424) * sp.atan(sp.Float(0.119760479041916168) * y_p - sp.Float(0.488023952095808383))
logarithm = (sp.Float(0.434) * sp.log((y_p + sp.Float(10.6)) ** sp.Float(9.6)
/ (y_p ** 2 - sp.Float(8.15) * y_p + sp.Float(86)) ** 2))
return (arctan + logarithm - sp.Float(3.50727901936264842)) - u_p
u_plus = velocity_symbol / self.u_tau[0]
y_plus = (wall_distance * self.u_tau[0]) / self.viscosity
u_tau_init = u_tau_target if u_tau_target else velocity_symbol / sp.Float(100)
wall_law = law(u_plus, y_plus)
assignments = [Assignment(self.u_tau[0], u_tau_init), # initial guess
*self.newton_iteration(wall_law), # newton iterations
Assignment(shear_stress_symbol, self.u_tau[-1] ** 2 * density_symbol)] # final result
return assignments
......@@ -370,7 +370,7 @@ def take_moments(eqn, pdf_to_moment_name=(('f', '\\Pi'), ('\\Omega f', '\\Upsilo
if new_f_index is None:
rest *= factor
else:
assert not(new_f_index and f_index)
assert not (new_f_index and f_index)
f_index = new_f_index
moment_tuple = [0] * len(velocity_terms)
......
......@@ -20,21 +20,21 @@ and belongs to pystencils, not lbmpy. This can be found in the pystencils module
of the generated code is specified.
1. *Method*:
the method defines the collision process. Currently there are two big categories:
the method defines the collision process. Currently, there are two big categories:
moment and cumulant based methods. A method defines how each moment or cumulant is relaxed by
storing the equilibrium value and the relaxation rate for each moment/cumulant.
2. *Collision/Update Rule*:
Methods can generate a "collision rule" which is an equation collection that define the
post collision values as a function of the pre-collision values. On these equation collection
simplifications are applied to reduce the number of floating point operations.
At this stage an entropic optimization step can also be added to determine one relaxation rate by an
At this stage an entropic optimisation step can also be added to determine one relaxation rate by an
entropy condition.
Then a streaming rule is added which transforms the collision rule into an update rule.
The streaming step depends on the pdf storage (source/destination, AABB pattern, EsoTwist).
Currently only the simple source/destination pattern is supported.
3. *AST*:
The abstract syntax tree describes the structure of the kernel, including loops and conditionals.
The ast can be modified e.g. to add OpenMP pragmas, reorder loops or apply other optimizations.
The ast can be modified, e.g., to add OpenMP pragmas, reorder loops or apply other optimisations.
4. *Function*:
This step compiles the AST into an executable function, either for CPU or GPUs. This function
behaves like a normal Python function and runs one LBM time step.
......@@ -52,6 +52,8 @@ For example, to modify the AST one can run::
func = create_lb_function(ast=ast, ...)
"""
import copy
from dataclasses import dataclass, field, replace
from typing import Union, List, Tuple, Any, Type, Iterable
from warnings import warn, filterwarnings
......@@ -61,10 +63,11 @@ import pystencils.astnodes
import sympy as sp
import sympy.core.numbers
from lbmpy.enums import Stencil, Method, ForceModel, CollisionSpace
from lbmpy.enums import Stencil, Method, ForceModel, CollisionSpace, SubgridScaleModel
import lbmpy.forcemodels as forcemodels
from lbmpy.fieldaccess import CollideOnlyInplaceAccessor, PdfFieldAccessor, PeriodicTwoFieldsAccessor
from lbmpy.fluctuatinglb import add_fluctuations_to_collision_rule
from lbmpy.partially_saturated_cells import add_psm_to_collision_rule, PSMConfig
from lbmpy.non_newtonian_models import add_cassons_model, CassonsParameters
from lbmpy.methods import (create_mrt_orthogonal, create_mrt_raw, create_central_moment,
create_srt, create_trt, create_trt_kbc)
......@@ -75,11 +78,13 @@ from lbmpy.methods.momentbased.entropic import add_entropy_condition, add_iterat
from lbmpy.relaxationrates import relaxation_rate_from_magic_number
from lbmpy.simplificationfactory import create_simplification_strategy
from lbmpy.stencils import LBStencil
from lbmpy.turbulence_models import add_smagorinsky_model
from lbmpy.turbulence_models import add_sgs_model
from lbmpy.updatekernels import create_lbm_kernel, create_stream_pull_with_output_kernel
from lbmpy.advanced_streaming.utility import Timestep, get_accessor
from pystencils import CreateKernelConfig, create_kernel
from pystencils.astnodes import Conditional, Block
from pystencils.cache import disk_cache_no_fallback
from pystencils.node_collection import NodeCollection
from pystencils.typing import collate_types
from pystencils.field import Field
from pystencils.simp import sympy_cse, SimplificationStrategy
......@@ -87,7 +92,7 @@ from pystencils.simp import sympy_cse, SimplificationStrategy
from lbmpy.methods.abstractlbmethod import LbmCollisionRule, AbstractLbMethod
from lbmpy.methods.cumulantbased import CumulantBasedLbMethod
# Filter out JobLib warnings. They are not usefull for use:
# Filter out JobLib warnings. They are not useful for use:
# https://github.com/joblib/joblib/issues/683
filterwarnings("ignore", message="Persisting input arguments took")
......@@ -99,7 +104,7 @@ class LBMConfig:
"""
stencil: lbmpy.stencils.LBStencil = LBStencil(Stencil.D2Q9)
"""
All stencils are defined in :class:`lbmpy.enums.Stencil`. From that :class:`lbmpy.stencils.LBStenil`
All stencils are defined in :class:`lbmpy.enums.Stencil`. From that :class:`lbmpy.stencils.LBStencil`
class will be created
"""
method: Method = Method.SRT
......@@ -112,13 +117,19 @@ class LBMConfig:
"""
Sequence of relaxation rates, number depends on selected method. If you specify more rates than
method needs, the additional rates are ignored.
If no relaxation rates are specified, the parameter `relaxation_rate` will be consulted.
"""
relaxation_rate: Union[int, float, Type[sp.Symbol]] = None
"""
For SRT, TRT and polynomial cumulant models it is possible to define
a single ``relaxation_rate`` instead of a list (Internally this is converted to a list with a single entry).
The second rate for TRT is then determined via magic number. For the moment, central moment based and the
cumulant model, it sets only the relaxation rate corresponding to shear viscosity, setting all others to unity.
The method's primary relaxation rate. In most cases, this is the relaxation rate governing shear viscosity.
For SRT, this is the only relaxation rate.
For TRT, the second relaxation rate is then determined via magic number.
In the case of raw moment, central moment, and cumulant-based MRT methods, all other relaxation rates will be
set to unity.
If neither `relaxation_rate` nor `relaxation_rates` is specified, the behaviour is as if
`relaxation_rate=sp.Symbol('omega')` was set.
"""
compressible: bool = False
"""
......@@ -134,7 +145,7 @@ class LBMConfig:
"""
delta_equilibrium: bool = None
"""
Determines whether or not the (continuous or discrete, see `continuous_equilibrium`) maxwellian equilibrium is
Determines whether or not the (continuous or discrete, see `continuous_equilibrium`) Maxwellian equilibrium is
expressed in its absolute form, or only by its deviation from the rest state (typically given by the reference
density and zero velocity). This parameter is only effective if `zero_centered` is set to `True`. Then, if
`delta_equilibrium` is `False`, the rest state must be reintroduced to the populations during collision. Otherwise,
......@@ -164,7 +175,7 @@ class LBMConfig:
"""
A list of lists of modes, grouped by common relaxation times. This is usually used in
conjunction with `lbmpy.methods.default_moment_sets.mrt_orthogonal_modes_literature`.
If this argument is not provided, Gram-Schmidt orthogonalization of the default modes is performed.
If this argument is not provided, Gram-Schmidt orthogonalisation of the default modes is performed.
"""
force_model: Union[lbmpy.forcemodels.AbstractForceModel, ForceModel] = None
......@@ -197,6 +208,13 @@ class LBMConfig:
Special correction for D3Q27 cumulant LBMs. For Details see
:mod:`lbmpy.methods.cumulantbased.galilean_correction`
"""
fourth_order_correction: Union[float, bool] = False
"""
Special correction for rendering D3Q27 cumulant LBMs fourth-order accurate in diffusion. For Details see
:mod:`lbmpy.methods.cumulantbased.fourth_order_correction`. If set to `True`, the fourth-order correction is
employed without limiters (or more precisely with a very high limiter, practically disabling the limiters). If this
variable is set to a number, the latter is used for the limiters (uniformly for omega_3, omega_4 and omega_5).
"""
collision_space_info: CollisionSpaceInfo = None
"""
Information about the LB method's collision space (see :class:`lbmpy.methods.creationfunctions.CollisionSpaceInfo`)
......@@ -221,12 +239,17 @@ class LBMConfig:
omega_output_field: Field = None
"""
A pystencils Field can be passed here, where the calculated free relaxation rate of
an entropic or Smagorinsky method is written to
an entropic or subgrid-scale method is written to
"""
smagorinsky: Union[float, bool] = False
eddy_viscosity_field: Field = None
"""
set to Smagorinsky constant to activate turbulence model, ``omega_output_field`` can be set to
write out adapted relaxation rates. If set to `True`, 0.12 is used as default smagorinsky constant.
A pystencils Field can be passed here, where the eddy-viscosity of a subgrid-scale model is written.
"""
subgrid_scale_model: Union[SubgridScaleModel, tuple[SubgridScaleModel, float], tuple[SubgridScaleModel, int]] = None
"""
Choose a subgrid-scale model (SGS) for large-eddy simulations. ``omega_output_field`` can be set to
write out adapted relaxation rates. Either provide just the SGS and use the default model constants or provide a
tuple of the SGS and its corresponding model constant.
"""
cassons: CassonsParameters = False
"""
......@@ -244,6 +267,12 @@ class LBMConfig:
Temperature for fluctuating lattice Boltzmann methods.
"""
psm_config: PSMConfig = None
"""
If a PSM config is specified, (1 - fractionField) is added to the relaxation rates of the collision
and to the potential force term, and a solid collision is build and added to the main assignments.
"""
output: dict = field(default_factory=dict)
"""
A dictionary mapping macroscopic quantites e.g. the strings 'density' and 'velocity' to pystencils
......@@ -260,6 +289,11 @@ class LBMConfig:
Symbolic field where the density is read from. If `None` is given the density is calculated inplace from
with zeroth order moment.
"""
conserved_moments: bool = True
"""
If lower order moments are conserved or not. If velocity or density input is set the lower order moments are not
conserved anymore.
"""
kernel_type: Union[str, Type[PdfFieldAccessor]] = 'default_stream_collide'
"""
......@@ -325,9 +359,11 @@ class LBMConfig:
self.stencil = LBStencil(self.stencil)
if self.relaxation_rates is None:
self.relaxation_rates = [sp.Symbol("omega")] * self.stencil.Q
# Fall back to regularized method
if self.relaxation_rate is None:
self.relaxation_rate = sp.Symbol("omega")
# if only a single relaxation rate is defined (which makes sense for SRT or TRT methods)
# if only a single relaxation rate is defined,
# it is internally treated as a list with one element and just sets the relaxation_rates parameter
if self.relaxation_rate is not None:
if self.method in [Method.TRT, Method.TRT_KBC_N1, Method.TRT_KBC_N2, Method.TRT_KBC_N3, Method.TRT_KBC_N4]:
......@@ -340,8 +376,8 @@ class LBMConfig:
if not self.compressible and self.method in (Method.MONOMIAL_CUMULANT, Method.CUMULANT):
raise ValueError("Incompressible cumulant-based methods are not supported (yet).")
if self.zero_centered and (self.entropic or self.fluctuating):
raise ValueError("Entropic and fluctuating methods can only be created with `zero_centered=False`.")
if self.zero_centered and self.entropic:
raise ValueError("Entropic methods can only be created with `zero_centered=False`.")
# Check or infer delta-equilibrium
if self.delta_equilibrium is not None:
......@@ -412,7 +448,10 @@ class LBMConfig:
force_not_zero = True
if self.force_model is None and force_not_zero:
self.force_model = forcemodels.Guo(self.force[:self.stencil.D])
if self.method == Method.CUMULANT:
self.force_model = forcemodels.CentralMoment(self.force[:self.stencil.D])
else:
self.force_model = forcemodels.Guo(self.force[:self.stencil.D])
force_model_dict = {
'simple': forcemodels.Simple,
......@@ -424,9 +463,13 @@ class LBMConfig:
'edm': forcemodels.EDM,
'kupershtokh': forcemodels.EDM,
'he': forcemodels.He,
'shanchen': forcemodels.ShanChen
'shanchen': forcemodels.ShanChen,
'centralmoment': forcemodels.CentralMoment
}
if self.psm_config is not None and self.psm_config.fraction_field is not None:
self.force = [(1.0 - self.psm_config.fraction_field.center) * f for f in self.force]
if isinstance(self.force_model, str):
new_force_model = ForceModel[self.force_model.upper()]
warn(f'ForceModel "{self.force_model}" as str is deprecated. Use {new_force_model} instead or '
......@@ -437,6 +480,9 @@ class LBMConfig:
force_model_class = force_model_dict[self.force_model.name.lower()]
self.force_model = force_model_class(force=self.force[:self.stencil.D])
if self.density_input or self.velocity_input:
self.conserved_moments = False
@dataclass
class LBMOptimisation:
......@@ -638,13 +684,31 @@ def create_lb_collision_rule(lb_method=None, lbm_config=None, lbm_optimisation=N
else:
collision_rule = lb_method.get_collision_rule(pre_simplification=pre_simplification)
if lbm_config.psm_config is not None:
if lbm_config.psm_config.fraction_field is None or lbm_config.psm_config.object_velocity_field is None:
raise ValueError("Specify a fraction and object velocity field in the PSM Config")
collision_rule = add_psm_to_collision_rule(collision_rule, lbm_config.psm_config)
if lbm_config.galilean_correction:
from lbmpy.methods.cumulantbased import add_galilean_correction
collision_rule = add_galilean_correction(collision_rule)
if lbm_config.fourth_order_correction:
from lbmpy.methods.cumulantbased import add_fourth_order_correction
# must provide a second relaxation rate in implementation; defaults to 1
if len(lbm_config.relaxation_rates) == 1:
lbm_config.relaxation_rates.append(1)
cumulant_limiter = 1e6 if lbm_config.fourth_order_correction is True else lbm_config.fourth_order_correction
collision_rule = add_fourth_order_correction(collision_rule=collision_rule,
shear_relaxation_rate=lbm_config.relaxation_rates[0],
bulk_relaxation_rate=lbm_config.relaxation_rates[1],
limiter=cumulant_limiter)
if lbm_config.entropic:
if lbm_config.smagorinsky or lbm_config.cassons:
raise ValueError("Choose either entropic, smagorinsky or cassons")
if lbm_config.subgrid_scale_model or lbm_config.cassons:
raise ValueError("Choose either entropic, subgrid-scale or cassons")
if lbm_config.entropic_newton_iterations:
if isinstance(lbm_config.entropic_newton_iterations, bool):
iterations = 3
......@@ -654,14 +718,24 @@ def create_lb_collision_rule(lb_method=None, lbm_config=None, lbm_optimisation=N
omega_output_field=lbm_config.omega_output_field)
else:
collision_rule = add_entropy_condition(collision_rule, omega_output_field=lbm_config.omega_output_field)
elif lbm_config.smagorinsky:
elif lbm_config.subgrid_scale_model:
if lbm_config.cassons:
raise ValueError("Cassons model can not be combined with Smagorinsky model")
smagorinsky_constant = 0.12 if lbm_config.smagorinsky is True else lbm_config.smagorinsky
collision_rule = add_smagorinsky_model(collision_rule, smagorinsky_constant,
omega_output_field=lbm_config.omega_output_field)
if 'split_groups' in collision_rule.simplification_hints:
collision_rule.simplification_hints['split_groups'][0].append(sp.Symbol("smagorinsky_omega"))
raise ValueError("Cassons model can not be combined with a subgrid-scale model")
model_constant = None
sgs_model = lbm_config.subgrid_scale_model
if isinstance(lbm_config.subgrid_scale_model, tuple):
sgs_model = lbm_config.subgrid_scale_model[0]
model_constant = lbm_config.subgrid_scale_model[1]
collision_rule = add_sgs_model(collision_rule=collision_rule, subgrid_scale_model=sgs_model,
model_constant=model_constant, omega_output_field=lbm_config.omega_output_field,
eddy_viscosity_field=lbm_config.eddy_viscosity_field)
if 'split_groups' in collision_rule.simplification_hints:
collision_rule.simplification_hints['split_groups'][0].append(sp.Symbol("sgs_omega"))
elif lbm_config.cassons:
collision_rule = add_cassons_model(collision_rule, parameter=lbm_config.cassons,
......@@ -706,6 +780,11 @@ def create_lb_method(lbm_config=None, **params):
if isinstance(lbm_config.force, Field):
lbm_config.force = tuple(lbm_config.force(i) for i in range(dim))
if lbm_config.psm_config is None:
fraction_field = None
else:
fraction_field = lbm_config.psm_config.fraction_field
common_params = {
'compressible': lbm_config.compressible,
'zero_centered': lbm_config.zero_centered,
......@@ -715,6 +794,7 @@ def create_lb_method(lbm_config=None, **params):
'continuous_equilibrium': lbm_config.continuous_equilibrium,
'c_s_sq': lbm_config.c_s_sq,
'collision_space_info': lbm_config.collision_space_info,
'fraction_field': fraction_field,
}
cumulant_params = {
......@@ -722,6 +802,7 @@ def create_lb_method(lbm_config=None, **params):
'force_model': lbm_config.force_model,
'c_s_sq': lbm_config.c_s_sq,
'collision_space_info': lbm_config.collision_space_info,
'fraction_field': fraction_field,
}
if lbm_config.method == Method.SRT:
......@@ -732,12 +813,15 @@ def create_lb_method(lbm_config=None, **params):
method = create_trt(lbm_config.stencil, relaxation_rates[0], relaxation_rates[1], **common_params)
elif lbm_config.method == Method.MRT:
method = create_mrt_orthogonal(lbm_config.stencil, relaxation_rates, weighted=lbm_config.weighted,
nested_moments=lbm_config.nested_moments, **common_params)
nested_moments=lbm_config.nested_moments,
conserved_moments=lbm_config.conserved_moments, **common_params)
elif lbm_config.method == Method.CENTRAL_MOMENT:
method = create_central_moment(lbm_config.stencil, relaxation_rates,
nested_moments=lbm_config.nested_moments, **common_params)
nested_moments=lbm_config.nested_moments,
conserved_moments=lbm_config.conserved_moments, **common_params)
elif lbm_config.method == Method.MRT_RAW:
method = create_mrt_raw(lbm_config.stencil, relaxation_rates, **common_params)
method = create_mrt_raw(lbm_config.stencil, relaxation_rates,
conserved_moments=lbm_config.conserved_moments, **common_params)
elif lbm_config.method in (Method.TRT_KBC_N1, Method.TRT_KBC_N2, Method.TRT_KBC_N3, Method.TRT_KBC_N4):
if lbm_config.stencil.D == 2 and lbm_config.stencil.Q == 9:
dim = 2
......@@ -748,13 +832,28 @@ def create_lb_method(lbm_config=None, **params):
method_nr = lbm_config.method.name[-1]
method = create_trt_kbc(dim, relaxation_rates[0], relaxation_rates[1], 'KBC-N' + method_nr, **common_params)
elif lbm_config.method == Method.CUMULANT:
if lbm_config.fourth_order_correction:
if lbm_config.stencil.D != 3 and lbm_config.stencil.Q != 27:
raise ValueError("Fourth-order correction can only be applied to D3Q27 cumulant methods.")
assert len(relaxation_rates) <= 2, "Optimal parametrisation for fourth-order cumulants needs either one " \
"or two relaxation rates, associated with the shear (and bulk) " \
"viscosity. All other relaxation rates are automatically chosen " \
"optimally"
# define method in terms of symbolic relaxation rates and assign optimal values later
from lbmpy.methods.cumulantbased.fourth_order_correction import FOURTH_ORDER_RELAXATION_RATE_SYMBOLS
relaxation_rates = FOURTH_ORDER_RELAXATION_RATE_SYMBOLS
if lbm_config.nested_moments is not None:
method = create_cumulant(
lbm_config.stencil, relaxation_rates, lbm_config.nested_moments, **cumulant_params)
method = create_cumulant(lbm_config.stencil, relaxation_rates, lbm_config.nested_moments,
conserved_moments=lbm_config.conserved_moments, **cumulant_params)
else:
method = create_with_default_polynomial_cumulants(lbm_config.stencil, relaxation_rates, **cumulant_params)
elif lbm_config.method == Method.MONOMIAL_CUMULANT:
method = create_with_monomial_cumulants(lbm_config.stencil, relaxation_rates, **cumulant_params)
method = create_with_monomial_cumulants(lbm_config.stencil, relaxation_rates,
conserved_moments=lbm_config.conserved_moments, **cumulant_params)
else:
raise ValueError("Failed to create LB method. Please use lbmpy.enums.Method for the creation")
......@@ -769,6 +868,54 @@ def create_lb_method(lbm_config=None, **params):
return method
def create_psm_update_rule(lbm_config, lbm_optimisation):
node_collection = []
# Use regular lb update rule for no overlapping particles
config_without_psm = copy.deepcopy(lbm_config)
config_without_psm.psm_config = None
# TODO: the force is still multiplied by (1.0 - self.psm_config.fraction_field.center)
# (should not harm if memory bound since self.psm_config.fraction_field.center should always be 0.0)
lb_update_rule = create_lb_update_rule(
lbm_config=config_without_psm, lbm_optimisation=lbm_optimisation
)
node_collection.append(
Conditional(
lbm_config.psm_config.fraction_field.center(0) <= 0.0,
Block(lb_update_rule.all_assignments),
)
)
# Only one particle, i.e., no individual_fraction_field is provided
if lbm_config.psm_config.individual_fraction_field is None:
assert lbm_config.psm_config.MaxParticlesPerCell == 1
psm_update_rule = create_lb_update_rule(
lbm_config=lbm_config, lbm_optimisation=lbm_optimisation
)
node_collection.append(
Conditional(
lbm_config.psm_config.fraction_field.center(0) > 0.0,
Block(psm_update_rule.all_assignments),
)
)
else:
for p in range(lbm_config.psm_config.MaxParticlesPerCell):
# Add psm update rule for p overlapping particles
config_with_p_particles = copy.deepcopy(lbm_config)
config_with_p_particles.psm_config.MaxParticlesPerCell = p + 1
psm_update_rule = create_lb_update_rule(
lbm_config=config_with_p_particles, lbm_optimisation=lbm_optimisation
)
node_collection.append(
Conditional(
lbm_config.psm_config.individual_fraction_field.center(p) > 0.0,
Block(psm_update_rule.all_assignments),
)
)
return NodeCollection(node_collection)
# ----------------------------------------------------------------------------------------------------------------------
def update_with_default_parameters(params, opt_params=None, lbm_config=None, lbm_optimisation=None, config=None):
# Fix CreateKernelConfig params
......
File moved