#!/usr/bin/env python3
"""
BDShot Timing Jitter Analyzer

Analyzes BDShot (Bidirectional DShot) signal timing from Saleae Logic Analyzer
CSV exports. Measures scheduling consistency and jitter in PX4 flight controller
DShot output timing.

Usage:
    python bdshot_analyzer.py capture.csv
    python bdshot_analyzer.py capture.csv --dshot-rate 300 --loop-rate 800
    python bdshot_analyzer.py capture.csv --channel-a 0 --channel-b 1

Arguments:
    input_file      Path to Saleae CSV export
    --dshot-rate    DShot variant: 150/300/600/1200 (default: 300)
    --loop-rate     Expected loop rate in Hz (default: 800)
    --channel-a     CSV column index for first channel (default: 0)
    --channel-b     CSV column index for second channel (default: 1)
    --no-plots      Disable histogram display
    --verbose       Enable verbose debugging output

Capture Setup:
    1. Connect Saleae to two BDShot signals from different timers
       (e.g., Motor 1 from Timer1, Motor 5 from Timer2)
    2. Sample rate: 24+ MS/s for DShot300
    3. Export as CSV: File -> Export Raw Data -> CSV

Interpreting Results:
    - Frame Intervals: Should cluster tightly around expected loop period
    - Inter-Channel Gap: Measures sequential DMA scheduling overhead
    - Jitter: Small std dev is typical for well-behaved systems

Dependencies: numpy, matplotlib
"""
import argparse
import csv
import sys
from dataclasses import dataclass
from pathlib import Path

import numpy as np
# Try to import matplotlib, but allow running without it:
# plotting is optional and the textual report works either way
# (callers check HAS_MATPLOTLIB before using plt).
try:
    import matplotlib.pyplot as plt
    HAS_MATPLOTLIB = True
except ImportError:
    HAS_MATPLOTLIB = False
# DShot bit periods in microseconds, keyed by DShot variant
# (bit period = 1000 / rate-in-kbit/s, e.g. DShot300 -> 3.33 us/bit).
DSHOT_BIT_PERIODS = {
    150: 6.67,
    300: 3.33,
    600: 1.67,
    1200: 0.83,
}
@dataclass
class BDShotTransaction:
    """A single BDShot transaction (host command followed by ESC response).

    Attributes:
        start: Timestamp of the first falling edge, in seconds.
        end: Timestamp of the final rising edge before the idle period, in seconds.
    """
    start: float
    end: float

    @property
    def duration_us(self) -> float:
        """Length of the transaction converted to microseconds."""
        elapsed_s = self.end - self.start
        return elapsed_s * 1_000_000
@dataclass
class ChannelStats:
    """Frame-interval statistics for a single BDShot channel."""
    channel_name: str   # Human-readable label, e.g. "Channel A"
    count: int          # Number of measured intervals
    min_us: float       # Shortest observed interval (us)
    max_us: float       # Longest observed interval (us)
    mean_us: float      # Average observed interval (us)
    std_us: float       # Standard deviation of intervals (us)
    expected_us: float  # Nominal interval from the configured loop rate (us)

    def _deviation_from_nominal(self, value_us: float) -> float:
        # Jitter is reported as a signed offset from the expected interval.
        return value_us - self.expected_us

    @property
    def min_jitter_us(self) -> float:
        """Signed deviation of the shortest interval from nominal (us)."""
        return self._deviation_from_nominal(self.min_us)

    @property
    def max_jitter_us(self) -> float:
        """Signed deviation of the longest interval from nominal (us)."""
        return self._deviation_from_nominal(self.max_us)

    @property
    def mean_jitter_us(self) -> float:
        """Signed deviation of the mean interval from nominal (us)."""
        return self._deviation_from_nominal(self.mean_us)
@dataclass
class GapStats:
    """Statistics for inter-channel gaps (CH_A transaction end -> CH_B transaction start).

    All values except ``count`` are in microseconds.
    """
    count: int
    min_us: float
    max_us: float
    mean_us: float
    std_us: float
