This commit is contained in:
vpatuta 2023-11-19 13:05:38 +01:00
commit 64ae6749d1
9 changed files with 714 additions and 0 deletions

28
LICENSE Normal file
View File

@ -0,0 +1,28 @@
BSD 3-Clause License
Copyright (c) 2023, Martin Hart
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
3. Neither the name of the copyright holder nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

19
Makefile Normal file
View File

@ -0,0 +1,19 @@
TARGET := smash-MPI.so
CC := mpicc
CFLAGS := -std=gnu99 -fPIC -Wall -Wextra -c -I.
LIB := -ldl
SRC := $(wildcard *.c)
OBJ := $(SRC:.c=.o)
.DEFAULT_GOAL := $(TARGET)
.PHONY: clean
$(TARGET): $(OBJ)
$(CC) $(LIB) -shared $? -o $@
%.o: %.c
$(CC) $(CFLAGS) $<
clean:
@rm -f $(OBJ) $(TARGET)

1
README.md Normal file
View File

@ -0,0 +1 @@
# smash-MPI

121
callout.c Normal file
View File

@ -0,0 +1,121 @@
#include "callout.h"
#include <err.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <semaphore.h>
#include <time.h>
struct callo callout[NCALL] = { 0 };
extern timer_t smash_timer_id;
void
smash_print_callout(void)
{
size_t i;
for (i = 0; callout[i].c_func != NULL; ++i)
printf("c_time: %lli\nc_arg: %i\nc_func @ %p\n\n",
callout[i].c_time, callout[i].c_arg,
(void *)callout[i].c_func);
}
void
smash_set_timer(int time)
{
int t, s, ns;
struct itimerspec curr;
timer_gettime(smash_timer_id, &curr);
if (time == -1 || (curr.it_value.tv_nsec == 0 && curr.it_value.tv_sec == 0)) {
t = time == -1 ? callout[0].c_time : time;
s = t / 1000;
ns = (t % 1000) * 1000000;
if (s == 0 && ns == 0)
return;
struct itimerspec its;
its.it_value.tv_sec = s;
its.it_value.tv_nsec = ns;
its.it_interval.tv_sec = 0;
its.it_interval.tv_nsec = 0;
if (timer_settime(smash_timer_id, 0, &its, NULL) < 0)
err(1, "timer_settime");
}
}
sem_t *
smash_timeout(int (*func)(), int arg, int time, struct mpi_send_args *args)
{
int t;
struct callo *p1, *p2;
t = time;
p1 = &callout[0];
while (p1->c_func != 0 && p1->c_time <= t) {
t -= p1->c_time;
p1++;
}
p1->c_time -= t;
p2 = p1;
while (p2->c_func != 0)
p2++;
while (p2 >= p1) {
(p2+1)->c_time = p2->c_time;
(p2+1)->c_func = p2->c_func;
(p2+1)->c_arg = p2->c_arg;
(p2+1)->c_send_args = p2->c_send_args;
p2--;
}
p1->c_time = t;
p1->c_func = func;
p1->c_arg = arg;
if (args != NULL)
memcpy(&p1->c_send_args, args, sizeof(struct mpi_send_args));
smash_set_timer(t);
return &p1->c_lock;
}
void
smash_clock(void)
{
struct callo *p1, *p2;
if (callout[0].c_func != 0) {
p1 = p2 = &callout[0];
p1->c_time = 0;
while (p1->c_func != 0 && p1->c_time == 0) {
switch (p1->c_arg) {
case 6:
p1->c_func(p1->c_send_args.buf, p1->c_send_args.count,
p1->c_send_args.datatype, p1->c_send_args.dest,
p1->c_send_args.tag, p1->c_send_args.comm);
free(p1->c_send_args.buf);
sem_post(&p1->c_lock);
break;
case 0:
p1->c_func();
break;
}
p1++;
}
while ((p2->c_func = p1->c_func)) {
p2->c_time = p1->c_time;
p2->c_arg = p1->c_arg;
p2->c_send_args = p1->c_send_args;
p1++;
p2++;
}
smash_set_timer(-1);
}
}

31
callout.h Normal file
View File

@ -0,0 +1,31 @@
#ifndef CALLOUT_H
#define CALLOUT_H
#define NCALL 256
#include <mpi.h>
#include <semaphore.h>
struct mpi_send_args {
void *buf;
MPI_Comm comm;
MPI_Datatype datatype;
int count, dest, tag;
};
struct callo {
long long c_time; /* incremental time */
int c_arg; /* argument to routine */
int (*c_func)(); /* routine */
sem_t c_lock; /* lock to preserve locking semantic */
struct mpi_send_args c_send_args; /* args for MPI_Send */
};
void smash_print_callout(void);
sem_t *smash_timeout(int (*func)(), int arg, int time,
struct mpi_send_args *args);
void smash_clock(void);
#endif /* CALLOUT_H */

310
hooking.c Normal file
View File

@ -0,0 +1,310 @@
#include <dlfcn.h>
#include <err.h>
#include <gnu/lib-names.h>
#include <mpi.h>
#include <stdlib.h>
#include <signal.h>
#include <string.h>
#include <unistd.h>
#include <stdio.h>
#include <time.h>
#include "callout.h"
#include "hooking.h"
#include "parser.h"
#define SMASH_GRAPH 0x1234
timer_t smash_timer_id;
unsigned int smash_my_rank;
int smash_dead, smash_world_size, smash_alarm;
struct cfg_delays *smash_delays;
struct cfg_failures *smash_failures;
struct smash_graph_msg {
int src, dst;
};
struct smash_graph_msgs {
size_t i;
struct smash_graph_msg msgs[4096];
} smash_graph_msgs;
static int master_done = 0;
int
smash_failure(void)
{
int buf;
MPI_Status status;
size_t recv = 0;
int (*f)();
smash_dead = 1;
f = smash_get_lib_func(LIBMPI, "MPI_Recv");
while (recv != smash_world_size - smash_failures->size) {
f(&buf, 1, MPI_INT, MPI_ANY_SOURCE, 0xdead, MPI_COMM_WORLD, &status);
recv++;
}
MPI_Finalize();
exit(0);
}
int
MPI_Recv(void *buf, int count, MPI_Datatype datatype, int source, int tag,
MPI_Comm comm, MPI_Status *status) {
int (*f)(), res;
f = smash_get_lib_func(LIBMPI, "MPI_Recv");
while (1) {
res = f(buf, count, datatype, source, tag, comm, status);
if (status->MPI_TAG != 0xdead || status->MPI_TAG != SMASH_GRAPH)
break;
bzero(status, sizeof(MPI_Status));
master_done = status->MPI_TAG == SMASH_GRAPH;
}
smash_graph_msgs.msgs[smash_graph_msgs.i].src = status->MPI_SOURCE;
smash_graph_msgs.msgs[smash_graph_msgs.i].dst = smash_my_rank;
smash_graph_msgs.i++;
return res;
}
void *
smash_get_lib_func(const char *lname, const char *fname)
{
void *lib, *p;
if (!(lib = dlopen(lname, RTLD_LAZY)))
errx(EXIT_FAILURE, "%s", dlerror());
if (!(p = dlsym(lib, fname)))
errx(EXIT_FAILURE, "%s", dlerror());
dlclose(lib);
return p;
}
static void
smash_handler(__attribute__((unused)) int signum)
{
smash_clock();
}
timer_t
smash_setup_alarm(void)
{
timer_t timerid;
struct sigaction sa;
struct sigevent sev;
sa.sa_handler = smash_handler;
sigemptyset(&sa.sa_mask);
sa.sa_flags = SA_RESTART;
sigaction(SIGALRM, &sa, NULL);
sev.sigev_notify = SIGEV_SIGNAL;
sev.sigev_signo = SIGALRM;
sev.sigev_value.sival_ptr = &timerid;
if (timer_create(CLOCK_REALTIME, &sev, &timerid) < 0)
errx(1, "timer_create");
return timerid;
}
int
__libc_start_main(
int (*main)(int, char **, char **),
int argc,
char **argv,
int (*init)(int, char **, char **),
void (*fini)(void),
void (*rtld_fini)(void),
void *stack_end)
{
int (*f)();
if (smash_parse_cfg(CFG_DELAY, (void **)&smash_delays) < 0)
errx(EXIT_FAILURE, "error in CFG_DELAY\n");
if (smash_parse_cfg(CFG_FAILURE, (void **)&smash_failures) < 0)
errx(EXIT_FAILURE, "error in CFG_FAILURE\n");
f = smash_get_lib_func(LIBSTD, "__libc_start_main");
smash_alarm = 0;
smash_dead = 0;
return f(main, argc, argv, init, fini, rtld_fini, stack_end);
}
int
MPI_Init(int *argc, char ***argv)
{
unsigned int i;
int (*f)(int *, char ***), res, rank;
if (!smash_alarm) {
smash_timer_id = smash_setup_alarm();
smash_alarm = 1;
}
smash_graph_msgs.i = 0;
f = smash_get_lib_func(LIBMPI, "MPI_Init");
res = f(argc, argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
smash_my_rank = rank;
MPI_Comm_size(MPI_COMM_WORLD, &smash_world_size);
if (smash_failures != NULL) {
for (i = 0; i < smash_failures->size; ++i) {
if (smash_failures->failures[i].node == smash_my_rank) {
smash_timeout(smash_failure, 0, smash_failures->failures[i].time, NULL);
}
}
}
return res;
}
int
save_graph(struct smash_graph_msgs *m)
{
FILE *fs;
size_t i;
char *filepath;
filepath = getenv("SMASH_MPI_GRAPH");
if (!filepath)
filepath = "graph.dot";
if (!(fs = fopen(filepath, "w+")))
return -1;
fprintf(fs, "digraph SMASH_MPI {\n layout=twopi\n ranksep=3;\n ratio=auto;\n");
for (i = 0; i < m->i; ++i) {
fprintf(fs, "\"p%d\" -> \"p%d\" [ color=\"purple\" ];\n",
m->msgs[i].src,
m->msgs[i].dst);
}
fprintf(fs, "}");
fflush(fs);
return 0;
}
int
MPI_Finalize(void)
{
int (*f)(void);
size_t i, j;
int (*ssend)();
int (*recv)();
recv = smash_get_lib_func(LIBMPI, "MPI_Recv");
ssend = smash_get_lib_func(LIBMPI, "MPI_Ssend");
if (smash_failures != NULL) {
if (!smash_dead) {
for (i = 0; i < smash_failures->size; i++)
ssend(&smash_world_size, 1, MPI_INT, smash_failures->failures[i].node, 0xdead, MPI_COMM_WORLD);
}
}
int done;
if (smash_my_rank == 0) {
struct smash_graph_msgs tmp = {0};
MPI_Status status;
for (i = 1; i < (unsigned int)smash_world_size; ++i) {
done = 1;
ssend(&done, 1, MPI_INT, i, SMASH_GRAPH, MPI_COMM_WORLD, &status);
recv(&tmp, sizeof(struct smash_graph_msgs), MPI_CHAR,
i, SMASH_GRAPH, MPI_COMM_WORLD,
&status);
for (j = 0; j < tmp.i; ++j) {
smash_graph_msgs.msgs[smash_graph_msgs.i].src = tmp.msgs[j].src;
smash_graph_msgs.msgs[smash_graph_msgs.i].dst = tmp.msgs[j].dst;
smash_graph_msgs.i++;
}
}
/* Output graph */
save_graph(&smash_graph_msgs);
} else {
if (!master_done)
recv(&done, 1, MPI_INT, 0, SMASH_GRAPH, MPI_COMM_WORLD);
ssend(&smash_graph_msgs, sizeof(struct smash_graph_msgs),
MPI_CHAR, 0, SMASH_GRAPH, MPI_COMM_WORLD);
}
free(smash_delays);
free(smash_failures);
f = smash_get_lib_func(LIBMPI, "MPI_Finalize");
return f();
}
int
MPI_Ssend(const void *buf, int count, MPI_Datatype datatype, int dest,
int tag, MPI_Comm comm)
{
int (*f)();
unsigned int i;
struct mpi_send_args args = {
.count = count,
.datatype = datatype,
.dest = dest,
.tag = tag,
.comm = comm,
};
args.buf = malloc(sizeof(buf) * count);
memcpy(args.buf, buf, sizeof(buf) * count);
f = smash_get_lib_func(LIBMPI, "MPI_Ssend");
for (i = 0; i < smash_delays->size; ++i) {
/* If a delay in the config file matches our rank and the target rank, inject it in the callout struct. */
if (smash_delays->delays[i].dst == (unsigned int)dest &&
smash_delays->delays[i].src == smash_my_rank &&
(smash_delays->delays[i].msg > 0 ||
smash_delays->delays[i].msg == -1)) {
sem_wait(smash_timeout(f, 6, smash_delays->delays[i].delay, &args));
smash_delays->delays[i].msg -= 1 * (smash_delays->delays[i].msg != -1);
return 0;
}
}
/* If there is no delay to apply, call MPI_Ssend directly. */
return f(buf, count, datatype, dest, tag, comm);
}
int
MPI_Send(const void *buf, int count, MPI_Datatype datatype, int dest,
int tag, MPI_Comm comm)
{
int (*f)();
unsigned int i;
struct mpi_send_args args = {
.count = count,
.datatype = datatype,
.dest = dest,
.tag = tag,
.comm = comm,
};
args.buf = malloc(sizeof(buf) * count);
memcpy(args.buf, buf, sizeof(buf) * count);
f = smash_get_lib_func(LIBMPI, "MPI_Send");
for (i = 0; i < smash_delays->size; ++i) {
/* If a delay in the config file matches our rank and the target rank, inject it in the callout struct. */
if (smash_delays->delays[i].dst == (unsigned int)dest &&
smash_delays->delays[i].src == smash_my_rank &&
(smash_delays->delays[i].msg > 0 ||
smash_delays->delays[i].msg == -1)) {
smash_timeout(f, 6, smash_delays->delays[i].delay, &args);
smash_delays->delays[i].msg -= 1 * (smash_delays->delays[i].msg != -1);
return 0;
}
}
/* If there is no delay to apply, call MPI_Send directly. */
return f(buf, count, datatype, dest, tag, comm);
}

14
hooking.h Normal file
View File

@ -0,0 +1,14 @@
#ifndef HOOKING_H
#define HOOKING_H
#define LIBMPI "libmpi.so"
#define LIBSTD LIBM_SO
/*
* smash_get_lib_func takes a null-terminated library name and a
* null-terminated symbol name. On success returns the address where the symbol
* is loaded in memory, on failure terminates the process.
*/
void *smash_get_lib_func(const char *lname, const char *fname);
#endif /* HOOKING_H */

141
parser.c Normal file
View File

@ -0,0 +1,141 @@
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <stdlib.h>
#include <unistd.h>
#include <regex.h>
#include "parser.h"
static void *
smash_load_file_in_memory(const char *filename, size_t *data_size)
{
int fd;
char *data;
struct stat finfo;
if ((fd = open(filename, O_RDONLY, S_IRUSR | S_IWUSR)) < 0)
goto err1;
if (fstat(fd, &finfo) < 0)
goto err2;
if ((data = mmap(NULL, finfo.st_size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0))
== MAP_FAILED)
goto err2;
*data_size = finfo.st_size;
close(fd);
return data;
err2:
close(fd);
err1:
return NULL;
}
static void
smash_populate_delay(const char *line, size_t n, const regmatch_t *rm, struct cfg_delays *delays)
{
delays->delays[n].src = strtol(line + rm[1].rm_so, NULL, 10);
delays->delays[n].dst = strtol(line + rm[2].rm_so, NULL, 10);
delays->delays[n].delay = strtol(line + rm[3].rm_so, NULL, 10);
delays->delays[n].msg = strtol(line + rm[4].rm_so, NULL, 10);
}
static void
smash_populate_failure(const char *line, size_t n, const regmatch_t *rm, struct cfg_failures *failures)
{
failures->failures[n].node = strtol(line + rm[1].rm_so, NULL, 10);
failures->failures[n].time = strtol(line + rm[2].rm_so, NULL, 10);
}
static char *
smash_get_config_path(enum CFG ctype)
{
return getenv((ctype == CFG_DELAY ? CFG_DELAY_PATH : CFG_FAILURE_PATH));
}
static int
count_lines(const char *rs)
{
int lines = 0;
while (*(rs)++ != '\0')
if (*rs == '\n' || *rs == '\r')
lines++;
return lines;
}
int
smash_parse_cfg(enum CFG ctype, void **cfg)
{
struct cfg_delays *delays;
struct cfg_failures *failures;
void *data;
int ret, lines;
size_t data_size, nline, n_cfg;
char *config_path, *line, err_buf[100];
const char *rs;
regex_t r;
regmatch_t rm[5];
void (*f)();
if (!(config_path = smash_get_config_path(ctype))) {
*cfg = NULL;
return ctype == CFG_DELAY ? -1 : 0;
}
if (!(data = smash_load_file_in_memory(config_path, &data_size)))
return -1;
lines = count_lines(data);
*cfg = malloc(ctype == CFG_DELAY ? sizeof(struct cfg_delays) +
lines * sizeof(struct cfg_delay)
: sizeof(struct cfg_failures) +
lines * sizeof(struct cfg_delay));
if (ctype == CFG_DELAY) {
delays = *cfg;
delays->size = lines;
rs = "([0-9]+);([0-9]+);([0-9]+);(-?[0-9]+)";
n_cfg = 5;
f = smash_populate_delay;
} else {
failures = *cfg;
failures->size = lines;
rs = "([0-9]+);([0-9]+)";
n_cfg = 3;
f = smash_populate_failure;
}
if ((ret = regcomp(&r, rs, REG_EXTENDED)) != 0) {
regerror(ret, &r, err_buf, 100);
fprintf(stderr, "failed to compile regex <%s>:%s\n", rs,
err_buf);
goto err;
}
nline = 0;
line = strtok(data, "\n");
while (line) {
/* if line is a comment or snaphot, do smth */
if ((ret = regexec(&r, line, n_cfg, rm, 0)) != 0) {
regerror(ret, &r, err_buf, 100);
fprintf(stderr, "line %ld: %s\n", nline + 1, err_buf);
goto err;
}
f(line, nline, rm, *cfg);
nline++;
line = strtok(NULL, "\n");
}
regfree(&r);
munmap(data, data_size);
return 0;
err:
regfree(&r);
munmap(data, data_size);
return -1;
}

49
parser.h Normal file
View File

@ -0,0 +1,49 @@
#ifndef PARSER_H
#define PARSER_H
#define CFG_DELAY_PATH "SMASH_MPI_DELAY"
#define CFG_FAILURE_PATH "SMASH_MPI_FAILURE"
enum CFG { CFG_DELAY, CFG_FAILURE };
int smash_parse_cfg(enum CFG ctype, void **data);
struct cfg_delay {
unsigned long int delay;
unsigned int src, dst;
int msg;
};
struct cfg_failure {
unsigned long int time;
unsigned int node;
};
/*
* Structure tail padding optimization
* with flexible array member, valid in C99.
*
* To allocate do:
*
* struct cfg_delays *delays;
*
* delays = malloc(sizeof(struct cfg_delays) + VECTOR_SIZE * sizeof(config_delay));
* delays->size = VECTOR_SIZE;
*
* and to free everything do:
* free(delays)
*
* Where VECTOR_SIZE is the number of lines in one config file.
*/
struct cfg_delays {
unsigned int size;
struct cfg_delay delays[];
};
struct cfg_failures {
unsigned int size;
struct cfg_failure failures[];
};
#endif /* PARSER_H */