#include "socket-common.h"
#include "response.h"
#include <stdlib.h>
#include <ctype.h>

//size in bytes of the temporary buffer filled by each read() in stdinAvailable()
#define STDIN_BUF_SIZE 1024

int port=3333; //port to listen, passed to g_socket_listener_add_inet_port() in main()
//number of connections currently accounted for: incremented in incomingConnection(),
//decremented at the end of handler()
int connections=0;
int maxConn=16; //max connections - 16; also the length of the stdinQueue and peers vectors

guint signalMyCb; //signal id filled in by initSignals() in main(); per the original note it is used in cbClientInput(), step 10 from requirements
//vector of maxConn queues allocated in main(); stdinAvailable() pushes the text read from
//stdin on the queue of each registered peer, handler() consumes the queue of its own slot
GAsyncQueue** stdinQueue=NULL;
//vector of maxConn slots allocated in main(); a slot holds the GThread of the connection
//that registered itself with pushIndex(), or 0 when the slot is free (see pullIndex())
GThread **peers; //actual connected peers


/**
 * GIOChannel watch callback for stdin.
 *
 * Every chunk read from file descriptor 0 is appended to a static heap buffer.
 * When a chunk starting with the terminator ".." arrives, the accumulated text
 * is NUL-terminated, a private heap copy of it is pushed on the queue of every
 * registered peer, and the accumulator is reset.
 *
 * @param source    the watched channel (unused, the descriptor is read directly)
 * @param condition the condition that triggered the callback (unused)
 * @param data      user data given to g_io_add_watch() (unused)
 * @return always TRUE, so the watch stays installed
 */
static gboolean
stdinAvailable  (GIOChannel *source, GIOCondition condition, gpointer data)
{
	static char* buffer=NULL;  //text accumulated since the last terminator
	static gsize allocated=0;  //number of bytes currently stored in buffer
	int count=0;

	//temporary buffer to read from stdin; zeroed so that tmp[1] is defined after a 1 byte read
	char tmp[STDIN_BUF_SIZE];
	memset(tmp,0,STDIN_BUF_SIZE);

	gssize size = read(0, tmp, STDIN_BUF_SIZE);
	if(size<=0) {
		//NOTE(review): errors and EOF both leave the watch installed, as before - confirm
		//this is the wanted behavior when stdin is closed
		return TRUE;
	}

	//fix for \n or \r sending: a lone line terminator is ignored
	if(size==1 && (tmp[0] == '\r' || tmp[0] == '\n')) {
		return TRUE;
	}

	//the termination characters: the chunk starts with ".." and compares equal to ".."
	//over its first size-1 bytes, which accepts ".." and ".." followed by one byte (the newline)
	if(tmp[0] == '.' && tmp[1] == '.' && !strncmp(tmp, "..", size-1)) {
		//final reallocation of the buffer to accommodate the terminating NUL
		char *terminated = realloc(buffer, allocated+1);
		if(terminated == NULL) {
			g_printerr("Out of memory, dropping the message read from stdin\n");
			free(buffer);
			buffer=NULL;
			allocated=0;
			return TRUE;
		}
		buffer = terminated;
		buffer[allocated] = '\0';

		g_print_debug("Got the buffer: \"%s\", tid: %p\n", buffer, g_thread_self());

		//send a copy of the final buffer to the queue of each valid peer
		for(count=0; count<maxConn; count++) {
			//allocate only for registered peers, otherwise the copy is never freed
			if(peers[count] == 0) {
				continue;
			}

			//heap copy owned by the receiver; per the original note it is
			//deallocated in response.c, cbClientOutput()
			char *toSend = malloc(allocated+1);
			if(toSend == NULL) {
				g_printerr("Out of memory, message not sent to peer %d\n", count);
				continue;
			}
			memcpy(toSend, buffer, allocated+1);
			g_async_queue_push(stdinQueue[count], toSend);
		}

		//free the buffer and start a new message
		free(buffer);
		buffer=NULL;
		allocated=0;
		return TRUE;
	}

	//grow the accumulator; buffer is NULL the first time. The old buffer is kept
	//when the reallocation fails, only the current chunk is lost
	char *grown = realloc(buffer, allocated+size);
	if(grown == NULL) {
		g_printerr("Out of memory, dropping %ld bytes read from stdin\n", (long)size);
		return TRUE;
	}
	buffer = grown;
	memcpy(buffer+allocated, tmp, size);
	allocated += size; //keep track of allocation

	return TRUE;
}

