# Copyright 2024 - present The PyMC Developers # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """PyMC-ArviZ conversion code.""" import logging import warnings from collections.abc import Iterable, Mapping, Sequence from typing import ( TYPE_CHECKING, Any, Optional, Union, cast, ) import numpy as np import xarray from arviz import InferenceData, concat, rcParams from arviz.data.base import CoordSpec, DimSpec, dict_to_dataset, requires from pytensor.graph import ancestors from pytensor.tensor.sharedvar import SharedVariable from rich.progress import Console from rich.theme import Theme from xarray import Dataset import pymc from pymc.model import Model, modelcontext from pymc.progress_bar import CustomProgress, default_progress_theme from pymc.pytensorf import PointFunc, extract_obs_data from pymc.util import get_default_varnames if TYPE_CHECKING: from pymc.backends.base import MultiTrace ___all__ = [""] _log = logging.getLogger(__name__) RAISE_ON_INCOMPATIBLE_COORD_LENGTHS = False # random variable object ... Var = Any def dict_to_dataset_drop_incompatible_coords(vars_dict, *args, dims, coords, **kwargs): safe_coords = coords if not RAISE_ON_INCOMPATIBLE_COORD_LENGTHS: coords_lengths = {k: len(v) for k, v in coords.items()} for var_name, var in vars_dict.items(): # Iterate in reversed because of chain/draw batch dimensions for dim, dim_length in zip(reversed(dims.get(var_name, ())), reversed(var.shape)): coord_length = coords_lengths.get(dim, None) if (coord_length is not None) and (coord_length != dim_length): warnings.warn( f"Incompatible coordinate length of {coord_length} for dimension '{dim}' of variable '{var_name}'.\n" "This usually happens when a sliced or concatenated variable is wrapped as a `pymc.dims.Deterministic`." "The originate coordinates for this dim will not be included in the returned dataset for any of the variables. " "Instead they will default to `np.arange(var_length)` and the shorter variables will be right-padded with nan.\n" "To make this warning into an error set `pymc.backends.arviz.RAISE_ON_INCOMPATIBLE_COORD_LENGTHS` to `True`", UserWarning, ) if safe_coords is coords: safe_coords = coords.copy() safe_coords.pop(dim) coords_lengths.pop(dim) # FIXME: Would be better to drop coordinates altogether, but arviz defaults to `np.arange(var_length)` return dict_to_dataset(vars_dict, *args, dims=dims, coords=safe_coords, **kwargs) def find_observations(model: "Model") -> dict[str, Var]: """If there are observations available, return them as a dictionary.""" observations = {} for obs in model.observed_RVs: aux_obs = model.rvs_to_values.get(obs, None) if aux_obs is not None: try: obs_data = extract_obs_data(aux_obs) observations[obs.name] = obs_data except TypeError: warnings.warn(f"Could not extract data from symbolic observation {obs}") else: warnings.warn(f"No data for observation {obs}") return observations def find_constants(model: "Model") -> dict[str, Var]: """If there are constants available, return them as a dictionary.""" model_vars = model.basic_RVs + model.deterministics + model.potentials value_vars = set(model.rvs_to_values.values()) constant_data = {} for var in model.data_vars: if var in value_vars: # An observed value variable could also be part of the generative graph if var not in ancestors(model_vars): continue if isinstance(var, SharedVariable): var_value = var.get_value() else: var_value = var.data constant_data[var.name] = var_value return constant_data def coords_and_dims_for_inferencedata(model: Model) -> tuple[dict[str, Any], dict[str, Any]]: """Parse PyMC model coords and dims format to one accepted by InferenceData.""" coords = { cname: np.array(cvals) if isinstance(cvals, tuple) else cvals for cname, cvals in model.coords.items() if cvals is not None } dims = {dname: list(dvals) for dname, dvals in model.named_vars_to_dims.items()} return coords, dims class _DefaultTrace: """ Utility for collecting samples into a dictionary. Name comes from its similarity to ``defaultdict``: entries are lazily created. Parameters ---------- samples : int The number of samples that will be collected, per variable, into the trace. Attributes ---------- trace_dict : Dict[str, np.ndarray] A dictionary constituting a trace. Should be extracted after a procedure has filled the `_DefaultTrace` using the `insert()` method """ def __init__(self, samples: int): self._len: int = samples self.trace_dict: dict[str, np.ndarray] = {} def insert(self, k: str, v, idx: int): """ Insert `v` as the value of the `idx`th sample for the variable `k`. Parameters ---------- k: str Name of the variable. v: anything that can go into a numpy array (including a numpy array) The value of the `idx`th sample from variable `k` ids: int The index of the sample we are inserting into the trace. """ value_shape = np.shape(v) # initialize if necessary if k not in self.trace_dict: array_shape = (self._len, *value_shape) self.trace_dict[k] = np.empty(array_shape, dtype=np.array(v).dtype) # do the actual insertion if value_shape == (): self.trace_dict[k][idx] = v else: self.trace_dict[k][idx, :] = v class InferenceDataConverter: """Encapsulate InferenceData specific logic.""" model: Model | None = None posterior_predictive: Mapping[str, np.ndarray] | None = None predictions: Mapping[str, np.ndarray] | None = None prior: Mapping[str, np.ndarray] | None = None def __init__( self, *, trace=None, prior=None, posterior_predictive=None, log_likelihood=False, log_prior=False, predictions=None, coords: CoordSpec | None = None, dims: DimSpec | None = None, sample_dims: list | None = None, model=None, save_warmup: bool | None = None, include_transformed: bool = False, ): self.save_warmup = rcParams["data.save_warmup"] if save_warmup is None else save_warmup self.include_transformed = include_transformed self.trace = trace # this permits us to get the model from command-line argument or from with model: self.model = modelcontext(model) self.attrs = None if trace is not None: self.nchains = trace.nchains if hasattr(trace, "nchains") else 1 if hasattr(trace.report, "n_draws") and trace.report.n_draws is not None: self.ndraws = trace.report.n_draws self.attrs = { "sampling_time": trace.report.t_sampling, "tuning_steps": trace.report.n_tune, } else: self.ndraws = len(trace) if self.save_warmup: warnings.warn( "Warmup samples will be stored in posterior group and will not be" " excluded from stats and diagnostics." " Do not slice the trace manually before conversion", UserWarning, ) self.ntune = len(self.trace) - self.ndraws self.posterior_trace, self.warmup_trace = self.split_trace() else: self.nchains = self.ndraws = 0 self.prior = prior self.posterior_predictive = posterior_predictive self.log_likelihood = log_likelihood self.log_prior = log_prior self.predictions = predictions if all(elem is None for elem in (trace, predictions, posterior_predictive, prior)): raise ValueError( "When constructing InferenceData you must pass at least" " one of trace, prior, posterior_predictive or predictions." ) user_coords = {} if coords is None else coords user_dims = {} if dims is None else dims model_coords, model_dims = coords_and_dims_for_inferencedata(self.model) self.coords = {**model_coords, **user_coords} self.dims = {**model_dims, **user_dims} if sample_dims is None: sample_dims = ["chain", "draw"] self.sample_dims = sample_dims self.observations = find_observations(self.model) def split_trace(self) -> tuple[Union[None, "MultiTrace"], Union[None, "MultiTrace"]]: """Split MultiTrace object into posterior and warmup. Returns ------- trace_posterior: MultiTrace or None The slice of the trace corresponding to the posterior. If the posterior trace is empty, None is returned trace_warmup: MultiTrace or None The slice of the trace corresponding to the warmup. If the warmup trace is empty or ``save_warmup=False``, None is returned """ trace_posterior = None trace_warmup = None if self.save_warmup and self.ntune > 0: trace_warmup = self.trace[: self.ntune] if self.ndraws > 0: trace_posterior = self.trace[self.ntune :] return trace_posterior, trace_warmup @requires("trace") def posterior_to_xarray(self): """Convert the posterior to an xarray dataset.""" var_names = get_default_varnames( self.trace.varnames, include_transformed=self.include_transformed ) data = {} data_warmup = {} for var_name in var_names: if self.warmup_trace: data_warmup[var_name] = np.array( self.warmup_trace.get_values(var_name, combine=False, squeeze=False) ) if self.posterior_trace: data[var_name] = np.array( self.posterior_trace.get_values(var_name, combine=False, squeeze=False) ) return ( dict_to_dataset( data, library=pymc, coords=self.coords, dims=self.dims, attrs=self.attrs, ), dict_to_dataset( data_warmup, library=pymc, coords=self.coords, dims=self.dims, attrs=self.attrs, ), ) @requires("trace") def sample_stats_to_xarray(self): """Extract sample_stats from PyMC trace.""" data = {} rename_key = { "model_logp": "lp", "mean_tree_accept": "acceptance_rate", "depth": "tree_depth", "tree_size": "n_steps", } data = {} data_warmup = {} for stat in self.trace.stat_names: name = rename_key.get(stat, stat) if name == "tune": continue if self.warmup_trace: data_warmup[name] = np.array( self.warmup_trace.get_sampler_stats(stat, combine=False, squeeze=False) ) if self.posterior_trace: data[name] = np.array( self.posterior_trace.get_sampler_stats(stat, combine=False, squeeze=False) ) return ( dict_to_dataset( data, library=pymc, dims=None, coords=self.coords, attrs=self.attrs, ), dict_to_dataset( data_warmup, library=pymc, dims=None, coords=self.coords, attrs=self.attrs, ), ) @requires(["posterior_predictive"]) def posterior_predictive_to_xarray(self): """Convert posterior_predictive samples to xarray.""" data = self.posterior_predictive dims = {var_name: self.sample_dims + self.dims.get(var_name, []) for var_name in data} return dict_to_dataset( data, library=pymc, coords=self.coords, dims=dims, default_dims=self.sample_dims ) @requires(["predictions"]) def predictions_to_xarray(self): """Convert predictions (out of sample predictions) to xarray.""" data = self.predictions dims = {var_name: self.sample_dims + self.dims.get(var_name, []) for var_name in data} return dict_to_dataset( data, library=pymc, coords=self.coords, dims=dims, default_dims=self.sample_dims ) def priors_to_xarray(self): """Convert prior samples (and if possible prior predictive too) to xarray.""" if self.prior is None: return {"prior": None, "prior_predictive": None} if self.observations is not None: prior_predictive_vars = list(set(self.observations).intersection(self.prior)) prior_vars = [key for key in self.prior.keys() if key not in prior_predictive_vars] else: prior_vars = list(self.prior.keys()) prior_predictive_vars = None priors_dict = {} for group, var_names in zip( ("prior", "prior_predictive"), (prior_vars, prior_predictive_vars) ): priors_dict[group] = ( None if var_names is None else dict_to_dataset_drop_incompatible_coords( {k: np.expand_dims(self.prior[k], 0) for k in var_names}, library=pymc, coords=self.coords, dims=self.dims, ) ) return priors_dict @requires("observations") @requires("model") def observed_data_to_xarray(self): """Convert observed data to xarray.""" if self.predictions: return None return dict_to_dataset( self.observations, library=pymc, coords=self.coords, dims=self.dims, default_dims=[], ) @requires("model") def constant_data_to_xarray(self): """Convert constant data to xarray.""" constant_data = find_constants(self.model) if not constant_data: return None xarray_dataset = dict_to_dataset( constant_data, library=pymc, coords=self.coords, dims=self.dims, default_dims=[], ) # provisional handling of scalars in constant # data to prevent promotion to rank 1 # in the future this will be handled by arviz scalars = [var_name for var_name, value in constant_data.items() if np.ndim(value) == 0] for s in scalars: s_dim_0_name = f"{s}_dim_0" xarray_dataset = xarray_dataset.squeeze(s_dim_0_name, drop=True) return xarray_dataset def to_inference_data(self): """Convert all available data to an InferenceData object. Note that if groups can not be created (e.g., there is no `trace`, so the `posterior` and `sample_stats` can not be extracted), then the InferenceData will not have those groups. """ id_dict = { "posterior": self.posterior_to_xarray(), "sample_stats": self.sample_stats_to_xarray(), "posterior_predictive": self.posterior_predictive_to_xarray(), "predictions": self.predictions_to_xarray(), **self.priors_to_xarray(), "observed_data": self.observed_data_to_xarray(), } if self.predictions: id_dict["predictions_constant_data"] = self.constant_data_to_xarray() else: id_dict["constant_data"] = self.constant_data_to_xarray() idata = InferenceData(save_warmup=self.save_warmup, **id_dict) if self.log_likelihood: from pymc.stats.log_density import compute_log_likelihood idata = compute_log_likelihood( idata, var_names=None if self.log_likelihood is True else self.log_likelihood, extend_inferencedata=True, model=self.model, sample_dims=self.sample_dims, progressbar=False, ) if self.log_prior: from pymc.stats.log_density import compute_log_prior idata = compute_log_prior( idata, var_names=None if self.log_prior is True else self.log_prior, extend_inferencedata=True, model=self.model, sample_dims=self.sample_dims, progressbar=False, ) return idata [docs] def to_inference_data( trace: Optional["MultiTrace"] = None, *, prior: Mapping[str, Any] | None = None, posterior_predictive: Mapping[str, Any] | None = None, log_likelihood: bool | Iterable[str] = False, log_prior: bool | Iterable[str] = False, coords: CoordSpec | None = None, dims: DimSpec | None = None, sample_dims: list | None = None, model: Optional["Model"] = None, save_warmup: bool | None = None, include_transformed: bool = False, ) -> InferenceData: """Convert pymc data into an InferenceData object. All three of them are optional arguments, but at least one of ``trace``, ``prior`` and ``posterior_predictive`` must be present. For a usage example read the :ref:`Creating InferenceData section on from_pymc <creating_InferenceData>` Parameters ---------- trace : MultiTrace, optional Trace generated from MCMC sampling. Output of :func:`~pymc.sampling.sample`. prior : dict, optional Dictionary with the variable names as keys, and values numpy arrays containing prior and prior predictive samples. posterior_predictive : dict, optional Dictionary with the variable names as keys, and values numpy arrays containing posterior predictive samples. log_likelihood : bool or array_like of str, optional List of variables to calculate `log_likelihood`. Defaults to False. If set to True, computes `log_likelihood` for all observed variables. log_prior : bool or array_like of str, optional List of variables to calculate `log_prior`. Defaults to False. If set to True, computes `log_prior` for all unobserved variables. coords : dict of {str: array-like}, optional Map of coordinate names to coordinate values dims : dict of {str: list of str}, optional Map of variable names to the coordinate names to use to index its dimensions. model : Model, optional Model used to generate ``trace``. It is not necessary to pass ``model`` if in ``with`` context. save_warmup : bool, optional Save warmup iterations InferenceData object. If not defined, use default defined by the rcParams. include_transformed : bool, optional Save the transformed parameters in the InferenceData object. By default, these are not saved. Returns ------- arviz.InferenceData """ if isinstance(trace, InferenceData): return trace return InferenceDataConverter( trace=trace, prior=prior, posterior_predictive=posterior_predictive, log_likelihood=log_likelihood, log_prior=log_prior, coords=coords, dims=dims, sample_dims=sample_dims, model=model, save_warmup=save_warmup, include_transformed=include_transformed, ).to_inference_data() ### Later I could have this return ``None`` if the ``idata_orig`` argument is supplied. But ### perhaps we should have an inplace argument? [docs] def predictions_to_inference_data( predictions, posterior_trace: Optional["MultiTrace"] = None, model: Optional["Model"] = None, coords: CoordSpec | None = None, dims: DimSpec | None = None, sample_dims: list | None = None, idata_orig: InferenceData | None = None, inplace: bool = False, ) -> InferenceData: """Translate out-of-sample predictions into ``InferenceData``. Parameters ---------- predictions: Dict[str, np.ndarray] The predictions are the return value of :func:`~pymc.sample_posterior_predictive`, a dictionary of strings (variable names) to numpy ndarrays (draws). Requires the arrays to follow the convention ``chain, draw, *shape``. posterior_trace: MultiTrace This should be a trace that has been thinned appropriately for ``pymc.sample_posterior_predictive``. Specifically, any variable whose shape is a deterministic function of the shape of any predictor (explanatory, independent, etc.) variables must be *removed* from this trace. model: Model The pymc model. It can be omitted if within a model context. coords: Dict[str, array-like[Any]] Coordinates for the variables. Map from coordinate names to coordinate values. dims: Dict[str, array-like[str]] Map from variable name to ordered set of coordinate names. idata_orig: InferenceData, optional If supplied, then modify this inference data in place, adding ``predictions`` and (if available) ``predictions_constant_data`` groups. If this is not supplied, make a fresh InferenceData inplace: boolean, optional If idata_orig is supplied and inplace is True, merge the predictions into idata_orig, rather than returning a fresh InferenceData object. Returns ------- InferenceData: May be modified ``idata_orig``. """ if inplace and not idata_orig: raise ValueError( "Do not pass True for inplace unless passing an existing InferenceData as idata_orig" ) converter = InferenceDataConverter( trace=posterior_trace, predictions=predictions, model=model, coords=coords, dims=dims, sample_dims=sample_dims, log_likelihood=False, ) if hasattr(idata_orig, "posterior"): assert idata_orig is not None converter.nchains = idata_orig["posterior"].sizes["chain"] converter.ndraws = idata_orig["posterior"].sizes["draw"] else: aelem = next(iter(predictions.values())) converter.nchains, converter.ndraws = aelem.shape[:2] new_idata = converter.to_inference_data() if idata_orig is None: return new_idata elif inplace: concat([idata_orig, new_idata], dim=None, inplace=True) return idata_orig else: # if we are not returning in place, then merge the old groups into the new inference # data and return that. concat([new_idata, idata_orig], dim=None, copy=True, inplace=True) return new_idata def dataset_to_point_list( ds: xarray.Dataset | dict[str, xarray.DataArray], sample_dims: Sequence[str] ) -> tuple[list[dict[str, np.ndarray]], dict[str, Any]]: # All keys of the dataset must be a str var_names = cast(list[str], list(ds.keys())) for vn in var_names: if not isinstance(vn, str): raise ValueError(f"Variable names must be str, but dataset key {vn} is a {type(vn)}.") num_sample_dims = len(sample_dims) stacked_dims = {dim_name: ds[var_names[0]][dim_name] for dim_name in sample_dims} transposed_dict = {vn: da.transpose(*sample_dims, ...) for vn, da in ds.items()} stacked_size = np.prod(transposed_dict[var_names[0]].shape[:num_sample_dims], dtype=int) stacked_dict = { vn: da.values.reshape((stacked_size, *da.shape[num_sample_dims:])) for vn, da in transposed_dict.items() } points = [ {vn: stacked_dict[vn][i, ...] for vn in var_names} for i in range(np.prod([len(coords) for coords in stacked_dims.values()])) ] # use the list of points return cast(list[dict[str, np.ndarray]], points), stacked_dims def apply_function_over_dataset( fn: PointFunc, dataset: Dataset, *, output_var_names: Sequence[str], coords, dims, sample_dims: Sequence[str] = ("chain", "draw"), progressbar: bool = True, progressbar_theme: Theme | None = default_progress_theme, ) -> Dataset: posterior_pts, stacked_dims = dataset_to_point_list(dataset, sample_dims) n_pts = len(posterior_pts) out_dict = _DefaultTrace(n_pts) indices = range(n_pts) with CustomProgress( console=Console(theme=progressbar_theme), disable=not progressbar ) as progress: task = progress.add_task("Computing ...", total=n_pts) for idx in indices: out = fn(posterior_pts[idx]) fn.f.trust_input = True # If we arrive here the dtypes are valid for var_name, val in zip(output_var_names, out, strict=True): out_dict.insert(var_name, val, idx) progress.advance(task) progress.update(task, refresh=True, completed=n_pts) out_trace = out_dict.trace_dict for key, val in out_trace.items(): out_trace[key] = val.reshape( ( *[len(coord) for coord in stacked_dims.values()], *val.shape[1:], ) ) return dict_to_dataset( out_trace, library=pymc, dims=dims, coords=coords, default_dims=list(sample_dims), skip_event_dims=True, )
RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4