def parse_csv(filepath: Path, channel_a_idx: int, channel_b_idx: int, verbose: bool = False):
    """
    Parse a Saleae CSV export and extract digital edges for two channels.

    The CSV is expected to have the time column first, followed by one column
    per channel. An edge is recorded whenever a channel changes state; the
    first row additionally yields an edge if the channel starts LOW, so a
    transaction already in progress at capture start is not lost.

    Args:
        filepath: Path to the Saleae CSV export.
        channel_a_idx: Zero-based channel index for the first channel.
        channel_b_idx: Zero-based channel index for the second channel.
        verbose: Print header/column diagnostics and malformed-row warnings.

    Returns:
        Tuple of (edges_a, edges_b) where each is a list of (timestamp, new_state) tuples.

    Raises:
        ValueError: If the file has no header row, or a channel index is out
            of range for the header.
    """
    edges_a, edges_b = [], []
    prev_a, prev_b = None, None
    ch_a_col, ch_b_col = channel_a_idx + 1, channel_b_idx + 1  # +1 for time column

    with open(filepath, 'r', newline='') as f:
        reader = csv.reader(f)
        # Fix: next(reader) on an empty file leaked a raw StopIteration;
        # report it as a clear ValueError instead.
        header = next(reader, None)
        if header is None:
            raise ValueError("CSV file is empty (no header row)")

        # Fix: validate the column indices *before* the verbose diagnostics
        # below index into header[...], so an out-of-range channel produces
        # this clear ValueError rather than an IndexError.
        if ch_a_col >= len(header) or ch_b_col >= len(header):
            raise ValueError(
                f"Channel indices out of range. CSV has {len(header)-1} channels. "
                f"Requested channel_a={channel_a_idx}, channel_b={channel_b_idx}"
            )

        if verbose:
            print(f"CSV Header: {header}")
            print(f"Using columns: time=0, ch_a={ch_a_col} ({header[ch_a_col]}), "
                  f"ch_b={ch_b_col} ({header[ch_b_col]})")

        for row_num, row in enumerate(reader, start=2):
            try:
                timestamp = float(row[0])
                state_a, state_b = int(row[ch_a_col]), int(row[ch_b_col])

                # Record edges (state changes) or initial LOW state
                if (prev_a is not None and state_a != prev_a) or (prev_a is None and state_a == 0):
                    edges_a.append((timestamp, state_a))
                if (prev_b is not None and state_b != prev_b) or (prev_b is None and state_b == 0):
                    edges_b.append((timestamp, state_b))

                prev_a, prev_b = state_a, state_b

            except (ValueError, IndexError) as e:
                # Malformed rows are skipped (best-effort) rather than aborting.
                if verbose:
                    print(f"Warning: Skipping malformed row {row_num}: {e}")

    if verbose:
        print(f"Parsed {len(edges_a)} edges for channel A, {len(edges_b)} edges for channel B")

    return edges_a, edges_b
def detect_transactions(edges, idle_threshold_us: float, verbose: bool = False):
    """
    Group raw edges into BDShot transactions.

    A transaction opens on a falling edge that follows an idle (HIGH) period
    longer than ``idle_threshold_us``, and closes on the rising edge after
    which the line stays HIGH for longer than that same threshold.

    Args:
        edges: Time-ordered list of (timestamp_seconds, new_state) tuples.
        idle_threshold_us: Minimum HIGH time separating transactions.
        verbose: Print a summary of detected transactions.

    Returns:
        List of BDShotTransaction objects.
    """
    if not edges:
        return []

    idle_threshold_s = idle_threshold_us / 1_000_000
    transactions = []
    open_start = None   # Start timestamp of the in-progress transaction, if any
    prev_rising = None  # Timestamp of the most recent rising edge

    for idx, (ts, level) in enumerate(edges):
        if level == 0:
            # Falling edge: opens a new transaction only after a long-enough
            # idle period (or at the very first activity in the capture).
            was_idle = prev_rising is None or ts - prev_rising > idle_threshold_s
            if open_start is None and was_idle:
                open_start = ts
            continue

        # Rising edge from here on.
        prev_rising = ts
        if open_start is None:
            continue

        # Close the transaction when the line stays HIGH past the threshold:
        # gap to the next edge, or "forever" if this is the final edge.
        if idx + 1 < len(edges):
            quiet_time = edges[idx + 1][0] - ts
        else:
            quiet_time = float('inf')
        if quiet_time > idle_threshold_s:
            transactions.append(BDShotTransaction(start=open_start, end=ts))
            open_start = None

    if verbose:
        print(f"Detected {len(transactions)} transactions")
        if transactions:
            durations = [tx.duration_us for tx in transactions]
            print(f" Duration range: {min(durations):.1f} - {max(durations):.1f} us")

    return transactions