/** pulls an integer position from the vector which contains the
	position to use for the stdinQueue vector
	@param id - the thread id
*/
static int pullIndex(GThread* id)
{
	int i=0,ret=-1;
	static GStaticMutex mutex = G_STATIC_MUTEX_INIT;
	g_static_mutex_lock (&mutex);
	for(i=0; i<maxConn; i++)
		if(peers[i] == id) {
			peers[i] = (GThread*)0;
			ret=i;
			break;
		}
	g_static_mutex_unlock (&mutex);
	return ret;
}

/**
 * pushed in the peers vector the thread id for future reference
 * @param id
 */
static int pushIndex(GThread* id)
{
	int i=0,ret=-1;
	static GStaticMutex mutex = G_STATIC_MUTEX_INIT;
	g_static_mutex_lock (&mutex);
	for(i=0; i<maxConn; i++)
		if(peers[i] == 0) {
			peers[i] = id;
			ret=i;
			break;
		}

	g_static_mutex_unlock (&mutex);
	return ret;
}

/**
 * Handler connected to the "incoming" signal of the service; it enforces the
 * maxConn limit before a connection is handed over to handler().
 *
 * The connections counter is also decremented by the connection threads when
 * they finish, so it is read and incremented atomically here.
 *
 * @param service    the emitting service (unused)
 * @param connection the new connection (unused)
 * @param listener   the source object of the signal (unused)
 * @param user_data  user data given at connect time (unused)
 * @return TRUE when maxConn connections are already handled, which stops the
 *         signal emission so the connection is not processed any further;
 *         FALSE to let the connection through
 */
static gboolean
incomingConnection (GSocketService *service,
                    GSocketConnection      *connection,
                    GSocketListener        *listener,
                    gpointer                user_data)
{
	if(g_atomic_int_get(&connections) >= maxConn) {
		g_print_debug("Connection closed. Max reached\n");
		return TRUE;
	}
	g_atomic_int_inc(&connections);
	g_print_debug("Incoming connection\n");
	return FALSE;
}


/**
 * Thread connection handler, connected to the "run" signal of the threaded
 * socket service.
 *
 * The thread registers itself in the peers vector, attaches an input source
 * for the connection to the default main context and then polls, once per
 * millisecond, until that input source has been destroyed. While polling it
 * creates or destroys the output source depending on the length of the
 * stdin queue belonging to its slot.
 *
 * @param service    the emitting service (unused)
 * @param connection the connection to serve
 * @param listener   the source object of the signal (unused)
 * @param user_data  user data given at connect time (unused)
 * @return always TRUE
 */
