+/* Blerg is (C) 2011 The Dominion of Awesome, and is distributed under a
+ * BSD-style license. Please see the COPYING file for details.
+ */
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <sys/file.h>
#include <fcntl.h>
#include "database.h"
+#include "subscription.h"
#include "util.h"
#include "config.h"
+#define CHECK_VALID_BLERG(r) \
+ if (blerg == NULL) { \
+ fprintf(stderr, "Invalid struct blerg\n"); \
+ return r; \
+ }
+
uint64_t blerg_get_record_count(struct blerg *blerg) {
uint64_t count;
flock(blerg->meta_fd, LOCK_SH);
close(blerg->index_fd);
}
+int blerg_remap_data(struct blerg *blerg) {
+ struct stat st;
+
+ if (blerg->data != NULL)
+ munmap(blerg->data, blerg->data_size);
+ fstat(blerg->data_fd, &st);
+ blerg->data_size = st.st_size;
+ if (blerg->data_size == 0) {
+ /* Can't map an empty data file. */
+ return 1;
+ }
+ blerg->data = (char *) mmap(NULL, blerg->data_size, PROT_READ, MAP_SHARED, blerg->data_fd, 0);
+ if (blerg->data == MAP_FAILED) {
+ perror("Could not remap data");
+ return 0;
+ }
+ return 1;
+}
+
int blerg_segment_switch(struct blerg *blerg, int new_segment) {
char filename[512];
- uint64_t max_sequence = blerg_get_record_count(blerg);
+ uint64_t max_sequence_no = blerg_get_record_count(blerg);
struct stat st;
- if (new_segment > max_sequence / RECORDS_PER_SEGMENT) {
+ if (blerg->index != NULL && blerg->data != NULL && new_segment == blerg->current_segment) {
+ return 1;
+ }
+
+ if (new_segment > max_sequence_no / RECORDS_PER_SEGMENT) {
fprintf(stderr, "Cannot switch to sequence beyond last record\n");
return 0;
}
+ if (new_segment < 0) {
+ fprintf(stderr, "Cannot switch to negative segment\n");
+ return 0;
+ }
blerg_segment_close(blerg);
flock(blerg->index_fd, LOCK_EX);
fstat(blerg->index_fd, &st);
if (st.st_size == 0) {
- int i;
- struct record r;
- memset((void *)&r, 0, sizeof(struct record));
- for (i = 0; i < RECORDS_PER_SEGMENT; i++) {
- write(blerg->index_fd, &r, sizeof(struct record));
- }
+ /* ftruncate() means never having to say you're sorry. Sorry
+ in this case meaning "allocating disk space for a 1MB file
+ full or zeroes". */
+ ftruncate(blerg->index_fd, RECORDS_PER_SEGMENT * sizeof(struct record));
}
flock(blerg->index_fd, LOCK_UN);
/* Load data file */
sprintf(filename, "%s/data%d", blerg->base_path, new_segment);
blerg->data_fd = open(filename, O_RDWR | O_APPEND | O_CREAT, 0600);
- fstat(blerg->data_fd, &st);
- blerg->data_size = st.st_size;
if (blerg->data_fd == -1) {
perror("Could not open data");
goto open_failed_data_open;
}
- if (blerg->data_size > 0) {
- blerg->data = (char *) mmap(NULL, blerg->data_size, PROT_READ, MAP_SHARED, blerg->data_fd, 0);
- if (blerg->data == MAP_FAILED) {
- perror("Could not mmap data");
- goto open_failed_data_mmap;
- }
+ if (!blerg_remap_data(blerg)) {
+ goto open_failed_data_mmap;
}
+ blerg->current_segment = new_segment;
+
return 1;
open_failed_data_mmap:
close(blerg->data_fd);
open_failed_data_open:
- munmap((void *)blerg->index, sizeof(RECORDS_PER_SEGMENT * sizeof(struct record)));
+ munmap((void *)blerg->index, RECORDS_PER_SEGMENT * sizeof(struct record));
open_failed_index_mmap:
close(blerg->index_fd);
open_failed_index_open:
goto open_failed_meta_open;
}
fstat(blerg->meta_fd, &st);
- if (st.st_size == 0) {
- char *buf = (char *) malloc(sizeof(struct meta));
- memset(buf, 0, sizeof(struct meta));
- write(blerg->meta_fd, buf, sizeof(struct meta));
- free(buf);
+ if (st.st_size < sizeof(struct meta)) {
+ /* Extend the file if sizeof(struct meta) is larger than the
+ file. This allows seamless upgrades as long as struct meta
+ only adds members. */
+ posix_fallocate(blerg->meta_fd, 0, sizeof(struct meta));
}
blerg->meta = (struct meta *) mmap(NULL, sizeof(struct meta), PROT_READ | PROT_WRITE, MAP_SHARED, blerg->meta_fd, 0);
if (blerg->meta == MAP_FAILED) {
}
int blerg_close(struct blerg *blerg) {
+ CHECK_VALID_BLERG(0)
blerg_segment_close(blerg);
munmap((void *)blerg->meta, sizeof(struct meta));
close(blerg->meta_fd);
return 1;
}
-int blerg_store(struct blerg *blerg, const char *data, int len) {
- if (len > MAX_RECORD_SIZE) {
- fprintf(stderr, "len > 64K\n");
- return -1;
+uint64_t blerg_store(struct blerg *blerg, const char *data, int length) {
+ struct stat st;
+ int n;
+
+ CHECK_VALID_BLERG(BLERG_INVALID_RECORD)
+
+ if (length > MAX_RECORD_SIZE || length <= 0) {
+ fprintf(stderr, "length out of bounds\n");
+ return BLERG_INVALID_RECORD;
}
flock(blerg->index_fd, LOCK_EX);
flock(blerg->data_fd, LOCK_EX);
- uint64_t record = blerg_increment_record_count(blerg);
- if (record == -1) {
+ uint64_t record = blerg_get_record_count(blerg);
+ if (record == -1) { /* Intentional signed-unsigned coercion */
fprintf(stderr, "Could not find free record\n");
- return -1;
+ return BLERG_INVALID_RECORD;
}
int segment = record / RECORDS_PER_SEGMENT;
if (segment != blerg->current_segment)
int seg_rec = record % RECORDS_PER_SEGMENT;
/* Get the position for the new data */
- FILE *datafile = fdopen(dup(blerg->data_fd), "a");
- fseek(datafile, 0, SEEK_END);
- int curpos = ftell(datafile);
- fclose(datafile);
-
- int bytes = 0;
- do {
- int n = write(blerg->data_fd, data + bytes, len);
- if (n == -1) {
- perror("Could not write data");
- /* Truncate anything we may have written */
- ftruncate(blerg->data_fd, curpos);
- return -1;
- }
- bytes += n;
- } while (bytes < len);
+ fstat(blerg->data_fd, &st);
+ int curpos = st.st_size;
+
+ /* Write data to the data log */
+ n = write(blerg->data_fd, data, length);
+ if (n < length) {
+ perror("Could not write data");
+ /* Truncate anything we may have written */
+ ftruncate(blerg->data_fd, curpos);
+ return BLERG_INVALID_RECORD;
+ }
+
+ /* Update the index */
blerg->index[seg_rec].flags = 0x0001;
blerg->index[seg_rec].offset = curpos;
- blerg->index[seg_rec].length = len;
+ blerg->index[seg_rec].length = length;
blerg->index[seg_rec].timestamp = time(NULL);
- tag_scan(blerg->name, data, len, record);
+ /* And finally increment the record count */
+ blerg_increment_record_count(blerg);
flock(blerg->data_fd, LOCK_UN);
flock(blerg->index_fd, LOCK_UN);
+ if (!blerg_get_mute(blerg)) {
+ /* Now do those dirty microblogging deeds */
+ tag_scan(blerg->name, data, length, record);
+ subscription_notify(blerg->name, record);
+ }
+
return record;
}
-int blerg_fetch(struct blerg *blerg, int record, char **data, int *length) {
- if (record < 0) {
+int blerg_fetch(struct blerg *blerg, uint64_t record, char **data, int *length) {
+ CHECK_VALID_BLERG(0)
+ if (record == BLERG_INVALID_RECORD || record >= blerg_get_record_count(blerg)) {
fprintf(stderr, "Invalid record\n");
return 0;
}
+ if (data == NULL || length == NULL) {
+ fprintf(stderr, "data or length is null\n");
+ return 0;
+ }
int segment = record / RECORDS_PER_SEGMENT;
if (segment != blerg->current_segment)
int rec_offset = blerg->index[seg_rec].offset;
int rec_length = blerg->index[seg_rec].length;
if (rec_offset >= blerg->data_size) {
- /* We're accessing an out-of-bounds record in our mmap.
- Recheck size and remap. */
- struct stat st;
- fstat(blerg->data_fd, &st);
- blerg->data_size = st.st_size;
- if (rec_offset > blerg->data_size) {
- fprintf(stderr, "Record offset outside of data!?");
+ /* We're accessing an out-of-bounds record in our mmap. Remap
+ and recheck. */
+ if (!blerg_remap_data(blerg)) {
return 0;
}
-
- munmap(blerg->data, blerg->data_size);
- blerg->data = (char *) mmap(NULL, blerg->data_size, PROT_READ, MAP_SHARED, blerg->data_fd, 0);
- if (blerg->data == MAP_FAILED) {
- perror("Could not remap data");
+ if (rec_offset >= blerg->data_size) {
+ fprintf(stderr, "Record offset outside of data!?");
return 0;
}
}
return 1;
}
-time_t blerg_get_timestamp(struct blerg *blerg, int record) {
- if (record < 0) {
+time_t blerg_get_timestamp(struct blerg *blerg, uint64_t record) {
+ CHECK_VALID_BLERG(0)
+ if (record == BLERG_INVALID_RECORD || record >= blerg_get_record_count(blerg)) {
fprintf(stderr, "Invalid record\n");
return 0;
}
return blerg->index[seg_rec].timestamp;
}
+
+int blerg_set_subscription_mark(struct blerg *blerg) {
+ CHECK_VALID_BLERG(0)
+ blerg->meta->subscription_mark = subscription_count_items(blerg->name);
+ return 1;
+}
+
+uint64_t blerg_get_subscription_mark(struct blerg *blerg) {
+ CHECK_VALID_BLERG(0)
+ return blerg->meta->subscription_mark;
+}
+
+int blerg_set_mute(struct blerg *blerg, int v) {
+ CHECK_VALID_BLERG(0)
+ if (v) {
+ blerg->meta->status |= BLERGMETA_MUTED;
+ } else {
+ blerg->meta->status &= ~BLERGMETA_MUTED;
+ }
+ return 1;
+}
+
+int blerg_get_mute(struct blerg *blerg) {
+ CHECK_VALID_BLERG(0)
+ return (blerg->meta->status & BLERGMETA_MUTED) > 0;
+}