diff --git a/client.c b/client.c index 4d1492f..2502841 100644 --- a/client.c +++ b/client.c @@ -1,16 +1,28 @@ -#include "commands.h" #include #include #include -#include #include #include #include #include +#include "commands.h" + #define send(queue_id, msgbuf_ptr) \ msgsnd(queue_id, msgbuf_ptr, sizeof(*(msgbuf_ptr)) - sizeof(long), 0) +static void heartbeat_loop(int server_queue_id, const char *client_id) { + msgbuf_t msg = {.mtype = Hearbeat, .sender = ""}; + strncpy(msg.sender, client_id, COMMAND_LENGTH - 1); + while (1) { + if (send(server_queue_id, &msg) == -1) { + perror("msgsnd(heartbeat) failed"); + break; + } + sleep(HEARTBEAT_INTERVAL); + } +} + static void cleanup_queue(int qid) { if (qid != -1) { msgctl(qid, IPC_RMID, NULL); @@ -33,21 +45,22 @@ int main(int argc, char *argv[]) { int server_queue_id = -1; const char *id_path = "/home/piotr/server_queue_id"; - FILE *f = fopen(id_path, "rb"); + int fd = open(id_path, O_RDONLY); + if (fd == -1) { + perror("Failed to open server queue id file"); + cleanup_queue(client_queue_id); + return 1; + } - if (fread(&server_queue_id, sizeof(int), 1, f) != 1) { + if (read(fd, &server_queue_id, sizeof(int)) == -1) { fprintf(stderr, "Failed to read server queue id from %s\n", id_path); server_queue_id = -1; } - fclose(f); + close(fd); - msgbuf_t msg; - memset(&msg, 0, sizeof(msgbuf_t)); - msg.mtype = Login; - /* command field holds client id for login */ - strncpy(msg.command, client_id, COMMAND_LENGTH - 1); - /* message field holds the client's queue id as string */ - snprintf(msg.message, MESSAGE_LENGTH, "%d", client_queue_id); + msgbuf_t msg = {.mtype = Login, .sender = ""}; + strncpy(msg.sender, client_id, COMMAND_LENGTH - 1); + snprintf(msg.command, COMMAND_LENGTH, "%d", client_queue_id); if (msgsnd(server_queue_id, &msg, sizeof(msg) - sizeof(long), 0) == -1) { perror("msgsnd(login) failed"); @@ -73,11 +86,14 @@ int main(int argc, char *argv[]) { printf("Login accepted for id '%s'\n", client_id); + if (fork() == 0) { + heartbeat_loop(server_queue_id, client_id); + return 0; + } + /* Send a test message to the server */ - msgbuf_t test = { - .mtype = Message, - .sender = "the_only_one", - }; + msgbuf_t test = {.mtype = Message, .sender = ""}; + strncpy(test.sender, client_id, COMMAND_LENGTH - 1); snprintf(test.message, MESSAGE_LENGTH, "Hello from %s", client_id); if (msgsnd(server_queue_id, &test, sizeof(test) - sizeof(long), 0) == -1) { @@ -90,7 +106,8 @@ int main(int argc, char *argv[]) { printf("Sent test message to server: \"%s\"\n", test.message); /* Now wait for server forwarded message(s) and a signal ack. - We'll wait until we've seen both a Message and a Signal ack or timeout. */ + We'll wait until we've seen both a Message and a Signal ack or timeout. + */ int seen_message = 0; int seen_ack = 0; const int MAX_ITER = 10; @@ -102,7 +119,7 @@ int main(int argc, char *argv[]) { sizeof(incoming) - sizeof(long), 0, 0); perror("Got a message damn"); if (got == -1) { - perror("msgrcv(incoming) failed"); + perror("msgrcv(incoming) failed"); if (errno == EINTR) { continue; diff --git a/commands.h b/commands.h index 910f05e..53dbad1 100644 --- a/commands.h +++ b/commands.h @@ -1,6 +1,9 @@ #define MESSAGE_LENGTH 256 #define COMMAND_LENGTH 16 +#define HEARTBEAT_INTERVAL 3 // in seconds +#define HEARTBEAT_TIMEOUT 10 + typedef enum { Message = 1, Login, diff --git a/server.c b/server.c index c5093a4..36e7751 100644 --- a/server.c +++ b/server.c @@ -1,14 +1,19 @@ -#include "commands.h" +#include "bits/types/struct_timeval.h" #include "sys/ipc.h" +#include "sys/shm.h" #include #include #include #include #include #include +#include +#include #include #include +#include "commands.h" + #define send(queue_id, msgbuf_ptr) \ msgsnd(queue_id, msgbuf_ptr, sizeof(*(msgbuf_ptr)) - sizeof(long), 0) @@ -18,6 +23,7 @@ typedef struct { char *muted_clients; char nickname[16]; int queue_id; + long last_seen; } Client; typedef struct { @@ -26,21 +32,56 @@ typedef struct { int client_count; } Group; +static void handle_hearbeat_timeouts(Client *clients, int client_count, + int semaphore_id) { + struct sembuf sem_op = { + .sem_num = 0, + .sem_op = 1, + .sem_flg = 0, + }; + while (1) { + sem_op.sem_op = 1; + semop(semaphore_id, &sem_op, 1); + struct timeval tv; + gettimeofday(&tv, NULL); + for (int i = 0; i < client_count; i++) { + if (clients[i].logged_in && + (tv.tv_sec - clients[i].last_seen) > HEARTBEAT_TIMEOUT) { + printf("Client %s has timed out due to missed heartbeats\n", + clients[i].id); + clients[i].logged_in = false; + clients[i].queue_id = -1; + } + } + sem_op.sem_op = -1; + semop(semaphore_id, &sem_op, 1); + sleep(HEARTBEAT_TIMEOUT); + } +} + int main(int argc, char **argv) { // TODO: Load config file with clients and groups - int client_count = 1; + int client_count = 2; int group_count = 3; - Client *clients = malloc(sizeof(Client) * 9); + int clients_memory_id = + shmget(IPC_PRIVATE, sizeof(Client) * client_count, IPC_CREAT | 0666); + Client *clients = shmat(clients_memory_id, NULL, 0); Group *groups = malloc(sizeof(Group) * 3); - clients[0] = (Client){ - .id = "the_only_one", - .logged_in = false, - .muted_clients = NULL, - .nickname = "Kregielnia", - }; + int semaphore_id = semget(IPC_PRIVATE, 1, IPC_CREAT | 0666); + semctl(semaphore_id, 0, SETVAL, 1); + + clients[0] = (Client){.id = "test1", + .logged_in = false, + .muted_clients = NULL, + .nickname = "Kregielnia", + .last_seen = 0}; + clients[1] = (Client){.id = "test2", + .logged_in = false, + .muted_clients = NULL, + .nickname = "Bajzel", + .last_seen = 0}; - // key_t key = ftok(argv[0], 555); int server_queue_id = msgget(IPC_PRIVATE, 0666 | IPC_CREAT); if (server_queue_id == -1) { @@ -61,7 +102,11 @@ int main(int argc, char **argv) { } close(id_file_handle); - // TODO: Fork here, for the heartbeat process than can sleep + if (fork() == 0) { + handle_hearbeat_timeouts(clients, client_count, semaphore_id); + return 0; + } + msgbuf_t msgbuf; while (1) { // Read from the ipc, then handle the command @@ -70,19 +115,19 @@ int main(int argc, char **argv) { switch (msgbuf.mtype) { case Login: - printf("Received login request for id: %s\n", msgbuf.command); + printf("Received login request for id: %s\n", msgbuf.sender); Client *client = NULL; for (int i = 0; i < 1; i++) { - if (strncmp(clients[i].id, msgbuf.command, COMMAND_LENGTH) == 0) { + if (strncmp(clients[i].id, msgbuf.sender, COMMAND_LENGTH) == 0) { client = &clients[i]; break; } } // Reuse the buffer to get the client's message queue id - int msg_queue_id = atoi(msgbuf.message); + int msg_queue_id = atoi(msgbuf.command); if (client == NULL) { - printf("User not found: %s\n", msgbuf.command); + printf("User not found: %s\n", msgbuf.sender); msgbuf_t response = { .mtype = Login, .stype = ERR_USER_NOT_FOUND, @@ -93,7 +138,7 @@ int main(int argc, char **argv) { } if (client->logged_in) { - printf("User already logged in: %s\n", msgbuf.command); + printf("User already logged in: %s\n", msgbuf.sender); msgbuf_t response = { .mtype = Login, .stype = ERR_ALREADY_LOGGED_IN, @@ -105,24 +150,26 @@ int main(int argc, char **argv) { client->logged_in = true; client->queue_id = msg_queue_id; + struct timeval tv; + gettimeofday(&tv, NULL); + client->last_seen = tv.tv_sec; msgbuf_t response = { .mtype = Login, .stype = ACK_ACCEPTED, }; - printf("User accepted: %s\n", msgbuf.command); + printf("User accepted: %s\n", msgbuf.sender); send(msg_queue_id, &response); break; case Message: - printf("Recieved message, checking which client\n"); // Find the client that sent the message then forward it to all the // specified recipients client = NULL; for (int i = 0; i < client_count; i++) { - if (strncmp(clients[i].id, msgbuf.sender, COMMAND_LENGTH) == 0) { - client = &clients[i]; - break; - } + if (strncmp(clients[i].id, msgbuf.sender, COMMAND_LENGTH) == 0) { + client = &clients[i]; + break; + } } printf("Received message from client id %s: %s\n", msgbuf.sender, @@ -161,6 +208,26 @@ int main(int argc, char **argv) { break; case Logout: case Hearbeat: + for (int i = 0; i < client_count; i++) { + if (strncmp(clients[i].id, msgbuf.sender, COMMAND_LENGTH) == 0 && + clients[i].logged_in) { + // Take the semaphore to update last_seen + struct sembuf sem_op = { + .sem_num = 0, + .sem_op = -1, + .sem_flg = 0, + }; + semop(semaphore_id, &sem_op, 1); + struct timeval tv; + gettimeofday(&tv, NULL); + clients[i].last_seen = tv.tv_sec; + + // Free the semaphore + sem_op.sem_op = 1; + semop(semaphore_id, &sem_op, 1); + } + } + break; case List_clients: break; case Mute: