1 /* Blerg is (C) 2011 The Dominion of Awesome, and is distributed under a
2 * BSD-style license. Please see the COPYING file for details.
11 #include <sys/types.h>
16 #include "subscription.h"
/* Guard macro for functions that take a struct blerg pointer named
   blerg: when that pointer is NULL an invalid-struct message is written
   to stderr.
   NOTE(review): the parameter r is presumably the value handed back to
   the caller on failure -- confirm against the remainder of the macro. */
20 #define CHECK_VALID_BLERG(r) \
21 if (blerg == NULL) { \
22 fprintf(stderr, "Invalid struct blerg\n"); \
/* Reads the record sequence counter (meta->sequence) from the mapped
   metadata into count. The read is bracketed by a shared flock() on
   meta_fd, the same descriptor blerg_increment_record_count() locks
   exclusively while it modifies the counter. */
26 uint64_t blerg_get_record_count(struct blerg *blerg) {
28 flock(blerg->meta_fd, LOCK_SH);
29 count = blerg->meta->sequence;
30 flock(blerg->meta_fd, LOCK_UN);
/* Advances meta->sequence by one while holding an exclusive flock() on
   meta_fd. The increment is a post-increment, so count receives the
   value the counter held before this call. */
34 /* Returns last usable record */
35 uint64_t blerg_increment_record_count(struct blerg *blerg) {
37 flock(blerg->meta_fd, LOCK_EX);
38 count = blerg->meta->sequence++;
39 flock(blerg->meta_fd, LOCK_UN);
/* Releases the resources of the currently loaded segment: unmaps the
   data mapping (data_size bytes) and the index mapping
   (RECORDS_PER_SEGMENT records) when they are non-NULL, and closes
   data_fd and index_fd when they are not -1.
   NOTE(review): no statement here resets the pointers or descriptors
   after releasing them, yet blerg_segment_switch() re-tests
   index/data against NULL -- confirm stale values cannot be reused. */
43 void blerg_segment_close(struct blerg *blerg) {
44 if (blerg->data != NULL)
45 munmap((void *)blerg->data, blerg->data_size);
46 if (blerg->data_fd != -1)
47 close(blerg->data_fd);
48 if (blerg->index != NULL)
49 munmap((void *)blerg->index, RECORDS_PER_SEGMENT * sizeof(struct record));
50 if (blerg->index_fd != -1)
51 close(blerg->index_fd);
/* Refreshes the read-only mapping of the data file so that it covers
   the current file size: any existing mapping is dropped, the size is
   re-read with fstat() into data_size, and the whole file is mapped
   PROT_READ / MAP_SHARED. A zero-length file is special-cased because
   it cannot be mapped; a failed mmap() is reported through perror().
   Callers treat a zero result as failure.
   NOTE(review): the fstat() result is not checked before st.st_size is
   used. */
54 int blerg_remap_data(struct blerg *blerg) {
57 if (blerg->data != NULL)
58 munmap(blerg->data, blerg->data_size);
59 fstat(blerg->data_fd, &st);
60 blerg->data_size = st.st_size;
61 if (blerg->data_size == 0) {
62 /* Can't map an empty data file. */
65 blerg->data = (char *) mmap(NULL, blerg->data_size, PROT_READ, MAP_SHARED, blerg->data_fd, 0);
66 if (blerg->data == MAP_FAILED) {
67 perror("Could not remap data");
/* Points the handle at segment new_segment by closing the current
   segment and opening and mapping the index and data files of the new
   one under base_path. Callers treat a zero result as failure.

   Steps, in order:
   - nothing needs reloading when index and data are already mapped and
     new_segment equals current_segment;
   - a segment past max_sequence_no / RECORDS_PER_SEGMENT, or a negative
     segment, is rejected with a message on stderr;
   - the current segment is closed with blerg_segment_close();
   - the index file is opened read/write (created when missing); while an
     exclusive flock() is held, an empty index file is extended with
     ftruncate() to RECORDS_PER_SEGMENT records; the file is then mapped
     read/write and shared;
   - the data file is opened read/write in append mode (created when
     missing) and mapped through blerg_remap_data();
   - current_segment is updated only after all of the above succeeded;
   - the open_failed_* labels undo the completed steps in reverse order.

   NOTE(review): max_sequence_no is uint64_t, so in the range comparison
   a negative new_segment is converted to a very large unsigned value and
   is caught by the first test; the dedicated negative test is then
   presumably unreachable -- confirm.
   NOTE(review): the data file name is built with sprintf() while the
   index file name uses snprintf() bounded to 512; the unbounded call
   should likely be bounded the same way.
   NOTE(review): the results of fstat() and ftruncate() are not checked. */
73 int blerg_segment_switch(struct blerg *blerg, int new_segment) {
75 uint64_t max_sequence_no = blerg_get_record_count(blerg);
78 if (blerg->index != NULL && blerg->data != NULL && new_segment == blerg->current_segment) {
82 if (new_segment > max_sequence_no / RECORDS_PER_SEGMENT) {
83 fprintf(stderr, "Cannot switch to sequence beyond last record\n");
86 if (new_segment < 0) {
87 fprintf(stderr, "Cannot switch to negative segment\n");
91 blerg_segment_close(blerg);
93 /* Load and map the index */
94 snprintf(filename, 512, "%s/index%d", blerg->base_path, new_segment);
95 blerg->index_fd = open(filename, O_RDWR | O_CREAT, 0600);
96 if (blerg->index_fd == -1) {
97 perror("Could not open index");
98 goto open_failed_index_open;
100 flock(blerg->index_fd, LOCK_EX);
101 fstat(blerg->index_fd, &st);
102 if (st.st_size == 0) {
103 /* ftruncate() means never having to say you're sorry. Sorry
104 in this case meaning "allocating disk space for a 1MB file
106 ftruncate(blerg->index_fd, RECORDS_PER_SEGMENT * sizeof(struct record));
108 flock(blerg->index_fd, LOCK_UN);
110 blerg->index = (struct record *) mmap(NULL, RECORDS_PER_SEGMENT * sizeof(struct record), PROT_READ | PROT_WRITE, MAP_SHARED, blerg->index_fd, 0);
111 if (blerg->index == MAP_FAILED) {
112 perror("Could not mmap index");
113 goto open_failed_index_mmap;
117 sprintf(filename, "%s/data%d", blerg->base_path, new_segment);
118 blerg->data_fd = open(filename, O_RDWR | O_APPEND | O_CREAT, 0600);
119 if (blerg->data_fd == -1) {
120 perror("Could not open data");
121 goto open_failed_data_open;
124 if (!blerg_remap_data(blerg)) {
125 goto open_failed_data_mmap;
128 blerg->current_segment = new_segment;
132 open_failed_data_mmap:
133 close(blerg->data_fd);
134 open_failed_data_open:
135 munmap((void *)blerg->index, RECORDS_PER_SEGMENT * sizeof(struct record));
136 open_failed_index_mmap:
137 close(blerg->index_fd);
138 open_failed_index_open:
/* Tests whether a blerg called name is present on disk. Names rejected
   by valid_name() produce a message on stderr; otherwise the path
   DATA_PATH/name is probed with access(F_OK).
   NOTE(review): strlen(name) runs before the name is validated, and
   namelen does not appear to be used afterwards -- confirm. */
142 int blerg_exists(const char *name) {
143 int namelen = strlen(name);
146 if (!valid_name(name)) {
147 fprintf(stderr, "Invalid name\n");
151 snprintf(filename, 512, "%s/%s", DATA_PATH, name);
152 if (access(filename, F_OK) == -1)
/* Builds a struct blerg handle for the blerg called name, creating the
   on-disk structures when they are missing.

   Steps, in order:
   - names rejected by valid_name() produce a message on stderr;
   - the handle is allocated, the name is copied into it, and the three
     descriptors (meta_fd, index_fd, data_fd) start out as -1;
   - base_path becomes DATA_PATH/name and the directory is created with
     mode 0755 when access(F_OK) reports it missing;
   - the meta file is opened read/write (created when missing), grown
     with posix_fallocate() when it is smaller than struct meta, and
     mapped read/write and shared;
   - the segment holding the next record is derived from the record
     count and loaded through blerg_segment_switch();
   - the open_failed_* labels undo the completed steps in reverse order.

   NOTE(review): the malloc() results for name and base_path are used
   without a NULL check, and the results of mkdir(), fstat() and
   posix_fallocate() are ignored.
   NOTE(review): strlen(name) runs before the name is validated. */
158 struct blerg *blerg_open(const char *name) {
159 int namelen = strlen(name);
164 if (!valid_name(name)) {
165 fprintf(stderr, "Invalid name\n");
168 struct blerg *blerg = malloc(sizeof(struct blerg));
170 perror("Cannot allocate memory for blerg");
171 goto open_failed_blerg_malloc;
173 blerg->name = malloc(namelen + 1);
174 memcpy(blerg->name, name, namelen + 1);
175 blerg->meta_fd = blerg->index_fd = blerg->data_fd = -1;
180 /* Make the directory if it doesn't exist */
181 blerg->base_path = malloc(512);
182 snprintf(blerg->base_path, 512, "%s/%s", DATA_PATH, name);
183 if (access(blerg->base_path, F_OK) == -1)
184 mkdir(blerg->base_path, 0755);
186 /* Open and map metadata */
187 snprintf(filename, 512, "%s/meta", blerg->base_path);
188 blerg->meta_fd = open(filename, O_RDWR | O_CREAT, 0600);
189 if (blerg->meta_fd == -1) {
190 perror("Could not open metadata");
191 goto open_failed_meta_open;
193 fstat(blerg->meta_fd, &st);
194 if (st.st_size < sizeof(struct meta)) {
195 /* Extend the file if sizeof(struct meta) is larger than the
196 file. This allows seamless upgrades as long as struct meta
197 only adds members. */
198 posix_fallocate(blerg->meta_fd, 0, sizeof(struct meta));
200 blerg->meta = (struct meta *) mmap(NULL, sizeof(struct meta), PROT_READ | PROT_WRITE, MAP_SHARED, blerg->meta_fd, 0);
201 if (blerg->meta == MAP_FAILED) {
202 perror("Could not map metadata");
203 goto open_failed_meta_mmap;
206 /* Open and map index and data for the current segment */
207 blerg->current_segment = blerg_get_record_count(blerg) / RECORDS_PER_SEGMENT;
208 if (!blerg_segment_switch(blerg, blerg->current_segment)) {
209 fprintf(stderr, "Could not switch segment\n");
210 goto open_failed_segment_switch;
215 open_failed_segment_switch:
216 munmap((void *)blerg->meta, sizeof(struct meta));
217 open_failed_meta_mmap:
218 close(blerg->meta_fd);
219 open_failed_meta_open:
222 open_failed_blerg_malloc:
/* Tears down a handle: releases the loaded segment through
   blerg_segment_close(), unmaps the metadata, closes meta_fd and frees
   base_path.
   NOTE(review): confirm that the name buffer and the handle itself,
   both allocated in blerg_open(), are released as well. */
226 int blerg_close(struct blerg *blerg) {
228 blerg_segment_close(blerg);
229 munmap((void *)blerg->meta, sizeof(struct meta));
230 close(blerg->meta_fd);
231 free(blerg->base_path);
/* Appends one record of length bytes taken from data.

   Steps, in order:
   - a NULL handle is rejected by CHECK_VALID_BLERG, and a length outside
     1..MAX_RECORD_SIZE is rejected with BLERG_INVALID_RECORD;
   - exclusive flock()s are taken on index_fd and then on data_fd;
   - the record number is the current record count; the segment holding
     it is loaded when it differs from current_segment;
   - the current size of the data file is the offset of the new record;
     the bytes are appended with write(), and on failure the file is
     truncated back to that offset;
   - the index slot gets flags 0x0001, the offset, the length and the
     current time, and only then is the record count incremented;
   - both locks are released; unless the blerg is muted, tag_scan() and
     subscription_notify() are run for the new record.

   NOTE(review): the result of blerg_segment_switch() is ignored, so a
   failed switch is followed by writes through the segment state it left
   behind.
   NOTE(review): curpos is an int while st.st_size is an off_t; offsets
   past INT_MAX would not fit -- confirm the data file size is bounded.
   NOTE(review): the results of fstat() and ftruncate() are not checked. */
237 uint64_t blerg_store(struct blerg *blerg, const char *data, int length) {
241 CHECK_VALID_BLERG(BLERG_INVALID_RECORD)
243 if (length > MAX_RECORD_SIZE || length <= 0) {
244 fprintf(stderr, "length out of bounds\n");
245 return BLERG_INVALID_RECORD;
248 flock(blerg->index_fd, LOCK_EX);
249 flock(blerg->data_fd, LOCK_EX);
251 uint64_t record = blerg_get_record_count(blerg);
252 if (record == -1) { /* Intentional signed-unsigned coercion */
253 fprintf(stderr, "Could not find free record\n");
/* NOTE(review): this return leaves the index and data locks held. */
254 return BLERG_INVALID_RECORD;
256 int segment = record / RECORDS_PER_SEGMENT;
257 if (segment != blerg->current_segment)
258 blerg_segment_switch(blerg, segment);
259 int seg_rec = record % RECORDS_PER_SEGMENT;
261 /* Get the position for the new data */
262 fstat(blerg->data_fd, &st);
263 int curpos = st.st_size;
265 /* Write data to the data log */
266 n = write(blerg->data_fd, data, length);
268 perror("Could not write data");
269 /* Truncate anything we may have written */
270 ftruncate(blerg->data_fd, curpos);
/* NOTE(review): this return leaves the index and data locks held. */
271 return BLERG_INVALID_RECORD;
274 /* Update the index */
275 blerg->index[seg_rec].flags = 0x0001;
276 blerg->index[seg_rec].offset = curpos;
277 blerg->index[seg_rec].length = length;
278 blerg->index[seg_rec].timestamp = time(NULL);
280 /* And finally increment the record count */
281 blerg_increment_record_count(blerg);
283 flock(blerg->data_fd, LOCK_UN);
284 flock(blerg->index_fd, LOCK_UN);
286 if (!blerg_get_mute(blerg)) {
287 /* Now do those dirty microblogging deeds */
288 tag_scan(blerg->name, data, length, record);
289 subscription_notify(blerg->name, record);
/* Copies the contents of one record into a freshly malloc()ed buffer.

   Steps, in order:
   - record must not be BLERG_INVALID_RECORD and must be below the
     current record count; data and length must both be non-NULL;
   - the segment holding the record is loaded when it differs from
     current_segment;
   - the index slot must have flag bit 0x1 set;
   - when the record offset lies at or past data_size the data file is
     remapped and the offset is tested once more;
   - rec_length bytes are copied from the data mapping into *data and
     the length is stored in *length.

   The buffer placed in *data comes from malloc(); presumably the caller
   is expected to free() it -- confirm against the callers.

   NOTE(review): only the start offset is compared against data_size;
   rec_offset + rec_length is not, so a record whose tail lies past the
   mapping would be read out of bounds by the memcpy().
   NOTE(review): the result of blerg_segment_switch() is ignored.
   NOTE(review): offset and length are narrowed into int locals. */
295 int blerg_fetch(struct blerg *blerg, uint64_t record, char **data, int *length) {
297 if (record == BLERG_INVALID_RECORD || record >= blerg_get_record_count(blerg)) {
298 fprintf(stderr, "Invalid record\n");
301 if (data == NULL || length == NULL) {
302 fprintf(stderr, "data or length is null\n");
306 int segment = record / RECORDS_PER_SEGMENT;
307 if (segment != blerg->current_segment)
308 blerg_segment_switch(blerg, segment);
309 int seg_rec = record % RECORDS_PER_SEGMENT;
311 if ((blerg->index[seg_rec].flags & 0x1) == 0) {
312 fprintf(stderr, "Invalid record\n");
316 int rec_offset = blerg->index[seg_rec].offset;
317 int rec_length = blerg->index[seg_rec].length;
318 if (rec_offset >= blerg->data_size) {
319 /* We're accessing an out-of-bounds record in our mmap. Remap
321 if (!blerg_remap_data(blerg)) {
324 if (rec_offset >= blerg->data_size) {
325 fprintf(stderr, "Record offset outside of data!?");
330 *data = malloc(rec_length);
332 perror("Could not allocate string in fetch");
336 memcpy(*data, blerg->data + rec_offset, rec_length);
338 *length = rec_length;
/* Looks up the timestamp stored in the index slot of record. The record
   must not be BLERG_INVALID_RECORD, must be below the current record
   count, and its index slot must have flag bit 0x1 set; otherwise an
   invalid-record message is written to stderr. The segment holding the
   record is loaded when it differs from current_segment.
   NOTE(review): the result of blerg_segment_switch() is ignored. */
343 time_t blerg_get_timestamp(struct blerg *blerg, uint64_t record) {
345 if (record == BLERG_INVALID_RECORD || record >= blerg_get_record_count(blerg)) {
346 fprintf(stderr, "Invalid record\n");
350 int segment = record / RECORDS_PER_SEGMENT;
351 if (segment != blerg->current_segment)
352 blerg_segment_switch(blerg, segment);
353 int seg_rec = record % RECORDS_PER_SEGMENT;
355 if ((blerg->index[seg_rec].flags & 0x1) == 0) {
356 fprintf(stderr, "Invalid record\n");
360 return blerg->index[seg_rec].timestamp;
/* Stores the value of subscription_count_items() for this blerg name
   into meta->subscription_mark.
   NOTE(review): the metadata is written without taking the flock() on
   meta_fd that the sequence counter accessors use -- confirm this is
   intended. */
363 int blerg_set_subscription_mark(struct blerg *blerg) {
365 blerg->meta->subscription_mark = subscription_count_items(blerg->name);
/* Returns the subscription mark currently stored in the mapped
   metadata (meta->subscription_mark). */
369 uint64_t blerg_get_subscription_mark(struct blerg *blerg) {
371 return blerg->meta->subscription_mark;
/* Sets or clears the BLERGMETA_MUTED bit in meta->status.
   NOTE(review): presumably a nonzero v sets the bit and zero clears
   it -- confirm against the condition that selects between the two
   statements. */
374 int blerg_set_mute(struct blerg *blerg, int v) {
377 blerg->meta->status |= BLERGMETA_MUTED;
379 blerg->meta->status &= ~BLERGMETA_MUTED;
/* Reports whether the BLERGMETA_MUTED bit is set in meta->status: the
   result is 1 when the masked value is greater than zero, otherwise 0. */
384 int blerg_get_mute(struct blerg *blerg) {
386 return (blerg->meta->status & BLERGMETA_MUTED) > 0;