Pair Correlation Function Analysis of Fluorescence Fluctuations in Big Image Time Series using Python¶
A tutorial using Python and scientific libraries to implement pair correlation function (pCF) analysis of a big time series of images from fluorescence microscopy on a personal computer.
Updated on December 8, 2023
Presented at the Big Data Image Processing & Analysis BigDIPA workshops 2016, 2017, and 2018
Supported by the National Institutes of Health, grant numbers 1R25EB022366-01 and 2P41GM103540-31
Released under the Creative Commons Attribution 4.0 International license
Abstract¶
Spatiotemporal analysis of fluorescence fluctuations of optical microscopy measurements on living samples is a powerful tool that can provide insight into dynamic molecular processes of high biological relevance (Di Rienzo et al., 2016).
Pair correlation function (pCF) analysis of fluorescence microscopy image time series acquired using very fast cameras is one of the emerging and promising spatiotemporal techniques. However, it is computationally very expensive, and the analysis of big image data sets used to take hours or be impractical on personal computers.
In this tutorial, we use the open-source Python programming language and scientific libraries to compute the pCF analysis of a large time series of fluorescence images acquired with Selective Plane Illumination Microscopy (SPIM).
First, we implement a function to calculate the cross-correlation of two time series. We demonstrate the limitations of Python for efficient numerical computations and several ways to overcome them.
Next, we implement the pCF analysis of a small simulated image time series and optimize its speed by almost two orders of magnitude.
Finally, we use this pCF analysis function to analyze a multi-gigabyte image time series in small, overlapping chunks.
This is a tutorial on computing pair correlations in big images. Refer to the references for an introduction to the pair correlation method and how the computed pair correlations can be analyzed and visualized to study molecular flow in cells.
Requirements¶
To follow this tutorial and run its code, the following prerequisites are needed:
Familiarity with¶
- pair correlation function analysis of fluorescence fluctuations (for example Gratton and Digman lectures)
- programming and nD-array computing (for example Matlab, numpy)
- signal processing, time and frequency domain
Minimum computer specifications¶
- 64-bit Windows 10, macOS, or Linux based operating system
- Core i5 CPU with 4 cores
- 8 GB RAM
- SSD drive with 50 GB free space
- NVIDIA GPU with CUDA drivers
- Web browser supporting WebSockets
- Disabled on-access antivirus scanning for the working and scratch directories
Python development environment¶
- CPython 3.11 64-bit with development header files and libraries
- Python packages: Jupyter, IPython, numpy, scipy, matplotlib, scikit-image, h5py, Cython, dask, numba, and CuPy (optional)
- CUDA Toolkit (optional, used for CuPy)
- A Python distutils compatible C compiler with OpenMP support: Visual Studio 2022 or gcc
Tutorial source code and data files¶
Clone the source code from the ipcf.ipynb repository to a working directory:
git clone https://github.com/cgohlke/ipcf.ipynb
Download and extract the following files from the Zenodo record to the ipcf.ipynb directory:
- Simulation_Channel.bin
- Simulation_Channel.ipcf.bin
- nih3t3-egfp_2.zip
Open the ipcf.ipynb notebook from within the ipcf.ipynb directory, for example, using locally installed Jupyter or a Docker image:
jupyter-lab ipcf.ipynb
docker run --rm -p 8888:8888 -v ${PWD}/ipcf.ipynb:/home/jovyan/work/ipcf.ipynb jupyter/scipy-notebook:python-3.11
Configure the runtime environment¶
The notebook depends on a few platform specific variables. Adjust the path to the example data and the path to a scratch directory where large intermediate and output files will be saved:
# import common modules
import datetime
import math
import multiprocessing
import os
import random
import sys
import threading
import warnings
# limit the number of CPUs to use
MAXCPUS = min(8, multiprocessing.cpu_count() // 2)
# read-only directory where the example data were extracted
DATA_PATH = './'
# writable directory where large intermediate and output files will be saved
# must not be a network drive
SCRATCH_PATH = './'
# for the BigDIPA workshop cluster
if os.path.exists('../../data/02_fcs_computation/'):
DATA_PATH = '../../data/02_fcs_computation/'
SCRATCH_PATH = '../../scratch/'
# use sequential MKL and OpenBLAS to prevent thread oversubscription
os.environ['MKL_NUM_THREADS'] = '1'
os.environ['OPENBLAS_NUM_THREADS'] = '1'
# os.environ['OMP_NUM_THREADS'] = '1'
import numpy
# set compiler and linker arguments for OpenMP
if sys.platform == 'win32':
OPENMP_ARGS = '--compile-args=/openmp'
else:
OPENMP_ARGS = '--compile-args=-fopenmp --link-args=-fopenmp'
# tell numba where to find CUDA NVVM on Windows
cuda_path = os.environ.get('CUDA_PATH', 'CUDA not found')
if os.path.exists(cuda_path):
os.environ['PATH'] += r';%s\bin;%s\nvvm\bin' % (cuda_path, cuda_path)
os.environ['CUDA_HOME'] = cuda_path
# ignore warnings
warnings.simplefilter('ignore')
# acquire a lock object to force single threaded execution
THREADLOCK = threading.RLock()
# initialize random number generators
random.seed(42)
numpy.random.seed(42)
# display plots within Jupyter notebook
%matplotlib inline
# detect if CUDA is available
try:
import cupy
SKIP_CUDA = False
except ImportError:
SKIP_CUDA = True
# record the current time
START_TIME = datetime.datetime.now()
The Challenge¶
The challenge is to compute the pair correlation function analysis (pCF) of a large time series of images using Python on a personal computer in reasonable time.
Our dataset is a 34.5 GB time series of SPIM images of a biological cell as 35,000 TIFF files of 1024x512 16-bit greyscale samples each:
As part of molecular flow analysis, we need to cross-correlate the time series at each pixel of the image with the time series of all its neighbor pixels at a specific radius after correcting the time series for photobleaching:
For a radius of 6 there are 32 neighbor pixels. To analyze an image of 1024x512 pixels, the number of cross-correlation calculations needed is 16,777,216 (including padding of the border by 6 pixels).
We would like to perform the cross-correlation calculations on the dataset in about 15 minutes.
We have a modern notebook or desktop computer with a minimum of 4 CPU cores, 8 GB RAM, and SSD, running a 64-bit OS with a scientific Python 3.11 distribution and a C compiler installed.
To compute 16,777,216 cross-correlations on 4 CPU cores in 15 minutes, a single cross-correlation computation must finish in about 215 µs (4 × 15 × 60 × 1,000,000 µs / 16,777,216).
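A quick back-of-the-envelope check of this budget, using only the numbers stated above:
# per-correlation time budget from the numbers above
ncorrelations = 1024 * 512 * 32  # pixels times neighbor points = 16,777,216
budget = 4 * 15 * 60  # seconds available on 4 CPU cores in 15 minutes
print('{:.0f} µs per cross-correlation'.format(budget / ncorrelations * 1e6))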
Here is pseudocode for the image pair correlation function (ipCF) analysis, which will later be completed into the reference implementation:
def ipcf_reference(image_timeseries, circle_coordinates, bins):
"""Return pair correlation function analysis of image time series.
Cross-correlate the time series of each pixel in the image
with all its neighbors at a certain radius and return all
the log-binned and smoothed correlation curves.
"""
ntimes, height, width = image_timeseries.shape
npoints = len(circle_coordinates)
radius = circle_coordinates[0, 0]
result = zeros(
(height - 2 * radius, width - 2 * radius, npoints, len(bins)),
numpy.float32,
)
for y in range(radius, height - radius):
for x in range(radius, width - radius):
a = image_timeseries[:, y, x]
for i in range(npoints):
u, v = circle_coordinates[i]
b = image_timeseries[:, y + v, x + u]
c = correlate(b, a)
result[y - radius, x - radius, i] = smooth(average(c, bins))
return result
# functions that need to be implemented
def correlate(a, b):
"""Return cross-correlation of two arrays."""
...
def average(c, bins):
"""Return averaged chunks of array."""
...
def smooth(c):
"""Return smoothed array."""
...
def circle(radius, npoints):
"""Return circle coordinates."""
...
def logbins(size, nbins):
"""Return up to nbins exponentially increasing integers from 1 to size."""
...
Outline¶
1. Implement a fast cross-correlation function¶
In this section, we
- review the mathematical definition and some properties of cross-correlation
- implement an unnormalized cross-correlation function in pure Python
- compare its speed with an implementation in C
- try several Python libraries to speed up the cross-correlation calculation: threading, numpy, scipy, numba, numba.cuda, CuPy, and Cython
- use the cross-correlation theorem, Cython, and the fft2d C library to implement a very fast circular correlation function
Along the way we learn
- about CPython and its limitations for numerical computations
- to write Python C extensions and interface with C libraries using Cython
- to use the Jupyter notebook to interactively manage data, code, visualizations, and explanatory text
2. Implement pair correlation function analysis of small image time series¶
The techniques learned in the first section are applied to implement the pCF analysis of a small simulated image time series.
In this section, we
- load and explore a time series of images from a simulation of fluorescence fluctuations
- implement functions to normalize, average, and smooth correlation functions for image fluctuation analysis
- run the reference implementation of image pCF analysis
- visualize the result of the image pCF analysis
- optimize the algorithm and implementation of the image pCF analysis for speed
3. Implement out-of-core pair correlation function analysis of big image time series¶
In this section, we demonstrate some methods to process data that fit on disk but are larger than RAM, aka out-of-core processing. We
- interactively browse the 34.5 GB image time series consisting of 35,000 TIFF files
- semi-automatically select a subset of the data for further analysis using the scikit-image library
- save the selection as a blocked dataset in a HDF5 file
- implement a function to correct time series for photobleaching
- use the dask library to chop big data in smaller, overlapping blocks/chunks and schedule the analysis of individual blocks
- run the photobleaching correction and the image pair correlation function implemented in the second part on overlapping blocks of the big SPIM dataset
1. Implement a fast cross-correlation function¶
In this section, we implement a fast 1D cross-correlation function. The goal is to perform a single cross-correlation of two long ($2^{14}$ samples) 1D sequences of real numbers in about 200 µs.
Definition of cross-correlation¶
The unnormalized discrete correlation of two sampled functions $a$ and $b$ as generally defined in signal processing texts (for example, numpy.correlate and MATLAB xcorr) is:
$$c_{ab}[delay] = \sum_n a[n+delay] \cdot conj(b[n])$$
This is known as the sliding dot product.
The above definition is not unique and sometimes correlation may be defined differently (for example Wikipedia):
$$c'_{ab}[delay] = \sum_n conj(a[n]) \cdot b[n+delay],$$
which is related to the first definition:
$$c'_{ab}[delay] = c_{ab}[-delay] = c_{ba}[delay]$$
There is also an ambiguity for which range of $delay$s to compute the correlations and how to deal with out of bounds $delay$ values.
In this tutorial, $a$, $b$ and $c$ will have the same length. $delay$ will range from negative half length to positive half length.
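For example, numpy.correlate, which is used later in this tutorial, follows the same convention for equal-length real inputs when called with mode='same' (the returned delays correspond to delay = index - size // 2):
import numpy
# 'full' mode returns all 2*size-1 delays; 'same' mode returns size delays
# centered such that delay = index - size // 2
print(numpy.correlate([1, 2, 3], [4, 8, 16], mode='full'))
print(numpy.correlate([1, 2, 3], [4, 8, 16], mode='same'))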
Linear and circular cross-correlation¶
There are two ways of dealing with out of bounds $delay$ values:
- for linear correlation, the out of bounds indices are set to zero (zero padding):
a = [1, 2]
b = [3, 4]
c = [
0 * b[0] + a[1 - 1] * b[1], # delay -1 a[-1]=0
a[0 + 0] * b[0] + a[1 + 0] * b[1],
] # delay 0
print(c)
[4, 11]
- for circular correlation, the out of bounds indices are wrapped around:
a = [1, 2]
b = [3, 4]
c = [
a[0 - 1] * b[0] + a[1 - 1] * b[1], # delay -1 a[-1]=a[1]
a[0 + 0] * b[0] + a[1 + 0] * b[1],
] # delay 0
print(c)
[10, 11]
Linear correlation can be calculated using the circular algorithm by zero padding the input arrays on both sides to twice their lengths:
a = [0, 1, 2, 0]
b = [0, 3, 4, 0]
c = [
a[0 - 1] * b[0] + a[1 - 1] * b[1] + a[2 - 1] * b[2] + a[3 - 1] * b[3],
a[0 + 0] * b[0] + a[1 + 0] * b[1] + a[2 + 0] * b[2] + a[3 + 0] * b[3],
]
print(c)
[4, 11]
Properties of cross-correlation¶
Some properties of the cross-correlation function are relevant for the pair correlation calculations:
According to the cross correlation theorem, the cross-correlation ($\star$) of functions $a(t)$ and $b(t)$ can be calculated using the Fourier transform $\mathcal{F}\{\}$:
$$ a\star b = \mathcal{F}^{-1}(\mathcal{F}\{a\} \cdot (\mathcal{F}\{b\})^*)$$
where $()^*$ denotes the complex conjugate. For long vectors this method is faster to calculate than the sliding dot product.
The cross-correlation of functions $a(t)$ and $b(t)$ is equivalent to the convolution ($*$) of $a(t)$ and $b^*(-t)$:
$$ a\star b = a*b^*(-t),$$
where $b^*$ denotes the conjugate of $b$.
For real valued input, the following symmetry applies:
$$c_{ab}[delay] = c_{ba}[-delay]$$
- The discrete autocorrelation of a sampled function is the discrete correlation of the function with itself. It is always symmetric with respect to positive and negative delays:
$$c_{aa}[delay] = c_{aa}[-delay]$$
- The circular correlation can calculate linear correlation by zero-padding the vectors.
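These properties can be verified numerically with a few lines of numpy (already imported in the configuration cell); a small check of the symmetry for real input and of the cross-correlation theorem for circular correlation:
import numpy
n = 8
a = numpy.random.random(n) - 0.5
b = numpy.random.random(n) - 0.5
# symmetry for real input: c_ab[delay] == c_ba[-delay]
assert numpy.allclose(
    numpy.correlate(a, b, mode='full'),
    numpy.correlate(b, a, mode='full')[::-1],
)
# cross-correlation theorem: circular correlation via the DFT equals the
# sliding dot product with wrapped-around (circular) indices
c_fft = numpy.fft.ifft(numpy.fft.fft(a) * numpy.fft.fft(b).conj()).real
c_direct = [sum(a[(m + d) % n] * b[m] for m in range(n)) for d in range(n)]
assert numpy.allclose(c_fft, c_direct)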
Cross-correlation using pure Python¶
Let's implement the discrete linear correlation function for real input sequences in pure Python using the sliding dot product as defined previously:
$$c_{ab}[delay] = \sum_n a[n+delay] * b[n]$$
def dot_python(a, b, start, stop, delay):
"""Return dot product of two sequences in range."""
sum = 0
for n in range(start, stop):
sum += a[n + delay] * b[n]
return sum
def correlate_python(a, b):
"""Return linear correlation of two sequences."""
size = len(a)
c = [0] * size # allocate output array/list
for index in range(size):
delay = index - size // 2
if delay < 0:
c[index] = dot_python(a, b, -delay, size, delay)
else:
c[index] = dot_python(a, b, 0, size - delay, delay)
return c
It is good practice to write tests to verify code:
def test_correlate(correlate_function):
"""Test linear correlate function using known result."""
# even lengths
c = correlate_function([1, 2], [3, 4])
assert list(c) == [4, 11], c
# uneven lengths
c = correlate_function([1, 2, 3], [4, 8, 16])
assert list(c) == [40, 68, 32], c
test_correlate(correlate_python)
The tests pass; no exception is raised.
Let's time the cross-correlation of two random sequences of length 8192, which are created using list comprehension:
import random
A = [random.random() - 0.5 for _ in range(2**13)]
B = [random.random() - 0.5 for _ in range(2**13)]
%time c = correlate_python(A, B)
CPU times: total: 1.09 s Wall time: 1.57 s
This implementation is about 25,000 times slower than desired (~200 µs) for our pair correlation function image analysis task.
Plot auto-correlation¶
Using the matplotlib 2D plotting library, we plot the auto-correlation of a short random sequence and embed it into the Jupyter notebook:
from matplotlib import pyplot
def plot_autocorrelation(size=200):
"""Plot autocorrelation of a random sequence."""
a = [random.random() - 0.5 for _ in range(size)]
c = correlate_python(a, a)
delays = list(range(-len(a) // 2, len(a) // 2))
pyplot.figure(figsize=(6, 6))
pyplot.subplot(2, 1, 1)
pyplot.title('random sequence')
pyplot.ylabel('intensity')
pyplot.plot(a, 'g')
pyplot.subplot(2, 1, 2)
pyplot.title('auto-correlation')
pyplot.xlabel('delay')
pyplot.ylabel('correlation')
pyplot.plot(delays, c, 'r')
pyplot.tight_layout()
pyplot.show()
plot_autocorrelation()
The autocorrelation is always symmetric with respect to positive and negative delays.
For long random sequences, the autocorrelation approaches an impulse function.
Interactively plot cross-correlation¶
Using IPython widgets, we plot the cross-correlation of two short random sequences with a peak, where the peak in sequence $b$ is delayed with respect to the peak in sequence $a$:
from ipywidgets import Dropdown, IntSlider, interact
from matplotlib import pyplot
def plot_crosscorrelation(size=100):
"""Interactively plot cross-correlation of signals with delayed peak."""
delays = list(range(-size // 2, size // 2))
a = [random.random() - 0.5 for _ in range(size)]
b = [random.random() - 0.5 for _ in range(size)]
a[size // 2] = 10 # add peak in middle of sequence
def _plot(option, delay):
b_ = b.copy()
b_[size // 2 + delay] = 10 # add peak at shifted position
if option.endswith('b'):
c = correlate_python(a, b_)
else:
c = correlate_python(b_, a)
pyplot.figure(figsize=(6, 6))
pyplot.subplot(2, 1, 1)
pyplot.title('random sequences with peak')
pyplot.ylabel('intensity')
pyplot.plot(a, 'g', label='a')
pyplot.plot(b_, 'b', label='b')
pyplot.ylim([-2, 12])
pyplot.yticks([0, 5, 10])
pyplot.legend(fancybox=True, framealpha=0.5)
pyplot.subplot(2, 1, 2)
pyplot.title('cross-correlation')
pyplot.xlabel('delay')
pyplot.ylabel('correlation')
pyplot.xlim([-size // 2, size // 2])
pyplot.ylim([-20, 120])
pyplot.yticks([0, 50, 100])
pyplot.plot(delays, c, 'r', label=option)
pyplot.legend(fancybox=True, framealpha=0.5)
pyplot.tight_layout()
pyplot.show()
interact(
_plot,
option=Dropdown(options=['a\u2605b', 'b\u2605a']),
delay=IntSlider(
value=size // 5,
min=2 - size // 2,
max=size // 2 - 1,
continuous_update=False,
),
)
plot_crosscorrelation()
A positive delay of the peak in sequence $b$ with respect to the peak in sequence $a$ shows as a peak at a negative delay in the cross-correlation of a and b ($a \star b$).
Multi-threading¶
Let's try to use Python's concurrent.futures module to run several correlation functions in parallel on multiple CPU cores within the same process using threads (the smallest sequences of programmed instructions that can be managed independently by the operating system):
from concurrent.futures import ThreadPoolExecutor
from functools import partial
def map_threaded(function, *iterables, max_workers=MAXCPUS, **kwargs):
"""Apply function to every item of iterable and return list of results.
Use a pool of threads to execute calls asynchronously.
"""
if kwargs:
function = partial(function, **kwargs)
with ThreadPoolExecutor(max_workers) as executor:
return list(executor.map(function, *iterables))
%time c = map_threaded(correlate_python, [A, A], [B, B])
assert c[0] == c[1]
CPU times: total: 859 ms Wall time: 3.13 s
There is no improvement over executing the functions sequentially.
In CPython, the reference Python implementation we are using, only one thread can execute Python code at once due to the Global Interpreter Lock (GIL).
Threading is still an appropriate model in Python to run multiple I/O-bound tasks simultaneously.
We demonstrate later that Python functions implemented in C can release the GIL and be executed on multiple CPU cores using threads.
Cross-correlation using C¶
Let's compare the performance of the pure Python function to an implementation in C.
The speed of compiled C code is often used as a reference when comparing single-threaded performance.
%%writefile correlate_c.c
/* A linear correlate function implemented in C. */
#include <stddef.h>
#include <stdlib.h>
#include <stdio.h>
typedef ptrdiff_t ssize_t;
/* Compute dot product of two sequences in range. */
double dot_c(double *a, double *b, ssize_t start, ssize_t end, ssize_t delay)
{
ssize_t n;
double sum = 0.0;
for (n = start; n < end; n++)
sum += a[n + delay] * b[n];
return sum;
}
/* Compute linear correlation of two one-dimensional sequences. */
void correlate_c(double *a, double *b, double *c, ssize_t size)
{
ssize_t index, delay;
for(index = 0; index < size; index++) {
delay = index - size / 2;
if (delay < 0) {
c[index] = dot_c(a, b, -delay, size, delay);
}
else {
c[index] = dot_c(a, b, 0, size-delay, delay);
}
}
}
/* Time the correlate_c function. */
int main()
{
ssize_t i;
ssize_t size = 8192;
ssize_t loops = 25;
double *a = (double*)malloc(size * sizeof(double));
double *b = (double*)malloc(size * sizeof(double));
for (i = 0; i < size; i++) {
a[i] = (double)rand()/(double)(RAND_MAX) - 0.5;
b[i] = (double)rand()/(double)(RAND_MAX) - 0.5;
}
for (i = 0; i < loops; i++) {
double *c = (double*)malloc(size * sizeof(double));
correlate_c(a, b, c, size);
free(c);
}
free(a);
free(b);
return 0;
}
Writing correlate_c.c
Python's distutils.ccompiler module can be used to compile and link the C code:
from distutils import ccompiler
compiler = ccompiler.new_compiler()
objects = compiler.compile(['correlate_c.c'], extra_postargs=['-O2'])
compiler.link_executable(objects, 'correlate_c')
The generated executable can be timed using Jupyter's %timeit magic:
correlate_executable = './correlate_c'
t = %timeit -r 1 -q -o ! $correlate_executable
print('{:.2f} ms per loop'.format(t.best * 1000 / 25))
18.74 ms per loop
The C program calculates the correlation about two orders of magnitude faster than pure Python.
Python lists¶
So far, we have used Python's built-in list type to store sequences of floating-point numbers.
Depending on the Python implementation and platform:
- every item of a list is an 8-byte pointer to an object storing the value.
- every floating-point number is stored as a 24-byte object.
Hence, Python lists are very inefficient for storing large numbers of homogeneous numerical data:
- the numbers are not stored contiguously in a Python list.
- a Python list of floating-point numbers is about 4x larger than a C array:
import random
import sys
size = 8192
alist = [random.random() for _ in range(size)]
print(
'Storage size of Python list: {:>6} bytes'.format(
sys.getsizeof(alist) + sys.getsizeof(alist[0]) * size
)
)
print('Storage size of C array: {:>6} bytes'.format(8 + size * 8))
Storage size of Python list: 263832 bytes Storage size of C array: 65544 bytes
Why Python?¶
So far, we have shown that:
- Python built-in lists cannot efficiently store homogeneous numerical data.
- Python runs numerical code orders of magnitude slower than compiled C.
- Python code cannot be run in parallel on multiple CPU cores in the same process.
Note that this applies to CPython, the Python reference implementation, only. Other Python implementations (PyPy, Jython, IronPython) might not have these limitations.
Why do we consider Python for big data image processing and analysis?¶
There are technical solutions to overcome those limitations:
- The numpy library provides a standardized, efficient N-dimensional array object to store homogeneous numerical data.
- Many third-party libraries (numpy, scipy, scikit-image, etc.) provide fast implementations of numerical functions operating on numpy arrays.
- Python can be extended using modules written in C, which can release the GIL.
- Python code can be type annotated and compiled to C code using Cython.
- Python code using a subset of the language syntax can be just-in-time compiled to LLVM, CUDA, or OpenCL and executed on CPU or GPU, for example, via numba.
Putting the limitations into perspective: besides CPU-bound numerical calculations, there are many other tasks that are part of an efficient image processing pipeline:
- Many tasks are I/O bound (load or save data from/to the Internet, hard drive, or databases) and can be efficiently multi-threaded in Python.
- Besides threading, there are other methods to analyze data in parallel: SIMD, multiprocessing, distributed computing.
- Python can be used to drive/control/schedule compile and compute tasks, for example, generate, compile, and execute C/OpenCL/CUDA code at runtime.
As an image analyst or end user of imaging software, Python can be used
- as a glue language for external libraries or executables written in C, Fortran, R, Java, .NET, Matlab, etc.
- for data munging, such as mapping image and meta-data from a diversity of formats (raw binary, HTML, CSV, TIFF, HDF5, etc.) and sources (file system, databases, HTTP, FTP, cloud storage) into more convenient formats.
- as a scripting language for imaging software such as ZEN, μManager, CellProfiler, MeVisLab, ArcGIS, Amira, ParaView, VisIt, GIMP, Blender, OMERO, BisQue, etc.
Cross-correlation using numpy¶
import numpy
def correlate_numpy(a, b):
"""Return linear correlation of two one-dimensional arrays."""
return numpy.correlate(a, b, mode='same')
def test_correlate(correlate_function, **kwargs):
"""Test correlate function using known results."""
c = correlate_function(
numpy.array([1.0, 2.0, 3.0]), numpy.array([4.0, 8.0, 16.0]), **kwargs
)
assert numpy.allclose(c, [40.0, 68.0, 32.0]), c
c = correlate_function(
numpy.array([1.0, 2.0, 3.0, 4.0]),
numpy.array([5.0, 6.0, 7.0, 8.0]),
**kwargs,
)
assert numpy.allclose(c, [23.0, 44.0, 70.0, 56.0]), c
test_correlate(correlate_numpy)
A = numpy.random.random(2**13) - 0.5
B = numpy.random.random(2**13) - 0.5
%timeit correlate_numpy(A, B)
5.18 ms ± 116 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
Depending on how the numpy library was compiled, the numpy.correlate function is several times faster than our implementation in C.
Numpy uses an optimized version of the dot product (from the BLAS library) for calculating the sliding dot product.
The storage size of the numpy array is close to a C array. The overhead of less than 100 bytes matters only for scalar values and small arrays:
print('Storage size of numpy array: {} bytes'.format(sys.getsizeof(A)))
Storage size of numpy array: 65648 bytes
Numpy multi-threaded¶
Many numpy functions release the GIL and can be run in parallel on multiple CPU cores:
%timeit map_threaded(correlate_numpy, [A, A], [B, B])
5.3 ms ± 57.4 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
However, depending on the numpy build options, some functions might use threads internally, which can cause oversubscription and make parallel execution slower than sequential. In such cases, when using numpy built with Intel's MKL library, set the MKL_NUM_THREADS environment variable to 1 to disable the library's internal use of threads.
Cross-correlation using Cython¶
Cython is an optimizing static compiler for both the Python programming language and the extended Cython programming language.
Cython makes writing C extensions for Python and interfacing with numpy arrays and C libraries easy. Unlike numba, Cython requires an external, Python compatible C compiler.
Cython integrates well with the Jupyter Notebook:
%reload_ext Cython
Here we use Cython to implement the sliding dot product cross-correlation by
- type annotating the Python code
- releasing the GIL
- using typed memoryviews to access data in numpy arrays
- compiling the code to machine code via Python C extension
%%cython -f --compile-args=-O2
#
#cython: boundscheck=False
#cython: wraparound=False
import numpy
cdef double dot_cython(
double[::1] a,
double[::1] b,
ssize_t start,
ssize_t end,
ssize_t delay
) noexcept nogil:
"""Return dot product of two sequences in range."""
cdef:
ssize_t n
double sum
sum = 0.0
for n in range(start, end):
sum += a[n + delay] * b[n]
return sum
def correlate_cython(double[::1] a not None, double[::1] b not None):
"""Return linear correlation of two one-dimensional arrays."""
cdef ssize_t size, delay, index
size = len(a)
result = numpy.empty(size, dtype=numpy.float64)
# numpy array objects cannot be accessed in a nogil section
# use a Cython typed memoryview instead
cdef double[::1] c = result
with nogil:
for index in range(size):
delay = index - size // 2
if delay < 0:
c[index] = dot_cython(a, b, -delay, size, delay)
else:
c[index] = dot_cython(a, b, 0, size-delay, delay)
return result
test_correlate(correlate_cython)
%timeit correlate_cython(A, B)
%timeit map_threaded(correlate_cython, [A, A], [B, B])
18.1 ms ± 29.5 µs per loop (mean ± std. dev. of 7 runs, 100 loops each) 18.5 ms ± 18.6 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
The Cython implementation is about as fast as the C implementation. Since the function releases the GIL, it can efficiently run in parallel on multiple CPU cores using multi-threading.
Cython's code analysis can be helpful in type annotating and optimizing code. Try it by using %%cython --annotate at the top of the previous cell.
Using Cython with OpenMP¶
OpenMP (Open Multi-Processing) is an application programming interface (API) that supports multi-platform shared memory multiprocessing programming in C, C++, and Fortran.
Cython's prange function is implemented using OpenMP's "#pragma omp parallel for" directive.
We instruct the C compiler to use OpenMP by specifying extra platform specific compiler and linker arguments, which were defined at the beginning of the notebook in the OPENMP_ARGS variable:
%%cython -f --compile-args=-O2 $OPENMP_ARGS
#
#cython: boundscheck=False
#cython: wraparound=False
import numpy
from cython.parallel import prange, parallel
cdef double dot_cython(
double[::1] a,
double[::1] b,
ssize_t start,
ssize_t end,
ssize_t delay
) noexcept nogil:
"""Return dot product of two sequences in range."""
cdef:
ssize_t n
double sum
sum = 0.0
for n in range(start, end):
sum += a[n + delay] * b[n]
return sum
def correlate_cython_omp(
double[::1] a not None,
double[::1] b not None,
int num_threads=0
):
"""Return linear correlation of two one-dimensional arrays."""
cdef:
ssize_t size, delay, index
double[::1] c
size = a.size
result = numpy.empty(size, dtype=numpy.float64)
c = result
with nogil, parallel(num_threads=num_threads):
for index in prange(size):
delay = index - size // 2
if delay < 0:
c[index] = dot_cython(a, b, -delay, size, delay)
else:
c[index] = dot_cython(a, b, 0, size-delay, delay)
return result
test_correlate(correlate_cython_omp)
%timeit correlate_cython_omp(A, B)
%timeit map_threaded(correlate_cython_omp, [A, A], [B, B])
1.65 ms ± 7.93 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each) 5.74 ms ± 46.6 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
Depending on the number of CPU cores, this function is several times faster than the previous implementation, but it can no longer be efficiently multi-threaded because the function already uses all CPU cores via OpenMP.
This implementation is slightly faster than using the vsldCorrExec1D function in direct mode from Intel's Math Kernel Library (MKL) (not shown).
Just-in-time compile Python code using Numba¶
import numba
import numpy
@numba.jit(nogil=True)
def dot_numba(a, b, start, stop, delay):
"""Return dot product of two sequences in range."""
sum = 0.0
for n in range(start, stop):
sum += a[n + delay] * b[n]
return sum
@numba.jit
def correlate_numba(a, b):
"""Return linear correlation of two one-dimensional arrays."""
size = len(a)
c = numpy.empty(size, numpy.float64) # allocate output numpy array
for index in range(size):
delay = index - size // 2
if delay < 0:
c[index] = dot_numba(a, b, -delay, size, delay)
else:
c[index] = dot_numba(a, b, 0, size - delay, delay)
return c
test_correlate(correlate_numba)
%timeit correlate_numba(A, B)
%timeit map_threaded(correlate_numba, [A, A], [B, B])
25.9 ms ± 359 µs per loop (mean ± std. dev. of 7 runs, 10 loops each) 53.6 ms ± 288 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
Compiling Python code with numba achieves speed comparable to C.
The function cannot be run in parallel on CPU cores even though the inner dot function releases the GIL. The loops need to be factored out to a function that does not hold the GIL.
Parallelized numba code¶
To further improve the Numba implementation of the correlation function, we
- move the code of the outer loop for index in range(size): into its own function named correlate_numba_jit and release the GIL
- use the numba.prange function to parallelize the outer loop
import numba
import numpy
@numba.jit(nogil=True)
def dot_numba(a, b, start, stop, delay):
"""Return dot product of two sequences in range."""
sum = 0.0
for n in range(start, stop):
sum += a[n + delay] * b[n]
return sum
@numba.jit(nogil=True, parallel=True)
def correlate_numba_jit(c, a, b, size):
"""Compute linear correlation of two arrays using sliding-dot product."""
for index in numba.prange(size):
delay = index - size // 2
if delay < 0:
c[index] = dot_numba(a, b, -delay, size, delay)
else:
c[index] = dot_numba(a, b, 0, size - delay, delay)
def correlate_numba_parallel(a, b):
"""Return linear correlation of two one-dimensional arrays."""
size = len(a)
c = numpy.empty(size, numpy.float64)
with THREADLOCK:
correlate_numba_jit(c, a, b, size)
return c
test_correlate(correlate_numba_parallel)
%timeit correlate_numba_parallel(A, B)
%timeit map_threaded(correlate_numba_parallel, [A, A], [B, B])
2.44 ms ± 64.4 µs per loop (mean ± std. dev. of 7 runs, 100 loops each) 8.15 ms ± 37.5 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
Just-in-time compile Python code to CUDA using Numba¶
The numba.cuda.jit decorator can translate Python functions into PTX code, which executes on CUDA hardware, for example, an NVIDIA graphics card with thousands of cores.
In the CUDA execution model, a kernel function is executed once by each thread on a grid of blocks of threads. The grid can be 1, 2, or 3 dimensional. The threads within each block can synchronize and share memory. Thread blocks can execute independently in any order. The kernel function can determine the absolute position of the current thread in the entire grid of blocks.
import numpy
import numba.cuda
@numba.cuda.jit(device=True, inline=True)
def dot_cuda(a, b, start, stop, delay):
"""Return dot product of two sequences in range."""
sum = 0.0
for i in range(start, stop):
sum += a[i + delay] * b[i]
return sum
@numba.cuda.jit()
def correlate_cuda_kernel(c, a, b, size):
"""CUDA kernel to compute linear correlation of two arrays."""
# global position of the thread in the 1D grid
index = numba.cuda.grid(1)
if index < size:
delay = index - size // 2
if delay < 0:
c[index] = dot_cuda(a, b, -delay, size, delay)
else:
c[index] = dot_cuda(a, b, 0, size - delay, delay)
def correlate_numba_cuda(a, b):
"""Return linear correlation of two one-dimensional arrays."""
size = a.size
c = numpy.zeros(size, numpy.float64)
# launch the CUDA kernel
threadsperblock = 32
blockspergrid = (size + (threadsperblock - 1)) // threadsperblock
correlate_cuda_kernel[blockspergrid, threadsperblock](c, a, b, size)
return c
if not SKIP_CUDA:
test_correlate(correlate_numba_cuda)
%timeit correlate_numba_cuda(A, B)
# crashes: %timeit map_threaded(correlate_numba_cuda, [A, A], [B, B])
1.35 ms ± 25.4 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
Running this on a GPU with 1920 cores is about 20 times faster than on a single CPU core. For longer input arrays, the GPU will be significantly faster (see below).
Switching to frequency domain¶
So far, we have calculated the cross-correlation in the time domain using the sliding dot product.
For longer sequences, it is more efficient to use the cross-correlation theorem to calculate the cross-correlation in the frequency domain using Fourier transforms $\mathcal{F}\{\}$:
$$ a\star b = \mathcal{F}^{-1}(\mathcal{F}\{a\} \cdot (\mathcal{F}\{b\})^*)$$
Cross-correlation using Scipy's convolution function¶
The scipy library provides many efficient numerical routines such as numerical integration and optimization.
The scipy.signal.fftconvolve function uses zero-padding and the Fast Fourier Transform (FFT) according to the convolution theorem to calculate the convolution of two arrays.
Recall that the cross-correlation of functions $a(t)$ and $b(t)$ is equivalent to the convolution ($*$) of $a(t)$ and $b^*(-t)$:
$$ a\star b = a*b^*(-t),$$
This means that the correlation can be calculated using convolution by reversing the second input array:
import scipy.signal
def correlate_scipy(a, b):
"""Return circular correlation of two one-dimensional arrays."""
return scipy.signal.fftconvolve(a, b[::-1], 'same')
test_correlate(correlate_scipy)
%timeit correlate_scipy(A, B)
%timeit map_threaded(correlate_scipy, [A, A], [B, B])
253 µs ± 10.4 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each) 655 µs ± 7.9 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
This is about an order of magnitude faster and scales much better with larger array sizes (not shown) than the multi-threaded Cython implementation of the sliding dot product.
Circular cross-correlation using FFT¶
Let's implement a circular correlation function using numpy's FFT functions according to the cross-correlation theorem:
$$ a\star b = \mathcal{F}^{-1}(\mathcal{F}\{a\} \cdot (\mathcal{F}\{b\})^*)$$
import numpy.fft
def correlate_fft(a, b):
"""Return circular correlation of two one-dimensional arrays."""
# forward DFT
a = numpy.fft.rfft(a)
b = numpy.fft.rfft(b)
# multiply by complex conjugate
a *= b.conj()
# reverse DFT
c = numpy.fft.irfft(a)
# shift
c = numpy.fft.fftshift(c)
return c
%timeit correlate_fft(A, B)
%timeit map_threaded(correlate_fft, [A, A], [B, B])
120 µs ± 2.76 µs per loop (mean ± std. dev. of 7 runs, 10,000 loops each) 459 µs ± 3.72 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
Recall that the circular correlation function can calculate the linear correlation by zero-padding the arrays (to twice the size for even length arrays):
import numpy
c = correlate_fft(
numpy.pad(A, A.size // 2, mode='constant'),
numpy.pad(B, B.size // 2, mode='constant'),
)
# remove zero-padding from result
c = c[A.size // 2 : -A.size // 2]
# compare to linear correlation
assert numpy.allclose(c, correlate_numpy(A, B))
In general, circular correlation should only be used to analyze cyclic functions. However, later we demonstrate that for our specific application the results obtained from circular correlation do not significantly differ from linear correlation.
Circular cross-correlation using CUDA FFT¶
CuPy is an implementation of a NumPy-compatible multi-dimensional array on CUDA.
To run the FFT based circular correlation function on a GPU, we
- move the input numpy arrays to the current GPU device using cupy.asarray()
- use FFT functions from cupy.fft instead of numpy.fft
- move the result array from the GPU device to the host using cupy.asnumpy()
if not SKIP_CUDA:
import cupy.fft
def correlate_cufft(a, b):
"""Return circular correlation of two one-dimensional arrays."""
# move arrays to the current GPU device
a = cupy.asarray(a)
b = cupy.asarray(b)
a = cupy.fft.rfft(a)
b = cupy.fft.rfft(b)
a *= b.conj()
c = cupy.fft.irfft(a)
c = cupy.fft.fftshift(c)
# move array from GPU device to the host
return cupy.asnumpy(c)
if not SKIP_CUDA:
%timeit correlate_cufft(A, B)
# very slow: %timeit map_threaded(correlate_cufft, [A, A], [B, B])
326 µs ± 2.48 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
This GPU version runs slower than the CPU version due to the overhead of moving small arrays from/to the device and executing several small kernel functions.
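Whether the GPU catches up for longer inputs can be checked by re-timing both FFT-based implementations on larger arrays; a quick sketch (the outcome depends on the GPU and the host-to-device transfer overhead):
# re-time the CPU and GPU FFT correlations on longer arrays (2**22 samples)
if not SKIP_CUDA:
    A_long = numpy.random.random(2**22) - 0.5
    B_long = numpy.random.random(2**22) - 0.5
    %timeit correlate_fft(A_long, B_long)
    %timeit correlate_cufft(A_long, B_long)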
Use Cython with a C FFT library¶
The fft2d C library by Takuya Ooura provides efficient functions to compute Fast Fourier Transforms (FFT).
Cython makes it relatively easy to use C libraries from Python.
The file fftsg.c defines a function rdft, which computes the forward and inverse DFT of real input arrays:
Fast Fourier/Cosine/Sine Transform
dimension :one
data length :power of 2
decimation :frequency
radix :split-radix
data :inplace
table :use
functions
rdft: Real Discrete Fourier Transform
function prototypes
void rdft(int, int, double *, int *, double *);
-------- Real DFT / Inverse of Real DFT --------
[definition]
<case1> RDFT
R[k] = sum_j=0^n-1 a[j]*cos(2*pi*j*k/n), 0<=k<=n/2
I[k] = sum_j=0^n-1 a[j]*sin(2*pi*j*k/n), 0<k<n/2
<case2> IRDFT (excluding scale)
a[k] = (R[0] + R[n/2]*cos(pi*k))/2 +
sum_j=1^n/2-1 R[j]*cos(2*pi*j*k/n) +
sum_j=1^n/2-1 I[j]*sin(2*pi*j*k/n), 0<=k<n
[usage]
<case1>
ip[0] = 0; // first time only
rdft(n, 1, a, ip, w);
<case2>
ip[0] = 0; // first time only
rdft(n, -1, a, ip, w);
[parameters]
n :data length (int)
n >= 2, n = power of 2
a[0...n-1] :input/output data (double *)
<case1>
output data
a[2*k] = R[k], 0<=k<n/2
a[2*k+1] = I[k], 0<k<n/2
a[1] = R[n/2]
<case2>
input data
a[2*j] = R[j], 0<=j<n/2
a[2*j+1] = I[j], 0<j<n/2
a[1] = R[n/2]
ip[0...*] :work area for bit reversal (int *)
length of ip >= 2+sqrt(n/2)
strictly,
length of ip >=
2+(1<<(int)(log(n/2+0.5)/log(2))/2).
ip[0],ip[1] are pointers of the cos/sin table.
w[0...n/2-1] :cos/sin table (double *)
w[],ip[] are initialized if ip[0] == 0.
[remark]
Inverse of
rdft(n, 1, a, ip, w);
is
rdft(n, -1, a, ip, w);
for (j = 0; j <= n - 1; j++) {
a[j] *= 2.0 / n;
}
We use Python's distutils module to compile the fftsg.c C code into a static link library:
from distutils import ccompiler
compiler = ccompiler.new_compiler()
objects = compiler.compile(['fft2d/fftsg.c'], extra_postargs=['-fPIC', '-O2'])
compiler.create_static_lib(objects, 'ftt2d', output_dir='.')
To use the ftt2d library from Cython, we need to include the declaration from the C header file and allocate temporary arrays:
%%cython -f --compile-args=-O2 -I. -l./ftt2d
#
#cython: boundscheck=False
#cython: wraparound=False
import numpy
from libc.stdlib cimport malloc, free
from libc.math cimport sqrt
cdef extern from 'fft2d.h':
void rdft(int n, int isgn, double *a, int *ip, double *w) nogil
def correlate_cython_fft2d(a, b):
"""Return circular correlation of two one-dimensional arrays."""
cdef:
ssize_t size = a.size
double scale = 2.0 / size
double[::1] a_
double[::1] b_
double *w_
int *ip_
int s
# copy input arrays. rdft computes in-place
result = numpy.copy(a)
a_ = result
b_ = numpy.copy(b)
with nogil:
# allocate cos/sin table
w_ = <double *>malloc((size // 2) * sizeof(double))
if not w_:
with gil:
raise MemoryError('could not allocate w_')
# allocate work area for bit reversal
ip_ = <int *>malloc((2 + <int>(sqrt((size//2) + 0.5))) * sizeof(int))
if not ip_:
with gil:
raise MemoryError('could not allocate ip_')
ip_[0] = 0
# forward DFT
rdft(size, 1, &b_[0], ip_, w_)
rdft(size, 1, &a_[0], ip_, w_)
# multiply by complex conjugate
multiply_conj(a_, b_, size)
# reverse DFT
rdft(size, -1, &a_[0], ip_, w_)
# shift and scale results
fftshift(a_, size, scale)
free(w_)
free(ip_)
return result
cdef void multiply_conj(
double[::1] a,
double[::1] b,
ssize_t size
) noexcept nogil:
"""In-place multiply a by complex conjugate of b."""
cdef:
ssize_t i
double ar, br, ai, bi
a[0] = a[0] * b[0]
a[1] = a[1] * b[1]
for i in range(2, size, 2):
ar = a[i]
ai = a[i+1]
br = b[i]
bi = b[i+1]
a[i] = ar * br + ai * bi
a[i+1] = ai * br - ar * bi
cdef void fftshift(
double[::1] a,
ssize_t size,
double scale
) noexcept nogil:
"""In-place shift zero-frequency component to center of spectrum."""
cdef:
ssize_t i
double t
size //= 2
for i in range(size):
t = a[i]
a[i] = a[i + size] * scale
a[i + size] = t * scale
assert numpy.allclose(correlate_cython_fft2d(A, B), correlate_fft(A, B))
%timeit correlate_cython_fft2d(A, B)
%timeit map_threaded(correlate_cython_fft2d, [A, A], [B, B])
58.4 µs ± 265 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each) 307 µs ± 1.39 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
So far this is the fastest implementation of the correlate function. It can be run multi-threaded, although for short input arrays the overhead of multi-threading is detrimental.
Compare implementations¶
Finally, we compare several implementations of the cross-correlate function using longer time series of 16384 samples:
import timeit
import numpy
from IPython.display import display
from ipywidgets import IntProgress
def time_functions(functions, size=2**14, max_workers=MAXCPUS):
"""Return runtimes of single and multi-threaded correlation functions."""
progress = IntProgress(min=0, max=2 * len(functions))
display(progress)
a = numpy.random.random(size) - 0.5
b = numpy.random.random(size) - 0.5
ab = [a] * max_workers, [b] * max_workers
result = []
for function in functions:
try:
func = globals()[function]
t0 = timeit.Timer(lambda: func(a, b)).timeit(number=1)
number = max(2, int(1 / t0))
t0 = timeit.Timer(lambda: func(a, b)).timeit(number)
progress.value += 1
if 'cu' not in function:
t1 = timeit.Timer(
lambda: map_threaded(func, *ab, max_workers=max_workers)
).timeit(number)
else:
# do not run CUDA multi-threaded
t1 = math.inf
result.append(
[
'{:.2f}'.format(t0 * 1e3 / number),
'{:.2f}'.format(t1 * 1e3 / number),
'{:.1f}'.format(t0 / t1 * max_workers),
]
)
progress.value += 1
except Exception:
result.append([float('nan')] * 3)
progress.close()
try:
import pandas
columns = [
'1 thread / ms',
'{} threads / ms'.format(max_workers),
'speedup',
]
return pandas.DataFrame(result, index=functions, columns=columns)
except ImportError:
return result
display(
time_functions(
[
# 'correlate_python'
'correlate_numpy',
'correlate_cython',
'correlate_cython_omp',
'correlate_numba',
'correlate_numba_parallel',
'correlate_numba_cuda',
'correlate_scipy',
'correlate_fft',
'correlate_cufft',
'correlate_cython_fft2d',
]
)
)
| function | 1 thread / ms | 8 threads / ms | speedup |
|---|---|---|---|
| correlate_numpy | 21.02 | 30.06 | 5.6 |
| correlate_cython | 72.74 | 91.53 | 6.4 |
| correlate_cython_omp | 6.45 | 53.23 | 1.0 |
| correlate_numba | 108.23 | 849.19 | 1.0 |
| correlate_numba_parallel | 9.57 | 99.85 | 0.8 |
| correlate_numba_cuda | 2.68 | inf | 0.0 |
| correlate_scipy | 0.52 | 2.19 | 1.9 |
| correlate_fft | 0.22 | 1.43 | 1.2 |
| correlate_cufft | 0.80 | inf | 0.0 |
| correlate_cython_fft2d | 0.14 | 0.91 | 1.2 |
The best implementation of the correlate function is almost as fast as desired (~200 µs) for the pCF analysis of images.
The correlate function could be further optimized by implementing it in C++ and using the DFTi functions of the closed-source Intel MKL library; expect another 30% speed improvement.
2. Implement pair correlation function analysis of small image time series¶
Now that we have developed a fast cross-correlation function and learned techniques to speed-up Python code, we use them to analyze a small simulated time series of images.
Load and explore simulated images¶
The Simulation_Channel.bin file contains the result of a simulation of fluorescent particles diffusing on a 64x64 grid. The grid contains a diagonal, 300 nm wide channel, which restricts free diffusion. The file was produced using the Globals for Images · SimFCS software.
The 32,000 simulated images are stored contiguously, one image after another, as 16-bit unsigned integers in the file.
The time samples of an individual pixel are therefore not stored contiguously. Accessing a pixel's time series will be inefficient, while spatial access will be fast:
The numpy.fromfile function can be used to load the raw binary data into a 3D numpy array:
import numpy
def rawread(filename, shape, dtype):
"""Return array data from binary file."""
count = numpy.prod(shape, dtype=numpy.intp)
count = count if count >= 0 else -1
data = numpy.fromfile(filename, dtype=dtype, count=count)
data.shape = shape
return data
SIMULATION_DATA = rawread(
os.path.join(DATA_PATH, 'Simulation_Channel.bin'),
shape=(-1, 64, 64),
dtype=numpy.uint16,
)
print('Data shape:', SIMULATION_DATA.shape)
Data shape: (32000, 64, 64)
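A minimal sketch to illustrate the layout claim above on the in-memory array: gathering the same number of samples contiguously (one image) versus strided across the array (part of one pixel's time series):
import timeit
# one 64x64 image is a contiguous block; 4096 samples of a single pixel's
# time series are strided across the whole array
t_spatial = timeit.timeit(lambda: SIMULATION_DATA[0].copy(), number=1000)
t_temporal = timeit.timeit(
    lambda: SIMULATION_DATA[:4096, 32, 32].copy(), number=1000
)
print('copy one image:                 {:.1f} µs'.format(t_spatial * 1e3))
print('copy 4096 samples of one pixel: {:.1f} µs'.format(t_temporal * 1e3))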
For FFTs to be performed efficiently, the size of the time axis is truncated to a power of two:
def shape2pow2(data, axis):
"""Return array with axis truncated to power of 2."""
try:
iter(axis)
except TypeError:
axis = [axis]
slices = []
for i, size in enumerate(data.shape):
if i in axis:
size = 2 ** int(math.log(size, 2))
slices.append(slice(0, size))
return data[tuple(slices)]
SIMULATION_DATA = shape2pow2(SIMULATION_DATA, axis=0)
print('Truncated shape:', SIMULATION_DATA.shape)
Truncated shape: (16384, 64, 64)
Let's plot the averages over the time axis (mean image) and the spatial axes (mean time series):
from matplotlib import pyplot
def plot_image_timeseries(image_timeseries):
"""Plot temporal and spatial means of image timeseries."""
pyplot.figure(figsize=(6, 8))
pyplot.subplot(3, 1, (1, 2))
mean_image = numpy.mean(image_timeseries, axis=0)
pyplot.title('image of temporal mean')
pyplot.imshow(mean_image, cmap='viridis', interpolation='none')
pyplot.colorbar(shrink=0.83, pad=0.05)
pyplot.subplot(3, 1, 3)
mean_ts = numpy.mean(image_timeseries, axis=(1, 2))
pyplot.title('time series of spatial mean')
pyplot.xlabel('time index')
pyplot.ylabel('intensity')
pyplot.plot(mean_ts)
pyplot.xlim([0, len(mean_ts)])
pyplot.tight_layout()
pyplot.show()
plot_image_timeseries(SIMULATION_DATA)
Process cross-correlation functions for image fluorescence fluctuation analysis¶
In image fluctuation correlation spectroscopy, the cross-correlation of two time series of fluorescence intensity, $F_a(t)$ and $F_b(t)$, is regularly processed and presented as follows:
The correlation functions are normalized as follows:
$$ G(\tau) = \dfrac{\big \langle F_a(t) \cdot F_b(t+\tau) \big \rangle}{\langle F_a(t) \rangle \cdot \langle F_b(t) \rangle} - 1 = \dfrac{\big \langle \big(F_a(t) - \langle F_a(t) \rangle\big) \cdot \big(F_b(t) - \langle F_b(t) \rangle\big) \big \rangle}{\langle F_a(t) \rangle \cdot \langle F_b(t) \rangle},$$
where $F(t)$ is the fluorescence intensity signal at time $t$, $\tau$ is the time delay, and $\ \langle \rangle$ the mean.
Only the positive time delays $\tau$ are used. This corresponds to the negative delays for the definition of cross-correlation we used.
The functions are averaged in exponentially increasing bins of time delays.
The log-binned functions are smoothed.
We define the following functions for fluctuation correlation analysis of time series:
import numpy
def correlate_circular(a, b):
"""Return circular correlation of two arrays using DFT."""
size = a.size
# forward DFT
a = numpy.fft.rfft(a)
b = numpy.fft.rfft(b)
# multiply by complex conjugate
c = a.conj() * b
# reverse DFT
c = numpy.fft.irfft(c)
# positive delays only
c = c[: size // 2]
# normalize with the averages of a and b
# c is already normalized by size
# the 0th value of the DFT contains the sum of the signal
c /= a[0].real * b[0].real / size
c -= 1.0
return c
def correlate_linear(a, b):
"""Return linear correlation of two arrays using DFT."""
size = a.size
# subtract mean and pad with zeros to twice the size
a_mean = a.mean()
b_mean = b.mean()
a = numpy.pad(a - a_mean, a.size // 2, mode='constant')
b = numpy.pad(b - b_mean, b.size // 2, mode='constant')
# forward DFT
a = numpy.fft.rfft(a)
b = numpy.fft.rfft(b)
# multiply by complex conjugate
c = a.conj() * b
# reverse DFT
c = numpy.fft.irfft(c)
# positive delays only
c = c[: size // 2]
# normalize with the averages of a and b
c /= size * a_mean * b_mean
return c
def average(c, bins):
"""Return averaged chunks of array."""
out = [numpy.mean(c[: bins[0]])]
for i in range(len(bins) - 1):
out.append(numpy.mean(c[bins[i] : bins[i + 1]]))
return out
def logbins(size, nbins):
"""Return up to nbins exponentially increasing integers from 1 to size."""
b = numpy.logspace(0, math.log(size, 2), nbins, base=2, endpoint=True)
return numpy.unique(b.astype(numpy.intp))
def smooth(c):
"""Return double exponentially smoothed array."""
out = c.copy()
out[0] = out[1]
for i in range(1, len(out)):
out[i] = out[i] * 0.3 + out[i - 1] * 0.7
for i in range(len(out) - 2, -1, -1):
out[i] = out[i] * 0.3 + out[i + 1] * 0.7
return out
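Before plotting, here is a minimal usage example of these functions on two arbitrarily chosen pixels of the simulated data:
# correlate, log-bin, and smooth the time series of two pixels
a = SIMULATION_DATA[:, 31, 31]
b = SIMULATION_DATA[:, 35, 35]
bins = logbins(a.size // 2, 32)
pcf = smooth(average(correlate_circular(a, b), bins))
print('number of log bins:', len(bins))
print('smoothed pCF at first and last bin: {:.3f}, {:.3f}'.format(pcf[0], pcf[-1]))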
Let's plot two time series, their normalized linear and circular cross-correlation functions, the log-binned functions, and the final smoothed log-binned normalized cross-correlation functions.
from ipywidgets import IntSlider, interact
from matplotlib import pyplot
def plot_pcf_processing(image_timeseries):
"""Compare linear and circular pair correlation functions."""
ntimes, height, width = image_timeseries.shape
def _plot(y0, x0, y1, x1):
# select time series from image_timeseries
a = image_timeseries[:, y0, x0]
b = image_timeseries[:, y1, x1]
# linear and circular correlation
cl = correlate_linear(a, b)
cc = correlate_circular(a, b)
# average and smooth
bins = logbins(a.size // 2, 32)
averagedl = average(cl, bins)
smoothedl = smooth(averagedl)
averagedc = average(cc, bins)
smoothedc = smooth(averagedc)
pyplot.figure(figsize=(6, 12))
# plot the time series
pyplot.subplot(4, 1, 1)
pyplot.title('time series')
pyplot.xlabel('time index')
pyplot.ylabel('intensity')
pyplot.plot(a, 'g', label='[{}, {}]'.format(y0, x0))
        pyplot.plot(b, 'b', label='[{}, {}]'.format(y1, x1))
pyplot.xlim([0, len(a)])
pyplot.legend(fancybox=True, framealpha=0.9)
# plot the cross-correlation function and logbins
pyplot.subplot(4, 1, 2)
pyplot.title('normalized cross-correlation functions and logbins')
pyplot.xlabel('positive time delay index')
pyplot.ylabel('correlation')
for x in bins:
pyplot.axvline(x=x, color='0.8')
pyplot.plot(cl, 'g', label='linear')
pyplot.plot(cc, 'b', label='circular')
pyplot.xlim([0, len(cc)])
pyplot.legend(fancybox=True, framealpha=0.9)
# log-plot the cross-correlation function and logbins
pyplot.subplot(4, 1, 3)
pyplot.title('log-plot of cross-correlation functions and logbins')
pyplot.xlabel('positive time delay index')
pyplot.ylabel('correlation')
for x in bins:
pyplot.axvline(x=x, color='0.8')
pyplot.semilogx(cl, 'g', label='linear', base=2)
pyplot.semilogx(cc, 'b', label='circular', base=2)
pyplot.xlim([0, len(cc)])
pyplot.legend(fancybox=True, framealpha=0.9)
# plot the binned and smoothed cross-correlation function
pyplot.subplot(4, 1, 4)
pyplot.title('averaged and smoothed cross-correlation functions')
pyplot.xlabel('positive log time delay index')
pyplot.ylabel('correlation')
pyplot.plot(averagedl, 'g', label='linear')
pyplot.plot(smoothedl, 'm')
pyplot.plot(averagedc, 'b', label='circular')
pyplot.plot(smoothedc, 'r', label='smoothed')
pyplot.legend(fancybox=True, framealpha=0.9)
pyplot.tight_layout()
pyplot.show()
interact(
_plot,
y0=IntSlider(31, 0, height - 1, continuous_update=False),
x0=IntSlider(31, 0, width - 1, continuous_update=False),
y1=IntSlider(35, 0, height - 1, continuous_update=False),
x1=IntSlider(35, 0, width - 1, continuous_update=False),
)
plot_pcf_processing(SIMULATION_DATA)
Even though the cross-correlation curves differ significantly at larger delays, when averaged into log-bins the differences are minimal and not significant for our application. Hence, we continue using the faster circular correlation.
Reference implementation of pair correlation image analysis¶
We are ready to implement and run the pair correlation function analysis on small images.
For the reference implementation of the ipcf function, we use the previously defined pseudocode and the circular correlate function using numpy.fft:
import numpy
from numpy import zeros
def ipcf_reference(image_timeseries, circle_coordinates, bins):
"""Return pair correlation function analysis of image time series.
Cross-correlate the time series of each pixel in the image
with all its neighbors at a certain radius and return all
the log-binned and smoothed correlation curves.
"""
ntimes, height, width = image_timeseries.shape
npoints = len(circle_coordinates)
radius = circle_coordinates[0, 0]
result = zeros(
(height - 2 * radius, width - 2 * radius, npoints, len(bins)),
numpy.float32,
)
for y in range(radius, height - radius):
for x in range(radius, width - radius):
a = image_timeseries[:, y, x]
for i in range(npoints):
u, v = circle_coordinates[i]
b = image_timeseries[:, y + v, x + u]
c = correlate(b, a)
result[y - radius, x - radius, i] = smooth(average(c, bins))
return result
def correlate(a, b):
"""Return normalized circular correlation using DFT."""
size = a.size
# forward DFT
a = numpy.fft.rfft(a)
b = numpy.fft.rfft(b)
# multiply by complex conjugate
c = a * b.conj()
# reverse DFT
c = numpy.fft.irfft(c)
# positive delays only
c = c[: size // 2]
# normalize with the averages of a and b
# c is already normalized by size
# the 0th value of the DFT contains the sum of the signal
c /= a[0].real * b[0].real / size
c -= 1.0
return c
def average(c, bins):
"""Return averaged chunks of array."""
out = [numpy.mean(c[: bins[0]])]
for i in range(len(bins) - 1):
out.append(numpy.mean(c[bins[i] : bins[i + 1]]))
return out
def smooth(c):
"""Return double exponentially smoothed array."""
out = c.copy()
out[0] = out[1]
for i in range(1, len(out)):
out[i] = out[i] * 0.3 + out[i - 1] * 0.7
for i in range(len(out) - 2, -1, -1):
out[i] = out[i] * 0.3 + out[i + 1] * 0.7
return out
def logbins(size, nbins):
"""Return up to nbins exponentially increasing integers from 1 to size."""
b = numpy.logspace(0, math.log(size, 2), nbins, base=2, endpoint=True)
return numpy.unique(b.astype(numpy.intp))
def circle(radius, npoints):
"""Return cartesian coordinates of circle on integer grid."""
angles = numpy.linspace(0, 2 * numpy.pi, npoints, endpoint=False)
coordinates = radius * numpy.array((numpy.cos(angles), numpy.sin(angles)))
return numpy.ascontiguousarray(
numpy.round(coordinates).T.astype(numpy.intp)
)
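Note that ipcf_reference infers the radius from the first circle coordinate; a small sanity check of the circle function with the radius and number of points used below:
coords = circle(6, 32)
# the first point lies on the positive x-axis, so coords[0, 0] equals the radius
assert coords.shape == (32, 2)
assert coords[0, 0] == 6 and coords[0, 1] == 0
print(coords[:4])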
Now that all functions are defined, we can analyze the simulated data and compare the result to the known results.
import numpy
def run_ipcf(
ipcf_function, image_timeseries, radius=6, npoints=32, nbins=32, **kwargs
):
"""Run ipcf_function on image_timeseries."""
ntimes, height, width = image_timeseries.shape
# truncate time axis to power of two
ntimes = 2 ** int(math.log(ntimes, 2))
image_timeseries = image_timeseries[:ntimes]
# calculate circle coordinates
circle_coordinates = circle(radius, npoints)
# calculate log-bins
bins = logbins(ntimes // 2, nbins)
# run the pair correlation function analysis
result = ipcf_function(
image_timeseries, circle_coordinates, bins, **kwargs
)
return result
def test_ipcf(result, expected=None, atol=1e-6):
"""Compare ipcf result to known results from file."""
if expected is None:
expected = SIMULATION_IPCF_EXPECTED
if not numpy.allclose(result, expected, atol=atol):
try:
plot_ipcf_results(result - expected)
except NameError:
print('Test failed')
SIMULATION_IPCF_EXPECTED = rawread(
os.path.join(DATA_PATH, 'Simulation_Channel.ipcf.bin'),
(52, 52, 32, 30),
numpy.float32,
)
%time SIMULATION_IPCF_RESULT = run_ipcf(ipcf_reference, SIMULATION_DATA)
test_ipcf(SIMULATION_IPCF_RESULT)
CPU times: total: 30.3 s Wall time: 54.3 s
Almost a minute to analyze a small simulated dataset is slower than expected.
Plot results of image pair correlation function analysis¶
There are two meaningful ways to plot the 4-dimensional array returned by the ipCF analysis.
First, we plot all the pair correlation curves for a selected pixel (aka sprites):
import numpy
from ipywidgets import Dropdown, IntSlider, interact
from matplotlib import pyplot
def plot_ipcf_sprites(ipcf_result, figsize=(6, 5)):
"""Interactively plot pair correlation functions at pixel."""
height, width, npoints, nbins = ipcf_result.shape
# data limits
vmax, vmin = numpy.max(ipcf_result), numpy.min(ipcf_result)
vminmax = max(abs(vmax), abs(vmin))
# coordinates for polar plot and Delaunay triangulation
radius = numpy.arange(nbins)
angles = numpy.linspace(0, 2 * numpy.pi, npoints, endpoint=False)
radius, angles = numpy.meshgrid(radius, angles)
xcoords = radius * numpy.cos(angles)
ycoords = radius * numpy.sin(-angles)
def _plot(style, y, x):
pyplot.figure(figsize=figsize)
pyplot.title('pair correlation functions at pixel')
sprite = ipcf_result[y, x]
if style == 'lines':
pyplot.plot(sprite.T, 'b')
pyplot.ylim([vmin, vmax])
pyplot.xlabel('log time delay index')
pyplot.ylabel('pcf')
elif style == 'carpet':
pyplot.imshow(
sprite,
vmin=-vminmax,
vmax=vminmax,
cmap='seismic',
interpolation='none',
)
pyplot.xlabel('log time delay index')
pyplot.ylabel('circle point index')
pyplot.colorbar()
elif style == 'polar':
# polar plot using Delaunay triangulation
pyplot.tripcolor(
xcoords.flat,
ycoords.flat,
sprite.flat,
vmin=-vminmax,
vmax=vminmax,
shading='gouraud',
cmap='seismic',
)
            pyplot.gca().set_aspect('equal')
pyplot.axis('off')
pyplot.colorbar()
pyplot.show()
interact(
_plot,
style=Dropdown(options=['carpet', 'polar', 'lines']),
y=IntSlider(height // 2, 0, height - 1, continuous_update=False),
x=IntSlider(width // 2, 0, width - 1, continuous_update=False),
)
plot_ipcf_sprites(SIMULATION_IPCF_RESULT)
Next, we plot the image of all pair correlation values at a selected circle point and bin index:
import numpy
from ipywidgets import IntSlider, interact
from matplotlib import pyplot
def plot_ipcf_images(ipcf_result, figsize=(6, 5), interpolation='none'):
"""Interactively plot image of pair correlation function values."""
height, width, npoints, nbins = ipcf_result.shape
transpose = height > 1.5 * width
# data limits
vmax, vmin = numpy.max(ipcf_result), numpy.min(ipcf_result)
vminmax = max(abs(vmax), abs(vmin))
def _plot(point, bin):
pyplot.figure(figsize=figsize)
image = ipcf_result[:, :, point, bin]
if transpose:
image = image.T
angle = 360.0 / npoints * point
pyplot.title('pair correlation function values')
pyplot.imshow(
image,
vmin=-vminmax,
vmax=vminmax,
cmap='seismic',
interpolation=interpolation,
)
orientation = 'horizontal' if transpose else 'vertical'
pyplot.colorbar(orientation=orientation)
pyplot.show()
interact(
_plot,
point=IntSlider(npoints // 2, 0, npoints - 1, continuous_update=False),
bin=IntSlider(nbins // 2, 0, nbins - 1, continuous_update=False),
)
plot_ipcf_images(SIMULATION_IPCF_RESULT)
Optimize the ipcf function¶
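Before changing the algorithm, it is worth profiling the reference implementation to see where the time is spent. A minimal, illustrative sketch using IPython's %prun magic on a small sub-image (the slice size is arbitrary); such a report typically shows the many small FFT and numpy calls inside the Python loops dominating the runtime:
%prun -l 10 run_ipcf(ipcf_reference, SIMULATION_DATA[:, :20, :20])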
The algorithm and implementation of the image pair correlation function can be improved by the following means:
- change the data layout such that the time axis becomes contiguous and the individual time series can be accessed much faster
- pre-calculate the forward DFT and use it for the cross-correlation instead of the data. For our dataset, this takes an additional 512 MB of RAM
- redefine functions to not allocate new arrays on every function call but to write directly to the output array
- use the symmetry of the cross-correlation function, correlate(a, b) == correlate(b, a)[::-1], to avoid duplicate calculations (see the check after this list)
- decorate the functions with numba.jit
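The following minimal check (with assumed random test signals) illustrates this symmetry in its circular form: the negative delays of cross-correlating a with b are the positive delays of cross-correlating b with a. The optimized implementation exploits this by reusing each computed correlation for the pixel pair in the opposite direction:
import numpy

# assumed random test signals
rng = numpy.random.default_rng(42)
a = rng.random(256)
b = rng.random(256)
# full circular cross-correlations computed via the DFT (not normalized)
c_ab = numpy.fft.irfft(numpy.fft.rfft(a).conj() * numpy.fft.rfft(b))
c_ba = numpy.fft.irfft(numpy.fft.rfft(b).conj() * numpy.fft.rfft(a))
# negative delays of c_ab (indices N-1 ... 1) equal positive delays of c_ba
assert numpy.allclose(c_ab[:0:-1], c_ba[1:])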
import numba
import numpy
def ipcf_optimized(image_timeseries, circle_coordinates, bins, **kwargs):
"""Return pair correlation function analysis of image time series."""
ntimes, height, width = image_timeseries.shape
npoints = len(circle_coordinates)
nbins = len(bins)
radius = circle_coordinates[0, 0]
result = numpy.zeros(
(height - 2 * radius, width - 2 * radius, npoints, nbins),
numpy.float32,
)
# make time axis last dimension
data = numpy.moveaxis(image_timeseries, 0, -1)
# pre-calculate forward DFT along time axis
rfft_buffer = numpy.fft.rfft(data, axis=-1)
for y in range(radius, height - radius):
for x in range(radius, width - radius):
rfft_a = rfft_buffer[y, x].conj()
for i in range(npoints):
# continue if output was already calculated
if result[y - radius, x - radius, i, 0] != 0.0:
continue
u, v = circle_coordinates[i]
rfft_b = rfft_buffer[y + v, x + u]
# cross-correlate b and a
c = numpy.fft.irfft(rfft_a * rfft_b)
scale = ntimes / rfft_a[0].real / rfft_b[0].real
# positive delays
average_smooth_scale(
c, bins, scale, result[y - radius, x - radius, i]
)
# negative delays
if (
radius <= y + v < height - radius
and radius <= x + u < width - radius
):
c = numpy.fft.fftshift(c)
i = (i + npoints // 2) % npoints
average_smooth_scale(
c[ntimes // 2 : 0 : -1],
bins,
scale,
result[y + v - radius, x + u - radius, i],
)
return result
@numba.jit(nogil=True)
def average_smooth_scale(c, bins, scale, out):
"""Average, smooth, and scale correlation function."""
# average
out[0] = numpy.mean(c[: bins[0]])
for i in range(len(bins) - 1):
out[i + 1] = numpy.mean(c[bins[i] : bins[i + 1]])
# smooth
out[0] = out[1]
for i in range(1, len(bins)):
out[i] = out[i] * 0.3 + out[i - 1] * 0.7
for i in range(len(bins) - 2, -1, -1):
out[i] = out[i] * 0.3 + out[i + 1] * 0.7
# scale
out *= scale
out -= 1.0
%time ipcf_result = run_ipcf(ipcf_optimized, SIMULATION_DATA)
test_ipcf(ipcf_result)
CPU times: total: 3.19 s Wall time: 5.89 s
That's about nine times faster than the reference implementation. This might be good enough to analyze larger datasets in parallel; however, this implementation does not multi-thread very well.
A fast ipcf function using Cython, OpenMP, and fft2d¶
Now that we have optimized the algorithm of the pCF analysis of images, let's put it all together and implement the function in Cython using OpenMP and the fft2d C library, which was compiled to a static library.
%%cython -f --compile-args=-O2 -I. -l./fft2d $OPENMP_ARGS
#
#cython: boundscheck=False
#cython: wraparound=False
#cython: cdivision=True
import math
import numpy
from cython.parallel import parallel, prange
cimport numpy
from libc.math cimport sqrt
from libc.stdlib cimport free, malloc
cdef extern from 'fft2d.h':
void rdft(int n, int isgn, double *a, int *ip, double *w) nogil
def ipcf_cython(
numpy.uint16_t [:, :, :] image_timeseries not None,
ssize_t [:, ::1] circle_coordinates not None,
ssize_t [::1] bins not None,
int num_threads=0
):
"""Return pair correlation function analysis of image time series."""
cdef:
ssize_t ntimes = image_timeseries.shape[0]
ssize_t height = image_timeseries.shape[1]
ssize_t width = image_timeseries.shape[2]
ssize_t nbins = bins.shape[0]
ssize_t npoints = circle_coordinates.shape[0]
ssize_t radius = circle_coordinates[0, 0]
ssize_t x, y, u, v, i, t, x1, y1, t1
double scale
double *rfft_a
double *rfft_b
double *a_
double *w_
int *ip_
double [:, :, ::1] rdft_
float[:, :, :, ::1] out
# limit length of time axis to power of two
ntimes = 2**int(math.log(ntimes, 2))
if radius < 2:
raise ValueError('invalid radius')
if width <= 2*radius or height <= 2*radius:
raise ValueError('invalid image size')
if ntimes < 32 or ntimes > 2147483647:
raise ValueError('invalid size of time axis')
# output array
result = numpy.zeros(
(height-2*radius, width-2*radius, npoints, nbins),
dtype=numpy.float32
)
out = result
# buffer for forward DFT
rdft_ = numpy.empty((height, width, ntimes), dtype=numpy.float64)
with nogil:
# rdft cos/sin table
w_ = <double *>malloc(ntimes // 2 * sizeof(double))
if not w_:
with gil:
raise MemoryError('could not allocate w_')
# rdft work area for bit reversal
ip_ = <int*>malloc((2 + <int>(sqrt((ntimes//2) + 0.5))) * sizeof(int))
if not ip_:
with gil:
raise MemoryError('could not allocate ip_')
# initialize ip_ and w_
ip_[0] = 0
rdft(ntimes, 1, &rdft_[0, 0, 0], ip_, w_)
with nogil, parallel(num_threads=num_threads):
# thread-local input/output data
a_ = <double *>malloc(sizeof(double) * ntimes)
if not a_:
with gil:
raise MemoryError('could not allocate a_')
# forward DFT
for y1 in prange(height):
for x1 in range(width):
for t1 in range(ntimes):
rdft_[y1, x1, t1] = <double>image_timeseries[t1, y1, x1]
rdft(ntimes, 1, &rdft_[y1, x1, 0], ip_, w_)
# cross-correlation
for y in prange(radius, height-radius):
for x in range(radius, width-radius):
rfft_a = &rdft_[y, x, 0]
for i in range(npoints):
# continue if output was already calculated
if out[y-radius, x-radius, i, 0] != 0.0:
continue
u = x + circle_coordinates[i, 0]
v = y + circle_coordinates[i, 1]
rfft_b = &rdft_[v, u, 0]
# multiply b's DFT by complex conjugate of a's DFT
multiply_conj(rfft_b, rfft_a, a_, ntimes)
# inverse DFT
rdft(ntimes, -1, a_, ip_, w_)
scale = 2.0 / (rfft_a[0] * rfft_b[0])
# positive delays
average_smooth_scale(
a_,
ntimes,
bins,
nbins,
scale,
out[y-radius, x-radius, i]
)
# negative delays
if (
(v >= radius) and
(v < height-radius) and
(u >= radius) and
(u < width-radius)
):
i = (i + npoints // 2) % npoints
average_smooth_scale(
a_,
ntimes,
bins,
nbins,
scale,
out[v-radius, u-radius, i],
-1
)
free(a_)
free(w_)
free(ip_)
return result
cdef void multiply_conj(
double *a,
double *b,
double *c,
ssize_t size
) noexcept nogil:
"""Multiply `a` by complex conjugate of `b` and store in `c`."""
cdef:
ssize_t i
double ar, br, ai, bi
c[0] = a[0] * b[0]
c[1] = a[1] * b[1]
for i in range(2, size, 2):
ar = a[i]
ai = a[i+1]
br = b[i]
bi = b[i+1]
c[i] = ar * br + ai * bi
c[i+1] = ai * br - ar * bi
cdef void average_smooth_scale(
double *a,
ssize_t size,
ssize_t[::1] bins,
ssize_t nbins,
double scale,
float[::1] out,
int mode=1
) noexcept nogil:
"""Average, smooth, and scale correlation function.
The first nbins items of the input array are changed.
"""
cdef:
ssize_t i, j
double s
# average
if mode == 1:
# positive delays
s = 0.0
for i in range(bins[0]):
s += a[i]
a[0] = s / <double>bins[0]
for j in range(1, nbins):
s = 0.0
for i in range(bins[j-1], bins[j]):
s += a[i]
a[j] = s / <double>(bins[j] - bins[j-1])
else:
# negative delay
s = a[0]
for i in range(1, bins[0]):
s += a[size - i]
a[0] = <float>(s / <double>bins[0])
for j in range(1, nbins):
s = 0.0
for i in range(bins[j-1], bins[j]):
s += a[size - i]
a[j] = s / <double>(bins[j] - bins[j-1])
# smooth
a[0] = a[1]
for i in range(1, nbins):
a[i] = a[i] * 0.3 + a[i-1] * 0.7
for i in range(nbins-2, -1, -1):
a[i] = a[i] * 0.3 + a[i+1] * 0.7
# copy to output with scaling
for i in range(nbins):
out[i] = <float>(a[i] * scale - 1.0)
%time ipcf_result = run_ipcf(ipcf_cython, SIMULATION_DATA, num_threads=MAXCPUS)
test_ipcf(ipcf_result)
CPU times: total: 3.05 s Wall time: 500 ms
This is about 100 times faster than the initial implementation in Python using numpy.fft and fast enough for the analysis of big image time series.
3. Implement out-of-core pair correlation function analysis of big image time series¶
In this section, we use the fast image pair correlation function on small, overlapping chunks of a big image time series that is too large to fit into the computer's main memory at once.
First, we need to convert the experimental data files into a more efficient format, remove large areas of background, and correct the data for photobleaching.
Browse SPIM time series of images¶
The directory nih3t3-egfp_2 contains a 34.5 GB dataset of 35,000 TIFF files. Each file contains a single 1024x512 pixel image stored as 16-bit unsigned integers in big-endian byte order.
The files were acquired as a 2D image time series using µManager software and a custom-built Selective Plane Illumination Microscopy (SPIM) instrument at the Laboratory for Fluorescence Dynamics, University of California, Irvine.
The sample is NIH3T3 cells expressing EGFP, imaged with a pixel size of 76 nm at 83 frames per second.
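As a quick sanity check of the dataset size, the raw pixel data alone accounts for about 34 GiB; TIFF headers and tags make up the small remainder:
# back-of-the-envelope size of the raw pixel data
nbytes = 35000 * 1024 * 512 * 2  # frames x pixels per frame x bytes per pixel
print(f'{nbytes / 2**30:.1f} GiB')  # 34.2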
Let's interactively browse the images in the time series using the tifffile.py module:
import glob
import numpy
import tifffile
from ipywidgets import IntSlider, interact
from matplotlib import pyplot
def browse_images(filenames, vmin=0, vmax=None, imread=tifffile.imread):
"""Interactively plot series of image files."""
if not filenames:
raise ValueError('data files not found')
def _plot(fileindex=0):
filename = filenames[fileindex]
image = imread(filename)
pyplot.figure(figsize=(8, 6))
pyplot.title(os.path.split(filename)[-1])
pyplot.imshow(
image.T,
vmin=vmin,
vmax=vmax,
cmap='viridis',
interpolation='lanczos',
)
pyplot.colorbar(orientation='horizontal')
pyplot.show()
interact(
_plot,
fileindex=IntSlider(0, 0, len(filenames) - 1, continuous_update=False),
)
# sorted list of all TIFF files in SPIM dataset
SPIM_DATASET_NAME = 'nih3t3-egfp_2'
SPIM_FILENAMES = list(
sorted(
glob.glob(os.path.join(DATA_PATH, SPIM_DATASET_NAME, 'Pos0', '*.tif'))
)
)
browse_images(SPIM_FILENAMES)
Notice that:
- the images contain large areas of background, which are not of interest for the analysis and can be cropped or masked
- the shape of the cell changes, or the cell moves out of focus, in the second half of the time series; this part will need to be discarded
- the sample shows strong photobleaching, that is, the fluorophore molecules (EGFP) are altered during the acquisition such that they permanently lose the ability to fluoresce; this decay in intensity needs to be corrected before calculating the pair correlations
Select image regions of interest¶
To select regions of images that contain objects, we use the morphology and segmentation functions of the scikit-image library.
The image is smoothed using a Gaussian filter and then binarized by an intensity threshold. Small holes are closed, and border artifacts removed. Connected regions are labeled and sorted by their area. Small regions are discarded, and the remaining regions expanded to a multiple of 64 pixels:
import matplotlib
import numpy
import skimage
import skimage.filters
import skimage.measure
import skimage.morphology
import skimage.restoration
import skimage.segmentation
import tifffile
from ipywidgets import FloatSlider, IntSlider, interact
from matplotlib import pyplot
# global variable where found regions will be stored
REGIONS_FOUND = []
def find_regions(filenames, imread=tifffile.imread):
"""Interactively find regions of interest in image files."""
def _plot(
fileindex=0,
sigma=4.0,
threshold=0.0,
closegaps=10,
minarea=64 * 64,
pow2size=6,
):
# read image
image = imread(filenames[fileindex])
# normalize image
image = image.astype(numpy.float64)
image -= image.min()
image /= image.max()
# remove noise by smoothing with Gaussian filter
image = skimage.filters.gaussian(image, sigma)
# binarize image with intensity threshold
if threshold == 0.0:
threshold = image.mean()
# skimage.filters offers many threshold_* functions:
# otsu, li, yen, adaptive, and isodata
binary = image > threshold
# close small gaps
binary = skimage.morphology.closing(
binary, skimage.morphology.square(closegaps)
)
        # remove artifacts connected to the image border
        binary = skimage.segmentation.clear_border(binary)
# label image regions
labels = skimage.measure.label(binary)
# discard small regions
regions = (
r for r in skimage.measure.regionprops(labels) if r.area > minarea
)
# sort regions by area
regions = reversed(sorted(regions, key=lambda x: x.area))
def expand_bbox(bbox, shape):
# return bounding box expanded to multiple of modulo
minrow, mincol, maxrow, maxcol = bbox
modulo = 2**pow2size
div, mod = divmod(maxrow - minrow, modulo)
if mod:
d = (div + 1) * modulo
minrow = max(0, minrow - (d - maxrow + minrow) // 2)
maxrow = min(shape[0] - 1, minrow + d)
minrow = max(0, maxrow - d)
div, mod = divmod(maxcol - mincol, modulo)
if mod:
d = (div + 1) * modulo
mincol = max(0, mincol - (d - maxcol + mincol) // 2)
maxcol = min(shape[1] - 1, mincol + d)
mincol = max(0, maxcol - d)
return minrow, mincol, maxrow, maxcol
# keep only bounding box of regions
regions = [expand_bbox(r.bbox, image.shape) for r in regions]
# plot image and regions
pyplot.figure(figsize=(8, 5))
pyplot.imshow(
image.T,
vmin=0.0,
vmax=1.0,
cmap='viridis',
interpolation='lanczos',
)
ax = pyplot.gca()
for region in regions:
minrow, mincol, maxrow, maxcol = region
rect = matplotlib.patches.Rectangle(
(minrow + 1, mincol + 1),
maxrow - minrow - 2,
maxcol - mincol - 2,
fill=False,
edgecolor='red',
linewidth=2,
)
ax.add_patch(rect)
# pyplot.colorbar(orientation='horizontal')
pyplot.show()
# store regions in global variable
global REGIONS_FOUND
REGIONS_FOUND = regions
return regions
interact(
_plot,
fileindex=IntSlider(
0, min=0, max=len(filenames) - 1, continuous_update=False
),
sigma=FloatSlider(
4, min=0.1, max=16.0, step=0.1, continuous_update=False
),
threshold=FloatSlider(
0.0, min=0.0, max=1.0, step=0.01, continuous_update=False
),
closegaps=IntSlider(10, min=1, max=20, continuous_update=False),
minarea=IntSlider(
64 * 64, min=1, max=256 * 256, continuous_update=False
),
pow2size=IntSlider(6, min=0, max=8, continuous_update=False),
)
find_regions(SPIM_FILENAMES)
Save region from SPIM files as chunked HDF5 file¶
For further analysis, we extract the selected region from all TIFF files and save them using a file format that is more efficient for both image and time series access. Some options are:
- a binary file format that can be directly memory-mapped to a numpy array, such as a raw binary file or an ImageJ hyperstack compatible TIFF file
- a chunked, optionally compressed, N-dimensional array storage format that allows numpy-like array access, such as HDF5 or zarr
These memory-mapped and chunked formats have significant advantages over accessing thousands of individual files:
- less overhead than parsing individual TIFF files
- increased I/O performance because of compression or operating system optimizations
- small segments of the data can be accessed through the numpy array interface without reading the entire file into main memory
We will use the h5py library to save the largest selected region from 20,000 images of the time series as a chunked, uncompressed dataset in an HDF5 file. To speed up the process, we use multi-threading and process the TIFF files in chunks of 512 in memory before writing to the HDF5 file:
from concurrent.futures import ThreadPoolExecutor
import h5py
import numpy
import tifffile
def tiff2hdf5(
hdf5file,
tifffiles,
region,
dataset_name='spim_data',
chunks=(512, 16, 16),
max_workers=32,
):
"""Write image region from TIFF files to chunked dataset in HDF5 file."""
minrow, mincol, maxrow, maxcol = region
image = tifffile.imread(tifffiles[0])
nimages = len(tifffiles)
shape = nimages, maxrow - minrow, maxcol - mincol
dtype = image.dtype
with h5py.File(hdf5file, 'w') as hdf:
if dataset_name in hdf:
del hdf[dataset_name]
dataset = hdf.create_dataset(
dataset_name, shape=shape, dtype=dtype, chunks=chunks
)
dataset.attrs['region'] = region
dataset.attrs['file'] = tifffiles[0]
def convert_chunk(start, size=chunks[0]):
# copy size images from TIFF files to HDF5 dataset
# using a temporary buffer
temp = numpy.empty(
shape=(chunks[0], dataset.shape[1], dataset.shape[2]),
dtype=dataset.dtype,
)
for index, fname in enumerate(tifffiles[start : start + size]):
image = tifffile.imread(fname, key=0)
temp[index] = image[minrow:maxrow, mincol:maxcol]
dataset[start : start + size] = temp[: index + 1]
with ThreadPoolExecutor(max_workers) as executor:
executor.map(convert_chunk, range(0, nimages, chunks[0]))
def cleanup_hdf(remove=False):
    """Close handles and optionally remove existing HDF5 file."""
    # declare module-level names so that the del statements below release them
    global SPIM_DATASET, SPIM_IPCF_RESULT
    try:
        HDF5_FILE.flush()
    except Exception:
        pass
try:
del SPIM_DATASET
except Exception:
pass
try:
del SPIM_IPCF_RESULT
except Exception:
pass
try:
HDF5_FILE.close()
except Exception:
pass
if remove:
try:
os.remove(HDF5_FILENAME)
except Exception:
pass
HDF5_FILENAME = os.path.join(SCRATCH_PATH, SPIM_DATASET_NAME) + '.hdf5'
cleanup_hdf(remove=True)
%time tiff2hdf5(HDF5_FILENAME, SPIM_FILENAMES[1000:21000], REGIONS_FOUND[0])
print(
'{} ({:.1f} GB)'.format(
HDF5_FILENAME, os.path.getsize(HDF5_FILENAME) / 1024**3
)
)
CPU times: total: 5.73 s Wall time: 9.59 s ./nih3t3-egfp_2.hdf5 (6.1 GB)
The chunked 3D dataset stored in the HDF5 file can be accessed transparently with a numpy ndarray style API. No data is read from the file until the array is indexed or sliced:
import h5py
# the HDF5 file will stay open until the end of the document
HDF5_FILE = h5py.File(HDF5_FILENAME, 'r+')
SPIM_DATASET = HDF5_FILE['spim_data']
# print information about dataset
print('shape: ', SPIM_DATASET.shape)
print('dtype: ', SPIM_DATASET.dtype)
print('file: ', SPIM_DATASET.attrs['file'])
print('region:', SPIM_DATASET.attrs['region'])
print()
# slicing datasets returns a numpy array in memory
print('reading a single image:')
%time image = SPIM_DATASET[10000]
print(image.shape)
print()
print('reading a single time series:')
%time timeseries = SPIM_DATASET[:, 400, 100]
print(timeseries.shape)
shape:  (20000, 832, 192)
dtype:  uint16
file:   ./nih3t3-egfp_2\Pos0\img_000001000_Default_000.tif
region: [104 156 936 348]

reading a single image:
CPU times: total: 0 ns Wall time: 25 ms
(832, 192)

reading a single time series:
CPU times: total: 0 ns Wall time: 2 ms
(20000,)
Let's interactively plot images in the spim_data dataset in the HDF5 file:
from ipywidgets import IntSlider, interact
from matplotlib import pyplot
def imshow_ts(image_timeseries, vmin=0, vmax=None):
"""Interactively plot images in time series."""
def _plot(index=0):
image = image_timeseries[index]
pyplot.figure(figsize=(8, 5))
pyplot.imshow(
image.T,
vmin=vmin,
vmax=vmax,
cmap='viridis',
interpolation='lanczos',
)
pyplot.colorbar(orientation='horizontal')
pyplot.show()
interact(
_plot,
index=IntSlider(
0, 0, image_timeseries.shape[0] - 1, 100, continuous_update=False
),
)
imshow_ts(SPIM_DATASET)
Correct for photobleaching¶
As noted before, the sample shows strong photobleaching, which needs to be corrected before correlation analysis.
The exponential decay of the fluorescence intensity due to photobleaching is visualized by plotting the time series of selected pixels:
import numpy
from ipywidgets import IntSlider, interact
from matplotlib import pyplot
def plot_its(image_timeseries, ymax=None):
"""Interactively plot time series at selected pixel."""
ntimes, height, width = image_timeseries.shape
pow2 = int(math.log(ntimes, 2))
t = numpy.arange(ntimes)
def _plot(y, x, start=0, pow2=pow2):
pyplot.figure(figsize=(6, 4))
pyplot.title('time series')
pyplot.xlabel('time index')
pyplot.ylabel('intensity')
stop = min(start + 2**pow2, ntimes)
pyplot.plot(t[start:stop], image_timeseries[start:stop, y, x])
pyplot.gca().set_xlim([start, stop - 1])
pyplot.gca().set_ylim([0, ymax])
pyplot.show()
interact(
_plot,
y=IntSlider(height // 2, 0, height - 1, continuous_update=False),
x=IntSlider(width // 2, 0, width - 1, continuous_update=False),
start=IntSlider(0, 0, ntimes - 2**4, 100, continuous_update=False),
pow2=IntSlider(pow2, 4, pow2, 1, continuous_update=False),
)
plot_its(SPIM_DATASET, ymax=2500)
To correct for photobleaching, we subtract the smoothed time series from the original time series, add back the signal mean, and correct for the reduced amplitude of the intensity fluctuations.
The double exponential smoothing algorithm is the same as used for smoothing the log-binned cross-correlation functions:
%%cython -f --compile-args=-O2 $OPENMP_ARGS
#
#cython: boundscheck=False
#cython: wraparound=False
#cython: cdivision=True
import numpy
from cython.parallel import parallel, prange
cimport numpy
from libc.math cimport fabs, round, sqrt
from libc.stdlib cimport free, malloc
ctypedef numpy.uint16_t uint16_t
cdef void highpass_filter(
uint16_t[:] data,
uint16_t[::1] out,
double *a,
ssize_t size,
ssize_t filtersize
) noexcept nogil:
"""Subtract smoothed, add mean, and correct deficit.
Using double exponential smoothing with factor 1/filtersize.
"""
cdef:
ssize_t i
ssize_t sumd
double t
uint16_t d
double f0 = 1.0 / <double>filtersize
double f1 = 1.0 - f0
double mean
double deficit
sumd = data[0]
a[0] = <double>data[0]
for i in range(1, size):
d = data[i]
sumd += d
a[i] = <double>d * f0 + a[i-1] * f1
for i in range(size-2, -1, -1):
a[i] = a[i] * f0 + a[i+1] * f1
mean = sumd / size
for i in range(size):
t = <double>data[i]
deficit = sqrt(fabs(mean / a[i]))
t = round(deficit * (t - a[i]) + mean)
if t < 0.5:
t = 0.0
elif t > 65534.5:
t = 65535.0
else:
t = t + 0.5
out[i] = <uint16_t>t
def correct_bleaching(
numpy.ndarray[uint16_t, ndim=3] image_timeseries,
ssize_t filtersize=1024,
int num_threads=0
):
"""Return time series for exponential photobleaching.
The first and last 'filtersize' samples of 'image_timeseries'
are removed.
"""
cdef:
uint16_t[:, :, ::1] out
uint16_t[:, :, ::] data = image_timeseries
ssize_t ntimes = data.shape[0]
ssize_t height = data.shape[1]
ssize_t width = data.shape[2]
ssize_t t, y, x
double *a_
if filtersize <= 0:
return image_timeseries
# allocate output array with contiguous time axis
result = numpy.empty((height, width, ntimes), image_timeseries.dtype)
out = result
with nogil, parallel(num_threads=num_threads):
# thread-local input/output data
a_ = <double *>malloc(sizeof(double) * ntimes)
if a_ == NULL:
with gil:
raise MemoryError('could not allocate a_')
for y in prange(height):
for x in range(width):
highpass_filter(
data[:, y, x], out[y, x, :], a_, ntimes, filtersize
)
free(a_)
result = numpy.moveaxis(result, -1, 0)
result = result[filtersize:-filtersize]
return result
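For reference, the following is a minimal, unoptimized NumPy sketch of the same correction applied to a single pixel trace; it omits the rounding and clipping to the uint16 range that the Cython version performs (trace is a hypothetical 1D intensity array):
import numpy

def correct_bleaching_1d(trace, filtersize=1024):
    """Sketch of the photobleaching correction for a single pixel trace."""
    trace = numpy.asarray(trace, numpy.float64)
    f0 = 1.0 / filtersize
    f1 = 1.0 - f0
    # double exponential smoothing: forward pass, then backward pass
    smoothed = trace.copy()
    for i in range(1, trace.size):
        smoothed[i] = trace[i] * f0 + smoothed[i - 1] * f1
    for i in range(trace.size - 2, -1, -1):
        smoothed[i] = smoothed[i] * f0 + smoothed[i + 1] * f1
    # subtract the slow trend, restore the mean, and boost the fluctuation
    # amplitude where the smoothed intensity decayed below the mean
    mean = trace.mean()
    deficit = numpy.sqrt(numpy.abs(mean / smoothed))
    corrected = deficit * (trace - smoothed) + mean
    # discard the transients at both ends of the smoothing filter
    return corrected[filtersize:-filtersize]
Applied to a single trace such as SPIM_DATASET[:, 400, 100], this produces the same corrected signal as the Cython version, up to the rounding to integers.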
Since the input and output data will likely not fit in RAM, we apply the photobleaching correction on a small part of the image for testing and visualization:
%time corrected = correct_bleaching(SPIM_DATASET[:, 400:432, 80:112])
plot_its(corrected, ymax=1000)
CPU times: total: 844 ms Wall time: 85.1 ms
Out-of-core image analysis using Dask¶
The multi-gigabyte SPIM image time series is too large to be analyzed by our implementation of the image pCF analysis and photobleaching correction functions because it likely exceeds the available main memory on our personal computer.
The Dask library can be used to chop the big image array into smaller blocks/chunks and to schedule the analysis of the individual blocks based on available memory, CPU cores, or cluster nodes.
Dask also supports operations on overlapping blocks, where neighboring blocks share their border regions, as required for the ipCF analysis.
Operations on dask arrays are lazy and are queued. No computations are performed until values are requested to be computed.
Dask can work with arrays on disk, such as arrays stored in HDF5 files or memory-mapped binary files. Only when computations are performed are data loaded into memory.
Dask provides serial, threaded, multiprocessing, and distributed schedulers that scale from laptop computers to clusters of computers. The default scheduler is threaded, which works well for our task.
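As a toy illustration of lazy, blocked computation with overlapping borders (the array and the smoothing function are arbitrary examples):
import dask.array
import numpy

darr = dask.array.from_array(numpy.arange(20.0), chunks=5)
# build a lazy task graph; each block sees one extra element from its
# neighbors (zero-padded at the array edges) before the function is applied
smoothed = darr.map_overlap(
    lambda block: numpy.convolve(block, numpy.ones(3) / 3, mode='same'),
    depth=1,
    boundary=0,
)
# nothing has been computed so far; compute() executes the task graph
print(smoothed.compute())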
The run_ipcf_blocked function returns the computed results as a numpy array in memory by default. For larger datasets, the block results can be written incrementally to a memory-mapped array on disk or an HDF5 dataset (see below).
We test the blocked ipCF analysis on the simulated dataset:
import dask
import dask.array
def run_ipcf_blocked(
image_timeseries,
output=None,
chunks=(32, 32),
radius=6,
npoints=32,
nbins=32,
filtersize=0,
ipcf_function=ipcf_cython,
correct_bleaching=correct_bleaching,
num_workers=MAXCPUS,
num_threads=2,
):
"""Run ipcf_function on small overlapping blocks of image timeseries."""
ntimes, height, width = image_timeseries.shape
# truncate time axis to power of two
ntimes = 2 ** int(math.log(ntimes - 2 * filtersize, 2))
# calculate circle coordinates
circle_coordinates = circle(radius, npoints)
# calculate log-bins
bins = logbins(ntimes // 2, nbins)
nbins = bins.size
# create a dask chunked array
blocks = dask.array.from_array(
image_timeseries,
chunks=(image_timeseries.shape[0], chunks[0], chunks[1]),
)
# correct bleaching on overlapping blocks
corrected = blocks.map_overlap(
correct_bleaching,
depth=(0, radius, radius),
boundary=(0, 1, 1),
trim=False,
dtype=blocks.dtype,
filtersize=filtersize,
num_threads=num_threads,
)
# ipfc on corrected, overlapping blocks
ipcf_result = corrected.map_blocks(
ipcf_function,
dtype=numpy.float32,
chunks=(chunks[0], chunks[1], npoints, nbins),
drop_axis=0,
new_axis=[2, 3],
circle_coordinates=circle_coordinates,
bins=bins,
num_threads=num_threads,
)
# execute the compute graph or determine shape of result
if output == 'shape':
output = ipcf_result.shape
elif output is None:
output = ipcf_result.compute(num_workers=num_workers)
else:
ipcf_result.store(output, num_workers=num_workers)
return output
radius = 6
%time ipcf_result = run_ipcf_blocked(SIMULATION_DATA, radius=radius)
test_ipcf(ipcf_result[radius:-radius, radius:-radius])
CPU times: total: 4.66 s Wall time: 813 ms
Run pair correlation function analysis on SPIM image time series¶
Finally, we run the ipCF analysis on the region of interest extracted from the SPIM images and corrected for photobleaching.
We'll use chunks of 64x64 pixels, half of the CPU cores for dask worker threads, and 2 threads for OpenMP. The analysis of each chunk uses ~400 MB RAM, and the result array uses 540 MB. The chunk size and the numbers of dask worker and OpenMP threads should be adjusted based on the image size, available main memory, and CPU cores.
The blocks of the ipCF analysis results are directly saved into a dataset in the same HDF5 file:
args = dict(
radius=6,
npoints=32,
nbins=32,
filtersize=1024,
chunks=(64, 64),
ipcf_function=ipcf_cython,
num_workers=MAXCPUS,
num_threads=2,
)
# remove previous results
try:
del SPIM_IPCF_RESULT
except Exception:
pass
if 'ipcf_result' in HDF5_FILE:
del HDF5_FILE['ipcf_result']
# determine size of output array
shape = run_ipcf_blocked(SPIM_DATASET, 'shape', **args)
# allocate a new dataset in the HDF5 file
output = HDF5_FILE.create_dataset(
'ipcf_result', shape=shape, dtype=numpy.float32
)
# run the analysis with output to HDF5 dataset
%time SPIM_IPCF_RESULT = run_ipcf_blocked(SPIM_DATASET, output, **args)
CPU times: total: 5min 14s Wall time: 27.3 s
The 4D result array can be visualized using the previously defined interactive plot_ipcf_sprites and plot_ipcf_images functions:
plot_ipcf_sprites(SPIM_IPCF_RESULT)
plot_ipcf_images(SPIM_IPCF_RESULT, figsize=(10, 6), interpolation='lanczos')
Cleanup¶
Close the HDF5 file when the datasets are no longer needed in the notebook:
cleanup_hdf(remove=False)
Outlook¶
In this tutorial, we developed code to perform pair correlation function analysis of a big image time series (3D) and ran it on a personal computer.
We may now move on to:
- analyze and visualize the ipCF results with Globals for Images · SimFCS software
- skip computing pair correlation in uninteresting regions using a pixel mask
- improve error handling, testing, and documentation
- create a Python package, which can be imported into other modules
- create a dynamic C library, which can be used from Python and other programming languages
- implement 3D pair- and cross-channel correlation function analysis of 5D data
- explore 2D or 3D correlation techniques (lSTICS, iMSD)
- analyze bigger image time series on a cluster using dask/distributed
- implement the ipCF algorithm as a CUDA kernel
References¶
Image pair correlation function analysis¶
Carmine Di Rienzo, Enrico Gratton, Fabio Beltram, and Francesco Cardarelli. Spatiotemporal fluctuation analysis: a powerful tool for the future nanoscopy of molecular processes. Biophys J. 2016; 111(4): 679-685. PMCID: PMC5002078
Carmine Di Rienzo, Francesco Cardarelli, Mariagrazia Di Luca, Fabio Beltram, and Enrico Gratton. Diffusion tensor analysis by two-dimensional pair correlation of fluorescence fluctuations in cells. Biophys J. 2016; 111(4): 841-851. PMCID: PMC5002073
Enrico Gratton. Globals software tutorial - 2D pair correlation function analysis (2016) [Online]
Enrico Gratton. LFD Workshop 2016 lecture 6: The pair correlation approach (2016) [Online]
Scientific Computing in Python¶
Python Software Foundation. The Python Programming Language (2016) [Online]
Charles R Harris et al. Array programming with NumPy. Nature, 585(7825), 357-362 (2020)
John D. Hunter. Matplotlib: A 2D Graphics Environment, Computing in Science & Engineering, 9, 90-95 (2007)
Jones E, Oliphant T, Peterson P, et al. SciPy: Open Source Scientific Tools for Python (2016) [Online]
Siu Kwan Lam, Antoine Pitrou, and Stanley Seibert. Numba: A LLVM-based Python JIT Compiler. Proceedings of the Second Workshop on the LLVM Compiler Infrastructure in HPC. Article No. 7 (2015)
Fernando Pérez and Brian E. Granger. IPython: A System for Interactive Scientific Computing. Computing in Science & Engineering, 9, 21-29 (2007)
Stefan Behnel, Robert Bradshaw, Craig Citro, Lisandro Dalcin, Dag Sverre Seljebotn and Kurt Smith. Cython: The Best of Both Worlds, Computing in Science and Engineering, 13, 31-39 (2011)
Stéfan van der Walt, Johannes L. Schönberger, Juan Nunez-Iglesias, François Boulogne, Joshua D. Warner, Neil Yager, Emmanuelle Gouillart, Tony Yu and the scikit-image contributors. scikit-image: Image processing in Python. PeerJ 2:e453 (2014)
Matthew Rocklin. Dask: Parallel Computation with Blocked algorithms and Task Scheduling. Proceedings of the 14th Python in Science Conference, Scipy2015 (2015)
Andrew Collette. Python and HDF5. O’Reilly book (2013)
System information¶
Print information about hardware and software used to generate this document:
%matplotlib inline
%reload_ext Cython
import datetime
import math
import multiprocessing
import os
import shutil
import sys
from distutils import ccompiler
import notebook
print(sys.executable)
print('Python', sys.version, end='\n\n')
for module in (
'IPython',
'notebook',
'ipywidgets',
'widgetsnbextension',
'numpy',
'scipy',
'matplotlib',
'skimage',
'numba',
'cupy',
'h5py',
'Cython',
'dask',
'tifffile',
):
try:
__import__(module)
except Exception:
continue
lib = sys.modules[module]
print(module.lower(), getattr(lib, '__version__', 'Unknown'))
print('\nCompiler type:', ccompiler.new_compiler().compiler_type, end='\n\n')
print(multiprocessing.cpu_count(), 'CPU cores')
try:
import psutil
print(
'{:.0f} GB main memory\n'.format(psutil.virtual_memory()[0] / 2**30)
)
except ImportError:
pass
try:
import numba.cuda
print(numba.cuda.gpus[0].name.decode('utf8'))
except Exception:
pass
if shutil.which('nvcc'):
print()
!nvcc --version
print()
try:
print('Duration:', datetime.datetime.now() - START_TIME)
print()
except NameError:
pass
print(datetime.datetime.now())
X:\Python311\python.exe
Python 3.11.7 (tags/v3.11.7:fa7a6f2, Dec  4 2023, 19:24:49) [MSC v.1937 64 bit (AMD64)]

ipython 8.18.1
notebook 7.0.6
ipywidgets 8.1.1
widgetsnbextension 4.0.9
numpy 1.26.2
scipy 1.11.3
matplotlib 3.8.2
skimage 0.22.0
numba 0.58.1
cupy 12.3.0
h5py 3.10.0
cython 3.0.6
dask 2023.12.0
tifffile 2023.9.26

Compiler type: msvc

28 CPU cores
96 GB main memory

Quadro RTX 5000

nvcc: NVIDIA (R) Cuda compiler driver
Copyright (c) 2005-2023 NVIDIA Corporation
Built on Tue_Aug_15_22:09:35_Pacific_Daylight_Time_2023
Cuda compilation tools, release 12.2, V12.2.140
Build cuda_12.2.r12.2/compiler.33191640_0

Duration: 0:04:34.902883
2023-12-08 09:25:03.567517