diff --git a/src/apps/relay/dbdrivers/dbd_mysql.c b/src/apps/relay/dbdrivers/dbd_mysql.c index 25bd46db..37b705b1 100644 --- a/src/apps/relay/dbdrivers/dbd_mysql.c +++ b/src/apps/relay/dbdrivers/dbd_mysql.c @@ -240,6 +240,7 @@ static MYSQL *get_mydb_connection(void) { mydbconnection=NULL; } else if(!donot_print_connection_success) { TURN_LOG_FUNC(TURN_LOG_LEVEL_INFO, "MySQL DB connection success: %s\n",pud->userdb); + donot_print_connection_success = 1; } } MyconninfoFree(co); @@ -876,7 +877,6 @@ static int mysql_list_realm_options(u08bits *realm) { static void mysql_auth_ping(void * rch) { UNUSED_ARG(rch); - donot_print_connection_success = 1; MYSQL * myc = get_mydb_connection(); if(myc) { char statement[TURN_LONG_STRING_SIZE]; diff --git a/src/apps/relay/dbdrivers/dbd_pgsql.c b/src/apps/relay/dbdrivers/dbd_pgsql.c index 52e9bd96..c35a3f85 100644 --- a/src/apps/relay/dbdrivers/dbd_pgsql.c +++ b/src/apps/relay/dbdrivers/dbd_pgsql.c @@ -77,6 +77,7 @@ static PGconn *get_pqdb_connection(void) { TURN_LOG_FUNC(TURN_LOG_LEVEL_ERROR, "Cannot open PostgreSQL DB connection: <%s>, runtime error\n",pud->userdb); } else if(!donot_print_connection_success){ TURN_LOG_FUNC(TURN_LOG_LEVEL_INFO, "PostgreSQL DB connection success: %s\n",pud->userdb); + donot_print_connection_success = 1; } } } @@ -633,7 +634,6 @@ static int pgsql_list_realm_options(u08bits *realm) { static void pgsql_auth_ping(void * rch) { UNUSED_ARG(rch); - donot_print_connection_success = 1; PGconn * pqc = get_pqdb_connection(); if(pqc) { char statement[TURN_LONG_STRING_SIZE]; diff --git a/src/apps/relay/dbdrivers/dbd_redis.c b/src/apps/relay/dbdrivers/dbd_redis.c index c72c939a..eb54c940 100644 --- a/src/apps/relay/dbdrivers/dbd_redis.c +++ b/src/apps/relay/dbdrivers/dbd_redis.c @@ -254,6 +254,7 @@ redis_context_handle get_redis_async_connection(struct event_base *base, const c TURN_LOG_FUNC(TURN_LOG_LEVEL_ERROR, "Cannot initialize Redis DB connection\n"); } else if (is_redis_asyncconn_good(ret) && !donot_print_connection_success) { TURN_LOG_FUNC(TURN_LOG_LEVEL_INFO, "Redis DB async connection to be used: %s\n", connection_string); + donot_print_connection_success = 1; } RyconninfoFree(co); } @@ -348,6 +349,7 @@ static redisContext *get_redis_connection(void) { TURN_LOG_FUNC(TURN_LOG_LEVEL_ERROR, "Cannot initialize Redis DB connection\n"); } else if (!donot_print_connection_success) { TURN_LOG_FUNC(TURN_LOG_LEVEL_INFO, "Redis DB sync connection success: %s\n", pud->userdb); + donot_print_connection_success = 1; } RyconninfoFree(co); @@ -1024,7 +1026,6 @@ static int redis_list_realm_options(u08bits *realm) { } static void redis_auth_ping(void * rch) { - donot_print_connection_success = 1; redisContext *rc = get_redis_connection(); if(rc) { turnFreeRedisReply(redisCommand(rc, "keys turn/origin/*")); diff --git a/src/apps/relay/dbdrivers/dbd_sqlite.c b/src/apps/relay/dbdrivers/dbd_sqlite.c index 57d3e0b6..9667621a 100644 --- a/src/apps/relay/dbdrivers/dbd_sqlite.c +++ b/src/apps/relay/dbdrivers/dbd_sqlite.c @@ -135,6 +135,7 @@ static sqlite3 * get_sqlite_connection(void) { } else if(!donot_print_connection_success){ init_sqlite_database(sqliteconnection); TURN_LOG_FUNC(TURN_LOG_LEVEL_INFO, "SQLite DB connection success: %s\n",pud->userdb); + donot_print_connection_success = 1; } if(sqliteconnection) { (void) pthread_setspecific(connection_key, sqliteconnection); diff --git a/src/apps/relay/mainrelay.c b/src/apps/relay/mainrelay.c index c9b060ad..43a739e5 100644 --- a/src/apps/relay/mainrelay.c +++ b/src/apps/relay/mainrelay.c @@ -120,8 +120,9 @@ LOW_DEFAULT_PORTS_BOUNDARY,HIGH_DEFAULT_PORTS_BOUNDARY,0,0,0,"", /////////////// MISC PARAMS //////////////// 0,0,0,0,0,SHATYPE_SHA1,':',0,0,TURN_CREDENTIALS_NONE,0,0,0,0,0,0, ///////////// Users DB ////////////// -{ (TURN_USERDB_TYPE)0, {"\0"}, {0,NULL,NULL, {NULL,0}} } - +{ (TURN_USERDB_TYPE)0, {"\0"}, {0,NULL,NULL, {NULL,0}} }, +///////////// CPUs ////////////////// +DEFAULT_CPUS_NUMBER }; //////////////// OpenSSL Init ////////////////////// @@ -1801,14 +1802,14 @@ int main(int argc, char **argv) #if defined(_SC_NPROCESSORS_ONLN) { - long cpus = (long)sysconf(_SC_NPROCESSORS_CONF); + turn_params.cpus = (long)sysconf(_SC_NPROCESSORS_CONF); - if(cpus<1) - cpus = 1; - else if(cpus>MAX_NUMBER_OF_GENERAL_RELAY_SERVERS) - cpus = MAX_NUMBER_OF_GENERAL_RELAY_SERVERS; + if(turn_params.cpusMAX_NUMBER_OF_GENERAL_RELAY_SERVERS) + turn_params.cpus = MAX_NUMBER_OF_GENERAL_RELAY_SERVERS; - turn_params.general_relay_servers_number = (turnserver_id)cpus; + turn_params.general_relay_servers_number = (turnserver_id)turn_params.cpus; } #endif diff --git a/src/apps/relay/mainrelay.h b/src/apps/relay/mainrelay.h index b5062b48..be020acb 100644 --- a/src/apps/relay/mainrelay.h +++ b/src/apps/relay/mainrelay.h @@ -100,6 +100,8 @@ extern "C" { #define TURNSERVER_ID_BOUNDARY_BETWEEN_TCP_AND_UDP MAX_NUMBER_OF_GENERAL_RELAY_SERVERS #define TURNSERVER_ID_BOUNDARY_BETWEEN_UDP_AND_TCP TURNSERVER_ID_BOUNDARY_BETWEEN_TCP_AND_UDP +#define DEFAULT_CPUS_NUMBER (2) + /////////// TYPES /////////////////////////////////// enum _DH_KEY_SIZE { @@ -304,6 +306,10 @@ typedef struct _turn_params_ { default_users_db_t default_users_db; +/////// CPUs ////////////// + + unsigned long cpus; + } turn_params_t; extern turn_params_t turn_params; diff --git a/src/apps/relay/netengine.c b/src/apps/relay/netengine.c index 762b16b4..49bafc4e 100644 --- a/src/apps/relay/netengine.c +++ b/src/apps/relay/netengine.c @@ -39,7 +39,10 @@ static pthread_barrier_t barrier; ////////////// Auth Server //////////////// +typedef unsigned char authserver_id; + struct auth_server { + authserver_id id; struct event_base* event_base; struct bufferevent *in_buf; struct bufferevent *out_buf; @@ -47,7 +50,9 @@ struct auth_server { redis_context_handle rch; }; -static struct auth_server authserver = {NULL,NULL,NULL,0,NULL}; +#define MIN_AUTHSERVER_NUMBER (3) +static authserver_id authserver_number = MIN_AUTHSERVER_NUMBER; +static struct auth_server authserver[256]; ////////////////////////////////////////////// @@ -366,9 +371,20 @@ static void allocate_relay_addrs_ports(void) { static int handle_relay_message(relay_server_handle rs, struct message_to_relay *sm); +static pthread_mutex_t auth_message_counter_mutex = PTHREAD_MUTEX_INITIALIZER; +static authserver_id auth_message_counter = 1; + void send_auth_message_to_auth_server(struct auth_message *am) { - struct evbuffer *output = bufferevent_get_output(authserver.out_buf); + pthread_mutex_lock(&auth_message_counter_mutex); + if(auth_message_counter>=authserver_number) auth_message_counter = 1; + else if(auth_message_counter<1) auth_message_counter = 1; + authserver_id sn = auth_message_counter++; + pthread_mutex_unlock(&auth_message_counter_mutex); + + printf("%s: 111.111: %d\n",__FUNCTION__,(int)sn); + + struct evbuffer *output = bufferevent_get_output(authserver[sn].out_buf); if(evbuffer_add(output,am,sizeof(struct auth_message))<0) { fprintf(stderr,"%s: Weird buffer error\n",__FUNCTION__); } @@ -1703,35 +1719,48 @@ static void* run_auth_server_thread(void *arg) struct auth_server *as = (struct auth_server*)arg; - ns_bzero(as,sizeof(struct auth_server)); + authserver_id id = as->id; - as->event_base = turn_event_base_new(); - TURN_LOG_FUNC(TURN_LOG_LEVEL_INFO,"IO method (auth thread): %s\n",event_base_get_method(as->event_base)); + if(id == 0) { - struct bufferevent *pair[2]; + barrier_wait(); - bufferevent_pair_new(as->event_base, TURN_BUFFEREVENTS_OPTIONS, pair); - as->in_buf = pair[0]; - as->out_buf = pair[1]; - bufferevent_setcb(as->in_buf, auth_server_receive_message, NULL, NULL, as); - bufferevent_enable(as->in_buf, EV_READ); + while(run_auth_server_flag) { + reread_realms(); + update_white_and_black_lists(); +#if defined(DB_TEST) + run_db_test(); +#endif + sleep(5); + } + + } else { + + ns_bzero(as,sizeof(struct auth_server)); + + as->id = id; + + as->event_base = turn_event_base_new(); + TURN_LOG_FUNC(TURN_LOG_LEVEL_INFO,"IO method (auth thread): %s\n",event_base_get_method(as->event_base)); + + struct bufferevent *pair[2]; + + bufferevent_pair_new(as->event_base, TURN_BUFFEREVENTS_OPTIONS, pair); + as->in_buf = pair[0]; + as->out_buf = pair[1]; + bufferevent_setcb(as->in_buf, auth_server_receive_message, NULL, NULL, as); + bufferevent_enable(as->in_buf, EV_READ); #if !defined(TURN_NO_HIREDIS) - as->rch = get_redis_async_connection(as->event_base, turn_params.redis_statsdb, 1); + as->rch = get_redis_async_connection(as->event_base, turn_params.redis_statsdb, 1); #endif - struct event_base *eb = as->event_base; + barrier_wait(); - barrier_wait(); - - while(run_auth_server_flag) { - reread_realms(); - update_white_and_black_lists(); - auth_ping(as->rch); - run_events(eb,NULL); -#if defined(DB_TEST) - run_db_test(); -#endif + while(run_auth_server_flag) { + auth_ping(as->rch); + run_events(as->event_base,NULL); + } } return arg; @@ -1781,11 +1810,15 @@ void setup_server(void) pthread_mutex_init(&mutex_bps, NULL); + authserver_number = 1 + (authserver_id)(turn_params.cpus / 2); + + if(authserver_number < MIN_AUTHSERVER_NUMBER) authserver_number = MIN_AUTHSERVER_NUMBER; + #if !defined(TURN_NO_THREAD_BARRIERS) - /* relay threads plus auth thread plus main listener thread */ + /* relay threads plus auth threads plus main listener thread */ /* udp address listener thread(s) will start later */ - barrier_count = turn_params.general_relay_servers_number+2; + barrier_count = turn_params.general_relay_servers_number+authserver_number+1; if(use_cli) { barrier_count += 1; @@ -1828,7 +1861,14 @@ void setup_server(void) } } - setup_auth_server(&authserver); + { + authserver_id sn = 0; + for(sn = 0; sn < authserver_number;++sn) { + authserver[sn].id = sn; + setup_auth_server(&(authserver[sn])); + } + } + if(use_cli) setup_cli_server(); diff --git a/src/apps/relay/userdb.c b/src/apps/relay/userdb.c index aae0e411..21783953 100644 --- a/src/apps/relay/userdb.c +++ b/src/apps/relay/userdb.c @@ -108,37 +108,37 @@ void create_default_realm() o_to_realm = ur_string_map_create(turn_free_simple); default_realm_params_ptr = &_default_realm_params; realms = ur_string_map_create(NULL); - ur_string_map_lock(realms); + lock_realms(); default_realm_params_ptr->status.alloc_counters = ur_string_map_create(NULL); - ur_string_map_unlock(realms); + unlock_realms(); } void get_default_realm_options(realm_options_t* ro) { if(ro) { - ur_string_map_lock(realms); + lock_realms(); ns_bcopy(&(default_realm_params_ptr->options),ro,sizeof(realm_options_t)); - ur_string_map_unlock(realms); + unlock_realms(); } } void set_default_realm_name(char *realm) { - ur_string_map_lock(realms); + lock_realms(); ur_string_map_value_type value = (ur_string_map_value_type)default_realm_params_ptr; STRCPY(default_realm_params_ptr->options.name,realm); ur_string_map_put(realms, (ur_string_map_key_type)default_realm_params_ptr->options.name, value); add_to_secrets_list(&realms_list, realm); - ur_string_map_unlock(realms); + unlock_realms(); } realm_params_t* get_realm(char* name) { if(name && name[0]) { - ur_string_map_lock(realms); + lock_realms(); ur_string_map_value_type value = 0; ur_string_map_key_type key = (ur_string_map_key_type)name; if (ur_string_map_get(realms, key, &value)) { - ur_string_map_unlock(realms); + unlock_realms(); return (realm_params_t*)value; } else { realm_params_t *ret = (realm_params_t*)turn_malloc(sizeof(realm_params_t)); @@ -148,7 +148,7 @@ realm_params_t* get_realm(char* name) ur_string_map_put(realms, key, value); ret->status.alloc_counters = ur_string_map_create(NULL); add_to_secrets_list(&realms_list, name); - ur_string_map_unlock(realms); + unlock_realms(); return ret; } } @@ -158,9 +158,9 @@ realm_params_t* get_realm(char* name) int get_realm_data(char* name, realm_params_t* rp) { - ur_string_map_lock(realms); + lock_realms(); ns_bcopy(get_realm(name),rp,sizeof(realm_params_t)); - ur_string_map_unlock(realms); + unlock_realms(); return 0; } @@ -193,20 +193,20 @@ void get_realm_options_by_name(char *realm, realm_options_t* ro) int change_total_quota(char *realm, int value) { int ret = value; - ur_string_map_lock(realms); + lock_realms(); realm_params_t* rp = get_realm(realm); rp->options.perf_options.total_quota = value; - ur_string_map_unlock(realms); + unlock_realms(); return ret; } int change_user_quota(char *realm, int value) { int ret = value; - ur_string_map_lock(realms); + lock_realms(); realm_params_t* rp = get_realm(realm); rp->options.perf_options.user_quota = value; - ur_string_map_unlock(realms); + unlock_realms(); return ret; } @@ -1285,16 +1285,16 @@ void reread_realms(void) { { realm_params_t* defrp = get_realm(NULL); - ur_string_map_lock(realms); + lock_realms(); defrp->options.perf_options.max_bps = turn_params.max_bps; defrp->options.perf_options.total_quota = turn_params.total_quota; defrp->options.perf_options.user_quota = turn_params.user_quota; - ur_string_map_unlock(realms); + unlock_realms(); } - const turn_dbdriver_t * dbd = get_dbdriver(); - if (dbd && dbd->reread_realms) { - (*dbd->reread_realms)(&realms_list); + const turn_dbdriver_t * dbd = get_dbdriver(); + if (dbd && dbd->reread_realms) { + (*dbd->reread_realms)(&realms_list); } }