summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorfadhil riyanto <me@fadev.org>2024-09-29 20:22:09 +0700
committerfadhil riyanto <me@fadev.org>2024-09-29 20:22:09 +0700
commit2c1f230a8af377f4c4ab5ca11e961419d7128070 (patch)
tree4cf8125242d8e4de0955dcd953a493a4a3f5d05a
parent08882a27f6f24e95daef413b6b5e5b4a759550e4 (diff)
Add thread pool gc
when read() return invalid retcode, we call mark_conn_inactive then, the marked read by start_clean_conn_gc and start realloc memory and close the fd. Signed-off-by: fadhil riyanto <me@fadev.org>
-rw-r--r--main.c63
1 files changed, 60 insertions, 3 deletions
diff --git a/main.c b/main.c
index 31c6cd4..d84e2d9 100644
--- a/main.c
+++ b/main.c
@@ -67,6 +67,7 @@ struct posix_thread_handler
{
struct posix_thread_poll_thread poll_thread;
struct posix_thread_poll_thread poll_recv_thread;
+ struct posix_thread_poll_thread gc_eventloop;
};
@@ -81,6 +82,8 @@ struct fd_sockaddr_list {
pthread_mutex_t fd_sockaddr_lock;
struct _fd_sockaddr_list *list;
int size;
+
+ short all_empty; /* true while no thread is initalized */
};
struct server_ctx {
@@ -283,6 +286,8 @@ static void init_fd_sockaddr(struct fd_sockaddr_list *fdsocklist)
pthread_mutex_init(&fdsocklist->fd_sockaddr_lock, NULL);
+ fdsocklist->all_empty = 1;
+
fdsocklist->size = 1;
}
@@ -317,6 +322,8 @@ static int add_fd_sockaddr(struct fd_sockaddr_list *fdsocklist,
pthread_mutex_unlock(&fdsocklist->fd_sockaddr_lock);
+ fdsocklist->all_empty = 0;
+
return 0;
}
@@ -374,6 +381,21 @@ static pthread_t* init_get_pthread_arrptr(struct fd_sockaddr_list *fdsocklist,
}
+static void delete_pthread_arrptr(struct fd_sockaddr_list *fdsocklist,
+ int fd_num)
+{
+ for(int i = 0; i < fdsocklist->size; i++) {
+ /* guarantee is_active is 0 */
+ if (fdsocklist->list[i].is_active == 0 &&
+ fdsocklist->list[i].fd == fd_num) {
+
+ free(fdsocklist->list[i].private_conn_thread);
+ }
+ }
+
+}
+
+
static struct sockaddr_in* del_fd_sockaddr(struct fd_sockaddr_list *fdsocklist,
int fd_num)
{
@@ -476,8 +498,35 @@ static void* start_long_poll(void *srv_ctx_voidptr) {
}
}
+/* running when inactive mark is set */
+static void* start_clean_conn_gc(void *srv_ctx_voidptr)
+{
+ struct server_ctx *srv_ctx = (struct server_ctx*)srv_ctx_voidptr;
+
+ while(!g_need_exit) {
+ for(int i = 0; i < srv_ctx->fd_sockaddr_list->size; i++) {
+
+ if (srv_ctx->fd_sockaddr_list->all_empty == 1) {
+ /* pass */
+ } else if (srv_ctx->fd_sockaddr_list->list[i].is_active == 0) {
+
+ delete_pthread_arrptr(srv_ctx->fd_sockaddr_list,
+ srv_ctx->fd_sockaddr_list->list[i].fd);
+
+ del_fd_sockaddr(srv_ctx->fd_sockaddr_list,
+ srv_ctx->fd_sockaddr_list->list[i].fd);
+
+ close(srv_ctx->fd_sockaddr_list->list[i].fd);
+
+ }
+ }
+ }
+}
+
static void* start_private_conn(void* start_private_conn_details)
{
+ int ret = 0;
+
struct start_private_conn_details *priv_conn_inside =
(struct start_private_conn_details*)start_private_conn_details;
@@ -487,8 +536,13 @@ static void* start_private_conn(void* start_private_conn_details)
char tempbuf[100];
memset(tempbuf, 0, 100);
- read(current_fd,
+ ret = read(current_fd,
tempbuf, 100);
+ if (ret == 0) {
+ mark_conn_inactive(srv_ctx->fd_sockaddr_list,
+ current_fd);
+ perror("read");
+ }
printf("thread xyz says: %s\n", tempbuf);
@@ -502,8 +556,7 @@ static void* start_private_conn(void* start_private_conn_details)
/* call when send error in future, can cause SIGSEGV because init_get_pthread_arrptr check
conn is active or not */
- // mark_conn_inactive(srv_ctx->fd_sockaddr_list,
- // current_fd);
+
}
static void* start_long_poll_receiver(void *srv_ctx_voidptr)
@@ -578,6 +631,10 @@ static int enter_eventloop(struct server_ctx *srv_ctx)
/* set state to 1 */
posix_thread_handler.poll_recv_thread.state = 1;
+ pthread_create(&posix_thread_handler.gc_eventloop.pthread,
+ NULL, start_clean_conn_gc, (void*)srv_ctx);
+ posix_thread_handler.gc_eventloop.state = 1;
+
/* start busy wait */
while(!g_need_exit) {
usleep(200);