Pytorch通信算子组合测试
Pytorch通信算子组合测试
- 一.背景
- 二.相关链接
- 三.遇到的问题
- 四.操作步骤
- 1.登录服务器
- 2.查看拓扑
- 3.准备测试用例
- A.准备目录
- B.用例代码
- 4.创建docker容器
- 5.查看当前pytorch版本
- 6.运行测试程序
一.背景
- 测试pytorch通信算子不同配置下的功能及性能
- 测试不同的group组合
- 测试不同的tensor:数据类型、shape[小到大,是否对齐]、不同的dim大小
- 不同的reduce op:SUM,PRODUCT,MAX,MIN,PREMUL_SUM,AVG
- async_op: True False
- 单机多卡、多机多卡(走IB、不走IB)、是否启动共享内存
- 覆盖的算子
- barrier
- all_gather
- all_to_all
- scatter
- gather
- broadcast
- send\recv
- reduce
- all_reduce
- reduce_scatter
- all_to_all_single
二.相关链接
- NCCL环境变量
- pytorch通信算子
- Writing Distributed Applications with PyTorch
三.遇到的问题
- /dev/shm空间不足: 启动docker容器时加入–shm-size=128g
四.操作步骤
1.登录服务器
2.查看拓扑
nvidia-smi topo -m
输出
GPU0 GPU1 GPU2 GPU3 GPU4 GPU5 GPU6 GPU7 CPU Affinity NUMA Affinity
GPU0 X NODE NODE NODE SYS SYS SYS SYS 0-31,64-95 0
GPU1 NODE X NODE NODE SYS SYS SYS SYS 0-31,64-95 0
GPU2 NODE NODE X NODE SYS SYS SYS SYS 0-31,64-95 0
GPU3 NODE NODE NODE X SYS SYS SYS SYS 0-31,64-95 0
GPU4 SYS SYS SYS SYS X NODE NODE NODE 32-63,96-127 1
GPU5 SYS SYS SYS SYS NODE X NODE NODE 32-63,96-127 1
GPU6 SYS SYS SYS SYS NODE NODE X NODE 32-63,96-127 1
GPU7 SYS SYS SYS SYS NODE NODE NODE X 32-63,96-127 1Legend:X = SelfSYS = Connection traversing PCIe as well as the SMP interconnect between NUMA nodes (e.g., QPI/UPI)NODE = Connection traversing PCIe as well as the interconnect between PCIe Host Bridges within a NUMA nodePHB = Connection traversing PCIe as well as a PCIe Host Bridge (typically the CPU)PXB = Connection traversing multiple PCIe bridges (without traversing the PCIe Host Bridge)PIX = Connection traversing at most a single PCIe bridgeNV# = Connection traversing a bonded set of # NVLinks
- NODE:表示连接在同一个NUMA节点内,支持直接的P2P访问,这通常是最理想的情况。
- PHB:此类连接表示穿越了PCIe Host Bridge,通常表示在同一个CPU上,有时候也支持P2P。
- PIX:表示穿越最多一个PCIe桥,也通常支持P2P,但可能不是最佳的性能。
在输出中,这三种连接方式(特别是NODE 和 PHB)都是表明您的GPU之间可以进行P2P通信的良好指示。
NODE
或者PHB
的连接表示,基本可以判断支持P2P。而SYS
和PXB
通常不支持有效的P2P访问,因为它们之间的连接比较复杂,涉及多个桥接或连接。
3.准备测试用例
A.准备目录
mkdir nccl_test
cd nccl_test
vim nccl_benchmark.py
B.用例代码
import warnings
warnings.filterwarnings("ignore", category=UserWarning)
import os
import torch
import argparse
import torch.distributed as dist
from torch.distributed import ReduceOp
import time
import numpy as np
from sklearn.metrics import mean_squared_error
from itertools import combinations# 全局配置
backend = "nccl"
dev_type = "cuda"# 定义数据类型映射表
dtype_map = {np.float32: torch.float32,np.float64: torch.float64,np.float16: torch.float16,np.int32: torch.int32,np.int64: torch.int64,np.int16: torch.int16,np.int8: torch.int8,np.uint8: torch.uint8
}def get_torch_dtype(np_dtype):torch_dtype = dtype_map.get(np_dtype)if torch_dtype is None:raise ValueError(f"不支持的 dtype: {np_dtype}")return torch_dtypedef generate_partitions(elements):"""生成元素的所有可能划分,满足每个划分中的组至少有两个元素"""if len(elements) < 2:return []result = []first = elements[0]for i in range(2, len(elements) + 1):for subset in combinations(elements, i):if first not in subset:continuesubset = set(subset)remaining = set(elements) - subsetif len(remaining) == 0:result.append([subset])else:for rest in generate_partitions(list(remaining)):partition = [subset] + restresult.append(partition)return resultdef generate_rank_lists(world_size):"""生成所有可能的 rank 列表,满足每个列表中的组至少有两个元素"""elements = list(range(world_size))partitions = generate_partitions(elements)all_rank_lists = []for partition in partitions:if all(len(group) >= 2 for group in partition):rank_lists = [None] * world_sizefor group in partition:for rank in group:rank_lists[rank] = sorted(list(group))all_rank_lists.append(rank_lists)return all_rank_listsdef get_group(rank_list, world_size):"""根据 rank_list 获取通信组"""group_size = len(rank_list)if world_size == group_size:group = dist.group.WORLDelse:group = dist.new_group(rank_list, use_local_synchronization=True)return groupdef validate_outputs(output_tensor, expected_output, input_dtype):"""验证输出张量是否与期望输出匹配"""output_tensor_cpu = output_tensor.cpu().type(input_dtype).reshape(-1)expected_output = expected_output.cpu().type(input_dtype).reshape(-1)has_inf = torch.isinf(output_tensor_cpu).any()has_nan = torch.isnan(output_tensor_cpu).any()invalid = has_inf or has_nanif invalid:mse = 255else:mse = mean_squared_error(output_tensor_cpu, expected_output)return msedef generate_random_tensor(seed, shape, dtype, device):"""生成指定形状和数据类型的随机张量"""np.random.seed(seed)random_tensor = np.random.uniform(-3, 3, shape).astype(dtype)tensor = torch.from_numpy(random_tensor).to(device)return tensordef compute_expected_reduce_output(cpu_input_tensor, op, group_size):"""计算期望的 reduce 操作输出"""if op == dist.ReduceOp.SUM or op == dist.ReduceOp.AVG:expected_output = cpu_input_tensor * group_sizeif op == dist.ReduceOp.AVG:expected_output = expected_output / group_sizeelif op == dist.ReduceOp.PRODUCT:expected_output = cpu_input_tensor ** group_sizeelif op == dist.ReduceOp.MAX or op == dist.ReduceOp.MIN:expected_output = cpu_input_tensorelif op == dist.ReduceOp.PREMUL_SUM:expected_output = cpu_input_tensor * group_sizeelse:raise ValueError(f"不支持的 ReduceOp 操作:{op}")return expected_outputdef test_reduce_scatter(dtype, shape, device, rank, all_rank_list, world_size, iters=5,ops=[dist.ReduceOp.SUM, dist.ReduceOp.PRODUCT, dist.ReduceOp.MAX,dist.ReduceOp.MIN, dist.ReduceOp.PREMUL_SUM, dist.ReduceOp.AVG]):"""测试 reduce_scatter 操作"""rank_list = all_rank_list[rank]group_size = len(rank_list)group = get_group(rank_list, world_size)group_rank = dist.get_rank(group)assert shape[0] % group_size == 0, "第一个维度大小必须能被 group_size 整除"for op in ops:if op == dist.ReduceOp.PREMUL_SUM:continue # 暂不支持mse_arr, duration_arr = [], []for i in range(iters):input_tensor = generate_random_tensor(i, shape, dtype, device)cpu_input_tensor = input_tensor.cpu().float()# 将输入张量拆分为列表,长度为 group_sizeinput_list = list(input_tensor.chunk(group_size, dim=0))# 准备输出张量output_shape = [shape[0] // group_size] + list(shape[1:])output_tensor = torch.empty(output_shape, dtype=get_torch_dtype(dtype), device=device)# 记录通信开始时间start_time = time.time()if i % 2 == 0:dist.reduce_scatter(output_tensor, input_list, op=op, group=group, async_op=False)else:req = dist.reduce_scatter(output_tensor, input_list, op=op, group=group, async_op=True)req.wait()total_time = time.time() - start_timeduration_arr.append(total_time)# 计算期望输出cpu_input_list = list(cpu_input_tensor.chunk(group_size, dim=0))if op == dist.ReduceOp.SUM or op == dist.ReduceOp.AVG:expected_chunks = [chunk * group_size for chunk in cpu_input_list]if op == dist.ReduceOp.AVG:expected_chunks = [chunk / group_size for chunk in expected_chunks]elif op == dist.ReduceOp.PRODUCT:expected_chunks = [chunk ** group_size for chunk in cpu_input_list]elif op == dist.ReduceOp.MAX or op == dist.ReduceOp.MIN:expected_chunks = cpu_input_listelif op == dist.ReduceOp.PREMUL_SUM:expected_chunks = [chunk * group_size for chunk in cpu_input_list]else:raise ValueError(f"不支持的 ReduceOp 操作:{op}")expected_output = expected_chunks[group_rank]# 验证输出mse = validate_outputs(output_tensor, expected_output, input_tensor.dtype)mse_arr.append(mse)max_mse = np.max(mse_arr)max_duration = np.max(duration_arr)do_print = (max_mse > 1e-3) or (group_rank == 0)if do_print:print(f"mse:{max_mse:6.2e} duration(ms):{max_duration * 1000:6.2f} "f"test_reduce_scatter op:{op} dtype:{dtype.__name__} shape:{shape} Rank {rank}")def test_all_reduce(dtype, shape, device, rank, all_rank_list, world_size, iters=5,ops=[dist.ReduceOp.SUM, dist.ReduceOp.PRODUCT, dist.ReduceOp.MAX,dist.ReduceOp.MIN, dist.ReduceOp.PREMUL_SUM, dist.ReduceOp.AVG]):"""测试 all_reduce 操作"""rank_list = all_rank_list[rank]group_size = len(rank_list)group = get_group(rank_list, world_size)group_rank = dist.get_rank(group)for op in ops:if op == dist.ReduceOp.PREMUL_SUM:continue # 暂不支持mse_arr, duration_arr = [], []for i in range(iters):input_tensor = generate_random_tensor(i, shape, dtype, device)cpu_input_tensor = input_tensor.cpu().float()# 记录通信开始时间start_time = time.time()if i % 2 == 0:dist.all_reduce(input_tensor, op=op, group=group, async_op=False)else:req = dist.all_reduce(input_tensor, op=op, group=group, async_op=True)req.wait()total_time = time.time() - start_timeduration_arr.append(total_time)# 计算期望输出expected_output = compute_expected_reduce_output(cpu_input_tensor, op, group_size)# 验证输出mse = validate_outputs(input_tensor, expected_output, input_tensor.dtype)mse_arr.append(mse)max_mse = np.max(mse_arr)max_duration = np.max(duration_arr)do_print = (max_mse > 1e-3) or (group_rank == 0)if do_print:print(f"mse:{max_mse:6.2e} duration(ms):{max_duration * 1000:6.2f} "f"test_all_reduce op:{op} dtype:{dtype.__name__} shape:{shape} Rank {rank}")def test_reduce(dtype, shape, device, rank, all_rank_list, world_size, iters=1,ops=[dist.ReduceOp.SUM, dist.ReduceOp.PRODUCT, dist.ReduceOp.MAX,dist.ReduceOp.MIN, dist.ReduceOp.PREMUL_SUM, dist.ReduceOp.AVG]):"""测试 reduce 操作"""rank_list = all_rank_list[rank]group_size = len(rank_list)group = get_group(rank_list, world_size)group_rank = dist.get_rank(group)for op in ops:if op == dist.ReduceOp.PREMUL_SUM:continue # 暂不支持mse_arr, duration_arr = [], []for i in range(iters):input_tensor = generate_random_tensor(i, shape, dtype, device)cpu_input_tensor = input_tensor.cpu().float()# 记录通信开始时间start_time = time.time()if i % 2 == 0:dist.reduce(input_tensor, dst=rank_list[0], op=op, group=group, async_op=False)else:req = dist.reduce(input_tensor, dst=rank_list[0], op=op, group=group, async_op=True)req.wait()total_time = time.time() - start_timeduration_arr.append(total_time)# 仅在目标进程上计算期望输出if group_rank == 0:expected_output = compute_expected_reduce_output(cpu_input_tensor, op, group_size)mse = validate_outputs(input_tensor, expected_output, input_tensor.dtype)else:mse = 0mse_arr.append(mse)max_mse = np.max(mse_arr)max_duration = np.max(duration_arr)do_print = (max_mse > 1e-3) or (group_rank == 0)if do_print:print(f"mse:{max_mse:6.2e} duration(ms):{max_duration * 1000:6.2f} "f"test_reduce op:{op} dtype:{dtype.__name__} shape:{shape} Rank {rank}")def test_all_gather(dtype, shape, device, rank, all_rank_list, world_size, iters=5):"""测试 all_gather 操作"""rank_list = all_rank_list[rank]group_size = len(rank_list)group = get_group(rank_list, world_size)group_rank = dist.get_rank(group)mse_arr, duration_arr = [], []for i in range(iters):input_tensor = generate_random_tensor(i + rank * 1000, shape, dtype, device)# 准备输出张量列表output_tensor_list = [torch.empty_like(input_tensor) for _ in range(group_size)]# 记录通信开始时间start_time = time.time()if i % 2 == 0:dist.all_gather(output_tensor_list, input_tensor, group=group, async_op=False)else:req = dist.all_gather(output_tensor_list, input_tensor, group=group, async_op=True)req.wait()total_time = time.time() - start_timeduration_arr.append(total_time)# 计算期望输出expected_tensors = []for p in rank_list:expected_tensor = generate_random_tensor(i + p * 1000, shape, dtype, device).cpu().float()expected_tensors.append(expected_tensor)expected_output = torch.stack(expected_tensors, dim=0)# 验证输出output_tensors_cpu = torch.stack([t.cpu().float() for t in output_tensor_list], dim=0)mse = validate_outputs(output_tensors_cpu, expected_output, input_tensor.dtype)mse_arr.append(mse)max_mse = np.max(mse_arr)max_duration = np.max(duration_arr)do_print = (max_mse > 1e-3) or (group_rank == 0)if do_print:print(f"mse:{max_mse:6.2e} duration(ms):{max_duration * 1000:6.2f} "f"test_all_gather dtype:{dtype.__name__} shape:{shape} Rank {rank}")def test_all_to_all(dtype, shape, device, rank, all_rank_list, world_size, iters=5):"""测试 all_to_all 操作"""rank_list = all_rank_list[rank]group_size = len(rank_list)group = get_group(rank_list, world_size)group_rank = dist.get_rank(group)assert shape[0] % group_size == 0, "第一个维度大小必须能被 group_size 整除"mse_arr, duration_arr = [], []split_size = shape[0] // group_sizefor i in range(iters):input_tensor = generate_random_tensor(i + rank * 1000, shape, dtype, device)# 将输入张量拆分为输入列表input_tensor_list = list(input_tensor.chunk(group_size, dim=0))# 准备输出张量列表output_tensor_list = [torch.empty_like(input_tensor_list[0]) for _ in range(group_size)]# 记录通信开始时间start_time = time.time()if i % 2 == 0:dist.all_to_all(output_tensor_list, input_tensor_list, group=group, async_op=False)else:req = dist.all_to_all(output_tensor_list, input_tensor_list, group=group, async_op=True)req.wait()total_time = time.time() - start_timeduration_arr.append(total_time)# 计算期望输出expected_chunks = []for p in rank_list:expected_tensor = generate_random_tensor(i + p * 1000, shape, dtype, device).cpu().float()expected_chunk = list(expected_tensor.chunk(group_size, dim=0))[group_rank]expected_chunks.append(expected_chunk)expected_output = torch.cat(expected_chunks, dim=0)# 验证输出output_tensor_cpu = torch.cat([t.cpu().float() for t in output_tensor_list], dim=0)mse = validate_outputs(output_tensor_cpu, expected_output, input_tensor.dtype)mse_arr.append(mse)max_mse = np.max(mse_arr)max_duration = np.max(duration_arr)do_print = (max_mse > 1e-3) or (group_rank == 0)if do_print:print(f"mse:{max_mse:6.2e} duration(ms):{max_duration * 1000:6.2f} "f"test_all_to_all dtype:{dtype.__name__} shape:{shape} Rank {rank}")def test_scatter(dtype, shape, device, rank, all_rank_list, world_size, iters=5):"""测试 scatter 操作"""rank_list = all_rank_list[rank]group_size = len(rank_list)group = get_group(rank_list, world_size)group_rank = dist.get_rank(group)torch_dtype = get_torch_dtype(dtype)assert shape[0] % group_size == 0, "第一个维度大小必须能被 group_size 整除"split_size = shape[0] // group_sizemse_arr, duration_arr = [], []for i in range(iters):if group_rank == 0:input_tensor = generate_random_tensor(i, shape, dtype, device)input_tensor_list = list(input_tensor.chunk(group_size, dim=0))else:input_tensor_list = Noneoutput_tensor = torch.empty([split_size] + list(shape[1:]), dtype=torch_dtype, device=device)# 记录通信开始时间start_time = time.time()if i % 2 == 0:dist.scatter(output_tensor, scatter_list=input_tensor_list, src=rank_list[0], group=group)else:req = dist.scatter(output_tensor, scatter_list=input_tensor_list, src=rank_list[0], group=group, async_op=True)req.wait()total_time = time.time() - start_timeduration_arr.append(total_time)# 计算期望输出expected_output = generate_random_tensor(i, shape, dtype, device).cpu().float()expected_output = list(expected_output.chunk(group_size, dim=0))[group_rank]# 验证输出mse = validate_outputs(output_tensor, expected_output, output_tensor.dtype)mse_arr.append(mse)max_mse = np.max(mse_arr)max_duration = np.max(duration_arr)do_print = (max_mse > 1e-3) or (group_rank == 0)if do_print:print(f"mse:{max_mse:6.2e} duration(ms):{max_duration * 1000:6.2f} "f"test_scatter dtype:{dtype.__name__} shape:{shape} Rank {rank}")def test_gather(dtype, shape, device, rank, all_rank_list, world_size, iters=5):"""测试 gather 操作"""rank_list = all_rank_list[rank]group_size = len(rank_list)group = get_group(rank_list, world_size)group_rank = dist.get_rank(group)torch_dtype = get_torch_dtype(dtype)mse_arr, duration_arr = [], []for i in range(iters):input_tensor = generate_random_tensor(i + rank * 1000, shape, dtype, device)if group_rank == 0:gather_list = [torch.empty_like(input_tensor) for _ in range(group_size)]else:gather_list = None# 记录通信开始时间start_time = time.time()if i % 2 == 0:dist.gather(input_tensor, gather_list=gather_list, dst=rank_list[0], group=group)else:req = dist.gather(input_tensor, gather_list=gather_list, dst=rank_list[0], group=group, async_op=True)req.wait()total_time = time.time() - start_timeduration_arr.append(total_time)# 仅在目标进程上验证输出if group_rank == 0:expected_tensors = [generate_random_tensor(i + p * 1000, shape, dtype, device).cpu().float() for p in rank_list]gathered_tensors = [t.cpu().float() for t in gather_list]mse_list = [validate_outputs(gathered, expected, input_tensor.dtype)for gathered, expected in zip(gathered_tensors, expected_tensors)]mse = max(mse_list)else:mse = 0mse_arr.append(mse)max_mse = np.max(mse_arr)max_duration = np.max(duration_arr)if (max_mse > 1e-3 and group_rank == 0) or group_rank == 0:print(f"mse:{max_mse:6.2e} duration(ms):{max_duration * 1000:6.2f} "f"test_gather dtype:{dtype.__name__} shape:{shape} Rank {rank}")def test_broadcast(dtype, shape, device, rank, all_rank_list, world_size, iters=5):"""测试 broadcast 操作"""rank_list = all_rank_list[rank]group_size = len(rank_list)group = get_group(rank_list, world_size)group_rank = dist.get_rank(group)torch_dtype = get_torch_dtype(dtype)mse_arr, duration_arr = [], []for i in range(iters):expected_tensor = generate_random_tensor(i, shape, dtype, device)if group_rank == 0:input_tensor = expected_tensor.clone()else:input_tensor = torch.empty(shape, dtype=torch_dtype, device=device)# 记录通信开始时间start_time = time.time()if i % 2 == 0:dist.broadcast(input_tensor, src=rank_list[0], group=group)else:req = dist.broadcast(input_tensor, src=rank_list[0], group=group, async_op=True)req.wait()total_time = time.time() - start_timeduration_arr.append(total_time)# 验证输出mse = validate_outputs(input_tensor, expected_tensor, input_tensor.dtype)mse_arr.append(mse)max_mse = np.max(mse_arr)max_duration = np.max(duration_arr)do_print = (max_mse > 1e-3)if do_print or group_rank == 0:print(f"mse:{max_mse:6.2e} duration(ms):{max_duration * 1000:6.2f} "f"test_broadcast dtype:{dtype.__name__} shape:{shape} Rank {rank}")def test_send_recv(dtype, shape, device, rank, all_rank_list,world_size, iters=5):"""测试 send_recv 操作"""rank_list = all_rank_list[rank]group_size = len(rank_list)group = get_group(rank_list, world_size)group_rank = dist.get_rank(group)torch_dtype = get_torch_dtype(dtype)mse_arr = []duration_arr = []if rank not in rank_list[0:2]:returnfor i in range(iters):# 进程配对,假设将相邻的两个进程配对tag = i # 通信的标签,用于匹配发送和接收# 使用固定种子生成数据np.random.seed(i)if group_rank == 0:# 偶数Rank作为发送进程input_array = np.random.uniform(-10, 10, shape).astype(dtype)input_tensor = torch.from_numpy(input_array).to(device)#if i%3==0:# input_tensor=make_non_contiguous(input_tensor)# 记录发送开始时间start_time = time.time()if i % 2 == 0:dist.send(input_tensor, dst=rank_list[1], tag=tag)else:req = dist.isend(input_tensor, dst=rank_list[1], tag=tag)req.wait()total_time = time.time() - start_timeduration_arr.append(total_time)# 发送进程不需要验证结果mse_arr.append(0)else:# 奇数Rank作为接收进程output_tensor = torch.empty(shape, dtype=torch_dtype, device=device)# 记录接收开始时间start_time = time.time()if i % 2 == 0:dist.recv(output_tensor, src=rank_list[0], tag=tag)else:req = dist.irecv(output_tensor, src=rank_list[0], tag=tag)req.wait()total_time = time.time() - start_timeduration_arr.append(total_time)# 生成预期的张量用于比较expected_array = np.random.uniform(-10, 10, shape).astype(dtype)expected_tensor = torch.from_numpy(expected_array).cpu().float()# 将接收到的张量转换为 CPU,并转换为 float32 类型output_tensor_cpu = output_tensor.cpu().float()mse = validate_outputs(output_tensor_cpu, expected_tensor,torch_dtype)mse_arr.append(mse)# 打印结果if rank==rank_list[1]:max_mse = np.max(mse_arr)max_duration = np.max(duration_arr) print(f"mse:{max_mse:6.2e} duration(ms):{max_duration*1000:6.2f} "f"test_send_recv dtype:{dtype.__name__} shape:{shape} Rank {rank}")def test_all_to_all_single(dtype, shape, device, rank, all_rank_list,world_size, iters=5):"""这是一个针对 all_to_all_single 通信算子的单元测试函数。参数:- dtype: 数据类型- shape: 输入张量的形状- device: 设备类型(如 'cpu' 或 'cuda')- rank: 当前进程的排名- all_rank_list: 所有进程的排名列表- iters: 测试的迭代次数"""rank_list = all_rank_list[rank]group_size = len(rank_list)group = get_group(rank_list, world_size)group_rank = dist.get_rank(group)assert shape[0] % group_size == 0, "第一个维度大小必须能被 group_size 整除"mse_arr = []duration_arr = []# 计算每个子张量的大小split_size = shape[0] // group_size# 准备输入和输出的拆分大小列表input_split_sizes = [split_size for _ in range(group_size)]output_split_sizes = [split_size for _ in range(group_size)]for i in range(iters):# 为了使每个进程的输入张量不同,这里使用 (i + rank * 1000) 作为随机种子np.random.seed(i + rank * 1000)# 生成一个形状为 shape 的随机张量input_tensor = torch.from_numpy(np.random.uniform(-10, 10, shape).astype(dtype)).to(device)cpu_input_tensor = input_tensor.cpu().float()#if i%3==0:# input_tensor=make_non_contiguous(input_tensor)# 记录通信开始时间start_time = time.time()# 准备输出张量output_tensor = torch.empty_like(input_tensor)if i % 2 == 0:# 同步操作dist.all_to_all_single(output_tensor, input_tensor,output_split_sizes=output_split_sizes,input_split_sizes=input_split_sizes,group=group, async_op=False)else:# 异步操作req = dist.all_to_all_single(output_tensor, input_tensor,output_split_sizes=output_split_sizes,input_split_sizes=input_split_sizes,group=group, async_op=True)req.wait()total_time = time.time() - start_timeduration_arr.append(total_time)# 在 CPU 上模拟 all_to_all_single 操作,计算期望的输出all_cpu_tensors = []for p in rank_list:np.random.seed(i + p * 1000)cpu_tensor_p = torch.from_numpy(np.random.uniform(-10, 10, shape).astype(dtype)).cpu().float()all_cpu_tensors.append(cpu_tensor_p)# 将每个进程的输入张量在第一个维度上拆分cpu_splits = [t.split(split_size, dim=0) for t in all_cpu_tensors]# 收集当前进程应接收的子张量(来自所有其他进程的对应部分)expected_chunks = [cpu_splits[p][group_rank] for p in range(group_size)]expected_output = torch.cat(expected_chunks, dim=0).type(input_tensor.dtype)# 获取实际输出并转换到 CPUoutput_tensor_cpu = output_tensor.cpu().type(input_tensor.dtype)# 检查输出中是否存在无限大或非数字mse = validate_outputs(output_tensor_cpu, expected_output, input_tensor.dtype)mse_arr.append(mse)# 判断是否需要打印结果do_print = Falsemax_mse = np.max(mse_arr)max_duration = np.max(duration_arr)if max_mse > 1e-3:do_print = Trueif do_print or group_rank == 0:print(f"mse:{max_mse:6.2e} duration(ms):{max_duration*1000:6.2f} "f"test_all_to_all_single dtype:{dtype.__name__} shape:{shape} Rank {rank}")def main():local_rank = int(os.environ['LOCAL_RANK'])dist.init_process_group(backend=backend)if not dist.is_initialized():returnparser = argparse.ArgumentParser(description='测试通信操作')parser.add_argument('--level', type=int, default=0, help='测试级别,控制测试的张量大小和维度')args = parser.parse_args()dtype_list = [np.float32, np.float16, np.int32, np.int8, np.int64]shape_format = [[i, [8]] for i in dtype_list]if args.level > 0:shape_format += [[i, [1024, 406]] for i in dtype_list]if args.level > 1:shape_format += [[i, [8, 256, 26]] for i in dtype_list]if args.level > 2:shape_format += [[i, [8, 64, 2, 2]] for i in dtype_list]if args.level > 3:shape_format += [[i, [8, 64, 2, 2]] for i in dtype_list]if args.level > 4:shape_format += [[i, [32, 33, 34, 3, 5]] for i in dtype_list]rank = dist.get_rank()device = torch.device(dev_type, local_rank)world_size = dist.get_world_size()for rank_list in generate_rank_lists(world_size):if rank == 0:print(f"Rank lists: {rank_list}")for val in shape_format:dtype, shape = valif rank == 0:print(f"Testing dtype: {dtype.__name__}, shape: {shape}")dist.barrier()test_reduce_scatter(dtype, shape, device, rank, rank_list, world_size)test_all_reduce(dtype, shape, device, rank, rank_list, world_size)test_reduce(dtype, shape, device, rank, rank_list, world_size)test_all_gather(dtype, shape, device, rank, rank_list, world_size)test_all_to_all(dtype, shape, device, rank, rank_list, world_size)test_scatter(dtype, shape, device, rank, rank_list, world_size)test_gather(dtype, shape, device, rank, rank_list, world_size)test_broadcast(dtype, shape, device, rank, rank_list, world_size)test_send_recv(dtype, shape, device, rank, rank_list, world_size)test_all_to_all_single(dtype, shape, device, rank, rank_list, world_size)print(f"finished: Rank {rank}")if dist.is_initialized():dist.destroy_process_group()if __name__ == '__main__':main()
4.创建docker容器
docker stop nccl_test
docker rm nccl_test
docker run --gpus all --shm-size=128g -id -e NVIDIA_VISIBLE_DEVICES=all \--privileged --net=host -v $PWD:/home -w /home \--name=nccl_test nvcr.io/nvidia/pytorch:23.07-py3 /bin/bash
docker start nccl_test
docker exec -ti nccl_test bash
5.查看当前pytorch版本
pip list | grep -w torch
输出
torch 2.1.0a0+b5021ba
6.运行测试程序
export NCCL_SOCKET_IFNAME=lo
export NCCL_IB_DISABLE=1
export NCCL_P2P_DISABLE=1
export NCCL_SHM_DISABLE=1
torchrun -m --nnodes=1 --nproc_per_node=4 --node_rank=0 \--master_addr=127.0.0.1 --master_port=12355 nccl_benchmark --level=0 2>&1 | tee log
cat log | grep "mse" | awk '{print $1}' | sed "s/mse://g" | sort | uniq
相关文章:
Pytorch通信算子组合测试
Pytorch通信算子组合测试 一.背景二.相关链接三.遇到的问题四.操作步骤1.登录服务器2.查看拓扑3.准备测试用例A.准备目录B.用例代码 4.创建docker容器5.查看当前pytorch版本6.运行测试程序 一.背景 测试pytorch通信算子不同配置下的功能及性能测试不同的group组合测试不同的te…...
Android Dex VMP 动态加载加密指令流
版权归作者所有,如有转发,请注明文章出处:https://cyrus-studio.github.io/blog/ 上一篇【详解如何自定义 Android Dex VMP 保护壳】实现了 VMP 保护壳。 为了进一步加强对 dex 指令的保护,实现指令流加密和动态加载,…...
深度学习blog-剪枝和知识蒸馏
深度学习网络模型从卷积层到全连接层存在着大量冗余的参数,大量神经元激活值趋近于0,将这些神经元去除后可以表现出同样的模型表达能力,这种情况被称为过参数化。因此需要一些技术手段减少模型的复杂性,去除一些不重要的参数和连接…...
13:00面试,13:08就出来了,问的问题有点变态。。。
从小厂出来,没想到在另一家公司又寄了。 到这家公司开始上班,加班是每天必不可少的,看在钱给的比较多的份上,就不太计较了。没想到9月一纸通知,所有人不准加班,加班费不仅没有了,薪资还要降40%…...
机器学习笔记合集
大家好,这里是好评笔记,公主 号:Goodnote。本笔记的任务是解读机器学习实践/面试过程中可能会用到的知识点,内容通俗易懂,入门、实习和校招轻松搞定。 笔记介绍 本笔记的任务是解读机器学习实践/面试过程中可能会用到…...
七 rk3568 android 11 ec20 4G驱动移植
一 内核驱动集成 参考:Quectel_LTE&5G_Linux_USB_Driver_V1.0.zip EC20 内核驱动有两个版本 ,一个是 qmi_wwan, 一个是 GOBNet , 这里用的是 qmi_wwan版本 1.1 添加 USBNET 驱动文件 将驱动包里的 qmi_wwan_q.c 拷到 kernel/driver/net/usb/ 下 修改 kernel/dr…...
【Elasticsearch7.11】postman批量导入少量数据
JSON 文件内的数据格式,json文件数据条数不要过多,会请求参数过大,最好控制再10000以内。 {"index":{"_id":"baec07466732902d22a24ba01ff09751"}} {"uuid":"baec07466732902d22a24ba01ff0975…...
NLP三大特征抽取器:CNN、RNN与Transformer全面解析
引言 自然语言处理(NLP)领域的快速发展离不开深度学习技术的推动。随着应用需求的不断增加,如何高效地从文本中抽取特征成为NLP研究中的核心问题。深度学习中三大主要特征抽取器——卷积神经网络(Convolutional Neural Network, …...
45_Lua模块与包
Lua中的模块系统是该语言的一个重要特性,它允许开发者将代码分割成更小、更易于管理的部分。通过使用模块,你可以创建可重用的代码片段,并且可以降低代码间的耦合度。下面我将详细介绍Lua模块的基本概念、语法以及一些实际案例。 1.Lua模块 1.1 模块的基本概念 从Lua 5.1…...
软定时器的原理与创建
目录 问题概述 设计原理 设计实现 一个任务来管理所有在指定的时间、以特定的周期触发某种操作的定时需求。 问题概述 在实际应用中,常常需要周期性或者在指定时间做一件事情。 周期性:在指定的延时开始做某件事情,然后周期性重复执行 一次性…...
【自动化测试】—— Appium安装配置保姆教程(图文详解)
目录 一. 环境准备 二. JDK安装 1. 下载JDK 2. 安装JDK 3. 配置环境 4. 验证安装 三. Android SDK安装 1. 下载Android SDK 2. 安装Android SDK 3. 安装工具 4. 配置环境 5. 验证安装 四. NodeJS安装 1. 下载NodeJS 2. 安装NodeJS 3. 验证安装 4. 安装淘宝镜像…...
穿越火线怀旧服预约网页vue3版本
源码下载地址: https://github.com/superBiuBiuMan/crossfire-old-vue3版权来自穿越火线,项目仅供参考学习!!! 效果 源码下载地址: https://github.com/superBiuBiuMan/crossfire-old-vue3预览地址: https://crossfire.123916.xyz/官网效果: https://www.cfhuodong.com/2025-…...
《Keras3从头开始的图像分类》
Keras3从头开始的图像分类 作者:fchollet创建日期:2020/04/27最后修改时间:2023/11/09描述:在 Kaggle Cats vs Dogs 数据集上从头开始训练图像分类器。 (i) 此示例使用 Keras 3 在 Colab 中查看 • GitHub…...
Apache Hop从入门到精通 第三课 Apache Hop下载安装
1、下载 官方下载地址:https://hop.apache.org/download/,本教程是基于apache-hop-client-2.11.0.zip进行解压,需要jdk17,小伙伴们可以根据自己的需求下载相应的版本。如下图所示 2、下载jdk17(https://www.microsoft…...
Vue.js组件开发-图片剪裁性能优化最佳方案实例
在Vue.js组件开发中,优化图片剪裁性能的最佳方案通常涉及多个方面的综合考虑。以下是一个结合多个优化策略的图片剪裁组件性能优化实例: 1. 组件设计 首先,设计一个简洁且高效的图片剪裁组件,确保其功能明确且易于使用。组件应包…...
React - router的使用 结合react-redux的路由守卫
web端使用路由安装的是 react-router-dom "react-router-dom": "^5.2.0"在组件中使用路由,我们先设置2个路由,分别是首页、关于 // src/components/RouteSample.jsimport React from react; // 引入路由需要的基础模块 import {Bro…...
day09_kafka高级
文章目录 kafka高级今日课程内容核心概念整理Kafka的数据位移offset**为什么 Kafka 的 offset 就像是“书签”?****实际意义** Kafka的基准/压力测试测试生产的效率测试消费的效率 Kafka的分片与副本机制kafka如何保证数据不丢失生产者端Broker端消费者端相关参数 K…...
【MT32F006】MT32F006之通信协议
本文最后修改时间:2025年01月09日 一、本节简介 本文介绍如何使用MT32F006写一个通信协议。 二、实验平台 库版本:V1.0.0 编译软件:MDK5.37 硬件平台:MT32F006开发板(主芯片MT32F006) 仿真器ÿ…...
CMake学习笔记(2)
1. 嵌套的CMake 如果项目很大,或者项目中有很多的源码目录,在通过CMake管理项目的时候如果只使用一个CMakeLists.txt,那么这个文件相对会比较复杂,有一种化繁为简的方式就是给每个源码目录都添加一个CMakeLists.txt文件ÿ…...
访客机的四个功能
访客机,也被称为访客自动登记安全管理系统或访客一体机,是现代安全管理中不可或缺的一部分。它通过整合计算机技术、射频识别技术、指纹生物识别、触摸屏手写技术、文字识别(OCR)技术、热敏打印技术、条码技术、数码摄像技术、自动…...
【Linux系统】—— vim 的使用
【Linux系统】—— vim 的使用 1 vim 的基本概念2 vim 的多模式3 命令模式下的命令集3.1 进入/退出其他模式3.2 光标移动命令集3.3 复制/剪切/粘贴/删除命令集3.4 撤销命令集3.5 查找命令集3.6 替换命令集3.7 进入与退出替换模式 4 批量化编译5 底行模式6 vim 小技巧7 vim简单配…...
华为C语言编程规范总结
1.头文件更改会导致所有直接或间接包含该头文件的的C文件重新编译,会增加大量编译工作量,延长编译时间,因此: 1.1 头文件里尽量少包含头文件 1.2 头文件应向稳定的方向包含 2.每一个.c文件应有一个同名.h文件,…...
深入学习 Python 量化编程
深入学习 Python 量化编程 第一章:Python 基础与量化编程环境搭建 1.1 安装必要的库 首先,你需要安装一些在量化编程中常用的 Python 库。可以通过以下命令安装这些库: pip install numpy pandas matplotlib yfinance backtrader scikit-…...
初识Java3
目录 一.面向对象与面向过程编程区别 二.类 1.类的定义 2.类一般格式 3.类的实例化具体对象 4.this的使用(习惯经常用) 5.this引用 三.对象 1.初始化对象方法 2.构造方法 四.封装 1.封装: 2.拓展“包” (1).包概念 (…...
uniapp 微信小程序内嵌h5实时通信
描述: 小程序webview内嵌的h5需要向小程序实时发送消息,有人说postMessage可以实现,所以试验一下,结果是实现不了实时,只能在特定时机后退、组件销毁、分享时小程序才能接收到信息(小程序为了安全等考虑做了…...
Blazor开发复杂信息管理系统的优势
随着现代企业信息管理需求的不断提升,开发高效、易维护、可扩展的系统变得尤为重要。在这个过程中,Blazor作为一种新兴的Web开发框架,因其独特的优势,逐渐成为开发复杂信息管理系统的首选技术之一。本文将结合Blazor在开发复杂信息…...
【微服务】面试题 5、分布式系统理论:CAP 与 BASE 详解
分布式系统理论:CAP 与 BASE 详解 一、CAP 定理 背景与定义:1998 年由加州大学科学家埃里克布鲁尔提出,分布式系统存在一致性(Consistency)、可用性(Availability)、分区容错性(Part…...
<论文>时序大模型如何应用于金融领域?
一、摘要 本文介绍2024年的论文《Financial Fine-tuning a Large Time Series Model》,论文探索了主流的时间序列大模型在金融领域的微调应用实践,为时序大模型的领域微调提供了参考。 译文: 大型模型在自然语言处理、图像生成以及近期的时间…...
Oracle 表分区简介
目录 一. 前置知识1.1 什么是表分区1.2 表分区的优势1.3 表分区的使用条件 二. 表分区的方法2.1 范围分区(Range Partitioning)2.2 列表分区(List Partitioning)2.3 哈希分区(Hash Partitioning)2.4 复合分…...
安卓硬件加速hwui
安卓硬件加速 本文基于安卓11。 从 Android 3.0 (API 级别 11) 开始,Android 2D 渲染管道支持硬件加速,这意味着在 View 的画布上执行的所有绘图操作都使用 GPU。由于启用硬件加速所需的资源增加,你的应用程序将消耗更多内存。 软件绘制&am…...
【Bluedroid】HFP连接流程源码分析(二)
接上一篇【Bluedroid】HFP连接流程源码分析(一)-CSDN博客分析。本篇主要围绕RFCOMM Connect 与 RFCOMM UA Frame 的处理流程来展开分析。 RFCOMM Connect RFCOMM(Radio Frequency Communication)作为蓝牙协议栈的关键部分&#…...
基于文件系统分布式锁原理
分布式锁:在一个公共的存储服务上打上一个标记,如Redis的setnx命令,是先到先得方式获得锁,ZooKeeper有点像下面的demo,比较大小的方式判决谁获得锁。 package com.ldj.mybatisflex.demo;import java.util.*; import java.util.co…...
java语法知识(二)
1. class文件可以直接拖动到idea中,显示源码。 2.idea快捷键: sout : System.out.println 输出内容.sout :---》 System.out.println(输出内容); psvm: public static void main() 格式化:ctrl altL 复制粘贴:ctrld 3.注释…...
基于Piquasso的光量子计算机的模拟与编程
一、引言 在科技飞速发展的当下,量子计算作为前沿领域,正以前所未有的态势蓬勃崛起。它凭借独特的量子力学原理,为解决诸多经典计算难以攻克的复杂问题提供了全新路径。从优化物流配送网络,以实现资源高效调配,到药物分子结构的精准模拟,加速新药研发进程;从金融风险的…...
导出文件,能够导出但是文件打不开
背景: 在项目开发中,对于列表的查询,而后会有导出功能,这里导出的是一个excell表格。实现了两种,1.导出的文件,命名是前端传输过去的;2.导出的文件,命名是根据后端返回的文件名获取的…...
【动手学电机驱动】STM32-FOC(4)STM32之UART 串口通信
STM32-FOC(1)STM32 电机控制的软件开发环境 STM32-FOC(2)STM32 导入和创建项目 STM32-FOC(3)STM32 三路互补 PWM 输出 STM32-FOC(4)STM32之UART 串口通信 STM32-FOC(6&am…...
RabbitMQ 高可用方案:原理、构建与运维全解析
文章目录 前言:1 集群方案的原理2 RabbitMQ高可用集群相关概念2.1 设计集群的目的2.2 集群配置方式2.3 节点类型 3 集群架构3.1 为什么使用集群3.2 集群的特点3.3 集群异常处理3.4 普通集群模式3.5 镜像集群模式 前言: 在实际生产中,RabbitM…...
Center Loss 和 ArcFace Loss 笔记
一、Center Loss 1. 定义 Center Loss 旨在最小化类内特征的离散程度,通过约束样本特征与其类别中心之间的距离,提高类内特征的聚合性。 2. 公式 对于样本 xi 和其类别yi,Center Loss 的公式为: xi: 当前样本的特征向量&…...
深度解读微软Speech服务:让语音识别走进现实
大家好,今天我们来探讨一个激动人心的技术话题:微软的语音识别服务如何为我们提供强大的语音识别解决方案,特别是在电话录音中识别出不同的说话人。 场景描绘 想象一下,你有一段电话录音,并需要将其中的多个说话人区分…...
第21篇 基于ARM A9处理器用汇编语言实现中断<三>
Q:怎样编写ARM A9处理器汇编语言代码配置按键端口产生中断? A:使用Intel Monitor Program创建中断程序时,Linker Section Presets下拉菜单中需选择Exceptions。主程序在.vectors代码段为ARM处理器设置异常向量表,在…...
专题 - STM32
基础 基础知识 STM所有产品线(列举型号): STM产品的3内核架构(列举ARM芯片架构): STM32的3开发方式: STM32的5开发工具和套件: 若要在电脑上直接硬件级调试STM32设备,则…...
极客说|Azure AI Agent Service 结合 AutoGen/Semantic Kernel 构建多智能体解决⽅案
作者:卢建晖 - 微软高级云技术布道师 「极客说」 是一档专注 AI 时代开发者分享的专栏,我们邀请来自微软以及技术社区专家,带来最前沿的技术干货与实践经验。在这里,您将看到深度教程、最佳实践和创新解决方案。关注「极客说」&am…...
【C++指南】模板 深度解析
💓 博客主页:倔强的石头的CSDN主页 📝Gitee主页:倔强的石头的gitee主页 ⏩ 文章专栏:《C指南》 期待您的关注 目录 1. 引言 2. 模板的基本概念 3. 函数模板 3.1 定义和语法 3.2 函数模板实例化 3.3 隐式实例化 …...
【traefik】forwadAuth中间件跨namespace请求的问题
前情提要 - fowardAuth鉴权中间件的使用: 【traefik】使用forwardAuth中间件做网关层的全局鉴权 1. 问题 我的 traefik-ingress-controller 所在 namespace: traefik 业务服务所在 namespace: apps 路由与 forwardAuth 中间件配置如下: # 路由 apiV…...
【25考研】西南交通大学软件工程复试攻略!
一、复试内容 复试对考生的既往学业情况、外语听说交流能力、专业素质和科研创新能力,以及综合素质和一贯表现等进行全面考查,主要考核内容包括思想政治素质和道德品质、外语听说能力、专业素质和能力,综合素质及能力。考核由上机考试和面试两部分组成&a…...
在 Safari 浏览器中,快速将页面恢复到 100% 缩放(也就是默认尺寸)Command (⌘) + 0 (零)
在 Safari 浏览器中,没有一个专门的快捷键可以将页面恢复到默认的缩放比例。 但是,你可以使用以下两种方法快速将页面恢复到 100% 缩放(也就是默认尺寸): 方法一:使用快捷键 (最常用) Command (⌘) 0 (零…...
linux的大内核锁与顺序锁
大内核锁 Linux大内核锁(Big Kernel Lock,BKL)是Linux内核中的一种锁机制,用于保护内核资源,以下是关于它的详细介绍: 概念与作用 大内核锁是一种全局的互斥锁,在同一时刻只允许一个进程访问…...
CVE-2025-22777 (CVSS 9.8):WordPress | GiveWP 插件的严重漏洞
漏洞描述 GiveWP 插件中发现了一个严重漏洞,该插件是 WordPress 最广泛使用的在线捐赠和筹款工具之一。该漏洞的编号为 CVE-2025-22777,CVSS 评分为 9.8,表明其严重性。 GiveWP 插件拥有超过 100,000 个活跃安装,为全球无数捐赠平…...
牛客周赛 Round 76题解
小红出题 思路:我们发现,每七天可以获得15元,那么我们可以对7取模,看能有多少7的倍数,然后剩下的就是看是否超过5,超过5就直接15,否则加上天数*3 #include<bits/stdc.h> using namespace…...
【ARM】MDK如何将变量存储到指定内存地址
1、 文档目标 通过MDK的工程配置,将指定的变量存储到指定的内存地址上。 2、 问题场景 在项目工程的开发过程中,对于flash要进行分区,需要规划出一个特定的内存区域来存储变量。 3、软硬件环境 1)、软件版本:MDK 5.…...