/database/database.c
/* Blerg is (C) 2011 The Dominion of Awesome, and is distributed under a
 * BSD-style license.  Please see the COPYING file for details.
 */
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <time.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/mman.h>
#include <sys/file.h>
#include <fcntl.h>
#include "database.h"
#include "configuration.h"
#include "subscription.h"
#include "tags.h"
#include "util.h"
#include "config.h"

#define CHECK_VALID_BLERG(r)                               \
	if (blerg == NULL) {                               \
		fprintf(stderr, "Invalid struct blerg\n"); \
		return r;                                  \
	}

int blerg_init() {
	if (!blerg_configuration_init()) {
		return 0;
	}
	return 1;
}

uint64_t blerg_get_record_count(struct blerg *blerg) {
	uint64_t count;
	flock(blerg->meta_fd, LOCK_SH);
	count = blerg->meta->sequence;
	flock(blerg->meta_fd, LOCK_UN);
	return count;
}

/* Returns last usable record */
uint64_t blerg_increment_record_count(struct blerg *blerg) {
	uint64_t count;
	flock(blerg->meta_fd, LOCK_EX);
	count = blerg->meta->sequence++;
	flock(blerg->meta_fd, LOCK_UN);
	return count;
}

void blerg_segment_close(struct blerg *blerg) {
	if (blerg->data != NULL)
		munmap((void *)blerg->data, blerg->data_size);
	if (blerg->data_fd != -1)
		close(blerg->data_fd);
	if (blerg->index != NULL)
		munmap((void *)blerg->index, RECORDS_PER_SEGMENT * sizeof(struct record));
	if (blerg->index_fd != -1)
		close(blerg->index_fd);
}

int blerg_remap_data(struct blerg *blerg) {
	struct stat st;

	if (blerg->data != NULL)
		munmap(blerg->data, blerg->data_size);
	fstat(blerg->data_fd, &st);
	blerg->data_size = st.st_size;
	if (blerg->data_size == 0) {
		/* Can't map an empty data file. */
		return 1;
	}
	blerg->data = (char *) mmap(NULL, blerg->data_size, PROT_READ, MAP_SHARED, blerg->data_fd, 0);
	if (blerg->data == MAP_FAILED) {
		perror("Could not remap data");
		return 0;
	}
	return 1;
}

int blerg_segment_switch(struct blerg *blerg, int new_segment) {
	char filename[FILENAME_MAX];
	uint64_t max_sequence_no = blerg_get_record_count(blerg);
	struct stat st;

	if (blerg->index != NULL && blerg->data != NULL && new_segment == blerg->current_segment) {
		return 1;
	}

	if (new_segment > max_sequence_no / RECORDS_PER_SEGMENT) {
		fprintf(stderr, "Cannot switch to sequence beyond last record\n");
		return 0;
	}
	if (new_segment < 0) {
		fprintf(stderr, "Cannot switch to negative segment\n");
		return 0;
	}

	blerg_segment_close(blerg);

	/* Load and map the index */
	snprintf(filename, FILENAME_MAX, "%s/index%d", blerg->base_path, new_segment);
	blerg->index_fd = open(filename, O_RDWR | O_CREAT, 0600);
	if (blerg->index_fd == -1) {
		perror("Could not open index");
		goto open_failed_index_open;
	}
	flock(blerg->index_fd, LOCK_EX);
	fstat(blerg->index_fd, &st);
	if (st.st_size == 0) {
		/* ftruncate() means never having to say you're sorry.  Sorry
		   in this case meaning "allocating disk space for a 1MB file
		   full or zeroes". */
		ftruncate(blerg->index_fd, RECORDS_PER_SEGMENT * sizeof(struct record));
	}
	flock(blerg->index_fd, LOCK_UN);

	blerg->index = (struct record *) mmap(NULL, RECORDS_PER_SEGMENT * sizeof(struct record), PROT_READ | PROT_WRITE, MAP_SHARED, blerg->index_fd, 0);
	if (blerg->index == MAP_FAILED) {
		perror("Could not mmap index");
		goto open_failed_index_mmap;
	}

	/* Load data file */
	snprintf(filename, FILENAME_MAX, "%s/data%d", blerg->base_path, new_segment);
	blerg->data_fd = open(filename, O_RDWR | O_APPEND | O_CREAT, 0600);
	if (blerg->data_fd == -1) {
		perror("Could not open data");
		goto open_failed_data_open;
	}

	if (!blerg_remap_data(blerg)) {
		goto open_failed_data_mmap;
	}

	blerg->current_segment = new_segment;

	return 1;

open_failed_data_mmap:
	close(blerg->data_fd);
open_failed_data_open:
	munmap((void *)blerg->index, RECORDS_PER_SEGMENT * sizeof(struct record));
open_failed_index_mmap:
	close(blerg->index_fd);
open_failed_index_open:
	return 0;
}

int blerg_exists(const char *name) {
	char filename[FILENAME_MAX];

	if (!valid_name(name)) {
		fprintf(stderr, "Invalid name\n");
		return 0;
	}

	snprintf(filename, FILENAME_MAX, "%s/%s", blergconf.data_path, name);
	if (access(filename, F_OK) == -1)
		return 0;
	else
		return 1;
}

struct blerg *blerg_open(const char *name) {
	int namelen = strlen(name);
	char filename[FILENAME_MAX];
	struct stat st;

	if (!valid_name(name)) {
		fprintf(stderr, "Invalid name\n");
		return NULL;
	}
	struct blerg *blerg = malloc(sizeof(struct blerg));
	if (!blerg) {
		perror("Cannot allocate memory for blerg");
		goto open_failed_blerg_malloc;
	}
	blerg->name = malloc(namelen + 1);
	memcpy(blerg->name, name, namelen + 1);
	blerg->meta_fd = blerg->index_fd = blerg->data_fd = -1;
	blerg->meta = NULL;
	blerg->index = NULL;
	blerg->data = NULL;

	/* Make the directory if it doesn't exist */
	blerg->base_path = malloc(FILENAME_MAX);
	snprintf(blerg->base_path, FILENAME_MAX, "%s/%s", blergconf.data_path, name);
	if (access(blerg->base_path, F_OK) == -1)
		mkdir(blerg->base_path, 0755);

	/* Open and map metadata */
	snprintf(filename, FILENAME_MAX, "%s/meta", blerg->base_path);
	blerg->meta_fd = open(filename, O_RDWR | O_CREAT, 0600);
	if (blerg->meta_fd == -1) {
		perror("Could not open metadata");
		goto open_failed_meta_open;
	}
	fstat(blerg->meta_fd, &st);
	if (st.st_size < sizeof(struct meta)) {
		/* Extend the file if sizeof(struct meta) is larger than the
		   file. This allows seamless upgrades as long as struct meta
		   only adds members. */
		posix_fallocate(blerg->meta_fd, 0, sizeof(struct meta));
	}
	blerg->meta = (struct meta *) mmap(NULL, sizeof(struct meta), PROT_READ | PROT_WRITE, MAP_SHARED, blerg->meta_fd, 0);
	if (blerg->meta == MAP_FAILED) {
		perror("Could not map metadata");
		goto open_failed_meta_mmap;
	}

	/* Open and map index and data for the current segment */
	blerg->current_segment = blerg_get_record_count(blerg) / RECORDS_PER_SEGMENT;
	if (!blerg_segment_switch(blerg, blerg->current_segment)) {
		fprintf(stderr, "Could not switch segment\n");
		goto open_failed_segment_switch;
	}

	return blerg;

open_failed_segment_switch:
	munmap((void *)blerg->meta, sizeof(struct meta));
open_failed_meta_mmap:
	close(blerg->meta_fd);
open_failed_meta_open:
	free(blerg->name);
	free(blerg);
open_failed_blerg_malloc:
	return NULL;
}

int blerg_close(struct blerg *blerg) {
	CHECK_VALID_BLERG(0)
	blerg_segment_close(blerg);
	munmap((void *)blerg->meta, sizeof(struct meta));
	close(blerg->meta_fd);
	free(blerg->base_path);
	free(blerg->name);
	free(blerg);
	return 1;
}

uint64_t blerg_store(struct blerg *blerg, const char *data, int length) {
	struct stat st;
	int n;

	CHECK_VALID_BLERG(BLERG_INVALID_RECORD)

	if (length > MAX_RECORD_SIZE || length <= 0) {
		fprintf(stderr, "length out of bounds\n");
		return BLERG_INVALID_RECORD;
	}

	flock(blerg->index_fd, LOCK_EX);
	flock(blerg->data_fd, LOCK_EX);

	uint64_t record = blerg_get_record_count(blerg);
	if (record == -1) {  /* Intentional signed-unsigned coercion */
		fprintf(stderr, "Could not find free record\n");
		return BLERG_INVALID_RECORD;
	}
	int segment = record / RECORDS_PER_SEGMENT;
	if (segment != blerg->current_segment)
		blerg_segment_switch(blerg, segment);
	int seg_rec = record % RECORDS_PER_SEGMENT;

	/* Get the position for the new data */
	fstat(blerg->data_fd, &st);
	int curpos = st.st_size;

	/* Write data to the data log */
	n = write(blerg->data_fd, data, length);
	if (n < length) {
		perror("Could not write data");
		/* Truncate anything we may have written */
		ftruncate(blerg->data_fd, curpos);
		return BLERG_INVALID_RECORD;
	}

	/* Update the index */
	blerg->index[seg_rec].flags = 0x0001;
	blerg->index[seg_rec].offset = curpos;
	blerg->index[seg_rec].length = length;
	blerg->index[seg_rec].timestamp = time(NULL);

	/* And finally increment the record count */
	blerg_increment_record_count(blerg);

	flock(blerg->data_fd, LOCK_UN);
	flock(blerg->index_fd, LOCK_UN);

	if (!blerg_get_status(blerg, BLERGSTATUS_MUTED)) {
		/* Now do those dirty microblogging deeds */
		tag_scan(blerg->name, data, length, record);
		subscription_notify(blerg->name, record);
	}

	return record;
}

int blerg_fetch(struct blerg *blerg, uint64_t record, char **data, int *length) {
	CHECK_VALID_BLERG(0)
	if (record == BLERG_INVALID_RECORD || record >= blerg_get_record_count(blerg)) {
		fprintf(stderr, "Invalid record\n");
		return 0;
	}
	if (data == NULL || length == NULL) {
		fprintf(stderr, "data or length is null\n");
		return 0;
	}

	int segment = record / RECORDS_PER_SEGMENT;
	if (segment != blerg->current_segment)
		blerg_segment_switch(blerg, segment);
	int seg_rec = record % RECORDS_PER_SEGMENT;

	if ((blerg->index[seg_rec].flags & 0x1) == 0) {
		fprintf(stderr, "Invalid record\n");
		return 0;
	}

	int rec_offset = blerg->index[seg_rec].offset;
	int rec_length = blerg->index[seg_rec].length;
	if (rec_offset >= blerg->data_size) {
		/* We're accessing an out-of-bounds record in our mmap.  Remap
		   and recheck. */
		if (!blerg_remap_data(blerg)) {
			return 0;
		}
		if (rec_offset >= blerg->data_size) {
			fprintf(stderr, "Record offset outside of data!?");
			return 0;
		}
	}

	*data = malloc(rec_length);
	if (*data == NULL) {
		perror("Could not allocate string in fetch");
		return 0;
	}

	memcpy(*data, blerg->data + rec_offset, rec_length);

	*length = rec_length;

	return 1;
}

time_t blerg_get_timestamp(struct blerg *blerg, uint64_t record) {
	CHECK_VALID_BLERG(0)
	if (record == BLERG_INVALID_RECORD || record >= blerg_get_record_count(blerg)) {
		fprintf(stderr, "Invalid record\n");
		return 0;
	}

	int segment = record / RECORDS_PER_SEGMENT;
	if (segment != blerg->current_segment)
		blerg_segment_switch(blerg, segment);
	int seg_rec = record % RECORDS_PER_SEGMENT;

	if ((blerg->index[seg_rec].flags & 0x1) == 0) {
		fprintf(stderr, "Invalid record\n");
		return 0;
	}

	return blerg->index[seg_rec].timestamp;
}

int blerg_set_subscription_mark(struct blerg *blerg) {
	CHECK_VALID_BLERG(0)
	blerg->meta->subscription_mark = subscription_count_items(blerg->name);
	return 1;
}

uint64_t blerg_get_subscription_mark(struct blerg *blerg) {
	CHECK_VALID_BLERG(0)
	return blerg->meta->subscription_mark;
}

int blerg_set_status(struct blerg *blerg, uint32_t status, int v) {
	CHECK_VALID_BLERG(0)
	if (v) {
		blerg->meta->status |= status;
	} else {
		blerg->meta->status &= ~status;
	}
	return 1;
}

int blerg_get_status(struct blerg *blerg, uint32_t status) {
	CHECK_VALID_BLERG(0)
	return (blerg->meta->status & status) > 0;
}