-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathexample.cpp
More file actions
152 lines (132 loc) · 5.1 KB
/
Copy pathexample.cpp
File metadata and controls
152 lines (132 loc) · 5.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
/*************************************************************************
* 2026 - Modified by MetaX Integrated Circuits (Shanghai) Co., Ltd. All Rights Reserved.
************************************************************************/
#include <assert.h>
#include <errno.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>

#include <string>
#include <vector>

#include "mpi.h"
#include "mc_runtime.h"
#include "mccl.h"
// Threads per block; not referenced in the code shown here (presumably the
// intended launch width for vectoradd_uint32_t — confirm).
#define BLOCK_DIM 512
// Element count: 16 blocks of BLOCK_DIM threads (= 8192); not referenced in
// the code shown here.
#define NUM (16 * BLOCK_DIM)
// Upper bound on GPUs (presumably per node — confirm); not referenced here.
#define MAX_GPU_NUM 16
// Value-less feature flag; presumably tested with #ifdef elsewhere — confirm.
#define DRAGONFLY
// Number of streams; not referenced in the code shown here.
#define STREAM_NUM 2
/*
 * MPICHECK(cmd): evaluate an MPI call exactly once; if it does not return
 * MPI_SUCCESS, report file, line and the raw error code on stderr and tear
 * down the whole MPI job.
 *
 * MPI_Abort is used instead of a bare exit() so that peer ranks blocked in a
 * collective are terminated rather than left waiting forever; exit() remains
 * as a fallback in case MPI_Abort returns. The local has a macro-unique name
 * so that `cmd` may freely refer to a caller variable named `e`.
 */
#define MPICHECK(cmd) do {                                       \
    int mpicheck_err_ = (cmd);                                   \
    if (mpicheck_err_ != MPI_SUCCESS) {                          \
        fprintf(stderr, "Failed: MPI error %s:%d '%d'\n",        \
                __FILE__, __LINE__, mpicheck_err_);              \
        MPI_Abort(MPI_COMM_WORLD, mpicheck_err_);                \
        exit(EXIT_FAILURE);                                      \
    }                                                            \
} while(0)
/*
 * MC_CHECK(stmt): evaluate a MACA runtime call exactly once; if it does not
 * return mcSuccess, print file, line and the numeric error code on stderr
 * and exit(1).
 *
 * The former trailing assert() was removed: it could never fire (exit()
 * already ran on failure) and <assert.h> was not included for it. The local
 * has a macro-unique name so that `stmt` may refer to a caller variable named
 * `status` without reading an uninitialized shadow.
 */
#define MC_CHECK(stmt)                                                       \
    do                                                                       \
    {                                                                        \
        mcError_t mc_check_status_ = (stmt);                                 \
        if (mcSuccess != mc_check_status_)                                   \
        {                                                                    \
            fprintf(stderr, "[%s:%d] mcruntime call failed with %d \n",      \
                    __FILE__, __LINE__, (int)mc_check_status_);              \
            exit(1);                                                         \
        }                                                                    \
    } while (0)
/*
 * Element-wise accumulation:
 *   output[i] += input[0][i] + input[1][i] + ... + input[rank_num - 1][i]
 *
 * Launch layout: 1-D grid, one thread per element; the element index is
 * threadIdx.x + blockIdx.x * blockDim.x (computed in size_t so very large
 * grids cannot overflow a 32-bit int).
 *
 * Preconditions (no bounds check is performed):
 *  - gridDim.x * blockDim.x must not exceed the element count of `output`
 *    and of every input[r];
 *  - `input` must be a device-accessible array of `rank_num` pointers, each
 *    readable from this device (presumably peer/IPC-mapped buffers — confirm).
 *
 * output[index] is updated once per input, in order, rather than being summed
 * in a register, so the result stays the same even if `output` aliases one
 * of the input buffers (cannot be ruled out from this file).
 */
__global__ void vectoradd_uint32_t(uint32_t **input, uint32_t *output, uint32_t rank_num)
{
    const size_t index = threadIdx.x + static_cast<size_t>(blockIdx.x) * blockDim.x;
    // Unsigned counter matches rank_num's type (no sign-compare / signed overflow).
    for (uint32_t r = 0; r < rank_num; ++r) {
        output[index] += input[r][index];
    }
}
/*
 * DJB2 hash of a NUL-terminated string: start at 5381 and, for each character
 * c, compute hash = hash * 33 + c (modulo 2^64). Each char is added through
 * its ordinary integer promotion, so on platforms where char is signed,
 * non-ASCII bytes contribute a sign-extended value.
 */
static uint64_t getHostHash(const char *string) {
    uint64_t hash = 5381;
    for (const char *p = string; *p != '\0'; ++p) {
        hash = hash * 33 + *p;
    }
    return hash;
}
/*
 * Write this machine's short host name (everything before the first '.')
 * into `hostname`, a caller-owned buffer of `maxlen` bytes.
 *
 * The result is always NUL-terminated within maxlen bytes. If the name is too
 * long, the truncated prefix is kept: POSIX leaves termination of a truncated
 * gethostname() result unspecified, and glibc then also returns -1 with
 * ENAMETOOLONG. On any other gethostname() failure the buffer is set to the
 * empty string so callers never read uninitialized bytes.
 * Does nothing when hostname is NULL or maxlen <= 0.
 */
static void getHostName(char *hostname, int maxlen) {
    if (hostname == NULL || maxlen <= 0) {
        return;
    }
    if (gethostname(hostname, static_cast<size_t>(maxlen)) != 0 && errno != ENAMETOOLONG) {
        hostname[0] = '\0';
        return;
    }
    hostname[maxlen - 1] = '\0';
    // Strip the domain part; stop at the terminator so bytes past it are never read.
    for (int i = 0; hostname[i] != '\0'; i++) {
        if (hostname[i] == '.') {
            hostname[i] = '\0';
            return;
        }
    }
}
/*
 * Short printable fingerprint of an IPC memory handle: the first 4 bytes of
 * handle.reserved rendered as 8 uppercase hexadecimal digits (high nibble
 * first). Only a prefix of the handle is encoded, so distinct handles may
 * share a fingerprint.
 */
std::string HandleToString(mcIpcMemHandle_t handle)
{
    static const char kHexDigits[] = "0123456789ABCDEF";
    const int kPrefixBytes = 4;

    std::string text;
    text.reserve(2 * kPrefixBytes);
    for (int b = 0; b < kPrefixBytes; ++b) {
        const unsigned char byte = static_cast<unsigned char>(handle.reserved[b]);
        text.push_back(kHexDigits[byte >> 4]);
        text.push_back(kHexDigits[byte & 0x0F]);
    }
    return text;
}
/*
 * Warp-wide "still waiting?" check over kNumRanks consecutive slots.
 *
 * Lane l (l < kNumRanks) loads task[l] through ld_volatile_global (defined
 * elsewhere; presumably a volatile/uncached global load overload for T —
 * confirm one exists for every T used) and compares it with `expected`.
 * Returns true on every lane if ANY lane saw a mismatch.
 *
 * The slot element type is a template parameter (deduced from `task`), so
 * both the original int slots and the int64_t slots used by timeout_check
 * work; previously an int64_t* could not bind to the int* parameter.
 *
 * Must be called by all 32 lanes of a warp (full mask passed to
 * __any_sync). NOTE(review): assumes 32-lane warps; confirm on the target
 * MetaX hardware.
 */
template <int kNumRanks, typename T>
__device__ __forceinline__ bool not_finished(T *task, int expected) {
    static_assert(kNumRanks <= 32, "not_finished: one lane per rank, at most 32 ranks");
    bool pending = false;
    const auto lane_id = threadIdx.x % 32;
    if (lane_id < kNumRanks)
        pending = ld_volatile_global(task + lane_id) != expected;
    return __any_sync(0xffffffff, pending);
}
/*
 * Spin until the kNumRanks slots task_fifo_ptrs[rank][head .. head+kNumRanks)
 * all equal `expected` (as judged by not_finished).
 *
 * If the wait exceeds NUM_TIMEOUT_CYCLES clock64() ticks, thread 0 prints a
 * diagnostic containing `tag` and `rank` and calls trap() (defined elsewhere;
 * presumably aborts the kernel). Other threads keep spinning until then.
 * Every iteration re-reads task_fifo_ptrs[rank] and re-checks the slots.
 */
template <int kNumRanks>
__forceinline__ __device__ void
timeout_check(int64_t **task_fifo_ptrs, int head, int rank, int expected, int tag = 0) {
    const auto t_begin = clock64();
    for (;;) {
        if (!not_finished<kNumRanks>(task_fifo_ptrs[rank] + head, expected))
            break;
        const bool timed_out = (clock64() - t_begin) > NUM_TIMEOUT_CYCLES;
        if (timed_out && threadIdx.x == 0) {
            printf("DeepEP timeout check failed: %d (rank = %d)\n", tag, rank);
            trap();
        }
    }
}
/*
 * Device-side barrier across kNumRanks ranks using per-rank slot arrays.
 *
 * Thread t (t < kNumRanks) of the calling block:
 *   1. adds FINISHED_SUM_TAG to this rank's slot  task_fifo_ptrs[rank][head + t];
 *   2. calls memory_fence() (defined elsewhere; presumably a system-scope
 *      fence ordering step 1 before step 3 — confirm);
 *   3. subtracts FINISHED_SUM_TAG from peer t's slot task_fifo_ptrs[t][head + rank].
 * So slot [rank][head + t] gets +TAG from this rank and -TAG from rank t.
 * The block then spins in timeout_check until all of this rank's slots
 * [head, head + kNumRanks) read 0.
 *
 * `tag` is only used in the timeout diagnostic.
 *
 * kNumRanks <= 32 is now enforced at compile time (it is a template
 * constant), replacing the former runtime EP_DEVICE_ASSERT.
 *
 * NOTE(review): atomicAdd_system/atomicSub_system are applied to int64_t*.
 * Stock CUDA provides atomicSub only for 32-bit ints and atomicAdd for
 * unsigned long long (not signed), so this relies on MACA or helper overloads
 * defined elsewhere — confirm. Also presumably the slots start at 0.
 */
template <int kNumRanks>
__forceinline__ __device__ void
barrier_device(int64_t **task_fifo_ptrs, int head, int rank, int tag = 0) {
    static_assert(kNumRanks <= 32, "barrier_device: one thread per rank, at most 32 ranks");
    auto thread_id = static_cast<int>(threadIdx.x);
    if (thread_id < kNumRanks) {
        atomicAdd_system(task_fifo_ptrs[rank] + head + thread_id, FINISHED_SUM_TAG);
        memory_fence();
        atomicSub_system(task_fifo_ptrs[thread_id] + head + rank, FINISHED_SUM_TAG);
    }
    timeout_check<kNumRanks>(task_fifo_ptrs, head, rank, 0, tag);
}
/*
 * MPI driver:
 *  1. Initialize MPI and query this process's rank and the world size.
 *  2. Derive localRank: the number of lower-numbered ranks whose short host
 *     name hashes to the same value as ours (hashes exchanged with
 *     MPI_Allgather). Use it to select a GPU.
 *  3. Allocate a device buffer of fifo_bytes uint32_t elements, zero all of
 *     it, synchronize, free it, finalize MPI and report completion.
 * Every MPI / MACA runtime call is checked; a failure aborts the program.
 */
int main(int argc, char *argv[]) {
    int rank = 0, nRanks = 0, localRank = 0;
    MPICHECK(MPI_Init(&argc, &argv));
    MPICHECK(MPI_Comm_rank(MPI_COMM_WORLD, &rank));
    MPICHECK(MPI_Comm_size(MPI_COMM_WORLD, &nRanks));

    // Calculate localRank, which is used to select a GPU. std::vector replaces
    // the former variable-length array (not standard C++).
    std::vector<uint64_t> hostHashs(static_cast<size_t>(nRanks));
    char hostname[1024];
    getHostName(hostname, (int)sizeof(hostname));
    hostHashs[rank] = getHostHash(hostname);
    MPICHECK(MPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL,
                           hostHashs.data(), (int)sizeof(uint64_t), MPI_BYTE,
                           MPI_COMM_WORLD));
    for (int p = 0; p < rank; p++) {
        if (hostHashs[p] == hostHashs[rank]) {
            localRank++;
        }
    }
    MC_CHECK(mcSetDevice(localRank));

    // fifo_bytes is an element count: the allocation holds fifo_bytes uint32_t values.
    const int fifo_bytes = 1024;
    const size_t buffer_bytes = static_cast<size_t>(fifo_bytes) * sizeof(uint32_t);
    uint32_t *buffer_ptrs = NULL;
    MC_CHECK(mcMalloc(reinterpret_cast<void **>(&buffer_ptrs), buffer_bytes));
    // Memset the device pointer itself (not buffer_ptrs[rank], which would
    // dereference device memory on the host) and cover the whole allocation.
    MC_CHECK(mcMemset(buffer_ptrs, 0, buffer_bytes));
    MC_CHECK(mcDeviceSynchronize());
    MC_CHECK(mcFree(buffer_ptrs));

    MPICHECK(MPI_Finalize());
    printf("Rank %d Work Done!\n", rank);
    return 0;
}