static gboolean
handler (GThreadedSocketService *service,
         GSocketConnection      *connection,
         GSocketListener        *listener,
         gpointer                user_data)
{
	GOutputStream *out;
	GInputStream *in;

	out = g_io_stream_get_output_stream (G_IO_STREAM (connection));
	in = g_io_stream_get_input_stream (G_IO_STREAM (connection));

	g_print_debug("Handling, connections: %d\n", g_atomic_int_get(&connections));

	//register ourselves in the peers vector, use the index obtained in the stdinQueue
	GThread* self = g_thread_self();
	int index=pushIndex(self);
	if(index < 0) {
		//no free slot: indexing stdinQueue with -1 would read outside of the vector
		g_printerr("No free peer slot, connection dropped\n");
		g_atomic_int_add(&connections, -1);
		return TRUE;
	}

	GSource *outSource = NULL;
	GSource *inSource = g_pollable_input_stream_create_source(G_POLLABLE_INPUT_STREAM(in), NULL);

	//get a reference to the async queue
	GAsyncQueue *queue = g_async_queue_ref(stdinQueue[index]);

	//input from client
	g_source_set_callback(inSource, cbClientInput, G_POLLABLE_OUTPUT_STREAM(out), NULL);
	g_source_attach(inSource, NULL);

	//keep thread alive, every 1000 microseconds
	while(g_source_is_destroyed(inSource)==FALSE) {
		g_usleep(1000);

		//verify our queue length and activate/deactivate the "out" Source for this connection
		//if we don't do this, the out Source will be scheduled frequently and will busy loop
		if(g_async_queue_length(queue)>0) {
			if(outSource == NULL) {
				outSource = g_pollable_output_stream_create_source(G_POLLABLE_OUTPUT_STREAM(out), NULL);
				g_source_set_callback(outSource, cbClientOutput, queue, NULL);
				g_source_attach(outSource, NULL);
			} else {
				//an existing source is dropped on this tick and recreated on the next one
				//NOTE(review): presumably this re-arms a source that cbClientOutput already
				//removed - confirm against response.c
				g_source_destroy(outSource);
				g_source_unref(outSource);
				outSource = NULL;
			}
		} else if(outSource != NULL && !g_source_is_destroyed(outSource)) {
			g_print_debug("Destroy source\n");
			g_source_destroy(outSource);
			g_source_unref(outSource);
			outSource = NULL;
		}
		//end of activate/deactivate
	}

	//reached the end of the thread: do not leave an output source attached to the
	//main context and drop our own references on both sources
	if(outSource != NULL) {
		g_source_destroy(outSource);
		g_source_unref(outSource);
		outSource = NULL;
	}
	g_source_unref(inSource);

	if (g_output_stream_close(out, NULL, NULL) == FALSE) {
		g_print_debug("out not closed\n");
	}
	if (g_input_stream_close(in, NULL, NULL) == FALSE) {
		g_print_debug("in not closed\n");
	}

	g_print_debug("Thread end\n");
	g_async_queue_unref(queue); //unreference the queue
	//unregister from the peers vector BEFORE giving the connection count back, so a
	//connection admitted by incomingConnection() always finds a free slot
	pullIndex(self);
	g_atomic_int_add(&connections, -1); //keep track of connections
	return TRUE;
}

int main(int argc, char **argv)
{
	GSocketService *service = NULL;
	GError *error = NULL;
	GIOChannel* stdinChannel = NULL;

	g_type_init ();

	//register stdin channel
	stdinChannel = g_io_channel_unix_new(0);
	if(stdinChannel == NULL) {
		g_printerr("No io channel\n");
		exit(-1);
	}
	g_io_add_watch(stdinChannel, G_IO_IN, stdinAvailable, NULL);
	int idx=0;

	//allocate a maxConn queue on the heap
	stdinQueue = malloc(sizeof(struct GAsyncQueue*) * maxConn);
	//allocate peers vector on the heap
	peers = malloc(sizeof(GThread*) * maxConn);
	for(idx=0; idx<maxConn; idx++) {
		peers[idx] = 0;
		stdinQueue[idx] = g_async_queue_new();
	}

	//create a threaded service
	service = g_threaded_socket_service_new (maxConn+1);
	if (!g_socket_listener_add_inet_port (G_SOCKET_LISTENER (service),
	                                      port,
	                                      NULL,
	                                      &error)) {
		g_printerr ("%s: %s\n", argv[0], error->message);
		free(stdinQueue);
		free(peers);
		return 1;
	}

	//init the signal signalMyCb
	initSignals(&signalMyCb);

	g_print_debug("Server listening on port %d\n", port);
	g_signal_connect (service, "run", G_CALLBACK (handler), NULL);
	g_signal_connect (service, "incoming", G_CALLBACK(incomingConnection), NULL);

	g_main_loop_run (g_main_loop_new (NULL, FALSE));
	free(stdinQueue);
	free(peers);

	return 0;

}