Scaling to large datasets
pandas provides data structures for in-memory analytics, which makes using pandas to analyze datasets that are larger than memory somewhat tricky. Even datasets that are a sizable fraction of memory become unwieldy, as some pandas operations need to make intermediate copies.
This document provides a few recommendations for scaling your analysis to larger datasets. It’s a complement to Enhancing performance, which focuses on speeding up analysis for datasets that fit in memory.
Load less data
Suppose our raw dataset on disk has many columns.
In [1]: import pandas as pd
In [2]: import numpy as np
In [3]: def make_timeseries(start="2000-01-01", end="2000-12-31", freq="1D", seed=None):
...: index = pd.date_range(start=start, end=end, freq=freq, name="timestamp")
...: n = len(index)
...: state = np.random.RandomState(seed)
...: columns = {
...: "name": state.choice(["Alice", "Bob", "Charlie"], size=n),
...: "id": state.poisson(1000, size=n),
...: "x": state.rand(n) * 2 - 1,
...: "y": state.rand(n) * 2 - 1,
...: }
...: df = pd.DataFrame(columns, index=index, columns=sorted(columns))
...: if df.index[-1] == end:
...: df = df.iloc[:-1]
...: return df
...:
In [4]: timeseries = [
...: make_timeseries(freq="1T", seed=i).rename(columns=lambda x: f"{x}_{i}")
...: for i in range(10)
...: ]
...:
In [5]: ts_wide = pd.concat(timeseries, axis=1)
In [6]: ts_wide.head()
Out[6]:
id_0 name_0 x_0 ... name_9 x_9 y_9
timestamp ...
2000-01-01 00:00:00 977 Alice -0.821225 ... Charlie -0.957208 -0.757508
2000-01-01 00:01:00 1018 Bob -0.219182 ... Alice -0.414445 -0.100298
2000-01-01 00:02:00 927 Alice 0.660908 ... Charlie -0.325838 0.581859
2000-01-01 00:03:00 997 Bob -0.852458 ... Bob 0.992033 -0.686692
2000-01-01 00:04:00 965 Bob 0.717283 ... Charlie -0.924556 -0.184161
[5 rows x 40 columns]
In [7]: ts_wide.to_parquet("timeseries_wide.parquet")
To load the columns we want, we have two options. Option 1 loads in all the data and then filters to what we need.
In [8]: columns = ["id_0", "name_0", "x_0", "y_0"]
In [9]: pd.read_parquet("timeseries_wide.parquet")[columns]
Option 2 only loads the columns we request.
In [10]: pd.read_parquet("timeseries_wide.parquet", columns=columns)
If we were to measure the memory usage of the two calls, we’d see that specifying columns uses about 1/10th the memory in this case.
With pandas.read_csv(), you can specify usecols to limit the columns read into memory. Not all file formats that can be read by pandas provide an option to read a subset of columns.
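As a small sketch of the usecols option, the following builds a wide CSV in memory and reads it both ways. The column names, sizes, and the in-memory buffer are assumptions made for illustration; note that it compares the size of the resulting frames, not the peak memory used while parsing.

```python
import io

import numpy as np
import pandas as pd

# Build a small wide CSV in memory: 10 blocks of (id, x, y) columns.
# The names and sizes here are made up for the example.
rng = np.random.default_rng(0)
n = 1_000
wide = pd.DataFrame(
    {
        f"{col}_{i}": rng.integers(0, 1000, size=n) if col == "id" else rng.random(n)
        for i in range(10)
        for col in ("id", "x", "y")
    }
)
csv_text = wide.to_csv(index=False)

wanted = ["id_0", "x_0", "y_0"]

# Option 1: parse every column, then select the ones we need.
full = pd.read_csv(io.StringIO(csv_text))

# Option 2: only parse the requested columns.
subset = pd.read_csv(io.StringIO(csv_text), usecols=wanted)

# Fraction of the full frame's memory taken by the subset.
ratio = subset.memory_usage(deep=True).sum() / full.memory_usage(deep=True).sum()
print(list(subset.columns))
print(f"{ratio:0.2f}")
```

Both options produce the same three columns, but option 2 never materializes the other 27.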
Use efficient datatypes
The default pandas data types are not the most memory efficient. This is especially true for text data columns with relatively few unique values (commonly referred to as “low-cardinality” data). By using more efficient data types, you can store larger datasets in memory.
In [11]: ts = make_timeseries(freq="30S", seed=0)
In [12]: ts.to_parquet("timeseries.parquet")
In [13]: ts = pd.read_parquet("timeseries.parquet")
In [14]: ts
Out[14]:
id name x y
timestamp
2000-01-01 00:00:00 1041 Alice 0.889987 0.281011
2000-01-01 00:00:30 988 Bob -0.455299 0.488153
2000-01-01 00:01:00 1018 Alice 0.096061 0.580473
2000-01-01 00:01:30 992 Bob 0.142482 0.041665
2000-01-01 00:02:00 960 Bob -0.036235 0.802159
... ... ... ... ...
2000-12-30 23:58:00 1022 Alice 0.266191 0.875579
2000-12-30 23:58:30 974 Alice -0.009826 0.413686
2000-12-30 23:59:00 1028 Charlie 0.307108 -0.656789
2000-12-30 23:59:30 1002 Alice 0.202602 0.541335
2000-12-31 00:00:00 987 Alice 0.200832 0.615972
[1051201 rows x 4 columns]
Now, let’s inspect the data types and memory usage to see where we should focus our attention.
In [15]: ts.dtypes
Out[15]:
id int64
name object
x float64
y float64
dtype: object
In [16]: ts.memory_usage(deep=True) # memory usage in bytes
Out[16]:
Index 8409608
id 8409608
name 65176434
x 8409608
y 8409608
dtype: int64
The name column is taking up much more memory than any other. It has just a few unique values, so it’s a good candidate for converting to a pandas.Categorical. With a pandas.Categorical, we store each unique name once and use space-efficient integers to know which specific name is used in each row.
In [17]: ts2 = ts.copy()
In [18]: ts2["name"] = ts2["name"].astype("category")
In [19]: ts2.memory_usage(deep=True)
Out[19]:
Index 8409608
id 8409608
name 1051495
x 8409608
y 8409608
dtype: int64
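To make the storage layout concrete, here is a minimal sketch on a tiny, made-up Series that exposes the two pieces of a categorical through the .cat accessor: the unique values and the per-row integer codes.

```python
import pandas as pd

# A tiny example Series; the values are made up for illustration.
names = pd.Series(["Alice", "Bob", "Alice", "Charlie", "Bob"])
cat = names.astype("category")

# The unique values are stored once...
print(list(cat.cat.categories))  # ['Alice', 'Bob', 'Charlie']

# ...and each row holds a small integer code pointing into them.
print(cat.cat.codes.tolist())  # [0, 1, 0, 2, 1]
print(cat.cat.codes.dtype)  # int8
```

With only three categories, one byte per row is enough for the codes, which is why the name column shrinks so much.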
We can go a bit further and downcast the numeric columns to their smallest types using pandas.to_numeric().
In [20]: ts2["id"] = pd.to_numeric(ts2["id"], downcast="unsigned")
In [21]: ts2[["x", "y"]] = ts2[["x", "y"]].apply(pd.to_numeric, downcast="float")
In [22]: ts2.dtypes
Out[22]:
id uint16
name category
x float32
y float32
dtype: object
In [23]: ts2.memory_usage(deep=True)
Out[23]:
Index 8409608
id 2102402
name 1051495
x 4204804
y 4204804
dtype: int64
In [24]: reduction = ts2.memory_usage(deep=True).sum() / ts.memory_usage(deep=True).sum()
In [25]: print(f"{reduction:0.2f}")
0.20
In all, we’ve reduced the in-memory footprint of this dataset to 1/5 of its original size.
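The dtype that pandas.to_numeric() picks when downcasting depends on the range of the values it sees. A short sketch with made-up values:

```python
import pandas as pd

small = pd.Series([1, 2, 3])  # every value fits in 8 bits
medium = pd.Series([1, 2, 1000])  # 1000 needs 16 bits
floats = pd.Series([0.1, 0.25, 0.5])  # float64 by default

print(pd.to_numeric(small, downcast="unsigned").dtype)  # uint8
print(pd.to_numeric(medium, downcast="unsigned").dtype)  # uint16
print(pd.to_numeric(floats, downcast="float").dtype)  # float32
```

Keep in mind that float32 carries roughly seven significant decimal digits, so downcasting floats trades precision for memory. Likewise, an integer column downcast on one file may not be wide enough for values that appear in another.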
See Categorical data for more on pandas.Categorical and dtypes for an overview of all of pandas’ dtypes.
Use chunking
Some workloads can be handled with chunking: splitting a large problem into many small problems. For example, you can convert an individual CSV file into a Parquet file and repeat that for each file in a directory. As long as each chunk fits in memory, you can work with datasets that are much larger than memory.
Note
Chunking works well when the operation you’re performing requires zero or minimal coordination between chunks. For more complicated workflows, you’re better off using another library.
Suppose we have an even larger “logical dataset” on disk that’s a directory of parquet files. Each file in the directory represents a different year of the entire dataset.
In [26]: import pathlib
In [27]: N = 12
In [28]: starts = [f"20{i:>02d}-01-01" for i in range(N)]
In [29]: ends = [f"20{i:>02d}-12-13" for i in range(N)]
In [30]: pathlib.Path("data/timeseries").mkdir(parents=True, exist_ok=True)
In [31]: for i, (start, end) in enumerate(zip(starts, ends)):
....: ts = make_timeseries(start=start, end=end, freq="1T", seed=i)
....: ts.to_parquet(f"data/timeseries/ts-{i:0>2d}.parquet")
....:
data
└── timeseries
├── ts-00.parquet
├── ts-01.parquet
├── ts-02.parquet
├── ts-03.parquet
├── ts-04.parquet
├── ts-05.parquet
├── ts-06.parquet
├── ts-07.parquet
├── ts-08.parquet
├── ts-09.parquet
├── ts-10.parquet
└── ts-11.parquet
Now we’ll implement an out-of-core pandas.Series.value_counts(). The peak memory usage of this workflow is the single largest chunk, plus a small series storing the unique value counts up to this point. As long as each individual file fits in memory, this will work for arbitrary-sized datasets.
In [32]: %%time
....: files = pathlib.Path("data/timeseries/").glob("ts*.parquet")
....: counts = pd.Series(dtype=int)
....: for path in files:
....: df = pd.read_parquet(path)
....: counts = counts.add(df["name"].value_counts(), fill_value=0)
....: counts.astype(int)
....:
Some readers, like pandas.read_csv(), offer parameters to control the chunksize when reading a single file.
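As a sketch of the chunksize parameter, the following runs the same value-counts accumulation over a single CSV read in pieces. The in-memory CSV and the chunk size of 100 rows are assumptions made for the example.

```python
import io

import pandas as pd

# A made-up CSV with 1000 rows, held in memory for the example.
csv_text = "name,x\n" + "\n".join(
    f"{name},{i}" for i, name in enumerate(["Alice", "Bob", "Charlie", "Alice"] * 250)
)

counts = pd.Series(dtype="int64")

# With chunksize set, read_csv returns an iterator of DataFrames
# instead of one large DataFrame.
with pd.read_csv(io.StringIO(csv_text), chunksize=100) as reader:
    for chunk in reader:
        # Only one 100-row chunk is held in memory at a time.
        counts = counts.add(chunk["name"].value_counts(), fill_value=0)

counts = counts.astype("int64")
print(counts.sort_index())
```

The running total stays small regardless of the file size, because it holds one entry per unique name rather than one per row.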
Manual chunking is a reasonable option for workflows that don’t require overly sophisticated operations. Some operations, like pandas.DataFrame.groupby(), are much harder to do chunkwise. In these cases, you may be better off switching to a different library that implements these out-of-core algorithms for you.
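To illustrate the extra coordination a chunked groupby needs, here is a sketch of a per-group mean computed chunk by chunk. The data and the 100-row chunks are made up. Averaging the per-chunk means would be wrong when groups have different sizes in different chunks, so the sketch tracks sums and counts separately and divides at the end.

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
df = pd.DataFrame(
    {
        "name": rng.choice(["Alice", "Bob", "Charlie"], size=1_000),
        "x": rng.random(1_000),
    }
)

# Pretend each slice of 100 rows is a chunk read from disk.
chunks = [df.iloc[i : i + 100] for i in range(0, len(df), 100)]

sums = pd.Series(dtype="float64")
sizes = pd.Series(dtype="float64")
for chunk in chunks:
    grouped = chunk.groupby("name")["x"]
    # Accumulate the two pieces the mean decomposes into.
    sums = sums.add(grouped.sum(), fill_value=0)
    sizes = sizes.add(grouped.count(), fill_value=0)

chunked_mean = sums / sizes
print(chunked_mean)
```

This works because a mean decomposes into a sum and a count. Aggregations such as a median do not decompose this way, which is one reason a dedicated out-of-core library becomes attractive.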
Use Dask
pandas is just one library offering a DataFrame API. Because of its popularity, pandas’ API has become something of a standard that other libraries implement. The pandas documentation maintains a list of libraries implementing a DataFrame API in the ecosystem page.
For example, Dask, a parallel computing library, has dask.dataframe, a pandas-like API for working with larger than memory datasets in parallel. Dask can use multiple threads or processes on a single machine, or a cluster of machines to process data in parallel.
We’ll import dask.dataframe and notice that the API feels similar to pandas. We can use Dask’s read_parquet function, but provide a globstring of files to read in.
In [33]: import dask.dataframe as dd
In [34]: ddf = dd.read_parquet("data/timeseries/ts*.parquet", engine="pyarrow")
In [35]: ddf
Inspecting the ddf object, we see a few things:
There are familiar attributes like .columns and .dtypes.
There are familiar methods like .groupby, .sum, etc.
There are new attributes like .npartitions and .divisions.
The partitions and divisions are how Dask parallelizes computation. A Dask DataFrame is made up of many pandas.DataFrame objects. A single method call on a Dask DataFrame ends up making many pandas method calls, and Dask knows how to coordinate everything to get the result.
In [36]: ddf.columns
In [37]: ddf.dtypes
In [38]: ddf.npartitions
One major difference: the dask.dataframe API is lazy. If you look at the repr above, you’ll notice that the values aren’t actually printed out; just the column names and dtypes. That’s because Dask hasn’t actually read the data yet. Rather than executing immediately, operations build up a task graph.
In [39]: ddf
In [40]: ddf["name"]
In [41]: ddf["name"].value_counts()
Each of these calls is instant because the result isn’t being computed yet. We’re just building up a list of computations to do when someone needs the result. Dask knows that the return type of a pandas.Series.value_counts is a pandas.Series with a certain dtype and a certain name. So the Dask version returns a Dask Series with the same dtype and the same name.
To get the actual result you can call .compute().
In [42]: %time ddf["name"].value_counts().compute()
At that point, you get back the same thing you’d get with pandas, in this case a concrete pandas.Series with the count of each name.
Calling .compute causes the full task graph to be executed. This includes reading the data, selecting the columns, and doing the value_counts. The execution is done in parallel where possible, and Dask tries to keep the overall memory footprint small. You can work with datasets that are much larger than memory, as long as each partition (a regular pandas.DataFrame) fits in memory.
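The idea of deferring work until .compute() can be pictured with a toy sketch. This is not how Dask is implemented: the Lazy class and its then method are hypothetical names invented here purely to show that recording steps is cheap and nothing runs until a result is requested.

```python
import pandas as pd


class Lazy:
    """Toy deferred computation (hypothetical, not Dask's implementation)."""

    def __init__(self, load, steps=()):
        self._load = load
        self._steps = tuple(steps)

    def then(self, func):
        # Recording a step is cheap; nothing is executed here.
        return Lazy(self._load, self._steps + (func,))

    def compute(self):
        # Only now is the data loaded and every recorded step applied.
        result = self._load()
        for step in self._steps:
            result = step(result)
        return result


calls = []


def load():
    # Stand-in for an expensive read from disk.
    calls.append("load")
    return pd.DataFrame({"name": ["Alice", "Bob", "Alice"]})


lazy_counts = Lazy(load).then(lambda df: df["name"]).then(lambda s: s.value_counts())
print(calls)  # [] -- nothing has run yet

result = lazy_counts.compute()
print(calls)  # ['load']
print(result)
```

A real task graph also tracks dependencies between partitions so that independent pieces can run in parallel; the toy above only captures the deferred-execution part.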
By default, dask.dataframe operations use a threadpool to do operations in parallel. We can also connect to a cluster to distribute the work on many machines. In this case we’ll connect to a local “cluster” made up of several processes on this single machine.
>>> from dask.distributed import Client, LocalCluster
>>> cluster = LocalCluster()
>>> client = Client(cluster)
>>> client
<Client: 'tcp://127.0.0.1:53349' processes=4 threads=8, memory=17.18 GB>
Once this client is created, all of Dask’s computation will take place on the cluster (which is just processes in this case).
Dask implements the most used parts of the pandas API. For example, we can do a familiar groupby aggregation.
In [43]: %time ddf.groupby("name")[["x", "y"]].mean().compute().head()
The grouping and aggregation is done out-of-core and in parallel.
When Dask knows the divisions of a dataset, certain optimizations are possible. When reading parquet datasets written by Dask, the divisions will be known automatically. In this case, since we created the parquet files manually, we need to supply the divisions manually.
In [44]: N = 12
In [45]: starts = [f"20{i:>02d}-01-01" for i in range(N)]
In [46]: ends = [f"20{i:>02d}-12-13" for i in range(N)]
In [47]: divisions = tuple(pd.to_datetime(starts)) + (pd.Timestamp(ends[-1]),)
In [48]: ddf.divisions = divisions
In [49]: ddf
Now we can do things like fast random access with .loc.
In [50]: ddf.loc["2002-01-01 12:01":"2002-01-01 12:05"].compute()
Dask knows to just look in the 3rd partition for selecting values in 2002. It doesn’t need to look at any other data.
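Why known divisions make this lookup cheap can be sketched with a sorted search over the division boundaries. This illustrates the idea only and is not Dask's own code: partition_for is a hypothetical helper, and the divisions are rebuilt the same way as above.

```python
import bisect

import pandas as pd

N = 12
starts = [f"20{i:>02d}-01-01" for i in range(N)]
ends = [f"20{i:>02d}-12-13" for i in range(N)]
divisions = tuple(pd.to_datetime(starts)) + (pd.Timestamp(ends[-1]),)


def partition_for(label):
    """Return the 0-based index of the partition whose range contains label."""
    ts = pd.Timestamp(label)
    if not divisions[0] <= ts <= divisions[-1]:
        raise KeyError(label)
    # divisions holds the lower bound of each partition plus one final,
    # inclusive upper bound, so clamp to the last partition index.
    return min(bisect.bisect_right(divisions, ts) - 1, len(divisions) - 2)


print(partition_for("2002-01-01 12:01"))  # 2, i.e. the 3rd partition
```

Because the boundaries are sorted, finding the right partition is a binary search over 13 timestamps rather than a scan of the data itself.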
Many workflows involve a large amount of data and processing it in a way that reduces the size to something that fits in memory. In this case, we’ll resample to daily frequency and take the mean. Once we’ve taken the mean, we know the results will fit in memory, so we can safely call compute without running out of memory. At that point it’s just a regular pandas object.
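The same reduce-then-combine pattern can be sketched in plain pandas. The data is made up, and the sketch assumes each chunk covers whole days, so that no day is split across chunks: every chunk is reduced to daily means on its own, and only the small results are concatenated.

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
index = pd.date_range("2000-01-01", "2000-01-10 23:59", freq="min", name="timestamp")
df = pd.DataFrame(
    {"x": rng.random(len(index)), "y": rng.random(len(index))}, index=index
)

# Pretend each two-day slice is a separate file on disk.
# Slicing with .loc is inclusive on both ends, so stop one minute early.
chunk_starts = pd.date_range("2000-01-01", "2000-01-09", freq="2D")
chunks = [
    df.loc[start : start + pd.Timedelta(days=2) - pd.Timedelta(minutes=1)]
    for start in chunk_starts
]

# Reduce each chunk to one row per day, then combine the small results.
daily = pd.concat([chunk.resample("1D").mean() for chunk in chunks])
print(daily.shape)  # (10, 2)
```

Each chunk holds 2,880 rows, while the combined result holds 10, which is the kind of reduction that makes the final in-memory step safe.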
In [51]: ddf[["x", "y"]].resample("1D").mean().cumsum().compute().plot()
These Dask examples have all been done using multiple processes on a single machine. Dask can be deployed on a cluster to scale up to even larger datasets.
You can see more Dask examples at https://examples.dask.org.