DeepXTrace is a lightweight system tool designed to efficiently and precisely locate slow ranks in DeepEP-based environments by enhancing the DeepEP communication library. It is composed of two core components: DeepEP Metrics Probe and DeepXTrace Metrics Analysis.
DeepXTrace supports diagnosis of various slowdown scenarios, including:
DeepEP Metrics Probe: A low-overhead module for measuring critical diagnostic indicators during DeepEP communication. See also: DeepEP Diagnose PR.
DeepXTrace Metrics Analysis: An analysis module that locates slow-rank issues by processing the collected metrics.
python setup.py bdist_wheel

Example use in DeepEP Low-Latency (LL) mode:
from typing import Optional

import torch
import torch.distributed as dist

from deep_ep import Buffer
from deepxtrace import diagnose as ds

# Module-level singletons shared by the helpers below.
_buffer: Optional[Buffer] = None
_diagnose: Optional[ds.Diagnose] = None


def get_buffer(group: dist.ProcessGroup, num_max_dispatch_tokens_per_rank: int,
               hidden: int, num_experts: int) -> Buffer:
    """Return the cached DeepEP low-latency Buffer, (re)creating it if needed.

    A new Buffer is allocated when none exists yet, the process group changed,
    the cached buffer is not in low-latency mode, or its RDMA region is too
    small for the requested workload.
    """
    global _buffer
    num_rdma_bytes = Buffer.get_low_latency_rdma_size_hint(
        num_max_dispatch_tokens_per_rank, hidden, group.size(), num_experts)
    if (_buffer is None or _buffer.group != group
            or not _buffer.low_latency_mode
            or _buffer.num_rdma_bytes < num_rdma_bytes):
        # Experts must be evenly divisible across ranks; one QP per local expert.
        assert num_experts % group.size() == 0
        _buffer = Buffer(group, 0, num_rdma_bytes, low_latency_mode=True,
                         num_qps_per_rank=num_experts // group.size())
    return _buffer


# Initialize the diagnostic instance.
def get_diagnose(group: dist.ProcessGroup, enable_async: bool) -> ds.Diagnose:
    """Return the cached Diagnose instance for *group*, creating it on demand.

    In async mode, a background thread is started that periodically performs
    diagnosis.
    """
    global _diagnose
    if _diagnose is None or _diagnose.group != group:
        _diagnose = ds.Diagnose(group=group, enable_async=enable_async)
        # Start the asynchronous diagnosis thread which will periodically
        # perform diagnosis. NOTE(review): started only on creation here so the
        # thread is not spawned repeatedly — confirm against upstream example.
        if enable_async:
            _diagnose.start_async_diagnose()
    return _diagnose


# An example of synchronous diagnostic mode.
def diagnose_deepep_sync_mode(hidden_states: torch.Tensor,
                              topk_idx: torch.Tensor,
                              num_max_dispatch_tokens_per_rank: int,
                              num_experts: int,
                              group: dist.ProcessGroup):
    """Run one dispatch/combine step with synchronous DeepXTrace diagnosis.

    NOTE(review): this example assumes `_buffer`, `topk_weights`, `handle`,
    `use_logfmt` and `rank` are provided by the surrounding serving code —
    they are not defined in this snippet.
    """
    global _diagnose
    # Get the diagnose object (synchronous mode).
    _diagnose = get_diagnose(group=group, enable_async=False)
    # Get the LL dispatch stats tensor.
    dispatch_wait_recv_cost_stats = _diagnose.get_stats_ll_stats_tensor()[0]
    _buffer.low_latency_dispatch(
        hidden_states, topk_idx, num_max_dispatch_tokens_per_rank, num_experts,
        dispatch_wait_recv_cost_stats=dispatch_wait_recv_cost_stats,
        use_fp8=True)
    # Get the LL combine stats tensor.
    combine_wait_recv_cost_stats = _diagnose.get_stats_ll_stats_tensor()[1]
    _buffer.low_latency_combine(
        hidden_states, topk_idx, topk_weights, handle,
        use_logfmt=use_logfmt,
        combine_wait_recv_cost_stats=combine_wait_recv_cost_stats)
    # Perform synchronous diagnosis for low-latency (LL) DeepEP mode.
    # Set to perform a diagnosis every 100 steps.
    diagnose_res = _diagnose.diagnose_ll_sync(diagnose_step=100)
    # Note: diagnosis results will be gathered to rank0.
    if rank == 0:
        print(diagnose_res)


# An example of asynchronous diagnostic mode.
def diagnose_deepep_async_mode(hidden_states: torch.Tensor,
                               topk_idx: torch.Tensor,
                               num_max_dispatch_tokens_per_rank: int,
                               num_experts: int,
                               group: dist.ProcessGroup):
    """Run one dispatch/combine step with asynchronous DeepXTrace diagnosis.

    Note: in asynchronous mode, the diagnostic results will be periodically
    output by the background diagnostic thread of rank0.

    NOTE(review): as above, `_buffer`, `topk_weights`, `handle` and
    `use_logfmt` are assumed to come from the surrounding serving code.
    """
    global _diagnose
    # Get the diagnose object (asynchronous mode).
    _diagnose = get_diagnose(group=group, enable_async=True)
    # Get the LL dispatch stats tensor.
    dispatch_wait_recv_cost_stats = _diagnose.get_stats_ll_stats_tensor()[0]
    _buffer.low_latency_dispatch(
        hidden_states, topk_idx, num_max_dispatch_tokens_per_rank, num_experts,
        dispatch_wait_recv_cost_stats=dispatch_wait_recv_cost_stats,
        use_fp8=True)
    # Get the LL combine stats tensor.
    combine_wait_recv_cost_stats = _diagnose.get_stats_ll_stats_tensor()[1]
    _buffer.low_latency_combine(
        hidden_states, topk_idx, topk_weights, handle,
        use_logfmt=use_logfmt,
        combine_wait_recv_cost_stats=combine_wait_recv_cost_stats)
RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4