Skip to content
Snippets Groups Projects

Oldroyd-B

Merged Michael Kuron requested to merge oldroydb into master
Files
2
lbmpy/oldroydb.py 0 → 100644
+ 209
0
 
import pystencils as ps
 
import sympy as sp
 
import numpy as np
 
 
from pystencils.boundaries.boundaryconditions import Boundary
 
from pystencils.stencil import inverse_direction_string, direction_string_to_offset
 
 
 
class OldroydB:
 
def __init__(self, dim, u, tau, F, tauflux, tauface, lambda_p, eta_p, vof=True):
 
assert not ps.FieldType.is_staggered(u)
 
assert not ps.FieldType.is_staggered(tau)
 
assert not ps.FieldType.is_staggered(F)
 
assert ps.FieldType.is_staggered(tauflux)
 
assert ps.FieldType.is_staggered(tauface)
 
 
self.dim = dim
 
self.u = u
 
self.tau = tau
 
self.F = F
 
self.tauflux = tauflux
 
self.tauface_field = tauface
 
self.lambda_p = lambda_p
 
self.eta_p = eta_p
 
 
full_stencil = ["C"] + self.tauflux.staggered_stencil + \
 
list(map(inverse_direction_string, self.tauflux.staggered_stencil))
 
self.stencil = tuple(map(lambda d: tuple(ps.stencil.direction_string_to_offset(d, self.dim)), full_stencil))
 
full_stencil = ["C"] + self.tauface_field.staggered_stencil + \
 
list(map(inverse_direction_string, self.tauface_field.staggered_stencil))
 
self.force_stencil = tuple(map(lambda d: tuple(ps.stencil.direction_string_to_offset(d, self.dim)),
 
full_stencil))
 
 
self.disc = ps.fd.FVM1stOrder(self.tau, self._flux(), self._source())
 
if vof:
 
self.vof = ps.fd.VOF(self.tauflux, self.u, self.tau)
 
else:
 
self.vof = None
 
 
def _flux(self):
 
return [self.tau.center_vector.applyfunc(lambda t: t * self.u.center_vector[i]) for i in range(self.dim)]
 
 
def _source(self):
 
gradu = sp.Matrix([[ps.fd.diff(self.u.center_vector[j], i) for j in range(self.dim)] for i in range(self.dim)])
 
gamma = gradu + gradu.transpose()
 
return self.tau.center_vector * gradu + gradu.transpose() * self.tau.center_vector + \
 
(self.eta_p * gamma - self.tau.center_vector) / self.lambda_p
 
 
def tauface(self):
 
return ps.AssignmentCollection([ps.Assignment(self.tauface_field.staggered_vector_access(d),
 
(self.tau.center_vector + self.tau.neighbor_vector(d)) / 2)
 
for d in self.tauface_field.staggered_stencil])
 
 
def force(self):
 
full_stencil = self.tauface_field.staggered_stencil + \
 
list(map(inverse_direction_string, self.tauface_field.staggered_stencil))
 
dtau = sp.Matrix([sum([sum([
 
self.tauface_field.staggered_access(d, (i, j)) * direction_string_to_offset(d)[i]
 
for i in range(self.dim)]) / sp.Matrix(direction_string_to_offset(d)).norm()
 
for d in full_stencil]) for j in range(self.dim)])
 
A0 = sum([sp.Matrix(direction_string_to_offset(d)).norm() for d in full_stencil])
 
return ps.AssignmentCollection(ps.Assignment(self.F.center_vector, dtau / A0 * 2 * self.dim))
 
 
def flux(self):
 
if self.vof is not None:
 
return self.vof
 
else:
 
return self.disc.discrete_flux(self.tauflux)
 
 
def continuity(self):
 
cont = self.disc.discrete_continuity(self.tauflux)
 
tau_copy = sp.Matrix(self.dim, self.dim, lambda i, j: sp.Symbol("tau_old_%d_%d" % (i, j)))
 
tau_subs = {self.tau.center_vector[i, j]: tau_copy[i, j] for i in range(self.dim) for j in range(self.dim)}
 
return [ps.Assignment(tau_copy[i, j], self.tau.center_vector[i, j])
 
for i in range(self.dim) for j in range(self.dim)] + \
 
[ps.Assignment(a.lhs, a.rhs.subs(tau_subs)) for a in cont]
 
 
 
class Flux(Boundary):
 
inner_or_boundary = True # call the boundary condition with the fluid cell
 
single_link = False # needs to be called for all directional fluxes
 
 
def __init__(self, stencil, value=None):
 
self.stencil = stencil
 
self.value = value
 
 
def __call__(self, field, direction_symbol, **kwargs):
 
assert ps.FieldType.is_staggered(field)
 
 
assert all([s == 0 for s in self.stencil[0]])
 
accesses = [field.staggered_vector_access(ps.stencil.offset_to_direction_string(d))
 
for d in self.stencil[1:]]
 
conds = [sp.Equality(direction_symbol, d + 1) for d in range(len(accesses))]
 
 
if self.value is None:
 
