DataAnalysis2021/04-FFT_DFT_and_Applications/resample.ipynb

231 lines
7.5 KiB
Plaintext
Raw Permalink Normal View History

{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Resampling of Time Series\n",
"## Decimation\n",
"The Discrete Fourier Transform (DFT) can be used for some very basic signal processing operations. We can either use is for decimation, which results in redcution of the number of samples of a given time series or we can use it for interpolation, which is the opposite of decimation.\n",
"Decimation implies a reduction of time resolution and thus a decrease in the Nyquist frequency.\n",
"If the original signal contains higher frequency than the new Nyquist frequency\n",
"(after decimation) we get aliasing. Thus, a low pass filtering to the new Nyquist\n",
"frequency is required before decimation. Decimation and low-pass filtering can be\n",
"achieved in one step by using the DFT:\n",
"1. Transform the time series to the frequency domain using the DFT.\n",
"2. Reduce the Nyquist frequency by the desired factor (when using the Fast Fourier Transform, this should be a power of 2), and apply an appropriate taper. Thats the low-pass filter!\n",
"3. Do an inverse DFT of the modified spectrum with reduced Nyquist frequency and obtain a time series with greater sampling interval (because lower Nyquist frequency implies lower time resolution)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Import all required packages\n",
"%matplotlib inline\n",
"import os\n",
"import numpy as np\n",
"import matplotlib.pyplot as plt\n",
"from obspy import read\n",
"from scipy import signal"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Read the data\n",
"datapath = os.path.join(os.path.expanduser('~'),'work', 'data', 'DOR50', '*063')\n",
"st = read(datapath)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def decimate(x, dt, factor):\n",
" \"\"\"\n",
" Easy decimation for real input data x. Note, since new_dt might not be old_dt * 2**n,\n",
" the decimation is not correct.\n",
" x: Data array\n",
" dt: sampling rate of x\n",
" factor: factor for decimation, thus new sampling rate is dt/factor\n",
" \"\"\"\n",
" # 1. Transform x to frequnecy domain\n",
" ft = np.fft.rfft(x)\n",
" freqs = np.fft.rfftfreq(n=len(x), d=dt)\n",
" \n",
" # 2. Get Nyquist Frequencies for both old dt an new dt\n",
" \n",
" \n",
" # Find index of new Nyquist Frequnency in freqs\n",
" index = np.where(freqs > new_f_ny)[0][0]\n",
" \n",
" # 3. Apply window function as lowpass filter and do inverse DFT \n",
" new_ft = ft[:index]\n",
" new_ft = new_ft * signal.tukey(len(new_ft), alpha=0.25)\n",
" x_new = np.fft.irfft(ft[:index])\n",
" \n",
" return x_new, dt*factor"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Plot results for original and decimated data\n",
"# Read data from st\n",
"data = st[0].data[:100000]\n",
"\n",
"# Apply Decimation\n",
"dec, dt_dec = decimate(data, dt=st[0].stats.delta, factor=2)\n",
"\n",
"# Create arrays for time to plot\n",
"t_data = np.arange(0, len(data)) * st[0].stats.delta\n",
"t_dec = np.arange(0, len(dec)) * dt_dec\n",
"\n",
"# Create plot widget\n",
"plt.plot(t_data, data - np.mean(data), alpha=0.5)\n",
"plt.plot(t_dec, dec - np.mean(dec), alpha=0.5)\n",
"plt.xlabel(\"Time (s)\")\n",
"plt.ylabel(\"Amplitude (a.u.)\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Interpolation\n",
"The opposite of decimation is interpolation. Here, we want a finer sampling of the\n",
"time series without changing the frequency content. We could do this in the time\n",
"domain by some interpolation rule using neighbouring samples. We can also do it\n",
"by using the DFT:\n",
"1. Transform the time series to the frequency domain using the DFT.\n",
"2. Append zeros to the spectrum thus increasing the Nyquist frequency.\n",
"3. Do an inverse DFT of the extended spectrum and obtain a time series with smaller sampling interval (because higher Nyquist frequency implies higher time resolution). The frequency content is unchanged!"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def interpolation(x, dt, new_dt):\n",
" \"\"\"\n",
" Easy function for interpolation of a given data array to increase the the sampling rate.\n",
" \n",
" x: data array\n",
" dt: sampling rate\n",
" new_dt: new sampling rae\n",
" \"\"\"\n",
" # 1. Transform x to frequnecy domain\n",
" ft = np.fft.rfft(x)\n",
" \n",
" # Determine the number of zeros to add\n",
" \n",
" \n",
" # 2. Append zeros to spectrum\n",
" ft = np.concatenate((ft, np.zeros(append_zeros)))\n",
" \n",
" # 3. Do inverse DFT and determine new dt\n",
" x_new = np.fft.irfft(ft)\n",
" new_dt = t_max / len(x_new)\n",
" \n",
" return x_new, new_dt"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Create plot\n",
"# Read data from st\n",
"data = st[0].data[:100000]\n",
"\n",
"# Apply interpolation\n",
"interp, dt_interp = interpolation(data, dt=st[0].stats.delta, new_dt=1/150)\n",
"\n",
"# Create time arrays\n",
"t_data = np.arange(0, len(data)) * st[0].stats.delta\n",
"t_interp = np.arange(0, len(interp)) * dt_interp\n",
"\n",
"# Create plot widget\n",
"plt.plot(t_data, data - np.mean(data), alpha=0.5)\n",
"plt.plot(t_interp, interp - np.mean(interp), alpha=0.5)\n",
"plt.xlabel(\"Time (s)\")\n",
"plt.ylabel(\"Amplitude (a.u.)\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Comparison with function scipy.signal.resample\n",
"For documentation see https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.resample.html"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Import funtion\n",
"from scipy.signal import resample\n",
"\n",
"# Read data from st and create time array\n",
"data = st[0].data[:100000]\n",
"time = np.arange(0, len(data)) * st[0].stats.delta\n",
"\n",
"# Apply resample funtion\n",
"resampled_x, resampled_t = resample(data, num=int(len(data)*2), t=time)\n",
"\n",
"# Create plot\n",
"plt.plot(time, data - np.mean(data), alpha=0.5)\n",
"plt.plot(resampled_t, resampled_x - np.mean(resampled_x), alpha=0.5)\n",
"plt.xlabel(\"Time (s)\")\n",
"plt.ylabel(\"Amplitude (a.u.)\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.7"
}
},
"nbformat": 4,
"nbformat_minor": 2
}