Skip to content
Snippets Groups Projects
Forked from pycodegen / lbmpy
474 commits behind the upstream repository.
conservedquantitycomputation.py 14.45 KiB
import abc
import sympy as sp
from collections import OrderedDict
from pystencils import Field, Assignment, AssignmentCollection


class AbstractConservedQuantityComputation(abc.ABC):
    r"""

    This class defines how conserved quantities are computed as functions of the pdfs.
    Conserved quantities are used for output and as input to the equilibrium in the collision step

    Depending on the method they might also be computed slightly different, e.g. due to a force model.

    An additional method describes how to get the conserved quantities for the equilibrium for initialization.
    In most cases the inputs can be used directly, but for some methods they have to be altered slightly.
    For example in zero centered hydrodynamic schemes with force model, the density has
    to be decreased by one, and the given velocity has to be shifted dependent on the force.

    .. image:: /img/moment_shift.svg

    """

    @property
    @abc.abstractmethod
    def conserved_quantities(self):
        """
        Dict, mapping names (symbol) to dimensionality (int)
        For example: {'density' : 1, 'velocity' : 3}
        The naming strings can be used in :func:`output_equations_from_pdfs`
        and :func:`equilibrium_input_equations_from_init_values`
        """

    def defined_symbols(self, order='all'):
        """
        Returns a dict, mapping names of conserved quantities to their symbols
        """

    @property
    @abc.abstractmethod
    def default_values(self):
        """
        Returns a dict of symbol to default value, where "default" means that
        the equilibrium simplifies to the weights if these values are inserted.
        Hydrodynamic example: rho=1, u_i = 0
        """

    @abc.abstractmethod
    def equilibrium_input_equations_from_pdfs(self, pdfs):
        """
        Returns an equation collection that defines all necessary quantities to compute the equilibrium as functions
        of the pdfs.
        For hydrodynamic LBM schemes this is usually the density and velocity.

        :param pdfs: values or symbols for the pdf values
        """

    @abc.abstractmethod
    def output_equations_from_pdfs(self, pdfs, output_quantity_names_to_symbols):
        """
        Returns an equation collection that defines conserved quantities for output. These conserved quantities might
        be slightly different that the ones used as input for the equilibrium e.g. due to a force model.

        :param pdfs: values for the pdf entries
        :param output_quantity_names_to_symbols: dict mapping of conserved quantity names
                (See :func:`conserved_quantities`) to symbols or field accesses where they should be written to
        """

    @abc.abstractmethod
    def equilibrium_input_equations_from_init_values(self, **kwargs):
        """
        Returns an equation collection that defines all necessary quantities to compute the equilibrium as function of
        given conserved quantities. Parameters can be names that are given by
        symbol names of :func:`conserved_quantities`.
        For all parameters not specified each implementation should use sensible defaults. For example hydrodynamic
        schemes use density=1 and velocity=0.
        """


