From 4d8ffe7f825dfd5be42b7348f8d52224b703a615 Mon Sep 17 00:00:00 2001 From: fadhil riyanto Date: Fri, 11 Oct 2024 22:59:01 +0700 Subject: test thread --- main2.c | 146 +++++++++++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 104 insertions(+), 42 deletions(-) (limited to 'main2.c') diff --git a/main2.c b/main2.c index 9af3850..c95627b 100644 --- a/main2.c +++ b/main2.c @@ -225,8 +225,9 @@ struct fd_bridge { struct epoll_event *events; pthread_mutex_t mutex; - u_int8_t need_exit; - + u_int8_t need_exit_1; + u_int8_t need_exit_2; + pthread_t client2srv_pthread; pthread_t srv2client_pthread; @@ -576,12 +577,12 @@ static void uninst_th_for_fd(struct th_pool *thpool, int fd) for(int i = 0; i < FREE_THREAD_ALLOC; i++) { - if (thpool->th_pool[i].handled_fd == fd && thpool->th_pool[i].need_join == 1) { - pthread_join(thpool->th_pool[i].th, NULL); + if (thpool->th_pool[i].handled_fd == fd && thpool->th_pool[i].is_active == 1) { + // pthread_join(thpool->th_pool[i].th, NULL); pthread_mutex_lock(&thpool->th_pool_mutex); thpool->th_pool[i].is_active = 0; - thpool->th_pool[i].need_join = 0; + thpool->th_pool[i].need_join = 1; thpool->size = thpool->size - 1; pthread_mutex_unlock(&thpool->th_pool_mutex); @@ -1234,25 +1235,26 @@ static void* start_exchange_data2_client2srv(void *fd_bridgeptr) if (client_ret == 0) { log_warn("client is zero, closing connection"); - close(fd_bridge->events[i].data.fd); - close(fd_bridge->target_fd); + // close(fd_bridge->events[i].data.fd); + // close(fd_bridge->target_fd); - _start_exchange_data2_epfd_uninstall(fd_bridgeptr, - fd_bridge->events[i].data.fd); + - fd_bridge->need_exit = 1; - pthread_exit(&client_ret); + fd_bridge->need_exit_1 = 1; + return 0; } if (client_ret == -1) { perror("client2srv recv:"); - close(fd_bridge->client_fd); - close(fd_bridge->target_fd); - pthread_exit(&client_ret); + // close(fd_bridge->client_fd); + // close(fd_bridge->target_fd); + fd_bridge->need_exit_1 = 1; + return 0; } else { srv_ret = send(fd_bridge->target_fd, buf, client_ret, 0); if (srv_ret == -1) { perror("client2srv send:"); + fd_bridge->need_exit_1 = 1; } } @@ -1271,7 +1273,7 @@ static void* start_exchange_data2_srv2client(void *fd_bridgeptr) _start_exchange_data2_epfd_install(fd_bridge, fd_bridge->target_fd); - while (1) { + while(1) { // dbg("srv2client") event_ret = epoll_wait(fd_bridge->fd_bridge_epfd, fd_bridge->events, THREAD_MAX_QUEUE_EVENTS, 1000); @@ -1282,28 +1284,38 @@ static void* start_exchange_data2_srv2client(void *fd_bridgeptr) memset(buf, 0, 4096); srv_ret = recv(fd_bridge->events[i].data.fd, buf, 4096, 0); - // printf("srv2clie %d\n", srv_ret); + printf("srv2clie %d\n", srv_ret); if (srv_ret == 0) { - log_warn("srv ret is zero, closing connection"); - close(fd_bridge->events[i].data.fd); - close(fd_bridge->client_fd); + log_warn("srv ret is zero, closing connectionss"); + // close(fd_bridge->events[i].data.fd); + // close(fd_bridge->client_fd); - _start_exchange_data2_epfd_uninstall(fd_bridgeptr, - fd_bridge->events[i].data.fd); + - fd_bridge->need_exit = 1; + fd_bridge->need_exit_2 = 1; + log_debug("running %d", srv_ret); - pthread_exit(&client_ret); + return 0; } + + if (srv_ret == -1) { perror("srv2client recv:"); + fd_bridge->need_exit_2 = 1; + return 0; } else { client_ret = send(fd_bridge->client_fd, buf, srv_ret, 0); if (client_ret == -1) { perror("srv2client send:"); - close(fd_bridge->client_fd); + fd_bridge->need_exit_2 = 1; + // close(fd_bridge->client_fd); + + return 0; + } else if (client_ret == 0) { + fd_bridge->need_exit_2 = 1; + // close(fd_bridge->client_fd); - pthread_exit(&client_ret); + return 0; } } } @@ -1311,13 +1323,16 @@ static void* start_exchange_data2_srv2client(void *fd_bridgeptr) } } - } static void start_exchange_data2(int client_fd, int target_fd) { + int ret; + struct fd_bridge fd_bridge; - fd_bridge.need_exit = 0; + fd_bridge.need_exit_1 = 0; + fd_bridge.need_exit_2 = 0; + fd_bridge.client_fd = client_fd; fd_bridge.target_fd = target_fd; @@ -1332,16 +1347,37 @@ static void start_exchange_data2(int client_fd, int target_fd) pthread_create(&fd_bridge.srv2client_pthread, 0, start_exchange_data2_srv2client, (void*)&fd_bridge); - - while (fd_bridge.need_exit != 1) { + + while (1) { + if (fd_bridge.need_exit_1 == 1 && fd_bridge.need_exit_2 == 1) { + break; + } sleep(1); + log_error("jalan 1{%d} 2{%d}", fd_bridge.need_exit_1, fd_bridge.need_exit_2); } - pthread_join(fd_bridge.client2srv_pthread, NULL); - pthread_join(fd_bridge.srv2client_pthread, NULL); + void *rand; + + + + ret = pthread_join(fd_bridge.client2srv_pthread, &rand); + log_error("THREAD EXITEDdua %d", ret); + ret = pthread_join(fd_bridge.srv2client_pthread, &rand); + log_error("THREAD EXITED %d", ret); + + + + _start_exchange_data2_epfd_uninstall(&fd_bridge, + target_fd); + _start_exchange_data2_epfd_uninstall(&fd_bridge, + client_fd); + close(client_fd); + close(target_fd); pthread_mutex_destroy(&fd_bridge.mutex); close(fd_bridge.fd_bridge_epfd); + + } @@ -1351,7 +1387,7 @@ static int start_unpack_packet_no_epl(int fd, void* reserved, struct socks5_sess int ret = 0; int exc_ret = 0; - do { + for (int x = 0; x < 2; x++) { ret = read(fd, buf, 4096); if (ret == 0) { return 0; @@ -1378,11 +1414,9 @@ static int start_unpack_packet_no_epl(int fd, void* reserved, struct socks5_sess next_req->port, 0); start_exchange_data2(fd, cur_conn_clientfd); - // if (exc_ret == 1) { - // close(cur_conn_clientfd); - // close(fd); - return 0; - // } + + return 0; + } else { socks5_send_connstate(fd, 3, next_req->atyp, next_req->dest, next_req->port, 0); @@ -1409,8 +1443,11 @@ static int start_unpack_packet_no_epl(int fd, void* reserved, struct socks5_sess if (ret == 0) { socks5_send_connstate(fd, 0, next_req->atyp, next_req->dest, next_req->port, 0); + start_exchange_data2(fd, cur_conn_clientfd); + + log_debug("conn done exit"); // if (exc_ret == 1) { // close(cur_conn_clientfd); // close(fd); @@ -1454,7 +1491,8 @@ static int start_unpack_packet_no_epl(int fd, void* reserved, struct socks5_sess } } } - }while (ret != 0); + // }while (ret != 0); + } return 0; } @@ -1470,9 +1508,10 @@ static void* start_private_conn_no_epl(void *priv_conn_detailsptr) socks5_session.is_auth = 0; int current_fd = priv_conn_details->acceptfd; + log_info("connection started"); int ret = start_unpack_packet_no_epl(current_fd, NULL, &socks5_session); if (ret == 0) { - log_info("connection closed"); + log_info("connection exited"); close(current_fd); uninst_th_for_fd(srv_ctx->th_pool, current_fd); @@ -1584,7 +1623,7 @@ static void* start_long_poll(void *srv_ctx_voidptr) { /* generate thread */ int th_num = init_th_for_fd(srv_ctx->th_pool, ret); if (th_num == -1) { - handle_user_max(th_num); + handle_user_max(ret); } else { /* pass the data */ @@ -1644,6 +1683,29 @@ static void cleanup_eventloop_thread(struct posix_thread_handler *thhandler) } +static void* start_clean_gc(void *srv_ctxptr) +{ + struct server_ctx *srv_ctx = srv_ctxptr; + + while(!g_need_exit) { + + for(int i = 0; i < FREE_THREAD_ALLOC; i++) { + if (srv_ctx->th_pool->th_pool[i].is_active == 0 && srv_ctx->th_pool->th_pool[i].need_join == 1) { + pthread_join(srv_ctx->th_pool->th_pool[i].th, NULL); + + pthread_mutex_lock(&srv_ctx->th_pool->th_pool_mutex); + srv_ctx->th_pool->th_pool[i].is_active = 0; + srv_ctx->th_pool->th_pool[i].need_join = 0; + srv_ctx->th_pool->size = srv_ctx->th_pool->size - 1; + pthread_mutex_unlock(&srv_ctx->th_pool->th_pool_mutex); + + log_warn("cleared gc"); + + } + } + } +} + static int enter_eventloop(struct server_ctx *srv_ctx) { struct posix_thread_handler posix_thread_handler; @@ -1685,8 +1747,8 @@ static int enter_eventloop(struct server_ctx *srv_ctx) posix_thread_handler.poll_thread.state = 1; /* start our second receiver */ - // pthread_create(&posix_thread_handler.poll_recv_thread.pthread, - // NULL, start_long_poll_receiver, (void*)srv_ctx); + pthread_create(&posix_thread_handler.poll_recv_thread.pthread, + NULL, start_clean_gc, (void*)srv_ctx); // /* set state to 1 */ // posix_thread_handler.poll_recv_thread.state = 1; -- cgit v1.2.3