diff options
Diffstat (limited to 'instr-daemon.c')
-rw-r--r-- | instr-daemon.c | 267 |
1 files changed, 267 insertions, 0 deletions
diff --git a/instr-daemon.c b/instr-daemon.c new file mode 100644 index 0000000..21864f7 --- /dev/null +++ b/instr-daemon.c @@ -0,0 +1,267 @@ +#include "socket-common.h" +#include "response.h" +#include <stdlib.h> +#include <ctype.h> + +#define STDIN_BUF_SIZE 1024 + +int port=3333; //port to listen +int connections=0; +int maxConn=16; //max connections - 16 + +guint signalMyCb; //signal to register which is used in cbClientInput(), step 10 from requirements +GAsyncQueue** stdinQueue=NULL; +GThread **peers; //actual connected peers + + +//GIOChannel callback for stdin +static gboolean +stdinAvailable (GIOChannel *source, GIOCondition condition, gpointer data) +{ + static char* buffer=NULL; + static gint64 allocated=0; + GAsyncQueue* queue=NULL; + + //temporary buffer to read from stdin + char tmp[STDIN_BUF_SIZE]; + memset(tmp,0,STDIN_BUF_SIZE); + + int size = read(0, tmp, STDIN_BUF_SIZE); + if(size<=0) { + return TRUE; + } + + //fix for \n or \r sending + if(size ==1) + if(tmp[0] == '\r' || tmp[0] == '\n') { + return TRUE; + } + + //the termination characters + if(!strncmp(tmp, "..", size-1)) { + //final reallocation of the buffer to accomodate the whole string + buffer=realloc(buffer,allocated+1); + buffer[allocated] = '\0'; + int idx=0; + + g_print_debug("Got the buffer: \"%s\", tid: %p\n", buffer, g_thread_self()); + + if (tmp[0] == '.' && tmp[1] == '.') { //fix for single dot sending + int count=0; + + //send the final buffer to the queue + for(count=0; count<maxConn; count++) { + //allocate on the heap and deallocate in response.c, cbClientOutput() + char *toSend = realloc(NULL, allocated+1); + memcpy(toSend, buffer, allocated+1); + + //make sure we send to only valid peers + if(peers[count] != 0) { + g_async_queue_push(stdinQueue[count], toSend); + } + } + + //free the buffer + free(buffer); + buffer=NULL; + allocated=0; + return TRUE; + } + } + + //allocate on the heap. buffer is NULL the first time + buffer=realloc(buffer, allocated+size); + memcpy(buffer+allocated, tmp, size); + allocated += size; //keep track of allocation + + + return TRUE; +} + +/** pulls an integer position from the vector which contains the + position to use for the stdinQueue vector + @param id - the thread id +*/ +static int pullIndex(GThread* id) +{ + int i=0,ret=-1; + static GStaticMutex mutex = G_STATIC_MUTEX_INIT; + g_static_mutex_lock (&mutex); + for(i=0; i<maxConn; i++) + if(peers[i] == id) { + peers[i] = (GThread*)0; + ret=i; + break; + } + g_static_mutex_unlock (&mutex); + return ret; +} + +/** + * pushed in the peers vector the thread id for future reference + * @param id + */ +static int pushIndex(GThread* id) +{ + int i=0,ret=-1; + static GStaticMutex mutex = G_STATIC_MUTEX_INIT; + g_static_mutex_lock (&mutex); + for(i=0; i<maxConn; i++) + if(peers[i] == 0) { + peers[i] = id; + ret=i; + break; + } + + g_static_mutex_unlock (&mutex); + return ret; +} + +//handler for incoming connection, closes the connection if maxCon connections are already handled +static gboolean +incomingConnection (GSocketService *service, + GSocketConnection *connection, + GSocketListener *listener, + gpointer user_data) +{ + if(connections +1 > maxConn) { + g_print_debug("Connection closed. Max reached\n"); + return TRUE; + } + connections++; + g_print_debug("Incoming connection\n"); + return FALSE; +} + + +//thread connection handler +static gboolean +handler (GThreadedSocketService *service, + GSocketConnection *connection, + GSocketListener *listener, + gpointer user_data) +{ + + + GOutputStream *out; + GInputStream *in; + gssize size; + + + out = g_io_stream_get_output_stream (G_IO_STREAM (connection)); + in = g_io_stream_get_input_stream (G_IO_STREAM (connection)); + + g_print_debug("Handling, connections: %d\n", connections); + + //register ourselves in the peers vector, use the index obtained in the stdinQueue + //should not get -1 + GThread* self = g_thread_self(); + int index=pushIndex(self); + + GSource *outSource = NULL; + GSource *inSource = g_pollable_input_stream_create_source((struct GPollableInputStream*)in, NULL); + + //get a reference to the async queue + GAsyncQueue *queue = g_async_queue_ref(stdinQueue[index]); + + //input from client + g_source_set_callback(inSource, cbClientInput, (GPollableOutputStream*)out, NULL); + g_source_attach(inSource, NULL); + + //keep thread alive, every 1000 microseconds + while(g_source_is_destroyed(inSource)==FALSE) { + g_usleep(1000); + gint elems; + + //verify our queue length and activate/deactivate the "out" Source for this connection + //if we don't do this, the out Source will be scheduled frequently and will busy loop + if((elems=g_async_queue_length(queue))>0) { + if(outSource == NULL) { + + outSource = g_pollable_output_stream_create_source((struct GPollableOutputStream*)out, NULL); + g_source_set_callback(outSource, cbClientOutput, queue, NULL); + g_source_attach(outSource, NULL); + } else { + g_source_destroy(outSource); + g_source_unref(outSource); + outSource = NULL; + } + + } else { + if(outSource!= NULL && !g_source_is_destroyed(outSource)) { + g_print_debug("Destroy source\n"); + g_source_destroy(outSource); + g_source_unref(outSource); + outSource = NULL; //added + } + } + //end of activate/deactivate + } + + //reached the end of the thread + if (g_output_stream_close(out, NULL, NULL) == FALSE) { + g_print_debug("out not closed\n"); + } + if (g_input_stream_close(in, NULL, NULL) == FALSE) { + g_print_debug("in not closed\n"); + } + + g_print_debug("Thread end\n"); + connections--; //keep track of connections + g_async_queue_unref(queue); //unreference the queue + pullIndex(g_thread_self()); //unregister from the peers vector + return TRUE; +} + +int main(int argc, char **argv) +{ + GSocketService *service = NULL; + GError *error = NULL; + GIOChannel* stdinChannel = NULL; + + g_type_init (); + + //register stdin channel + stdinChannel = g_io_channel_unix_new(0); + if(stdinChannel == NULL) { + g_printerr("No io channel\n"); + exit(-1); + } + g_io_add_watch(stdinChannel, G_IO_IN, stdinAvailable, NULL); + int idx=0; + + //allocate a maxConn queue on the heap + stdinQueue = malloc(sizeof(struct GAsyncQueue*) * maxConn); + //allocate peers vector on the heap + peers = malloc(sizeof(GThread*) * maxConn); + for(idx=0; idx<maxConn; idx++) { + peers[idx] = 0; + stdinQueue[idx] = g_async_queue_new(); + } + + //create a threaded service + service = g_threaded_socket_service_new (maxConn+1); + if (!g_socket_listener_add_inet_port (G_SOCKET_LISTENER (service), + port, + NULL, + &error)) { + g_printerr ("%s: %s\n", argv[0], error->message); + free(stdinQueue); + free(peers); + return 1; + } + + //init the signal signalMyCb + initSignals(&signalMyCb); + + g_print_debug("Server listening on port %d\n", port); + g_signal_connect (service, "run", G_CALLBACK (handler), NULL); + g_signal_connect (service, "incoming", G_CALLBACK(incomingConnection), NULL); + + g_main_loop_run (g_main_loop_new (NULL, FALSE)); + free(stdinQueue); + free(peers); + + return 0; + +} |