val = sp.Matrix(np.zeros(accesses[0].shape, dtype=int))
 
else:
 
val = self.value
 
 
# use conditional
 
conditional = None
 
for a, c, d in zip(accesses, conds, self.stencil[1:]):
 
d = ps.stencil.offset_to_direction_string(d)
 
assignments = []
 
for i in range(len(a)):
 
fac = 1
 
if ps.FieldType.is_staggered_flux(field) and type(a[i]) is sp.Mul and a[i].args[0] == -1:
 
fac = a[i].args[0]
 
a[i] *= a[i].args[0]
 
assignments.append(ps.Assignment(a[i], fac * val[i]))
 
if len(assignments) > 0:
 
conditional = ps.astnodes.Conditional(ps.data_types.type_all_numbers(c, "int"),
 
ps.astnodes.Block(assignments),
 
conditional)
 
return [conditional]
 
 
def __hash__(self):
 
return hash((Flux, self.stencil, self.value))
 
 
def __eq__(self, other):
 
return type(other) == Flux and other.stencil == self.stencil and self.value == other.value
 
 
 
class Extrapolation(Boundary):
 
inner_or_boundary = True # call the boundary condition with the fluid cell
 
single_link = False # needs to be called for all directional fluxes
 
 
def __init__(self, stencil, src_field, order):
 
self.stencil = stencil
 
self.src = src_field
 
if order == 0:
 
self.weights = (1,)
 
elif order == 1:
 
self.weights = (sp.Rational(3, 2), - sp.Rational(1, 2))
 
elif order == 2:
 
self.weights = (sp.Rational(15, 8), - sp.Rational(10, 8), sp.Rational(3, 8))
 
else:
 
raise NotImplementedError("weights are not known for extrapolation orders > 2")
 
 
def __call__(self, field, direction_symbol, **kwargs):
 
assert ps.FieldType.is_staggered(field)
 
 
assert all([s == 0 for s in self.stencil[0]])
 
accesses = [field.staggered_vector_access(ps.stencil.offset_to_direction_string(d))
 
for d in self.stencil[1:]]
 
conds = [sp.Equality(direction_symbol, d + 1) for d in range(len(accesses))]
 
 
# use conditional
 
conditional = None
 
for a, c, o in zip(accesses, conds, self.stencil[1:]):
 
assignments = []
 
src = [self.src.neighbor_vector(tuple([-1 * n * i for i in o])) for n in range(len(self.weights))]
 
interp = self.weights[0] * src[0]
 
for i in range(1, len(self.weights)):
 
interp += self.weights[i] * src[i]
 
for i in range(len(a)):
 
fac = 1
 
if ps.FieldType.is_staggered_flux(field) and type(a[i]) is sp.Mul and a[i].args[0] == -1:
 
fac = a[i].args[0]
 
a[i] *= a[i].args[0]
 
assignments.append(ps.Assignment(a[i], fac * interp[i]))
 
if len(assignments) > 0:
 
conditional = ps.astnodes.Conditional(ps.data_types.type_all_numbers(c, "int"),
 
ps.astnodes.Block(assignments),
 
conditional)
 
return [conditional]
 
 
def __hash__(self):
 
return hash((Extrapolation, self.stencil, self.src, self.weights))
 
 
def __eq__(self, other):
 
return type(other) == Extrapolation and other.stencil == self.stencil and \
 
other.src == self.src and other.weights == self.weights
 
 
 
class ForceOnBoundary(Boundary):
 
inner_or_boundary = False # call the boundary condition with the boundary cell
 
single_link = False # needs to be called for all directional fluxes
 
 
def __init__(self, stencil, force_field):
 
self.stencil = stencil
 
self.force_field = force_field
 
 
assert not ps.FieldType.is_staggered(force_field)
 
 
def __call__(self, face_stress_field, direction_symbol, **kwargs):
 
assert ps.FieldType.is_staggered(face_stress_field)
 
 
assert all([s == 0 for s in self.stencil[0]])
 
accesses = [face_stress_field.staggered_vector_access(ps.stencil.offset_to_direction_string(d))
 
for d in self.stencil[1:]]
 
conds = [sp.Equality(direction_symbol, d + 1) for d in range(len(accesses))]
 
 
# use conditional
 
conditional = None
 
for a, c, o in zip(accesses, conds, self.stencil[1:]):
 
assignments = ps.Assignment(self.force_field.center_vector,
 
self.force_field.center_vector + 1 * a.transpose() * sp.Matrix(o))
 
conditional = ps.astnodes.Conditional(ps.data_types.type_all_numbers(c, "int"),
 
ps.astnodes.Block(assignments),
 
conditional)
 
return [conditional]
 
 
def __hash__(self):
 
return hash((ForceOnBoundary, self.stencil, self.force_field))
 
 
def __eq__(self, other):
 
return type(other) == ForceOnBoundary and other.stencil == self.stencil and \
 
other.force_field == self.force_field