Source code for spinningdiskanalyzer.imaging.io

"""Input/output of image files"""

from .vips import pyvips

import re
from numpy    import array
from numpy    import ndarray
from datetime import datetime
from pathlib  import Path
from logging  import getLogger


# Module-level logger, named after this module and used by the functions below.
log = getLogger(__name__)


def read(source: Path) -> ndarray:
    """
    Loads the image file at `source` and returns its content as an array.

    The file is opened with pyvips and the resulting image is converted
    by calling its `numpy()` method.
    """
    log.debug(f'Reading image file: {source}')
    return pyvips.Image.new_from_file(source).numpy()
[docs] def write(target: Path, image: ndarray | pyvips.Image, overwrite=False): """ Saves the `image` as the given `target` file. An appropriate compression method is selected depending on the image format of the file, which is inferred from the file's suffix, e.g. `.tif` or `.jpg`. Lossless compression is used for formats that support it, such as `.tif` and `.png`. The level of compression we select strikes a balance between small file size and fast output speed. """ log.debug(f'Writing image file: {target}') if target.exists(): if overwrite: target.unlink() else: error = f'Target file already exists: {target}' log.error(error) raise FileExistsError(error) if isinstance(image, ndarray): image = pyvips.Image.new_from_array(image) elif isinstance(image, pyvips.Image): pass else: error = f'Image is of invalid data type: {type(image)}' log.error(error) raise TypeError(error) match target.suffix.lower(): case '.tif' | '.tiff': image.tiffsave(target, compression='lzw') case '.png': image.pngsave(target, compression=9) case '.jpg' | 'jpeg': image.jpegsave(target, Q=80, optimize_coding=True) case _: error = 'Invalid image format with suffix: {target.suffix}' log.error(error) raise ValueError(error)
def compress(source: Path, target: Path, grayscale=False):
    """
    Re-saves the `source` image file as a compressed `target` file.

    The image format of the target, and with it the compression method,
    follows from the target's file suffix, e.g. `.tif` for TIFF images.
    If `grayscale` is set, the color channels are averaged into a single
    band before saving.

    Raises `FileExistsError` if the `target` file already exists.
    """
    log.debug(f'Compressing image file: {source}')

    # Refuse to replace an existing file, before even opening the source.
    if target.exists():
        message = f'Target file already exists: {target}'
        log.error(message)
        raise FileExistsError(message)

    loaded = pyvips.Image.new_from_file(source)
    write(target, loaded.bandmean() if grayscale else loaded)
# NOTE: The return annotation is a string so that the `ndarray[Path]`
# subscript is not evaluated when the module is imported. Some NumPy
# releases reject a single type argument there.
def sort_into_grid(folder: Path, suffix='.png') -> 'ndarray[Path]':
    """
    Finds input images in `folder` and sorts them into the correct grid.

    The control software of the spinning-disk microscope outputs partial
    images acquired while scanning the full area. We need to find those
    images among other files possibly residing in the same folder and
    arrange them correctly. The position of an individual image in the
    scan grid is encoded in its file name. Note that, in the past, there
    have been bugs in the acquisition software that resulted in
    incorrectly named files.

    Only files with the given `suffix` (a leading period is added if
    missing) are considered, and those whose name does not follow the
    expected `<date>_<time>_X_<x>_Y_<y>_Z_<z>` pattern are skipped.

    Raises appropriate exceptions if, for whatever reason, the folder
    does not contain the expected set of input images:
    `FileNotFoundError` or `NotADirectoryError` if `folder` is not an
    existing directory, and `IOError` if no images are found or if
    their positions do not form a complete grid.

    Returns a 2d array of `files`. Its first axis runs over the distinct
    x positions, its second axis over the distinct y positions, both in
    ascending order.
    """
    log.debug(f'Looking for scan-grid images in: {folder}')

    # Path must point to an existing directory.
    if not folder.exists():
        error = f'Folder does not exist: {folder}'
        log.error(error)
        raise FileNotFoundError(error)
    if not folder.is_dir():
        error = f'Folder is not a directory: {folder}'
        log.error(error)
        raise NotADirectoryError(error)

    # Add leading period to suffix if missing.
    if not suffix.startswith('.'):
        suffix = '.' + suffix

    # Find image files.
    pattern = re.compile(
        r'(\d{6}_\d{6})_X_([-0-9]+)_Y_([-0-9]+)_Z_([-0-9]+)'
    )
    # In the file names, a dash between two digits stands in for the
    # decimal point of the coordinate.
    decimal_dash = re.compile(r'(\d)-(\d)')
    files = []
    for path in folder.glob(f'*{suffix}'):
        found = pattern.match(path.stem)
        if not found:
            continue
        (timestamp, x, y, z) = found.groups()
        files.append({
            'path': path,
            't': datetime.strptime(timestamp, '%y%m%d_%H%M%S'),
            'x': decimal_dash.sub(r'\1.\2', x),
            'y': decimal_dash.sub(r'\1.\2', y),
            'z': decimal_dash.sub(r'\1.\2', z),
        })
    if not files:
        error = f'Found no {suffix} images in given folder.'
        log.error(error)
        raise IOError(error)

    # Make sure the (x,y) positions form a regular and complete grid.
    x_values = {file['x'] for file in files}
    y_values = {file['y'] for file in files}
    (m, n) = (len(x_values), len(y_values))
    if len(files) != m*n:
        error = f'Expected to find {m*n} images, but found {len(files)}.'
        log.error(error)
        raise IOError(error)
    # The count alone is not conclusive: a duplicated position can make
    # up for a missing one. Every position must occur exactly once.
    positions = {(file['x'], file['y']) for file in files}
    if len(positions) != m*n:
        error = (
            f'Expected {m*n} distinct image positions, '
            f'but found {len(positions)}.'
        )
        log.error(error)
        raise IOError(error)

    # Sort by (x,y) coordinates from top left to bottom right.
    files = sorted(
        files,
        key=lambda file: (float(file['x']), float(file['y']))
    )

    # Reshape into a 2d array.
    paths = [file['path'] for file in files]
    grid = array(paths).reshape(m, n)
    return grid
