3.2 Working with larger than memory data#
Introduction#
In this notebook, we’ll read an ITS_LIVE data cube as we did in the previous notebook, this time focusing on strategies and approaches for working with a dataset that is too large to fit in memory.
Tip
If you’re not familiar with terms like Dask, chunking and parallelization, we highly suggest checking out the brief overview in Relevant Concepts and the resources linked therein.
A. Compare approaches for reading larger than memory data
1. Calling xr.open_dataset() with chunks='auto'
2. Calling xr.open_dataset() with chunks={}
3. An out-of-order time dimension
4. Read the dataset without Dask
B. Organize data once it’s in memory
1. Arrange dataset in chronological order
2. Convert to a Dask-backed Xarray.Dataset
Expand the next cell to see specific packages used in this notebook and relevant system and version information.
%xmode minimal
import inspect
import warnings
import geopandas as gpd
import xarray as xr
warnings.simplefilter(action="ignore", category=FutureWarning)
Exception reporting mode: Minimal
A. Compare approaches for reading larger than memory data#
This section uses functions we defined in the data access notebook, all of which are stored in the itslive_tools.py
file. If you cloned this tutorial from its GitHub repository, you’ll see that itslive_tools.py
is in the same directory as our current notebook, so we can import it with the following line:
import itslive_tools
Read in the catalog again, and use the find_granule_by_point() function to find the URL that points to the ITS_LIVE granule covering your area of interest.
itslive_catalog = gpd.read_file("https://its-live-data.s3.amazonaws.com/datacubes/catalog_v02.json")
# Find urls for granule covering point of interest
url = itslive_tools.find_granule_by_point([95.180191, 30.645973])
url
'http://its-live-data.s3.amazonaws.com/datacubes/v2-updated-october2024/N30E090/ITS_LIVE_vel_EPSG32646_G0120_X750000_Y3350000.zarr'
The read_in_s3() function that we defined in the previous notebook supports different options for reading large, chunked raster datasets. Before we use it again in this notebook, we will explore these options and the ways they can impact how we work with the data. You can learn more about reading Zarr data with Xarray, and about the different chunking options we demonstrate below, in the Xarray documentation.
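For orientation, here is a compact preview of the three ways of opening the cube that this section compares (a sketch; each option is run and examined in its own cell below):
# Preview (sketch) of the three chunking options compared in this section:
dc_auto = xr.open_dataset(url, engine="zarr", chunks="auto")  # Dask auto-chunking
dc_set = xr.open_dataset(url, engine="zarr", chunks={})       # engine-preferred chunks
dc = xr.open_dataset(url, engine="zarr")                      # no Dask: lazy, NumPy-backed arrays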
1) chunks = 'auto'#
This is the default option in read_in_s3(). The Xarray documentation states that chunks='auto' uses “dask auto chunking, taking into account the engine preferred chunks”.
dc_auto = xr.open_dataset(url, engine="zarr", chunks="auto")
dc_auto
<xarray.Dataset> Size: 1TB Dimensions: (mid_date: 47892, y: 833, x: 833) Coordinates: * mid_date (mid_date) datetime64[ns] 383kB 2022-06-07T04... * x (x) float64 7kB 7.001e+05 7.003e+05 ... 8e+05 * y (y) float64 7kB 3.4e+06 3.4e+06 ... 3.3e+06 Data variables: (12/60) M11 (mid_date, y, x) float32 133GB dask.array<chunksize=(47892, 20, 20), meta=np.ndarray> M11_dr_to_vr_factor (mid_date) float32 192kB dask.array<chunksize=(47892,), meta=np.ndarray> M12 (mid_date, y, x) float32 133GB dask.array<chunksize=(47892, 20, 20), meta=np.ndarray> M12_dr_to_vr_factor (mid_date) float32 192kB dask.array<chunksize=(47892,), meta=np.ndarray> acquisition_date_img1 (mid_date) datetime64[ns] 383kB dask.array<chunksize=(47892,), meta=np.ndarray> acquisition_date_img2 (mid_date) datetime64[ns] 383kB dask.array<chunksize=(47892,), meta=np.ndarray> ... ... vy_error_modeled (mid_date) float32 192kB dask.array<chunksize=(47892,), meta=np.ndarray> vy_error_slow (mid_date) float32 192kB dask.array<chunksize=(47892,), meta=np.ndarray> vy_error_stationary (mid_date) float32 192kB dask.array<chunksize=(47892,), meta=np.ndarray> vy_stable_shift (mid_date) float32 192kB dask.array<chunksize=(47892,), meta=np.ndarray> vy_stable_shift_slow (mid_date) float32 192kB dask.array<chunksize=(47892,), meta=np.ndarray> vy_stable_shift_stationary (mid_date) float32 192kB dask.array<chunksize=(47892,), meta=np.ndarray> Attributes: (12/19) Conventions: CF-1.8 GDAL_AREA_OR_POINT: Area author: ITS_LIVE, a NASA MEaSUREs project (its-live.j... autoRIFT_parameter_file: http://its-live-data.s3.amazonaws.com/autorif... datacube_software_version: 1.0 date_created: 25-Sep-2023 22:00:23 ... ... s3: s3://its-live-data/datacubes/v2/N30E090/ITS_L... skipped_granules: s3://its-live-data/datacubes/v2/N30E090/ITS_L... time_standard_img1: UTC time_standard_img2: UTC title: ITS_LIVE datacube of image pair velocities url: https://its-live-data.s3.amazonaws.com/datacu...
In this instance, the chunks of the object created with xr.open_dataset(..., chunks='auto')
are a multiple of the on-disk chunk sizes.
The data structure section of the previous notebook discussed scalar information that is stored as attributes attached to Xarray objects. Similarly, Xarray objects read from Zarr data cubes have associated encodings
that tell Xarray how to read and write the object to disk. We can use the encoding to learn about preferred chunking schemes.
dc_auto["v"].encoding
{'chunks': (20000, 10, 10),
'preferred_chunks': {'mid_date': 20000, 'y': 10, 'x': 10},
'compressor': Blosc(cname='zlib', clevel=2, shuffle=SHUFFLE, blocksize=0),
'filters': None,
'missing_value': -32767,
'dtype': dtype('int16')}
dc_auto["v"]
<xarray.DataArray 'v' (mid_date: 47892, y: 833, x: 833)> Size: 133GB dask.array<open_dataset-v, shape=(47892, 833, 833), dtype=float32, chunksize=(47892, 20, 20), chunktype=numpy.ndarray> Coordinates: * mid_date (mid_date) datetime64[ns] 383kB 2022-06-07T04:21:44.211208960 .... * x (x) float64 7kB 7.001e+05 7.003e+05 7.004e+05 ... 7.999e+05 8e+05 * y (y) float64 7kB 3.4e+06 3.4e+06 3.4e+06 ... 3.3e+06 3.3e+06 Attributes: description: velocity magnitude grid_mapping: mapping standard_name: land_ice_surface_velocity units: meter/year
For the encoding of the v variable, it looks like the expected chunking scheme is {'mid_date': 20000, 'y': 10, 'x': 10}. However, the chunks for this variable created with chunks='auto' are {'mid_date': 47892, 'y': 20, 'x': 20}.
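To see the mismatch side by side, we can compare the on-disk chunks recorded in the encoding with the Dask chunks that chunks='auto' produced (a quick sketch using the dc_auto object above):
# On-disk (encoding) chunks vs. the Dask chunks chosen by chunks='auto' for 'v':
print("encoding chunks:", dc_auto["v"].encoding["chunks"])  # (20000, 10, 10)
print("dask chunks:    ", dc_auto["v"].data.chunksize)      # (47892, 20, 20)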
Let’s take a look at the encoding for a 1-dimensional variable:
dc_auto["vx_error"]
<xarray.DataArray 'vx_error' (mid_date: 47892)> Size: 192kB dask.array<open_dataset-vx_error, shape=(47892,), dtype=float32, chunksize=(47892,), chunktype=numpy.ndarray> Coordinates: * mid_date (mid_date) datetime64[ns] 383kB 2022-06-07T04:21:44.211208960 .... Attributes: description: best estimate of x_velocity error: vx_error is populated ... standard_name: vx_error units: meter/year
dc_auto["vx_error"].encoding
{'chunks': (56147,),
'preferred_chunks': {'mid_date': 56147},
'compressor': Blosc(cname='zlib', clevel=2, shuffle=SHUFFLE, blocksize=0),
'filters': None,
'_FillValue': -32767.0,
'dtype': dtype('float32')}
Interesting. We see that:
- The chunk size specified in the encoding doesn’t match the total length of the mid_date dimension. It may be an artifact from an earlier step in the data processing chain, before some observations were eliminated (a quick check of this is sketched just after this list).
- The encoding specifies a single chunk along the mid_date dimension for this variable, which matches the object we read into memory; the size of that chunk is just different.
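A minimal check of the first point, assuming the dc_auto object from above:
# The encoding's chunk length along mid_date exceeds the dimension's actual length,
# so the whole dimension ends up in a single chunk.
print(dc_auto["vx_error"].encoding["chunks"][0])  # 56147
print(dc_auto.sizes["mid_date"])                  # 47892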
Another thing to note is that it looks like some of the variables within this xr.Dataset have different chunk sizes along the y dimension (shown by the error produced below). We will need to address this later before rechunking the dataset.
dc_auto.chunksizes
ValueError: Object has inconsistent chunks along dimension y. This can be fixed by calling unify_chunks().
dc_auto = dc_auto.unify_chunks()
dc_auto.chunksizes
Frozen({'mid_date': (32736, 15156), 'y': (20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 13), 'x': (20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 13)})
2) chunks = {}#
For this argument, the documentation says: “loads the data with dask using the engine’s preferred chunk size, generally identical to the format’s chunk size. If not available, a single chunk for all arrays.”
Note that with this dataset, 'auto' and {} don’t return the same chunking scheme.
dc_set = xr.open_dataset(url, engine="zarr", chunks={})
dc_set["v"].encoding
{'chunks': (20000, 10, 10),
'preferred_chunks': {'mid_date': 20000, 'y': 10, 'x': 10},
'compressor': Blosc(cname='zlib', clevel=2, shuffle=SHUFFLE, blocksize=0),
'filters': None,
'missing_value': -32767,
'dtype': dtype('int16')}
dc_set["v"]
<xarray.DataArray 'v' (mid_date: 47892, y: 833, x: 833)> Size: 133GB dask.array<open_dataset-v, shape=(47892, 833, 833), dtype=float32, chunksize=(20000, 10, 10), chunktype=numpy.ndarray> Coordinates: * mid_date (mid_date) datetime64[ns] 383kB 2022-06-07T04:21:44.211208960 .... * x (x) float64 7kB 7.001e+05 7.003e+05 7.004e+05 ... 7.999e+05 8e+05 * y (y) float64 7kB 3.4e+06 3.4e+06 3.4e+06 ... 3.3e+06 3.3e+06 Attributes: description: velocity magnitude grid_mapping: mapping standard_name: land_ice_surface_velocity units: meter/year
With this approach, we see that the chunking on the 3-dimensional variable we looked at above ('v') does match the chunking specified in the object’s encoding: {'mid_date': 20000, 'y': 10, 'x': 10}.
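A one-line check of this (a sketch, assuming the dc_set object above):
# For dc_set, the Dask chunks of 'v' line up exactly with the on-disk chunks.
assert dc_set["v"].data.chunksize == tuple(dc_set["v"].encoding["chunks"])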
Looking at a one-dimensional variable, we see the same occurrence as with dc_auto
: the number of chunks matches what is specified in the encoding, but the size of the chunk is different.
dc_set["vx_error"].encoding
{'chunks': (56147,),
'preferred_chunks': {'mid_date': 56147},
'compressor': Blosc(cname='zlib', clevel=2, shuffle=SHUFFLE, blocksize=0),
'filters': None,
'_FillValue': -32767.0,
'dtype': dtype('float32')}
dc_set["vx_error"]
<xarray.DataArray 'vx_error' (mid_date: 47892)> Size: 192kB dask.array<open_dataset-vx_error, shape=(47892,), dtype=float32, chunksize=(47892,), chunktype=numpy.ndarray> Coordinates: * mid_date (mid_date) datetime64[ns] 383kB 2022-06-07T04:21:44.211208960 .... Attributes: description: best estimate of x_velocity error: vx_error is populated ... standard_name: vx_error units: meter/year
The v and vx_error variables shown above have different chunk sizes along the mid_date dimension, so we can expect the same inconsistent-chunks error as above, but this time for mid_date:
dc_set.chunksizes
ValueError: Object has inconsistent chunks along dimension mid_date. This can be fixed by calling unify_chunks().
However, this time, if we try to resolve the above error as we did for dc_auto (by calling dc_set = dc_set.unify_chunks()), we would get a performance warning about the number of chunks increasing by a factor of 186, which isn’t great either.
PerformanceWarning: Increasing number of chunks by factor of 186
  _, chunked_data = chunkmanager.unify_chunks(*unify_chunks_args)
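If you did want to unify the chunks anyway, one option is to silence Dask’s PerformanceWarning explicitly. This is only a sketch (it assumes the PerformanceWarning class lives in dask.array.core, and it is not run here because the resulting task graph is much less efficient than the approach below):
# Sketch: suppress the chunk-count warning if you choose to unify anyway.
import warnings
from dask.array.core import PerformanceWarning

with warnings.catch_warnings():
    warnings.simplefilter("ignore", category=PerformanceWarning)
    dc_set_unified = dc_set.unify_chunks()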
In the next sections, we’ll see another option for reading the data into memory.
3) An out-of-order time dimension#
When we read this dataset from the S3 bucket, we get an object where the time dimension is not in chronological order. Because the dataset is so large, fixing this is not entirely straightforward.
Tip
It’s always a good idea to look at the data!
dc_set.mid_date
<xarray.DataArray 'mid_date' (mid_date: 47892)> Size: 383kB array(['2022-06-07T04:21:44.211208960', '2018-04-14T04:18:49.171219968', '2017-02-10T16:15:50.660901120', ..., '2024-01-23T04:18:19.231119104', '2023-06-01T04:10:44.893907968', '2023-09-02T16:18:20.230413056'], dtype='datetime64[ns]') Coordinates: * mid_date (mid_date) datetime64[ns] 383kB 2022-06-07T04:21:44.211208960 .... Attributes: description: midpoint of image 1 and image 2 acquisition date and time... standard_name: image_pair_center_date_with_time_separation
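Beyond eyeballing the repr, a quick programmatic check is to ask the underlying pandas index whether it is sorted (a sketch):
# Is the time coordinate in increasing order? For this cube it is not.
print(dc_set.indexes["mid_date"].is_monotonic_increasing)  # False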
The standard approach would be calling Xarray’s .sortby()
method:
dc_set = dc_set.sortby('mid_date')
Performing an operation like sorting or slicing requires the entire array to be loaded into memory; for a large dimension like mid_date (~48,000 elements), this would be very slow and/or would max out available computational resources.
There may be a chunking strategy that successfully allows one to sort this dataset along the mid_date dimension, but the few different re-chunking approaches I tried did not work. Instead, the successful approach I found was a bit counterintuitive: re-read the dataset without Dask. This lets us use Xarray’s lazy indexing functionality: we can sort the dataset without loading it into memory. The object will still be quite large, so we will chunk the data (bringing Dask back in) after we sort by the time dimension. The full recipe is sketched below.
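In short, the workflow carried out in the rest of this notebook looks like this (a sketch; each step is run in its own cell below):
# 1. Open lazily without Dask (NumPy-backed arrays, nothing loaded yet)
dc = xr.open_dataset(url, engine="zarr")
# 2. Sort along the time dimension lazily, using Xarray's lazy indexing
dc = dc.sortby("mid_date")
# 3. Chunk the sorted dataset, converting the underlying arrays to Dask
dc_rechunk = dc.chunk({"mid_date": 20000, "y": 10, "x": 10})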
4) Read the dataset without Dask#
We’ll again use the read_in_s3() function, but this time passing chunks=None. This is the same as running dc = xr.open_dataset(url, engine='zarr'). The read_in_s3() signature is shown below as a reminder:
signature = inspect.signature(itslive_tools.read_in_s3)
print(signature)
(http_url: str, chunks: Union[NoneType, str, dict] = 'auto') -> xarray.core.dataset.Dataset
dc = itslive_tools.read_in_s3(url, chunks=None)
dc
<xarray.Dataset> Size: 1TB Dimensions: (mid_date: 47892, y: 833, x: 833) Coordinates: mapping <U1 4B ... * mid_date (mid_date) datetime64[ns] 383kB 2022-06-07T04... * x (x) float64 7kB 7.001e+05 7.003e+05 ... 8e+05 * y (y) float64 7kB 3.4e+06 3.4e+06 ... 3.3e+06 Data variables: (12/59) M11 (mid_date, y, x) float32 133GB ... M11_dr_to_vr_factor (mid_date) float32 192kB ... M12 (mid_date, y, x) float32 133GB ... M12_dr_to_vr_factor (mid_date) float32 192kB ... acquisition_date_img1 (mid_date) datetime64[ns] 383kB ... acquisition_date_img2 (mid_date) datetime64[ns] 383kB ... ... ... vy_error_modeled (mid_date) float32 192kB ... vy_error_slow (mid_date) float32 192kB ... vy_error_stationary (mid_date) float32 192kB ... vy_stable_shift (mid_date) float32 192kB ... vy_stable_shift_slow (mid_date) float32 192kB ... vy_stable_shift_stationary (mid_date) float32 192kB ... Attributes: (12/19) Conventions: CF-1.8 GDAL_AREA_OR_POINT: Area author: ITS_LIVE, a NASA MEaSUREs project (its-live.j... autoRIFT_parameter_file: http://its-live-data.s3.amazonaws.com/autorif... datacube_software_version: 1.0 date_created: 25-Sep-2023 22:00:23 ... ... s3: s3://its-live-data/datacubes/v2/N30E090/ITS_L... skipped_granules: s3://its-live-data/datacubes/v2/N30E090/ITS_L... time_standard_img1: UTC time_standard_img2: UTC title: ITS_LIVE datacube of image pair velocities url: https://its-live-data.s3.amazonaws.com/datacu...
As we saw above, the mid_date
dimension is still out of order:
dc.mid_date
<xarray.DataArray 'mid_date' (mid_date: 47892)> Size: 383kB array(['2022-06-07T04:21:44.211208960', '2018-04-14T04:18:49.171219968', '2017-02-10T16:15:50.660901120', ..., '2024-01-23T04:18:19.231119104', '2023-06-01T04:10:44.893907968', '2023-09-02T16:18:20.230413056'], dtype='datetime64[ns]') Coordinates: mapping <U1 4B ... * mid_date (mid_date) datetime64[ns] 383kB 2022-06-07T04:21:44.211208960 .... Attributes: description: midpoint of image 1 and image 2 acquisition date and time... standard_name: image_pair_center_date_with_time_separation
B. Organize data once it’s in memory#
1) Arrange dataset in chronological order#
Now we can lazily apply the .sortby() method:
dc = dc.sortby("mid_date")
dc.mid_date
<xarray.DataArray 'mid_date' (mid_date: 47892)> Size: 383kB array(['1986-09-11T03:31:15.003252992', '1986-10-05T03:31:06.144750016', '1986-10-21T03:31:34.493249984', ..., '2024-10-29T04:18:09.241024000', '2024-10-29T04:18:09.241024000', '2024-10-29T04:18:09.241024000'], dtype='datetime64[ns]') Coordinates: mapping <U1 4B ... * mid_date (mid_date) datetime64[ns] 383kB 1986-09-11T03:31:15.003252992 .... Attributes: description: midpoint of image 1 and image 2 acquisition date and time... standard_name: image_pair_center_date_with_time_separation
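A quick confirmation that the sort worked (the same check as earlier, a sketch):
# The time coordinate is now monotonic increasing.
print(dc.indexes["mid_date"].is_monotonic_increasing)  # True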
Great! After some experimentation with different approaches, we have our dataset sorted in chronological order.
2) Convert to a Dask-backed Xarray.Dataset#
Not passing a 'chunks' argument to xr.open_dataset() means that the Xarray object is a collection of NumPy arrays rather than Dask arrays. However, the dataset is still very large: there are 60 variables that exist along 1, 2, or 3 dimensions (with the exception of the mapping variable, which we will discuss later), and a single 3-d variable is 133 GB. We will still need to use Dask even though we didn’t read the data in as a collection of Dask arrays. We’ll use the preferred chunking from .encoding['chunks'] to specify a chunking scheme for the object and convert the underlying arrays from NumPy to Dask.
Take a look at the 'chunks'/'preferred_chunks' encoding for a 3-d variable:
dc["v"].encoding
{'chunks': (20000, 10, 10),
'preferred_chunks': {'mid_date': 20000, 'y': 10, 'x': 10},
'compressor': Blosc(cname='zlib', clevel=2, shuffle=SHUFFLE, blocksize=0),
'filters': None,
'missing_value': -32767,
'dtype': dtype('int16'),
'grid_mapping': 'mapping'}
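Rather than typing the numbers by hand, you could also build the chunking dictionary directly from the variable’s encoding (a sketch; the cell below uses the explicit values):
# Pair the variable's dimension names with its on-disk chunk sizes.
chunking_dict = dict(zip(dc["v"].dims, dc["v"].encoding["chunks"]))
print(chunking_dict)  # {'mid_date': 20000, 'y': 10, 'x': 10}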
chunking_dict = {"x": 10, "y": 10, "mid_date": 20000}
dc_rechunk = dc.chunk(chunking_dict)
dc_rechunk.chunks
Frozen({'mid_date': (20000, 20000, 7892), 'y': (10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 3), 'x': (10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 3)})
Conclusion#
In this notebook, we identified a strategy for reading, chunking, and organizing this dataset that works within the memory constraints of my laptop and the size of the data. In the next notebook, we use vector data to narrow our focus to a spatial area of interest and start examining ice velocity data.