Source code for darshan.experimental.plots.data_access_by_filesystem

"""
Draft utility code for the `data access by category` section
of Phil's hand drawing of future report layout.
"""

import os
import pathlib
from typing import List, Dict, Optional, Any, Callable, Sequence

import numpy as np
import pandas as pd
import darshan
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import humanize

[docs] def process_byte_counts(df_reads, df_writes): """ Given separate dataframes for read/write IO activity, returns separate pandas Series objects with the counts of bytes read or written per filesystem root. Parameters ---------- df_reads: a ``pd.DataFrame`` where each row presents at least 1 byte of read-related IO activity, and where additional columns for ``filesystem_root`` and ``filepath`` are present df_writes: a ``pd.DataFrame`` where each row presents at least 1 byte of write-related IO activity, and where additional columns for ``filesystem_root`` and ``filepath`` are present Returns ------- Tuple of form ``(read_groups, write_groups)`` where each tuple element is a ``pd.Series`` object of per-filesystem read/write byte counts. """ # TODO: generalize to other instrumentation # modules if we keep this design try: read_groups_posix = df_reads.groupby('filesystem_root')['POSIX_BYTES_READ'].sum() except KeyError: read_groups_posix = pd.Series(dtype=np.int64) try: read_groups_stdio = df_reads.groupby('filesystem_root')['STDIO_BYTES_READ'].sum() except KeyError: read_groups_stdio = pd.Series(dtype=np.int64) try: write_groups_posix = df_writes.groupby('filesystem_root')['POSIX_BYTES_WRITTEN'].sum() except KeyError: write_groups_posix = pd.Series(dtype=np.int64) try: write_groups_stdio = df_writes.groupby('filesystem_root')['STDIO_BYTES_WRITTEN'].sum() except KeyError: write_groups_stdio = pd.Series(dtype=np.int64) read_groups = read_groups_posix.add(read_groups_stdio, fill_value=0.0).astype(np.int64) write_groups = write_groups_posix.add(write_groups_stdio, fill_value=0.0).astype(np.int64) read_groups.name = "BYTES_READ" write_groups.name = "BYTES_WRITTEN" return read_groups, write_groups
[docs] def process_unique_files(df_reads, df_writes): """ Given separate dataframes for read/write IO activity, returns separate pandas Series objects with the counts of unique files per filesytem root to which 1 byte has been read or written. Parameters ---------- df_reads: a ``pd.DataFrame`` where each row presents at least 1 byte of read-related IO activity, and where additional columns for ``filesystem_root`` and ``filepath`` are present df_writes: a ``pd.DataFrame`` where each row presents at least 1 byte of write-related IO activity, and where additional columns for ``filesystem_root`` and ``filepath`` are present Returns ------- Tuple of form ``(read_groups, write_groups)`` where each tuple element is a ``pd.Series`` object of per-filesystem root unique file read/write counts. """ # groupby filesystem root and filepath, then get sizes # these are Pandas series objects where the index # is the name of the filesystem_root and the value # is the number of unique files counted per above criteria read_groups = df_reads.groupby('filesystem_root')['filepath'].nunique() write_groups = df_writes.groupby('filesystem_root')['filepath'].nunique() return read_groups, write_groups
[docs] def check_empty_series(read_groups, write_groups, filesystem_roots: Sequence[str]): """ Add ``0`` values for inactive filesystem roots for plotting purposes. Parameters ---------- read_groups: a ``pd.Series`` object with IO read activity data write_groups: a ``pd.Series`` object with IO write activity data filesystem_roots: a sequence of strings containing unique filesystem root paths Returns ------- A tuple of form: `(read_groups, write_groups)` where each tuple element is a `pd.Series` object adjusted to contain "0" values for inactive filesystem roots (for plotting purposes). """ read_groups = empty_series_handler(series=read_groups, filesystem_roots=filesystem_roots) write_groups = empty_series_handler(series=write_groups, filesystem_roots=filesystem_roots) return read_groups, write_groups
[docs] def empty_series_handler(series, filesystem_roots: Sequence[str]): """ Parameters ---------- series: a ``pd.Series`` object filesystem_roots: a sequence of strings containing unique filesystem root paths Returns ------- A new ``Series`` with any missing filesystem_root indices filled in along with count values of 0 (for plotting purposes). """ return series.reindex(filesystem_roots, fill_value=0).astype(np.float64)
[docs] def rec_to_rw_counter_dfs_with_cols(report: Any, file_id_dict: Dict[int, str], mod: str = 'POSIX'): """ Filter a DarshanReport into two "counters" dataframes, each with read/write activity in each row (at least 1 byte read or written per row) and two extra columns for filesystem root path and filename data. Parameters ---------- report: a darshan.DarshanReport() file_id_dict: a dictionary mapping integer file hash values to string values corresponding to their respective paths mod: a string indicating the darshan module to use for parsing (default: ``POSIX``) Returns ------- tuple of form: (df_reads, df_writes) """ # filter the DarshanReport into two "counters" dataframes # with read/write activity in each row (at least 1 # byte read or written per row) df_reads, df_writes = rec_to_rw_counter_dfs(report=report, mod=mod) # add columns with filepaths and filesystem root # paths for each event df_reads, df_writes = add_filesystem_cols(df_reads=df_reads, df_writes=df_writes, file_id_dict=file_id_dict) return df_reads, df_writes
[docs] def rec_to_rw_counter_dfs(report: Any, mod: str = 'POSIX'): """ Filter a DarshanReport into two "counters" dataframes, each with read/write activity in each row (at least 1 byte read or written per row). Parameters ---------- report: a darshan.DarshanReport() mod: a string indicating the darshan module to use for parsing (default: ``POSIX``) Returns ------- tuple of form: (df_reads, df_writes) """ rec_counters = pd.DataFrame() df_reads = pd.DataFrame() df_writes = pd.DataFrame() if "POSIX" in report.modules and len(report.records["POSIX"]) > 0: rec_counters = pd.concat(objs=(rec_counters, report.records["POSIX"].to_df()['counters'])) df_reads = pd.concat(objs=(df_reads, rec_counters.loc[rec_counters[f'POSIX_BYTES_READ'] >= 1])) df_writes = pd.concat(objs=(df_writes, rec_counters.loc[rec_counters[f'POSIX_BYTES_WRITTEN'] >= 1])) if "STDIO" in report.modules and len(report.records["STDIO"]) > 0: rec_counters = pd.concat(objs=(rec_counters, report.records["STDIO"].to_df()['counters'])) df_reads = pd.concat(objs=(df_reads, rec_counters.loc[rec_counters[f'STDIO_BYTES_READ'] >= 1])) df_writes = pd.concat(objs=(df_writes, rec_counters.loc[rec_counters[f'STDIO_BYTES_WRITTEN'] >= 1])) return df_reads, df_writes
[docs] def add_filesystem_cols(df_reads, df_writes, file_id_dict: Dict[int, str]): """ Adds two columns to the input dataframes, one with the filepaths for each event, and the other with the filesystem root for each event. Parameters ---------- df_reads: ``pandas.DataFrame`` that has been filtered to only include rows with ``>= 1`` bytes read df_writes: ``pandas.DataFrame`` that has been filtered to only include rows with ``>= 1`` bytes written file_id_dict: a dictionary mapping integer file hash values to string values corresponding to their respective paths Returns ------- A tuple of form ``(df_reads, df_writes)`` with the modified dataframes. """ file_hashes, file_paths = convert_id_dict_to_arrays(file_id_dict=file_id_dict) # add column with filepaths for each event df_reads = df_reads.assign(filepath=df_reads['id'].map(lambda a: convert_file_id_to_path(a, file_hashes, file_paths))) df_writes = df_writes.assign(filepath=df_writes['id'].map(lambda a: convert_file_id_to_path(a, file_hashes, file_paths))) # add column with filesystem root paths for each event df_reads = df_reads.assign(filesystem_root=df_reads['filepath'].map(lambda path: convert_file_path_to_root_path(path))) df_writes = df_writes.assign(filesystem_root=df_writes['filepath'].map(lambda path: convert_file_path_to_root_path(path))) return df_reads, df_writes
[docs] def convert_file_path_to_root_path(file_path: str) -> str: """ Parameters ---------- file_path: a string containing the absolute file path Returns ------- A string containing the root path. Examples: --------- >>> filesystem_root = convert_file_path_to_root_path("/scratch1/scratchdirs/glock/testFile.00000046") >>> filesystem_root '/scratch1' """ path_parts = pathlib.Path(file_path).parts filesystem_root = ''.join(path_parts[:2]) if filesystem_root.isdigit(): # this is probably an anonymized STD.. # stream, so make that clear filesystem_root = f'anonymized\n({filesystem_root})' if filesystem_root.startswith("//"): # Shane indicates that these are individual files # mounted on root # see: # https://github.com/darshan-hpc/darshan/pull/397#discussion_r769186581 filesystem_root = "/" return filesystem_root
[docs] def convert_id_dict_to_arrays(file_id_dict: Dict[int, str]): """ Splits the file hash/path dictionary into corresponding arrays, one for the file hashes and another for the file paths. Parameters ---------- file_id_dict: a dictionary mapping integer file hash values to string values corresponding to their respective paths Returns ------- A tuple of form ``(file_id_hash_arr, file_path_arr)`` where the first element is an array containing the integer file hash values and the second is an array containing the corresponding file paths. """ file_id_hash_arr = np.array(list(file_id_dict.keys())) file_path_arr = np.array(list(file_id_dict.values())) return file_id_hash_arr, file_path_arr
[docs] def convert_file_id_to_path(input_id: float, file_hashes, file_paths) -> Optional[str]: """ Parameters ---------- input_id: a float representing the file hash file_id_dict: a dictionary mapping integer file hash values to string values corresponding to their respective paths Returns ------- A string containing the file path corresponding to ``input_id``, or ``None`` if no matching file path was found for the input hash. Examples -------- >>> # file_id_dict typically comes from `report.data["name_records"]` >>> file_id_dict = {210703578647777632: '/yellow/usr/projects/eap/users/treddy/simple_dxt_mpi_io_darshan/test.out.locktest.0', ... 9457796068806373448: '/yellow/usr/projects/eap/users/treddy/simple_dxt_mpi_io_darshan/test.out'} >>> # input_id may came from i.e., a pandas dataframe of the record data >>> result = convert_file_id_to_path(input_id=9.457796068806373e+18, file_id_dict=file_id_dict) >>> result '/yellow/usr/projects/eap/users/treddy/simple_dxt_mpi_io_darshan/test.out' """ file_idx = np.nonzero((file_hashes - input_id) == 0)[0] try: file_path = file_paths[file_idx][0] except IndexError: return None return file_path
[docs] def identify_filesystems(file_id_dict: Dict[int, str], verbose: bool = False) -> List[str]: """ Parameters ---------- file_id_dict: a dictionary mapping integer file hash values to string values corresponding to their respective paths verbose: if ``True``, will print the filesystem root paths that are identified Returns ------- A list of strings containing the unique filesystem root paths parsed from the input dictionary. Examples -------- >>> # file_id_dict is typically from report.data["name_records"] >>> file_id_dict = {210703578647777632: '/yellow/usr/projects/eap/users/treddy/simple_dxt_mpi_io_darshan/test.out.locktest.0', ... 14388265063268455899: '/tmp/ompi.sn176.28751/jf.29186/1/test.out_cid-0-3400.sm'} >>> filesystem_roots = identify_filesystems(file_id_dict=file_id_dict, verbose=True) filesystem_roots: ['/yellow', '/tmp'] """ filesystem_roots = [] for file_id_hash, file_path in file_id_dict.items(): filesystem_root = convert_file_path_to_root_path(file_path=file_path) if filesystem_root not in filesystem_roots: filesystem_roots.append(filesystem_root) if verbose: print("filesystem_roots:", filesystem_roots) return filesystem_roots
[docs] def unique_fs_rw_counter(report: Any, filesystem_roots: Sequence[str], file_id_dict: Dict[int, str], processing_func: Callable, mod: str = 'POSIX', verbose: bool = False): """ For each filesystem root path, apply the custom analysis specified by ``processing_func``. The data supplied to ``processing_func`` will be filtered to only include rows with activity where at least 1 byte has been read, or to which at least 1 byte has been written. Parameters ---------- report: a darshan.DarshanReport() filesystem_roots: a sequence of strings containing unique filesystem root paths file_id_dict: a dictionary mapping integer file hash values to string values corresponding to their respective paths processing_func: the function that will be used to process the read and write dataframes; it should accept ``df_reads`` and ``df_writes`` dataframes; see the example process_unique_files() for details mod: a string indicating the darshan module to use for parsing (default: ``POSIX``) verbose: if ``True``, print the calculated values of ``read_groups`` and ``write_groups`` Returns ------- tuple of form: (read_groups, write_groups) Where each element of the tuple is a pandas ``Series`` object with a format like the one shown below when ``process_unique_files()`` is the ``processing_func`` # filesystem_root count # /tmp 1 # /yellow 1 The ``int64`` values in the ``Series`` are the counts of unique files to which a single byte has been read (or written) on a given filesystem (index). Raises: ------- NotImplementedError: for unsupported modules ValueError: when POSIX module data is absent from the report """ if not mod in ['POSIX', 'STDIO']: raise NotImplementedError("Only the POSIX and STDIO modules are currently supported") if mod not in report.modules: raise ValueError(f"{mod} module data is required") # filter the DarshanReport into two "counters" dataframes # with read/write activity in each row (at least 1 # byte read or written per row) and extra columns for # filesystem root and filename data df_reads, df_writes = rec_to_rw_counter_dfs_with_cols(report=report, file_id_dict=file_id_dict, mod=mod) read_groups, write_groups = processing_func(df_reads=df_reads, df_writes=df_writes) # if either of the Series are effectively empty we want # to produce a new Series with the filesystem_root indices # and count values of 0 (for plotting purposes) read_groups, write_groups = check_empty_series(read_groups=read_groups, write_groups=write_groups, filesystem_roots=filesystem_roots) if verbose: print("read_groups:\n", read_groups) print("write_groups:\n", write_groups) return (read_groups, write_groups)
[docs] def plot_data(fig: Any, file_rd_series, file_wr_series, bytes_rd_series, bytes_wr_series, filesystem_roots: Sequence[str], num_cats: Optional[int] = None): """ Produce the horizontal bar plots for the data access by category analysis. Parameters ---------- fig: matplotlib ``figure`` object into which the bar plots will be placed file_rd_series: a ``pd.Series`` object with per-category counts of unique files from which at least 1 byte has been read file_wr_series: a ``pd.Series`` object with per-category counts of unique files to which at least 1 byte has been written bytes_rd_series: a ``pd.Series`` object with per-category counts of bytes read bytes_wr_series: a ``pd.Series`` object with per-category counts of bytes written filesystem_roots: a sequence of strings containing unique filesystem root paths num_cats: an integer representing the number of categories to plot; default ``None`` plots all categories """ fontsize = 18 list_byte_axes: list = [] list_count_axes: list = [] # use log10 scale if range exceeds # two orders of magnitude in a column use_log = [False, False] for idx, series_pair in enumerate([[bytes_rd_series, bytes_wr_series], [file_rd_series, file_wr_series]]): maxval = max(series_pair[0].max(), series_pair[1].max()) minval = max(min(series_pair[0].min(), series_pair[1].min()), 1) # adjust ratio to MiB when needed ratio = (maxval / minval) if ratio > 100: use_log[idx] = True if num_cats is None: num_cats = len(filesystem_roots) for row, filesystem in enumerate(bytes_rd_series.index[:num_cats]): ax_filesystem_bytes = fig.add_subplot(len(filesystem_roots[:num_cats]), 2, row * 2 + 1) ax_filesystem_counts = fig.add_subplot(len(filesystem_roots[:num_cats]), 2, row * 2 + 2) if row > 0: ax_filesystem_bytes.sharex(list_byte_axes[row - 1]) if use_log[0]: ax_filesystem_bytes.set_xscale('symlog') ax_filesystem_counts.sharex(list_count_axes[row - 1]) if use_log[1]: ax_filesystem_counts.set_xscale('symlog') list_byte_axes.append(ax_filesystem_bytes) list_count_axes.append(ax_filesystem_counts) bytes_read = bytes_rd_series[filesystem] bytes_written = bytes_wr_series[filesystem] files_written = int(file_wr_series[filesystem]) files_read = int(file_rd_series[filesystem]) # anonymized STD.. streams have associated integers # that are stored in the filesystem data field # but that are confusing to display, so strip them if filesystem.startswith('anonymized'): ax_filesystem_bytes.annotate('anonymized', (-0.1, 0.5), fontsize=fontsize, xycoords='axes fraction', ha="right", va="center") else: ax_filesystem_bytes.annotate(filesystem, (-0.1, 0.5), fontsize=fontsize, xycoords='axes fraction', ha="right", va="center") ax_filesystem_counts.barh(0, files_written, color='red', alpha=0.3) ax_filesystem_counts.barh(1, files_read, color='blue', alpha=0.3) bytes_read_str = humanize.naturalsize(bytes_read, binary=True, format="%.2f") ax_filesystem_bytes.text(0, 0.75, f' bytes read: {bytes_read_str}', transform=ax_filesystem_bytes.transAxes, fontsize=fontsize, va="center") bytes_written_str = humanize.naturalsize(bytes_written, binary=True, format="%.2f") ax_filesystem_bytes.text(0, 0.25, f' bytes written: {bytes_written_str}', transform=ax_filesystem_bytes.transAxes, fontsize=fontsize, va="center") ax_filesystem_counts.text(0, 0.75, f' files read: {files_read}', transform=ax_filesystem_counts.transAxes, fontsize=fontsize, va="center") ax_filesystem_counts.text(0, 0.25, f' files written: {files_written}', transform=ax_filesystem_counts.transAxes, fontsize=fontsize, va="center") ax_filesystem_bytes.barh(0, bytes_written, color='red', alpha=0.3) ax_filesystem_bytes.barh(1, bytes_read, color='blue', alpha=0.3) for axis in fig.axes: # hide the right side plot frame (spine) so that # the value labels don't overlap with the plot frame axis.spines['right'].set_visible(False) axis.set_xticklabels([]) axis.set_yticklabels([]) axis.xaxis.set_ticks_position('none') axis.yaxis.set_ticks_position('none') axis.minorticks_off() if use_log[0]: ax_filesystem_bytes.set_xlabel('symmetric log scaled') if use_log[1]: ax_filesystem_counts.set_xlabel('symmetric log scaled')
[docs] def plot_with_report(report: darshan.DarshanReport, verbose: bool = False, num_cats: Optional[int] = None): """ Plot the data access by category given a darshan ``DarshanReport`` object. Parameters ---------- report: a ``darshan.DarshanReport`` object plot_filename: name of the plot file produced verbose: if ``True``, provide extra debug information num_cats: an integer representing the number of categories to plot; default ``None`` plots all categories Returns ------- fig: matplotlib figure object or None if no data to plot """ fig = plt.figure() file_id_dict = report.data["name_records"] allowed_file_id_dict = {} # only POSIX/STDIO entries allowed for module in ["POSIX", "STDIO"]: for key in ["counters", "fcounters"]: try: allowed_ids = report.records[module].to_df()[key]["id"] except KeyError: allowed_ids = [] for ident in allowed_ids: allowed_file_id_dict[ident] = file_id_dict[ident] if len(allowed_file_id_dict) == 0: # no data, likely because all records have been filtered out return None filesystem_roots = identify_filesystems(file_id_dict=allowed_file_id_dict, verbose=verbose) # NOTE: this is a bit ugly, STDIO and POSIX are both combined # automatically later in the control flow, so an API redesign # may be in order eventually default_mod = "POSIX" if "POSIX" not in report.modules: # STDIO can also be used for this figure/analysis default_mod = "STDIO" file_rd_series, file_wr_series = unique_fs_rw_counter(report=report, filesystem_roots=filesystem_roots, file_id_dict=allowed_file_id_dict, processing_func=process_unique_files, mod=default_mod, verbose=verbose) bytes_rd_series, bytes_wr_series = unique_fs_rw_counter(report=report, filesystem_roots=filesystem_roots, file_id_dict=allowed_file_id_dict, processing_func=process_byte_counts, mod=default_mod, verbose=verbose) # reverse sort by total bytes IO per category sort_inds = (bytes_rd_series + bytes_wr_series).argsort()[::-1] if num_cats is None: height = len(file_rd_series) else: height = num_cats plot_data(fig, file_rd_series.iloc[sort_inds], file_wr_series.iloc[sort_inds], bytes_rd_series.iloc[sort_inds], bytes_wr_series.iloc[sort_inds], filesystem_roots, num_cats=num_cats) # at least this much height seems to # produce a decent aspect ratio if height < 16: height = 16 # add additional padding to left margin for annotations fig.subplots_adjust(left=0.2) fig.set_size_inches(12, height) plt.close(fig) return fig