summaryrefslogtreecommitdiff
path: root/instr-daemon.c
diff options
context:
space:
mode:
authorMichael J. Chudobiak <mjc@avtechpulse.com>2012-07-19 08:41:04 -0400
committerMichael J. Chudobiak <mjc@avtechpulse.com>2012-07-19 08:41:04 -0400
commit299b166ff820e8413f0c34ed5a2e7afac46cd477 (patch)
treeb0b8e1520af740e8a63fd132ebae82326054e12f /instr-daemon.c
initial commit
Diffstat (limited to 'instr-daemon.c')
-rw-r--r--instr-daemon.c267
1 files changed, 267 insertions, 0 deletions
diff --git a/instr-daemon.c b/instr-daemon.c
new file mode 100644
index 0000000..21864f7
--- /dev/null
+++ b/instr-daemon.c
@@ -0,0 +1,267 @@
+#include "socket-common.h"
+#include "response.h"
+#include <stdlib.h>
+#include <ctype.h>
+
+#define STDIN_BUF_SIZE 1024
+
+int port=3333; //port to listen
+int connections=0;
+int maxConn=16; //max connections - 16
+
+guint signalMyCb; //signal to register which is used in cbClientInput(), step 10 from requirements
+GAsyncQueue** stdinQueue=NULL;
+GThread **peers; //actual connected peers
+
+
+//GIOChannel callback for stdin
+static gboolean
+stdinAvailable (GIOChannel *source, GIOCondition condition, gpointer data)
+{
+ static char* buffer=NULL;
+ static gint64 allocated=0;
+ GAsyncQueue* queue=NULL;
+
+ //temporary buffer to read from stdin
+ char tmp[STDIN_BUF_SIZE];
+ memset(tmp,0,STDIN_BUF_SIZE);
+
+ int size = read(0, tmp, STDIN_BUF_SIZE);
+ if(size<=0) {
+ return TRUE;
+ }
+
+ //fix for \n or \r sending
+ if(size ==1)
+ if(tmp[0] == '\r' || tmp[0] == '\n') {
+ return TRUE;
+ }
+
+ //the termination characters
+ if(!strncmp(tmp, "..", size-1)) {
+ //final reallocation of the buffer to accomodate the whole string
+ buffer=realloc(buffer,allocated+1);
+ buffer[allocated] = '\0';
+ int idx=0;
+
+ g_print_debug("Got the buffer: \"%s\", tid: %p\n", buffer, g_thread_self());
+
+ if (tmp[0] == '.' && tmp[1] == '.') { //fix for single dot sending
+ int count=0;
+
+ //send the final buffer to the queue
+ for(count=0; count<maxConn; count++) {
+ //allocate on the heap and deallocate in response.c, cbClientOutput()
+ char *toSend = realloc(NULL, allocated+1);
+ memcpy(toSend, buffer, allocated+1);
+
+ //make sure we send to only valid peers
+ if(peers[count] != 0) {
+ g_async_queue_push(stdinQueue[count], toSend);
+ }
+ }
+
+ //free the buffer
+ free(buffer);
+ buffer=NULL;
+ allocated=0;
+ return TRUE;
+ }
+ }
+
+ //allocate on the heap. buffer is NULL the first time
+ buffer=realloc(buffer, allocated+size);
+ memcpy(buffer+allocated, tmp, size);
+ allocated += size; //keep track of allocation
+
+
+ return TRUE;
+}
+
+/** pulls an integer position from the vector which contains the
+ position to use for the stdinQueue vector
+ @param id - the thread id
+*/
+static int pullIndex(GThread* id)
+{
+ int i=0,ret=-1;
+ static GStaticMutex mutex = G_STATIC_MUTEX_INIT;
+ g_static_mutex_lock (&mutex);
+ for(i=0; i<maxConn; i++)
+ if(peers[i] == id) {
+ peers[i] = (GThread*)0;
+ ret=i;
+ break;
+ }
+ g_static_mutex_unlock (&mutex);
+ return ret;
+}
+
+/**
+ * pushed in the peers vector the thread id for future reference
+ * @param id
+ */
+static int pushIndex(GThread* id)
+{
+ int i=0,ret=-1;
+ static GStaticMutex mutex = G_STATIC_MUTEX_INIT;
+ g_static_mutex_lock (&mutex);
+ for(i=0; i<maxConn; i++)
+ if(peers[i] == 0) {
+ peers[i] = id;
+ ret=i;
+ break;
+ }
+
+ g_static_mutex_unlock (&mutex);
+ return ret;
+}
+
+//handler for incoming connection, closes the connection if maxCon connections are already handled
+static gboolean
+incomingConnection (GSocketService *service,
+ GSocketConnection *connection,
+ GSocketListener *listener,
+ gpointer user_data)
+{
+ if(connections +1 > maxConn) {
+ g_print_debug("Connection closed. Max reached\n");
+ return TRUE;
+ }
+ connections++;
+ g_print_debug("Incoming connection\n");
+ return FALSE;
+}
+
+
+//thread connection handler
+static gboolean
+handler (GThreadedSocketService *service,
+ GSocketConnection *connection,
+ GSocketListener *listener,
+ gpointer user_data)
+{
+
+
+ GOutputStream *out;
+ GInputStream *in;
+ gssize size;
+
+
+ out = g_io_stream_get_output_stream (G_IO_STREAM (connection));
+ in = g_io_stream_get_input_stream (G_IO_STREAM (connection));
+
+ g_print_debug("Handling, connections: %d\n", connections);
+
+ //register ourselves in the peers vector, use the index obtained in the stdinQueue
+ //should not get -1
+ GThread* self = g_thread_self();
+ int index=pushIndex(self);
+
+ GSource *outSource = NULL;
+ GSource *inSource = g_pollable_input_stream_create_source((struct GPollableInputStream*)in, NULL);
+
+ //get a reference to the async queue
+ GAsyncQueue *queue = g_async_queue_ref(stdinQueue[index]);
+
+ //input from client
+ g_source_set_callback(inSource, cbClientInput, (GPollableOutputStream*)out, NULL);
+ g_source_attach(inSource, NULL);
+
+ //keep thread alive, every 1000 microseconds
+ while(g_source_is_destroyed(inSource)==FALSE) {
+ g_usleep(1000);
+ gint elems;
+
+ //verify our queue length and activate/deactivate the "out" Source for this connection
+ //if we don't do this, the out Source will be scheduled frequently and will busy loop
+ if((elems=g_async_queue_length(queue))>0) {
+ if(outSource == NULL) {
+
+ outSource = g_pollable_output_stream_create_source((struct GPollableOutputStream*)out, NULL);
+ g_source_set_callback(outSource, cbClientOutput, queue, NULL);
+ g_source_attach(outSource, NULL);
+ } else {
+ g_source_destroy(outSource);
+ g_source_unref(outSource);
+ outSource = NULL;
+ }
+
+ } else {
+ if(outSource!= NULL && !g_source_is_destroyed(outSource)) {
+ g_print_debug("Destroy source\n");
+ g_source_destroy(outSource);
+ g_source_unref(outSource);
+ outSource = NULL; //added
+ }
+ }
+ //end of activate/deactivate
+ }
+
+ //reached the end of the thread
+ if (g_output_stream_close(out, NULL, NULL) == FALSE) {
+ g_print_debug("out not closed\n");
+ }
+ if (g_input_stream_close(in, NULL, NULL) == FALSE) {
+ g_print_debug("in not closed\n");
+ }
+
+ g_print_debug("Thread end\n");
+ connections--; //keep track of connections
+ g_async_queue_unref(queue); //unreference the queue
+ pullIndex(g_thread_self()); //unregister from the peers vector
+ return TRUE;
+}
+
+int main(int argc, char **argv)
+{
+ GSocketService *service = NULL;
+ GError *error = NULL;
+ GIOChannel* stdinChannel = NULL;
+
+ g_type_init ();
+
+ //register stdin channel
+ stdinChannel = g_io_channel_unix_new(0);
+ if(stdinChannel == NULL) {
+ g_printerr("No io channel\n");
+ exit(-1);
+ }
+ g_io_add_watch(stdinChannel, G_IO_IN, stdinAvailable, NULL);
+ int idx=0;
+
+ //allocate a maxConn queue on the heap
+ stdinQueue = malloc(sizeof(struct GAsyncQueue*) * maxConn);
+ //allocate peers vector on the heap
+ peers = malloc(sizeof(GThread*) * maxConn);
+ for(idx=0; idx<maxConn; idx++) {
+ peers[idx] = 0;
+ stdinQueue[idx] = g_async_queue_new();
+ }
+
+ //create a threaded service
+ service = g_threaded_socket_service_new (maxConn+1);
+ if (!g_socket_listener_add_inet_port (G_SOCKET_LISTENER (service),
+ port,
+ NULL,
+ &error)) {
+ g_printerr ("%s: %s\n", argv[0], error->message);
+ free(stdinQueue);
+ free(peers);
+ return 1;
+ }
+
+ //init the signal signalMyCb
+ initSignals(&signalMyCb);
+
+ g_print_debug("Server listening on port %d\n", port);
+ g_signal_connect (service, "run", G_CALLBACK (handler), NULL);
+ g_signal_connect (service, "incoming", G_CALLBACK(incomingConnection), NULL);
+
+ g_main_loop_run (g_main_loop_new (NULL, FALSE));
+ free(stdinQueue);
+ free(peers);
+
+ return 0;
+
+}