commit 64ae6749d197066e296642e7ad11501b65adeaf7 Author: vpatuta Date: Sun Nov 19 13:05:38 2023 +0100 backup diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..23db0d8 --- /dev/null +++ b/LICENSE @@ -0,0 +1,28 @@ +BSD 3-Clause License + +Copyright (c) 2023, Martin Hart + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +3. Neither the name of the copyright holder nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..af47e7c --- /dev/null +++ b/Makefile @@ -0,0 +1,19 @@ +TARGET := smash-MPI.so +CC := mpicc +CFLAGS := -std=gnu99 -fPIC -Wall -Wextra -c -I. 
+LIB := -ldl
+SRC := $(wildcard *.c)
+OBJ := $(SRC:.c=.o)
+
+.DEFAULT_GOAL := $(TARGET)
+
+.PHONY: clean
+
+# Link every object ($^), not only the ones newer than the target ($?),
+# and list the libraries after the objects that need them.
+$(TARGET): $(OBJ)
+	$(CC) -shared $^ $(LIB) -o $@
+
+%.o: %.c
+	$(CC) $(CFLAGS) $<
+
+clean:
+	@rm -f $(OBJ) $(TARGET)
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..cec16b2
--- /dev/null
+++ b/README.md
@@ -0,0 +1 @@
+# smash-MPI
\ No newline at end of file
diff --git a/callout.c b/callout.c
new file mode 100644
index 0000000..d546b83
--- /dev/null
+++ b/callout.c
@@ -0,0 +1,121 @@
+#include "callout.h"
+
+/*
+ * NOTE(review): the header names of the six includes below were missing
+ * from this copy of the patch. They are restored from what this file
+ * uses (err, sem_post, printf, free, memcpy, timer_gettime); confirm
+ * against the repository.
+ */
+#include <err.h>
+#include <semaphore.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+
+/*
+ * Table of pending callouts. The first entry whose c_func is NULL
+ * terminates the list. c_time of each entry is relative to the entry
+ * before it.
+ */
+struct callo callout[NCALL] = { 0 };
+extern timer_t smash_timer_id;
+
+/*
+ * Print every pending callout to stdout (debugging helper).
+ */
+void
+smash_print_callout(void)
+{
+	size_t i;
+
+	/* Never walk past the table, even if the terminator is missing. */
+	for (i = 0; i < NCALL && callout[i].c_func != NULL; ++i)
+		printf("c_time: %lli\nc_arg: %i\nc_func @ %p\n\n",
+		    callout[i].c_time, callout[i].c_arg,
+		    (void *)callout[i].c_func);
+}
+
+/*
+ * Arm the one-shot timer smash_timer_id.
+ *
+ * time is a delay in milliseconds. The timer is only armed when it is
+ * currently disarmed. The special value -1 re-arms it unconditionally
+ * with the delay of the first callout entry. A delay of zero leaves the
+ * timer untouched.
+ */
+void
+smash_set_timer(int time)
+{
+	int t, s, ns;
+	struct itimerspec curr;
+
+	/* curr is only meaningful when the query succeeds. */
+	if (timer_gettime(smash_timer_id, &curr) < 0)
+		err(1, "timer_gettime");
+
+	if (time == -1 || (curr.it_value.tv_nsec == 0 && curr.it_value.tv_sec == 0)) {
+		t = time == -1 ?
callout[0].c_time : time;
+		s = t / 1000;
+		ns = (t % 1000) * 1000000;
+
+		if (s == 0 && ns == 0)
+			return;
+
+		struct itimerspec its;
+
+		its.it_value.tv_sec = s;
+		its.it_value.tv_nsec = ns;
+
+		/* One-shot timer: no reload interval. */
+		its.it_interval.tv_sec = 0;
+		its.it_interval.tv_nsec = 0;
+		if (timer_settime(smash_timer_id, 0, &its, NULL) < 0)
+			err(1, "timer_settime");
+	}
+}
+
+/*
+ * Schedule func to be called by smash_clock() after time milliseconds.
+ *
+ * arg is stored in c_arg and selects how smash_clock() calls func: 6
+ * passes the six fields of the copied mpi_send_args, 0 passes nothing.
+ * args, when not NULL, is copied into the new entry.
+ *
+ * The table is kept sorted by expiry time and c_time is stored as a
+ * delta to the preceding entry. Returns the address of the c_lock of
+ * the slot the entry was inserted into. Terminates the process when the
+ * table is full.
+ *
+ * NOTE(review): c_lock is not moved when entries are shifted, here or
+ * in smash_clock(), so the returned semaphore belongs to the slot, not
+ * to the entry. A caller blocked on it may be waiting on a different
+ * slot than the one smash_clock() eventually posts.
+ */
+sem_t *
+smash_timeout(int (*func)(), int arg, int time, struct mpi_send_args *args)
+{
+	int t;
+	struct callo *p1, *p2;
+
+	/*
+	 * Find the terminator and make sure there is room to move it up
+	 * by one slot: the last slot must always hold a terminator.
+	 */
+	for (p2 = &callout[0]; p2->c_func != 0; p2++)
+		;
+	if (p2 - &callout[0] >= NCALL - 1)
+		errx(1, "smash_timeout: callout table full");
+
+	/* Find the insertion point, turning t into a delta on the way. */
+	t = time;
+	p1 = &callout[0];
+	while (p1->c_func != 0 && p1->c_time <= t) {
+		t -= p1->c_time;
+		p1++;
+	}
+	/* The entry that will follow the new one is now relative to it. */
+	p1->c_time -= t;
+
+	/*
+	 * Shift [p1, terminator] up by one slot, last entry first. The
+	 * loop stops on p1 itself so that no pointer below the start of
+	 * the array is ever formed.
+	 */
+	for (;;) {
+		(p2+1)->c_time = p2->c_time;
+		(p2+1)->c_func = p2->c_func;
+		(p2+1)->c_arg = p2->c_arg;
+		(p2+1)->c_send_args = p2->c_send_args;
+		if (p2 == p1)
+			break;
+		p2--;
+	}
+
+	p1->c_time = t;
+	p1->c_func = func;
+	p1->c_arg = arg;
+	if (args != NULL)
+		memcpy(&p1->c_send_args, args, sizeof(struct mpi_send_args));
+
+	smash_set_timer(t);
+
+	return &p1->c_lock;
+}
+
+/*
+ * Run every callout that is due and drop it from the table, then re-arm
+ * the timer for the new first entry.
+ *
+ * The first entry is always treated as expired, together with every
+ * following entry whose delta is zero.
+ */
+void
+smash_clock(void)
+{
+	struct callo *p1, *p2;
+
+	if (callout[0].c_func != 0) {
+		p1 = p2 = &callout[0];
+		p1->c_time = 0;
+
+		while (p1->c_func != 0 && p1->c_time == 0) {
+			switch (p1->c_arg) {
+			case 6:
+				/* Delayed send: replay it, then release the copy. */
+				p1->c_func(p1->c_send_args.buf, p1->c_send_args.count,
+				    p1->c_send_args.datatype, p1->c_send_args.dest,
+				    p1->c_send_args.tag, p1->c_send_args.comm);
+				free(p1->c_send_args.buf);
+				sem_post(&p1->c_lock);
+				break;
+			case 0:
+				p1->c_func();
+				break;
+			}
+			p1++;
+		}
+
+		/* Compact the table; the copy stops after the terminator. */
+		while ((p2->c_func = p1->c_func)) {
+			p2->c_time = p1->c_time;
+			p2->c_arg = p1->c_arg;
+			p2->c_send_args = p1->c_send_args;
+			p1++;
+			p2++;
+		}
+		smash_set_timer(-1);
+	}
+}
diff --git a/callout.h b/callout.h
new file mode 100644
index 0000000..6ed93fe
--- /dev/null
+++ b/callout.h
@@ -0,0 +1,31 @@
+#ifndef CALLOUT_H
+#define CALLOUT_H
+
+/* Number of slots in the callout table, terminator included. */
+#define NCALL 256
+
+/*
+ * NOTE(review): the two header names below were missing from this copy
+ * of the patch. They are restored from the types used in this header
+ * (MPI_Comm, MPI_Datatype, sem_t); confirm against the repository.
+ */
+#include <mpi.h>
+#include <semaphore.h>
+
+/* Arguments of a send call, saved so it can be replayed later. */
+struct mpi_send_args {
+	void *buf;
+	MPI_Comm comm;
+	MPI_Datatype datatype;
+	int count, dest, tag;
+};
+
+struct callo
{
+	long long c_time; /* incremental time */
+	int c_arg; /* argument to routine */
+	int (*c_func)(); /* routine */
+	sem_t c_lock; /* lock to preserve locking semantic */
+	struct mpi_send_args c_send_args; /* args for MPI_Send */
+};
+
+void smash_print_callout(void);
+
+sem_t *smash_timeout(int (*func)(), int arg, int time,
+    struct mpi_send_args *args);
+
+void smash_clock(void);
+
+#endif /* CALLOUT_H */
diff --git a/hooking.c b/hooking.c
new file mode 100644
index 0000000..6b39a32
--- /dev/null
+++ b/hooking.c
@@ -0,0 +1,310 @@
+/*
+ * NOTE(review): the header names of the ten includes below were missing
+ * from this copy of the patch. They are restored from what this file
+ * uses (dlopen, errx, LIBM_SO, MPI_*, sem_wait, sigaction, fopen,
+ * malloc, memcpy, timer_create); confirm against the repository.
+ */
+#include <dlfcn.h>
+#include <err.h>
+#include <gnu/lib-names.h>
+#include <mpi.h>
+#include <semaphore.h>
+#include <signal.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+
+#include "callout.h"
+#include "hooking.h"
+#include "parser.h"
+
+/* Tag of the internal messages used to collect the message graph. */
+#define SMASH_GRAPH 0x1234
+
+timer_t smash_timer_id;
+unsigned int smash_my_rank;
+int smash_dead, smash_world_size, smash_alarm;
+
+struct cfg_delays *smash_delays;
+struct cfg_failures *smash_failures;
+
+/* One received message: sender rank and receiver rank. */
+struct smash_graph_msg {
+	int src, dst;
+};
+
+/* Messages recorded so far; i is the number of used entries. */
+struct smash_graph_msgs {
+	size_t i;
+	struct smash_graph_msg msgs[4096];
+} smash_graph_msgs;
+
+/* Set once the SMASH_GRAPH request of rank 0 has been consumed. */
+static int master_done = 0;
+
+/*
+ * Callout routine that makes this process fail.
+ *
+ * Marks the process dead, then receives one 0xdead message for every
+ * process that is not listed in the failure configuration, finalizes
+ * MPI and exits. Never returns.
+ */
+int
+smash_failure(void)
+{
+	int buf, got, expected;
+	MPI_Status status;
+	int (*f)();
+
+	smash_dead = 1;
+	f = smash_get_lib_func(LIBMPI, "MPI_Recv");
+
+	/*
+	 * Signed arithmetic: with more configured failures than processes
+	 * the count is negative and nothing is awaited, instead of
+	 * wrapping around to a huge unsigned value.
+	 */
+	expected = smash_world_size - (int)smash_failures->size;
+	for (got = 0; got < expected; got++)
+		f(&buf, 1, MPI_INT, MPI_ANY_SOURCE, 0xdead, MPI_COMM_WORLD, &status);
+
+	MPI_Finalize();
+	exit(0);
+}
+
+/*
+ * Hook for MPI_Recv.
+ *
+ * Forwards to the real MPI_Recv and records the (source, own rank) pair
+ * of the received message in smash_graph_msgs. Internal messages, tagged
+ * 0xdead or SMASH_GRAPH, are dropped and the receive is retried.
+ *
+ * NOTE(review): a dropped 0xdead message is no longer available to
+ * smash_failure(), which counts them -- confirm this is intended.
+ */
+int
+MPI_Recv(void *buf, int count, MPI_Datatype datatype, int source, int tag,
+    MPI_Comm comm, MPI_Status *status)
+{
+	int (*f)(), res, rtag;
+	MPI_Status local;
+
+	/* The tag and source are needed even if the caller ignores them. */
+	if (status == MPI_STATUS_IGNORE)
+		status = &local;
+
+	f = smash_get_lib_func(LIBMPI, "MPI_Recv");
+
+	while (1) {
+		res = f(buf, count, datatype, source, tag, comm, status);
+		rtag = status->MPI_TAG;
+		/* An application message is anything that is neither tag. */
+		if (rtag != 0xdead && rtag != SMASH_GRAPH)
+			break;
+		/* Remember the request of rank 0 before wiping the status. */
+		if (rtag == SMASH_GRAPH)
+			master_done = 1;
+		memset(status, 0, sizeof(MPI_Status));
+	}
+
+	/* The graph buffer is fixed-size: stop recording when it is full. */
+	if (smash_graph_msgs.i >=
+	    sizeof(smash_graph_msgs.msgs) / sizeof(smash_graph_msgs.msgs[0]))
+		return res;
+
+	smash_graph_msgs.msgs[smash_graph_msgs.i].src = status->MPI_SOURCE;
+	smash_graph_msgs.msgs[smash_graph_msgs.i].dst =
smash_my_rank;
+	smash_graph_msgs.i++;
+
+	return res;
+}
+
+/*
+ * Look up the symbol fname in the shared library lname and return its
+ * address. Terminates the process when the library or the symbol cannot
+ * be found.
+ */
+void *
+smash_get_lib_func(const char *lname, const char *fname)
+{
+	void *lib, *p;
+
+	if (!(lib = dlopen(lname, RTLD_LAZY)))
+		errx(EXIT_FAILURE, "%s", dlerror());
+
+	if (!(p = dlsym(lib, fname)))
+		errx(EXIT_FAILURE, "%s", dlerror());
+
+	/*
+	 * The handle is kept open on purpose: the returned address is
+	 * only valid while the library stays loaded.
+	 */
+	return p;
+}
+
+/* SIGALRM handler: run the callouts that are due. */
+static void
+smash_handler(__attribute__((unused)) int signum)
+{
+	smash_clock();
+}
+
+/*
+ * Install smash_handler for SIGALRM and create a CLOCK_REALTIME timer
+ * that raises SIGALRM when it expires. Returns the id of the new timer.
+ * Terminates the process on failure.
+ */
+timer_t
+smash_setup_alarm(void)
+{
+	timer_t timerid;
+	struct sigaction sa;
+	struct sigevent sev;
+
+	/* Zero both structures so that no field is left indeterminate. */
+	memset(&sa, 0, sizeof sa);
+	sa.sa_handler = smash_handler;
+	sigemptyset(&sa.sa_mask);
+	sa.sa_flags = SA_RESTART;
+	if (sigaction(SIGALRM, &sa, NULL) < 0)
+		err(1, "sigaction");
+
+	/*
+	 * sigev_value stays zero: the handler does not read it, and it
+	 * must not point at a local of this function.
+	 */
+	memset(&sev, 0, sizeof sev);
+	sev.sigev_notify = SIGEV_SIGNAL;
+	sev.sigev_signo = SIGALRM;
+	if (timer_create(CLOCK_REALTIME, &sev, &timerid) < 0)
+		err(1, "timer_create");
+
+	return timerid;
+}
+
+/*
+ * Hook for __libc_start_main.
+ *
+ * Parses the delay and failure configurations into smash_delays and
+ * smash_failures, resets smash_alarm and smash_dead, then forwards all
+ * arguments to the __libc_start_main found in LIBSTD. Terminates the
+ * process when a configuration cannot be parsed.
+ */
+int
+__libc_start_main(
+    int (*main)(int, char **, char **),
+    int argc,
+    char **argv,
+    int (*init)(int, char **, char **),
+    void (*fini)(void),
+    void (*rtld_fini)(void),
+    void *stack_end)
+{
+	int (*f)();
+
+	if (smash_parse_cfg(CFG_DELAY, (void **)&smash_delays) < 0)
+		errx(EXIT_FAILURE, "error in CFG_DELAY\n");
+
+	if (smash_parse_cfg(CFG_FAILURE, (void **)&smash_failures) < 0)
+		errx(EXIT_FAILURE, "error in CFG_FAILURE\n");
+
+	f = smash_get_lib_func(LIBSTD, "__libc_start_main");
+	smash_alarm = 0;
+	smash_dead = 0;
+	return f(main, argc, argv, init, fini, rtld_fini, stack_end);
+}
+
+/*
+ * Hook for MPI_Init.
+ *
+ * Creates the callout timer on the first call, resets the message
+ * graph, calls the real MPI_Init, stores the rank and the world size,
+ * and schedules smash_failure for every configured failure of this
+ * rank. Returns the result of the real MPI_Init.
+ */
+int
+MPI_Init(int *argc, char ***argv)
+{
+	unsigned int i;
+	int (*f)(int *, char ***), res, rank;
+
+	if (!smash_alarm) {
+		smash_timer_id = smash_setup_alarm();
+		smash_alarm = 1;
+	}
+
+	smash_graph_msgs.i = 0;
+
+	f = smash_get_lib_func(LIBMPI, "MPI_Init");
+	res = f(argc, argv);
+	MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+	smash_my_rank = rank;
+	MPI_Comm_size(MPI_COMM_WORLD, &smash_world_size);
+
+	if (smash_failures != NULL) {
+		for (i = 0; i < smash_failures->size; ++i) {
+			if (smash_failures->failures[i].node ==
smash_my_rank) {
+				smash_timeout(smash_failure, 0, smash_failures->failures[i].time, NULL);
+			}
+		}
+	}
+	return res;
+}
+
+/*
+ * Write the recorded messages as a Graphviz digraph.
+ *
+ * The output path is taken from the SMASH_MPI_GRAPH environment
+ * variable and defaults to "graph.dot". Returns 0 on success, -1 when
+ * the file cannot be opened or closed.
+ */
+int
+save_graph(struct smash_graph_msgs *m)
+{
+	FILE *fs;
+	size_t i;
+	char *filepath;
+
+	filepath = getenv("SMASH_MPI_GRAPH");
+	if (!filepath)
+		filepath = "graph.dot";
+
+	if (!(fs = fopen(filepath, "w+")))
+		return -1;
+
+	fprintf(fs, "digraph SMASH_MPI {\n layout=twopi\n ranksep=3;\n ratio=auto;\n");
+	for (i = 0; i < m->i; ++i) {
+		fprintf(fs, "\"p%d\" -> \"p%d\" [ color=\"purple\" ];\n",
+		    m->msgs[i].src,
+		    m->msgs[i].dst);
+	}
+	fprintf(fs, "}");
+
+	/* fclose flushes the stream and releases it; report write errors. */
+	if (fclose(fs) != 0)
+		return -1;
+	return 0;
+}
+
+/*
+ * Hook for MPI_Finalize.
+ *
+ * A process that did not fail first sends one 0xdead message to every
+ * node of the failure configuration. Rank 0 then asks each other rank
+ * for its recorded messages, merges them into its own and writes the
+ * graph. Every other rank waits for that request, unless MPI_Recv
+ * already consumed it, and sends its records to rank 0. Finally the
+ * configurations are freed and the real MPI_Finalize is called.
+ */
+int
+MPI_Finalize(void)
+{
+	int (*f)(void);
+	size_t i, j;
+	const size_t cap = sizeof(smash_graph_msgs.msgs) /
+	    sizeof(smash_graph_msgs.msgs[0]);
+	/* Prototyped pointers so that argument count and types are checked. */
+	int (*ssend)(const void *, int, MPI_Datatype, int, int, MPI_Comm);
+	int (*recv)(void *, int, MPI_Datatype, int, int, MPI_Comm, MPI_Status *);
+	int done, peer;
+
+	recv = smash_get_lib_func(LIBMPI, "MPI_Recv");
+	ssend = smash_get_lib_func(LIBMPI, "MPI_Ssend");
+
+	if (smash_failures != NULL) {
+		if (!smash_dead) {
+			for (i = 0; i < smash_failures->size; i++)
+				ssend(&smash_world_size, 1, MPI_INT,
+				    (int)smash_failures->failures[i].node, 0xdead,
+				    MPI_COMM_WORLD);
+		}
+	}
+
+	if (smash_my_rank == 0) {
+		struct smash_graph_msgs tmp = {0};
+		MPI_Status status;
+
+		for (peer = 1; peer < smash_world_size; ++peer) {
+			done = 1;
+			ssend(&done, 1, MPI_INT, peer, SMASH_GRAPH, MPI_COMM_WORLD);
+			recv(&tmp, sizeof(struct smash_graph_msgs), MPI_CHAR,
+			    peer, SMASH_GRAPH, MPI_COMM_WORLD,
+			    &status);
+
+			/* Merge, without running past either fixed-size array. */
+			for (j = 0; j < tmp.i && j < cap &&
+			    smash_graph_msgs.i < cap; ++j) {
+				smash_graph_msgs.msgs[smash_graph_msgs.i].src = tmp.msgs[j].src;
+				smash_graph_msgs.msgs[smash_graph_msgs.i].dst = tmp.msgs[j].dst;
+				smash_graph_msgs.i++;
+			}
+		}
+		/* Output graph */
+		save_graph(&smash_graph_msgs);
+	} else {
+		if (!master_done)
+			recv(&done, 1, MPI_INT, 0, SMASH_GRAPH, MPI_COMM_WORLD,
+			    MPI_STATUS_IGNORE);
+		ssend(&smash_graph_msgs, sizeof(struct smash_graph_msgs),
+		    MPI_CHAR, 0, SMASH_GRAPH, MPI_COMM_WORLD);
+	}
+
+	free(smash_delays);
+	free(smash_failures);
+	f = smash_get_lib_func(LIBMPI, "MPI_Finalize");
+	return f();
+}
+
+/*
+ * Hook for MPI_Ssend: applies a configured delay to the send, or
+ * forwards it to the real MPI_Ssend when no delay applies.
+ */
+int
+MPI_Ssend(const void *buf, int count, MPI_Datatype datatype,
int dest,
+    int tag, MPI_Comm comm)
+{
+	int (*f)();
+	unsigned int i;
+	MPI_Aint lb, extent;
+	size_t nbytes;
+	struct mpi_send_args args = {
+		.count = count,
+		.datatype = datatype,
+		.dest = dest,
+		.tag = tag,
+		.comm = comm,
+	};
+
+	f = smash_get_lib_func(LIBMPI, "MPI_Ssend");
+
+	for (i = 0; smash_delays != NULL && i < smash_delays->size; ++i) {
+		/* If a delay in the config file matches our rank and the target rank, inject it in the callout struct. */
+		if (smash_delays->delays[i].dst != (unsigned int)dest ||
+		    smash_delays->delays[i].src != smash_my_rank ||
+		    (smash_delays->delays[i].msg <= 0 &&
+		    smash_delays->delays[i].msg != -1))
+			continue;
+
+		/*
+		 * Copy the payload only when the send is really delayed, so
+		 * nothing is allocated on the direct path. The copy is sized
+		 * from the datatype extent, assuming its lower bound is zero.
+		 * If the size cannot be determined, send without delay.
+		 */
+		if (MPI_Type_get_extent(datatype, &lb, &extent) != MPI_SUCCESS ||
+		    extent < 0 || count < 0)
+			break;
+		nbytes = (size_t)extent * (size_t)count;
+		if ((args.buf = malloc(nbytes != 0 ? nbytes : 1)) == NULL)
+			err(EXIT_FAILURE, "malloc");
+		if (nbytes != 0)
+			memcpy(args.buf, buf, nbytes);
+
+		/* Block until smash_clock() has performed the send. */
+		sem_wait(smash_timeout(f, 6, smash_delays->delays[i].delay, &args));
+		/* msg == -1 means "every message": never count it down. */
+		if (smash_delays->delays[i].msg != -1)
+			smash_delays->delays[i].msg -= 1;
+		return 0;
+	}
+	/* If there is no delay to apply, call MPI_Ssend directly. */
+	return f(buf, count, datatype, dest, tag, comm);
+}
+
+/*
+ * Hook for MPI_Send.
+ *
+ * When a delay rule matches (source is this rank, destination is dest,
+ * and the rule still has messages left or is unlimited), a copy of the
+ * payload is queued with smash_timeout() and 0 is returned at once; the
+ * copy is sent and freed later by smash_clock(). Otherwise the call is
+ * forwarded to the real MPI_Send.
+ */
+int
+MPI_Send(const void *buf, int count, MPI_Datatype datatype, int dest,
+    int tag, MPI_Comm comm)
+{
+	int (*f)();
+	unsigned int i;
+	MPI_Aint lb, extent;
+	size_t nbytes;
+	struct mpi_send_args args = {
+		.count = count,
+		.datatype = datatype,
+		.dest = dest,
+		.tag = tag,
+		.comm = comm,
+	};
+
+	f = smash_get_lib_func(LIBMPI, "MPI_Send");
+
+	for (i = 0; smash_delays != NULL && i < smash_delays->size; ++i) {
+		/* If a delay in the config file matches our rank and the target rank, inject it in the callout struct. */
+		if (smash_delays->delays[i].dst != (unsigned int)dest ||
+		    smash_delays->delays[i].src != smash_my_rank ||
+		    (smash_delays->delays[i].msg <= 0 &&
+		    smash_delays->delays[i].msg != -1))
+			continue;
+
+		/*
+		 * Copy the payload only when the send is really delayed, so
+		 * nothing is allocated on the direct path. The copy is sized
+		 * from the datatype extent, assuming its lower bound is zero.
+		 * If the size cannot be determined, send without delay.
+		 */
+		if (MPI_Type_get_extent(datatype, &lb, &extent) != MPI_SUCCESS ||
+		    extent < 0 || count < 0)
+			break;
+		nbytes = (size_t)extent * (size_t)count;
+		if ((args.buf = malloc(nbytes != 0 ? nbytes : 1)) == NULL)
+			err(EXIT_FAILURE, "malloc");
+		if (nbytes != 0)
+			memcpy(args.buf, buf, nbytes);
+
+		smash_timeout(f, 6, smash_delays->delays[i].delay, &args);
+		/* msg == -1 means "every message": never count it down. */
+		if (smash_delays->delays[i].msg != -1)
+			smash_delays->delays[i].msg -= 1;
+		return 0;
+	}
+	/* If there is no delay to apply, call MPI_Send directly.
*/
+	return f(buf, count, datatype, dest, tag, comm);
+}
diff --git a/hooking.h b/hooking.h
new file mode 100644
index 0000000..cf38a8c
--- /dev/null
+++ b/hooking.h
@@ -0,0 +1,14 @@
+#ifndef HOOKING_H
+#define HOOKING_H
+
+#define LIBMPI "libmpi.so"
+/*
+ * Library that provides __libc_start_main: the C library itself. Like
+ * LIBM_SO, LIBC_SO comes from <gnu/lib-names.h>.
+ */
+#define LIBSTD LIBC_SO
+
+/*
+ * smash_get_lib_func takes a null-terminated library name and a
+ * null-terminated symbol name. On success returns the address where the symbol
+ * is loaded in memory, on failure terminates the process.
+ */
+void *smash_get_lib_func(const char *lname, const char *fname);
+
+#endif /* HOOKING_H */
diff --git a/parser.c b/parser.c
new file mode 100644
index 0000000..8aa4fdb
--- /dev/null
+++ b/parser.c
@@ -0,0 +1,141 @@
+/*
+ * NOTE(review): the header names of the nine includes below were
+ * missing from this copy of the patch. They are restored from what this
+ * file uses (open, regcomp, fprintf, malloc, strtok, mmap, fstat,
+ * close); confirm against the repository.
+ */
+#include <fcntl.h>
+#include <regex.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/mman.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include "parser.h"
+
+/*
+ * Load the whole file filename into a private, writable memory mapping
+ * and return it as a NUL-terminated string.
+ *
+ * *data_size receives the length of the mapping, which is the file size
+ * plus one byte for the terminator; pass it to munmap() to release the
+ * buffer. Returns NULL on failure.
+ */
+static void *
+smash_load_file_in_memory(const char *filename, size_t *data_size)
+{
+	int fd;
+	char *data;
+	size_t len, off;
+	ssize_t n;
+	struct stat finfo;
+
+	if ((fd = open(filename, O_RDONLY)) < 0)
+		goto err1;
+
+	if (fstat(fd, &finfo) < 0 || finfo.st_size < 0)
+		goto err2;
+
+	/*
+	 * The callers treat the buffer as a C string. A mapping of the
+	 * file itself has no terminator when the size is a multiple of
+	 * the page size, so read the file into a zero-filled anonymous
+	 * mapping that is one byte longer.
+	 */
+	len = (size_t)finfo.st_size + 1;
+	if ((data = mmap(NULL, len, PROT_READ | PROT_WRITE,
+	    MAP_PRIVATE | MAP_ANONYMOUS, -1, 0)) == MAP_FAILED)
+		goto err2;
+
+	for (off = 0; off < len - 1; off += (size_t)n) {
+		n = read(fd, data + off, len - 1 - off);
+		if (n < 0)
+			goto err3;
+		if (n == 0)
+			break;	/* file shrank: the rest stays zero */
+	}
+
+	*data_size = len;
+	close(fd);
+	return data;
+err3:
+	munmap(data, len);
+err2:
+	close(fd);
+err1:
+	return NULL;
+}
+
+/*
+ * Fill delays->delays[n] from the four groups matched in line:
+ * source, destination, delay and message count.
+ */
+static void
+smash_populate_delay(const char *line, size_t n, const regmatch_t *rm, struct cfg_delays *delays)
+{
+	delays->delays[n].src = strtol(line + rm[1].rm_so, NULL, 10);
+	delays->delays[n].dst = strtol(line + rm[2].rm_so, NULL, 10);
+	delays->delays[n].delay = strtol(line + rm[3].rm_so, NULL, 10);
+	delays->delays[n].msg = strtol(line + rm[4].rm_so, NULL, 10);
+}
+
+/*
+ * Fill failures->failures[n] from the two groups matched in line:
+ * node and time.
+ */
+static void
+smash_populate_failure(const char *line, size_t n, const regmatch_t *rm, struct cfg_failures *failures)
+{
+	failures->failures[n].node = strtol(line + rm[1].rm_so, NULL, 10);
+	failures->failures[n].time = strtol(line + rm[2].rm_so, NULL, 10);
+}
+
+/*
+ * Return the path of the configuration file of kind ctype, read from
+ * the matching environment variable, or NULL when it is not set.
+ */
+static char *
+smash_get_config_path(enum CFG ctype)
+{
+	
return getenv((ctype == CFG_DELAY ? CFG_DELAY_PATH : CFG_FAILURE_PATH));
+}
+
+/*
+ * Count the lines of rs the way strtok(rs, "\n") splits them: every
+ * maximal run of characters other than '\n' is one line, so empty lines
+ * are not counted and a last line without a newline is.
+ */
+static int
+count_lines(const char *rs)
+{
+	int lines = 0, in_line = 0;
+
+	for (; *rs != '\0'; rs++) {
+		if (*rs == '\n') {
+			in_line = 0;
+		} else if (!in_line) {
+			in_line = 1;
+			lines++;
+		}
+	}
+	return lines;
+}
+
+/*
+ * Parse the configuration file of kind ctype.
+ *
+ * On success *cfg points to a malloc'd struct cfg_delays or struct
+ * cfg_failures holding one entry per non-empty line, and 0 is returned.
+ * The caller frees it with free(). When the environment variable naming
+ * the file is not set, *cfg is NULL and the result is -1 for CFG_DELAY
+ * and 0 for CFG_FAILURE. On any other error *cfg is NULL and -1 is
+ * returned.
+ */
+int
+smash_parse_cfg(enum CFG ctype, void **cfg)
+{
+	struct cfg_delays *delays;
+	struct cfg_failures *failures;
+	void *data;
+	int ret, lines;
+	size_t data_size, nline, n_cfg;
+	char *config_path, *line, err_buf[100];
+	const char *rs;
+	regex_t r;
+	regmatch_t rm[5];
+	void (*f)();
+
+	*cfg = NULL;
+
+	if (!(config_path = smash_get_config_path(ctype)))
+		return ctype == CFG_DELAY ? -1 : 0;
+
+	if (!(data = smash_load_file_in_memory(config_path, &data_size)))
+		return -1;
+
+	lines = count_lines(data);
+	*cfg = malloc(ctype == CFG_DELAY
+	    ? sizeof(struct cfg_delays) + lines * sizeof(struct cfg_delay)
+	    : sizeof(struct cfg_failures) + lines * sizeof(struct cfg_failure));
+	if (*cfg == NULL)
+		goto err_unmap;
+
+	if (ctype == CFG_DELAY) {
+		delays = *cfg;
+		delays->size = lines;
+		rs = "([0-9]+);([0-9]+);([0-9]+);(-?[0-9]+)";
+		n_cfg = 5;
+		f = smash_populate_delay;
+	} else {
+		failures = *cfg;
+		failures->size = lines;
+		rs = "([0-9]+);([0-9]+)";
+		n_cfg = 3;
+		f = smash_populate_failure;
+	}
+
+	if ((ret = regcomp(&r, rs, REG_EXTENDED)) != 0) {
+		regerror(ret, &r, err_buf, sizeof err_buf);
+		fprintf(stderr, "failed to compile regex <%s>:%s\n", rs,
+		    err_buf);
+		/* Nothing was compiled, so there is nothing to regfree. */
+		goto err_free;
+	}
+
+	nline = 0;
+	line = strtok(data, "\n");
+	while (line) {
+		/* Never write past the entries that were allocated. */
+		if (nline >= (size_t)lines) {
+			fprintf(stderr, "line %zu: unexpected extra line\n",
+			    nline + 1);
+			goto err_regex;
+		}
+		/* if line is a comment or snapshot, do smth */
+		if ((ret = regexec(&r, line, n_cfg, rm, 0)) != 0) {
+			regerror(ret, &r, err_buf, sizeof err_buf);
+			fprintf(stderr, "line %zu: %s\n", nline + 1, err_buf);
+			goto err_regex;
+		}
+		f(line, nline, rm, *cfg);
+		nline++;
+		line = strtok(NULL, "\n");
+	}
+
+	regfree(&r);
+	munmap(data, data_size);
+	return 0;
+err_regex:
+	regfree(&r);
+err_free:
+	free(*cfg);
+	*cfg = NULL;
+err_unmap:
+	munmap(data, data_size);
+	return -1;
+}
diff --git a/parser.h b/parser.h
new file mode 100644
index 0000000..52690ac
--- /dev/null
+++ b/parser.h
@@ -0,0 +1,49 @@
+#ifndef PARSER_H
+#define PARSER_H
+
+/* Environment variables that hold the paths of the configuration files. */
+#define CFG_DELAY_PATH "SMASH_MPI_DELAY"
+#define CFG_FAILURE_PATH "SMASH_MPI_FAILURE"
+
+enum CFG { CFG_DELAY, CFG_FAILURE };
+
+int smash_parse_cfg(enum CFG ctype, void **data);
+
+/* One line of the delay configuration. */
+struct cfg_delay {
+	unsigned long int delay;
+	unsigned int src, dst;
+	int msg;
+};
+
+/* One line of the failure configuration. */
+struct cfg_failure {
+	unsigned long int time;
+	unsigned int node;
+};
+
+/*
+ * Both tables below end with a flexible array member, valid in C99,
+ * so that the header and its entries share a single allocation.
+ *
+ * To allocate do:
+ *
+ * struct cfg_delays *delays;
+ *
+ * delays = malloc(sizeof(struct cfg_delays) + VECTOR_SIZE * sizeof(struct cfg_delay));
+ * delays->size = VECTOR_SIZE;
+ *
+ * and to free everything do:
+ * free(delays)
+ *
+ * Where VECTOR_SIZE is the number of lines in one config file.
+ */
+
+struct cfg_delays {
+	unsigned int size;
+	struct cfg_delay delays[];
+};
+
+struct cfg_failures {
+	unsigned int size;
+	struct cfg_failure failures[];
+};
+
+#endif /* PARSER_H */