+/* Blerg is (C) 2011 The Dominion of Awesome, and is distributed under a
+ * BSD-style license. Please see the COPYING file for details.
+ */
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <sys/file.h>
#include <fcntl.h>
#include "database.h"
+#include "subscription.h"
+#include "util.h"
#include "config.h"
uint64_t blerg_get_record_count(struct blerg *blerg) {
int namelen = strlen(name);
char filename[512];
- if (namelen > 32) {
- perror("Name too long");
+ if (!valid_name(name)) {
+ fprintf(stderr, "Invalid name\n");
return 0;
}
struct stat st;
uint64_t sequence;
- if (namelen > 32) {
- perror("Name too long");
+ if (!valid_name(name)) {
+ fprintf(stderr, "Invalid name\n");
return NULL;
}
struct blerg *blerg = malloc(sizeof(struct blerg));
goto open_failed_meta_open;
}
fstat(blerg->meta_fd, &st);
- if (st.st_size == 0) {
- char *buf = (char *) malloc(sizeof(struct meta));
- memset(buf, 0, sizeof(struct meta));
- write(blerg->meta_fd, buf, sizeof(struct meta));
+ if (st.st_size < sizeof(struct meta)) {
+ // Fill the difference in size between sizeof(struct meta) and
+ // the file size with nulls. This allows seamless upgrades as
+ // long as struct meta only adds members.
+ int len = sizeof(struct meta) - st.st_size;
+ char *buf = (char *) malloc(len);
+ memset(buf, 0, len);
+ int tmpfd = dup(blerg->meta_fd);
+ FILE* tmp = fdopen(tmpfd, "a");
+ fwrite(buf, len, 1, tmp);
+ fclose(tmp);
free(buf);
}
blerg->meta = (struct meta *) mmap(NULL, sizeof(struct meta), PROT_READ | PROT_WRITE, MAP_SHARED, blerg->meta_fd, 0);
int blerg_store(struct blerg *blerg, const char *data, int len) {
if (len > MAX_RECORD_SIZE) {
- printf("len > 64K\n");
+ fprintf(stderr, "len > 64K\n");
return -1;
}
uint64_t record = blerg_increment_record_count(blerg);
if (record == -1) {
- printf("Could not find free record\n");
+ fprintf(stderr, "Could not find free record\n");
return -1;
}
int segment = record / RECORDS_PER_SEGMENT;
blerg->index[seg_rec].length = len;
blerg->index[seg_rec].timestamp = time(NULL);
- tag_scan(blerg->name, data, len, record);
-
flock(blerg->data_fd, LOCK_UN);
flock(blerg->index_fd, LOCK_UN);
+ tag_scan(blerg->name, data, len, record);
+ subscription_notify(blerg->name, record);
+
return record;
}
int blerg_fetch(struct blerg *blerg, int record, char **data, int *length) {
if (record < 0) {
- printf("Invalid record\n");
+ fprintf(stderr, "Invalid record\n");
return 0;
}
int seg_rec = record % RECORDS_PER_SEGMENT;
if ((blerg->index[seg_rec].flags & 0x1) == 0) {
- printf("Invalid record\n");
+ fprintf(stderr, "Invalid record\n");
return 0;
}
fstat(blerg->data_fd, &st);
blerg->data_size = st.st_size;
if (rec_offset > blerg->data_size) {
- printf("Record offset outside of data!?");
+ fprintf(stderr, "Record offset outside of data!?");
return 0;
}
time_t blerg_get_timestamp(struct blerg *blerg, int record) {
if (record < 0) {
- printf("Invalid record\n");
+ fprintf(stderr, "Invalid record\n");
return 0;
}
int seg_rec = record % RECORDS_PER_SEGMENT;
if ((blerg->index[seg_rec].flags & 0x1) == 0) {
- printf("Invalid record\n");
+ fprintf(stderr, "Invalid record\n");
return 0;
}
return blerg->index[seg_rec].timestamp;
}
+
+int blerg_set_subscription_mark(struct blerg *blerg) {
+ blerg->meta->subscription_mark = subscription_count_items(blerg->name);
+}
+
+uint64_t blerg_get_subscription_mark(struct blerg *blerg) {
+ return blerg->meta->subscription_mark;
+}