Skip to content
Snippets Groups Projects
Commit 942b191b authored by Martin Bauer's avatar Martin Bauer
Browse files

started to clean up cumulant transformations

parent e0987a3e
Branches
Tags
No related merge requests found
......@@ -93,14 +93,15 @@ def __continuousMomentOrCumulant(function, moment, symbols, generatingFunction):
t = tuple([sp.Symbol("tmpvar_%d" % i,) for i in range(dim)]) # not using sp.Dummy here - since it prohibits caching
symbols = symbols[:dim]
genFunc = generatingFunction(function, symbols, t)
if type(moment) is tuple:
return multiDifferentiation(generatingFunction(function, symbols, t), moment, t)
return multiDifferentiation(genFunc, moment, t)
else:
result = 0
for term, coeff in moment.as_coefficients_dict().items():
for term, coefficient in moment.as_coefficients_dict().items():
exponents = tuple([term.as_coeff_exponent(v_i)[1] for v_i in symbols])
cm = multiDifferentiation(generatingFunction(function, symbols, t), exponents, t)
result += coeff * cm
cm = multiDifferentiation(genFunc, exponents, t)
result += coefficient * cm
return result
......
"""
This module provides functions to compute cumulants of discrete functions.
Additionally functions are provided to compute cumulants from moments and vice versa
"""
import functools
import sympy as sp
from lbmpy.diskcache import diskcache
from lbmpy.moments import momentsUpToComponentOrder
from lbmpy.continuous_distribution_measures import multiDifferentiation
# ------------------------------------------- Internal Functions -------------------------------------------------------
def __getIndexedSymbols(passedSymbols, prefix, indices):
"""If passed symbols is not None, they are returned, if they are None
indexed symbols of the form {prefix}_{index} are returned"""
try:
dim = len(indices[0])
except TypeError:
dim = 1
if passedSymbols is not None:
return passedSymbols
else:
formatString = "%s_" + "_".join(["%d"]*dim)
tupleIndices = []
for i in indices:
tupleIndices.append(i if type(i) is tuple else (i,))
return [sp.Symbol(formatString % ((prefix,) + i)) for i in tupleIndices]
def __partition(collection):
"""Yields all set partitions of a given sequence"""
if len(collection) == 1:
yield [collection]
return
first = collection[0]
for smaller in __partition(collection[1:]):
for n, subset in enumerate(smaller):
yield smaller[:n] + [[first] + subset] + smaller[n + 1:]
yield [[first]] + smaller
def __cumulantRawMomentTransform(index, dependentVarDict, outerFunction, defaultPrefix, centralized):
"""
Function to express cumulants as function of moments as vice versa.
Uses multivariate version of Faa di Bruno's formula.
:param index: tuple describing the index of the cumulant/moment to express as function of moments/cumulants
:param dependentVarDict: a dictionary from index tuple to moments/cumulants symbols, or None to use default symbols
:param outerFunction: logarithm to transform from moments->cumulants, exp for inverse direction
:param defaultPrefix: if dependentVarDict is None, this is used to construct symbols of the form prefix_i_j_k
:param centralized: if True the first order moments/cumulants are set to zero
"""
dim = len(index)
def createMomentSymbol(idx):
idx = tuple(idx)
if dependentVarDict is None:
return sp.Symbol(defaultPrefix + "_" + "_".join(["%d"] * len(idx)) % idx)
else:
return dependentVarDict[idx]
zerothMoment = createMomentSymbol((0,)*dim)
def outerFunctionDerivative(n):
x = zerothMoment
return sp.diff(outerFunction(x), *tuple([x]*n))
# index (2,1,0) means differentiate twice w.r.t to first variable, and once w.r.t to second variable
# this is transformed here into representation [0,0,1] such that each entry is one diff operation
partitionList = []
for i, indexComponent in enumerate(index):
for j in range(indexComponent):
partitionList.append(i)
if len(partitionList) == 0: # special case for zero index
return outerFunction(zerothMoment)
# implementation of Faa di Bruno's formula:
result = 0
for partition in __partition(partitionList):
factor = outerFunctionDerivative(len(partition))
for elements in partition:
momentIndex = [0, ] * dim
for i in elements:
momentIndex[i] += 1
factor *= createMomentSymbol(momentIndex)
result += factor
if centralized:
for i in dim:
index = [0] * dim
index[i] = 1
result = result.subs(createMomentSymbol(index), 0)
return result
@functools.lru_cache(maxsize=16)
def __getDiscreteCumulantGeneratingFunction(function, stencil, waveNumbers):
assert len(stencil) == len(function)
def scalarProduct(a, b):
return sum(a_i * b_i for a_i, b_i in zip(a, b))
laplaceTrafo = sum([factor * sp.exp(scalarProduct(waveNumbers, e)) for factor, e in zip(function, stencil)])
return sp.ln(laplaceTrafo)
# ------------------------------------------- Public Functions ---------------------------------------------------------
def cumulantAsFunctionOfRawMoments(index, momentsDict=None):
return __cumulantRawMomentTransform(index, momentsDict, sp.log, 'm', False)
def rawMomentAsFunctionOfCumulants(index, cumulantsDict=None):
return __cumulantRawMomentTransform(index, cumulantsDict, sp.exp, 'c', False)
def cumulantAsFunctionOfCentralMoments(index, momentsDict=None):
return __cumulantRawMomentTransform(index, momentsDict, sp.log, 'm', True)
def centralMomentAsFunctionOfCumulants(index, cumulantsDict=None):
return __cumulantRawMomentTransform(index, cumulantsDict, sp.exp, 'c', True)
@functools.lru_cache(maxsize=64)
def discreteCumulant(function, cumulant, stencil):
"""
Computes cumulant of discrete function
:param function: sequence of function components, has to have the same length as stencil
:param cumulant: definition of cumulant, either as an index tuple, or as a polynomial
(similar to moment description)
:param stencil: sequence of directions
"""
assert len(stencil) == len(function)
dim = len(stencil[0])
waveNumbers = tuple([sp.Symbol("Xi_%d" % (i,)) for i in range(dim)])
generatingFunction = __getDiscreteCumulantGeneratingFunction(function, stencil, waveNumbers)
if type(cumulant) is tuple:
return multiDifferentiation(generatingFunction, cumulant, waveNumbers)
else:
from lbmpy.moments import MOMENT_SYMBOLS
result = 0
for term, coefficient in cumulant.as_coefficients_dict().items():
exponents = tuple([term.as_coeff_exponent(v_i)[1] for v_i in MOMENT_SYMBOLS[:dim]])
generatingFunction = __getDiscreteCumulantGeneratingFunction(function, stencil, waveNumbers)
cm = multiDifferentiation(generatingFunction, exponents, waveNumbers)
result += coefficient * cm
return result
@functools.lru_cache(maxsize=8)
def cumulantsFromPdfs(stencil, cumulantIndices=None, pdfSymbols=None, cumulantSymbols=None):
"""
Creates equations to transform pdfs to cumulant space
:param stencil:
:param cumulantIndices: sequence of cumulant indices, could be tuples or polynomial representation
if left to default and a full stencil was passed,
the full set i.e. `momentsUpToComponentOrder(2)` is used
:param pdfSymbols: symbolic values for pdf values, if not passed they default to :math:`f_0, f_1, ...`
:param cumulantSymbols: symbolic values for cumulants (left hand sides of returned equations)
by default they are labeled :math:`c_{00}, c_{01}, c_{10}, ...`
:return: sequence of equations for each cumulant one, on the right hand sides only pdfSymbols are used
"""
dim = len(stencil[0])
if cumulantIndices is None:
cumulantIndices = list(momentsUpToComponentOrder(2, dim=dim))
assert len(stencil) == len(cumulantIndices), "Stencil has to have same length as cumulantIndices sequence"
cumulantSymbols = __getIndexedSymbols(cumulantSymbols, "c", cumulantIndices)
pdfSymbols = __getIndexedSymbols(pdfSymbols, "f", range(len(stencil)))
return [sp.Eq(cumulantSymbol, discreteCumulant(tuple(pdfSymbols), idx, stencil))
for cumulantSymbol, idx in zip(cumulantSymbols, cumulantIndices)]
@diskcache
def cumulantsFromRawMoments(stencil, indices=None, momentSymbols=None, cumulantSymbols=None):
"""
Creates equations to transform from raw moment representation to cumulants
:param stencil:
:param indices: indices of raw moments/ cumulant symbols, by default the full set is used
:param momentSymbols: symbolic values for moments (symbols used for moments in equations)
:param cumulantSymbols: symbolic values for cumulants (left hand sides of the equations)
:return: equations to compute cumulants from raw moments
"""
#TODO
@diskcache
def rawMomentsFromCumulants(stencil, cumulantSymbols=None, momentSymbols=None):
#TODO
pass
......@@ -5,64 +5,40 @@ Additionally functions are provided to compute moments and cumulants of these di
"""
import sympy as sp
import functools
from sympy import Rational as R
from lbmpy.diskcache import diskcache
@functools.lru_cache()
def computeWeights1D(stencilWidth, c_s_sq):
"""
Computes lattice weights for a symmetric 1D stencil with given stencil width.
:param stencilWidth: stencil width, i.e. the maximum absolute value of direction component
e.g. stencilWidth=2 means direction -2, -1, 0, 1, 2
:param c_s_sq: speed of sound squared
:returns: dict mapping from integer offset to weight
>>> computeWeights1D(1, sp.Rational(1,3))
{-1: 1/6, 0: 2/3, 1: 1/6}
"""
from lbmpy.moments import momentMatrix
# Create a 1D stencil with the given width
stencil = tuple([(i,) for i in range(-stencilWidth, stencilWidth+1)])
moments = tuple([(i,) for i in range(2*stencilWidth+1)])
contMoments = getMomentsOfContinuousMaxwellianEquilibrium(moments, rho=1, u=(0,), c_s_sq=c_s_sq, dim=1)
M = momentMatrix(moments, stencil)
weights = M.inv() * sp.Matrix(contMoments)
return {i: w_i for i, w_i in zip(range(-stencilWidth, stencilWidth + 1), weights)}
@functools.lru_cache()
def getWeights(stencil, c_s_sq):
"""
Computes the weights for a stencil
The weight of a direction is determined by multiplying 1D weights i.e. there is a 1D weight associated
with entries -1, 0 and 1 (for one neighborhood stencils). These 1D weights are determined by the
continuous Maxwellian distribution.
The weight of the higher dimensional direction is
the product of the 1D weights, dependent on its entries. For example (1,-1,0) would be
:math:`w_{1} w_{-1} w_{0}`.
This product approach is described in detail in :cite:`karlin2015entropic`.
:param stencil: tuple of directions
:param c_s_sq: speed of sound squared
:returns: list of weights, one for each direction
"""
maxNeighborhood = 0
for d in stencil:
for e in d:
if abs(e) > maxNeighborhood:
maxNeighborhood = abs(e)
elementaryWeights = computeWeights1D(maxNeighborhood, c_s_sq)
result = []
for directionTuple in stencil:
weight = 1
for e in directionTuple:
weight *= elementaryWeights[e]
result.append(weight)
return result
def getWeights(stencil):
Q = len(stencil)
def weightForDirection(direction):
absSum = sum([abs(d) for d in direction])
return getWeights.weights[Q][absSum]
return [weightForDirection(d) for d in stencil]
getWeights.weights = {
9: {
0: R(4, 9),
1: R(1, 9),
2: R(1, 36),
},
15: {
0: R(2, 9),
1: R(1, 9),
3: R(1, 72),
},
19: {
0: R(1, 3),
1: R(1, 18),
2: R(1, 36),
},
27: {
0: R(8, 27),
1: R(2, 27),
2: R(1, 54),
3: R(1, 216),
}
}
@diskcache
......@@ -77,7 +53,7 @@ def discreteMaxwellianEquilibrium(stencil, rho=sp.Symbol("rho"), u=tuple(sp.symb
:param c_s_sq: square of speed of sound
:param compressible: compressibility
"""
weights = getWeights(stencil, c_s_sq)
weights = getWeights(stencil)
assert len(stencil) == len(weights)
dim = len(stencil[0])
......@@ -114,6 +90,25 @@ def discreteMaxwellianEquilibrium(stencil, rho=sp.Symbol("rho"), u=tuple(sp.symb
return tuple(res)
@diskcache
def generateEquilibriumByMatchingMoments(stencil, moments, rho=sp.Symbol("rho"),
u=tuple(sp.symbols("u_0 u_1 u_2")), c_s_sq=sp.Symbol("c_s") ** 2, order=None):
"""
Computes discrete equilibrium, by setting the discrete moments to values taken from the continuous Maxwellian.
The number of moments has to match the number of directions in the stencil. For documentation of other parameters
see :func:`getMomentsOfContinuousMaxwellianEquilibrium`
"""
from lbmpy.moments import momentMatrix
dim = len(stencil[0])
Q = len(stencil)
assert len(moments) == Q, "Moment count(%d) does not match stencil size(%d)" % (len(moments), Q)
continuousMomentsVector = getMomentsOfContinuousMaxwellianEquilibrium(moments, rho, u, c_s_sq, dim, order)
continuousMomentsVector = sp.Matrix(continuousMomentsVector)
M = momentMatrix(moments, stencil)
assert M.rank() == Q, "Rank of moment matrix (%d) does not match stencil size (%d)" % (M.rank(), Q)
return M.inv() * continuousMomentsVector
@diskcache
def continuousMaxwellianEquilibrium(dim=3, rho=sp.Symbol("rho"),
u=tuple(sp.symbols("u_0 u_1 u_2")),
......@@ -184,6 +179,8 @@ def getMomentsOfDiscreteMaxwellianEquilibrium(stencil,
:param compressible: compressible or incompressible form
"""
from lbmpy.moments import discreteMoment
if order is None:
order = 4
mb = discreteMaxwellianEquilibrium(stencil, rho, u, order, c_s_sq, compressible)
return tuple([discreteMoment(mb, moment, stencil).expand() for moment in moments])
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment