#include "socket-common.h"
#include "response.h"
// NOTE(review): the header names of the next two #include directives are missing
// from this copy of the file (the <...> part was stripped). Restore them from
// version control.
#include
#include

#define STDIN_BUF_SIZE 1024

int port=3333; //port to listen
int connections=0;
int maxConn=16; //max connections - 16
guint signalMyCb; //signal to register which is used in cbClientInput(), step 10 from requirements
GAsyncQueue** stdinQueue=NULL;
GThread **peers; //actual connected peers

//GIOChannel callback for stdin
static gboolean
stdinAvailable (GIOChannel *source, GIOCondition condition, gpointer data)
{
    static char* buffer=NULL;
    static gint64 allocated=0;
    GAsyncQueue* queue=NULL;

    //temporary buffer to read from stdin
    char tmp[STDIN_BUF_SIZE];
    memset(tmp,0,STDIN_BUF_SIZE);
    int size = read(0, tmp, STDIN_BUF_SIZE);
    if(size<=0)
        return TRUE;

    //fix for \n or \r sending
    if(size ==1)
        if(tmp[0] == '\r' || tmp[0] == '\n')
            return TRUE;

    //the termination characters
    if(!strncmp(tmp, "..", size-1))
    {
        //final reallocation of the buffer to accommodate the whole string
        buffer=realloc(buffer,allocated+1);
        buffer[allocated] = '\0';
        int idx=0;
        g_print_debug("Got the buffer: \"%s\", tid: %p\n", buffer, g_thread_self());
        if (tmp[0] == '.' && tmp[1] == '.') //fix for single dot sending
        {
            int count=0;
            //send the final buffer to the queue
            for(count=0; count
            /*
             * NOTE(review): SOURCE TEXT LOST HERE.
             * Everything from the '<' of this loop condition up to a later '>'
             * was stripped from this copy of the file. The missing text covers
             * the remainder of stdinAvailable() and the beginning of the next
             * function -- presumably incomingConnection(), which main()
             * connects to the "incoming" signal. The tokens on both sides of
             * the gap are kept exactly as found; restore the span from version
             * control before building.
             */
            maxConn)
    {
        g_print_debug("Connection closed. Max reached\n");
        return TRUE;
    }
    connections++;
    g_print_debug("Incoming connection\n");
    return FALSE;
}

//thread connection handler
//
// Connected to the service's "run" signal in main(). Registers the calling
// thread in the peers vector through pushIndex(), attaches an input source for
// the client stream, and then polls once per millisecond until that input
// source has been destroyed. While polling, it arms/disarms an output source
// depending on whether this connection's stdin queue holds anything.
//
// service, listener, user_data: unused here.
// connection: the accepted connection whose streams are served.
// Returns TRUE in every case.
//
// NOTE(review): "connections" is modified here and in the "incoming" callback
// without any synchronisation; confirm both run on the same thread or switch
// to atomic operations.
static gboolean
handler (GThreadedSocketService *service, GSocketConnection *connection, GSocketListener *listener, gpointer user_data)
{
    GOutputStream *out = g_io_stream_get_output_stream (G_IO_STREAM (connection));
    GInputStream *in = g_io_stream_get_input_stream (G_IO_STREAM (connection));

    g_print_debug("Handling, connections: %d\n", connections);

    //register ourselves in the peers vector, use the index obtained in the stdinQueue
    GThread* self = g_thread_self();
    int index=pushIndex(self);

    //pushIndex() is not expected to fail, but a bad index must never be used to
    //address stdinQueue (main() allocates exactly maxConn entries)
    if(index < 0 || index >= maxConn)
    {
        g_print_debug("No free peer slot\n");
        connections--; //undo the increment done when the connection was accepted
        return TRUE;
    }

    GSource *outSource = NULL;
    GSource *inSource = g_pollable_input_stream_create_source(G_POLLABLE_INPUT_STREAM(in), NULL);

    //get a reference to the async queue
    GAsyncQueue *queue = g_async_queue_ref(stdinQueue[index]);

    //input from client; the output stream is handed to the callback as user data
    g_source_set_callback(inSource, (GSourceFunc)cbClientInput, out, NULL);
    g_source_attach(inSource, NULL);

    //keep thread alive, every 1000 microseconds
    while(!g_source_is_destroyed(inSource))
    {
        g_usleep(1000);

        //verify our queue length and activate/deactivate the "out" Source for this connection
        //if we don't do this, the out Source will be scheduled frequently and will busy loop
        if(g_async_queue_length(queue) > 0)
        {
            if(outSource == NULL)
            {
                outSource = g_pollable_output_stream_create_source(G_POLLABLE_OUTPUT_STREAM(out), NULL);
                g_source_set_callback(outSource, (GSourceFunc)cbClientOutput, queue, NULL);
                g_source_attach(outSource, NULL);
            }
            else
            {
                //drop the previous source so that a fresh one is created on the next pass
                g_source_destroy(outSource);
                g_source_unref(outSource);
                outSource = NULL;
            }
        }
        else if(outSource != NULL && !g_source_is_destroyed(outSource))
        {
            g_print_debug("Destroy source\n");
            g_source_destroy(outSource);
            g_source_unref(outSource);
            outSource = NULL;
        }
        //end of activate/deactivate
    }

    //reached the end of the thread

    //an output source may still be alive here; it carries "queue" as callback
    //data, so it has to go before our queue reference is released
    if(outSource != NULL)
    {
        g_source_destroy(outSource);
        g_source_unref(outSource);
        outSource = NULL;
    }

    //release the reference returned by g_pollable_input_stream_create_source()
    g_source_unref(inSource);

    if (g_output_stream_close(out, NULL, NULL) == FALSE)
        g_print_debug("out not closed\n");
    if (g_input_stream_close(in, NULL, NULL) == FALSE)
        g_print_debug("in not closed\n");

    g_print_debug("Thread end\n");
    connections--; //keep track of connections
    g_async_queue_unref(queue); //unreference the queue
    pullIndex(self); //unregister from the peers vector
    return TRUE;
}

int main(int argc, char **argv)
{
    GSocketService *service = NULL;
    GError *error = NULL;
    GIOChannel* stdinChannel = NULL;

    g_type_init ();

    //register stdin channel
    stdinChannel = g_io_channel_unix_new(0);
    if(stdinChannel == NULL)
    {
        g_printerr("No io channel\n");
        exit(-1);
    }
    g_io_add_watch(stdinChannel, G_IO_IN, stdinAvailable, NULL);

    int idx=0;
    //allocate a maxConn queue on the heap
    stdinQueue = malloc(sizeof(struct GAsyncQueue*) * maxConn);
    //allocate peers vector on the heap
    peers = malloc(sizeof(GThread*) * maxConn);
    for(idx=0;idx
    /*
     * NOTE(review): SOURCE TEXT LOST HERE.
     * Everything from the '<' of this loop condition up to the '>' of a later
     * "->" was stripped from this copy of the file. The tokens on both sides
     * of the gap are kept exactly as found; restore the span from version
     * control before building.
     */
    message);
        free(stdinQueue);
        free(peers);
        return 1;
    }

    //init the signal signalMyCb
    initSignals(&signalMyCb);

    g_print_debug("Server listening on port %d\n", port);
    g_signal_connect (service, "run", G_CALLBACK (handler), NULL);
    g_signal_connect (service, "incoming", G_CALLBACK(incomingConnection), NULL);
    g_main_loop_run (g_main_loop_new (NULL, FALSE));

    free(stdinQueue);
    free(peers);
    return 0;
}