X-Git-Url: http://git.bytex64.net/?a=blobdiff_plain;f=database%2Fdatabase.c;h=9d96f43ab28d25ca2ff8dad40ca86fc4045c2468;hb=3e3138025852408ef03f3213972e042e12841bed;hp=a45c92bc3dbdaa7f1cb8f9fc902215aa26f5703a;hpb=ec8746b44dc85fd3e3b42835f779890684a9e90a;p=blerg.git diff --git a/database/database.c b/database/database.c index a45c92b..9d96f43 100644 --- a/database/database.c +++ b/database/database.c @@ -13,10 +13,25 @@ #include #include #include "database.h" +#include "configuration.h" #include "subscription.h" +#include "tags.h" #include "util.h" #include "config.h" +#define CHECK_VALID_BLERG(r) \ + if (blerg == NULL) { \ + fprintf(stderr, "Invalid struct blerg\n"); \ + return r; \ + } + +int blerg_init() { + if (!blerg_configuration_init()) { + return 0; + } + return 1; +} + uint64_t blerg_get_record_count(struct blerg *blerg) { uint64_t count; flock(blerg->meta_fd, LOCK_SH); @@ -45,20 +60,47 @@ void blerg_segment_close(struct blerg *blerg) { close(blerg->index_fd); } +int blerg_remap_data(struct blerg *blerg) { + struct stat st; + + if (blerg->data != NULL) + munmap(blerg->data, blerg->data_size); + fstat(blerg->data_fd, &st); + blerg->data_size = st.st_size; + if (blerg->data_size == 0) { + /* Can't map an empty data file. */ + return 1; + } + blerg->data = (char *) mmap(NULL, blerg->data_size, PROT_READ, MAP_SHARED, blerg->data_fd, 0); + if (blerg->data == MAP_FAILED) { + perror("Could not remap data"); + return 0; + } + return 1; +} + int blerg_segment_switch(struct blerg *blerg, int new_segment) { - char filename[512]; - uint64_t max_sequence = blerg_get_record_count(blerg); + char filename[FILENAME_MAX]; + uint64_t max_sequence_no = blerg_get_record_count(blerg); struct stat st; - if (new_segment > max_sequence / RECORDS_PER_SEGMENT) { + if (blerg->index != NULL && blerg->data != NULL && new_segment == blerg->current_segment) { + return 1; + } + + if (new_segment > max_sequence_no / RECORDS_PER_SEGMENT) { fprintf(stderr, "Cannot switch to sequence beyond last record\n"); return 0; } + if (new_segment < 0) { + fprintf(stderr, "Cannot switch to negative segment\n"); + return 0; + } blerg_segment_close(blerg); /* Load and map the index */ - snprintf(filename, 512, "%s/index%d", blerg->base_path, new_segment); + snprintf(filename, FILENAME_MAX, "%s/index%d", blerg->base_path, new_segment); blerg->index_fd = open(filename, O_RDWR | O_CREAT, 0600); if (blerg->index_fd == -1) { perror("Could not open index"); @@ -67,12 +109,10 @@ int blerg_segment_switch(struct blerg *blerg, int new_segment) { flock(blerg->index_fd, LOCK_EX); fstat(blerg->index_fd, &st); if (st.st_size == 0) { - int i; - struct record r; - memset((void *)&r, 0, sizeof(struct record)); - for (i = 0; i < RECORDS_PER_SEGMENT; i++) { - write(blerg->index_fd, &r, sizeof(struct record)); - } + /* ftruncate() means never having to say you're sorry. Sorry + in this case meaning "allocating disk space for a 1MB file + full or zeroes". */ + ftruncate(blerg->index_fd, RECORDS_PER_SEGMENT * sizeof(struct record)); } flock(blerg->index_fd, LOCK_UN); @@ -83,29 +123,25 @@ int blerg_segment_switch(struct blerg *blerg, int new_segment) { } /* Load data file */ - sprintf(filename, "%s/data%d", blerg->base_path, new_segment); + snprintf(filename, FILENAME_MAX, "%s/data%d", blerg->base_path, new_segment); blerg->data_fd = open(filename, O_RDWR | O_APPEND | O_CREAT, 0600); - fstat(blerg->data_fd, &st); - blerg->data_size = st.st_size; if (blerg->data_fd == -1) { perror("Could not open data"); goto open_failed_data_open; } - if (blerg->data_size > 0) { - blerg->data = (char *) mmap(NULL, blerg->data_size, PROT_READ, MAP_SHARED, blerg->data_fd, 0); - if (blerg->data == MAP_FAILED) { - perror("Could not mmap data"); - goto open_failed_data_mmap; - } + if (!blerg_remap_data(blerg)) { + goto open_failed_data_mmap; } + blerg->current_segment = new_segment; + return 1; open_failed_data_mmap: close(blerg->data_fd); open_failed_data_open: - munmap((void *)blerg->index, sizeof(RECORDS_PER_SEGMENT * sizeof(struct record))); + munmap((void *)blerg->index, RECORDS_PER_SEGMENT * sizeof(struct record)); open_failed_index_mmap: close(blerg->index_fd); open_failed_index_open: @@ -113,15 +149,14 @@ open_failed_index_open: } int blerg_exists(const char *name) { - int namelen = strlen(name); - char filename[512]; + char filename[FILENAME_MAX]; if (!valid_name(name)) { fprintf(stderr, "Invalid name\n"); return 0; } - snprintf(filename, 512, "%s/%s", DATA_PATH, name); + snprintf(filename, FILENAME_MAX, "%s/%s", blergconf.data_path, name); if (access(filename, F_OK) == -1) return 0; else @@ -130,9 +165,8 @@ int blerg_exists(const char *name) { struct blerg *blerg_open(const char *name) { int namelen = strlen(name); - char filename[512]; + char filename[FILENAME_MAX]; struct stat st; - uint64_t sequence; if (!valid_name(name)) { fprintf(stderr, "Invalid name\n"); @@ -151,24 +185,24 @@ struct blerg *blerg_open(const char *name) { blerg->data = NULL; /* Make the directory if it doesn't exist */ - blerg->base_path = malloc(512); - snprintf(blerg->base_path, 512, "%s/%s", DATA_PATH, name); + blerg->base_path = malloc(FILENAME_MAX); + snprintf(blerg->base_path, FILENAME_MAX, "%s/%s", blergconf.data_path, name); if (access(blerg->base_path, F_OK) == -1) mkdir(blerg->base_path, 0755); /* Open and map metadata */ - snprintf(filename, 512, "%s/meta", blerg->base_path); + snprintf(filename, FILENAME_MAX, "%s/meta", blerg->base_path); blerg->meta_fd = open(filename, O_RDWR | O_CREAT, 0600); if (blerg->meta_fd == -1) { perror("Could not open metadata"); goto open_failed_meta_open; } fstat(blerg->meta_fd, &st); - if (st.st_size == 0) { - char *buf = (char *) malloc(sizeof(struct meta)); - memset(buf, 0, sizeof(struct meta)); - write(blerg->meta_fd, buf, sizeof(struct meta)); - free(buf); + if (st.st_size < sizeof(struct meta)) { + /* Extend the file if sizeof(struct meta) is larger than the + file. This allows seamless upgrades as long as struct meta + only adds members. */ + posix_fallocate(blerg->meta_fd, 0, sizeof(struct meta)); } blerg->meta = (struct meta *) mmap(NULL, sizeof(struct meta), PROT_READ | PROT_WRITE, MAP_SHARED, blerg->meta_fd, 0); if (blerg->meta == MAP_FAILED) { @@ -197,6 +231,7 @@ open_failed_blerg_malloc: } int blerg_close(struct blerg *blerg) { + CHECK_VALID_BLERG(0) blerg_segment_close(blerg); munmap((void *)blerg->meta, sizeof(struct meta)); close(blerg->meta_fd); @@ -206,19 +241,24 @@ int blerg_close(struct blerg *blerg) { return 1; } -int blerg_store(struct blerg *blerg, const char *data, int len) { - if (len > MAX_RECORD_SIZE) { - fprintf(stderr, "len > 64K\n"); - return -1; +uint64_t blerg_store(struct blerg *blerg, const char *data, int length) { + struct stat st; + int n; + + CHECK_VALID_BLERG(BLERG_INVALID_RECORD) + + if (length > MAX_RECORD_SIZE || length <= 0) { + fprintf(stderr, "length out of bounds\n"); + return BLERG_INVALID_RECORD; } flock(blerg->index_fd, LOCK_EX); flock(blerg->data_fd, LOCK_EX); - uint64_t record = blerg_increment_record_count(blerg); - if (record == -1) { + uint64_t record = blerg_get_record_count(blerg); + if (record == -1) { /* Intentional signed-unsigned coercion */ fprintf(stderr, "Could not find free record\n"); - return -1; + return BLERG_INVALID_RECORD; } int segment = record / RECORDS_PER_SEGMENT; if (segment != blerg->current_segment) @@ -226,41 +266,49 @@ int blerg_store(struct blerg *blerg, const char *data, int len) { int seg_rec = record % RECORDS_PER_SEGMENT; /* Get the position for the new data */ - FILE *datafile = fdopen(dup(blerg->data_fd), "a"); - fseek(datafile, 0, SEEK_END); - int curpos = ftell(datafile); - fclose(datafile); - - int bytes = 0; - do { - int n = write(blerg->data_fd, data + bytes, len); - if (n == -1) { - perror("Could not write data"); - /* Truncate anything we may have written */ - ftruncate(blerg->data_fd, curpos); - return -1; - } - bytes += n; - } while (bytes < len); + fstat(blerg->data_fd, &st); + int curpos = st.st_size; + + /* Write data to the data log */ + n = write(blerg->data_fd, data, length); + if (n < length) { + perror("Could not write data"); + /* Truncate anything we may have written */ + ftruncate(blerg->data_fd, curpos); + return BLERG_INVALID_RECORD; + } + + /* Update the index */ blerg->index[seg_rec].flags = 0x0001; blerg->index[seg_rec].offset = curpos; - blerg->index[seg_rec].length = len; + blerg->index[seg_rec].length = length; blerg->index[seg_rec].timestamp = time(NULL); + /* And finally increment the record count */ + blerg_increment_record_count(blerg); + flock(blerg->data_fd, LOCK_UN); flock(blerg->index_fd, LOCK_UN); - tag_scan(blerg->name, data, len, record); - subscription_notify(blerg->name, record); + if (!blerg_get_status(blerg, BLERGSTATUS_MUTED)) { + /* Now do those dirty microblogging deeds */ + tag_scan(blerg->name, data, length, record); + subscription_notify(blerg->name, record); + } return record; } -int blerg_fetch(struct blerg *blerg, int record, char **data, int *length) { - if (record < 0) { +int blerg_fetch(struct blerg *blerg, uint64_t record, char **data, int *length) { + CHECK_VALID_BLERG(0) + if (record == BLERG_INVALID_RECORD || record >= blerg_get_record_count(blerg)) { fprintf(stderr, "Invalid record\n"); return 0; } + if (data == NULL || length == NULL) { + fprintf(stderr, "data or length is null\n"); + return 0; + } int segment = record / RECORDS_PER_SEGMENT; if (segment != blerg->current_segment) @@ -275,20 +323,13 @@ int blerg_fetch(struct blerg *blerg, int record, char **data, int *length) { int rec_offset = blerg->index[seg_rec].offset; int rec_length = blerg->index[seg_rec].length; if (rec_offset >= blerg->data_size) { - /* We're accessing an out-of-bounds record in our mmap. - Recheck size and remap. */ - struct stat st; - fstat(blerg->data_fd, &st); - blerg->data_size = st.st_size; - if (rec_offset > blerg->data_size) { - fprintf(stderr, "Record offset outside of data!?"); + /* We're accessing an out-of-bounds record in our mmap. Remap + and recheck. */ + if (!blerg_remap_data(blerg)) { return 0; } - - munmap(blerg->data, blerg->data_size); - blerg->data = (char *) mmap(NULL, blerg->data_size, PROT_READ, MAP_SHARED, blerg->data_fd, 0); - if (blerg->data == MAP_FAILED) { - perror("Could not remap data"); + if (rec_offset >= blerg->data_size) { + fprintf(stderr, "Record offset outside of data!?"); return 0; } } @@ -306,8 +347,9 @@ int blerg_fetch(struct blerg *blerg, int record, char **data, int *length) { return 1; } -time_t blerg_get_timestamp(struct blerg *blerg, int record) { - if (record < 0) { +time_t blerg_get_timestamp(struct blerg *blerg, uint64_t record) { + CHECK_VALID_BLERG(0) + if (record == BLERG_INVALID_RECORD || record >= blerg_get_record_count(blerg)) { fprintf(stderr, "Invalid record\n"); return 0; } @@ -324,3 +366,29 @@ time_t blerg_get_timestamp(struct blerg *blerg, int record) { return blerg->index[seg_rec].timestamp; } + +int blerg_set_subscription_mark(struct blerg *blerg) { + CHECK_VALID_BLERG(0) + blerg->meta->subscription_mark = subscription_count_items(blerg->name); + return 1; +} + +uint64_t blerg_get_subscription_mark(struct blerg *blerg) { + CHECK_VALID_BLERG(0) + return blerg->meta->subscription_mark; +} + +int blerg_set_status(struct blerg *blerg, uint32_t status, int v) { + CHECK_VALID_BLERG(0) + if (v) { + blerg->meta->status |= status; + } else { + blerg->meta->status &= ~status; + } + return 1; +} + +int blerg_get_status(struct blerg *blerg, uint32_t status) { + CHECK_VALID_BLERG(0) + return (blerg->meta->status & status) > 0; +}