def compute_channel_stats(transactions, channel_name: str, expected_interval_us: float):
    """Compute frame-interval statistics for one channel.

    Intervals are measured start-to-start between consecutive transactions.

    Args:
        transactions: Time-ordered list of BDShotTransaction objects.
        channel_name: Label stored in the resulting ChannelStats.
        expected_interval_us: Nominal interval implied by the loop rate (us).

    Returns:
        Tuple of (ChannelStats, numpy array of intervals in microseconds).

    Raises:
        ValueError: If fewer than two transactions are available.
    """
    if len(transactions) < 2:
        raise ValueError(f"Need at least 2 transactions to compute intervals, got {len(transactions)}")

    start_times = [tx.start for tx in transactions]
    intervals_us = np.diff(start_times) * 1_000_000

    summary = ChannelStats(
        channel_name=channel_name,
        count=len(intervals_us),
        min_us=float(intervals_us.min()),
        max_us=float(intervals_us.max()),
        mean_us=float(intervals_us.mean()),
        std_us=float(intervals_us.std()),
        expected_us=expected_interval_us,
    )
    return summary, intervals_us
def compute_inter_channel_gaps(transactions_a, transactions_b, expected_interval_us: float,
                               verbose: bool = False):
    """Measure the gap from each CH_A transaction end to the next CH_B start.

    Args:
        transactions_a: Time-ordered CH_A transactions.
        transactions_b: Time-ordered CH_B transactions.
        expected_interval_us: Nominal frame interval (us); gaps larger than
            half of this are treated as cross-frame mismatches and dropped.
        verbose: Print how many gaps were matched.

    Returns:
        Tuple of (GapStats, numpy array of gaps in microseconds).

    Raises:
        ValueError: If no plausible gap could be matched.
    """
    gaps = []
    max_reasonable_gap_us = expected_interval_us * 0.5
    cursor = 0  # Monotonic index into transactions_b (both lists are time-ordered)

    for tx_a in transactions_a:
        # Advance to the first CH_B transaction starting after this CH_A ends.
        while cursor < len(transactions_b) and transactions_b[cursor].start <= tx_a.end:
            cursor += 1
        if cursor >= len(transactions_b):
            break

        gap_us = 1_000_000 * (transactions_b[cursor].start - tx_a.end)
        # Keep only plausible same-frame gaps.
        if 0 < gap_us < max_reasonable_gap_us:
            gaps.append(gap_us)

    if verbose:
        print(f"Inter-channel gap matching: {len(gaps)} matched")

    if not gaps:
        raise ValueError("No valid inter-channel gaps found")

    gaps_arr = np.array(gaps)
    stats = GapStats(
        count=len(gaps_arr),
        min_us=float(gaps_arr.min()),
        max_us=float(gaps_arr.max()),
        mean_us=float(gaps_arr.mean()),
        std_us=float(gaps_arr.std()),
    )
    return stats, gaps_arr