# NOTE: The `files` annotation is a string so that the `ndarray[Path]`
# subscript is not evaluated when the module is imported. Some NumPy
# releases reject a single type argument there.
def stitch(files: 'ndarray[Path]', target: Path):
    """
    Stitches scan-grid images into the full image.

    The `files` must be passed in as a 2d array, the shape of which
    determines the grid: the images are joined row by row, with as many
    images across as the array has columns. That is typically the
    result returned by {func}`sort_into_grid`. The stitched image will
    be saved as `target`.

    Raises `FileExistsError` if the `target` file already exists and
    `ValueError` if `files` is not a 2d array.
    """
    log.debug('Stitching images into full grid.')

    if target.exists():
        error = f'Target file already exists: {target}'
        log.error(error)
        raise FileExistsError(error)

    # Check the shape before opening any image: the number of columns
    # is read from the second axis, which only has the intended meaning
    # for a 2d array.
    if files.ndim != 2:
        error = f'Expected a 2d array of files, but got shape: {files.shape}'
        log.error(error)
        raise ValueError(error)

    images = [pyvips.Image.new_from_file(file) for file in files.ravel()]
    stitched = pyvips.Image.arrayjoin(images, across=files.shape[1])
    write(target, stitched)
def tile(source: Path, target: Path, tile_size: int = 512, overlap: int = 0):
    """
    Creates a tiled zoom-level pyramid from a large image.

    The `source` file is typically a high-resolution image that is
    intended to be displayed interactively, at various zoom levels.

    The `target` must be a `.dzi` file, though the suffix may also be
    omitted. The input image will be converted to the DeepZoom format,
    essentially a collection of small tiles of the given `tile_size`
    (which may or may not `overlap` somewhat). Internally, the
    individual tiles are just `.png` images that can be quickly loaded
    from disk for given zoom level and position in the larger image.

    Raises `FileNotFoundError` if the `source` does not exist,
    `IsADirectoryError` if it is not a file, `ValueError` if the
    `target` has a suffix other than `.dzi`, and `FileExistsError` if
    the target `.dzi` file already exists.
    """
    log.info('Generating tiles of zoom-level image pyramid.')

    log.debug(f'source: {source}')
    if not source.exists():
        error = f'Source file does not exist: {source}'
        log.error(error)
        raise FileNotFoundError(error)
    if not source.is_file():
        error = f'Source is not a file: {source}'
        log.error(error)
        raise IsADirectoryError(error)

    log.debug(f'target: {target}')
    # The save operation is given the target without the suffix, but
    # we remember the actual `.dzi` file for the existence check.
    # Re-deriving it with `with_suffix('.dzi')` from the stripped name
    # would point at the wrong file if the stem itself contains a
    # period, as in `scan.v2.dzi`.
    if target.suffix == '.dzi':
        descriptor = target
        target = target.with_suffix('')
    elif target.suffix == '':
        descriptor = target.with_suffix('.dzi')
    else:
        error = f'Target has unexpected suffix: {target.suffix}'
        log.error(error)
        raise ValueError(error)
    if descriptor.exists():
        error = f'Target already exists: {descriptor}'
        log.error(error)
        raise FileExistsError(error)

    image = pyvips.Image.new_from_file(source)
    image.dzsave(
        target,
        tile_size=tile_size,
        overlap=overlap,
        suffix='.png[compression=1]',
        depth=pyvips.enums.ForeignDzDepth.ONETILE,
    )
    log.info('Finished generating tiles.')