Source code for mmfutils.math.integrate

"""Integration Utilities.
"""
import itertools
import logging
import warnings

import numpy as np

sp = None
numba = None
try:
    import scipy.integrate
    sp = scipy
    import numba
except ImportError:         # pragma: no cover
    pass

try:
    from ._ssum import ssum as _ssum_cython
except ImportError:
    _ssum_cython = None

__all__ = ['quad', 'mquad', 'Richardson', 'rsum']

_ABS_TOL = 1e-12
_REL_TOL = 1e-8
_EPS = np.finfo(float).eps


[docs]def quad(f, a, b, epsabs=_ABS_TOL, epsrel=_REL_TOL, limit=1000, points=None, **kwargs): r""" An improved version of integrate.quad that does some argument checking and deals with points properly. Return (ans, err). Examples -------- >>> def f(x): return 1./x**2 >>> (ans, err) = quad(f, 1, np.inf, points=[]) >>> abs(ans - 1.0) < err True >>> (ans, err) = quad(f, 1, np.inf, points=[3.0, 2.0]) >>> abs(ans - 1.0) < err True """ if points is not None: points = [p for p in points if p < b] points = [p for p in points if a < p] if 0 == len(points): points = None if (points is None) or (b < np.inf): (y, err) = sp.integrate.quad(func=f, a=a, b=b, args=(), full_output=0, epsabs=epsabs, epsrel=epsrel, limit=limit, points=points, **kwargs) else: midp = max(points) (y0, err0) = sp.integrate.quad(func=f, a=a, b=midp, args=(), full_output=0, epsabs=epsabs, epsrel=epsrel, limit=limit, points=points, **kwargs) (y1, err1) = sp.integrate.quad(func=f, a=midp, b=b, args=(), full_output=0, epsabs=epsabs, epsrel=epsrel, limit=limit, points=None, **kwargs) y = y0+y1 err = err0+err1 return (y, err)
[docs]def mquad(f, a, b, abs_tol=_ABS_TOL, verbosity=0, fa=None, fb=None, save_fx=False, res_dict=None, max_fcnt=10000, min_step_size=None, norm=lambda x: abs(np.array(x)).max(), points=None): r"""Return (res, err) where res is the numerically evaluated integral using adaptive Simpson quadrature. mquad tries to approximate the integral of function f from a to b to within an error of abs_tol using recursive adaptive Simpson quadrature. mquad allows the function y = f(x) to be array-valued. In the matrix valued case, the infinity norm of the matrix is used as it's "absolute value". Parameters ---------- f : function Possibly array valued function to integrate. If this emits a NaN, then an AssertionError is raised to allow the user to optimize this check away (as it exists in the core of the loops) a, b : float Integration range (a, b) fa, fb : float f(a) and f(b) respectively (if already computed) abs_tol : float Approximate absolute tolerance on integral verbosity : int Display info if greater than zero. Shows the values of [fcnt a b-a Q] during the iteration. save_fx : bool If True, then save the abscissa and function values in res_dict. res_dict : dict Details are stored here. Pass a dictionary to access these. The dictionary will be modified. fcnt : Number of function evaluations. xy : List of pairs (x, f(x)) if save_fx is defined. max_fcnt : int Maximum number of function evaluations. min_step_size : float Minimum step size to limit recursion. norm : function Norm to use to determine convergence. The absolute error is determined as `norm(f(x) - F)`. points : [float] List of special points to be included in abscissa. Notes ----- Based on "adaptsim" by Walter Gander. Ref: W. Gander and W. Gautschi, "Adaptive Quadrature Revisited", 1998. http://www.inf.ethz.ch/personal/gander Examples -------- Orthogonality of planewaves on [0, 2pi] >>> def f(x): ... v = np.exp(1j*np.array([[1.0, 2.0, 3.0]])*x) ... return v.T.conj()*v/2.0/np.pi >>> ans = mquad(f, 0, 2*np.pi) >>> abs(ans - np.eye(ans.shape[0])).max() < _ABS_TOL True >>> res_dict = {} >>> def f(x): return x**2 >>> ans = mquad(f, -2, 1, res_dict=res_dict, save_fx=True) >>> abs(ans - 3.0) < _ABS_TOL True >>> x = np.array([xy[0] for xy in res_dict['xy']]) >>> y = np.array([xy[1] for xy in res_dict['xy']]) >>> abs(y - f(x)).max() 0.0 # This works, but triggers a warning because of the singular # endpoints. >>> logger = logging.getLogger() >>> logger.disabled = True >>> def f(x): return 1.0/np.sqrt(x) + 1.0/np.sqrt(1.0-x) >>> abs(mquad(f, 0, 1, abs_tol=1e-8) - 4.0) < 1e-8 True >>> logger.disabled = False >>> def f(x): ... if x < 0: ... return 0.0 ... else: ... return 1.0 >>> abs(mquad(f, -2.0, 1.0) - 1.0) < 1e-10 True >>> def f(x): return 1./x >>> mquad(f, 1, np.inf) Traceback (most recent call last): ... ValueError: Infinite endpoints not supported. """ points = [] if points is None else points res_dict = {} if res_dict is None else res_dict assert isinstance(res_dict, dict), "res_dict must be a dictionary" if not (np.isfinite(a) and np.isfinite(b)): raise ValueError("Infinite endpoints not supported.") res_dict['fcnt'] = 0 if min_step_size is None: min_step_size = _EPS/1024.0*abs(b-a) # We augment and decorate the function to pass various related # arguments to the helpers. if save_fx: res_dict['xy'] = [] def f(x, f=f): y = f(x) res_dict['xy'].append((x, y)) res_dict['fcnt'] += 1 assert not np.any(np.isnan(y)), ( "Nan encountered: " "f({}) = {}".format(x, y)) return y else: def f(x, f=f): res_dict['fcnt'] += 1 y = f(x) assert not np.any(np.isnan(y)), ( "Nan encountered: " "f({}) = {}".format(x, y)) return y f.__dict__['res_dict'] = res_dict f.__dict__['max_fcnt'] = max_fcnt f.__dict__['norm'] = norm fa = f(a) if fa is None else fa fb = f(b) if fb is None else fb xs = list(set(p for p in points if a < p and p < b)) xs.sort() ys = list(map(f, xs)) xs.insert(0, a) ys.insert(0, fa) xs.append(b) ys.append(fb) xs_ = [] ys_ = [] for i, a_ in enumerate(xs[:-1]): b_ = xs[i+1] fa_ = ys[i] # Subdivide each interval into three unequal segments. h = 0.13579*(b_ - a_) x1 = a_ + 2.0*h x2 = b_ - 2.0*h fx1 = f(x1) fx2 = f(x2) xs_.extend([a_, x1, x2]) ys_.extend([fa_, fx1, fx2]) xs_.append(b) ys_.append(fb) # Increase the tolerance so that roundoff errors from each # interval will not accumulate too much. abs_tol2 = abs_tol**2/float(len(xs_)-1) res = 0.0 err2 = 0.0 for i, a_ in enumerate(xs_[:-1]): # Call the recursive core integrator on each region. b_ = xs_[i+1] fa_ = ys_[i] fb_ = ys_[i+1] # Fudge endpoints to avoid infinities. if f.norm(fa_) == np.inf: fa_ = f(a_ + _EPS*(b_ - a_)) if f.norm(fb_) == np.inf: fb_ = f(b_ - _EPS*(b_ - a_)) r, e2 = _mquadstep(f, a_, b_, fa_, fb_, abs_tol2, min_step_size, verbosity) res += r err2 += e2 res_dict['err'] = np.maximum(np.sqrt(err2), abs(_EPS*res)) return res
def _mquadstep(f, a, b, fa, fb, abs_tol2, min_step_size, verbosity, fmid=None): r"""Recursive core routine for function mquad. Parameters ---------- f : function a, b : float Endpoints and midpoint of interval `a` < `b` fa, fb, fmid: float, complex, or arrays `f(a)`, `f(b)`, `f(mid)` where `mid=(a+b)/2`. Only provide `fmid` if it has been computed. abs_tol2 : float `abs_tol**2` """ # Evaluate integrand twice in interior of subinterval [a, b]. h = b - a c = (a + b)/2.0 if fmid is None: fc = f(c) else: fc = fmid # Three point Simpson's rule. Q0 = (h/6.0)*(fa + 4.0*fc + fb) err = f.norm(Q0 - (fa + fb)*h/2.0) ac = (a + c)/2.0 cb = (c + b)/2.0 if (abs(h) < min_step_size or ac <= a or b <= cb): # Minimum step size reached; singularity possible. logging.warning(" ".join([ 'mquad:MinStepSize:', 'Minimum step size reached.', "({} < {})".format(abs(h), min_step_size), 'Singularity possible (err = {}).'.format(err)])) return Q0, err*err if f.res_dict['fcnt'] > f.max_fcnt: # pragma: no cover logging.warning(" ".join([ 'mquad:MaxFcnCount:', 'Maximum function count {} exceeded.'.format(f.max_fcnt), 'Singularity likely.'])) return Q0, err*err fac = f((a + c)/2.0) fcb = f((c + b)/2.0) # Five point double Simpson's rule. Q1 = (h/12.0)*(fa + 4.0*fac + 2.0*fc + 4.0*fcb + fb) # One step of Romberg extrapolation. Q = Q1 + (Q1 - Q0)/15.0 # Check accuracy of integral over this subinterval. err2 = f.norm(Q1 - Q)**2 if not np.isfinite(f.norm(Q)): # pragma: no cover # Infinite or Not-a-Number function value encountered. logging.warning(" ".join(['mquad:ImproperFcnValue:', 'Inf or NaN function value encountered.'])) return Q, err2 if 1 < verbosity: # pragma: no cover print(a, h, f.norm(Q)) if err2 <= abs_tol2: # We are done. pass else: # Subdivide region. Qac, err_ac2 = _mquadstep(f, a, c, fa, fc, abs_tol2/2.0, min_step_size, verbosity, fmid=fac) Qcb, err_cb2 = _mquadstep(f, c, b, fc, fb, abs_tol2/2.0, min_step_size, verbosity, fmid=fcb) Q = Qac + Qcb err2 = err_ac2 + err_cb2 return Q, np.maximum(err2, (_EPS*Q)**2)
[docs]def Richardson(f, ps=None, l=2, n0=1): r"""Compute the Richardson extrapolation of $f$ given the function .. math:: f(N) = f + \sum_{n=0}^{\infty} \frac{\alpha_n}{N^{p_n}} The extrapolants are stored in the array `S`[n, s] where `S[n, 0] = f(n0*l**n)` and `S[n, s]` is the s'th extrapolant. .. note:: It is crucial for performance that the powers $p_n$ be properly characterized. If you do not know the form of the errors, then consider using a non-linear acceleration technique such as :func:`levin_sum`. Parameters ---------- ps : iterable Iterable returning the powers $p_n$. To generate the sequence $p_0 + m d_p$ for example, use :func:`itertools.count``(p0, dp)`. Examples -------- Here we consider .. math:: f(N) = \sum_{n=1}^{N} \frac{1}{n^2} = \frac{\pi^2}{6} + \order(N^{-1}) >>> def f(N): return sum(np.arange(1, N+1, dtype=float)**(-2)) >>> r = Richardson(f, l=3, n0=2) >>> for n in range(9): ... x = next(r) >>> err = abs(x - np.pi**2/6.0) >>> assert err < 1e-14, 'err' Now some other examples with different `p` values: .. math:: f(N) = \sum_{n=1}^{N} \frac{1}{n^4} = \frac{\pi^4}{90} + \order(N^{-3}) >>> def f(N): return sum(np.arange(1, N+1, dtype=float)**(-4)) >>> r = Richardson(f, ps=itertools.count(3,1)) >>> for n in range(8): ... x = next(r) >>> err = abs(x - np.pi**4/90.0) >>> assert err < 1e-14, 'err' .. math:: f(N) = \sum_{n=1}^{N} \frac{1}{n^6} = \frac{\pi^6}{945} + \order(N^{-5}) >>> def f(N): return sum(np.arange(1, N+1, dtype=float)**(-6)) >>> r = Richardson(f, ps=itertools.count(5)) >>> for n in range(7): ... x = next(r) >>> err = abs(x - np.pi**6/945.0) >>> assert err < 1e-14, 'err' Richardson works with array valued functions: >>> def f(N): return np.array([sum(np.arange(1, N+1, dtype=float)**(-2)), ... sum(np.arange(1, N+1, dtype=float)**(-4))]) >>> r = Richardson(f, l=3, n0=2) >>> for n in range(7): ... x = next(r) >>> err = abs(x - np.array([np.pi**2/6.0, np.pi**4/90.0])).max() >>> assert err < 1e-13, 'err' It also works for complex valued functions: >>> def f(N): return (sum(np.arange(1, N+1, dtype=float)**(-2)) + ... 1j*sum(np.arange(1, N+1, dtype=float)**(-4))) >>> r = Richardson(f, l=3, n0=2) >>> for n in range(7): ... x = next(r) >>> err = abs(x - (np.pi**2/6.0 + 1j*np.pi**4/90.0)) >>> assert err < 1e-13, 'err' """ if ps is None: ps = itertools.count(1, 1) n = 0 f0 = f(n0) if hasattr(f0, 'shape'): # Allows us to deal with array valued functions S = np.zeros((n+1, n+1) + f0.shape, dtype=f0.dtype) else: S = np.zeros((n+1, n+1), dtype=type(f0)) p = [] while True: if S.shape[0] <= n: new_shape = np.array(S.shape) new_shape[:2] *= 2 Snew = np.empty(new_shape, dtype=S.dtype) Snew[0:S.shape[0], 0:S.shape[1]] = S[:, :] S = Snew if 0 == l: # pragma: no cover (What is this?) S[n, 0] = f0 else: S[n, 0] = f(n0*l**n) p.append(next(ps)) for m in range(1, n+1): lpm1 = float(l**p[m-1]) S[n, m] = (lpm1*S[n, m-1] - S[n-1, m-1])/(lpm1 - 1.0) n = n+1 yield S[n-1, n-1]
def exact_add(a, b): """Exact addition. Return (x, err) so that using exact addition x + err == a + b but using floating point x + err == x This is based on the fact that, with IEEE floating point if abs(b) <= abs(a): x = a + b # Floating point sum err = (a - x) + b # Exact error >>> exact_add(1e18, 1) (1e+18, 1.0) """ if abs(a) < abs(b): x = b + a err = (b - x) + a else: x = a + b err = (a - x) + b return (x, err) def exact_sum(xs, maxiter=5): """Compute the sum of all values in xs exactly. Return (ans, err) where err is an array whose sum is the exact err, and ans is the sum. Arguments --------- xs : list Numbers to be summed maxiter : int Maximum number of iterations. The function tries to make return a result such that `ans + sum(err) == ans` but gives up if more than this many iterations are required. Examples -------- >>> (x, err) = exact_sum([2e100, 2e51, 2, 2e-50, -2]) >>> x 2e+100 >>> err [2e+51, 2e-50] >>> exact_sum([1e100]*100 + [2e100, 2e51, 2, 2e-50, -2]) (1.02e+102, [2.7197364491160207e+85, 2e+51, 2e-50]) """ ans = 0.0 err = [] for b in xs: (ans, err0) = exact_add(ans, b) for n in range(len(err)): (err[n], err0) = exact_add(err[n], err0) if err0 != 0: err.append(err0) err = [e for e in err if e != 0] max_iter = 5 while (sum(list(reversed(err))) + ans) != ans: max_iter -= 1 if max_iter < 0: # pragma: no cover (never get here?) # Prevent infinite looping. break ans, err = exact_sum(err + [ans]) return (ans, err) def ssum_python(xs): r"""Return (sum(xs), err) computed stably using Kahan's summation method for floating point numbers. (Python version.) >>> N = 10000 >>> l = [(10.0*n)**3.0 for n in reversed(range(N+1))] >>> ans = 250.0*((N + 1.0)*N)**2 >>> (ssum_python(l)[0] - ans, sum(l) - ans) (0.0, -5632.0) """ sum = 0.0 carry = 0.0 for x in xs: y = x - carry tmp = sum + y carry = (tmp - sum) - y sum = tmp eps = np.finfo(np.double).eps err = max(abs(2.0*sum*eps), len(xs)*eps*eps) return (sum, err) if numba: @numba.jit(nopython=True) def ssum_numba(xs, _eps=_EPS): r"""Return (sum(xs), err) computed stably using Kahan's summation method for floating point numbers. (Numba version.) >>> N = 10000 >>> l = np.array([(10.0*n)**3.0 for n in reversed(range(N+1))]) >>> ans = 250.0*((N + 1.0)*N)**2 >>> (ssum_numba(l)[0] - ans, sum(l) - ans) (0.0, -5632.0) Should run less than 8 times slower than a regular sum. >>> import time >>> n = 1./np.arange(1, 2**10) >>> t = time.time();tmp = n.sum();t0 = time.time() - t; >>> np.allclose(n.sum(), ssum_numba(n)[0]) True >>> t = time.time();tmp = ssum_numba(n);t1 = time.time() - t; >>> t1 < 8.0*t0 True """ sum = 0.0 carry = 0.0 for x in xs: y = x - carry tmp = sum + y carry = (tmp - sum) - y sum = tmp err = max(abs(2.0*sum*_eps), len(xs)*_eps*_eps) return (sum, err) def ssum_cython(xs, _eps=_EPS): xs = np.asarray(xs) if _ssum_cython is None: warnings.warn( "ImportError: Could not _ssum_cython: using slow version") return ssum_python(xs) sum = _ssum_cython(xs) ##if isinstance(xs.dtype, np.inexact): # eps = np.finfo(xs.dtype).eps #else: # eps = 0.0 err = max(abs(2.0*sum*_eps), len(xs)*_eps*_eps) return (sum, err) def ssum(xs): r"""Return (sum(xs), err) computed stably using Kahan's summation method for floating point numbers. (C++ version using weave). >>> N = 10000 >>> l = [(10.0*n)**3.0 for n in reversed(range(N+1))] >>> ans = 250.0*((N + 1.0)*N)**2 >>> (ssum(l)[0] - ans, sum(l) - ans) (0.0, -5632.0) Here is an example of the Harmonic series. Series such as these should be summed in reverse, but ssum should do it well. >>> sn = 1./np.arange(1, 10**4) >>> Hn, Hn_err = exact_sum(sn) >>> ans, err = ssum(sn) >>> abs(ans - Hn) < err True >>> abs(sum(sn) - Hn) < err # Normal sum not good! False >>> abs(sum(list(reversed(sn))) - Hn) < err # Unless elements sorted True Here is an example where the truncation errors are tested. >>> try: long = long ... except: long = int >>> N = 10000 >>> np.random.seed(3) >>> r = np.random.randint(-2**30, 2**30, 4*N) >>> A = np.array([long(a)*2**90 + long(b)*2**60 + long(c)*2**30 + long(d) ... for (a, b, c, d) in zip(r[:N], r[N:2*N], r[2*N:3*N], r[3*N:4*N])]) >>> B = A.astype(float)/3987.0 # Introduce truncation errors >>> exact_ans = A.sum() >>> ans, err = ssum(B) >>> ans *= 3987.0 >>> err *= 3987.0 >>> exact_err = abs(float(long(ans) - exact_ans)) >>> exact_err < err True >>> exact_err < err/1000.0 False """ return ssum_cython(xs)
[docs]def rsum(f, N0=0, ps=None, l=2, abs_tol=_ABS_TOL, rel_tol=_REL_TOL, verbosity=0): """Sum f using Richardson extrapolation. Examples -------- >>> def f(n): ... return 1./(n+1)**2 >>> res, err = rsum(f) >>> res 1.6449340668... >>> abs(res - np.pi**2/6.0) < err True """ def F(N, f=f, fs=[0.0, 0]): r"""Return sum of f(n) up to f(N).""" fs[0] += ssum([f(n+N0) for n in range(fs[1], N+1)])[0] fs[1] = N+1 return fs[0] r = Richardson(F, ps=ps, l=l) r1 = next(r) while True: r0 = r1 r1 = next(r) abs_err = abs(r1 - r0) rel_err = abs_err/(abs(r1)+abs_tol) if (abs_err <= abs_tol or rel_err <= rel_tol): break if verbosity > 0: # pragma: no cover logging.info("{} +- {}".format(r1, abs_err)) return r1, abs_err