summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorfadhil riyanto <me@fadev.org>2024-10-01 08:11:00 +0700
committerfadhil riyanto <me@fadev.org>2024-10-01 08:11:00 +0700
commit13bdab6ce8012124c6169c87710e3ecd1706dde8 (patch)
tree78bacf8d674084ec238070f69a138f188a9dc93d
parent4ef48072c061b9146dc1c43fdb64bad0513d5d39 (diff)
fix sigsegv when thread ended. Use thread pool instead
use own th_pool struct instead use fd_sockaddr_list Signed-off-by: fadhil riyanto <me@fadev.org>
-rw-r--r--config.h1
-rw-r--r--main.c134
2 files changed, 111 insertions, 24 deletions
diff --git a/config.h b/config.h
index baeab66..7d50464 100644
--- a/config.h
+++ b/config.h
@@ -3,5 +3,6 @@
#define MAX_ACCEPT_WORKER 5
#define EPOLL_ACCEPTFD_WATCHLIST_LEN 20
+#define FREE_THREAD_ALLOC 50
#endif \ No newline at end of file
diff --git a/main.c b/main.c
index 40cb363..4621570 100644
--- a/main.c
+++ b/main.c
@@ -86,6 +86,22 @@ struct fd_sockaddr_list {
short all_empty; /* true while no thread is initalized */
};
+/* thread pool section */
+
+struct _th_pool {
+ int handled_fd;
+ short is_active;
+ short need_join;
+
+ pthread_t th;
+};
+
+struct th_pool {
+ pthread_mutex_t th_pool_mutex;
+ struct _th_pool *th_pool;
+ int size;
+};
+
struct server_ctx {
/* our tcp-fd */
int tcpfd;
@@ -99,6 +115,8 @@ struct server_ctx {
/* hold stack ptr from enter_eventloop func */
struct fd_sockaddr_list *fd_sockaddr_list;
+
+ struct th_pool *th_pool;
struct epoll_fd_queue *epoll_fd_queue; /* probably unused */
@@ -267,7 +285,7 @@ static int server_run_worker(struct server_ctx *srv_ctx,
close(ret);
- return 0;
+ // return 0;
}
}
@@ -479,6 +497,47 @@ static int uninstall_acceptfd_from_epoll(struct server_ctx *srv_ctx, int acceptf
return 0;
}
+static int init_th_for_fd(struct th_pool *thpool, int fd)
+{
+
+
+ for(int i = 0; i < FREE_THREAD_ALLOC; i++) {
+ if (thpool->th_pool[i].is_active == 0 && thpool->th_pool[i].need_join == 0) {
+
+ pthread_mutex_lock(&thpool->th_pool_mutex);
+
+ thpool->th_pool[i].is_active = 1;
+ thpool->th_pool[i].need_join = 0;
+
+ pthread_mutex_unlock(&thpool->th_pool_mutex);
+
+ /* alloc here */
+ return i;
+ }
+ }
+
+
+
+ return -1;
+}
+
+static void uninst_th_for_fd(struct th_pool *thpool, int fd)
+{
+ pthread_mutex_lock(&thpool->th_pool_mutex);
+
+ for(int i = 0; i < FREE_THREAD_ALLOC; i++) {
+ if (thpool->th_pool[i].is_active == 1 && thpool->th_pool[i].need_join == 0 &&
+ thpool->th_pool[i].handled_fd == fd) {
+
+ thpool->th_pool[i].is_active = 0;
+ thpool->th_pool[i].need_join = 1;
+
+ }
+ }
+
+ pthread_mutex_unlock(&thpool->th_pool_mutex);
+}
+
static void* start_long_poll(void *srv_ctx_voidptr) {
struct server_ctx *srv_ctx = (struct server_ctx*)srv_ctx_voidptr;
@@ -564,19 +623,30 @@ static void* start_private_conn(void* start_private_conn_details)
ret = read(current_fd,
tempbuf, 100);
- // if (ret == 0) {
- // mark_conn_inactive(srv_ctx->fd_sockaddr_list,
- // current_fd);
- // perror("read");
- // }
+ if (ret == 0) {
+ mark_conn_inactive(srv_ctx->fd_sockaddr_list,
+ current_fd);
+
+ del_fd_sockaddr(srv_ctx->fd_sockaddr_list, current_fd);
- printf("thread xyz says: %s\n", tempbuf);
+ uninstall_acceptfd_from_epoll(srv_ctx, current_fd);
+ // del_fd_sockaddr(srv_ctx->fd_sockaddr_list, current_fd);
+
+ printf("closed ... ");
+ close(current_fd);
+ } else {
+ printf("thread xyz says: %s\n", tempbuf);
+ }
+
+
+
+
/* do on cleaner, parent thread
del_fd_sockaddr(srv_ctx->fd_sockaddr_list,
current_fd);
- close(current_fd);
+
*/
/* call when send error in future, can cause SIGSEGV because init_get_pthread_arrptr check
@@ -585,11 +655,38 @@ static void* start_private_conn(void* start_private_conn_details)
}
+static void prepare_priv_conn(struct start_private_conn_details *start_private_conn2thread,
+ struct server_ctx *srv_ctx)
+{
+ int ret = 0;
+
+ ret = init_th_for_fd(srv_ctx->th_pool, start_private_conn2thread->acceptfd);
+
+ if (ret < 0) {
+ perror("init thread failed, no sufficient thread available");
+ close(start_private_conn2thread->acceptfd);
+ } else {
+ pthread_create(&srv_ctx->th_pool->th_pool[ret].th, NULL,
+ start_private_conn, (void*)start_private_conn2thread);
+ }
+}
+
static void* start_long_poll_receiver(void *srv_ctx_voidptr)
{
struct server_ctx *srv_ctx = (struct server_ctx*)srv_ctx_voidptr;
struct start_private_conn_details start_private_conn2thread;
+
+ struct _th_pool _th_pool[FREE_THREAD_ALLOC];
+ struct th_pool th_pool = {
+ .th_pool = _th_pool,
+ .size = 0,
+ };
+
+ pthread_mutex_init(&th_pool.th_pool_mutex, NULL);
+
+ srv_ctx->th_pool = &th_pool;
+
start_private_conn2thread.srv_ctx = srv_ctx;
int n_ready_read = 0;
@@ -602,30 +699,19 @@ static void* start_long_poll_receiver(void *srv_ctx_voidptr)
if (n_ready_read > 0) {
for (int i = 0; i < n_ready_read; i++) {
- // pthread_t *start_priv_connptr = init_get_pthread_arrptr(
- // srv_ctx->fd_sockaddr_list,
- // srv_ctx->acceptfd_watchlist_event[i].data.fd
- // );
-
/* use thread pool instread */
- // start_private_conn2thread.acceptfd = srv_ctx
- // ->acceptfd_watchlist_event[i].data.fd;
-
+ start_private_conn2thread.acceptfd = srv_ctx
+ ->acceptfd_watchlist_event[i].data.fd;
+
+ prepare_priv_conn(&start_private_conn2thread, srv_ctx);
// pthread_create(start_priv_connptr, NULL,
// start_private_conn,
// (void*)&start_private_conn2thread);
// get_by_fd_sockaddr(srv_ctx->fd_sockaddr_list,
// srv_ctx->acceptfd_watchlist_event[i].data.fd);
- char buf[100];
- memset(buf, 0, 100);;
-
- int readctx = read(
- srv_ctx->acceptfd_watchlist_event[i].data.fd,
- buf, 100);
- printf("%s\n", buf);
- close(srv_ctx->acceptfd_watchlist_event[i].data.fd);
+
}