DataAnalysis2021/05-Spectrogram/uncertainty_instantaneous_frequency.ipynb

277 lines
10 KiB
Plaintext
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# The Uncertainty Problem\n",
"Using the moving window analysis, you may have noticed that we are not able to get a good resoltion in time and frequency. This is called the uncertainty problem, which you might have heard from in physics.\n",
"\n",
"Here, we a closer look how the uncertainty problem effects our sweep signal, when we apply the moving window average on this signal.\n",
"\n",
"### Tasks\n",
"1. Read the documentation about [scipy's STFT](https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.chirp.html), especially the parameters nperseg, noverlap and nfft\n",
"2. Play around with the parameters"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Preparation: load packages\n",
"import os\n",
"from obspy.core import read\n",
"from obspy.core import UTCDateTime\n",
"import numpy as np\n",
"import matplotlib.pyplot as plt\n",
"from scipy.signal import stft, chirp\n",
"import pywt"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Load Chirp signal\n",
"dt = 0.01\n",
"t = np.arange(0, 5000) * dt\n",
"data = chirp(t=t, f0=0.5, f1=25, t1=t[-1])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Applying Short-Time Fourier Transform (STFT) on data\n",
"freqs, time, S = stft(x=data, fs=1/dt, window='hann', nperseg=256, noverlap=None, nfft=None)\n",
"\n",
"plt.figure(figsize = [15,6])\n",
"plt.pcolormesh(time, freqs, np.abs(S))\n",
"plt.xlabel(\"Time (s)\")\n",
"plt.ylabel(\"Frequency (Hz)\");"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# The instantaneous Frequency\n",
"The instantaneous frequency is a measure of the dominating frequency at each time instant of a signal and is determined as the time derivative of the instantaneous phase $\\phi (t)$. The phase can be obtained by constructing the analytical signal of an arbitrary time series:\n",
"\n",
"$$ \n",
"\\hat{s}(t) = s(t) + i \\cdot \\mathcal{H} \\left( (s(t) \\right) = |\\hat{s}(t)| \\cdot \\exp (i \\phi (t)),\n",
"$$\n",
"\n",
"where $\\mathcal{H}$ denotes the Hilbert-Transform. Note, the scipy function [hilbert()](https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.hilbert.html) computes the analytical signal. Now, we can derive the instantaneous phase as\n",
"\n",
"$$\n",
"\\phi (t) = \\arg \\left( \\frac{Im(\\hat{s}(t))}{Re(\\hat{s}(t))} \\right).\n",
"$$\n",
"\n",
"Note, the instantaneous phase is called wrapped phase if it is in the interval $(\\pi, \\pi]$ or $[0, 2 \\pi)$. To determine the instantaneous frequncy, we need to [unwrap](https://numpy.org/doc/1.18/reference/generated/numpy.unwrap.html) the phase to get a continous function. Finally, the instantaneous frequency can be determined by\n",
"\n",
"$$\n",
"f(t) = \\frac{1}{2 \\pi} \\frac{\\text{d} \\phi (t)}{\\text{d} t}\n",
"$$\n",
"\n",
"1. Create a time series with \n",
" 1. increasing / decreasing frequncy content, e.g. a [Chirp-Signal](https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.chirp.html)\n",
" 2. increasing and decreasing frequency content, i.e. overlapping of two chirp signals (how does the spectrogram looks like?)\n",
"3. Write a function that computes the instantaneous frequency (Hint: use np.diff for instantaneous phase derivative, np.unwrap to unwrap the phase.)\n",
"4. What is the meaning of $|\\hat{s}(t)|$? (Hint: plot it!) \n",
"5. Plot the results of the instantaneous frequnecy"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Define the signal\n",
"dt = 0.01\n",
"t = np.arange(0, 5000) * dt\n",
"data = chirp(t=t, f0=0.5, f1=25, t1=t[-1])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Function for instantaneous frequency\n",
"from scipy.signal import hilbert\n",
"def istantaneous_frequency(x, dt):\n",
" # To continue"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Plot results"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# The Continuous Wavelet Transform (CWT)\n",
"\n",
"We learned during the STFT about the uncertainty problem. Using the CWT results in a different time-frequency resolution. For high frequency we get poor frequency resolution by good time resoltution, whereas for low frequencies we get poor time resolution bur good frequency resolution. As you can see, there is no recipe when to use STFT, instantaneous frequnecy or CWT, but it is good to know where are the advantages and disadvantages.\n",
"\n",
"The CWT is defined as\n",
"\n",
"$$\n",
"\\text{CWT}_{a, b} \\left(s(t) \\right) = \\frac{1}{\\sqrt{|a|}} ~ \\int_{-\\infty}^{\\infty} s(t) ~ \\Psi^\\ast \\left( \\frac{t-b}{a} \\right) \\text{d} t ~,\n",
"$$\n",
"\n",
"where $a$ denotes the scaling factor of the mother wavelet $\\Psi(t)$ and $b$ denotes the translation. \n",
"In other words, the CWT is a convolution of the signal $s(t)$ with a conjugate complex and scaled wavelet-function ([here](https://en.wikipedia.org/wiki/Continuous_wavelet_transform) you find a good visualisation). Due to the different scaled wavelets, the CWT reacts well on abruptly changes in the signal.\n",
"Usally we get instead of the frequency on the y-axis the scaling factor $a$, but it depends on the center frequency $f_c$ of the scaled wavelet by\n",
"\n",
"$$\n",
"f_a = \\frac{f_c}{a} .\n",
"$$\n",
"\n",
"Luckily it exists a library [pywt](https://pywavelets.readthedocs.io/en/latest/) to compute the CWT with several wavelets.\n",
"For the following example we use a [complex Morlet wavelet](https://en.wikipedia.org/wiki/Morlet_wavelet) per default.\n",
"\n",
"#### Tasks\n",
"\n",
"1. Try to plot a time-frequency representation of different signals and compare with results of STFT\n",
"2. Use different [mother wavelets](https://pywavelets.readthedocs.io/en/latest/ref/wavelets.html)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Display a mother wavelet\n",
"wav, a = wavelet = pywt.ContinuousWavelet(\"cmor2-0.95\").wavefun();\n",
"plt.plot(a, wav.real, label=\"real part\");\n",
"plt.plot(a, wav.imag, label=\"imag part\");\n",
"plt.legend();"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def frequency2scale(fmin, fmax, waveletname, dt):\n",
" \"\"\"\n",
" Computes scales for given frequency values fmin and famx.\n",
" :param fmin: frequency minimum\n",
" :param fmax: frequency maximum\n",
" :param waveletname: name of wavelet\n",
" :param dt: sampling interval (s)\n",
" \"\"\"\n",
"\n",
" # Determine lower bound of scales\n",
" scale_start = 0.1\n",
" scale_min = pywt.scale2frequency(waveletname, scale_start)\n",
" while scale_min/dt > fmax:\n",
" scale_start *= 1.1\n",
" scale_min = pywt.scale2frequency(waveletname, scale_start)\n",
" scale_min = round(scale_start, 2)\n",
"\n",
" # Determine upper bound of scales\n",
" scale_start = 1\n",
" scale_max = pywt.scale2frequency(waveletname, scale_start)\n",
" while scale_max/dt > fmin:\n",
" scale_start *= 1.1\n",
" scale_max = pywt.scale2frequency(waveletname, scale_start)\n",
" scale_max = round(scale_start, 2)\n",
"\n",
" return scale_min, scale_max\n",
"\n",
"def scaleogram(x, fs, waveletname=\"cmor2-0.95\", fmin=1, fmax=25, num=100):\n",
" \"\"\"\n",
" Computes CWT of an signal x with sampling rate fs in Hz \n",
" :param x: time seris\n",
" :param fs: sampling rate\n",
" :param waveletname: name of wavelet\n",
" :param fmin: minimum frequency\n",
" :param fmax: maximum frequency\n",
" :param num: length of scale array\n",
" \"\"\"\n",
"\n",
" # Check all input parameters and change them is they do not fit\n",
" dt = 1 / fs\n",
" f_ny = 1 / (2 * dt) # Nyquist frequency\n",
" if f_ny < fmax:\n",
" fmax = f_ny\n",
"\n",
" if fmin >= fmax:\n",
" msg = \"Min. frequency {} is greater than max. frequency {} to compute scaleograms.\".format(fmin, fmax)\n",
" raise ValueError(msg)\n",
"\n",
" # Determine scales from fmin and fmax\n",
" scale_min, scale_max = frequency2scale(fmin, fmax, waveletname=waveletname, dt=dt)\n",
" scales = np.logspace(np.log10(scale_min), np.log10(scale_max), num=num)\n",
"\n",
" # Generate scaleogram\n",
" coefficients, frequencies = pywt.cwt(x, scales, waveletname, sampling_period=dt)\n",
" coefficients = np.abs(coefficients)**2\n",
" time = np.linspace(0, dt * len(x), num=len(x))\n",
" \n",
" return frequencies, time, coefficients"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"f_cwt, t_cwt, coeff = scaleogram(data, fs=1/(t[1]-t[0]))\n",
"plt.figure(figsize=(16, 8))\n",
"plt.pcolormesh(t_cwt, f_cwt, np.abs(coeff))\n",
"plt.xlabel(\"Time (s)\")\n",
"plt.xlabel(\"Frequency (Hz)\");"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.7"
}
},
"nbformat": 4,
"nbformat_minor": 2
}