def _plot_histogram(ax, data, title: str, xlabel: str, expected_value=None, stats_text=None):
    """Draw one histogram with optional expected/mean markers and a stats box.

    Args:
        ax: Matplotlib axes to draw on.
        data: Sample values (microseconds).
        title: Panel title.
        xlabel: X-axis label.
        expected_value: If given, draw expected (red dashed) and mean (green) lines.
        stats_text: If given, render as a monospace annotation box.
    """
    # Scale bin count with sample size, clamped to [20, 100]; degenerate
    # (zero-range) data gets a fixed 20 bins.
    if np.ptp(data) > 0:
        bin_count = min(100, max(20, len(data) // 100))
    else:
        bin_count = 20
    ax.hist(data, bins=bin_count, edgecolor='black', alpha=0.7)
    ax.set_title(title, fontsize=12, fontweight='bold')
    ax.set_xlabel(xlabel, fontsize=10)
    ax.set_ylabel('Count', fontsize=10)

    if expected_value is not None:
        mean_value = np.mean(data)
        ax.axvline(expected_value, color='red', linestyle='--', linewidth=2,
                   label=f'Expected: {expected_value:.2f} us')
        ax.axvline(mean_value, color='green', linestyle='-', linewidth=2,
                   label=f'Mean: {mean_value:.2f} us')

    if stats_text:
        ax.text(0.98, 0.97, stats_text, transform=ax.transAxes, fontsize=9,
                verticalalignment='top', horizontalalignment='right',
                bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.8), family='monospace')

    ax.legend(loc='upper left', fontsize=8)
    ax.grid(True, alpha=0.3)
def generate_combined_histogram(intervals_a, intervals_b, gaps, stats_a, stats_b,
                                gap_stats, expected_interval_us: float, dshot_rate: int):
    """Build one figure: per-channel interval histograms plus the gap histogram.

    Returns the matplotlib Figure, or None when matplotlib is unavailable.
    """
    if not HAS_MATPLOTLIB:
        return None

    fig = plt.figure(figsize=(14, 10))
    grid = fig.add_gridspec(2, 2, height_ratios=[1, 1], hspace=0.3, wspace=0.25)
    # Top row: one interval histogram per channel; bottom panel spans both columns.
    ax_a = fig.add_subplot(grid[0, 0])
    ax_b = fig.add_subplot(grid[0, 1])
    ax_gap = fig.add_subplot(grid[1, :])

    def stats_text(s):
        return format_stats_text(s.min_us, s.max_us, s.mean_us, s.std_us, s.count)

    _plot_histogram(ax_a, intervals_a, "Channel A Frame Intervals", "Interval (us)",
                    expected_value=expected_interval_us, stats_text=stats_text(stats_a))
    _plot_histogram(ax_b, intervals_b, "Channel B Frame Intervals", "Interval (us)",
                    expected_value=expected_interval_us, stats_text=stats_text(stats_b))

    if len(gaps) > 0:
        _plot_histogram(ax_gap, gaps, "Inter-Channel Gap (CH_A End -> CH_B Start)",
                        "Gap (us)", stats_text=stats_text(gap_stats))
    else:
        # Keep the panel (with its title) visible even when no gaps matched.
        ax_gap.text(0.5, 0.5, "No inter-channel gap data available",
                    ha='center', va='center', transform=ax_gap.transAxes)
        ax_gap.set_title("Inter-Channel Gap (CH_A End -> CH_B Start)")

    fig.suptitle(f"BDShot Timing Analysis (DShot{dshot_rate})", fontsize=14, fontweight='bold', y=0.98)
    return fig
def format_stats_text(min_val: float, max_val: float, mean_val: float,
                      std_val: float, count: int) -> str:
    """Format statistics as a text block for histogram annotation."""
    lines = (
        f"Count: {count}",
        f"Min: {min_val:.2f} us",
        f"Max: {max_val:.2f} us",
        f"Mean: {mean_val:.2f} us",
        f"Std: {std_val:.2f} us",
    )
    return "\n".join(lines)
def print_report(input_file: Path, dshot_rate: int, loop_rate: float,
                 capture_duration: float, tx_count_a: int, tx_count_b: int,
                 stats_a: ChannelStats, stats_b: ChannelStats, gap_stats: GapStats):
    """Print the analysis report to console.

    Args:
        input_file: Path of the analyzed CSV, echoed in the report header.
        dshot_rate: DShot variant (150/300/600/1200); unknown values fall back
            to the DShot300 bit period for display only.
        loop_rate: Expected loop rate in Hz.
        capture_duration: Total capture span in seconds.
        tx_count_a: Transactions detected on channel A.
        tx_count_b: Transactions detected on channel B.
        stats_a: Per-channel frame-interval statistics for channel A.
        stats_b: Per-channel frame-interval statistics for channel B.
        gap_stats: Inter-channel gap statistics.
    """
    bit_period = DSHOT_BIT_PERIODS.get(dshot_rate, 3.33)
    expected_interval = 1_000_000 / loop_rate

    print(f"""
BDShot Timing Analysis
{"=" * 60}
Input: {input_file}
DShot Rate: {dshot_rate} (bit period: {bit_period:.2f} us)
Expected Loop Rate: {loop_rate:.0f} Hz ({expected_interval:.2f} us interval)
Capture Duration: {capture_duration:.2f} s
Transactions Detected: CH_A={tx_count_a}, CH_B={tx_count_b}
""")

    def print_channel_stats(name: str, s: ChannelStats):
        # Each line shows the raw value plus its signed jitter vs nominal.
        print(f"{name} Frame Intervals")
        print("-" * 40)
        print(f" Count: {s.count}")
        print(f" Min: {s.min_us:.2f} us ({s.min_jitter_us:+.2f} us from nominal)")
        print(f" Max: {s.max_us:.2f} us ({s.max_jitter_us:+.2f} us from nominal)")
        print(f" Mean: {s.mean_us:.2f} us ({s.mean_jitter_us:+.2f} us from nominal)")
        print(f" Std Dev: {s.std_us:.2f} us\n")

    print_channel_stats("Channel A", stats_a)
    print_channel_stats("Channel B", stats_b)

    print(f"""Inter-Channel Gap (CH_A End -> CH_B Start)
{"-" * 40}
Count: {gap_stats.count}
Min: {gap_stats.min_us:.2f} us
Max: {gap_stats.max_us:.2f} us
Mean: {gap_stats.mean_us:.2f} us
Std Dev: {gap_stats.std_us:.2f} us
""")
def fatal(msg: str):
    """Write an error message to stderr and terminate with exit code 1."""
    sys.stderr.write(f"Error: {msg}\n")
    sys.exit(1)
def main():
    """Command-line entry point: parse args, analyze the capture, print the report."""
    parser = argparse.ArgumentParser(
        description="Analyze BDShot timing from Saleae Logic Analyzer CSV exports",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="Examples:\n %(prog)s capture.csv\n %(prog)s capture.csv --dshot-rate 300 --loop-rate 800"
    )
    parser.add_argument('input_file', type=Path, help='Path to Saleae CSV export')
    parser.add_argument('--dshot-rate', type=int, default=300, choices=[150, 300, 600, 1200],
                        help='DShot variant (default: 300)')
    parser.add_argument('--loop-rate', type=float, default=800, help='Expected loop rate in Hz (default: 800)')
    parser.add_argument('--channel-a', type=int, default=0, help='CSV column index for first channel (default: 0)')
    parser.add_argument('--channel-b', type=int, default=1, help='CSV column index for second channel (default: 1)')
    parser.add_argument('--no-plots', action='store_true', help='Disable histogram display')
    parser.add_argument('--verbose', '-v', action='store_true', help='Enable verbose output for debugging')
    args = parser.parse_args()

    if not args.input_file.exists():
        fatal(f"Input file not found: {args.input_file}")

    # Calculate timing parameters
    bit_period_us = DSHOT_BIT_PERIODS.get(args.dshot_rate, 3.33)
    idle_threshold_us = max(50.0, bit_period_us * 15)  # Must exceed turnaround gap (~25-30us)
    expected_interval_us = 1_000_000 / args.loop_rate

    if args.verbose:
        print(f"Bit period: {bit_period_us:.2f} us\nIdle threshold: {idle_threshold_us:.2f} us\n"
              f"Expected interval: {expected_interval_us:.2f} us\n")

    # Parse CSV
    print(f"Parsing {args.input_file}...")
    try:
        edges_a, edges_b = parse_csv(args.input_file, args.channel_a, args.channel_b, args.verbose)
    except Exception as e:
        fatal(f"parsing CSV: {e}")

    if not edges_a or not edges_b:
        fatal("No edges found in one or both channels")

    # Detect transactions
    print("Detecting transactions...")
    transactions_a = detect_transactions(edges_a, idle_threshold_us, args.verbose)
    transactions_b = detect_transactions(edges_b, idle_threshold_us, args.verbose)

    # At least two transactions per channel are needed to measure intervals.
    for name, txns in [("A", transactions_a), ("B", transactions_b)]:
        if len(txns) < 2:
            fatal(f"Insufficient transactions in channel {name} ({len(txns)})")

    # Capture duration spans from the earliest start to the latest end seen.
    all_txns = transactions_a + transactions_b
    capture_duration = max(tx.end for tx in all_txns) - min(tx.start for tx in all_txns)

    # Compute statistics
    print("Computing statistics...")
    stats_a, intervals_a = compute_channel_stats(transactions_a, "Channel A", expected_interval_us)
    stats_b, intervals_b = compute_channel_stats(transactions_b, "Channel B", expected_interval_us)

    try:
        gap_stats, gaps = compute_inter_channel_gaps(transactions_a, transactions_b,
                                                     expected_interval_us, args.verbose)
    except ValueError as e:
        # Gap matching is best-effort: report the failure but still print
        # the per-channel statistics with an empty gap section.
        print(f"Warning: {e}", file=sys.stderr)
        gap_stats = GapStats(count=0, min_us=0, max_us=0, mean_us=0, std_us=0)
        gaps = np.array([])

    print_report(args.input_file, args.dshot_rate, args.loop_rate, capture_duration,
                 len(transactions_a), len(transactions_b), stats_a, stats_b, gap_stats)

    if not args.no_plots:
        if HAS_MATPLOTLIB:
            generate_combined_histogram(intervals_a, intervals_b, gaps, stats_a, stats_b,
                                        gap_stats, expected_interval_us, args.dshot_rate)
            plt.show()
        else:
            # Fix: plots used to be skipped silently when matplotlib was
            # missing; tell the user why no window appears.
            print("Note: matplotlib is not installed; skipping plots", file=sys.stderr)
# Script entry point: run the analyzer only when executed directly.
if __name__ == '__main__':
    main()