Source code for spinningdiskanalyzer.frontend.tasks

"""Background tasks triggered by the front-end."""

from .. import imaging

from PySide6.QtCore import QObject
from PySide6.QtCore import Signal
from PySide6.QtCore import QRunnable
from PySide6.QtCore import QThread
from PySide6.QtCore import QThreadPool

from matplotlib.figure import Figure
from time              import sleep
from pathlib           import Path
from shutil            import rmtree
from collections.abc   import Iterable


[docs] class Task(QThread): """Thread base class supporting status updates""" status_update = Signal(str) """Emitted with a message when the status bar should be updated.""" status_clear = Signal() """Emitted when the status bar should be cleared."""
[docs] def status(self, message: str): """Updates the status bar with the given message.""" self.status_update.emit(message)
[docs] class Worker(QRunnable): """Worker thread to be used in thread pools""" def __init__(self): self.signals = WorkerSignals() super().__init__()
[docs] class WorkerSignals(QObject): """Qt signals for worker threads""" done = Signal() """Emitted when the worker thread is done."""
[docs] class CompressImages(Task): """Compresses the input images."""
[docs] def start(self, sources: Path, targets: Path): self.sources = sources self.targets = targets super().start()
[docs] def run(self): self.targets.mkdir(exist_ok=True, parents=True) sources = imaging.sort_into_grid(self.sources).ravel() targets = [ (self.targets/source.name).with_suffix('.tif') for source in sources ] if all(target.exists() for target in targets): return for target in targets: if target.exists(): target.unlink() for (source, target) in zip(sources, targets): self.status(f'Compressing input image: {target.name}') imaging.compress(source, target, grayscale=True)
[docs] class StitchImages(Task): """Stitches the full image."""
[docs] def start(self, sources: Path, target: Path, suffix: str = '.tif'): self.sources = sources self.target = target self.suffix = suffix super().start()
[docs] def run(self): if self.target.exists(): return self.status('Stitching full image…') files = imaging.sort_into_grid(self.sources, self.suffix) imaging.stitch(files, self.target)
[docs] class TileImage(Task): """Tiles a large image into a zoom-level pyramid."""
[docs] def start(self, source: Path, target: Path): self.source = source self.target = target super().start()
[docs] def run(self): if self.target.exists(): return self.status('Tiling display image…') self.target.parent.mkdir(exist_ok=True, parents=True) imaging.tile(self.source, self.target)
[docs] class RollBall(Task): """Corrects background using the rolling-ball method."""
[docs] def start(self, sources: Path, targets: Path, radius: int): self.sources = sources self.targets = targets self.radius = radius self.n = self.N = 0 self.threadpool = QThreadPool.globalInstance() super().start()
[docs] def run(self): sources = self.sources targets = self.targets radius = self.radius targets.mkdir(exist_ok=True, parents=True) files = imaging.sort_into_grid(sources, '.tif').ravel() self.N = len(files) for source in files: target = targets/source.name if not target.exists(): worker = RollBallWorker(source, target, radius) worker.signals.done.connect(self.thread_done) self.threadpool.start(worker) else: self.n += 1 self.display_progress() while self.n < self.N: sleep(0.1)
def display_progress(self): progress = f'{self.n}/{self.N}' self.status(f'Correcting background… {progress}') def thread_done(self): self.n += 1 self.display_progress()
[docs] def terminate(self): self.threadpool.clear() super().terminate()
[docs] class RollBallWorker(Worker): done = Signal() def __init__(self, source: Path, target: Path, radius: int): self.source = source self.target = target self.radius = radius self.signals = WorkerSignals() super().__init__()
[docs] def run(self): image = imaging.read(self.source) image = imaging.rolling_ball(image, radius=self.radius) imaging.write(self.target, image) self.signals.done.emit()
[docs] class BernsenThreshold(Task): """Performs local thresholding using the Bernsen method."""
[docs] def start(self, source: Path, target: Path, radius: int, contrast: int): self.source = source self.target = target self.radius = radius self.contrast = contrast super().start()
[docs] def run(self): if self.target.exists(): return self.status('Performing local thresholding…') image = imaging.read(self.source) image = imaging.thresholding_bernsen(image, self.radius, self.contrast) imaging.write(self.target, image)
[docs] class OutlineDisk(Task): """Draws the disk boundary onto the image."""
[docs] def start(self, source: Path, target: Path, x: int, y: int, r: int): self.source = source self.target = target (self.x, self.y, self.r) = (x, y, r) super().start()
[docs] def run(self): if self.target.exists(): return self.status('Outlining disk…') image = imaging.read(self.source) image = imaging.circle.draw(image, self.x, self.y, self.r) imaging.write(self.target, image)
[docs] class SegmentCells(Task): """Performs the cell segmentation."""
[docs] def start(self, source: Path, target: Path, x: int, y: int, r: int): self.source = source self.target = target (self.x, self.y, self.r) = (x, y, r) super().start()
[docs] def run(self): self.status('Segmenting cells…') if self.target.exists(): return image = imaging.read(self.source) cells = imaging.cells.find(image, self.x, self.y, self.r) imaging.cells.write(self.target, cells)
[docs] class MarkCells(Task): """Marks the cells in the image."""
[docs] def start(self, source: Path, target: Path, cells: Path): self.source = source self.target = target self.cells = cells super().start()
[docs] def run(self): if self.target.exists(): return self.status('Marking cells…') image = imaging.read(self.source) cells = imaging.cells.read(self.cells) image = imaging.cells.draw(image, cells) imaging.write(self.target, image)
[docs] class FitCurve(Task): """Fits curve to cell density data."""
[docs] def start(self, figure: Figure, cells: Path, target: Path, n_bins: int): self.figure = figure self.cells = cells self.target = target self.n_bins = n_bins super().start()
[docs] def run(self): if self.target.exists(): return self.status('Fitting curve…') cells = imaging.cells.read(self.cells) distances = cells['distance_rel'].values (x_data, y_data, x_fit, y_fit, params) = imaging.analysis.fit( distances, self.n_bins) imaging.analysis.plot( self.figure, x_data, y_data, x_fit, y_fit, params ) self.figure.savefig(self.target) self.figure.savefig(self.target.with_suffix('.png')) self.figure.savefig(self.target.with_suffix('.svg'))
[docs] def delete(items: Iterable[Path]): """Deletes given files or folders.""" for item in items: if item.is_file(): item.unlink() elif item.is_dir(): rmtree(item)