In this notebook, we will learn how to use what we learned about lazy evaluation with ImgLib2 on a Spark cluster.
Spark depends on all methods and data to be serializable. You can use Java’s default serialization, or you can implement your own custom serializers. For this workshop, we will depend on default serialization, and we will work around things that are not serializable by default (e.g. all of [ImgLib2] and [N5]).
For the Spark tutorial, we will work with an actual Java application from our favorite IDE. Please import the small project in /i2k2024-lazy-utils
.
The main application is org.janelia.saalfeldlab.i2k2024.SparkWorkshop
. For local experiments, we can start this with the runtime parameters:
-Dspark.master=local[12]
-Xmx16g
and application parameters e.g.:
--n5dataset='/em/fibsem-uint16/s4'
--n5outdataset='/em/fibsem-uint16/s4-contrast-enhanced'
--n5url='/home/saalfeld/tmp/jrc_hela-2.n5'
--n5outurl='/home/saalfeld/tmp/jrc_hela-2.n5'
--scaleindex=4
You can observe the running application from the standard URL of Spark’s WebUI http://localhost:4040.
Tricks to make it more efficient
A Spark cluster consists of Worker Nodes (computers) that run Executors (JVMs), each running Tasks (in Threads). Since we do not want to serialize complicated large data across the Spark cluster, each task has to open readers, writers, and the entire image:
.foreach(gridBlock -> {
rddGrid
final var n5 = new N5Factory().openReader(n5Url);
final RandomAccessibleInterval<T> img = N5Utils.open(n5, n5Dataset);
/* Use the new ImageJ plugin contrast limited local contrast normalization */
final var cllcn =
new ImageJStackOp<T>(
.extendZero(img),
Views(fp) -> new CLLCN(fp).run(
,
blockRadius,
blockRadius3f,
10,
0.5f,
true,
true,
true),
,
blockRadius0,
65535);
/* create a cached image factory with reasonable default values */
final var cacheFactory = new ReadOnlyCachedCellImgFactory(
new ReadOnlyCachedCellImgOptions()
.volatileAccesses(true) // < use volatile accesses for display
.cellDimensions(256, 256, 32)); // < standard block size for this example
final var cllcned = cacheFactory.create(
.dimensionsAsLongArray(), // < the size of the result
img.getType().createVariable(), // < the type that is used to generate the result pixels
img::accept); // < the consumer that creates each cell
cllcn
/* crop the block of interest */
final var block = Views.offsetInterval(cllcned, gridBlock[0], gridBlock[1]);
final N5Writer n5Writer = new N5Factory().openWriter(n5OutUrl);
.saveNonEmptyBlock(
N5Utils,
block,
n5Writer,
n5OutDataset[2],
gridBlock.getType().createVariable());
img});
This is not a problem in itself because opening those readers and images is lazy and amounts to the work required to serialize and deserialize from meta-data (except for storage services with slow authentication…). However, we have learned that ImgLib2 cache stores the lazily generated results, and with the naive approach above, we do not re-use cached results, because each threads produces their own independent cache. With a simple trick, we can make sure that all our objects are Singletons per JVM, such that all Tasks on an Executor share the same objects and cell cache:
import net.imglib2.algorithm.util.Singleton;
// [...]
.foreach(gridBlock -> {
rddGrid
final var n5 = Singleton.get(
"n5" + n5Url,
() -> new N5Factory().openReader(n5Url));
final RandomAccessibleInterval<T> img = Singleton.get(
"img" + n5Dataset,
() -> (RandomAccessibleInterval<T>)N5Utils.open(n5, n5Dataset));
final var cllcned = Singleton.get(
"cllcned" + n5OutDataset,
() -> {
/* Use the new ImageJ plugin contrast limited local contrast normalization */
final var cllcn =
new ImageJStackOp<T>(
.extendZero(img),
Views(fp) -> new CLLCN(fp).run(
,
blockRadius,
blockRadius3f,
10,
0.5f,
true,
true,
true),
,
blockRadius0,
65535);
/* create a cached image factory with reasonable default values */
final var cacheFactory = new ReadOnlyCachedCellImgFactory(
new ReadOnlyCachedCellImgOptions()
.volatileAccesses(true) // < use volatile accesses for display
.cellDimensions(256, 256, 32)); // < standard block size for this example
return (RandomAccessibleInterval<T>)cacheFactory.create(
.dimensionsAsLongArray(), // < the size of the result
img.getType().createVariable(), // < the type that is used to generate the result pixels
img::accept); // < the consumer that creates each cell
cllcn});
/* crop the block of interest */
final var block = Views.offsetInterval(cllcned, gridBlock[0], gridBlock[1]);
final N5Writer n5Writer = Singleton.get(
"n5Writer" + n5OutUrl,
() -> new N5Factory().openWriter(n5OutUrl));
.saveNonEmptyBlock(
N5Utils,
block,
n5Writer,
n5OutDataset[2],
gridBlock.getType().createVariable());
img});
More tricks in the future…