Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
No results found
Show changes
Showing
with 416 additions and 73 deletions
import numpy as np
import sympy as sp
from pystencils.typing import TypedSymbol, create_type
from pystencils.backends.cbackend import CustomCodeNode
class NeighbourOffsetArrays(CustomCodeNode):
@staticmethod
def neighbour_offset(dir_idx, stencil):
if isinstance(sp.sympify(dir_idx), sp.Integer):
return stencil[dir_idx]
else:
return tuple([sp.IndexedBase(symbol, shape=(1,))[dir_idx]
for symbol in NeighbourOffsetArrays._offset_symbols(len(stencil[0]))])
@staticmethod
def _offset_symbols(dim):
return [TypedSymbol(f"neighbour_offset_{d}", create_type('int32')) for d in ['x', 'y', 'z'][:dim]]
def __init__(self, stencil, offsets_dtype=np.int32):
offsets_dtype = create_type(offsets_dtype)
dim = len(stencil[0])
array_symbols = NeighbourOffsetArrays._offset_symbols(dim)
code = "\n"
for i, arrsymb in enumerate(array_symbols):
code += _array_pattern(offsets_dtype, arrsymb.name, (d[i] for d in stencil))
offset_symbols = NeighbourOffsetArrays._offset_symbols(dim)
super(NeighbourOffsetArrays, self).__init__(code, symbols_read=set(),
symbols_defined=set(offset_symbols))
class MirroredStencilDirections(CustomCodeNode):
@staticmethod
def mirror_stencil(direction, mirror_axis):
assert mirror_axis <= len(direction), f"only {len(direction)} axis available for mirage"
direction = list(direction)
direction[mirror_axis] = -direction[mirror_axis]
return tuple(direction)
@staticmethod
def _mirrored_symbol(mirror_axis):
axis = ['x', 'y', 'z']
return TypedSymbol(f"{axis[mirror_axis]}_axis_mirrored_stencil_dir", create_type('int32'))
def __init__(self, stencil, mirror_axis, dtype=np.int32):
offsets_dtype = create_type(dtype)
mirrored_stencil_symbol = MirroredStencilDirections._mirrored_symbol(mirror_axis)
mirrored_directions = [stencil.index(MirroredStencilDirections.mirror_stencil(direction, mirror_axis))
for direction in stencil]
code = "\n"
code += _array_pattern(offsets_dtype, mirrored_stencil_symbol.name, mirrored_directions)
super(MirroredStencilDirections, self).__init__(code, symbols_read=set(),
symbols_defined={mirrored_stencil_symbol})
class LbmWeightInfo(CustomCodeNode):
def __init__(self, lb_method, data_type='double'):
self.weights_symbol = TypedSymbol("weights", data_type)
weights = [f"(({self.weights_symbol.dtype.c_name})({str(w.evalf(17))}))" for w in lb_method.weights]
weights = ", ".join(weights)
w_sym = self.weights_symbol
code = f"const {self.weights_symbol.dtype.c_name} {w_sym.name} [] = {{{ weights }}};\n"
super(LbmWeightInfo, self).__init__(code, symbols_read=set(), symbols_defined={w_sym})
def weight_of_direction(self, dir_idx, lb_method=None):
if isinstance(sp.sympify(dir_idx), sp.Integer):
return lb_method.weights[dir_idx].evalf(17)
else:
return sp.IndexedBase(self.weights_symbol, shape=(1,))[dir_idx]
class TranslationArraysNode(CustomCodeNode):
def __init__(self, array_content, symbols_defined):
code = ''
for content in array_content:
code += _array_pattern(*content)
super(TranslationArraysNode, self).__init__(code, symbols_read=set(), symbols_defined=symbols_defined)
def __str__(self):
return "Variable PDF Access Translation Arrays"
def __repr__(self):
return "Variable PDF Access Translation Arrays"
def _array_pattern(dtype, name, content):
return f"const {str(dtype)} {name} [] = {{ {','.join(str(c) for c in content)} }}; \n"
import json
import six
import inspect
from pystencils.runhelper.db import PystencilsJsonEncoder
from pystencils.simp import SimplificationStrategy
from lbmpy import LBStencil, Method, CollisionSpace, SubgridScaleModel
from lbmpy.creationfunctions import LBMConfig, LBMOptimisation
from lbmpy.methods import CollisionSpaceInfo
from lbmpy.forcemodels import AbstractForceModel
from lbmpy.non_newtonian_models import CassonsParameters
class LbmpyJsonEncoder(PystencilsJsonEncoder):
def default(self, obj):
if isinstance(obj, (LBMConfig, LBMOptimisation, CollisionSpaceInfo, CassonsParameters)):
return obj.__dict__
if isinstance(obj, (LBStencil, Method, CollisionSpace, SubgridScaleModel)):
return obj.name
if isinstance(obj, AbstractForceModel):
return obj.__class__.__name__
if isinstance(obj, SimplificationStrategy):
return obj.__str__()
if inspect.isclass(obj):
return obj.__name__
return PystencilsJsonEncoder.default(self, obj)
class LbmpyJsonSerializer(object):
@classmethod
def serialize(cls, data):
if six.PY3:
if isinstance(data, bytes):
return json.dumps(data.decode('utf-8'), cls=LbmpyJsonEncoder, ensure_ascii=False).encode('utf-8')
else:
return json.dumps(data, cls=LbmpyJsonEncoder, ensure_ascii=False).encode('utf-8')
else:
return json.dumps(data, cls=LbmpyJsonEncoder, ensure_ascii=False).encode('utf-8')
@classmethod
def deserialize(cls, data):
if six.PY3:
return json.loads(data.decode('utf-8'))
else:
return json.loads(data.decode('utf-8'))
......@@ -217,3 +217,22 @@ class ForceModel(Enum):
"""
See :class:`lbmpy.forcemodels.ShanChen`
"""
CENTRALMOMENT = auto()
"""
See :class:`lbmpy.forcemodels.CentralMoment`
"""
class SubgridScaleModel(Enum):
"""
The SubgridScaleModel enumeration defines which subgrid-scale model (SGS) is used to perform
Large-Eddy-Simulations (LES).
"""
SMAGORINSKY = auto()
"""
See :func:`lbmpy.turbulence_models.add_smagorinsky_model`
"""
QR = auto()
"""
See :func:`lbmpy.turbulence_models.add_qr_model`
"""
File moved
import sympy as sp
import pystencils as ps
from pystencils.field import Field
def welford_assignments(field, mean_field, sum_of_squares_field=None, sum_of_cubes_field=None):
r"""
Creates the assignments for the kernel creation of Welford's algorithm
(https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm).
This algorithm is an online algorithm for the mean and variance calculation of sample data, here implemented for
the execution on scalar or vector fields, e.g., velocity.
The calculation of the variance / the third-order central moments is optional and only performed if a field for
the sum of squares / sum of cubes is given.
The mean value is directly updated in the mean vector field.
The variance / covariance must be retrieved in a post-processing step. Let :math `M_{2,n}` denote the value of the
sum of squares after the first :math `n` samples. According to Welford the biased sample variance
:math `\sigma_n^2` and the unbiased sample variance :math `s_n^2` are given by
.. math ::
\sigma_n^2 = \frac{M_{2,n}}{n}
s_n^2 = \frac{M_{2,n}}{n-1},
respectively.
Liekwise, to the 3rd-order central moment(s) of the (vector) field, the sum of cubes field must be divided
by :math `n` in a post-processing step.
"""
if isinstance(field, Field):
dim = field.values_per_cell()
welford_field = field.center
elif isinstance(field, Field.Access):
dim = field.field.values_per_cell()
welford_field = field
else:
raise ValueError("Vector field has to be a pystencils Field or Field.Access")
if isinstance(mean_field, Field):
welford_mean_field = mean_field.center
elif isinstance(mean_field, Field.Access):
welford_mean_field = mean_field
else:
raise ValueError("Mean vector field has to be a pystencils Field or Field.Access")
if sum_of_squares_field is None:
# sum of products will not be calculated, i.e., the covariance matrix is not retrievable
welford_sum_of_squares_field = None
else:
if isinstance(sum_of_squares_field, Field):
welford_sum_of_squares_field = sum_of_squares_field.center
elif isinstance(sum_of_squares_field, Field.Access):
welford_sum_of_squares_field = sum_of_squares_field
else:
raise ValueError("Sum of squares field has to be a pystencils Field or Field.Access")
assert welford_sum_of_squares_field.field.values_per_cell() == dim ** 2, \
(f"Sum of squares field does not have the right layout. "
f"Index dimension: {welford_sum_of_squares_field.field.values_per_cell()}, expected: {dim ** 2}")
if sum_of_cubes_field is None:
# sum of cubes will not be calculated, i.e., the 3rd-order central moments are not retrievable
welford_sum_of_cubes_field = None
else:
if isinstance(sum_of_cubes_field, Field):
welford_sum_of_cubes_field = sum_of_cubes_field.center
elif isinstance(sum_of_cubes_field, Field.Access):
welford_sum_of_cubes_field = sum_of_cubes_field
else:
raise ValueError("Sum of cubes field has to be a pystencils Field or Field.Access")
assert welford_sum_of_cubes_field.field.values_per_cell() == dim ** 3, \
(f"Sum of cubes field does not have the right layout. "
f"Index dimension: {welford_sum_of_cubes_field.field.values_per_cell()}, expected: {dim ** 3}")
# for the calculation of the thrid-order moments, the variance must also be calculated
if welford_sum_of_cubes_field is not None:
assert welford_sum_of_squares_field is not None
# actual assignments
counter = sp.Symbol('counter')
delta = sp.symbols(f"delta_:{dim}")
main_assignments = list()
for i in range(dim):
main_assignments.append(ps.Assignment(delta[i], welford_field.at_index(i) - welford_mean_field.at_index(i)))
main_assignments.append(ps.Assignment(welford_mean_field.at_index(i),
welford_mean_field.at_index(i) + delta[i] / counter))
if sum_of_squares_field is not None:
delta2 = sp.symbols(f"delta2_:{dim}")
for i in range(dim):
main_assignments.append(
ps.Assignment(delta2[i], welford_field.at_index(i) - welford_mean_field.at_index(i)))
for i in range(dim):
for j in range(dim):
idx = i * dim + j
main_assignments.append(ps.Assignment(
welford_sum_of_squares_field.at_index(idx),
welford_sum_of_squares_field.at_index(idx) + delta[i] * delta2[j]))
if sum_of_cubes_field is not None:
for i in range(dim):
for j in range(dim):
for k in range(dim):
idx = (i * dim + j) * dim + k
main_assignments.append(ps.Assignment(
welford_sum_of_cubes_field.at_index(idx),
welford_sum_of_cubes_field.at_index(idx)
- delta[k] / counter * welford_sum_of_squares_field(i * dim + j)
- delta[j] / counter * welford_sum_of_squares_field(k * dim + i)
- delta[i] / counter * welford_sum_of_squares_field(j * dim + k)
+ delta2[i] * (2 * delta[j] - delta2[j]) * delta[k]
))
return main_assignments
......@@ -19,9 +19,7 @@ def add_fluctuations_to_collision_rule(collision_rule, temperature=None, amplitu
""""""
if not (temperature and not amplitudes) or (temperature and amplitudes):
raise ValueError("Fluctuating LBM: Pass either 'temperature' or 'amplitudes'.")
if collision_rule.method.conserved_quantity_computation.zero_centered_pdfs:
raise ValueError("The fluctuating LBM is not implemented for zero-centered PDF storage.")
method = collision_rule.method
if not amplitudes:
amplitudes = fluctuation_amplitude_from_temperature(method, temperature, c_s_sq)
......@@ -44,9 +42,7 @@ def fluctuation_amplitude_from_temperature(method, temperature, c_s_sq=sp.Symbol
"""Produces amplitude equations according to (2.60) and (3.54) in Schiller08"""
normalization_factors = sp.matrix_multiply_elementwise(method.moment_matrix, method.moment_matrix) * \
sp.Matrix(method.weights)
density = method.zeroth_order_equilibrium_moment_symbol
if method.conserved_quantity_computation.zero_centered_pdfs:
density += 1
density = method._cqc.density_symbol
mu = temperature * density / c_s_sq
return [sp.sqrt(mu * norm * (1 - (1 - rr) ** 2))
for norm, rr in zip(normalization_factors, method.relaxation_rates)]
......
......@@ -210,6 +210,28 @@ class Simple(AbstractForceModel):
return before, after
class CentralMoment(AbstractForceModel):
r"""
A force model that only shifts the macroscopic and equilibrium velocities and does not introduce a forcing term to
the collision process. Forcing is then applied through relaxation of the first central moments in the shifted
frame of reference (cf. https://doi.org/10.1016/j.camwa.2015.05.001).
"""
def __call__(self, lb_method):
raise ValueError("This force model can only be combined with the Cumulant collision model")
def symmetric_central_moment_forcing(self, lb_method, *_):
before = sp.Matrix([0, ] * lb_method.stencil.Q)
after = sp.Matrix([0, ] * lb_method.stencil.Q)
for i, sf in enumerate(self.symbolic_force_vector):
before[i + 1] = sp.Rational(1, 2) * sf
after[i + 1] = sp.Rational(1, 2) * sf
return before, after
def equilibrium_velocity_shift(self, density):
return default_velocity_shift(density, self.symbolic_force_vector)
class Luo(AbstractForceModel):
r"""Force model by Luo :cite:`luo1993lattice`.
......@@ -305,20 +327,20 @@ class He(AbstractForceModel):
.. math::
F (\mathbf{c})
= \frac{1}{\rho c_s^2}
\mathbf{F} \cdot ( \mathbf{c} - \mathbf{u} )
F (\mathbf{c})
= \frac{1}{\rho c_s^2}
\mathbf{F} \cdot ( \mathbf{c} - \mathbf{u} )
f^{\mathrm{eq}} (\mathbf{c})
the following analytical expresson for the monomial raw moments of the force is found:
.. math::
m_{\alpha\beta\gamma}^{F, \mathrm{He}}
= \frac{1}{\rho c_s^2} \left(
F_x m^{\mathrm{eq}}_{\alpha+1,\beta,\gamma}
+ F_y m^{\mathrm{eq}}_{\alpha,\beta+1,\gamma}
+ F_z m^{\mathrm{eq}}_{\alpha,\beta,\gamma+1}
m_{\alpha\beta\gamma}^{F, \mathrm{He}}
= \frac{1}{\rho c_s^2} \left(
F_x m^{\mathrm{eq}}_{\alpha+1,\beta,\gamma}
+ F_y m^{\mathrm{eq}}_{\alpha,\beta+1,\gamma}
+ F_z m^{\mathrm{eq}}_{\alpha,\beta,\gamma+1}
- m^{eq}_{\alpha\beta\gamma} ( \mathbf{F} \cdot \mathbf{u} )
\right)
"""
......
File moved
File moved
......@@ -49,6 +49,9 @@ class LatticeBoltzmannStep:
default_target=target,
parallel=False)
if lbm_config:
method_parameters['stencil'] = lbm_config.stencil
if 'stencil' not in method_parameters:
method_parameters['stencil'] = LBStencil(Stencil.D2Q9) \
if data_handling.dim == 2 else LBStencil(Stencil.D3Q27)
......
import functools
import sympy as sp
from copy import deepcopy
from lbmpy.simplificationfactory import create_simplification_strategy
from pystencils import create_kernel, CreateKernelConfig
from pystencils import create_kernel, CreateKernelConfig, Assignment
from pystencils.field import Field, get_layout_of_array
from pystencils.enums import Target
from lbmpy.advanced_streaming.utility import get_accessor, Timestep
from lbmpy.relaxationrates import get_shear_relaxation_rate
from lbmpy.utils import second_order_moment_tensor
def pdf_initialization_assignments(lb_method, density, velocity, pdfs,
streaming_pattern='pull', previous_timestep=Timestep.BOTH,
set_pre_collision_pdfs=False):
"""Assignments to initialize the pdf field with equilibrium"""
def get_field_accesses(lb_method, pdfs, streaming_pattern, previous_timestep, pre_collision_pdfs):
if isinstance(pdfs, Field):
accessor = get_accessor(streaming_pattern, previous_timestep)
if set_pre_collision_pdfs:
if pre_collision_pdfs:
field_accesses = accessor.read(pdfs, lb_method.stencil)
else:
field_accesses = accessor.write(pdfs, lb_method.stencil)
elif streaming_pattern == 'pull' and not set_pre_collision_pdfs:
elif streaming_pattern == 'pull' and not pre_collision_pdfs:
field_accesses = pdfs
else:
raise ValueError("Invalid value of pdfs: A PDF field reference is required to derive "
+ f"initialization assignments for streaming pattern {streaming_pattern}.")
return field_accesses
def pdf_initialization_assignments(lb_method, density, velocity, pdfs,
streaming_pattern='pull', previous_timestep=Timestep.BOTH,
set_pre_collision_pdfs=False):
"""Assignments to initialize the pdf field with equilibrium"""
field_accesses = get_field_accesses(lb_method, pdfs, streaming_pattern, previous_timestep, set_pre_collision_pdfs)
if isinstance(density, Field):
density = density.center
if isinstance(velocity, Field):
velocity = velocity.center_vector
......@@ -41,17 +48,9 @@ def pdf_initialization_assignments(lb_method, density, velocity, pdfs,
def macroscopic_values_getter(lb_method, density, velocity, pdfs,
streaming_pattern='pull', previous_timestep=Timestep.BOTH,
use_pre_collision_pdfs=False):
if isinstance(pdfs, Field):
accessor = get_accessor(streaming_pattern, previous_timestep)
if use_pre_collision_pdfs:
field_accesses = accessor.read(pdfs, lb_method.stencil)
else:
field_accesses = accessor.write(pdfs, lb_method.stencil)
elif streaming_pattern == 'pull' and not use_pre_collision_pdfs:
field_accesses = pdfs
else:
raise ValueError("Invalid value of pdfs: A PDF field reference is required to derive "
+ f"getter assignments for streaming pattern {streaming_pattern}.")
field_accesses = get_field_accesses(lb_method, pdfs, streaming_pattern, previous_timestep, use_pre_collision_pdfs)
cqc = lb_method.conserved_quantity_computation
assert not (velocity is None and density is None)
output_spec = {}
......@@ -65,6 +64,27 @@ def macroscopic_values_getter(lb_method, density, velocity, pdfs,
macroscopic_values_setter = pdf_initialization_assignments
def strain_rate_tensor_getter(lb_method, strain_rate_tensor, pdfs, streaming_pattern='pull',
previous_timestep=Timestep.BOTH, use_pre_collision_pdfs=False):
field_accesses = get_field_accesses(lb_method, pdfs, streaming_pattern, previous_timestep, use_pre_collision_pdfs)
if isinstance(strain_rate_tensor, Field):
strain_rate_tensor = strain_rate_tensor.center_vector
omega_s = get_shear_relaxation_rate(lb_method)
equilibrium = lb_method.equilibrium_distribution
rho = equilibrium.density if equilibrium.compressible else equilibrium.background_density
f_neq = sp.Matrix([field_accesses[i] for i in range(lb_method.stencil.Q)]) - lb_method.get_equilibrium_terms()
pi = second_order_moment_tensor(f_neq, lb_method.stencil)
strain_rate_tensor_equ = - 1.5 * (omega_s / rho) * pi
assignments = [Assignment(strain_rate_tensor[i * lb_method.stencil.D + j], strain_rate_tensor_equ[i, j])
for i in range(lb_method.stencil.D) for j in range(lb_method.stencil.D)]
return assignments
def compile_macroscopic_values_getter(lb_method, output_quantities, pdf_arr=None,
ghost_layers=1, iteration_slice=None,
field_layout='numpy', target=Target.CPU,
......
......@@ -27,16 +27,16 @@ import warnings
import numpy as np
# Optional packages cpuinfo, pycuda and psutil for hardware queries
# Optional packages cpuinfo, cupy and psutil for hardware queries
try:
from cpuinfo import get_cpu_info
except ImportError:
get_cpu_info = None
try:
from pycuda.autoinit import device
import cupy
except ImportError:
device = None
cupy = None
try:
from psutil import virtual_memory
......@@ -113,9 +113,11 @@ def memory_sizes_of_current_machine():
if 'l3_cache_size' in cpu_info:
result['L3'] = cpu_info['l3_cache_size']
if device:
size = device.total_memory() / (1024 * 1024)
result['GPU'] = "{0:.0f} MB".format(size)
if cupy:
for i in range(cupy.cuda.runtime.getDeviceCount()):
device = cupy.cuda.Device(i)
size = device.mem_info[1] / (1024 * 1024)
result[f'GPU{i}'] = "{0:.0f} MB".format(size)
if virtual_memory:
mem = virtual_memory()
......@@ -124,7 +126,7 @@ def memory_sizes_of_current_machine():
if not result:
warnings.warn("Couldn't query for any local memory size."
"Install py-cpuinfo to get cache sizes, psutil for RAM size and pycuda for GPU memory size.")
"Install py-cpuinfo to get cache sizes, psutil for RAM size and cupy for GPU memory size.")
return result
......
......@@ -76,38 +76,49 @@ def discrete_maxwellian_equilibrium(stencil, rho=sp.Symbol("rho"), u=sp.symbols(
"""
weights = get_weights(stencil, c_s_sq)
assert stencil.Q == len(weights)
u = u[:stencil.D]
res = [discrete_equilibrium(e_q, u, rho, w_q, order, c_s_sq, compressible) for w_q, e_q in zip(weights, stencil)]
return tuple(res)
def discrete_equilibrium(v=sp.symbols("v_:3"), u=sp.symbols("u_:3"), rho=sp.Symbol("rho"), weight=sp.Symbol("w"),
order=2, c_s_sq=sp.Symbol("c_s") ** 2, compressible=True):
"""
Returns the common discrete LBM equilibrium depending on the mesoscopic velocity and the directional lattice weight
Args:
v: symbols for mesoscopic velocity
u: symbols for macroscopic velocity
rho: sympy symbol for the density
weight: symbol for stencil weights
order: highest order of velocity terms (for hydrodynamics order 2 is sufficient)
c_s_sq: square of speed of sound
compressible: compressibility
"""
rho_outside = rho if compressible else sp.Rational(1, 1)
rho_inside = rho if not compressible else sp.Rational(1, 1)
res = []
for w_q, e_q in zip(weights, stencil):
e_times_u = 0
for c_q_alpha, u_alpha in zip(e_q, u):
e_times_u += c_q_alpha * u_alpha
fq = rho_inside + e_times_u / c_s_sq
e_times_u = 0
for c_q_alpha, u_alpha in zip(v, u):
e_times_u += c_q_alpha * u_alpha
if order <= 1:
res.append(fq * rho_outside * w_q)
continue
fq = rho_inside + e_times_u / c_s_sq
u_times_u = 0
for u_alpha in u:
u_times_u += u_alpha * u_alpha
fq += sp.Rational(1, 2) / c_s_sq**2 * e_times_u ** 2 - sp.Rational(1, 2) / c_s_sq * u_times_u
if order <= 1:
return fq * rho_outside * weight
if order <= 2:
res.append(fq * rho_outside * w_q)
continue
u_times_u = 0
for u_alpha in u:
u_times_u += u_alpha * u_alpha
fq += sp.Rational(1, 2) / c_s_sq**2 * e_times_u ** 2 - sp.Rational(1, 2) / c_s_sq * u_times_u
fq += sp.Rational(1, 6) / c_s_sq**3 * e_times_u**3 - sp.Rational(1, 2) / c_s_sq**2 * u_times_u * e_times_u
if order <= 2:
return fq * rho_outside * weight
res.append(sp.expand(fq * rho_outside * w_q))
fq += sp.Rational(1, 6) / c_s_sq**3 * e_times_u**3 - sp.Rational(1, 2) / c_s_sq**2 * u_times_u * e_times_u
return tuple(res)
return sp.expand(fq * rho_outside * weight)
@disk_cache
......
......@@ -67,8 +67,8 @@ class AbstractLbMethod(abc.ABC):
return d
@property
def subs_dict_relxation_rate(self):
"""returns a dictonary which maps the replaced numerical relaxation rates to its original numerical value"""
def subs_dict_relaxation_rate(self):
"""returns a dictionary which maps the replaced numerical relaxation rates to its original numerical value"""
result = dict()
for i in range(self._stencil.Q):
result[self.symbolic_relaxation_matrix[i, i]] = self.relaxation_matrix[i, i]
......@@ -99,26 +99,33 @@ class AbstractLbMethod(abc.ABC):
# -------------------------------- Helper Functions ----------------------------------------------------------------
def _generate_symbolic_relaxation_matrix(self, relaxation_rates=None):
def _generate_symbolic_relaxation_matrix(self, relaxation_rates=None, relaxation_rates_modifier=None):
"""
This function replaces the numbers in the relaxation matrix with symbols in this case, and returns also
the subexpressions, that assign the number to the newly introduced symbol
"""
rr = relaxation_rates if relaxation_rates is not None else self.relaxation_rates
unique_relaxation_rates = set()
subexpressions = {}
symbolic_relaxation_rates = list()
for relaxation_rate in rr:
relaxation_rate = sp.sympify(relaxation_rate)
if relaxation_rate not in unique_relaxation_rates:
# special treatment for zero, sp.Zero would be an integer ..
if isinstance(relaxation_rate, sp.Symbol):
symbolic_relaxation_rate = relaxation_rate
else:
if isinstance(relaxation_rate, Zero):
relaxation_rate = 0.0
if not isinstance(relaxation_rate, sp.Symbol):
rt_symbol = sp.Symbol(f"rr_{len(subexpressions)}")
subexpressions[relaxation_rate] = rt_symbol
unique_relaxation_rates.add(relaxation_rate)
if relaxation_rate in subexpressions:
symbolic_relaxation_rate = subexpressions[relaxation_rate]
else:
symbolic_relaxation_rate = sp.Symbol(f"rr_{len(subexpressions)}")
subexpressions[relaxation_rate] = symbolic_relaxation_rate
symbolic_relaxation_rates.append(symbolic_relaxation_rate)
new_rr = [subexpressions[sp.sympify(e)] if sp.sympify(e) in subexpressions else e for e in rr]
substitutions = [Assignment(e[1], e[0]) for e in subexpressions.items()]
if relaxation_rates_modifier is not None:
symbolic_relaxation_rates = [r * relaxation_rates_modifier for r in symbolic_relaxation_rates]
else:
for srr in symbolic_relaxation_rates:
assert isinstance(srr, sp.Symbol)
return substitutions, sp.diag(*new_rr)
return substitutions, sp.diag(*symbolic_relaxation_rates)