Skip to content
Snippets Groups Projects
Commit 7a48b9d6 authored by Martin Bauer's avatar Martin Bauer
Browse files

Documentation & boundary helpers for lbmpy

- documentation for lbmpy creationfunctions
- new tutorial
- boundary handling: various helpers to setup geometry
parent 1e3db506
No related branches found
No related tags found
No related merge requests found
......@@ -5,13 +5,20 @@ from lbmpy.stencils import getStencil
from pystencils import TypedSymbol, Field
from pystencils.backends.cbackend import CustomCppCode
from lbmpy.boundaries.createindexlist import createBoundaryIndexList
from pystencils.slicing import normalizeSlice
from pystencils.slicing import normalizeSlice, makeSlice
INV_DIR_SYMBOL = TypedSymbol("invDir", "int")
WEIGHTS_SYMBOL = TypedSymbol("weights", "double")
class BoundaryHandling(object):
class BoundaryInfo(object):
def __init__(self, name, flag, function, kernel):
self.name = name
self.flag = flag
self.function = function
self.kernel = kernel
def __init__(self, pdfField, domainShape, lbMethod, ghostLayers=1, target='cpu'):
"""
Class for managing boundary kernels
......@@ -32,13 +39,16 @@ class BoundaryHandling(object):
self._symbolicPdfField = symbolicPdfField
self._shapeWithGhostLayers = [d + 2 * ghostLayers for d in domainShape]
self._fluidFlag = 2 ** 30
self._fluidFlag = 2 ** 0
self.flagField = np.full(self._shapeWithGhostLayers, self._fluidFlag, dtype=np.int32)
self._ghostLayers = ghostLayers
self._lbMethod = lbMethod
self._boundaryFunctions = []
self._nameToIndex = {}
self._boundarySweeps = []
self._boundaryInfos = []
self._nameToBoundary = {}
self._periodicityKernels = []
self._dirty = False
self._periodicity = [False, False, False]
self._target = target
if target not in ('cpu', 'gpu'):
......@@ -46,21 +56,45 @@ class BoundaryHandling(object):
@property
def periodicity(self):
"""List that indicates for x,y (z) coordinate if domain is periodic in that direction"""
return self._periodicity
@property
def fluidFlag(self):
"""Flag that is set where the lattice Boltzmann update should happen"""
return self._fluidFlag
def getFlag(self, name):
"""Flag that represents the boundary with given name. Raises KeyError if no such boundary exists."""
return self._nameToBoundary[name].flag
def getBoundaryNames(self):
"""List of names of all registered boundary conditions"""
return [b.name for b in self._boundaryInfos]
def setPeriodicity(self, x=False, y=False, z=False):
"""Enable periodic boundary conditions at the border of the domain"""
self._periodicity = [x, y, z]
self.invalidateIndexCache()
self._compilePeriodicityKernels()
def hasBoundary(self, name):
"""Returns boolean indicating if a boundary with that name exists"""
return name in self._nameToBoundary
def setBoundary(self, function, indexExpr, maskArr=None, name=None):
def setBoundary(self, function, indexExpr=None, maskArr=None, name=None):
"""
Sets boundary in a rectangular region (slice)
:param function: boundary
:param function: boundary function or the string 'fluid' to remove boundary conditions
:param indexExpr: slice expression, where boundary should be set, see :mod:`pystencils.slicing`
:param maskArr: optional boolean (masked) array specifying where the boundary should be set
:param name: name of the boundary
"""
if indexExpr is None:
indexExpr = [slice(None, None, None)] * len(self.flagField.shape)
if function == 'fluid':
flag = self._fluidFlag
else:
if name is None:
if hasattr(function, '__name__'):
name = function.__name__
......@@ -70,10 +104,11 @@ class BoundaryHandling(object):
raise ValueError("Boundary function has to have a '__name__' or 'name' attribute "
"if name is not specified")
if function not in self._boundaryFunctions:
if not self.hasBoundary(name):
self.addBoundary(function, name)
flag = self.getFlag(name)
assert flag != self._fluidFlag
indexExpr = normalizeSlice(indexExpr, self._shapeWithGhostLayers)
if maskArr is None:
......@@ -81,8 +116,7 @@ class BoundaryHandling(object):
else:
flagFieldView = self.flagField[indexExpr]
flagFieldView[maskArr] = flag
self.invalidateIndexCache()
self._dirty = True
def addBoundary(self, boundaryFunction, name=None):
"""
......@@ -97,45 +131,69 @@ class BoundaryHandling(object):
if name is None:
name = boundaryFunction.__name__
if name in self._nameToIndex:
return 2 ** self._nameToIndex[name]
if self.hasBoundary(name):
return self._boundaryInfos[name].flag
newIdx = len(self._boundaryFunctions)
self._nameToIndex[name] = newIdx
self._boundaryFunctions.append(boundaryFunction)
return 2 ** newIdx
newIdx = len(self._boundaryInfos) + 1 # +1 because 2**0 is reserved for fluid flag
boundaryInfo = self.BoundaryInfo(name, 2 ** newIdx, boundaryFunction, None)
self._boundaryInfos.append(boundaryInfo)
self._nameToBoundary[name] = boundaryInfo
self._dirty = True
return boundaryInfo.flag
def invalidateIndexCache(self):
self._boundarySweeps = []
def indices(self, dx=1.0, includeGhostLayers=False):
if not includeGhostLayers:
params = [np.arange(start=-1, stop=s-1) * dx for s in self.flagField.shape]
else:
params = [np.arange(s) * dx for s in self.flagField.shape]
return np.meshgrid(*params, indexing='ij')
def __call__(self, **kwargs):
"""Run the boundary handling, all keyword args are passed through to the boundary sweeps"""
if self._dirty:
self.prepare()
for boundary in self._boundaryInfos:
boundary.kernel(**kwargs)
for k in self._periodicityKernels:
k(**kwargs)
def clear(self):
"""Removes all boundaries and fills the domain with fluid"""
np.fill(self._fluidFlag)
self.invalidateIndexCache()
def getFlag(self, name):
return 2 ** self._nameToIndex[name]
self._dirty = False
self._boundaryInfos = []
self._nameToBoundary = {}
def prepare(self):
self.invalidateIndexCache()
for boundaryIdx, boundaryFunc in enumerate(self._boundaryFunctions):
"""Fills data structures to speed up the boundary handling and compiles all boundary kernels.
This is automatically done when first called. With this function this can be triggered before running."""
for boundary in self._boundaryInfos:
assert boundary.flag != self._fluidFlag
idxField = createBoundaryIndexList(self.flagField, self._lbMethod.stencil,
2 ** boundaryIdx, self._fluidFlag, self._ghostLayers)
ast = generateBoundaryHandling(self._symbolicPdfField, idxField, self._lbMethod, boundaryFunc,
boundary.flag, self._fluidFlag, self._ghostLayers)
ast = generateBoundaryHandling(self._symbolicPdfField, idxField, self._lbMethod, boundary.function,
target=self._target)
if self._target == 'cpu':
from pystencils.cpu import makePythonFunction as makePythonCpuFunction
self._boundarySweeps.append(makePythonCpuFunction(ast, {'indexField': idxField}))
boundary.kernel = makePythonCpuFunction(ast, {'indexField': idxField})
elif self._target == 'gpu':
from pystencils.gpucuda import makePythonFunction as makePythonGpuFunction
import pycuda.gpuarray as gpuarray
idxGpuField = gpuarray.to_gpu(idxField)
self._boundarySweeps.append(makePythonGpuFunction(ast, {'indexField': idxGpuField}))
boundary.kernel = makePythonGpuFunction(ast, {'indexField': idxGpuField})
else:
assert False
self._addPeriodicityHandlers()
self._dirty = False
def _addPeriodicityHandlers(self):
def invalidateIndexCache(self):
"""Invalidates the cache for optimization data structures. When setting boundaries the cache is automatically
invalidated, so there is no need to call this function manually, as long as the flag field is not manually
modified."""
self._dirty = True
def _compilePeriodicityKernels(self):
self._periodicityKernels = []
dim = len(self.flagField.shape)
if dim == 2:
stencil = getStencil("D2Q9")
......@@ -158,22 +216,16 @@ class BoundaryHandling(object):
if len(filteredStencil) > 0:
if self._target == 'cpu':
from pystencils.slicing import getPeriodicBoundaryFunctor
self._boundarySweeps.append(getPeriodicBoundaryFunctor(filteredStencil, ghostLayers=1))
self._periodicityKernels.append(getPeriodicBoundaryFunctor(filteredStencil, ghostLayers=1))
elif self._target == 'gpu':
from pystencils.gpucuda.periodicity import getPeriodicBoundaryFunctor
self._boundarySweeps.append(getPeriodicBoundaryFunctor(filteredStencil, self.flagField.shape,
self._periodicityKernels.append(getPeriodicBoundaryFunctor(filteredStencil, self.flagField.shape,
indexDimensions=1,
indexDimShape=len(self._lbMethod.stencil),
ghostLayers=1))
else:
assert False
def __call__(self, **kwargs):
if len(self._boundarySweeps) == 0:
self.prepare()
for boundarySweep in self._boundarySweeps:
boundarySweep(**kwargs)
# -------------------------------------- Helper Functions --------------------------------------------------------------
......
......@@ -2,42 +2,6 @@ r"""
Creating LBM kernels
====================
Terminology and creation pipeline
---------------------------------
Kernel functions are created in three steps:
1. *Method*:
the method defines the collision process. Currently there are two big categories:
moment and cumulant based methods. A method defines how each moment or cumulant is relaxed by
storing the equilibrium value and the relaxation rate for each moment/cumulant.
2. *Collision/Update Rule*:
Methods can generate a "collision rule" which is an equation collection that define the
post collision values as a function of the pre-collision values. On these equation collection
simplifications are applied to reduce the number of floating point operations.
At this stage an entropic optimization step can also be added to determine one relaxation rate by an
entropy condition.
Then a streaming rule is added which transforms the collision rule into an update rule.
The streaming step depends on the pdf storage (source/destination, AABB pattern, EsoTwist).
Currently only the simple source/destination pattern is supported.
3. *AST*:
The abstract syntax tree describes the structure of the kernel, including loops and conditionals.
The ast can be modified e.g. to add OpenMP pragmas, reorder loops or apply other optimizations.
4. *Function*:
This step compiles the AST into an executable function, either for CPU or GPUs. This function
behaves like a normal Python function and runs one LBM time step.
The function :func:`createLatticeBoltzmannFunction` runs the whole pipeline, the other functions in this module
execute this pipeline only up to a certain step. Each function optionally also takes the result of the previous step.
For example, to modify the AST one can run::
ast = createLatticeBoltzmannAst(...)
# modify ast here
func = createLatticeBoltzmannFunction(ast=ast, ...)
Parameters
----------
......@@ -69,7 +33,8 @@ General:
yet, only the relaxation pattern. To get the entropic method, see parameters below!
(:func:`lbmpy.methods.createKBCTypeTRT`)
- ``relaxationRates``: sequence of relaxation rates, number depends on selected method. If you specify more rates than
method needs, the additional rates are ignored.
method needs, the additional rates are ignored. For SRT and TRT models it is possible ot define a single
``relaxationRate`` instead of a list, the second rate for TRT is then determined via magic number.
- ``compressible=False``: affects the selection of equilibrium moments. Both options approximate the *incompressible*
Navier Stokes Equations. However when chosen as False, the approximation is better, the standard LBM derivation is
compressible.
......@@ -130,6 +95,44 @@ Other:
specifying the number of threads. If True is specified OpenMP chooses the number of threads
- ``doublePrecision=True``: by default simulations run with double precision floating point numbers, by setting this
parameter to False, single precision is used, which is much faster, especially on GPUs
Terminology and creation pipeline
---------------------------------
Kernel functions are created in three steps:
1. *Method*:
the method defines the collision process. Currently there are two big categories:
moment and cumulant based methods. A method defines how each moment or cumulant is relaxed by
storing the equilibrium value and the relaxation rate for each moment/cumulant.
2. *Collision/Update Rule*:
Methods can generate a "collision rule" which is an equation collection that define the
post collision values as a function of the pre-collision values. On these equation collection
simplifications are applied to reduce the number of floating point operations.
At this stage an entropic optimization step can also be added to determine one relaxation rate by an
entropy condition.
Then a streaming rule is added which transforms the collision rule into an update rule.
The streaming step depends on the pdf storage (source/destination, AABB pattern, EsoTwist).
Currently only the simple source/destination pattern is supported.
3. *AST*:
The abstract syntax tree describes the structure of the kernel, including loops and conditionals.
The ast can be modified e.g. to add OpenMP pragmas, reorder loops or apply other optimizations.
4. *Function*:
This step compiles the AST into an executable function, either for CPU or GPUs. This function
behaves like a normal Python function and runs one LBM time step.
The function :func:`createLatticeBoltzmannFunction` runs the whole pipeline, the other functions in this module
execute this pipeline only up to a certain step. Each function optionally also takes the result of the previous step.
For example, to modify the AST one can run::
ast = createLatticeBoltzmannAst(...)
# modify ast here
func = createLatticeBoltzmannFunction(ast=ast, ...)
"""
import sympy as sp
from copy import copy
......@@ -138,6 +141,7 @@ from functools import partial
from lbmpy.methods import createSRT, createTRT, createOrthogonalMRT, createKBCTypeTRT, \
createRawMRT, createThreeRelaxationRateMRT
from lbmpy.methods.entropic import addIterativeEntropyCondition, addEntropyCondition
from lbmpy.methods.relaxationrates import relaxationRateFromMagicNumber
from lbmpy.stencils import getStencil
import lbmpy.forcemodels as forceModels
from lbmpy.simplificationfactory import createSimplificationStrategy
......@@ -179,6 +183,12 @@ def updateWithDefaultParameters(params, optParams):
'gpuIndexing': 'block',
'gpuIndexingParams': {},
}
if 'relaxationRate' in params:
if 'relaxationRates' not in params:
params['relaxationRates'] = [params['relaxationRate'],
relaxationRateFromMagicNumber(params['relaxationRate'])]
del params['relaxationRate']
unknownParams = [k for k in params.keys() if k not in defaultMethodDescription]
unknownOptParams = [k for k in optParams.keys() if k not in defaultOptimizationDescription]
if unknownParams:
......
......@@ -2,11 +2,27 @@ from matplotlib.pyplot import *
def vectorField(field, step=2, **kwargs):
"""
Plot given vector field as quiver (arrow) plot.
:param field: numpy array with 3 dimensions, first two are spatial x,y coordinate, the last
coordinate should have shape 2 and stores the 2 velocity components
:param step: plots only every steps's cell
:param kwargs: keyword arguments passed to :func:`matplotlib.pyplot.quiver`
"""
veln = field.swapaxes(0, 1)
quiver(veln[::step, ::step, 0], veln[::step, ::step, 1], **kwargs)
res = quiver(veln[::step, ::step, 0], veln[::step, ::step, 1], **kwargs)
axis('equal')
return res
def vectorFieldMagnitude(field, **kwargs):
"""
Plots the magnitude of a vector field as colormap
:param field: numpy array with 3 dimensions, first two are spatial x,y coordinate, the last
coordinate should have shape 2 and stores the 2 velocity components
:param kwargs: keyword arguments passed to :func:`matplotlib.pyplot.imshow`
"""
from numpy.linalg import norm
norm = norm(field, axis=2, ord=2)
if hasattr(field, 'mask'):
......@@ -15,15 +31,98 @@ def vectorFieldMagnitude(field, **kwargs):
def scalarField(field, **kwargs):
"""
Plots field values as colormap
:param field: two dimensional numpy array
:param kwargs: keyword arguments passed to :func:`matplotlib.pyplot.imshow`
"""
import numpy as np
field = np.swapaxes(field, 0, 1)
return imshow(field, origin='lower', **kwargs)
res = imshow(field, origin='lower', **kwargs)
axis('equal')
return res
def plotBoundaryHandling(boundaryHandling, boundaryNameToColor=None):
"""
Shows boundary cells
:param boundaryHandling: instance of :class:`lbmpy.boundaries.BoundaryHandling`
:param boundaryNameToColor: optional dictionary mapping boundary names to colors
"""
import matplotlib
import matplotlib.pyplot as plt
if len(boundaryHandling.flagField.shape) != 2:
raise NotImplementedError("Only implemented for 2D boundary handlings")
if boundaryNameToColor:
fixedColors = boundaryNameToColor
else:
fixedColors = {
'fluid': '#1f77ff11',
'noSlip': '#000000'
}
boundaryNames = ['fluid'] + boundaryHandling.getBoundaryNames()
flagValues = [boundaryHandling.fluidFlag] + [boundaryHandling.getFlag(n)
for n in boundaryHandling.getBoundaryNames()]
defaultCycler = matplotlib.rcParams['axes.prop_cycle']
colorValues = [fixedColors[name] if name in fixedColors else cycle['color']
for cycle, name in zip(defaultCycler, boundaryNames)]
cmap = matplotlib.colors.ListedColormap(colorValues)
bounds = np.array(flagValues, dtype=float) - 0.5
bounds = list(bounds) + [bounds[-1] + 1]
norm = matplotlib.colors.BoundaryNorm(bounds, cmap.N)
flagField = boundaryHandling.flagField.swapaxes(0, 1)
plt.imshow(flagField, interpolation='none', origin='lower',
cmap=cmap, norm=norm)
patches = [matplotlib.patches.Patch(color=color, label=name) for color, name in zip(colorValues, boundaryNames)]
plt.axis('equal')
plt.legend(handles=patches, bbox_to_anchor=(1.02, 0.5), loc=2, borderaxespad=0.)
# ------------------------------------------- Animations ---------------------------------------------------------------
def vectorFieldAnimation(runFunction, step=2, rescale=True, plotSetupFunction=lambda: None,
plotUpdateFunction=lambda: None, interval=30, frames=180, **kwargs):
import matplotlib.animation as animation
from numpy.linalg import norm
fig = gcf()
im = None
field = runFunction()
if rescale:
maxNorm = np.max(norm(field, axis=2, ord=2))
field /= maxNorm
if 'scale' not in kwargs:
kwargs['scale'] = 1.0
quiverPlot = vectorField(field, step=step, **kwargs)
plotSetupFunction()
def updatefig(*args):
f = runFunction()
f = np.swapaxes(f, 0, 1)
if rescale:
maxNorm = np.max(norm(f, axis=2, ord=2))
f /= maxNorm
u, v = f[::step, ::step, 0], f[::step, ::step, 1]
quiverPlot.set_UVC(u, v)
plotUpdateFunction()
return im,
return animation.FuncAnimation(fig, updatefig, interval=interval, frames=frames)
def vectorFieldMagnitudeAnimation(runFunction, plotSetupFunction=lambda: None,
plotUpdateFunction=lambda: None, interval=30, frames=180, **kwargs):
import matplotlib.animation as animation
import numpy as np
from numpy.linalg import norm
fig = gcf()
......
"""
Scenario setup
==============
This module contains functions to set up pre-defined scenarios like a lid-driven cavity or channel flows.
It is a good starting point if you are new to lbmpy.
>>> scenario = createForceDrivenChannel(dim=2, radius=10, length=20, force=1e-5,
... method='srt', relaxationRate=1.9)
>>> scenario.run(100)
>>> scenario.plotVelocity()
All scenarios can be modified, for example you can create a simple channel first, then place an object in it:
>>> from lbmpy.boundaries import noSlip
>>> from pystencils.slicing import makeSlice
>>> scenario.boundaryHandling.setBoundary(noSlip, makeSlice[0.3:0.4, 0.0:0.3])
Functions for scenario setup:
-----------------------------
All of the following scenario creation functions take keyword arguments specifying which LBM method should be used
and a ``optimizationParams`` dictionary, defining performance related options. These parameters are documented
at :mod:`lbmpy.creationfunctions`. The only mandatory keyword parameter is ``relaxationRate``,
that defines the viscosity of the fluid (valid values being between 0 and 2).
"""
import numpy as np
from functools import partial
from pystencils.field import getLayoutOfArray, createNumpyArrayWithLayout
from pystencils.slicing import sliceFromDirection, addGhostLayers, getPeriodicBoundaryFunctor, removeGhostLayers, \
normalizeSlice, makeSlice
from pystencils.slicing import sliceFromDirection, addGhostLayers, removeGhostLayers, normalizeSlice, makeSlice
from lbmpy.creationfunctions import createLatticeBoltzmannFunction, updateWithDefaultParameters
from lbmpy.macroscopic_value_kernels import compileMacroscopicValuesGetter, compileMacroscopicValuesSetter
from lbmpy.boundaries import BoundaryHandling, noSlip, ubb, fixedDensity
......@@ -10,27 +35,210 @@ from lbmpy.stencils import getStencil
from lbmpy.updatekernels import createPdfArray
class Scenario(object):
def __init__(self, domainSize, methodParameters, optimizationParams, lbmKernel=None,
initialVelocity=None, preUpdateFunctions=[], kernelParams={}):
# ---------------------------------------- Example Scenarios -----------------------------------------------------------
def createFullyPeriodicFlow(initialVelocity, optimizationParams={}, lbmKernel=None, **kwargs):
"""
Constructor for generic scenarios. You probably want to use one of the simpler scenario factory functions
of this file.
Creates a fully periodic setup with prescribed velocity field
:param initialVelocity: numpy array that defines an initial velocity for each cell. The shape of this
array determines the domain size.
:param optimizationParams: see :mod:`lbmpy.creationfunctions`
:param lbmKernel: a LBM function, which would otherwise automatically created
:param kwargs: other parameters are passed on to the method, see :mod:`lbmpy.creationfunctions`
:return: instance of :class:`Scenario`
"""
domainSize = initialVelocity.shape[:-1]
scenario = Scenario(domainSize, kwargs, optimizationParams, lbmKernel, initialVelocity)
scenario.boundaryHandling.setPeriodicity(True, True, True)
return scenario
def createLidDrivenCavity(domainSize, lidVelocity=0.005, optimizationParams={}, lbmKernel=None, **kwargs):
"""
Creates a lid driven cavity scenario
:param domainSize: tuple specifying the number of cells in each dimension
:param lidVelocity: x velocity of lid in lattice coordinates.
:param optimizationParams: see :mod:`lbmpy.creationfunctions`
:param lbmKernel: a LBM function, which would otherwise automatically created
:param kwargs: other parameters are passed on to the method, see :mod:`lbmpy.creationfunctions`
:return: instance of :class:`Scenario`
"""
scenario = Scenario(domainSize, kwargs, optimizationParams, lbmKernel=lbmKernel)
myUbb = partial(ubb, velocity=[lidVelocity, 0, 0][:scenario.method.dim])
myUbb.name = 'ubb'
dim = scenario.method.dim
scenario.boundaryHandling.setBoundary(myUbb, sliceFromDirection('N', dim))
for direction in ('W', 'E', 'S') if scenario.method.dim == 2 else ('W', 'E', 'S', 'T', 'B'):
scenario.boundaryHandling.setBoundary(noSlip, sliceFromDirection(direction, dim))
return scenario
def createForceDrivenChannel(force=1e-6, domainSize=None, dim=2, radius=None, length=None, initialVelocity=None,
optimizationParams={}, lbmKernel=None, **kwargs):
"""
Creates a channel flow in x direction, which is driven by a constant force along the x axis
:param force: force in x direction (lattice units) that drives the channel
:param domainSize: tuple with size of channel in x, y, (z) direction. If not specified, pass dim, radius and length.
In 3D, this creates a channel with rectangular cross section
:param dim: dimension of the channel (only required if domainSize is not passed)
:param radius: radius in 3D, or half of the height in 2D (only required if domainSize is not passed).
In 3D, this creates a channel with circular cross section
:param length: extend in x direction (only required if domainSize is not passed)
:param initialVelocity: initial velocity, either array to specify velocity for each cell or tuple for constant
:param optimizationParams: see :mod:`lbmpy.creationfunctions`
:param lbmKernel: a LBM function, which would otherwise automatically created
:param kwargs: other parameters are passed on to the method, see :mod:`lbmpy.creationfunctions`
:return: instance of :class:`Scenario`
"""
if domainSize is not None:
dim = len(domainSize)
else:
if dim is None or radius is None or length is None:
raise ValueError("Pass either 'domainSize' or 'dim', 'radius' and 'length'")
assert dim in (2, 3)
kwargs['force'] = tuple([force, 0, 0][:dim])
if radius is not None:
assert length is not None
if dim == 3:
domainSize = (length, 2 * radius + 1, 2 * radius + 1)
roundChannel = True
else:
if domainSize is None:
domainSize = (length, 2 * radius)
else:
roundChannel = False
if 'forceModel' not in kwargs:
kwargs['forceModel'] = 'guo'
scenario = Scenario(domainSize, kwargs, optimizationParams, lbmKernel=lbmKernel,
initialVelocity=initialVelocity)
boundaryHandling = scenario.boundaryHandling
boundaryHandling.setPeriodicity(True, False, False)
if dim == 2:
for direction in ('N', 'S'):
boundaryHandling.setBoundary(noSlip, sliceFromDirection(direction, dim))
elif dim == 3:
if roundChannel:
noSlipIdx = boundaryHandling.addBoundary(noSlip)
ff = boundaryHandling.flagField
yMid = ff.shape[1] // 2
zMid = ff.shape[2] // 2
y, z = np.meshgrid(range(ff.shape[1]), range(ff.shape[2]))
ff[(y - yMid) ** 2 + (z - zMid) ** 2 > radius ** 2] = noSlipIdx
else:
for direction in ('N', 'S', 'T', 'B'):
boundaryHandling.setBoundary(noSlip, sliceFromDirection(direction, dim))
assert domainSize is not None
if 'forceModel' not in kwargs:
kwargs['forceModel'] = 'guo'
return scenario
def createPressureGradientDrivenChannel(pressureDifference, domainSize=None, dim=2, radius=None, length=None,
initialVelocity=None, optimizationParams={}, lbmKernel=None, **kwargs):
"""
Creates a channel flow in x direction, which is driven by two pressure boundaries.
Consider using :func:`createForceDrivenChannel` which does not have artifacts an inflow and outflow.
:param pressureDifference: pressure drop in channel in lattice units
:param domainSize: tuple with size of channel in x, y, (z) direction. If not specified, pass dim, radius and length.
In 3D, this creates a channel with rectangular cross section
:param dim: dimension of the channel (only required if domainSize is not passed)
:param radius: radius in 3D, or half of the height in 2D (only required if domainSize is not passed).
In 3D, this creates a channel with circular cross section
:param length: extend in x direction (only required if domainSize is not passed)
:param initialVelocity: initial velocity, either array to specify velocity for each cell or tuple for constant
:param optimizationParams: see :mod:`lbmpy.creationfunctions`
:param lbmKernel: a LBM function, which would otherwise automatically created
:param kwargs: other parameters are passed on to the method, see :mod:`lbmpy.creationfunctions`
:return: instance of :class:`Scenario`
"""
if domainSize is not None:
dim = len(domainSize)
else:
if dim is None or radius is None or length is None:
raise ValueError("Pass either 'domainSize' or 'dim', 'radius' and 'length'")
assert dim in (2, 3)
if radius is not None:
assert length is not None
if dim == 3:
domainSize = (length, 2 * radius + 1, 2 * radius + 1)
roundChannel = True
else:
if domainSize is None:
domainSize = (length, 2 * radius)
else:
roundChannel = False
assert dim in (2, 3)
scenario = Scenario(domainSize, kwargs, optimizationParams, lbmKernel=lbmKernel, initialVelocity=initialVelocity)
boundaryHandling = scenario.boundaryHandling
pressureBoundaryInflow = partial(fixedDensity, density=1.0 + pressureDifference)
pressureBoundaryInflow.__name__ = "Inflow"
pressureBoundaryOutflow = partial(fixedDensity, density=1.0)
pressureBoundaryOutflow.__name__ = "Outflow"
boundaryHandling.setBoundary(pressureBoundaryInflow, sliceFromDirection('W', dim))
boundaryHandling.setBoundary(pressureBoundaryOutflow, sliceFromDirection('E', dim))
if dim == 2:
for direction in ('N', 'S'):
boundaryHandling.setBoundary(noSlip, sliceFromDirection(direction, dim))
elif dim == 3:
if roundChannel:
noSlipIdx = boundaryHandling.addBoundary(noSlip)
ff = boundaryHandling.flagField
yMid = ff.shape[1] // 2
zMid = ff.shape[2] // 2
y, z = np.meshgrid(range(ff.shape[1]), range(ff.shape[2]))
ff[(y - yMid) ** 2 + (z - zMid) ** 2 > radius ** 2] = noSlipIdx
else:
for direction in ('N', 'S', 'T', 'B'):
boundaryHandling.setBoundary(noSlip, sliceFromDirection(direction, dim))
return scenario
# ------------------------------------------ Scenario Class ------------------------------------------------------------
class Scenario(object):
"""Scenario containing boundary handling and LBM update function
You probably want to use one of the simpler scenario factory functions of this module instead of using
this constructor.
:param domainSize: tuple, defining the domain size without ghost layers
:param methodParameters: dict with method parameters, as documented in :mod:`lbmpy.creationfunctions`,
passed to :func:`lbmpy.creationfunction.createLatticeBoltzmannFunction`
passed to :func:`lbmpy.creationfunctions.createLatticeBoltzmannFunction`
:param optimizationParams: dict with optimization parameters, as documented in :mod:`lbmpy.creationfunctions`,
passed to :func:`lbmpy.creationfunction.createLatticeBoltzmannFunction`
passed to :func:`lbmpy.creationfunctions.createLatticeBoltzmannFunction`
:param lbmKernel: a lattice boltzmann function can be passed here, if None it is created with the parameters
specified above
:param initialVelocity: tuple with initial velocity of the domain, can either be a constant or a numpy array
with first axes shaped like the domain, and the last dimension of size #dimensions
:param preUpdateFunctions: list of functions that are called before the LBM kernel. They get the pdf array as
only argument. Can be used for custom boundary conditions, periodicity, etc.
:param kernelParams: dict which is passed to the kernel as additional parameters
"""
def __init__(self, domainSize, methodParameters, optimizationParams, lbmKernel=None,
initialVelocity=None, preUpdateFunctions=[]):
ghostLayers = 1
domainSizeWithGhostLayer = tuple([s + 2 * ghostLayers for s in domainSize])
D = len(domainSize)
......@@ -67,8 +275,10 @@ class Scenario(object):
target=optimizationParams['target'])
self._preUpdateFunctions = preUpdateFunctions
self.kernelParams = kernelParams
self.kernelParams = {}
self._pdfGpuArrays = []
self.timeStepsRun = 0
self.domainSize = domainSize
if initialVelocity is None:
initialVelocity = [0] * D
......@@ -89,6 +299,7 @@ class Scenario(object):
self._gpuTimeloop(timeSteps)
else:
self._cpuTimeloop(timeSteps)
self.timeStepsRun += timeSteps
@property
def velocity(self):
......@@ -97,6 +308,15 @@ class Scenario(object):
mask = np.repeat(mask[..., np.newaxis], self.dim, axis=2)
return removeGhostLayers(np.ma.masked_array(self._velocity, mask), indexDimensions=1)
@property
def vorticity(self):
if self.dim != 2:
raise NotImplementedError("Vorticity only implemented for 2D scenarios")
vel = self.velocity
grad_y_of_x = np.gradient(vel[:, :, 0], axis=1)
grad_x_of_y = np.gradient(vel[:, :, 1], axis=0)
return grad_x_of_y - grad_y_of_x
@property
def density(self):
"""Density as numpy array"""
......@@ -199,157 +419,3 @@ class Scenario(object):
gpuArr.get(cpuArr)
self._getMacroscopic(pdfs=self._pdfArrays[0], density=self._density, velocity=self._velocity)
# ---------------------------------------- Example Scenarios -----------------------------------------------------------
def createFullyPeriodicFlow(initialVelocity, optimizationParams={}, lbmKernel=None, kernelParams={}, **kwargs):
"""
Creates a fully periodic setup with prescribed velocity field
"""
domainSize = initialVelocity.shape[:-1]
scenario = Scenario(domainSize, kwargs, optimizationParams, lbmKernel, initialVelocity, kernelParams)
scenario.boundaryHandling.setPeriodicity(True, True, True)
return scenario
def createLidDrivenCavity(domainSize, lidVelocity=0.005, optimizationParams={}, lbmKernel=None,
kernelParams={}, **kwargs):
"""
Creates a lid driven cavity scenario with prescribed lid velocity in lattice coordinates
"""
scenario = Scenario(domainSize, kwargs, optimizationParams, lbmKernel=lbmKernel, kernelParams=kernelParams)
myUbb = partial(ubb, velocity=[lidVelocity, 0, 0][:scenario.method.dim])
myUbb.name = 'ubb'
dim = scenario.method.dim
scenario.boundaryHandling.setBoundary(myUbb, sliceFromDirection('N', dim))
for direction in ('W', 'E', 'S') if scenario.method.dim == 2 else ('W', 'E', 'S', 'T', 'B'):
scenario.boundaryHandling.setBoundary(noSlip, sliceFromDirection(direction, dim))
return scenario
def createPressureGradientDrivenChannel(dim, pressureDifference, domainSize=None, radius=None, length=None,
lbmKernel=None, optimizationParams={}, initialVelocity=None,
boundarySetupFunctions=[], kernelParams={}, **kwargs):
"""
Creates a channel flow in x direction, which is driven by two pressure boundaries.
"""
assert dim in (2, 3)
if radius is not None:
assert length is not None
if dim == 3:
domainSize = (length, 2 * radius + 1, 2 * radius + 1)
roundChannel = True
else:
if domainSize is None:
domainSize = (length, 2 * radius)
else:
roundChannel = False
if domainSize is None:
raise ValueError("Missing domainSize or radius and length parameters!")
scenario = Scenario(domainSize, kwargs, optimizationParams, lbmKernel=lbmKernel, kernelParams=kernelParams,
initialVelocity=initialVelocity)
boundaryHandling = scenario.boundaryHandling
pressureBoundaryInflow = partial(fixedDensity, density=1.0 + pressureDifference)
pressureBoundaryInflow.__name__ = "Inflow"
pressureBoundaryOutflow = partial(fixedDensity, density=1.0)
pressureBoundaryOutflow.__name__ = "Outflow"
boundaryHandling.setBoundary(pressureBoundaryInflow, sliceFromDirection('W', dim))
boundaryHandling.setBoundary(pressureBoundaryOutflow, sliceFromDirection('E', dim))
if dim == 2:
for direction in ('N', 'S'):
boundaryHandling.setBoundary(noSlip, sliceFromDirection(direction, dim))
elif dim == 3:
if roundChannel:
noSlipIdx = boundaryHandling.addBoundary(noSlip)
ff = boundaryHandling.flagField
yMid = ff.shape[1] // 2
zMid = ff.shape[2] // 2
y, z = np.meshgrid(range(ff.shape[1]), range(ff.shape[2]))
ff[(y - yMid) ** 2 + (z - zMid) ** 2 > radius ** 2] = noSlipIdx
else:
for direction in ('N', 'S', 'T', 'B'):
boundaryHandling.setBoundary(noSlip, sliceFromDirection(direction, dim))
for userFunction in boundarySetupFunctions:
userFunction(boundaryHandling, scenario.method, domainSize)
return scenario
def createForceDrivenChannel(dim, force, domainSize=None, radius=None, length=None, lbmKernel=None,
optimizationParams={}, initialVelocity=None, boundarySetupFunctions=[],
kernelParams={}, **kwargs):
"""
Creates a channel flow in x direction, which is driven by a constant force along the x axis
"""
assert dim in (2, 3)
kwargs['force'] = tuple([force, 0, 0][:dim])
if radius is not None:
assert length is not None
if dim == 3:
domainSize = (length, 2 * radius + 1, 2 * radius + 1)
roundChannel = True
else:
if domainSize is None:
domainSize = (length, 2 * radius)
else:
roundChannel = False
if 'forceModel' not in kwargs:
kwargs['forceModel'] = 'guo'
scenario = Scenario(domainSize, kwargs, optimizationParams, lbmKernel=lbmKernel,
initialVelocity=initialVelocity, kernelParams=kernelParams)
boundaryHandling = scenario.boundaryHandling
boundaryHandling.setPeriodicity(True, False, False)
if dim == 2:
for direction in ('N', 'S'):
boundaryHandling.setBoundary(noSlip, sliceFromDirection(direction, dim))
elif dim == 3:
if roundChannel:
noSlipIdx = boundaryHandling.addBoundary(noSlip)
ff = boundaryHandling.flagField
yMid = ff.shape[1] // 2
zMid = ff.shape[2] // 2
y, z = np.meshgrid(range(ff.shape[1]), range(ff.shape[2]))
ff[(y - yMid) ** 2 + (z - zMid) ** 2 > radius ** 2] = noSlipIdx
else:
for direction in ('N', 'S', 'T', 'B'):
boundaryHandling.setBoundary(noSlip, sliceFromDirection(direction, dim))
for userFunction in boundarySetupFunctions:
userFunction(boundaryHandling, scenario.method, domainSize)
assert domainSize is not None
if 'forceModel' not in kwargs:
kwargs['forceModel'] = 'guo'
return scenario
if __name__ == '__main__':
from lbmpy.scenarios import createForceDrivenChannel
from lbmpy.boundaries.geometry import BlackAndWhiteImageBoundary
from pystencils.slicing import makeSlice
from lbmpy.boundaries import noSlip
import numpy as np
domainSize = (10, 10)
scenario = createForceDrivenChannel(dim=2, method='srt', force=0.000002,
domainSize=domainSize,
relaxationRates=[1.92], forceModel='guo',
compressible=True,
optimizationParams={'target': 'gpu'})
imageSetup = BlackAndWhiteImageBoundary("/home/staff/bauer/opensource.png",
noSlip, targetSlice=makeSlice[2:-2, 1:-2])
imageSetup(scenario.boundaryHandling, scenario.method, domainSize)
scenario.boundaryHandling.prepare()
scenario.run(1)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment