Skip to content

EMD / HHT

Empirical Mode Decomposition and the Hilbert-Huang Transform.

EMD adaptively decomposes a signal into Intrinsic Mode Functions (IMFs) without any a-priori basis. Each IMF satisfies:

  1. The number of extrema and zero-crossings differ by at most one.
  2. The mean of the upper and lower envelopes is everywhere near zero.

The HHT applies the Hilbert transform to each IMF to produce a fully adaptive time-frequency representation free of cross-term artifacts.

Reference: Huang et al. (1998), Proc. R. Soc. London A, 454, 903–995.


Workflow

import dspkit as dsp

# 1. Decompose
imfs, residue = dsp.emd(x, max_imfs=8)

# 2. Hilbert-Huang Transform
envs, inst_freqs = dsp.hht(imfs, fs)

# 3. Marginal spectrum
freq_bins, marginal = dsp.hht_marginal_spectrum(envs, inst_freqs, fs, n_bins=512)

Reconstruction is exact: imfs.sum(axis=0) + residue == x (within floating-point tolerance).

EMD intrinsic mode functions

HHT time-frequency analysis


dspkit.emd.emd(x, max_imfs=None, max_sifting=10, sd_threshold=0.2)

Empirical Mode Decomposition.

Iteratively extracts Intrinsic Mode Functions (IMFs) from x using the sifting process. Each IMF satisfies:

  1. The number of extrema and zero-crossings differ by at most one.
  2. The mean of the upper and lower envelopes is everywhere near zero.

IMFs are ordered from highest to lowest instantaneous frequency. Their sum plus the residue exactly reconstructs the original signal.

Parameters:

Name Type Description Default
x (array_like, shape(N))

Input signal. Should be detrended beforehand if it has a strong polynomial trend (EMD handles mild trends via the residue).

required
max_imfs int or None

Maximum number of IMFs to extract. None extracts all possible.

None
max_sifting int

Maximum sifting iterations per IMF (default 10). Higher values enforce stricter IMF symmetry at the cost of computation.

10
sd_threshold float

Sifting stops when the normalised squared difference between successive iterates falls below this threshold (default 0.2).

0.2

Returns:

Name Type Description
imfs (ndarray, shape(n_imfs, N))

Extracted IMFs, ordered highest to lowest frequency.

residue (ndarray, shape(N))

Monotone (or near-monotone) residue. Represents the overall trend.

Notes

Reconstruction is exact: imfs.sum(axis=0) + residue ≈ x.

Source code in dspkit/emd.py
def emd(
    x: np.ndarray,
    max_imfs: int | None = None,
    max_sifting: int = 10,
    sd_threshold: float = 0.2,
) -> tuple[np.ndarray, np.ndarray]:
    """
    Empirical Mode Decomposition.

    Iteratively extracts Intrinsic Mode Functions (IMFs) from x using the
    sifting process. Each IMF satisfies:

    1. The number of extrema and zero-crossings differ by at most one.
    2. The mean of the upper and lower envelopes is everywhere near zero.

    IMFs are ordered from highest to lowest instantaneous frequency.
    Their sum plus the residue exactly reconstructs the original signal.

    Parameters
    ----------
    x : array_like, shape (N,)
        Input signal. Should be detrended beforehand if it has a strong
        polynomial trend (EMD handles mild trends via the residue).
    max_imfs : int or None
        Maximum number of IMFs to extract. ``None`` extracts all possible.
    max_sifting : int
        Maximum sifting iterations per IMF (default 10). Higher values
        enforce stricter IMF symmetry at the cost of computation.
    sd_threshold : float
        Sifting stops when the normalised squared difference between
        successive iterates falls below this threshold (default 0.2).

    Returns
    -------
    imfs : ndarray, shape (n_imfs, N)
        Extracted IMFs, ordered highest to lowest frequency.
    residue : ndarray, shape (N,)
        Monotone (or near-monotone) residue. Represents the overall trend.

    Notes
    -----
    Reconstruction is exact: ``imfs.sum(axis=0) + residue ≈ x``.
    """
    x = np.asarray(x, dtype=float)
    N = len(x)
    t = np.arange(N, dtype=float)

    imfs = []
    r = x.copy()

    while _has_enough_extrema(r):
        if max_imfs is not None and len(imfs) >= max_imfs:
            break
        imf = _sift(t, r, max_sifting=max_sifting, sd_threshold=sd_threshold)
        imfs.append(imf)
        r = r - imf

    if not imfs:
        # Signal is already monotone — return empty IMF array
        return np.empty((0, N)), r

    return np.array(imfs), r

dspkit.emd.hht(imfs, fs)

Hilbert-Huang Transform: apply the Hilbert transform to each IMF.

Each IMF is a narrow-band AM-FM signal, so the Hilbert transform gives physically meaningful instantaneous amplitude and frequency.

Parameters:

Name Type Description Default
imfs (array_like, shape(n_imfs, N))

IMFs returned by emd.

required
fs float

Sampling frequency [Hz].

required

Returns:

Name Type Description
envelopes (ndarray, shape(n_imfs, N))

Instantaneous amplitude of each IMF.

inst_freqs (ndarray, shape(n_imfs, N))

Instantaneous frequency of each IMF [Hz].

Source code in dspkit/emd.py
def hht(
    imfs: np.ndarray,
    fs: float,
) -> tuple[np.ndarray, np.ndarray]:
    """
    Hilbert-Huang Transform: apply the Hilbert transform to each IMF.

    Each IMF is a narrow-band AM-FM signal, so the Hilbert transform gives
    physically meaningful instantaneous amplitude and frequency.

    Parameters
    ----------
    imfs : array_like, shape (n_imfs, N)
        IMFs returned by ``emd``.
    fs : float
        Sampling frequency [Hz].

    Returns
    -------
    envelopes : ndarray, shape (n_imfs, N)
        Instantaneous amplitude of each IMF.
    inst_freqs : ndarray, shape (n_imfs, N)
        Instantaneous frequency of each IMF [Hz].
    """
    imfs = np.atleast_2d(np.asarray(imfs, dtype=float))
    envelopes  = np.zeros_like(imfs)
    inst_freqs = np.zeros_like(imfs)

    for i, imf in enumerate(imfs):
        envelopes[i], _, inst_freqs[i] = hilbert_attributes(imf, fs)

    return envelopes, inst_freqs

dspkit.emd.hht_marginal_spectrum(envelopes, inst_freqs, fs, n_bins=512)

Marginal Hilbert spectrum.

Accumulates instantaneous energy (amplitude²) at each instantaneous frequency across all IMFs and all time samples:

H(f) = (1/N) Σ_{imfs} Σ_{t: f_i(t)≈f} A²(t)

This is an adaptive analogue of the PSD — it does not assume stationarity and its frequency axis is not fixed by a window length.

Parameters:

Name Type Description Default
envelopes (ndarray, shape(n_imfs, N))

Instantaneous amplitudes from hht.

required
inst_freqs (ndarray, shape(n_imfs, N))

Instantaneous frequencies from hht [Hz].

required
fs float

Sampling frequency [Hz]. Used to set the Nyquist limit.

required
n_bins int

Number of frequency bins (default 512).

512

Returns:

Name Type Description
freq_bins (ndarray, shape(n_bins))

Frequency axis [Hz], from 0 to fs / 2.

spectrum (ndarray, shape(n_bins))

Marginal Hilbert spectrum [amplitude² per Hz equivalent].

Source code in dspkit/emd.py
def hht_marginal_spectrum(
    envelopes: np.ndarray,
    inst_freqs: np.ndarray,
    fs: float,
    n_bins: int = 512,
) -> tuple[np.ndarray, np.ndarray]:
    """
    Marginal Hilbert spectrum.

    Accumulates instantaneous energy (amplitude²) at each instantaneous
    frequency across all IMFs and all time samples:

        H(f) = (1/N) Σ_{imfs} Σ_{t: f_i(t)≈f} A²(t)

    This is an adaptive analogue of the PSD — it does not assume stationarity
    and its frequency axis is not fixed by a window length.

    Parameters
    ----------
    envelopes : ndarray, shape (n_imfs, N)
        Instantaneous amplitudes from ``hht``.
    inst_freqs : ndarray, shape (n_imfs, N)
        Instantaneous frequencies from ``hht`` [Hz].
    fs : float
        Sampling frequency [Hz]. Used to set the Nyquist limit.
    n_bins : int
        Number of frequency bins (default 512).

    Returns
    -------
    freq_bins : ndarray, shape (n_bins,)
        Frequency axis [Hz], from 0 to fs / 2.
    spectrum : ndarray, shape (n_bins,)
        Marginal Hilbert spectrum [amplitude² per Hz equivalent].
    """
    envelopes  = np.atleast_2d(np.asarray(envelopes,  dtype=float))
    inst_freqs = np.atleast_2d(np.asarray(inst_freqs, dtype=float))

    nyq = fs / 2.0
    freq_bins = np.linspace(0.0, nyq, n_bins)
    spectrum  = np.zeros(n_bins)
    N = envelopes.shape[1]

    for env, fi in zip(envelopes, inst_freqs):
        # Clip to valid range, convert to bin index
        fi_clipped = np.clip(fi, 0.0, nyq * (1.0 - 1e-9))
        idx = (fi_clipped / nyq * (n_bins - 1)).astype(int)
        np.add.at(spectrum, idx, env ** 2)

    spectrum /= N  # time-average
    return freq_bins, spectrum