Implemented hearbeat

This commit is contained in:
2026-01-04 20:56:32 +01:00
parent 36ae15c320
commit 4a089ed368
3 changed files with 127 additions and 40 deletions
+34 -17
View File
@@ -1,16 +1,28 @@
#include "commands.h"
#include <errno.h> #include <errno.h>
#include <fcntl.h> #include <fcntl.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h>
#include <string.h> #include <string.h>
#include <sys/msg.h> #include <sys/msg.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <unistd.h> #include <unistd.h>
#include "commands.h"
#define send(queue_id, msgbuf_ptr) \ #define send(queue_id, msgbuf_ptr) \
msgsnd(queue_id, msgbuf_ptr, sizeof(*(msgbuf_ptr)) - sizeof(long), 0) msgsnd(queue_id, msgbuf_ptr, sizeof(*(msgbuf_ptr)) - sizeof(long), 0)
static void heartbeat_loop(int server_queue_id, const char *client_id) {
msgbuf_t msg = {.mtype = Hearbeat, .sender = ""};
strncpy(msg.sender, client_id, COMMAND_LENGTH - 1);
while (1) {
if (send(server_queue_id, &msg) == -1) {
perror("msgsnd(heartbeat) failed");
break;
}
sleep(HEARTBEAT_INTERVAL);
}
}
static void cleanup_queue(int qid) { static void cleanup_queue(int qid) {
if (qid != -1) { if (qid != -1) {
msgctl(qid, IPC_RMID, NULL); msgctl(qid, IPC_RMID, NULL);
@@ -33,21 +45,22 @@ int main(int argc, char *argv[]) {
int server_queue_id = -1; int server_queue_id = -1;
const char *id_path = "/home/piotr/server_queue_id"; const char *id_path = "/home/piotr/server_queue_id";
FILE *f = fopen(id_path, "rb"); int fd = open(id_path, O_RDONLY);
if (fd == -1) {
perror("Failed to open server queue id file");
cleanup_queue(client_queue_id);
return 1;
}
if (fread(&server_queue_id, sizeof(int), 1, f) != 1) { if (read(fd, &server_queue_id, sizeof(int)) == -1) {
fprintf(stderr, "Failed to read server queue id from %s\n", id_path); fprintf(stderr, "Failed to read server queue id from %s\n", id_path);
server_queue_id = -1; server_queue_id = -1;
} }
fclose(f); close(fd);
msgbuf_t msg; msgbuf_t msg = {.mtype = Login, .sender = ""};
memset(&msg, 0, sizeof(msgbuf_t)); strncpy(msg.sender, client_id, COMMAND_LENGTH - 1);
msg.mtype = Login; snprintf(msg.command, COMMAND_LENGTH, "%d", client_queue_id);
/* command field holds client id for login */
strncpy(msg.command, client_id, COMMAND_LENGTH - 1);
/* message field holds the client's queue id as string */
snprintf(msg.message, MESSAGE_LENGTH, "%d", client_queue_id);
if (msgsnd(server_queue_id, &msg, sizeof(msg) - sizeof(long), 0) == -1) { if (msgsnd(server_queue_id, &msg, sizeof(msg) - sizeof(long), 0) == -1) {
perror("msgsnd(login) failed"); perror("msgsnd(login) failed");
@@ -73,11 +86,14 @@ int main(int argc, char *argv[]) {
printf("Login accepted for id '%s'\n", client_id); printf("Login accepted for id '%s'\n", client_id);
if (fork() == 0) {
heartbeat_loop(server_queue_id, client_id);
return 0;
}
/* Send a test message to the server */ /* Send a test message to the server */
msgbuf_t test = { msgbuf_t test = {.mtype = Message, .sender = ""};
.mtype = Message, strncpy(test.sender, client_id, COMMAND_LENGTH - 1);
.sender = "the_only_one",
};
snprintf(test.message, MESSAGE_LENGTH, "Hello from %s", client_id); snprintf(test.message, MESSAGE_LENGTH, "Hello from %s", client_id);
if (msgsnd(server_queue_id, &test, sizeof(test) - sizeof(long), 0) == -1) { if (msgsnd(server_queue_id, &test, sizeof(test) - sizeof(long), 0) == -1) {
@@ -90,7 +106,8 @@ int main(int argc, char *argv[]) {
printf("Sent test message to server: \"%s\"\n", test.message); printf("Sent test message to server: \"%s\"\n", test.message);
/* Now wait for server forwarded message(s) and a signal ack. /* Now wait for server forwarded message(s) and a signal ack.
We'll wait until we've seen both a Message and a Signal ack or timeout. */ We'll wait until we've seen both a Message and a Signal ack or timeout.
*/
int seen_message = 0; int seen_message = 0;
int seen_ack = 0; int seen_ack = 0;
const int MAX_ITER = 10; const int MAX_ITER = 10;
+3
View File
@@ -1,6 +1,9 @@
#define MESSAGE_LENGTH 256 #define MESSAGE_LENGTH 256
#define COMMAND_LENGTH 16 #define COMMAND_LENGTH 16
#define HEARTBEAT_INTERVAL 3 // in seconds
#define HEARTBEAT_TIMEOUT 10
typedef enum { typedef enum {
Message = 1, Message = 1,
Login, Login,
+82 -15
View File
@@ -1,14 +1,19 @@
#include "commands.h" #include "bits/types/struct_timeval.h"
#include "sys/ipc.h" #include "sys/ipc.h"
#include "sys/shm.h"
#include <fcntl.h> #include <fcntl.h>
#include <stdbool.h> #include <stdbool.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <sys/msg.h> #include <sys/msg.h>
#include <sys/sem.h>
#include <sys/time.h>
#include <sys/types.h> #include <sys/types.h>
#include <unistd.h> #include <unistd.h>
#include "commands.h"
#define send(queue_id, msgbuf_ptr) \ #define send(queue_id, msgbuf_ptr) \
msgsnd(queue_id, msgbuf_ptr, sizeof(*(msgbuf_ptr)) - sizeof(long), 0) msgsnd(queue_id, msgbuf_ptr, sizeof(*(msgbuf_ptr)) - sizeof(long), 0)
@@ -18,6 +23,7 @@ typedef struct {
char *muted_clients; char *muted_clients;
char nickname[16]; char nickname[16];
int queue_id; int queue_id;
long last_seen;
} Client; } Client;
typedef struct { typedef struct {
@@ -26,21 +32,56 @@ typedef struct {
int client_count; int client_count;
} Group; } Group;
static void handle_hearbeat_timeouts(Client *clients, int client_count,
int semaphore_id) {
struct sembuf sem_op = {
.sem_num = 0,
.sem_op = 1,
.sem_flg = 0,
};
while (1) {
sem_op.sem_op = 1;
semop(semaphore_id, &sem_op, 1);
struct timeval tv;
gettimeofday(&tv, NULL);
for (int i = 0; i < client_count; i++) {
if (clients[i].logged_in &&
(tv.tv_sec - clients[i].last_seen) > HEARTBEAT_TIMEOUT) {
printf("Client %s has timed out due to missed heartbeats\n",
clients[i].id);
clients[i].logged_in = false;
clients[i].queue_id = -1;
}
}
sem_op.sem_op = -1;
semop(semaphore_id, &sem_op, 1);
sleep(HEARTBEAT_TIMEOUT);
}
}
int main(int argc, char **argv) { int main(int argc, char **argv) {
// TODO: Load config file with clients and groups // TODO: Load config file with clients and groups
int client_count = 1; int client_count = 2;
int group_count = 3; int group_count = 3;
Client *clients = malloc(sizeof(Client) * 9); int clients_memory_id =
shmget(IPC_PRIVATE, sizeof(Client) * client_count, IPC_CREAT | 0666);
Client *clients = shmat(clients_memory_id, NULL, 0);
Group *groups = malloc(sizeof(Group) * 3); Group *groups = malloc(sizeof(Group) * 3);
clients[0] = (Client){ int semaphore_id = semget(IPC_PRIVATE, 1, IPC_CREAT | 0666);
.id = "the_only_one", semctl(semaphore_id, 0, SETVAL, 1);
clients[0] = (Client){.id = "test1",
.logged_in = false, .logged_in = false,
.muted_clients = NULL, .muted_clients = NULL,
.nickname = "Kregielnia", .nickname = "Kregielnia",
}; .last_seen = 0};
clients[1] = (Client){.id = "test2",
.logged_in = false,
.muted_clients = NULL,
.nickname = "Bajzel",
.last_seen = 0};
// key_t key = ftok(argv[0], 555);
int server_queue_id = msgget(IPC_PRIVATE, 0666 | IPC_CREAT); int server_queue_id = msgget(IPC_PRIVATE, 0666 | IPC_CREAT);
if (server_queue_id == -1) { if (server_queue_id == -1) {
@@ -61,7 +102,11 @@ int main(int argc, char **argv) {
} }
close(id_file_handle); close(id_file_handle);
// TODO: Fork here, for the heartbeat process than can sleep if (fork() == 0) {
handle_hearbeat_timeouts(clients, client_count, semaphore_id);
return 0;
}
msgbuf_t msgbuf; msgbuf_t msgbuf;
while (1) { while (1) {
// Read from the ipc, then handle the command // Read from the ipc, then handle the command
@@ -70,19 +115,19 @@ int main(int argc, char **argv) {
switch (msgbuf.mtype) { switch (msgbuf.mtype) {
case Login: case Login:
printf("Received login request for id: %s\n", msgbuf.command); printf("Received login request for id: %s\n", msgbuf.sender);
Client *client = NULL; Client *client = NULL;
for (int i = 0; i < 1; i++) { for (int i = 0; i < 1; i++) {
if (strncmp(clients[i].id, msgbuf.command, COMMAND_LENGTH) == 0) { if (strncmp(clients[i].id, msgbuf.sender, COMMAND_LENGTH) == 0) {
client = &clients[i]; client = &clients[i];
break; break;
} }
} }
// Reuse the buffer to get the client's message queue id // Reuse the buffer to get the client's message queue id
int msg_queue_id = atoi(msgbuf.message); int msg_queue_id = atoi(msgbuf.command);
if (client == NULL) { if (client == NULL) {
printf("User not found: %s\n", msgbuf.command); printf("User not found: %s\n", msgbuf.sender);
msgbuf_t response = { msgbuf_t response = {
.mtype = Login, .mtype = Login,
.stype = ERR_USER_NOT_FOUND, .stype = ERR_USER_NOT_FOUND,
@@ -93,7 +138,7 @@ int main(int argc, char **argv) {
} }
if (client->logged_in) { if (client->logged_in) {
printf("User already logged in: %s\n", msgbuf.command); printf("User already logged in: %s\n", msgbuf.sender);
msgbuf_t response = { msgbuf_t response = {
.mtype = Login, .mtype = Login,
.stype = ERR_ALREADY_LOGGED_IN, .stype = ERR_ALREADY_LOGGED_IN,
@@ -105,15 +150,17 @@ int main(int argc, char **argv) {
client->logged_in = true; client->logged_in = true;
client->queue_id = msg_queue_id; client->queue_id = msg_queue_id;
struct timeval tv;
gettimeofday(&tv, NULL);
client->last_seen = tv.tv_sec;
msgbuf_t response = { msgbuf_t response = {
.mtype = Login, .mtype = Login,
.stype = ACK_ACCEPTED, .stype = ACK_ACCEPTED,
}; };
printf("User accepted: %s\n", msgbuf.command); printf("User accepted: %s\n", msgbuf.sender);
send(msg_queue_id, &response); send(msg_queue_id, &response);
break; break;
case Message: case Message:
printf("Recieved message, checking which client\n"); printf("Recieved message, checking which client\n");
// Find the client that sent the message then forward it to all the // Find the client that sent the message then forward it to all the
// specified recipients // specified recipients
@@ -161,6 +208,26 @@ int main(int argc, char **argv) {
break; break;
case Logout: case Logout:
case Hearbeat: case Hearbeat:
for (int i = 0; i < client_count; i++) {
if (strncmp(clients[i].id, msgbuf.sender, COMMAND_LENGTH) == 0 &&
clients[i].logged_in) {
// Take the semaphore to update last_seen
struct sembuf sem_op = {
.sem_num = 0,
.sem_op = -1,
.sem_flg = 0,
};
semop(semaphore_id, &sem_op, 1);
struct timeval tv;
gettimeofday(&tv, NULL);
clients[i].last_seen = tv.tv_sec;
// Free the semaphore
sem_op.sem_op = 1;
semop(semaphore_id, &sem_op, 1);
}
}
break;
case List_clients: case List_clients:
break; break;
case Mute: case Mute: