filed.c at [c35e6a2d15]

File filed.c artifact c73ad10afc part of check-in c35e6a2d15


#include <sys/sendfile.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <pthread.h>
#include <strings.h>
#include <signal.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <fcntl.h>
#include <stdio.h>
#include <errno.h>
#include <time.h>

/* Compile time constants */
#define FILED_SENDFILE_MAX 16777215

/* Default values */
#define MAX_FAILURE_COUNT 30
#define PORT 8080
#define THREAD_COUNT 10
#define BIND_ADDR "::"
#define CACHE_SIZE 8192

/* Arguments for worker threads */
struct filed_worker_thread_args {
	int fd;
};

/* File information */
struct filed_fileinfo {
	pthread_mutex_t mutex;
	char *path;
	int fd;
	size_t len;
	char *lastmod;
	char lastmod_b[64];
	char *type;
};

/* Request variables */
struct filed_http_request {
	/** Buffers **/
	struct filed_fileinfo fileinfo;
	char path_b[1010];
	char tmpbuf[1010];

	/** HTTP Request information **/
	char *path;     /*** Path being requested ***/

	struct {
		struct {
			int present;
			off_t offset;   /*** Range start ***/
			ssize_t length; /*** Range length ***/
		} range;
	} headers;
};

/* Global variables */
/** Open File cache **/
struct filed_fileinfo *filed_fileinfo_fdcache;
unsigned int filed_fileinfo_fdcache_size = CACHE_SIZE;

/* Initialize process */
static int filed_init(void) {
	unsigned int idx;
	int mutex_init_ret;

	mlockall(MCL_CURRENT | MCL_FUTURE);

	filed_fileinfo_fdcache = malloc(sizeof(*filed_fileinfo_fdcache) * filed_fileinfo_fdcache_size);
	if (filed_fileinfo_fdcache == NULL) {
		return(1);
	}

	for (idx = 0; idx < filed_fileinfo_fdcache_size; idx++) {
		mutex_init_ret = pthread_mutex_init(&filed_fileinfo_fdcache[idx].mutex, NULL);
		if (mutex_init_ret != 0) {
			return(1);
		}

		filed_fileinfo_fdcache[idx].path = strdup("");
		filed_fileinfo_fdcache[idx].fd = -1;
		filed_fileinfo_fdcache[idx].lastmod = "";
		filed_fileinfo_fdcache[idx].type = "";
	}

	signal(SIGPIPE, SIG_IGN);

	return(0);
}

/* Listen on a particular address/port */
static int filed_listen(const char *address, unsigned int port) {
	struct sockaddr_in6 addr;
	int pton_ret, bind_ret, listen_ret;
	int fd;

	addr.sin6_family = AF_INET6;
	addr.sin6_flowinfo = 0;
	addr.sin6_scope_id = 0;
	addr.sin6_port = htons(port);
	pton_ret = inet_pton(AF_INET6, address, addr.sin6_addr.s6_addr);
	if (pton_ret != 1) {
		return(-1);
	}

	fd = socket(AF_INET6, SOCK_STREAM, 0);
	if (fd < 0) {
		return(fd);
	}

	bind_ret = bind(fd, (const struct sockaddr *) &addr, sizeof(addr));
	if (bind_ret < 0) {
		close(fd);

		return(-1);
	}

	listen_ret = listen(fd, 128);
	if (listen_ret != 0) {
		close(fd);

		return(-1);
	}

	return(fd);
}

/* Log a message */
//#define FILED_DONT_LOG
#ifdef FILED_DONT_LOG
#  define filed_logging_thread_init() 0
#  define filed_log_msg_debug(x, ...) /**/
#  define filed_log_msg(x) /**/
#else
/* Initialize logging thread */
static int filed_logging_thread_init(void) {
	/* XXX:TODO: Unimplemented */
	return(0);
}

/* XXX:TODO: Unimplemented */
#define filed_log_msg_debug(x, ...) { fprintf(stderr, x, __VA_ARGS__); fprintf(stderr, "\n"); fflush(stderr); }

static void filed_log_msg(const char *buffer) {
	/* XXX:TODO: Unimplemented */
	fprintf(stderr, "%s\n", buffer);

	return;
}

#endif
/* Format time per RFC2616 */
static char *filed_format_time(char *buffer, size_t buffer_len, const time_t timeinfo) {
	struct tm timeinfo_tm, *timeinfo_tm_p;

	timeinfo_tm_p = gmtime_r(&timeinfo, &timeinfo_tm);
	if (timeinfo_tm_p == NULL) {
		return("unknown");
	}

	buffer[buffer_len - 1] = '\0';
	buffer_len = strftime(buffer, buffer_len - 1, "%a, %d %b %Y %H:%M:%S GMT", timeinfo_tm_p);

	return(buffer);
}

/* hash */
static unsigned int filed_hash(const unsigned char *value, unsigned int modulus) {
	unsigned char curr;
	unsigned int retval;

	retval = modulus - 1;

	while ((curr = *value)) {
		if (curr < 32) {
			curr = 255 - curr;
		} else {
			curr -= 32;
		}

		retval <<= 5;
		retval += curr;

		value++;
	}

	retval = retval % modulus;

	return(retval);
}

/* Open a file and return file information */
static struct filed_fileinfo *filed_open_file(const char *path, struct filed_fileinfo *buffer) {
	struct filed_fileinfo *cache;
	unsigned int cache_idx;
	off_t len;
	int fd;

	cache_idx = filed_hash((const unsigned char *) path, filed_fileinfo_fdcache_size);

	cache = &filed_fileinfo_fdcache[cache_idx];

	filed_log_msg_debug("Locking mutex for idx: %lu", (unsigned long) cache_idx);

	pthread_mutex_lock(&cache->mutex);

	filed_log_msg_debug("Completed locking mutex for idx: %lu", (unsigned long) cache_idx);

	if (strcmp(path, cache->path) != 0) {
		filed_log_msg_debug("Cache miss for idx: %lu: OLD \"%s\", NEW \"%s\"", (unsigned long) cache_idx, cache->path, path);

		fd = open(path, O_RDONLY | O_LARGEFILE);
		if (fd < 0) {
			pthread_mutex_unlock(&cache->mutex);

			return(NULL);
		}

		free(cache->path);
		if (cache->fd >= 0) {
			close(cache->fd);
		}

		len = lseek(fd, 0, SEEK_END);
		lseek(fd, 0, SEEK_SET);

		cache->fd = fd;
		cache->len = len;
		cache->path = strdup(path);

		/* XXX:TODO: Determine */
		cache->type = "video/mp4";
		cache->lastmod = filed_format_time(cache->lastmod_b, sizeof(cache->lastmod_b), time(NULL) - 30);
	} else {
		filed_log_msg_debug("Cache hit for idx: %lu: PATH \"%s\"", (unsigned long) cache_idx, path);
	}

	/*
	 * We have to make a duplicate FD, because once we release the cache
	 * mutex, the file descriptor may be closed
	 */
	fd = dup(cache->fd);
	if (fd < 0) {
		pthread_mutex_unlock(&cache->mutex);

		return(NULL);
	}

	buffer->fd = fd;
	buffer->len = cache->len;
	buffer->type = cache->type;
	memcpy(buffer->lastmod_b, cache->lastmod_b, sizeof(buffer->lastmod_b));
	buffer->lastmod = buffer->lastmod_b + (cache->lastmod - cache->lastmod_b);

	pthread_mutex_unlock(&cache->mutex);

	return(buffer);
}

/* Process an HTTP request and return the path requested */
static struct filed_http_request *filed_get_http_request(FILE *fp, struct filed_http_request *buffer_st) {
	char *method, *path;
	char *buffer, *tmpbuffer, *workbuffer, *workbuffer_next;
	size_t buffer_len, tmpbuffer_len;
	off_t range_start, range_end;
	ssize_t range_length;
	int range_request;
	int i;

	range_start = 0;
	range_end   = 0;
	range_request = 0;
	range_length = -1;

	buffer = buffer_st->path_b;
	buffer_len = sizeof(buffer_st->path_b);

	tmpbuffer = buffer_st->tmpbuf;
	tmpbuffer_len = sizeof(buffer_st->tmpbuf);

	filed_log_msg("WAIT_FOR_REQUEST FD=...");

	fgets(buffer, buffer_len, fp);

	method = buffer;

	buffer = strchr(buffer, ' ');
	if (buffer == NULL) {
		filed_log_msg("GOT_REQUEST FD=... ERROR=format");

		return(NULL);
	}

	*buffer = '\0';
	buffer++;

	path = buffer;

	buffer = strchr(buffer, ' ');
	if (buffer != NULL) {
		*buffer = '\0';
		buffer++;
	}

	filed_log_msg("GOT_REQUEST FD=... PATH=...");

	filed_log_msg("WAIT_FOR_HEADERS FD=...");

	for (i = 0; i < 100; i++) {
		fgets(tmpbuffer, tmpbuffer_len, fp);

		if (strncasecmp(tmpbuffer, "Range: ", 7) == 0) {
			workbuffer = tmpbuffer + 7;

			if (strncasecmp(workbuffer, "bytes=", 6) == 0) {
				workbuffer += 6;

				range_request = 1;

				range_start = strtoull(workbuffer, &workbuffer_next, 10);

				workbuffer = workbuffer_next;

				if (*workbuffer == '-') {
					workbuffer++;

					if (*workbuffer != '\r' && *workbuffer != '\n') {
						range_end = strtoull(workbuffer, &workbuffer_next, 10);
					}
				}
			}
		}

		if (memcmp(tmpbuffer, "\r\n", 2) == 0) {
			break;
		}
	}

	filed_log_msg("GOT_HEADERS FD=...");

	/* We only handle the "GET" method */
	if (strcasecmp(method, "get") != 0) {
		return(NULL);
	}

	/* Determine range */
	if (range_end != 0) {
		if (range_end <= range_start) {
			return(NULL);
		}

		range_length = range_end - range_start;

		filed_log_msg_debug("Computing length parameter: %llu = %llu - %llu",
			(unsigned long long) range_length,
			(unsigned long long) range_end,
			(unsigned long long) range_start
		);
	}

	/* Fill up structure to return */
	buffer_st->path   = path;
	buffer_st->headers.range.present = range_request;
	buffer_st->headers.range.offset  = range_start;
	buffer_st->headers.range.length  = range_length;

	return(buffer_st);
}

/* Return an error page */
static void filed_error_page(FILE *fp, const char *date_current, int error_number) {
	char *error_string = "<html><head><title>ERROR</title></head><body>Unable to process request</body></html>";

	fprintf(fp, "HTTP/1.1 %i OK\r\nDate: %s\r\nServer: filed\r\nLast-Modified: %s\r\nContent-Length: %llu\r\nContent-Type: %s\r\nConnection: close\r\n\r\n%s",
		error_number,
		date_current,
		date_current,
		(unsigned long long) strlen(error_string),
		"text/html",
		error_string
	);
}

/* Handle a single request from a client */
static void filed_handle_client(int fd, struct filed_http_request *request) {
	struct filed_fileinfo *fileinfo;
	ssize_t sendfile_ret;
	size_t sendfile_len, sendfile_sent, sendfile_size;
	off_t sendfile_offset;
	char *path;
	char *date_current, date_current_b[64];
	int http_code;
	FILE *fp;

	/* Determine current time */
	date_current = filed_format_time(date_current_b, sizeof(date_current_b), time(NULL));

	/* Open socket as ANSI I/O for ease of use */
	fp = fdopen(fd, "w+b");
	if (fp == NULL) {
		close(fd);

		return;
	}

	request = filed_get_http_request(fp, request);

	filed_log_msg("PROCESS_REPLY_START FD=... PATH=... RANGE_START=... RANGE_LENGTH=...");

	if (request == NULL || request->path == NULL) {
		filed_error_page(fp, date_current, 500);

		filed_log_msg("PROCESS_REPLY_COMPLETE FD=... ERROR=500");

		fclose(fp);

		return;
	}

	path = request->path;

	http_code = -1;

	fileinfo = filed_open_file(path, &request->fileinfo);
	if (fileinfo == NULL) {
		filed_error_page(fp, date_current, 404);

		filed_log_msg("PROCESS_REPLY_COMPLETE FD=... ERROR=404");
	} else {
		if (request->headers.range.offset != 0 || request->headers.range.length >= 0) {
			if ((size_t) request->headers.range.offset >= fileinfo->len) {
				filed_log_msg("PROCESS_REPLY_COMPLETE FD=... ERROR=416");

				filed_error_page(fp, date_current, 416);
			} else {
				if (request->headers.range.length < 0) {
					filed_log_msg_debug("Computing length to fit in bounds: fileinfo->len = %llu, request->headers.range.offset = %llu",
						(unsigned long long) fileinfo->len,
						(unsigned long long) request->headers.range.offset
					);

					request->headers.range.length = fileinfo->len - request->headers.range.offset;
				}

				filed_log_msg_debug("Partial request, starting at: %llu and running for %llu bytes", (unsigned long long) request->headers.range.offset, (unsigned long long) request->headers.range.length);

				http_code = 206;
			}
		} else {
			if (request->headers.range.present) {
				http_code = 206;
			} else {
				http_code = 200;
			}
			request->headers.range.offset = 0;
			request->headers.range.length = fileinfo->len;
		}

		if (http_code > 0) {
			fprintf(fp, "HTTP/1.1 %i OK\r\nDate: %s\r\nServer: filed\r\nLast-Modified: %s\r\nContent-Length: %llu\r\nAccept-Ranges: bytes\r\nContent-Type: %s\r\nConnection: close\r\n",
				http_code,
				date_current,
				fileinfo->lastmod,
				(unsigned long long) request->headers.range.length,
				fileinfo->type
			);
			if (http_code == 206) {
				fprintf(fp, "Content-Range: bytes %llu-%llu/%llu\r\n",
					(unsigned long long) request->headers.range.offset,
					(unsigned long long) (request->headers.range.offset + request->headers.range.length - 1),
					(unsigned long long) fileinfo->len
				);
			}
			fprintf(fp, "\r\n");
			fflush(fp);

			filed_log_msg("PROCESS_REPLY_COMPLETE FD=... STATUS=20X");

#ifdef FILED_NONBLOCK_HTTP
			int socket_flags;
			fd_set rfd, wfd;
			char sinkbuf[8192];
			ssize_t read_ret;

			FD_ZERO(&rfd);
			FD_ZERO(&wfd);
			FD_SET(fd, &rfd);
			FD_SET(fd, &wfd);

			socket_flags = fcntl(fd, F_GETFL);
			fcntl(fd, F_SETFL, socket_flags | O_NONBLOCK);
#endif

			filed_log_msg("SEND_START IFD=... OFD=... BYTES=...");

			sendfile_offset = request->headers.range.offset;
			sendfile_len = request->headers.range.length;
			sendfile_sent = 0;
			while (1) {
				if (sendfile_len > FILED_SENDFILE_MAX) {
					sendfile_size = FILED_SENDFILE_MAX;
				} else {
					sendfile_size = sendfile_len;
				}

				sendfile_ret = sendfile(fd, fileinfo->fd, &sendfile_offset, sendfile_size);
				if (sendfile_ret <= 0) {
#ifdef FILED_NONBLOCK_HTTP
					if (errno == EAGAIN) {
						sendfile_ret = 0;

						while (1) {
							select(fd + 1, &rfd, &wfd, NULL, NULL);
							if (FD_ISSET(fd, &rfd)) {
								read_ret = read(fd, sinkbuf, sizeof(sinkbuf));

								if (read_ret <= 0) {
									break;
								}
							}

							if (FD_ISSET(fd, &wfd)) {
								read_ret = 1;

								break;
							}
						}

						if (read_ret <= 0) {
							break;
						}
					} else {
						break;
					}
#else
					break;
#endif
				}

				sendfile_len -= sendfile_ret;
				sendfile_sent += sendfile_ret;
				if (sendfile_len == 0) {
					break;
				}
			}

			filed_log_msg("SEND_COMPLETE STATUS=... IFD=... OFD=... BYTES=... BYTES_SENT=...");
		}

		close(fileinfo->fd);

		filed_log_msg("CLOSE_FILE FD=...");
	}

	filed_log_msg("CLOSE_CONNECTION FD=...");

	fclose(fp);

	return;
}

/* Handle incoming connections */
static void *filed_worker_thread(void *arg_v) {
	struct filed_worker_thread_args *arg;
	struct filed_http_request request;
	struct sockaddr_in6 addr;
	socklen_t addrlen;
	int failure_count = 0, max_failure_count = MAX_FAILURE_COUNT;
	int master_fd, fd;

	/* Read arguments */
	arg = arg_v;

	master_fd = arg->fd;

	while (1) {
		/* Failure loop prevention */
		if (failure_count > max_failure_count) {
			break;
		}

		/* Accept a new client */
		addrlen = sizeof(addr);
		fd = accept(master_fd, (struct sockaddr *) &addr, &addrlen);

		/*
		 * If we fail, make a note of it so we don't go into a loop of
		 * accept() failing
		 */
		if (fd < 0) {
			/* Log the new connection */
			filed_log_msg("ACCEPT_FAILED");

			failure_count++;

			continue;
		}

		/* Log the new connection */
		filed_log_msg("NEW_CONNECTION SRC_ADDR=... SRC_PORT=... FD=...");

		/* Reset failure count*/
		failure_count = 0;

		/* Handle socket */
		filed_handle_client(fd, &request);
	}

	/* Report error */
	filed_log_msg("THREAD_DIED ABNORMAL");

	return(NULL);
}

/* Create worker threads */
static int filed_worker_threads_init(int fd, int thread_count) {
	struct filed_worker_thread_args *arg;
	pthread_t threadid;
	int pthread_ret;
	int i;

	for (i = 0; i < thread_count; i++) {
		arg = malloc(sizeof(*arg));

		arg->fd = fd;

		pthread_ret = pthread_create(&threadid, NULL, filed_worker_thread, arg);
		if (pthread_ret != 0) {
			return(-1);
		}
	}

	return(0);
}

/* Run process */
int main(int argc, char **argv) {
	int port = PORT, thread_count = THREAD_COUNT;
	const char *bind_addr = BIND_ADDR;
	int init_ret;
	int fd;

	/* XXX: TODO: Process arguments */
	argc = argc;
	argv = argv;

	/* Create listening socket */
	fd = filed_listen(bind_addr, port);
	if (fd < 0) {
		perror("filed_listen");

		return(1);
	}

	/* Become a daemon */
	/* XXX:TODO: Become a daemon */

	/* Initialize */
	init_ret = filed_init();
	if (init_ret != 0) {
		perror("filed_init");

		return(3);
	}

	/* Create logging thread */
	init_ret = filed_logging_thread_init();
	if (init_ret != 0) {
		perror("filed_logging_thread_init");

		return(4);
	}

	/* Create worker threads */
	init_ret = filed_worker_threads_init(fd, thread_count);
	if (init_ret != 0) {
		perror("filed_worker_threads_init");

		return(4);
	}

	/* Wait for threads to exit */
	/* XXX:TODO: Monitor thread usage */
	while (1) {
		sleep(60);
	}

	/* Return in failure */
	return(2);
}