diff options
author | fadhil riyanto <me@fadev.org> | 2024-10-01 08:11:00 +0700 |
---|---|---|
committer | fadhil riyanto <me@fadev.org> | 2024-10-01 08:11:00 +0700 |
commit | 13bdab6ce8012124c6169c87710e3ecd1706dde8 (patch) | |
tree | 78bacf8d674084ec238070f69a138f188a9dc93d | |
parent | 4ef48072c061b9146dc1c43fdb64bad0513d5d39 (diff) |
fix sigsegv when thread ended. Use thread pool instead
use own th_pool struct instead use fd_sockaddr_list
Signed-off-by: fadhil riyanto <me@fadev.org>
-rw-r--r-- | config.h | 1 | ||||
-rw-r--r-- | main.c | 134 |
2 files changed, 111 insertions, 24 deletions
@@ -3,5 +3,6 @@ #define MAX_ACCEPT_WORKER 5 #define EPOLL_ACCEPTFD_WATCHLIST_LEN 20 +#define FREE_THREAD_ALLOC 50 #endif
\ No newline at end of file @@ -86,6 +86,22 @@ struct fd_sockaddr_list { short all_empty; /* true while no thread is initalized */ }; +/* thread pool section */ + +struct _th_pool { + int handled_fd; + short is_active; + short need_join; + + pthread_t th; +}; + +struct th_pool { + pthread_mutex_t th_pool_mutex; + struct _th_pool *th_pool; + int size; +}; + struct server_ctx { /* our tcp-fd */ int tcpfd; @@ -99,6 +115,8 @@ struct server_ctx { /* hold stack ptr from enter_eventloop func */ struct fd_sockaddr_list *fd_sockaddr_list; + + struct th_pool *th_pool; struct epoll_fd_queue *epoll_fd_queue; /* probably unused */ @@ -267,7 +285,7 @@ static int server_run_worker(struct server_ctx *srv_ctx, close(ret); - return 0; + // return 0; } } @@ -479,6 +497,47 @@ static int uninstall_acceptfd_from_epoll(struct server_ctx *srv_ctx, int acceptf return 0; } +static int init_th_for_fd(struct th_pool *thpool, int fd) +{ + + + for(int i = 0; i < FREE_THREAD_ALLOC; i++) { + if (thpool->th_pool[i].is_active == 0 && thpool->th_pool[i].need_join == 0) { + + pthread_mutex_lock(&thpool->th_pool_mutex); + + thpool->th_pool[i].is_active = 1; + thpool->th_pool[i].need_join = 0; + + pthread_mutex_unlock(&thpool->th_pool_mutex); + + /* alloc here */ + return i; + } + } + + + + return -1; +} + +static void uninst_th_for_fd(struct th_pool *thpool, int fd) +{ + pthread_mutex_lock(&thpool->th_pool_mutex); + + for(int i = 0; i < FREE_THREAD_ALLOC; i++) { + if (thpool->th_pool[i].is_active == 1 && thpool->th_pool[i].need_join == 0 && + thpool->th_pool[i].handled_fd == fd) { + + thpool->th_pool[i].is_active = 0; + thpool->th_pool[i].need_join = 1; + + } + } + + pthread_mutex_unlock(&thpool->th_pool_mutex); +} + static void* start_long_poll(void *srv_ctx_voidptr) { struct server_ctx *srv_ctx = (struct server_ctx*)srv_ctx_voidptr; @@ -564,19 +623,30 @@ static void* start_private_conn(void* start_private_conn_details) ret = read(current_fd, tempbuf, 100); - // if (ret == 0) { - // mark_conn_inactive(srv_ctx->fd_sockaddr_list, - // current_fd); - // perror("read"); - // } + if (ret == 0) { + mark_conn_inactive(srv_ctx->fd_sockaddr_list, + current_fd); + + del_fd_sockaddr(srv_ctx->fd_sockaddr_list, current_fd); - printf("thread xyz says: %s\n", tempbuf); + uninstall_acceptfd_from_epoll(srv_ctx, current_fd); + // del_fd_sockaddr(srv_ctx->fd_sockaddr_list, current_fd); + + printf("closed ... "); + close(current_fd); + } else { + printf("thread xyz says: %s\n", tempbuf); + } + + + + /* do on cleaner, parent thread del_fd_sockaddr(srv_ctx->fd_sockaddr_list, current_fd); - close(current_fd); + */ /* call when send error in future, can cause SIGSEGV because init_get_pthread_arrptr check @@ -585,11 +655,38 @@ static void* start_private_conn(void* start_private_conn_details) } +static void prepare_priv_conn(struct start_private_conn_details *start_private_conn2thread, + struct server_ctx *srv_ctx) +{ + int ret = 0; + + ret = init_th_for_fd(srv_ctx->th_pool, start_private_conn2thread->acceptfd); + + if (ret < 0) { + perror("init thread failed, no sufficient thread available"); + close(start_private_conn2thread->acceptfd); + } else { + pthread_create(&srv_ctx->th_pool->th_pool[ret].th, NULL, + start_private_conn, (void*)start_private_conn2thread); + } +} + static void* start_long_poll_receiver(void *srv_ctx_voidptr) { struct server_ctx *srv_ctx = (struct server_ctx*)srv_ctx_voidptr; struct start_private_conn_details start_private_conn2thread; + + struct _th_pool _th_pool[FREE_THREAD_ALLOC]; + struct th_pool th_pool = { + .th_pool = _th_pool, + .size = 0, + }; + + pthread_mutex_init(&th_pool.th_pool_mutex, NULL); + + srv_ctx->th_pool = &th_pool; + start_private_conn2thread.srv_ctx = srv_ctx; int n_ready_read = 0; @@ -602,30 +699,19 @@ static void* start_long_poll_receiver(void *srv_ctx_voidptr) if (n_ready_read > 0) { for (int i = 0; i < n_ready_read; i++) { - // pthread_t *start_priv_connptr = init_get_pthread_arrptr( - // srv_ctx->fd_sockaddr_list, - // srv_ctx->acceptfd_watchlist_event[i].data.fd - // ); - /* use thread pool instread */ - // start_private_conn2thread.acceptfd = srv_ctx - // ->acceptfd_watchlist_event[i].data.fd; - + start_private_conn2thread.acceptfd = srv_ctx + ->acceptfd_watchlist_event[i].data.fd; + + prepare_priv_conn(&start_private_conn2thread, srv_ctx); // pthread_create(start_priv_connptr, NULL, // start_private_conn, // (void*)&start_private_conn2thread); // get_by_fd_sockaddr(srv_ctx->fd_sockaddr_list, // srv_ctx->acceptfd_watchlist_event[i].data.fd); - char buf[100]; - memset(buf, 0, 100);; - - int readctx = read( - srv_ctx->acceptfd_watchlist_event[i].data.fd, - buf, 100); - printf("%s\n", buf); - close(srv_ctx->acceptfd_watchlist_event[i].data.fd); + } |