summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAldrik Ramaekers <aldrik@amftech.nl>2022-12-18 16:53:46 +0100
committerAldrik Ramaekers <aldrik@amftech.nl>2022-12-18 16:53:46 +0100
commit9eefbb4afc9601024eac6191addd98e21f90a5b3 (patch)
treee26771a2ba2d93c58940f011d8701372419f6cb8
parent40027e44e5d0f0040238cfc1bd6a8d80c17c54fb (diff)
multithread outgoing network messages, thread safe allocator, refactor
-rw-r--r--build/zombies.exebin1748556 -> 1752475 bytes
-rw-r--r--include/allocator.h2
-rw-r--r--include/list.h1
-rw-r--r--include/protocol.h24
-rw-r--r--src/allocator.c15
-rw-r--r--src/bullets.c7
-rw-r--r--src/game.c45
-rw-r--r--src/protocol.c97
8 files changed, 151 insertions, 40 deletions
diff --git a/build/zombies.exe b/build/zombies.exe
index af6cba0..3371a19 100644
--- a/build/zombies.exe
+++ b/build/zombies.exe
Binary files differ
diff --git a/include/allocator.h b/include/allocator.h
index d1fe9f4..a1c8bf0 100644
--- a/include/allocator.h
+++ b/include/allocator.h
@@ -7,10 +7,12 @@ typedef struct t_allocator {
void* memory;
uint64_t cursor;
uint64_t size;
+ mutex mutex;
} allocator;
allocator create_allocator(uint64_t size);
void* allocator_alloc(allocator* al, uint64_t size);
void destroy_allocator(allocator* al);
+void allocator_clear(allocator* al);
#endif \ No newline at end of file
diff --git a/include/list.h b/include/list.h
index 41a5705..db174c7 100644
--- a/include/list.h
+++ b/include/list.h
@@ -17,6 +17,7 @@ typedef struct t_list
list_item *start;
u32 count;
u16 size;
+ mutex mutex;
allocator* al;
} list;
diff --git a/include/protocol.h b/include/protocol.h
index 0ec87d7..39e7d6a 100644
--- a/include/protocol.h
+++ b/include/protocol.h
@@ -3,6 +3,8 @@
#include <projectbase/project_base.h>
+#include "list.h"
+
typedef enum t_network_message_type
{
MESSAGE_GET_ID_UPSTREAM,
@@ -100,8 +102,19 @@ typedef struct t_protocol_user_shoot
float diry;
} protocol_user_shoot;
-#define MAX_NETWORK_BUFFER_SIZE 50000
-u8 network_buffer[50000];
+typedef struct t_send_queue_entry {
+ bool active;
+ network_client recipients[10];
+ network_message message;
+} send_queue_entry;
+
+#define MAX_NETWORK_BUFFER_SIZE 5000000
+u8 network_buffer[MAX_NETWORK_BUFFER_SIZE];
+
+allocator server_incomming_allocator;
+allocator client_incomming_allocator;
+allocator outgoing_allocator;
+
network_message create_protocol_get_id_up(u32 id);
network_message create_protocol_get_id_down(u32 id);
network_message create_protocol_user_list();
@@ -115,6 +128,13 @@ network_message create_protocol_drop_list();
array messages_received_on_server;
array messages_received_on_client;
+#define OUTGOING_QUEUE_SIZE 100
+mutex messages_to_send_queue_mutex;
+send_queue_entry messages_to_send_queue[OUTGOING_QUEUE_SIZE] = {0};
+
+void add_message_to_outgoing_queue(send_queue_entry entry);
+void* network_send_thread(void* args);
+
void server_on_message_received(u8 *buffer, u32 length, u64 timestamp, network_client client);
void client_on_message_received(u8 *buffer, u32 length);
diff --git a/src/allocator.c b/src/allocator.c
index ffb6763..bddc5e5 100644
--- a/src/allocator.c
+++ b/src/allocator.c
@@ -5,17 +5,30 @@ allocator create_allocator(uint64_t size) {
allocator.cursor = 0;
allocator.size = size;
allocator.memory = mem_alloc(size);
+ allocator.mutex = mutex_create();
return allocator;
}
void* allocator_alloc(allocator* al, uint64_t size) {
+ mutex_lock(&al->mutex);
if (al->cursor + size < al->size) {
al->cursor += size;
- return al->memory + al->cursor - size;
+ void* result = al->memory + al->cursor - size;
+ mutex_unlock(&al->mutex);
+ return result;
}
log_assert(0, "Allocator out of space");
+ mutex_unlock(&al->mutex);
}
void destroy_allocator(allocator* al) {
+ mutex_lock(&al->mutex);
mem_free(al->memory);
+ mutex_unlock(&al->mutex);
+}
+
+void allocator_clear(allocator* al) {
+ mutex_lock(&al->mutex);
+ al->cursor = 0;
+ mutex_unlock(&al->mutex);
} \ No newline at end of file
diff --git a/src/bullets.c b/src/bullets.c
index 704728e..ed9aab4 100644
--- a/src/bullets.c
+++ b/src/bullets.c
@@ -226,6 +226,13 @@ void draw_bullets(platform_window* window) {
bullet b = bullets[i];
if (!b.active) continue;
+ if (b.player_id == player_id) {
+ player *p = get_player_by_id(b.player_id);
+ b.position.x = p->gunx;
+ b.position.y = p->guny;
+ b.position.z = p->gun_height;
+ }
+
if ((int)bullets[i].position.y < (int)bullets[i].endy) { BULLET_RENDER_DEPTH((int)bullets[i].position.y); }
else { BULLET_RENDER_DEPTH((int)bullets[i].endy); }
diff --git a/src/game.c b/src/game.c
index cb39c28..7f3d1d4 100644
--- a/src/game.c
+++ b/src/game.c
@@ -9,6 +9,8 @@ static void server_on_client_disconnect(network_client c) {
}
void start_server(char* port) {
+ server_incomming_allocator = create_allocator(MAX_NETWORK_BUFFER_SIZE);
+
messages_received_on_server = array_create(sizeof(protocol_generic_message*));
array_reserve(&messages_received_on_server, 100);
@@ -24,6 +26,8 @@ static u32 get_session_id() {
}
void connect_to_server(char* ip, char* port) {
+ client_incomming_allocator = create_allocator(MAX_NETWORK_BUFFER_SIZE);
+
player_id = get_session_id();
messages_received_on_client = array_create(sizeof(protocol_generic_message*));
array_reserve(&messages_received_on_client, 100);
@@ -52,6 +56,12 @@ void connect_to_server(char* ip, char* port) {
void load_map() {
global_state.state = GAMESTATE_LOADING_MAP;
+ outgoing_allocator = create_allocator(MAX_NETWORK_BUFFER_SIZE);
+ messages_to_send_queue_mutex = mutex_create();
+
+ thread send_thread = thread_start(network_send_thread, 0);
+ thread_detach(&send_thread);
+
load_map_from_data();
create_objects();
@@ -79,11 +89,16 @@ void destroy_game() {
static void broadcast_to_clients(network_message message) {
if (!global_state.server || !global_state.server->is_open) return;
+
+ send_queue_entry entry = {0};
+ entry.message = message;
for (int i = 0; i < max_players; i++) {
player p = players[i];
if (!p.client.is_connected) continue;
- if (p.active) network_client_send(&p.client, message);
+ if (!p.active) continue;
+ entry.recipients[i] = p.client;
}
+ add_message_to_outgoing_queue(entry);
}
static void rotate_user(platform_window* window, protocol_user_look *message) {
@@ -115,6 +130,8 @@ float update_timer = 0.0f;
void update_server(platform_window* window) {
server_update_time = platform_get_time(TIME_NS, TIME_FULL);
+ mutex_lock(&messages_received_on_server.mutex);
+
for (int i = 0; i < messages_received_on_server.length; i++) {
protocol_generic_message* msg = *(protocol_generic_message**)array_at(&messages_received_on_server, i);
set_ping_for_player(msg);
@@ -128,7 +145,6 @@ void update_server(platform_window* window) {
log_info("Player connected to server");
} break;
- /*
case MESSAGE_USER_MOVED: {
protocol_move* move_msg = (protocol_move*)msg->message;
move_user(window, move_msg->id, move_msg->move, move_msg->delta);
@@ -142,18 +158,18 @@ void update_server(platform_window* window) {
protocol_user_shoot* shoot_msg = (protocol_user_shoot*)msg->message;
shoot(window, shoot_msg->id, shoot_msg->dirx, shoot_msg->diry);
} break;
- */
default:
log_info("Unhandled message received");
break;
}
- mem_free(msg->message);
- mem_free(msg);
array_remove_at(&messages_received_on_server, i);
i--;
}
+
+ allocator_clear(&server_incomming_allocator);
+ mutex_unlock(&messages_received_on_server.mutex);
update_spawners();
update_drops();
@@ -162,10 +178,15 @@ void update_server(platform_window* window) {
update_players_server();
update_zombies_server(window);
- broadcast_to_clients(create_protocol_user_list());
- broadcast_to_clients(create_protocol_zombie_list());
- broadcast_to_clients(create_protocol_bullets_list());
- broadcast_to_clients(create_protocol_drop_list());
+ if (update_timer >= (1/60.0f)) { // send at 60 ticks
+ broadcast_to_clients(create_protocol_user_list());
+ broadcast_to_clients(create_protocol_zombie_list());
+ broadcast_to_clients(create_protocol_bullets_list());
+ broadcast_to_clients(create_protocol_drop_list());
+ update_timer = 0.0f;
+ }
+
+ update_timer += update_delta;
server_update_time = platform_get_time(TIME_NS, TIME_FULL) - server_update_time;
if ((server_update_time/1000000.0f) > 5.0f) {
@@ -202,6 +223,8 @@ static void load_bullets_into_existing_list(protocol_bullets_list* msg_bullets)
}
void update_client(platform_window* window) {
+ mutex_lock(&messages_received_on_client.mutex);
+
for (int i = 0; i < messages_received_on_client.length; i++) {
protocol_generic_client_message* msg = *(protocol_generic_client_message**)array_at(&messages_received_on_client, i);
@@ -242,10 +265,12 @@ void update_client(platform_window* window) {
break;
}
- mem_free(msg);
array_remove_at(&messages_received_on_client, i);
i--;
}
+
+ allocator_clear(&client_incomming_allocator);
+ mutex_unlock(&messages_received_on_client.mutex);
}
void update_game(platform_window* window) {
diff --git a/src/protocol.c b/src/protocol.c
index 56d8c6c..468e80a 100644
--- a/src/protocol.c
+++ b/src/protocol.c
@@ -1,98 +1,141 @@
#include "../include/protocol.h"
#include "../include/players.h"
+#define alloc_network_message(_type) mem_alloc(sizeof(_type) + 20);
+
network_message create_protocol_get_id_up(u32 id)
{
- protocol_get_id_upstream *buf = (protocol_get_id_upstream *)network_buffer;
+ protocol_get_id_upstream *buf = alloc_network_message(protocol_get_id_upstream);
buf->type = MESSAGE_GET_ID_UPSTREAM;
buf->id = id;
- return network_create_message(network_buffer, sizeof(protocol_get_id_upstream), MAX_NETWORK_BUFFER_SIZE);
+ return network_create_message((u8*)buf, sizeof(protocol_get_id_upstream), MAX_NETWORK_BUFFER_SIZE);
}
network_message create_protocol_get_id_down(u32 id)
{
- protocol_get_id_downstream *buf = (protocol_get_id_downstream *)network_buffer;
+ protocol_get_id_downstream *buf = alloc_network_message(protocol_get_id_downstream);
buf->type = MESSAGE_GET_ID_DOWNSTREAM;
buf->id = id;
- return network_create_message(network_buffer, sizeof(protocol_get_id_downstream), MAX_NETWORK_BUFFER_SIZE);
+ return network_create_message((u8*)buf, sizeof(protocol_get_id_downstream), MAX_NETWORK_BUFFER_SIZE);
}
network_message create_protocol_user_list()
{
- protocol_user_list *buf = (protocol_user_list *)network_buffer;
+ protocol_user_list *buf = alloc_network_message(protocol_user_list);
buf->type = MESSAGE_USER_LIST;
memcpy(buf->players, players, sizeof(players));
- return network_create_message(network_buffer, sizeof(protocol_user_list), MAX_NETWORK_BUFFER_SIZE);
+ return network_create_message((u8*)buf, sizeof(protocol_user_list), MAX_NETWORK_BUFFER_SIZE);
}
network_message create_protocol_zombie_list()
{
- protocol_zombie_list *buf = (protocol_zombie_list *)network_buffer;
+ protocol_zombie_list *buf = alloc_network_message(protocol_zombie_list);
buf->type = MESSAGE_ZOMBIE_LIST;
memcpy(buf->zombies, zombies, sizeof(zombies));
- return network_create_message(network_buffer, sizeof(protocol_zombie_list), MAX_NETWORK_BUFFER_SIZE);
+ return network_create_message((u8*)buf, sizeof(protocol_zombie_list), MAX_NETWORK_BUFFER_SIZE);
}
network_message create_protocol_bullets_list()
{
- protocol_bullets_list *buf = (protocol_bullets_list *)network_buffer;
+ protocol_bullets_list *buf = alloc_network_message(protocol_bullets_list);
buf->type = MESSAGE_BULLET_LIST;
memcpy(buf->bullets, bullets, sizeof(bullets));
- return network_create_message(network_buffer, sizeof(protocol_bullets_list), MAX_NETWORK_BUFFER_SIZE);
+ return network_create_message((u8*)buf, sizeof(protocol_bullets_list), MAX_NETWORK_BUFFER_SIZE);
}
network_message create_protocol_drop_list()
{
- protocol_drop_list *buf = (protocol_drop_list *)network_buffer;
+ protocol_drop_list *buf = alloc_network_message(protocol_drop_list);
buf->type = MESSAGE_DROP_LIST;
memcpy(buf->drops, drops, sizeof(drops));
- return network_create_message(network_buffer, sizeof(protocol_drop_list), MAX_NETWORK_BUFFER_SIZE);
+ return network_create_message((u8*)buf, sizeof(protocol_drop_list), MAX_NETWORK_BUFFER_SIZE);
}
network_message create_protocol_user_moved(protocol_move_type move, u32 id)
{
- protocol_move *buf = (protocol_move *)network_buffer;
+ protocol_move *buf = alloc_network_message(protocol_move);
buf->type = MESSAGE_USER_MOVED;
buf->move = move;
buf->id = id;
buf->delta = update_delta;
- return network_create_message(network_buffer, sizeof(protocol_move), MAX_NETWORK_BUFFER_SIZE);
+ return network_create_message((u8*)buf, sizeof(protocol_move), MAX_NETWORK_BUFFER_SIZE);
}
network_message create_protocol_user_shoot(u32 id, float dirx, float diry)
{
- protocol_user_shoot *buf = (protocol_user_shoot *)network_buffer;
+ protocol_user_shoot *buf = alloc_network_message(protocol_user_shoot);
buf->type = MESSAGE_USER_SHOOT;
buf->id = id;
buf->dirx = dirx;
buf->diry = diry;
- return network_create_message(network_buffer, sizeof(protocol_user_shoot), MAX_NETWORK_BUFFER_SIZE);
+ return network_create_message((u8*)buf, sizeof(protocol_user_shoot), MAX_NETWORK_BUFFER_SIZE);
}
network_message create_protocol_user_look(u32 id, float gunx, float guny)
{
- protocol_user_look *buf = (protocol_user_look *)network_buffer;
+ protocol_user_look *buf = alloc_network_message(protocol_user_look);
buf->type = MESSAGE_USER_LOOK;
buf->id = id;
buf->gunx = gunx;
buf->guny = guny;
- return network_create_message(network_buffer, sizeof(protocol_user_look), MAX_NETWORK_BUFFER_SIZE);
+ return network_create_message((u8*)buf, sizeof(protocol_user_look), MAX_NETWORK_BUFFER_SIZE);
}
void server_on_message_received(u8 *buffer, u32 length, u64 timestamp, network_client client)
{
- protocol_generic_message *allocated_buf = mem_alloc(sizeof(protocol_generic_message));
- allocated_buf->client = client;
- allocated_buf->message = mem_alloc(length);
- allocated_buf->send_timestamp = timestamp;
- memcpy(allocated_buf->message, buffer, length);
- array_push(&messages_received_on_server, (u8 *)&allocated_buf);
+ mutex_lock(&messages_received_on_server.mutex);
+ {
+ protocol_generic_message *allocated_buf = allocator_alloc(&server_incomming_allocator, sizeof(protocol_generic_message));
+ allocated_buf->client = client;
+ allocated_buf->message = allocator_alloc(&server_incomming_allocator, length);
+ allocated_buf->send_timestamp = timestamp;
+ memcpy(allocated_buf->message, buffer, length);
+ array_push(&messages_received_on_server, (u8 *)&allocated_buf);
+ }
+ mutex_unlock(&messages_received_on_server.mutex);
}
void client_on_message_received(u8 *buffer, u32 length)
{
- u8 *allocated_buf = mem_alloc(length);
- memcpy(allocated_buf, buffer, length);
- array_push(&messages_received_on_client, (u8 *)&allocated_buf);
+ mutex_lock(&messages_received_on_client.mutex);
+ {
+ u8 *allocated_buf = allocator_alloc(&client_incomming_allocator, length);
+ memcpy(allocated_buf, buffer, length);
+ array_push(&messages_received_on_client, (u8 *)&allocated_buf);
+ }
+ mutex_unlock(&messages_received_on_client.mutex);
+}
+
+
+void add_message_to_outgoing_queue(send_queue_entry entry) {
+ for (int i = 0; i < OUTGOING_QUEUE_SIZE; i++)
+ {
+ if (messages_to_send_queue[i].active) continue;
+ messages_to_send_queue[i] = entry;
+ messages_to_send_queue[i].active = true;
+ return;
+ }
+ log_info("Outgoing network queue is full");
}
+
+void* network_send_thread(void* args) {
+ while (1) {
+ for (int i = 0; i < OUTGOING_QUEUE_SIZE; i++)
+ {
+ if (!messages_to_send_queue[i].active) continue;
+ mutex_lock(&messages_to_send_queue_mutex);
+ send_queue_entry message = messages_to_send_queue[i];
+ messages_to_send_queue[i].active = false;
+ mutex_unlock(&messages_to_send_queue_mutex);
+
+ for (int x = 0; x < 10; x++) {
+ network_client c = message.recipients[x];
+ if (c.ConnectSocket != 0) {
+ network_client_send(&c, message.message);
+ }
+ }
+ mem_free(message.message.data);
+ }
+ }
+} \ No newline at end of file