diff options
author | fadhil riyanto <me@fadev.org> | 2024-09-29 20:22:09 +0700 |
---|---|---|
committer | fadhil riyanto <me@fadev.org> | 2024-09-29 20:22:09 +0700 |
commit | 2c1f230a8af377f4c4ab5ca11e961419d7128070 (patch) | |
tree | 4cf8125242d8e4de0955dcd953a493a4a3f5d05a | |
parent | 08882a27f6f24e95daef413b6b5e5b4a759550e4 (diff) |
Add thread pool gc
when read() return invalid retcode, we call mark_conn_inactive
then, the marked read by start_clean_conn_gc and start realloc memory
and close the fd.
Signed-off-by: fadhil riyanto <me@fadev.org>
-rw-r--r-- | main.c | 63 |
1 files changed, 60 insertions, 3 deletions
@@ -67,6 +67,7 @@ struct posix_thread_handler { struct posix_thread_poll_thread poll_thread; struct posix_thread_poll_thread poll_recv_thread; + struct posix_thread_poll_thread gc_eventloop; }; @@ -81,6 +82,8 @@ struct fd_sockaddr_list { pthread_mutex_t fd_sockaddr_lock; struct _fd_sockaddr_list *list; int size; + + short all_empty; /* true while no thread is initalized */ }; struct server_ctx { @@ -283,6 +286,8 @@ static void init_fd_sockaddr(struct fd_sockaddr_list *fdsocklist) pthread_mutex_init(&fdsocklist->fd_sockaddr_lock, NULL); + fdsocklist->all_empty = 1; + fdsocklist->size = 1; } @@ -317,6 +322,8 @@ static int add_fd_sockaddr(struct fd_sockaddr_list *fdsocklist, pthread_mutex_unlock(&fdsocklist->fd_sockaddr_lock); + fdsocklist->all_empty = 0; + return 0; } @@ -374,6 +381,21 @@ static pthread_t* init_get_pthread_arrptr(struct fd_sockaddr_list *fdsocklist, } +static void delete_pthread_arrptr(struct fd_sockaddr_list *fdsocklist, + int fd_num) +{ + for(int i = 0; i < fdsocklist->size; i++) { + /* guarantee is_active is 0 */ + if (fdsocklist->list[i].is_active == 0 && + fdsocklist->list[i].fd == fd_num) { + + free(fdsocklist->list[i].private_conn_thread); + } + } + +} + + static struct sockaddr_in* del_fd_sockaddr(struct fd_sockaddr_list *fdsocklist, int fd_num) { @@ -476,8 +498,35 @@ static void* start_long_poll(void *srv_ctx_voidptr) { } } +/* running when inactive mark is set */ +static void* start_clean_conn_gc(void *srv_ctx_voidptr) +{ + struct server_ctx *srv_ctx = (struct server_ctx*)srv_ctx_voidptr; + + while(!g_need_exit) { + for(int i = 0; i < srv_ctx->fd_sockaddr_list->size; i++) { + + if (srv_ctx->fd_sockaddr_list->all_empty == 1) { + /* pass */ + } else if (srv_ctx->fd_sockaddr_list->list[i].is_active == 0) { + + delete_pthread_arrptr(srv_ctx->fd_sockaddr_list, + srv_ctx->fd_sockaddr_list->list[i].fd); + + del_fd_sockaddr(srv_ctx->fd_sockaddr_list, + srv_ctx->fd_sockaddr_list->list[i].fd); + + close(srv_ctx->fd_sockaddr_list->list[i].fd); + + } + } + } +} + static void* start_private_conn(void* start_private_conn_details) { + int ret = 0; + struct start_private_conn_details *priv_conn_inside = (struct start_private_conn_details*)start_private_conn_details; @@ -487,8 +536,13 @@ static void* start_private_conn(void* start_private_conn_details) char tempbuf[100]; memset(tempbuf, 0, 100); - read(current_fd, + ret = read(current_fd, tempbuf, 100); + if (ret == 0) { + mark_conn_inactive(srv_ctx->fd_sockaddr_list, + current_fd); + perror("read"); + } printf("thread xyz says: %s\n", tempbuf); @@ -502,8 +556,7 @@ static void* start_private_conn(void* start_private_conn_details) /* call when send error in future, can cause SIGSEGV because init_get_pthread_arrptr check conn is active or not */ - // mark_conn_inactive(srv_ctx->fd_sockaddr_list, - // current_fd); + } static void* start_long_poll_receiver(void *srv_ctx_voidptr) @@ -578,6 +631,10 @@ static int enter_eventloop(struct server_ctx *srv_ctx) /* set state to 1 */ posix_thread_handler.poll_recv_thread.state = 1; + pthread_create(&posix_thread_handler.gc_eventloop.pthread, + NULL, start_clean_conn_gc, (void*)srv_ctx); + posix_thread_handler.gc_eventloop.state = 1; + /* start busy wait */ while(!g_need_exit) { usleep(200); |