class DensityVelocityComputation(AbstractConservedQuantityComputation):
    def __init__(self, stencil, compressible, force_model=None,
                 zeroth_order_moment_symbol=sp.Symbol("rho"),
                 first_order_moment_symbols=sp.symbols("u_:3")):
        dim = len(stencil[0])
        self._stencil = stencil
        self._compressible = compressible
        self._forceModel = force_model
        self._symbolOrder0 = zeroth_order_moment_symbol
        self._symbolsOrder1 = first_order_moment_symbols[:dim]

    @property
    def conserved_quantities(self):
        return {'density': 1,
                'velocity': len(self._stencil[0])}

    @property
    def compressible(self):
        return self._compressible

    def defined_symbols(self, order='all'):
        if order == 'all':
            return {'density': self._symbolOrder0,
                    'velocity': self._symbolsOrder1}
        elif order == 0:
            return 'density', self._symbolOrder0
        elif order == 1:
            return 'velocity', self._symbolsOrder1
        else:
            return None

    @property
    def zero_centered_pdfs(self):
        return not self._compressible

    @property
    def zeroth_order_moment_symbol(self):
        return self._symbolOrder0

    @property
    def first_order_moment_symbols(self):
        return self._symbolsOrder1

    @property
    def default_values(self):
        result = {self._symbolOrder0: 1}
        for s in self._symbolsOrder1:
            result[s] = 0
        return result

    def equilibrium_input_equations_from_pdfs(self, pdfs):
        dim = len(self._stencil[0])
        eq_coll = get_equations_for_zeroth_and_first_order_moment(self._stencil, pdfs, self._symbolOrder0,
                                                                  self._symbolsOrder1[:dim])
        if self._compressible:
            eq_coll = divide_first_order_moments_by_rho(eq_coll, dim)

        eq_coll = apply_force_model_shift('equilibrium_velocity_shift', dim, eq_coll,
                                          self._forceModel, self._compressible)
        return eq_coll

    def equilibrium_input_equations_from_init_values(self, density=1, velocity=(0, 0, 0)):
        dim = len(self._stencil[0])
        zeroth_order_moment = density
        first_order_moments = velocity[:dim]
        vel_offset = [0] * dim

        if self._compressible:
            if self._forceModel and hasattr(self._forceModel, 'macroscopic_velocity_shift'):
                vel_offset = self._forceModel.macroscopic_velocity_shift(zeroth_order_moment)
        else:
            if self._forceModel and hasattr(self._forceModel, 'macroscopic_velocity_shift'):
                vel_offset = self._forceModel.macroscopic_velocity_shift(sp.Rational(1, 1))
            zeroth_order_moment -= sp.Rational(1, 1)
        eqs = [Assignment(self._symbolOrder0, zeroth_order_moment)]

        first_order_moments = [a - b for a, b in zip(first_order_moments, vel_offset)]
        eqs += [Assignment(l, r) for l, r in zip(self._symbolsOrder1, first_order_moments)]

        return AssignmentCollection(eqs, [])

    def output_equations_from_pdfs(self, pdfs, output_quantity_names_to_symbols):
        dim = len(self._stencil[0])

        ac = get_equations_for_zeroth_and_first_order_moment(self._stencil, pdfs,
                                                             self._symbolOrder0, self._symbolsOrder1)

        if self._compressible:
            ac = divide_first_order_moments_by_rho(ac, dim)
        else:
            ac = add_density_offset(ac)

        ac = apply_force_model_shift('macroscopic_velocity_shift', dim, ac, self._forceModel, self._compressible)

        main_assignments = []
        eqs = OrderedDict([(eq.lhs, eq.rhs) for eq in ac.all_assignments])

        if 'density' in output_quantity_names_to_symbols:
            density_output_symbol = output_quantity_names_to_symbols['density']
            if isinstance(density_output_symbol, Field):
                density_output_symbol = density_output_symbol.center
            if density_output_symbol != self._symbolOrder0:
                main_assignments.append(Assignment(density_output_symbol, self._symbolOrder0))
            else:
                main_assignments.append(Assignment(self._symbolOrder0, eqs[self._symbolOrder0]))
                del eqs[self._symbolOrder0]
        if 'velocity' in output_quantity_names_to_symbols:
            vel_output_symbols = output_quantity_names_to_symbols['velocity']
            if isinstance(vel_output_symbols, Field):
                vel_output_symbols = vel_output_symbols.center_vector
            if tuple(vel_output_symbols) != tuple(self._symbolsOrder1):
                main_assignments += [Assignment(a, b) for a, b in zip(vel_output_symbols, self._symbolsOrder1)]
            else:
                for u_i in self._symbolsOrder1:
                    main_assignments.append(Assignment(u_i, eqs[u_i]))
                    del eqs[u_i]
        if 'momentum_density' in output_quantity_names_to_symbols:
            # get zeroth and first moments again - force-shift them if necessary
            # and add their values directly to the main equations assuming that subexpressions are already in
            # main equation collection
            # Is not optimal when velocity and momentum_density are calculated together,
            # but this is usually not the case
            momentum_density_output_symbols = output_quantity_names_to_symbols['momentum_density']
            mom_density_eq_coll = get_equations_for_zeroth_and_first_order_moment(self._stencil, pdfs,
                                                                                  self._symbolOrder0,
                                                                                  self._symbolsOrder1)
            mom_density_eq_coll = apply_force_model_shift('macroscopic_velocity_shift', dim, mom_density_eq_coll,
                                                          self._forceModel, self._compressible)
            for sym, val in zip(momentum_density_output_symbols, mom_density_eq_coll.main_assignments[1:]):
                main_assignments.append(Assignment(sym, val.rhs))
        if 'moment0' in output_quantity_names_to_symbols:
            moment0_output_symbol = output_quantity_names_to_symbols['moment0']
            if isinstance(moment0_output_symbol, Field):
                moment0_output_symbol = moment0_output_symbol.center
            main_assignments.append(Assignment(moment0_output_symbol, sum(pdfs)))
        if 'moment1' in output_quantity_names_to_symbols:
            moment1_output_symbol = output_quantity_names_to_symbols['moment1']
            if isinstance(moment1_output_symbol, Field):
                moment1_output_symbol = moment1_output_symbol.center_vector
            main_assignments.extend([Assignment(lhs, sum(d[i] * pdf for d, pdf in zip(self._stencil, pdfs)))
                                     for i, lhs in enumerate(moment1_output_symbol)])

        ac = ac.copy(main_assignments, [Assignment(a, b) for a, b in eqs.items()])
        return ac.new_without_unused_subexpressions()

    def __repr__(self):
        return "ConservedValueComputation for %s" % (", " .join(self.conserved_quantities.keys()),)


# -----------------------------------------  Helper functions ----------------------------------------------------------


def get_equations_for_zeroth_and_first_order_moment(stencil, symbolic_pdfs,
                                                    symbolic_zeroth_moment, symbolic_first_moments):
    r"""
    Returns an equation system that computes the zeroth and first order moments with the least amount of operations

    The first equation of the system is equivalent to

    .. math :

        \rho = \sum_{d \in S} f_d
        u_j = \sum_{d \in S} f_d u_jd

    :param stencil: called :math:`S` above
    :param symbolic_pdfs: called :math:`f` above
    :param symbolic_zeroth_moment:  called :math:`\rho` above
    :param symbolic_first_moments: called :math:`u` above
    """
    def filter_out_plus_terms(expr):
        result = 0
        for term in expr.args:
            if not type(term) is sp.Mul:
                result += term
        return result

    dim = len(stencil[0])

    subexpressions = []
    pdf_sum = sum(symbolic_pdfs)
    u = [0] * dim
    for f, offset in zip(symbolic_pdfs, stencil):
        for i in range(dim):
            u[i] += f * int(offset[i])

    plus_terms = [set(filter_out_plus_terms(u_i).args) for u_i in u]
    for i in range(dim):
        rhs = plus_terms[i]
        for j in range(i):
            rhs -= plus_terms[j]
        eq = Assignment(sp.Symbol("vel%dTerm" % (i,)), sum(rhs))
        subexpressions.append(eq)

    for subexpression in subexpressions:
        pdf_sum = pdf_sum.subs(subexpression.rhs, subexpression.lhs)

    for i in range(dim):
        u[i] = u[i].subs(subexpressions[i].rhs, subexpressions[i].lhs)

    equations = []
    equations += [Assignment(symbolic_zeroth_moment, pdf_sum)]
    equations += [Assignment(u_i_sym, u_i) for u_i_sym, u_i in zip(symbolic_first_moments, u)]

    return AssignmentCollection(equations, subexpressions)


def divide_first_order_moments_by_rho(assignment_collection, dim):
    r"""
    Assumes that the equations of the passed equation collection are the following
        - rho = f_0  + f_1 + ...
        - u_0 = ...
        - u_1 = ...
    Returns a new equation collection where the u terms (first order moments) are divided by rho.
    The dim parameter specifies the number of first order moments. All subsequent equations are just copied over.
    """
    old_eqs = assignment_collection.main_assignments
    rho = old_eqs[0].lhs
    new_first_order_moment_eq = [Assignment(eq.lhs, eq.rhs / rho) for eq in old_eqs[1:dim + 1]]
    new_eqs = [old_eqs[0]] + new_first_order_moment_eq + old_eqs[dim + 1:]
    return assignment_collection.copy(new_eqs)


def add_density_offset(assignment_collection, offset=sp.Rational(1, 1)):
    r"""
    Assumes that first equation is the density (zeroth moment). Changes the density equations by adding offset to it.
    """
    old_eqs = assignment_collection.main_assignments
    new_density = Assignment(old_eqs[0].lhs, old_eqs[0].rhs + offset)
    return assignment_collection.copy([new_density] + old_eqs[1:])


def apply_force_model_shift(shift_member_name, dim, assignment_collection, force_model, compressible, reverse=False):
    """
    Modifies the first order moment equations in assignment collection according to the force model shift.
    It is applied if force model has a method named shift_member_name. The equations 1: dim+1 of the passed
    equation collection are assumed to be the velocity equations.
    """
    if force_model is not None and hasattr(force_model, shift_member_name):
        old_eqs = assignment_collection.main_assignments
        density = old_eqs[0].lhs if compressible else sp.Rational(1, 1)
        old_vel_eqs = old_eqs[1:dim + 1]
        shift_func = getattr(force_model, shift_member_name)
        vel_offsets = shift_func(density)
        if reverse:
            vel_offsets = [-v for v in vel_offsets]
        shifted_velocity_eqs = [Assignment(old_eq.lhs, old_eq.rhs + offset)
                                for old_eq, offset in zip(old_vel_eqs, vel_offsets)]
        new_eqs = [old_eqs[0]] + shifted_velocity_eqs + old_eqs[dim + 1:]
        return assignment_collection.copy(new_eqs)
    else:
        return assignment_collection