Skip to content
Snippets Groups Projects
Commit cc06c99b authored by Martin Bauer's avatar Martin Bauer
Browse files

Moved moment stuff to new lbm module

parent e570688c
Branches
Tags
No related merge requests found
import itertools
import numpy as np
import sympy as sp
from lbmpy.densityVelocityExpressions import getDensityVelocityExpressions
from lbmpy.stencils import getStencil
from pystencils.backends.cbackend import CustomCppCode
from pystencils.types import TypedSymbol
from pystencils.field import Field
from pystencils.ast import Block, SympyAssignment, LoopOverCoordinate, KernelFunction
from pystencils.transformations import moveConstantsBeforeLoop, resolveFieldAccesses, typingFromSympyInspection, \
typeAllEquations
INV_DIR_SYMBOL = TypedSymbol("invDir", "int")
WEIGHTS_SYMBOL = TypedSymbol("weights", "double")
def offsetSymbols(dim):
return [TypedSymbol("c_%d" % (d,), "int") for d in range(dim)]
def offsetFromDir(dirIdx, dim):
return tuple([sp.IndexedBase(symbol, shape=(1,))[dirIdx] for symbol in offsetSymbols(dim)])
def invDir(dirIdx):
return sp.IndexedBase(INV_DIR_SYMBOL, shape=(1,))[dirIdx]
def weightOfDirection(dirIdx):
return sp.IndexedBase(WEIGHTS_SYMBOL, shape=(1,))[dirIdx]
class LatticeModelInfo(CustomCppCode):
def __init__(self, latticeModel):
stencil = latticeModel.stencil
symbolsDefined = set(offsetSymbols(latticeModel.dim) + [INV_DIR_SYMBOL, WEIGHTS_SYMBOL])
offsetSym = offsetSymbols(latticeModel.dim)
code = "\n"
for i in range(latticeModel.dim):
offsetStr = ", ".join([str(d[i]) for d in stencil])
code += "const int %s [] = { %s };\n" % (offsetSym[i].name, offsetStr)
invDirs = []
for direction in stencil:
inverseDir = tuple([-i for i in direction])
invDirs.append(str(stencil.index(inverseDir)))
code += "static const int %s [] = { %s };\n" % (INV_DIR_SYMBOL.name, ", ".join(invDirs))
weights = [str(w.evalf()) for w in latticeModel.weights]
code += "static const double %s [] = { %s };\n" % (WEIGHTS_SYMBOL.name, ",".join(weights))
super(LatticeModelInfo, self).__init__(code, symbolsRead=set(), symbolsDefined=symbolsDefined)
def generateBoundaryHandling(pdfField, indexArr, latticeModel, boundaryFunctor):
dim = latticeModel.dim
cellLoopBody = Block([])
cellLoop = LoopOverCoordinate(cellLoopBody, coordinateToLoopOver=0, start=0, stop=indexArr.shape[0])
indexField = Field.createFromNumpyArray("indexField", indexArr, indexDimensions=1)
coordinateSymbols = [TypedSymbol(name, "int") for name in ['x', 'y', 'z']]
for d in range(dim):
cellLoopBody.append(SympyAssignment(coordinateSymbols[d], indexField[0](d)))
dirSymbol = TypedSymbol("dir", "int")
cellLoopBody.append(SympyAssignment(dirSymbol, indexField[0](dim)))
boundaryEqList = boundaryFunctor(pdfField, dirSymbol, latticeModel)
if type(boundaryEqList) is tuple:
boundaryEqList, additionalNodes = boundaryEqList
else:
additionalNodes = []
typeInfos = typingFromSympyInspection(boundaryEqList, pdfField.dtype)
fieldsRead, fieldsWritten, assignments = typeAllEquations(boundaryEqList, typeInfos)
fieldsAccessed = fieldsRead.union(fieldsWritten) - set([indexField])
for be in assignments:
cellLoopBody.append(be)
functionBody = Block([cellLoop])
ast = KernelFunction(functionBody, [pdfField, indexField])
if len(additionalNodes) > 0:
loops = ast.atoms(LoopOverCoordinate)
assert len(loops) == 1
loop = list(loops)[0]
for node in additionalNodes:
loop.body.append(node)
functionBody.insertFront(LatticeModelInfo(latticeModel))
fixedCoordinateMapping = {f.name: coordinateSymbols[:dim] for f in fieldsAccessed}
resolveFieldAccesses(ast, set(['indexField']), fieldToFixedCoordinates=fixedCoordinateMapping)
moveConstantsBeforeLoop(ast)
return ast
def createBoundaryIndexList(flagFieldArr, nrOfGhostLayers, stencil, boundaryMask, fluidMask):
result = []
gl = nrOfGhostLayers
for cell in itertools.product(*[range(gl, i-gl) for i in flagFieldArr.shape]):
if not flagFieldArr[cell] & fluidMask:
continue
for dirIdx, direction in enumerate(stencil):
neighborCell = tuple([cell_i + dir_i for cell_i, dir_i in zip(cell, direction)])
if flagFieldArr[neighborCell] & boundaryMask:
result.append(list(cell) + [dirIdx])
return np.array(result, dtype=np.int32)
def noSlip(pdfField, direction, latticeModel):
neighbor = offsetFromDir(direction, latticeModel.dim)
inverseDir = invDir(direction)
return [sp.Eq(pdfField[neighbor](inverseDir), pdfField(direction))]
def ubb(pdfField, direction, latticeModel, velocity):
neighbor = offsetFromDir(direction, latticeModel.dim)
inverseDir = invDir(direction)
velTerm = 6 * sum([d_i * v_i for d_i, v_i in zip(neighbor, velocity)]) * weightOfDirection(direction)
return [sp.Eq(pdfField[neighbor](inverseDir),
pdfField(direction) - velTerm)]
def fixedDensity(pdfField, direction, latticeModel, density):
from lbmpy.equilibria import standardDiscreteEquilibrium
neighbor = offsetFromDir(direction, latticeModel.dim)
inverseDir = invDir(direction)
stencil = latticeModel.stencil
if not latticeModel.compressible:
density -= 1
eqParams = {'stencil': stencil,
'order': 2,
'c_s_sq': sp.Rational(1, 3),
'compressible': latticeModel.compressible,
'rho': density}
u = sp.Matrix(latticeModel.symbolicVelocity)
symmetricEq = (standardDiscreteEquilibrium(u=u, **eqParams) + standardDiscreteEquilibrium(u=-u, **eqParams)) / 2
subExpr1, rhoExpr, subExpr2, uExprs = getDensityVelocityExpressions(stencil,
[pdfField(i) for i in range(len(stencil))],
latticeModel.compressible)
subExprs = subExpr1 + [rhoExpr] + subExpr2 + uExprs
conditions = [(eq_i, sp.Equality(direction, i)) for i, eq_i in enumerate(symmetricEq)] + [(0, True)]
eq_component = sp.Piecewise(*conditions)
return subExprs + [sp.Eq(pdfField[neighbor](inverseDir),
2 * eq_component - pdfField(direction))]
if __name__ == "__main__":
from lbmpy.latticemodel import makeSRT
from pystencils.cpu import generateC
import functools
lm = makeSRT(getStencil("D3Q19"))
pdfField = Field.createGeneric("pdfField", lm.dim, indexDimensions=1)
indexArr = np.array([[1, 1, 1, 1], [1, 2, 1, 1], [2, 1, 1, 1]], dtype=np.int32)
pressureBoundary = functools.partial(fixedDensity, density=1.0)
ast = generateBoundaryHandling(pdfField, indexArr, lm, pressureBoundary)
print(generateC(ast))
import numpy as np
from lbmpy.boundaries import createBoundaryIndexList, generateBoundaryHandling
from pystencils.cpu import makePythonFunction
class BoundaryHandling:
def __init__(self, symbolicPdfField, domainShape, latticeModel, ghostLayers=1):
self._symbolicPdfField = symbolicPdfField
self._shapeWithGhostLayers = [d + 2 * ghostLayers for d in domainShape]
self._fluidFlag = 2 ** 31
self.flagField = np.full(self._shapeWithGhostLayers, self._fluidFlag, dtype=np.int32)
self._ghostLayers = ghostLayers
self._latticeModel = latticeModel
self._boundaryFunctions = []
self._nameToIndex = {}
self._boundarySweeps = []
def addBoundary(self, boundaryFunction, name=None):
if name is None:
name = boundaryFunction.__name__
self._nameToIndex[name] = len(self._boundaryFunctions)
self._boundaryFunctions.append(boundaryFunction)
def invalidateIndexCache(self):
self._boundarySweeps = []
def clear(self):
np.fill(self._fluidFlag)
self.invalidateIndexCache()
def getFlag(self, name):
return 2 ** self._nameToIndex[name]
def setBoundary(self, name, indexExpr, clearOtherBoundaries=True):
if not isinstance(name, str):
function = name
if hasattr(function, '__name__'):
name = function.__name__
else:
name = function.name
if function not in self._boundaryFunctions:
self.addBoundary(function, name)
flag = self.getFlag(name)
if clearOtherBoundaries:
self.flagField[indexExpr] = flag
else:
# clear fluid flag
np.bitwise_and(self.flagField[indexExpr], np.invert(self._fluidFlag), self.flagField[indexExpr])
# add new boundary flag
np.bitwise_or(self.flagField[indexExpr], flag, self.flagField[indexExpr])
self.invalidateIndexCache()
def prepare(self):
self.invalidateIndexCache()
for boundaryIdx, boundaryFunc in enumerate(self._boundaryFunctions):
idxField = createBoundaryIndexList(self.flagField, self._ghostLayers, self._latticeModel.stencil,
2 ** boundaryIdx, self._fluidFlag)
ast = generateBoundaryHandling(self._symbolicPdfField, idxField, self._latticeModel, boundaryFunc)
self._boundarySweeps.append(makePythonFunction(ast, {'indexField': idxField}))
def __call__(self, **kwargs):
if len(self._boundarySweeps) == 0:
self.prepare()
for boundarySweep in self._boundarySweeps:
boundarySweep(**kwargs)
import sympy as sp
import functools
from pystencils.sympyextensions import makeExponentialFuncArgumentSquares
@functools.lru_cache()
def momentGeneratingFunction(function, symbols, symbolsInResult):
"""
Computes the moment generating function of a probability distribution. It is defined as:
.. math ::
F[f(\mathbf{x})](\mathbf{t}) = \int e^{<\mathbf{x}, \mathbf{t}>} f(x)\; dx
:param function: sympy expression
:param symbols: a sequence of symbols forming the vector x
:param symbolsInResult: a sequence forming the vector t
:return: transformation result F: an expression that depends now on symbolsInResult
(symbols have been integrated out)
"""
assert len(symbols) == len(symbolsInResult)
for t_i, v_i in zip(symbolsInResult, symbols):
function *= sp.exp(t_i * v_i)
# This is a custom transformation that speeds up the integrating process
# of a MaxwellBoltzmann distribution
# without this transformation the symbolic integration is sometimes not possible (e.g. in 2D without assumptions)
# or is really slow
# other functions should not be affected by this transformation
# Without this transformation the following assumptions are required for the u and v variables of Maxwell Boltzmann
# 2D: real=True ( without assumption it will not work)
# 3D: no assumption ( with assumptions it will not work :P )
function = makeExponentialFuncArgumentSquares(function, symbols)
bounds = [(s_i, -sp.oo, sp.oo) for s_i in symbols]
result = sp.integrate(function, *bounds)
return sp.simplify(result)
def cumulantGeneratingFunction(function, symbols, symbolsInResult):
"""
Computes cumulant generating function, which is the logarithm of the moment generating function.
For parameter description see :func:`momentGeneratingFunction`.
"""
return sp.ln(momentGeneratingFunction(function, symbols, symbolsInResult))
def multiDifferentiation(generatingFunction, index, symbols):
"""
Computes moment from moment-generating function or cumulant from cumulant-generating function,
by differentiating the generating function, as specified by index and evaluating the derivative at symbols=0
:param generatingFunction: function with is differentiated
:param index: the i'th index specifies how often to differentiate w.r.t. to symbols[i]
:param symbols: symbol to differentiate
"""
assert len(index) == len(symbols), "Length of index and length of symbols has to match"
diffArgs = []
for order, t_i in zip(index, symbols):
for i in range(order):
diffArgs.append(t_i)
if len(diffArgs) > 0:
r = sp.diff(generatingFunction, *diffArgs)
else:
r = generatingFunction
for t_i in symbols:
r = r.subs(t_i, 0)
return sp.simplify(r)
@functools.lru_cache(maxsize=512)
def __continuousMomentOrCumulant(function, moment, symbols, generatingFunction):
t = [sp.Dummy() for i in range(3)] # DoFs in generating function
if type(moment) is tuple:
return multiDifferentiation(generatingFunction(function, symbols, t), moment, t)
else:
result = 0
for term, coeff in moment.as_coefficients_dict().items():
exponents = tuple([term.as_coeff_exponent(v_i)[1] for v_i in symbols])
cm = multiDifferentiation(generatingFunction(function, symbols, t), exponents, t)
result += coeff * cm
return result
def continuousMoment(function, moment, symbols):
"""
Computes moment of given function
:param function: function to compute moments of
:param moment: tuple or polynomial describing the moment
:param symbols: degrees of freedom of the function
"""
return __continuousMomentOrCumulant(function, moment, symbols, momentGeneratingFunction)
def continuousCumulant(function, moment, symbols):
"""
Computes cumulant of continuous function
for parameter description see :func:`continuousMoment`
"""
return __continuousMomentOrCumulant(function, moment, symbols, cumulantGeneratingFunction)
import sympy as sp
from collections import Counter
from pystencils.transformations import fastSubs
from lbmpy.equilibria import maxwellBoltzmannEquilibrium
from lbmpy.latticemodel import LatticeModel, LbmCollisionRule
from lbmpy.cumulants import cumulantsFromPdfs, getDefaultIndexedSymbols, rawMomentsFromCumulants, continuousCumulant
from lbmpy.moments import momentsUpToComponentOrder, momentMatrix
from lbmpy.equilibria import getWeights
from lbmpy.densityVelocityExpressions import getDensityVelocityExpressions
def isHydrodynamic(idx):
return sum(idx) == 2
def relaxationRateFromMagicNumber(hydrodynamicOmega, magicNumber):
half = sp.Rational(1, 2)
return 1 / (magicNumber / (1 / hydrodynamicOmega - half) + half)
def relaxationRateFromFactor(hydrodynamicOmega, factor):
half = sp.Rational(1, 2)
return 1 / ((1 / hydrodynamicOmega - half) / factor + half)
class LinearCombinationRelaxation:
def __init__(self, preCollisionSymbols, indices):
self._preCollisionSymbols = preCollisionSymbols
self._indices = indices
def addRelaxationRule(self, linearCombination, relaxationRate):
coeffDict = linearCombination.as_coeff_dict(self._preCollisionSymbols)
class TRTStyle:
@staticmethod
def createFromFactors(omega, factorEven=1.0, factorOdd=sp.Rational(3, 16)):
omegaEven = relaxationRateFromFactor(omega, factorEven)
omegaOdd = relaxationRateFromMagicNumber(omega, factorOdd)
return TRTStyle(omega, omegaEven, omegaOdd)
def __init__(self, omegaHydrodynamic, omegaEven, omegaOdd):
self._omega = omegaHydrodynamic
self._omega_e = omegaEven
self._omega_o = omegaOdd
self.addPostCollisionsAsSubexpressions = False
def __call__(self, preCollisionSymbols, indices):
pre = {a: b for a, b in zip(indices, preCollisionSymbols)}
post = {}
dim = len(indices[0])
maxwellBoltzmann = maxwellBoltzmannEquilibrium(dim, c_s_sq=sp.Rational(1, 3))
for idx, value in pre.items():
isEven = sum(idx) % 2 == 0
if isHydrodynamic(idx):
relaxRate = self._omega
elif isEven:
relaxRate = self._omega_e
else:
relaxRate = self._omega_o
post[idx] = pre[idx] + relaxRate * (continuousCumulant(maxwellBoltzmann, idx) - pre[idx])
return [post[idx] for idx in indices]
class AllSameRelaxation:
def __init__(self, omega):
self._omega = omega
self.addPostCollisionsAsSubexpressions = False
def __call__(self, preCollisionSymbols, indices):
pre = {a: b for a, b in zip(indices, preCollisionSymbols)}
post = {}
dim = len(indices[0])
maxwellBoltzmann = maxwellBoltzmannEquilibrium(dim, c_s_sq=sp.Rational(1, 3))
for idx, value in pre.items():
post[idx] = pre[idx] + self._omega * (continuousCumulant(maxwellBoltzmann, idx) - pre[idx])
return [post[idx] for idx in indices]
class SimpleBoltzmannRelaxation:
def __init__(self, omega):
self._omega = omega
self.addPostCollisionsAsSubexpressions = False
def __call__(self, preCollisionSymbols, indices):
pre = {a: b for a, b in zip(indices, preCollisionSymbols)}
post = {}
dim = len(indices[0])
maxwellBoltzmann = maxwellBoltzmannEquilibrium(dim, c_s_sq=sp.Rational(1, 3))
for idx, value in pre.items():
if isHydrodynamic(idx):
post[idx] = (1 - self._omega) * pre[idx]
else:
post[idx] = continuousCumulant(maxwellBoltzmann, idx)
return [post[idx] for idx in indices]
class CorrectedD3Q27Collision:
def __init__(self, omegaArr):
self.omega = omegaArr
self.addPostCollisionsAsSubexpressions = True
def __call__(self, preCollisionSymbols, indices):
assert len(indices) == 27
pre = {a: b for a, b in zip(indices, preCollisionSymbols)}
post = {}
ux, uy, uz = pre[(1, 0, 0)], pre[(0, 1, 0)], pre[(0, 0, 1)]
rho = sp.exp(pre[(0, 0, 0)])
post[(0, 0, 0)] = pre[(0, 0, 0)]
post[(1, 0, 0)] = pre[(1, 0, 0)]
post[(0, 1, 0)] = pre[(0, 1, 0)]
post[(0, 0, 1)] = pre[(0, 0, 1)]
post[(1, 1, 0)] = 1 - self.omega[0] * pre[(1, 1, 0)]
post[(1, 0, 1)] = 1 - self.omega[0] * pre[(1, 0, 1)]
post[(0, 1, 1)] = 1 - self.omega[0] * pre[(0, 1, 1)]
Dxux = - self.omega[0] / 2 / rho * (2 * pre[(2,0,0)] - pre[(0,2,0)] - pre[(0,0,2)]) - self.omega[1] / 2 / rho * (pre[(2,0,0)] + pre[(0,2,0)] + pre[(0,0,2)] - pre[(0,0,0)])
Dyuy = Dxux + 3 * self.omega[0] / rho / 2 * (pre[(2, 0, 0)] - pre[(0, 2, 0)])
Dzuz = Dxux + 3 * self.omega[0] / rho / 2 * (pre[(2, 0, 0)] - pre[(0, 0, 2)])
CS_200__m__CS020 = (1-self.omega[0]) * (pre[(2,0,0)] - pre[(0,2,0)]) - 3 * rho * (1 - self.omega[0] / 2) * (Dxux * ux**2 - Dyuy * uy**2)
CS_200__m__CS002 = (1-self.omega[0]) * (pre[(2,0,0)] - pre[(0,0,2)]) - 3 * rho * (1 - self.omega[0] / 2) * (Dxux * ux**2 - Dzuz * uz**2)
CS_200__p__CS020__p__CS_002 = self.omega[1] * pre[(0,0,0)] + (1-self.omega[1]) * (pre[(2,0,0)] + pre[(0,2,0)] + pre[(0,0,2)]) - 3 * rho *(1 - self.omega[1]/2) * (Dxux * ux**2 + Dyuy * uy**2 + Dzuz * uz**2)
post[(2, 0, 0)] = (CS_200__m__CS020 + CS_200__m__CS002 + CS_200__p__CS020__p__CS_002) / 3
post[(0, 2, 0)] = post[(2, 0, 0)] - CS_200__m__CS020
post[(0, 0, 2)] = post[(2, 0, 0)] - CS_200__m__CS002
CS_120__p__CS_102 = (1 - self.omega[2]) * (pre[(1, 2, 0)] + pre[(1, 0, 2)])
CS_210__p__CS_012 = (1 - self.omega[2]) * (pre[(2, 1, 0)] + pre[(0, 1, 2)])
CS_201__p__CS_021 = (1 - self.omega[2]) * (pre[(2, 0, 1)] + pre[(0, 2, 1)])
CS_120__m__CS_102 = (1 - self.omega[3]) * (pre[(1, 2, 0)] - pre[(1, 0, 2)])
CS_210__m__CS_012 = (1 - self.omega[3]) * (pre[(2, 1, 0)] - pre[(0, 1, 2)])
CS_201__m__CS_021 = (1 - self.omega[3]) * (pre[(2, 0, 1)] - pre[(0, 2, 1)])
post[(1, 2, 0)] = (CS_120__p__CS_102 + CS_120__m__CS_102) / 2
post[(1, 0, 2)] = (CS_120__p__CS_102 - CS_120__m__CS_102) / 2
post[(0, 1, 2)] = (CS_210__p__CS_012 - CS_210__m__CS_012) / 2
post[(2, 1, 0)] = (CS_210__p__CS_012 + CS_210__m__CS_012) / 2
post[(2, 0, 1)] = (CS_201__p__CS_021 + CS_201__m__CS_021) / 2
post[(0, 2, 1)] = (CS_201__p__CS_021 - CS_201__m__CS_021) / 2
post[(1, 1, 1)] = (1 - self.omega[4]) * pre[(1, 1, 1)]
CS_220__m__2CS_202__p__CS_022 = (1 - self.omega[5]) * (pre[(2, 2, 0)] - 2 * pre[(2, 0, 2)] + pre[(0, 2, 2)])
CS_220__p__CS_202__m__2CS_022 = (1 - self.omega[5]) * (pre[(2, 2, 0)] + pre[(2, 0, 2)] - 2 * pre[(0, 2, 2)])
CS_220__p__CS_202__p__CS_022 = (1 - self.omega[6]) * (pre[(2, 2, 0)] + pre[(2, 0, 2)] + pre[(0, 2, 2)])
post[(2, 2, 0)] = (CS_220__m__2CS_202__p__CS_022 + CS_220__p__CS_202__m__2CS_022 + CS_220__p__CS_202__p__CS_022) / 3
post[(2, 0, 2)] = (CS_220__p__CS_202__p__CS_022 - CS_220__m__2CS_202__p__CS_022) / 3
post[(0, 2, 2)] = (CS_220__p__CS_202__p__CS_022 - CS_220__p__CS_202__m__2CS_022) / 3
post[(2, 1, 1)] = (1 - self.omega[7]) * pre[(2, 1, 1)]
post[(1, 2, 1)] = (1 - self.omega[7]) * pre[(1, 2, 1)]
post[(1, 1, 2)] = (1 - self.omega[7]) * pre[(1, 1, 2)]
post[(2, 2, 1)] = (1 - self.omega[8]) * pre[(2, 2, 1)]
post[(2, 1, 2)] = (1 - self.omega[8]) * pre[(2, 1, 2)]
post[(1, 2, 2)] = (1 - self.omega[8]) * pre[(1, 2, 2)]
post[(2, 2, 2)] = (1 - self.omega[9]) * pre[(2, 2, 2)]
return [post[idx] for idx in indices]
class CumulantRelaxationLatticeModel(LatticeModel):
def __init__(self, stencil, cumulantCollision, forceModel=None):
super(CumulantRelaxationLatticeModel, self).__init__(stencil, True, forceModel)
self.cumulantCollision = cumulantCollision
@property
def weights(self):
return getWeights(self._stencil)
def getCollisionRule(self):
pdfSymbols = tuple(self.pdfSymbols)
moments = tuple(momentsUpToComponentOrder(2, dim=self.dim))
densityIdx = (0, 0) if self.dim == 2 else (0, 0, 0)
velIndices = [(1, 0), (0, 1)] if self.dim == 2 else [(1, 0, 0), (0, 1, 0), (0, 0, 1)]
momentSymbols = tuple(getDefaultIndexedSymbols(None, "m", moments))
cumulantSymbols = getDefaultIndexedSymbols(None, "c", moments)
cumulantSymbols[moments.index(densityIdx)] = sp.ln(self.symbolicDensity)
for u_i, idx in zip(self.symbolicVelocity, velIndices):
cumulantSymbols[moments.index(idx)] = u_i
cumulantSymbols = tuple(cumulantSymbols)
cumFromPdf = cumulantsFromPdfs(tuple(self.stencil), pdfSymbols, cumulantSymbols)
rhoSubexprs, rhoEq, uSubexprs, uEqs = getDensityVelocityExpressions(self.stencil, self.pdfSymbols,
self.compressible)
# for some force models the velocity has to be shifted
if self.forceModel and hasattr(self.forceModel, "equilibriumVelocity"):
uSymbols = self.symbolicVelocity
uRhs = [e.rhs for e in uEqs]
correctedVel = self.forceModel.equilibriumVelocity(self, uRhs, self.symbolicDensity)
uEqs = [sp.Eq(u_i, correctedVel_i) for u_i, correctedVel_i in zip(uSymbols, correctedVel)]
subExpressions = rhoSubexprs + [rhoEq] + uSubexprs + uEqs
subExpressions += [eq for eq, moment in zip(cumFromPdf, moments)
if moment not in velIndices and moment != densityIdx]
collidedValues = self.cumulantCollision(cumulantSymbols, moments)
print(collidedValues)
if self.cumulantCollision.addPostCollisionsAsSubexpressions:
postcollisionSymbols = getDefaultIndexedSymbols(None, "cp", moments)
subExpressions += [sp.Eq(s, cp) for s, cp in zip(postcollisionSymbols, collidedValues)]
collidedValues = postcollisionSymbols
momFromCum = rawMomentsFromCumulants(tuple(self.stencil), cumulantSymbols, momentSymbols)
substitutions = {cumulantSymbol: collidedValue
for cumulantSymbol, collidedValue in zip(cumulantSymbols, collidedValues)}
collidedMoments = [fastSubs(rawMomentEq.rhs, substitutions) for rawMomentEq in momFromCum]
M = momentMatrix(moments, self.stencil)
collisionResult = M.inv() * sp.Matrix(collidedMoments)
if self.forceModel:
collisionResult += sp.Matrix(self.forceModel(latticeModel=self))
collisionEqs = [sp.Eq(dst_i, t) for dst_i, t in zip(self.pdfDestinationSymbols, collisionResult)]
return LbmCollisionRule(collisionEqs, subExpressions, self)
import functools
import sympy as sp
from lbmpy.diskcache import diskcache
from lbmpy.moments import continuousCumulantOrMoment, momentsUpToComponentOrder, discreteMoment
from lbmpy.transformations import replaceAdditive
from lbmpy.util import scalarProduct
def getDefaultIndexedSymbols(passedSymbols, prefix, indices):
try:
dim = len(indices[0])
except TypeError:
dim = 1
if passedSymbols is not None:
return passedSymbols
else:
formatString = "%s_" + "_".join(["%d"]*dim)
return [sp.Symbol(formatString % ((prefix,) + i)) for i in indices]
@functools.lru_cache(maxsize=512)
def continuousCumulant(function, indexTuple, symbols=None):
"""
Computes cumulant of given function ( for parameters see function continuousMoment )
"""
return continuousCumulantOrMoment(function, indexTuple, symbols, True)
@functools.lru_cache(maxsize=16)
def getDiscreteCumulantGeneratingFunction(function, stencil, waveNumbers):
assert len(stencil) == len(function)
laplaceTrafo = sum([factor * sp.exp(scalarProduct(waveNumbers, e)) for factor, e in zip(function, stencil)])
return sp.ln(laplaceTrafo)
@functools.lru_cache(maxsize=64)
def discreteCumulant(function, indexTuple, stencil):
assert len(stencil) == len(function)
dim = len(stencil[0])
assert len(indexTuple) == dim
waveNumbers = tuple([sp.Symbol("Xi_%d" % (i,)) for i in range(dim)])
res = getDiscreteCumulantGeneratingFunction(function, stencil, waveNumbers)
for m, waveNumber in zip(indexTuple, waveNumbers):
for i in range(m):
res = sp.diff(res, waveNumber)
return res.subs({waveNumber: 0 for waveNumber in waveNumbers})
@functools.lru_cache(maxsize=8)
def cumulantsFromPdfs(stencil, pdfSymbols=None, cumulantSymbols=None):
dim = len(stencil[0])
indices = list(momentsUpToComponentOrder(2, dim=dim))
cumulantSymbols = getDefaultIndexedSymbols(cumulantSymbols, "c", indices)
pdfSymbols = getDefaultIndexedSymbols(pdfSymbols, "f", range(len(stencil)))
return [sp.Eq(cumulantSymbol, discreteCumulant(tuple(pdfSymbols), idx, stencil))
for cumulantSymbol, idx in zip(cumulantSymbols, indices)]
@diskcache
def cumulantsFromRawMoments(stencil, momentSymbols=None, cumulantSymbols=None):
dim = len(stencil[0])
indices = list(momentsUpToComponentOrder(2, dim=dim))
momentSymbols = getDefaultIndexedSymbols(momentSymbols, "m", indices)
cumulantSymbols = getDefaultIndexedSymbols(cumulantSymbols, "c", indices)
# here cumulantsFromPdfs is used, then the moments are replaced
pdfSymbols = tuple([sp.Symbol("f_%d" % (i,)) for i in range(len(stencil))])
replacements = {momentSymbol: discreteMoment(pdfSymbols, idx, stencil)
for momentSymbol, idx in zip(momentSymbols, indices)}
cumulantEquations = cumulantsFromPdfs(stencil, tuple(pdfSymbols), tuple(cumulantSymbols))
result = []
for cumulantSymbol, cumulantEq in zip(cumulantSymbols, cumulantEquations):
c = cumulantEq.rhs
for symbol, replacement in replacements.items():
c = replaceAdditive(c, symbol, replacement, requiredMatchOriginal=1.0)
result.append(sp.Eq(cumulantSymbol, c))
return result
@diskcache
def rawMomentsFromCumulants(stencil, cumulantSymbols=None, momentSymbols=None):
dim = len(stencil[0])
indices = list(momentsUpToComponentOrder(2, dim=dim))
momentSymbols = getDefaultIndexedSymbols(momentSymbols, "m", indices)
cumulantSymbols = getDefaultIndexedSymbols(cumulantSymbols, "c", indices)
forwardEqs = cumulantsFromRawMoments(stencil, tuple(momentSymbols), tuple(cumulantSymbols))
solveResult = sp.solve(forwardEqs, momentSymbols)
assert len(solveResult) == 1, "Could not invert the forward equations - manual implementation required?"
return [sp.Eq(momentSymbol, r) for momentSymbol, r in zip(momentSymbols, solveResult[0])]
import sympy as sp
import lbmpy.util as util
def getDensityVelocityExpressions(stencil, symbolicPdfs, compressible):
"""
Returns a list of sympy equations to compute density and velocity for a given stencil
with the minimum amount of operations
"""
def filterOutPlusTerms(expr):
result = 0
for term in expr.args:
if not type(term) is sp.Mul:
result += term
return result
dim = len(stencil[0])
subexpressions = []
rho = sum(symbolicPdfs)
u = [0] * dim
for f, offset in zip(symbolicPdfs, stencil):
for i in range(dim):
u[i] += f * int(offset[i])
plusTerms = [set(filterOutPlusTerms(u_i).args) for u_i in u]
for i in range(dim):
rhs = plusTerms[i]
for j in range(i):
rhs -= plusTerms[j]
eq = sp.Eq(sp.Symbol("vel%dTerm" % (i,)), sum(rhs))
subexpressions.append(eq)
for subexpression in subexpressions:
rho = rho.subs(subexpression.rhs, subexpression.lhs)
rho = sp.Eq(util.getSymbolicDensity(), rho)
symbolicVel = util.getSymbolicVelocityVector(dim)
for i in range(dim):
u[i] = u[i].subs(subexpressions[i].rhs, subexpressions[i].lhs)
u[i] = sp.Eq(symbolicVel[i], u[i])
velSubexpressions = []
if compressible:
rho_inv = sp.Symbol("rho_inv")
rho_inv_eq = sp.Eq(rho_inv, 1 / util.getSymbolicDensity())
u = [sp.Eq(u_i.lhs, u_i.rhs*rho_inv) for u_i in u]
velSubexpressions.append(rho_inv_eq)
return subexpressions, rho, velSubexpressions, u
try:
from joblib import Memory
diskcache = Memory(cachedir="/tmp/pylbm", verbose=False).cache
except ImportError:
# fallback to in-memory caching if joblib is not available
import functools
diskcache = functools.lru_cache(maxsize=64)
import sympy as sp
from lbmpy.equilibria import standardDiscreteEquilibrium, getWeights
from lbmpy.moments import momentsUpToComponentOrder
from pystencils.transformations import fastSubs
def discreteEntropyFromWeights(function, weights):
return -sum([f_i * sp.ln(f_i / w_i) for f_i, w_i in zip(function, weights)])
def discreteApproxEntropyFromWeights(function, weights):
return -sum([f_i * ((f_i / w_i)-1) for f_i, w_i in zip(function, weights)])
def discreteEntropy(function, stencil):
return discreteEntropyFromWeights(function, getWeights(stencil))
def discreteApproxEntropy(function, stencil):
weights = getWeights(stencil)
return discreteApproxEntropyFromWeights(function, weights)
def splitUpdateEquationsInDeltasPlusRest(updateEqsRhs, relaxationRate):
deltas = [ue.expand().collect(relaxationRate).coeff(relaxationRate)
for ue in updateEqsRhs]
rest = [ue.expand().collect(relaxationRate) - relaxationRate * delta for ue, delta in zip(updateEqsRhs, deltas)]
return rest, deltas
def findEntropyMaximizingOmega(omega_s, f_eq, ds, dh):
dsdh = sum([ds_i * dh_i / f_eq_i for ds_i, dh_i, f_eq_i in zip(ds, dh, f_eq)])
dhdh = sum([dh_i * dh_i / f_eq_i for dh_i, f_eq_i in zip(dh, f_eq)])
return 1 - ((omega_s - 1) * dsdh / dhdh)
def determineRelaxationRateByEntropyCondition(updateRule, omega_s, omega_h):
stencil = updateRule.latticeModel.stencil
Q = len(stencil)
fSymbols = updateRule.latticeModel.pdfSymbols
updateEqsRhs = [e.rhs for e in updateRule.updateEquations]
_, ds = splitUpdateEquationsInDeltasPlusRest(updateEqsRhs, omega_s)
_, dh = splitUpdateEquationsInDeltasPlusRest(updateEqsRhs, omega_h)
dsSymbols = [sp.Symbol("entropicDs_%d" % (i,)) for i in range(Q)]
dhSymbols = [sp.Symbol("entropicDh_%d" % (i,)) for i in range(Q)]
feqSymbols = [sp.Symbol("entropicFeq_%d" % (i,)) for i in range(Q)]
subexprs = [sp.Eq(a, b) for a, b in zip(dsSymbols, ds)] + \
[sp.Eq(a, b) for a, b in zip(dhSymbols, dh)] + \
[sp.Eq(a, f_i + ds_i + dh_i) for a, f_i, ds_i, dh_i in zip(feqSymbols, fSymbols, dsSymbols, dhSymbols)]
optimalOmegaH = findEntropyMaximizingOmega(omega_s, feqSymbols, dsSymbols, dhSymbols)
subexprs += [sp.Eq(omega_h, optimalOmegaH)]
newUpdateEquations = []
for updateEq in updateRule.updateEquations:
index = updateRule.latticeModel.pdfDestinationSymbols.index(updateEq.lhs)
newEq = sp.Eq(updateEq.lhs, fSymbols[index] + omega_s * dsSymbols[index] + omega_h * dhSymbols[index])
newUpdateEquations.append(newEq)
return updateRule.newWithSubexpressions(newUpdateEquations, subexprs)
def decompositionByRelaxationRate(updateRule, relaxationRate):
lm = updateRule.latticeModel
stencil = lm.stencil
affineTerms = [0] * len(stencil)
linearTerms = [0] * len(stencil)
quadraticTerms = [0] * len(stencil)
for updateEquation in updateRule.updateEquations:
index = lm.pdfDestinationSymbols.index(updateEquation.lhs)
rhs = updateEquation.rhs
linearTerms[index] = rhs.coeff(relaxationRate)
quadraticTerms[index] = rhs.coeff(relaxationRate**2)
affineTerms[index] = rhs - relaxationRate * linearTerms[index] - relaxationRate**2 * quadraticTerms[index]
if relaxationRate in affineTerms[index].atoms(sp.Symbol):
raise ValueError("Relaxation Rate decomposition failed (affine part) - run simplification first")
if relaxationRate in linearTerms[index].atoms(sp.Symbol):
raise ValueError("Relaxation Rate decomposition failed (linear part) - run simplification first")
if relaxationRate in quadraticTerms[index].atoms(sp.Symbol):
raise ValueError("Relaxation Rate decomposition failed (quadratic part) - run simplification first")
return affineTerms, linearTerms, quadraticTerms
class RelaxationRatePolynomialDecomposition:
def __init__(self, collisionRule, freeRelaxationRates, fixedRelaxationRates):
self._collisionRule = collisionRule
self._freeRelaxationRates = freeRelaxationRates
self._fixedRelaxationRates = fixedRelaxationRates
self._allRelaxationRates = fixedRelaxationRates + freeRelaxationRates
for se in collisionRule.subexpressions:
for rr in freeRelaxationRates:
assert rr not in se.rhs.atoms(sp.Symbol), \
"Decomposition not possible since free relaxation rates are already in subexpressions"
def symbolicRelaxationRateFactors(self, relaxationRate, power):
Q = len(self._collisionRule.latticeModel.stencil)
omegaIdx = self._allRelaxationRates.index(relaxationRate)
return [sp.Symbol("entFacOmega_%d_%d_%d" % (i, omegaIdx, power)) for i in range(Q)]
def relaxationRateFactors(self, relaxationRate):
updateEquations = self._collisionRule.updateEquations
result = []
for updateEquation in updateEquations:
factors = []
rhs = updateEquation.rhs
power = 0
while True:
power += 1
factor = rhs.coeff(relaxationRate ** power)
if factor != 0:
if relaxationRate in factor.atoms(sp.Symbol):
raise ValueError("Relaxation Rate decomposition failed - run simplification first")
factors.append(factor)
else:
break
result.append(factors)
return result
def constantExprs(self):
subsDict = {rr: 0 for rr in self._freeRelaxationRates}
subsDict.update({rr: 0 for rr in self._fixedRelaxationRates})
updateEquations = self._collisionRule.updateEquations
return [fastSubs(eq.rhs, subsDict) for eq in updateEquations]
def equilibriumExprs(self):
subsDict = {rr: 1 for rr in self._freeRelaxationRates}
subsDict.update({rr: 1 for rr in self._fixedRelaxationRates})
updateEquations = self._collisionRule.updateEquations
return [fastSubs(eq.rhs, subsDict) for eq in updateEquations]
def symbolicEquilibrium(self):
Q = len(self._collisionRule.latticeModel.stencil)
return [sp.Symbol("entFeq_%d" % (i,)) for i in range(Q)]
def determineRelaxationRateByEntropyConditionIterative(updateRule, omega_s, omega_h,
newtonIterations=2, initialValue=1):
lm = updateRule.latticeModel
decomp = RelaxationRatePolynomialDecomposition(updateRule, [omega_h], [omega_s])
# compute and assign f_eq
#fEqEqs = [sp.Eq(a, b) for a, b in zip(decomp.symbolicEquilibrium(), decomp.equilibriumExprs())]
# compute and assign relaxation rate factors
newUpdateEquations = []
fEqEqs = []
rrFactorDefinitions = []
relaxationRates = [omega_s, omega_h]
for i, constantExpr in enumerate(decomp.constantExprs()):
updateEqRhs = constantExpr
fEqRhs = constantExpr
for rr in relaxationRates:
factors = decomp.relaxationRateFactors(rr)
for idx, f in enumerate(factors[i]):
power = idx + 1
symbolicFactor = decomp.symbolicRelaxationRateFactors(rr, power)[i]
rrFactorDefinitions.append(sp.Eq(symbolicFactor, f))
updateEqRhs += rr ** power * symbolicFactor
fEqRhs += symbolicFactor
newUpdateEquations.append(sp.Eq(lm.pdfDestinationSymbols[i], updateEqRhs))
fEqEqs.append(sp.Eq(decomp.symbolicEquilibrium()[i], fEqRhs))
# newton iterations to determine free omega
intermediateOmegas = [sp.Symbol("omega_iter_%i" % (i,)) for i in range(newtonIterations+1)]
intermediateOmegas[0] = initialValue
intermediateOmegas[-1] = omega_h
newtonIterationEquations = []
for omega_idx in range(len(intermediateOmegas)-1):
rhsOmega = intermediateOmegas[omega_idx]
lhsOmega = intermediateOmegas[omega_idx+1]
updateEqsRhs = [e.rhs for e in newUpdateEquations]
entropy = discreteApproxEntropyFromWeights(updateEqsRhs, [e.lhs for e in fEqEqs])
entropyDiff = sp.diff(entropy, omega_h)
entropySecondDiff = sp.diff(entropyDiff, omega_h)
entropyDiff = entropyDiff.subs(omega_h, rhsOmega)
entropySecondDiff = entropySecondDiff.subs(omega_h, rhsOmega)
newtonEq = sp.Eq(lhsOmega, rhsOmega - entropyDiff / entropySecondDiff)
newtonIterationEquations.append(newtonEq)
# final update equations
return updateRule.newWithSubexpressions(newUpdateEquations, rrFactorDefinitions + fEqEqs + newtonIterationEquations)
def createEntropicCollisionRule(dim, name='KBC-N4', compressible=False,
useNewtonIterations=False, velocityRelaxation=None, fixedOmega=None):
from functools import reduce
import itertools
import operator
from lbmpy.moments import MOMENT_SYMBOLS, momentsOfOrder, discreteMoment, exponentTuplesToPolynomials
from lbmpy.stencils import getStencil
from lbmpy.latticemodel import MomentRelaxationLatticeModel
from lbmpy.simplifications import createDefaultMomentSpaceSimplificationStrategy
def product(iterable):
return reduce(operator.mul, iterable, 1)
theMoment = MOMENT_SYMBOLS[:dim]
rho = [sp.Rational(1, 1)]
velocity = list(theMoment)
shearTensorOffDiagonal = [product(t) for t in itertools.combinations(theMoment, 2)]
shearTensorDiagonal = [m_i * m_i for m_i in theMoment]
shearTensorTrace = sum(shearTensorDiagonal)
shearTensorTracefreeDiagonal = [d - shearTensorTrace/dim for d in shearTensorDiagonal]
energyTransportTensor = exponentTuplesToPolynomials([a for a in momentsOfOrder(3, dim, True) if 3 not in a])
explicitlyDefined = set(rho + velocity + shearTensorOffDiagonal + shearTensorDiagonal + energyTransportTensor)
rest = list(set(exponentTuplesToPolynomials(momentsUpToComponentOrder(2, dim))) - explicitlyDefined)
assert len(rest) + len(explicitlyDefined) == 3**dim
# naming according to paper Karlin 2015: Entropic multirelaxation lattice Boltzmann models for turbulent flows
D = shearTensorOffDiagonal + shearTensorTracefreeDiagonal[:-1]
T = [shearTensorTrace]
Q = energyTransportTensor
if name == 'KBC-N1':
decomposition = [D, T+Q+rest]
elif name == 'KBC-N2':
decomposition = [D+T, Q+rest]
elif name == 'KBC-N3':
decomposition = [D+Q, T+rest]
elif name == 'KBC-N4':
decomposition = [D+T+Q, rest]
else:
raise ValueError("Unknown model. Supported models KBC-Nx")
omega_s, omega_h = sp.symbols("omega omega_h")
shearPart, restPart = decomposition
velRelaxation = omega_s if velocityRelaxation is None else velocityRelaxation
relaxationRates = [omega_s] + \
[velRelaxation] * len(velocity) + \
[omega_s] * len(shearPart) + \
[omega_h] * len(restPart)
stencil = getStencil("D2Q9") if dim == 2 else getStencil("D3Q27")
allMoments = rho + velocity + shearPart + restPart
discreteEquilibrium = standardDiscreteEquilibrium(stencil, order=2,
compressible=compressible, c_s_sq=sp.Rational(1, 3))
equilibriumMoments = [discreteMoment(tuple(discreteEquilibrium), mom, stencil) for mom in allMoments]
lm = MomentRelaxationLatticeModel(stencil, allMoments, equilibriumMoments, relaxationRates, compressible)
simplify = createDefaultMomentSpaceSimplificationStrategy()
collisionRule = simplify(lm.getCollisionRule())
if useNewtonIterations:
collisionRule = determineRelaxationRateByEntropyConditionIterative(collisionRule, omega_s, omega_h, 4)
else:
collisionRule = determineRelaxationRateByEntropyCondition(collisionRule, omega_s, omega_h)
if fixedOmega:
collisionRule = collisionRule.newWithSubstitutions({omega_s: fixedOmega})
return collisionRule
if __name__ == "__main__":
createEntropicCollisionRule(3).display()
from sympy import Rational as R
from sympy import simplify
import sympy as sp
from lbmpy.util import getSymbolicDensity, getSymbolicVelocityVector, getSymbolicSoundSpeed
from lbmpy.moments import momentMatrix, continuousMoment, MOMENT_SYMBOLS
from lbmpy.transformations import removeHigherOrderTerms
from lbmpy.diskcache import diskcache
def getWeights(stencil):
Q = len(stencil)
def weightForDirection(direction):
absSum = sum([abs(d) for d in direction])
return getWeights.weights[Q][absSum]
return [weightForDirection(d) for d in stencil]
getWeights.weights = {
9: {
0: R(4, 9),
1: R(1, 9),
2: R(1, 36),
},
15: {
0: R(2, 9),
1: R(1, 9),
3: R(1, 72),
},
19: {
0: R(1, 3),
1: R(1, 18),
2: R(1, 36),
},
27: {
0: R(8, 27),
1: R(2, 27),
2: R(1, 54),
3: R(1, 216),
}
}
#@diskcache
def standardDiscreteEquilibrium(stencil, rho=None, u=None, order=2, c_s_sq=None, compressible=True):
"""
Returns the common quadratic LBM equilibrium as a list of sympy expressions
:param stencil: one of the supported stencils ( D2Q9, D3Q19, ... )
:param rho: sympy symbol for the density - defaults to symbols('rho')
:param u: list with the length of stencil dimensionality with symbols for velocity,
:param order: highest order of velocity terms (for hydrodynamics order 2 is sufficient)
:param c_s_sq: square of speed of sound - if not specified the symbolic value "c_s**2" is used
:param compressible: compressibility
"""
e = stencil
w = getWeights(stencil)
assert len(e) == len(w)
D = len(e[0])
if rho is None:
rho = getSymbolicDensity()
if u is None:
u = getSymbolicVelocityVector(D, "u")
if not c_s_sq:
c_s_sq = getSymbolicSoundSpeed()**2
rhoOutside = rho if compressible else R(1, 1)
rhoInside = rho if not compressible else R(1, 1)
res = []
for w_q, e_q in zip(w, e):
eTimesU = 0
for c_q_alpha, u_alpha in zip(e_q, u):
eTimesU += c_q_alpha * u_alpha
fq = rhoInside + eTimesU / c_s_sq
if order <= 1:
res.append(fq * rhoOutside * w_q)
continue
uTimesU = 0
for u_alpha in u:
uTimesU += u_alpha * u_alpha
fq += R(1, 2) / c_s_sq**2 * eTimesU ** 2 - R(1, 2) / c_s_sq * uTimesU
if order <= 2:
res.append(fq * rhoOutside * w_q)
continue
fq += R(1, 6) / c_s_sq**3 * eTimesU**3 - R(1, 2) / c_s_sq**2 * uTimesU * eTimesU
res.append(simplify(fq * rhoOutside * w_q))
return sp.Matrix(len(stencil), 1, res)
@diskcache
def maxwellBoltzmannEquilibrium(dimension=3, rho=None, u=None, v=None, c_s_sq=None):
"""
Returns sympy expression of Maxwell Boltzmann distribution
:param dimension: number of space dimensions
:param rho: sympy symbol for the density - defaults to symbols('rho')
if custom symbol is used make sure to mark is as positive=True
:param u: list with the length of spatial dimensions containing symbols for macroscopic velocity u
:param u: list with the length of spatial dimensions containing symbols for peculiar velocity v
:param c_s_sq: symbol for speed of sound squared, defaults to symbol c_s**2
:return:
"""
D = dimension
if not rho:
rho = getSymbolicDensity()
if not u:
u = getSymbolicVelocityVector(D, "u")
if not v:
v = getSymbolicVelocityVector(D, "v")
if not c_s_sq:
c_s = getSymbolicSoundSpeed()
c_s_sq = c_s**2
velTerm = sum([(v_i - u_i) ** 2 for v_i, u_i in zip(v, u)])
return rho / (2 * sp.pi * c_s_sq) ** (R(D, 2)) * sp.exp(- velTerm / (2 * c_s_sq))
@diskcache
def getEquilibriumMoments(listOfMoments, continuousEquilibrium, u, v, order=None):
result = []
for moment in listOfMoments:
contMom = 0
for term, coeff in moment.as_coefficients_dict().items():
exponents = tuple([term.as_coeff_exponent(v_i)[1] for v_i in v])
contMom += coeff * continuousMoment(continuousEquilibrium, exponents, symbols=v)
if order:
contMom = removeHigherOrderTerms(contMom, order, u)
result.append(contMom)
return result
@diskcache
def getMaxwellBoltzmannEquilibriumMoments(moments, order=None, velSymbols=MOMENT_SYMBOLS):
dim = len(velSymbols)
rho = getSymbolicDensity()
u = getSymbolicVelocityVector(dim, "u")
mb = maxwellBoltzmannEquilibrium(dim, rho, u, velSymbols, c_s_sq=sp.Rational(1, 3))
return getEquilibriumMoments(moments, mb, u, velSymbols, order)
@diskcache
def generateEquilibriumByMatchingMoments(stencil, moments, order=None, velSymbols=MOMENT_SYMBOLS):
Q = len(stencil)
assert len(moments) == Q, "Moment count(%d) does not match stencil size(%d)" % (len(moments), Q)
continuousMomentsVector = sp.Matrix(Q, 1, getMaxwellBoltzmannEquilibriumMoments(moments, order, velSymbols))
M = momentMatrix(moments, stencil)
assert M.rank() == Q, "Rank of moment matrix (%d) does not match stencil size (%d)" % (M.rank(), Q)
return M.inv() * continuousMomentsVector
import sympy as sp
import numpy as np
from lbmpy.latticemodel import LbmCollisionRule
from pystencils import Field
# --------------------------------- Field Access patterns --------------------------------------------------------------
def streamPullSourceDestination(lbmCollisionRule, srcField, dstField):
lm = lbmCollisionRule.latticeModel
pdfSrcSymbols = lm.pdfSymbols
pdfDstSymbols = lm.pdfDestinationSymbols
substitutions = {}
for idx, offset in enumerate(lm.stencil):
inverseIdx = tuple([-d for d in offset])
substitutions[pdfSrcSymbols[idx]] = srcField[inverseIdx](idx)
substitutions[pdfDstSymbols[idx]] = dstField(idx)
return lbmCollisionRule.newWithSubstitutions(substitutions)
def streamPullWithSourceAndDestinationFields(lbmCollisionRule, numpyField=None, srcFieldName="src", dstFieldName="dst",
genericLayout='numpy', genericFieldType=np.float64):
"""
Implements a stream-pull scheme, where values are read from source and written to destination field
:param lbmCollisionRule: a collision rule created by the lattice model
:param numpyField: optional numpy field for PDFs. Used to create a kernel of fixed loop bounds and strides
if None, a generic kernel is created
:param srcFieldName: name of the pdf source field
:param dstFieldName: name of the pdf destination field
:param genericLayout: if no numpyField is given to determine the layout, a variable sized field with the given
genericLayout is used
:param genericFieldType: if no numpyField is given, this data type is used for the fields
:return: new lbm collision rule, where generic pdf references are replaced by field accesses
"""
lm = lbmCollisionRule.latticeModel
if numpyField is not None:
assert len(numpyField.shape) == lm.dim + 1
if numpyField is None:
src = Field.createGeneric(srcFieldName, lm.dim, indexDimensions=1, layout=genericLayout, dtype=genericFieldType)
dst = Field.createGeneric(dstFieldName, lm.dim, indexDimensions=1, layout=genericLayout, dtype=genericFieldType)
else:
src = Field.createFromNumpyArray(srcFieldName, numpyField, indexDimensions=1)
dst = Field.createFromNumpyArray(dstFieldName, numpyField, indexDimensions=1)
return streamPullSourceDestination(lbmCollisionRule, src, dst)
# ---------------------------- Macroscopic value I/O to fields ---------------------------------------------------------
def addVelocityFieldOutput(lbmCollisionRule, velocityField, shiftForceIfNecessary=True):
lm = lbmCollisionRule.latticeModel
rho = lm.symbolicDensity
u = lm.symbolicVelocity
if hasattr(lm.forceModel, "macroscopicVelocity") and shiftForceIfNecessary:
macroscopicVelocity = lm.forceModel.macroscopicVelocity(lm, u, rho)
else:
macroscopicVelocity = u
newEqs = [sp.Eq(velocityField(i), u_i) for i, u_i in enumerate(macroscopicVelocity)]
return LbmCollisionRule(lbmCollisionRule.updateEquations+newEqs, lbmCollisionRule.subexpressions,
lm, lbmCollisionRule.updateEquationDirections)
def addScalarOutput(lbmCollisionRule, symbol, outputField):
eq = sp.Eq(outputField(0), symbol)
return lbmCollisionRule.newWithSubexpressions(lbmCollisionRule.updateEquations, [eq])
def addDensityFieldOutput(lbmCollisionRule, densityField):
lm = lbmCollisionRule.latticeModel
newEqs = [sp.Eq(densityField(0), lm.symbolicDensity)]
return LbmCollisionRule(lbmCollisionRule.updateEquations+newEqs, lbmCollisionRule.subexpressions, lm,
lbmCollisionRule.updateEquationDirections)
"""
.. module:: forcemodels
:synopsis: Collection of forcing terms for hydrodynamic LBM simulations
"""
import sympy as sp
import lbmpy.util as util
from lbmpy.equilibria import getWeights
from lbmpy.util import scalarProduct
class Simple:
"""
A simple force model which introduces the following additional force term in the
collision process: ::math:`3 * w_i * e_i * f_i` (often: force = rho * acceleration)
Should only be used with constant forces!
Shifts the macroscopic velocity by F/2, but does not change the equilibrium velocity.
"""
def __init__(self, force):
self._force = force
def __call__(self, latticeModel):
dim = len(latticeModel.stencil[0])
assert len(self._force) == dim
weights = getWeights(latticeModel.stencil)
return [3 * w_i * scalarProduct(self._force, direction)
for direction, w_i in zip(latticeModel.stencil, weights)]
class Luo:
"""
Force model by Luo with the following forcing term
.. math ::
F_i = w_i * \left( \frac{c_i - u}{c_s^2} + \frac{c_i * (c_i * u)}{c_s^4} \right) * F
Shifts the macroscopic velocity by F/2, but does not change the equilibrium velocity.
"""
def __init__(self, force):
self._force = force
def __call__(self, latticeModel):
u = sp.Matrix(util.getSymbolicVelocityVector(latticeModel.dim))
force = sp.Matrix(self._force)
weights = getWeights(latticeModel.stencil)
result = []
for direction, w_i in zip(latticeModel.stencil, weights):
direction = sp.Matrix(direction)
result.append(3 * w_i * force.dot(direction - u + 3 * direction * direction.dot(u)))
return result
def macroscopicVelocity(self, latticeModel, vel, density):
return defaultVelocityShift(vel, density, self._force, latticeModel.compressible)
class Guo:
"""
Force model by Guo with the following term:
.. math ::
F_i = w_i * ( 1 - \frac{1}{2 * tau} ) * \left( \frac{c_i - u}{c_s^2} + \frac{c_i * (c_i * u)}{c_s^4} \right) * F
Adapts the calculation of the macroscopic velocity as well as the equilibrium velocity (both shifted by F/2)!
"""
def __init__(self, force, viscosityRelaxationRate):
self._force = force
self._viscosityRelaxationRate = viscosityRelaxationRate
def __call__(self, latticeModel):
luo = Luo(self._force)
correctionFactor = (1 - sp.Rational(1, 2) * self._viscosityRelaxationRate)
return [correctionFactor * t for t in luo(latticeModel)]
def macroscopicVelocity(self, latticeModel, vel, density):
return defaultVelocityShift(vel, density, self._force, latticeModel.compressible)
def equilibriumVelocity(self, latticeModel, vel, density):
return defaultVelocityShift(vel, density, self._force, latticeModel.compressible)
# -------------------------------- Helper functions ------------------------------------------------------------------
def defaultVelocityShift(velocity, density, force, compressible):
if compressible:
return [v_i + f_i / (2 * density) for v_i, f_i in zip(velocity, force)]
else:
return [v_i + f_i / 2 for v_i, f_i in zip(velocity, force)]
import numpy as np
def initializePdfField(latticeModel, pdfArray):
if latticeModel.compressible is None:
pdfArray.fill(0.0)
else:
if latticeModel.dim == 2:
for i, weight in enumerate(latticeModel.weights):
pdfArray[:, :, i] = float(weight)
elif latticeModel.dim == 3:
for i, weight in enumerate(latticeModel.weights):
pdfArray[:, :, :, i] = float(weight)
else:
raise NotImplementedError()
def computeVelocity(latticeModel, pdfArray):
vel = np.zeros(pdfArray.shape[:-1] + (2,))
for i, dir in enumerate(latticeModel.stencil):
if dir[0] != 0:
vel[:, :, 0] += dir[0] * pdfArray[:, :, i]
if dir[1] != 0:
vel[:, :, 1] += dir[1] * pdfArray[:, :, i]
return vel
import sympy as sp
from pystencils.transformations import fastSubs
from lbmpy.densityVelocityExpressions import getDensityVelocityExpressions
from lbmpy.equilibria import getMaxwellBoltzmannEquilibriumMoments, standardDiscreteEquilibrium, getWeights
import lbmpy.moments as m
import lbmpy.transformations as trafos
import lbmpy.util as util
# ---------------- From standard discrete equilibrium -------------------------------------------------------------
def makeSRT(stencil, order=2, compressible=False, forceModel=None, velocityRelaxationRate=None):
momentSystem = m.getDefaultOrthogonalMoments(stencil)
discreteEquilibrium = standardDiscreteEquilibrium(stencil, order=order,
compressible=compressible, c_s_sq=sp.Rational(1, 3))
equilibriumMoments = [m.discreteMoment(tuple(discreteEquilibrium), mom, stencil) for mom in momentSystem.allMoments]
relaxationRates = [sp.Symbol('omega')] * len(stencil)
if velocityRelaxationRate is not None:
for id in momentSystem.conservedMomentIds[1:]:
relaxationRates[id] = velocityRelaxationRate
return MomentRelaxationLatticeModel(stencil, momentSystem.allMoments, equilibriumMoments,
relaxationRates, compressible, forceModel)
def makeTRT(stencil, order=2, compressible=False, forceModel=None, velocityRelaxationRate=None):
momentSystem = m.getDefaultOrthogonalMoments(stencil)
discreteEquilibrium = standardDiscreteEquilibrium(stencil, order=order,
compressible=compressible, c_s_sq=sp.Rational(1, 3))
equilibriumMoments = [m.discreteMoment(tuple(discreteEquilibrium), mom, stencil) for mom in momentSystem.allMoments]
lambda_e, lambda_o = sp.symbols("lambda_e lambda_o")
relaxationRates = [lambda_e if m.isEven(moment) else lambda_o for moment in momentSystem.allMoments]
if velocityRelaxationRate is not None:
for id in momentSystem.conservedMomentIds[1:]:
relaxationRates[id] = velocityRelaxationRate
return MomentRelaxationLatticeModel(stencil, momentSystem.allMoments, equilibriumMoments,
relaxationRates, compressible, forceModel)
def makeMRT(stencil, order=2, compressible=False, forceModel=None, velocityRelaxationRate=None):
momentSystem = m.getDefaultOrthogonalMoments(stencil)
discreteEquilibrium = standardDiscreteEquilibrium(stencil, order=order,
compressible=compressible, c_s_sq=sp.Rational(1, 3))
equilibriumMoments = [m.discreteMoment(tuple(discreteEquilibrium), mom, stencil) for mom in momentSystem.allMoments]
if not momentSystem.hasMomentGroups:
raise NotImplementedError("No moment grouping available for this lattice model")
relaxationRates = momentSystem.getSymbolicRelaxationRates()
if velocityRelaxationRate is not None:
for id in momentSystem.conservedMomentIds[1:]:
relaxationRates[id] = velocityRelaxationRate
return MomentRelaxationLatticeModel(stencil, momentSystem.allMoments, equilibriumMoments,
relaxationRates, compressible, forceModel)
# ---------------- From Continuous Maxwell Boltzmann -------------------------------------------------------------
def makeSRTFromMaxwellBoltzmann(stencil, order=2, forceModel=None):
Q = len(stencil)
moments = m.getDefaultOrthogonalMoments(stencil)
return LatticeModel(stencil, moments,
getMaxwellBoltzmannEquilibriumMoments(moments, order=order),
[sp.Symbol('omega')] * Q, True, forceModel)
# ---------------- From Continuous Maxwell Boltzmann -------------------------------------------------------------
class LbmCollisionRule:
def __init__(self, updateEquations, subExpressions, latticeModel, updateEquationDirections=None):
self.subexpressions = subExpressions
self.updateEquations = updateEquations
if updateEquationDirections is None:
self.updateEquationDirections = latticeModel.stencil
else:
self.updateEquationDirections = updateEquationDirections
self.latticeModel = latticeModel
def newWithSubexpressions(self, newUpdateEquations, newSubexpressions, newOrder=None):
assert len(self.updateEquations) == len(newUpdateEquations)
ordering = self.updateEquationDirections if newOrder is None else newOrder
return LbmCollisionRule(newUpdateEquations, self.subexpressions+newSubexpressions, self.latticeModel, ordering)
def newWithSubstitutions(self, substitutionDict, newOrder=None):
newSubexpressions = [fastSubs(e, substitutionDict) for e in self.subexpressions]
newUpdateEquations = [fastSubs(e, substitutionDict) for e in self.updateEquations]
ordering = self.updateEquationDirections if newOrder is not None else newOrder
return LbmCollisionRule(newUpdateEquations, newSubexpressions, self.latticeModel, ordering)
def countNumberOfOperations(self):
return trafos.countNumberOfOperations(self.subexpressions + self.updateEquations)
@property
def equations(self):
return self.subexpressions + self.updateEquations
def display(self, printFunction=print):
"""
Prints subexpressions and update rules of this collision rule
:param printFunction: function that is used for printing, for IPython notebooks IPython.display can be useful
"""
printFunction("Subexpressions:")
for s in self.subexpressions:
printFunction(s)
printFunction("Update Rules")
for s in self.updateEquations: # [-1:]:
printFunction(s)
def displayRepresentative(self, printFunction=print, directions=None):
"""Prints the update rules for C, W, NW and for 3D models TNW
:param printFunction: function that is used for printing, for IPython notebooks IPython.display can be useful
:param directions: can be a list of directions, to print only these directions
if None, the default directions are printed
"""
if directions is None:
if self.latticeModel.dim == 2:
directions = [(0, 0), (1, 0), (1, 1)]
elif self.latticeModel.dim == 3:
directions = [(0, 0, 0), (1, 0, 0), (1, 1, 0), (1, 1, 1)]
else:
raise NotImplementedError("Only 2D and 3D models supported")
indices = [self.updateEquationDirections.index(i) for i in directions]
for i in indices:
printFunction(self.updateEquations[i])
class LatticeModel:
def __init__(self, stencil, compressible, forceModel=None):
self._stencil = stencil
self._compressible = compressible
self._forceModel = forceModel
@property
def stencil(self):
"""Sequence of directions (discretization of velocity space)"""
return self._stencil
@property
def compressible(self):
"""Determines how to calculate density/velocity and how pdfs are stored:
True: pdfs are centered around 1 (normal)
False: pdfs are centered around 0, density is sum(pdfs)+1"""
return self._compressible
@property
def dim(self):
"""Spatial dimension of method"""
return len(self._stencil[0])
@property
def forceModel(self):
"""Force model passed in constructor"""
return self._forceModel
@property
def pdfSymbols(self):
Q = len(self._stencil)
return [sp.Symbol("f_%d" % (i,)) for i in range(Q)]
@property
def pdfDestinationSymbols(self):
Q = len(self._stencil)
return [sp.Symbol("d_%d" % (i,)) for i in range(Q)]
@property
def symbolicDensity(self):
return util.getSymbolicDensity()
@property
def symbolicVelocity(self):
return util.getSymbolicVelocityVector(self.dim)
def getCollisionRule(self):
raise NotImplemented("This method has to be implemented in subclass")
class MomentRelaxationLatticeModel(LatticeModel):
def __init__(self, stencil, moments, equilibriumMoments, relaxationRates, compressible, forceModel=None):
super(MomentRelaxationLatticeModel, self).__init__(stencil, compressible, forceModel)
self._moments = moments
self._equilibriumMoments = sp.Matrix(equilibriumMoments)
self._relaxationRates = relaxationRates
@property
def momentMatrix(self):
return m.momentMatrix(self._moments, self.stencil)
@property
def weights(self):
return getWeights(self._stencil)
@property
def relaxationRates(self):
"""Sequence of len(stencil) relaxation rates (may be symbolic or constant)"""
return self._relaxationRates
@property
def allRelaxationRatesFixed(self):
symbols = [rt for rt in self._relaxationRates if isinstance(rt, sp.Symbol)]
return len(symbols) == 0
def setCollisionDOFs(self, replacementDict):
"""Replace relaxation rate symbols by passing a dictionary from symbol name to new value"""
substitutions = {sp.Symbol(key): value for key, value in replacementDict.items()}
self._relaxationRates = [fastSubs(rr, substitutions) for rr in self._relaxationRates]
def getCollisionRule(self):
relaxationMatrix = sp.diag(*self._relaxationRates)
M = self.momentMatrix # transform from physical to moment space
pdfVector = sp.Matrix(self.pdfSymbols)
collisionResult = M.inv() * relaxationMatrix * (self._equilibriumMoments - M * pdfVector)
#collisionResult = M.inv() * (M * pdfVector + relaxationMatrix * (self._equilibriumMoments - M * pdfVector))
print("more complex collision rule")
if self.forceModel:
collisionResult += sp.Matrix(self.forceModel(latticeModel=self))
collisionEqs = [sp.Eq(dst_i, s+t)
for s, dst_i, t in zip(self.pdfSymbols, self.pdfDestinationSymbols, collisionResult)]
#collisionEqs = [sp.Eq(dst_i, t)
# for dst_i, t in zip(self.pdfDestinationSymbols, collisionResult)]
# get optimized calculation rules for density and velocity
rhoSubexprs, rhoEq, uSubexprs, uEqs = getDensityVelocityExpressions(self.stencil, self.pdfSymbols,
self.compressible)
# for some force models the velocity has to be shifted
if self.forceModel and hasattr(self.forceModel, "equilibriumVelocity"):
uSymbols = [e.lhs for e in uEqs]
uRhs = [e.rhs for e in uEqs]
correctedVel = self.forceModel.equilibriumVelocity(self, uRhs, rhoEq.lhs)
uEqs = [sp.Eq(u_i, correctedVel_i) for u_i, correctedVel_i in zip(uSymbols, correctedVel)]
subExpressions = rhoSubexprs + [rhoEq] + uSubexprs + uEqs
return LbmCollisionRule(collisionEqs, subExpressions, self)
from collections import defaultdict
import sympy as sp
import numpy as np
from pystencils.field import Field
from lbmpy.fieldaccess import streamPullWithSourceAndDestinationFields
from lbmpy.latticemodel import MomentRelaxationLatticeModel
from lbmpy.simplifications import createDefaultMomentSpaceSimplificationStrategy
import lbmpy.simplifications as simplifications
def createStreamCollideUpdateRule(lm, numpyField=None, srcFieldName="src", dstFieldName="dst",
doCSE=False, genericLayout='numpy', genericFieldType=np.float64):
"""
Creates a list of LBM update equations
:param lm: instance of lattice model
:param numpyField: optional numpy field for PDFs. Used to create a kernel of fixed loop bounds and strides
if None, a generic kernel is created
:param srcFieldName: name of the pdf source field
:param dstFieldName: name of the pdf destination field
:param doCSE: if True, common subexpression elimination is done for pdfs in opposing directions
:param genericLayout: if no numpyField is given to determine the layout, a variable sized field with the given
genericLayout is used
:param genericFieldType: if no numpyField is given, this data type is used for the fields
:return: list of sympy equations
"""
if isinstance(lm, MomentRelaxationLatticeModel):
simplificationStrategy = createDefaultMomentSpaceSimplificationStrategy()
if doCSE:
simplificationStrategy.addSimplificationRule(simplifications.cseInOpposingDirections)
else:
simplificationStrategy = simplifications.Strategy()
simplificationStrategy.addSimplificationRule(simplifications.subexpressionSubstitutionInExistingSubexpressions)
simplificationStrategy.addSimplificationRule(simplifications.sympyCSE)
collisionRule = lm.getCollisionRule()
collisionRule = simplificationStrategy(collisionRule)
collisionRule = streamPullWithSourceAndDestinationFields(collisionRule, numpyField, srcFieldName, dstFieldName,
genericLayout, genericFieldType)
return collisionRule
def createLbmSplitGroups(lm, equations):
f_eq_common = sp.Symbol('f_eq_common')
rho = sp.Symbol('rho')
result = [list(lm.symbolicVelocity), ]
if lm.compressible:
result[0].append(rho)
directionGroups = defaultdict(list)
for eq in equations:
if f_eq_common in eq.rhs.atoms(sp.Symbol) and f_eq_common not in result[0]:
result[0].append(f_eq_common)
if not type(eq.lhs) is Field.Access:
continue
idx = eq.lhs.index[0]
if idx == 0:
result[0].append(eq.lhs)
continue
direction = lm.stencil[idx]
inverseDir = tuple([-i for i in direction])
if inverseDir in directionGroups:
directionGroups[inverseDir].append(eq.lhs)
else:
directionGroups[direction].append(eq.lhs)
result += directionGroups.values()
if f_eq_common not in result[0]:
result[0].append(rho)
return result
import sympy as sp
class RelaxationScheme:
def doRelaxation(self, equationCollection):
pass
def equilibrium(self):
pass
def conservedQuantities(self):
"""Returns """
pass
class LbmMethod:
"""
Class that holds all information about a lattice Boltzmann collision scheme:
- discretization of velocity space i.e. stencil
- equations to transform into collision space and back
- relaxation scheme and equilibrium
- conserved quantities and equations to compute them
Interface to force models and boundary conditions:
- getMacroscopicQuantity('velocity')
- getMacroscopicQuantity('density')
- getMacroscopicQuantitySymbol('velocity')
General Idea:
- separate collision space transformation (e.g. raw moments, central moments and cumulant)
from relaxation schemes (e.g. various orthogonalization methods)
- question: who defines macroscopic/conserved values
Open Questions: TODO
- forcemodel as member? or can it be supplied in getCollisionRule()
and getMacroscopicQuantity() -> probably a member is better
ForceModel Interface:
- pass velocity & compressibility instead of complete lbmMethod? not sure...
-
Boundary Interface:
- pass full lbm method to boundary function
"""
def __init__(self, stencil, collisionSpaceTransformation, relaxationScheme,
inverseCollisionSpaceTransformation=None):
"""
Create a LbmMethod
:param stencil:
sequence of directions, each direction is a tuple of integers of length equal to dimension
:param collisionSpaceTransformation:
EquationCollection object, defining a transformation of distribution space (pdfs)
into collision space. Collision space can be for example a moment or cumulant representation.
The equation collection must have as many free symbols as there are stencil entries.
The free symbols are termed collisionSpaceSymbols and are passed to the relaxation scheme.
:param relaxationScheme:
a relaxation scheme object, providing information about the relaxation process and the
equilibrium in collision space
:param inverseCollisionSpaceTransformation:
if passed None, the inverse transformation is determined by
inverting the collisionSpaceTransformation using sympy. There are
cases where sympy is slow or unable to do that, in this case the
inverse can be specified here.
"""
@property
def stencil(self):
pass # TODO
@property
def compressible(self):
pass # TODO
@property
def dim(self):
return len(self.stencil[0])
def preCollisionSymbols(self):
return [sp.Symbol("f_%d" % (i,)) for i in range(len(self.stencil))]
def collisionSpaceSymbols(self):
return [sp.Symbol("c_%d" % (i,)) for i in range(len(self.stencil))]
def postCollisionSymbols(self):
return [sp.Symbol("d_%d" % (i,)) for i in range(len(self.stencil))]
def equilibrium(self):
"""Returns equation collection, defining the equilibrium in collision space"""
def conservedQuantities(self):
"""Returns equation collection defining conserved quantities"""
pass
def transformToCollisionSpace(self, equationCollection):
pass
def transformToPdfSpace(self, equationCollection):
pass
import sympy as sp
from lbmpy.fieldaccess import streamPullWithSourceAndDestinationFields
from lbmpy.latticemodel import LbmCollisionRule
from lbmpy.lbmgenerator import createStreamCollideUpdateRule
from pystencils.field import Field
from pystencils.cpu import createKernel, makePythonFunction
from lbmpy.equilibria import standardDiscreteEquilibrium
from lbmpy.densityVelocityExpressions import getDensityVelocityExpressions
def compileMacroscopicValuesGetter(latticeModel, pdfArr=None, macroscopicFieldLayout='numpy'):
"""
Creates a function that computes density and/or velocity and stores it into given arrays
:param latticeModel: lattice model (to get information about stencil, force velocity shift and compressibility)
:param pdfArr: array to set can already be specified here, or later when the returned function is called
:param macroscopicFieldLayout: layout specification for Field.createGeneric
:return: a function, which has three parameters:
- pdfArray, can be omitted if pdf array was already passed while compiling
- densityOut, if not None, density is written to that array
- velocityOUt, if not None, velocity is written to that array
"""
if pdfArr is None:
pdfField = Field.createGeneric('pdfs', latticeModel.dim, indexDimensions=1)
else:
pdfField = Field.createFromNumpyArray('pdfs', pdfArr, indexDimensions=1)
rhoField = Field.createGeneric('rho', latticeModel.dim, indexDimensions=0, layout=macroscopicFieldLayout)
velField = Field.createGeneric('vel', latticeModel.dim, indexDimensions=1, layout=macroscopicFieldLayout)
lm = latticeModel
Q = len(lm.stencil)
symPdfs = [pdfField(i) for i in range(Q)]
subexpressions, rho, velSubexpressions, u = getDensityVelocityExpressions(lm.stencil, symPdfs, lm.compressible)
uRhs = [u_i.rhs for u_i in u]
uLhs = [u_i.lhs for u_i in u]
if hasattr(lm.forceModel, "macroscopicVelocity"):
correctedVel = lm.forceModel.macroscopicVelocity(lm, uRhs, rho.lhs)
u = [sp.Eq(a, b) for a, b in zip(uLhs, correctedVel)]
eqsRhoKernel = subexpressions + [sp.Eq(rhoField(0), rho.rhs)]
eqsVelKernel = subexpressions + [rho] + velSubexpressions + [sp.Eq(velField(i), u[i].rhs) for i in range(lm.dim)]
eqsRhoAndVelKernel = eqsVelKernel + [sp.Eq(rhoField(0), rho.lhs)]
kernelRho = makePythonFunction(createKernel(eqsRhoKernel))
kernelVel = makePythonFunction(createKernel(eqsVelKernel))
kernelRhoAndVel = makePythonFunction(createKernel(eqsRhoAndVelKernel))
def getter(pdfs=None, densityOut=None, velocityOut=None):
if pdfs is not None and pdfArr is not None:
assert pdfs.shape == pdfArr.shape and pdfs.strides == pdfArr.strides, \
"Pdf array not matching blueprint which was used to compile" + str(pdfs.shape) + str(pdfArr.shape)
if pdfs is None:
assert pdfArr is not None, "Pdf array has to be passed either when compiling or when calling."
pdfs = pdfArr
assert not (densityOut is None and velocityOut is None), \
"Specify either densityOut or velocityOut parameter or both"
if densityOut is not None and velocityOut is None:
kernelRho(pdfs=pdfs, rho=densityOut)
elif densityOut is None and velocityOut is not None:
kernelVel(pdfs=pdfs, vel=velocityOut)
else:
kernelRhoAndVel(pdfs=pdfs, rho=densityOut, vel=velocityOut)
return getter
def compileAdvancedVelocitySetter(collisionRule, velocityArray, pdfArr=None):
"""
Advanced initialization of velocity field through iteration procedure according to
Mei, Luo, Lallemand and Humieres: Consistent initial conditions for LBM simulations, 2005
Important: this procedure only works if a non-zero relaxation rate was used for the velocity moments!
:param collisionRule: unsimplified collision rule
:param velocityArray: array with velocity field
:param pdfArr: optional array, to compile kernel with fixed layout and shape
:return: function, that has to be called multiple times, with a pdf field (src/dst) until convergence
similar to the normal streamCollide step, also with boundary handling
"""
velocityField = Field.createFromNumpyArray('vel', velocityArray, indexDimensions=1)
# create normal LBM kernel and replace velocity by expressions of velocity field
from lbmpy.simplifications import sympyCSE
latticeModel = collisionRule.latticeModel
collisionRule = sympyCSE(collisionRule)
collisionRule = streamPullWithSourceAndDestinationFields(collisionRule, pdfArr)
replacements = {u_i: sp.Eq(u_i, velocityField(i)) for i, u_i in enumerate(latticeModel.symbolicVelocity)}
newSubExpressions = [replacements[eq.lhs] if eq.lhs in replacements else eq for eq in collisionRule.subexpressions]
newCollisionRule = LbmCollisionRule(collisionRule.updateEquations, newSubExpressions,
latticeModel, collisionRule.updateEquationDirections)
kernelAst = createKernel(newCollisionRule.equations)
return makePythonFunction(kernelAst, {'vel': velocityArray})
def compileMacroscopicValuesSetter(latticeModel, density=1, velocity=0, equilibriumOrder=2, pdfArr=None):
"""
Creates a function that sets a pdf field to specified macroscopic quantities
The returned function can be called with the pdf field to set as single argument
:param latticeModel: lattice model (to get information about stencil, force velocity shift and compressibility)
:param density: density used for equilibrium. Can either be scalar (all cells are set to same density) or array
:param velocity: velocity for equilibrium. Can either be scalar (e.g. 0, sets all cells to (0,0,0) velocity)
or a tuple with D (2 or 3) entries to set same velocity in the complete domain, or an array
specifying the velocity for each cell
:param equilibriumOrder: approximation order of equilibrium
:param pdfArr: array to set can already be specified here, or later when the returned function is called
"""
if pdfArr is not None:
pdfField = Field.createFromNumpyArray('pdfs', pdfArr, indexDimensions=1)
else:
pdfField = Field.createGeneric('pdfs', latticeModel.dim, indexDimensions=1)
noOffset = tuple([0] * latticeModel.dim)
kernelArguments = {}
if hasattr(density, 'shape'):
densityValue = Field.createFromNumpyArray('rho', density, indexDimensions=0)[noOffset]
kernelArguments['rho'] = density
else:
densityValue = density
if hasattr(velocity, 'shape'):
assert velocity.shape[-1] == latticeModel.dim, "Wrong shape of velocity array"
velocityValue = [Field.createFromNumpyArray('vel', velocity, indexDimensions=1)[noOffset](i)
for i in range(latticeModel.dim)]
kernelArguments['vel'] = velocity
else:
if not hasattr(velocity, "__len__"):
velocity = [velocity] * latticeModel.dim
velocityValue = tuple(velocity)
# force shift
if latticeModel.forceModel and hasattr(latticeModel.forceModel, "macroscopicVelocity"):
# force model knows only about one direction - use sympy to solve the shift equations to get backward
fm = latticeModel.forceModel
unshiftedVel = [sp.Symbol("v_unshifted_%d" % (i,)) for i in range(latticeModel.dim)]
shiftedVel = fm.macroscopicVelocity(latticeModel, unshiftedVel, densityValue)
velShiftEqs = [sp.Eq(a, b) for a, b in zip(velocityValue, shiftedVel)]
solveRes = sp.solve(velShiftEqs, unshiftedVel)
assert len(solveRes) == latticeModel.dim
velocityValue = [solveRes[unshiftedVel_i] for unshiftedVel_i in unshiftedVel]
eq = standardDiscreteEquilibrium(latticeModel.stencil, densityValue, velocityValue, equilibriumOrder,
c_s_sq=sp.Rational(1, 3), compressible=latticeModel.compressible)
updateEquations = [sp.Eq(pdfField(i), eq[i]) for i in range(len(latticeModel.stencil))]
f = makePythonFunction(createKernel(updateEquations), kernelArguments)
def setter(pdfs=None, **kwargs):
if pdfs is not None and pdfArr is not None:
assert pdfs.shape == pdfArr.shape and pdfs.strides == pdfArr.strides, \
"Pdf array not matching blueprint which was used to compile"
if pdfs is None:
assert pdfArr is not None, "Pdf array has to be passed either when compiling or when calling."
pdfs = pdfArr
assert pdfs.shape[-1] == len(latticeModel.stencil), "Wrong shape of pdf array"
assert len(pdfs.shape) == latticeModel.dim + 1, "Wrong shape of pdf array"
if hasattr(density, 'shape'):
assert pdfs.shape[:-1] == density.shape, "Density array shape does not match pdf array shape"
if hasattr(velocity, 'shape'):
assert pdfs.shape[:-1] == velocity.shape[:-1], "Velocity array shape does not match pdf array shape"
f(pdfs=pdfs, **kwargs) # kwargs passed here, since force model might need additional fields
return setter
This diff is collapsed.
from matplotlib.pyplot import *
import numpy as np
from numpy.linalg import norm
def removeGhostLayers(field):
return field[1:-1, 1:-1]
def vectorField(field, step=2, **kwargs):
field = removeGhostLayers(field)
veln = field.swapaxes(0, 1)
quiver(veln[::step, ::step, 0], veln[::step, ::step, 1], **kwargs)
def vectorFieldMagnitude(field, **kwargs):
field = norm(field, axis=2, ord=2)
return scalarField(field, **kwargs)
def scalarField(field, **kwargs):
field = removeGhostLayers(field)
field = np.swapaxes(field, 0, 1)
return imshow(field, origin='lower', **kwargs)
def vectorFieldMagnitudeAnimation(runFunction, plotSetupFunction=lambda: None,
plotUpdateFunction=lambda: None, interval=30, frames=180, **kwargs):
import matplotlib.animation as animation
fig = gcf()
im = None
field = runFunction()
im = vectorFieldMagnitude(field, **kwargs)
plotSetupFunction()
def updatefig(*args):
field = runFunction()
field = norm(field, axis=2, ord=2)
field = np.swapaxes(field, 0, 1)
im.set_array(field)
plotUpdateFunction()
return im,
return animation.FuncAnimation(fig, updatefig, interval=interval, frames=frames)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment