From 9eefbb4afc9601024eac6191addd98e21f90a5b3 Mon Sep 17 00:00:00 2001 From: Aldrik Ramaekers Date: Sun, 18 Dec 2022 16:53:46 +0100 Subject: multithread outgoing network messages, thread safe allocator, refactor --- src/protocol.c | 97 ++++++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 70 insertions(+), 27 deletions(-) (limited to 'src/protocol.c') diff --git a/src/protocol.c b/src/protocol.c index 56d8c6c..468e80a 100644 --- a/src/protocol.c +++ b/src/protocol.c @@ -1,98 +1,141 @@ #include "../include/protocol.h" #include "../include/players.h" +#define alloc_network_message(_type) mem_alloc(sizeof(_type) + 20); + network_message create_protocol_get_id_up(u32 id) { - protocol_get_id_upstream *buf = (protocol_get_id_upstream *)network_buffer; + protocol_get_id_upstream *buf = alloc_network_message(protocol_get_id_upstream); buf->type = MESSAGE_GET_ID_UPSTREAM; buf->id = id; - return network_create_message(network_buffer, sizeof(protocol_get_id_upstream), MAX_NETWORK_BUFFER_SIZE); + return network_create_message((u8*)buf, sizeof(protocol_get_id_upstream), MAX_NETWORK_BUFFER_SIZE); } network_message create_protocol_get_id_down(u32 id) { - protocol_get_id_downstream *buf = (protocol_get_id_downstream *)network_buffer; + protocol_get_id_downstream *buf = alloc_network_message(protocol_get_id_downstream); buf->type = MESSAGE_GET_ID_DOWNSTREAM; buf->id = id; - return network_create_message(network_buffer, sizeof(protocol_get_id_downstream), MAX_NETWORK_BUFFER_SIZE); + return network_create_message((u8*)buf, sizeof(protocol_get_id_downstream), MAX_NETWORK_BUFFER_SIZE); } network_message create_protocol_user_list() { - protocol_user_list *buf = (protocol_user_list *)network_buffer; + protocol_user_list *buf = alloc_network_message(protocol_user_list); buf->type = MESSAGE_USER_LIST; memcpy(buf->players, players, sizeof(players)); - return network_create_message(network_buffer, sizeof(protocol_user_list), MAX_NETWORK_BUFFER_SIZE); + return network_create_message((u8*)buf, sizeof(protocol_user_list), MAX_NETWORK_BUFFER_SIZE); } network_message create_protocol_zombie_list() { - protocol_zombie_list *buf = (protocol_zombie_list *)network_buffer; + protocol_zombie_list *buf = alloc_network_message(protocol_zombie_list); buf->type = MESSAGE_ZOMBIE_LIST; memcpy(buf->zombies, zombies, sizeof(zombies)); - return network_create_message(network_buffer, sizeof(protocol_zombie_list), MAX_NETWORK_BUFFER_SIZE); + return network_create_message((u8*)buf, sizeof(protocol_zombie_list), MAX_NETWORK_BUFFER_SIZE); } network_message create_protocol_bullets_list() { - protocol_bullets_list *buf = (protocol_bullets_list *)network_buffer; + protocol_bullets_list *buf = alloc_network_message(protocol_bullets_list); buf->type = MESSAGE_BULLET_LIST; memcpy(buf->bullets, bullets, sizeof(bullets)); - return network_create_message(network_buffer, sizeof(protocol_bullets_list), MAX_NETWORK_BUFFER_SIZE); + return network_create_message((u8*)buf, sizeof(protocol_bullets_list), MAX_NETWORK_BUFFER_SIZE); } network_message create_protocol_drop_list() { - protocol_drop_list *buf = (protocol_drop_list *)network_buffer; + protocol_drop_list *buf = alloc_network_message(protocol_drop_list); buf->type = MESSAGE_DROP_LIST; memcpy(buf->drops, drops, sizeof(drops)); - return network_create_message(network_buffer, sizeof(protocol_drop_list), MAX_NETWORK_BUFFER_SIZE); + return network_create_message((u8*)buf, sizeof(protocol_drop_list), MAX_NETWORK_BUFFER_SIZE); } network_message create_protocol_user_moved(protocol_move_type move, u32 id) { - protocol_move *buf = (protocol_move *)network_buffer; + protocol_move *buf = alloc_network_message(protocol_move); buf->type = MESSAGE_USER_MOVED; buf->move = move; buf->id = id; buf->delta = update_delta; - return network_create_message(network_buffer, sizeof(protocol_move), MAX_NETWORK_BUFFER_SIZE); + return network_create_message((u8*)buf, sizeof(protocol_move), MAX_NETWORK_BUFFER_SIZE); } network_message create_protocol_user_shoot(u32 id, float dirx, float diry) { - protocol_user_shoot *buf = (protocol_user_shoot *)network_buffer; + protocol_user_shoot *buf = alloc_network_message(protocol_user_shoot); buf->type = MESSAGE_USER_SHOOT; buf->id = id; buf->dirx = dirx; buf->diry = diry; - return network_create_message(network_buffer, sizeof(protocol_user_shoot), MAX_NETWORK_BUFFER_SIZE); + return network_create_message((u8*)buf, sizeof(protocol_user_shoot), MAX_NETWORK_BUFFER_SIZE); } network_message create_protocol_user_look(u32 id, float gunx, float guny) { - protocol_user_look *buf = (protocol_user_look *)network_buffer; + protocol_user_look *buf = alloc_network_message(protocol_user_look); buf->type = MESSAGE_USER_LOOK; buf->id = id; buf->gunx = gunx; buf->guny = guny; - return network_create_message(network_buffer, sizeof(protocol_user_look), MAX_NETWORK_BUFFER_SIZE); + return network_create_message((u8*)buf, sizeof(protocol_user_look), MAX_NETWORK_BUFFER_SIZE); } void server_on_message_received(u8 *buffer, u32 length, u64 timestamp, network_client client) { - protocol_generic_message *allocated_buf = mem_alloc(sizeof(protocol_generic_message)); - allocated_buf->client = client; - allocated_buf->message = mem_alloc(length); - allocated_buf->send_timestamp = timestamp; - memcpy(allocated_buf->message, buffer, length); - array_push(&messages_received_on_server, (u8 *)&allocated_buf); + mutex_lock(&messages_received_on_server.mutex); + { + protocol_generic_message *allocated_buf = allocator_alloc(&server_incomming_allocator, sizeof(protocol_generic_message)); + allocated_buf->client = client; + allocated_buf->message = allocator_alloc(&server_incomming_allocator, length); + allocated_buf->send_timestamp = timestamp; + memcpy(allocated_buf->message, buffer, length); + array_push(&messages_received_on_server, (u8 *)&allocated_buf); + } + mutex_unlock(&messages_received_on_server.mutex); } void client_on_message_received(u8 *buffer, u32 length) { - u8 *allocated_buf = mem_alloc(length); - memcpy(allocated_buf, buffer, length); - array_push(&messages_received_on_client, (u8 *)&allocated_buf); + mutex_lock(&messages_received_on_client.mutex); + { + u8 *allocated_buf = allocator_alloc(&client_incomming_allocator, length); + memcpy(allocated_buf, buffer, length); + array_push(&messages_received_on_client, (u8 *)&allocated_buf); + } + mutex_unlock(&messages_received_on_client.mutex); +} + + +void add_message_to_outgoing_queue(send_queue_entry entry) { + for (int i = 0; i < OUTGOING_QUEUE_SIZE; i++) + { + if (messages_to_send_queue[i].active) continue; + messages_to_send_queue[i] = entry; + messages_to_send_queue[i].active = true; + return; + } + log_info("Outgoing network queue is full"); } + +void* network_send_thread(void* args) { + while (1) { + for (int i = 0; i < OUTGOING_QUEUE_SIZE; i++) + { + if (!messages_to_send_queue[i].active) continue; + mutex_lock(&messages_to_send_queue_mutex); + send_queue_entry message = messages_to_send_queue[i]; + messages_to_send_queue[i].active = false; + mutex_unlock(&messages_to_send_queue_mutex); + + for (int x = 0; x < 10; x++) { + network_client c = message.recipients[x]; + if (c.ConnectSocket != 0) { + network_client_send(&c, message.message); + } + } + mem_free(message.message.data); + } + } +} \ No newline at end of file -- cgit v1.2.3-70-g09d2