1 /* Blerg is (C) 2011 The Dominion of Awesome, and is distributed under a
2 * BSD-style license. Please see the COPYING file for details.
11 #include <sys/types.h>
16 #include "configuration.h"
17 #include "subscription.h"
22 #define CHECK_VALID_BLERG(r) \
23 if (blerg == NULL) { \
24 fprintf(stderr, "Invalid struct blerg\n"); \
29 if (!blerg_configuration_init()) {
35 uint64_t blerg_get_record_count(struct blerg *blerg) {
37 flock(blerg->meta_fd, LOCK_SH);
38 count = blerg->meta->sequence;
39 flock(blerg->meta_fd, LOCK_UN);
43 /* Returns last usable record */
44 uint64_t blerg_increment_record_count(struct blerg *blerg) {
46 flock(blerg->meta_fd, LOCK_EX);
47 count = blerg->meta->sequence++;
48 flock(blerg->meta_fd, LOCK_UN);
52 void blerg_segment_close(struct blerg *blerg) {
53 if (blerg->data != NULL)
54 munmap((void *)blerg->data, blerg->data_size);
55 if (blerg->data_fd != -1)
56 close(blerg->data_fd);
57 if (blerg->index != NULL)
58 munmap((void *)blerg->index, RECORDS_PER_SEGMENT * sizeof(struct record));
59 if (blerg->index_fd != -1)
60 close(blerg->index_fd);
63 int blerg_remap_data(struct blerg *blerg) {
66 if (blerg->data != NULL)
67 munmap(blerg->data, blerg->data_size);
68 fstat(blerg->data_fd, &st);
69 blerg->data_size = st.st_size;
70 if (blerg->data_size == 0) {
71 /* Can't map an empty data file. */
74 blerg->data = (char *) mmap(NULL, blerg->data_size, PROT_READ, MAP_SHARED, blerg->data_fd, 0);
75 if (blerg->data == MAP_FAILED) {
76 perror("Could not remap data");
82 int blerg_segment_switch(struct blerg *blerg, int new_segment) {
83 char filename[FILENAME_MAX];
84 uint64_t max_sequence_no = blerg_get_record_count(blerg);
87 if (blerg->index != NULL && blerg->data != NULL && new_segment == blerg->current_segment) {
91 if (new_segment > max_sequence_no / RECORDS_PER_SEGMENT) {
92 fprintf(stderr, "Cannot switch to sequence beyond last record\n");
95 if (new_segment < 0) {
96 fprintf(stderr, "Cannot switch to negative segment\n");
100 blerg_segment_close(blerg);
102 /* Load and map the index */
103 snprintf(filename, FILENAME_MAX, "%s/index%d", blerg->base_path, new_segment);
104 blerg->index_fd = open(filename, O_RDWR | O_CREAT, 0600);
105 if (blerg->index_fd == -1) {
106 perror("Could not open index");
107 goto open_failed_index_open;
109 flock(blerg->index_fd, LOCK_EX);
110 fstat(blerg->index_fd, &st);
111 if (st.st_size == 0) {
112 /* ftruncate() means never having to say you're sorry. Sorry
113 in this case meaning "allocating disk space for a 1MB file
115 ftruncate(blerg->index_fd, RECORDS_PER_SEGMENT * sizeof(struct record));
117 flock(blerg->index_fd, LOCK_UN);
119 blerg->index = (struct record *) mmap(NULL, RECORDS_PER_SEGMENT * sizeof(struct record), PROT_READ | PROT_WRITE, MAP_SHARED, blerg->index_fd, 0);
120 if (blerg->index == MAP_FAILED) {
121 perror("Could not mmap index");
122 goto open_failed_index_mmap;
126 snprintf(filename, FILENAME_MAX, "%s/data%d", blerg->base_path, new_segment);
127 blerg->data_fd = open(filename, O_RDWR | O_APPEND | O_CREAT, 0600);
128 if (blerg->data_fd == -1) {
129 perror("Could not open data");
130 goto open_failed_data_open;
133 if (!blerg_remap_data(blerg)) {
134 goto open_failed_data_mmap;
137 blerg->current_segment = new_segment;
141 open_failed_data_mmap:
142 close(blerg->data_fd);
143 open_failed_data_open:
144 munmap((void *)blerg->index, RECORDS_PER_SEGMENT * sizeof(struct record));
145 open_failed_index_mmap:
146 close(blerg->index_fd);
147 open_failed_index_open:
151 int blerg_exists(const char *name) {
152 char filename[FILENAME_MAX];
154 if (!valid_name(name)) {
155 fprintf(stderr, "Invalid name\n");
159 snprintf(filename, FILENAME_MAX, "%s/%s", blergconf.data_path, name);
160 if (access(filename, F_OK) == -1)
166 struct blerg *blerg_open(const char *name) {
167 int namelen = strlen(name);
168 char filename[FILENAME_MAX];
172 if (!valid_name(name)) {
173 fprintf(stderr, "Invalid name\n");
176 struct blerg *blerg = malloc(sizeof(struct blerg));
178 perror("Cannot allocate memory for blerg");
179 goto open_failed_blerg_malloc;
181 blerg->name = malloc(namelen + 1);
182 memcpy(blerg->name, name, namelen + 1);
183 blerg->meta_fd = blerg->index_fd = blerg->data_fd = -1;
188 /* Make the directory if it doesn't exist */
189 blerg->base_path = malloc(FILENAME_MAX);
190 snprintf(blerg->base_path, FILENAME_MAX, "%s/%s", blergconf.data_path, name);
191 if (access(blerg->base_path, F_OK) == -1)
192 mkdir(blerg->base_path, 0755);
194 /* Open and map metadata */
195 snprintf(filename, FILENAME_MAX, "%s/meta", blerg->base_path);
196 blerg->meta_fd = open(filename, O_RDWR | O_CREAT, 0600);
197 if (blerg->meta_fd == -1) {
198 perror("Could not open metadata");
199 goto open_failed_meta_open;
201 fstat(blerg->meta_fd, &st);
202 if (st.st_size < sizeof(struct meta)) {
203 /* Extend the file if sizeof(struct meta) is larger than the
204 file. This allows seamless upgrades as long as struct meta
205 only adds members. */
206 posix_fallocate(blerg->meta_fd, 0, sizeof(struct meta));
208 blerg->meta = (struct meta *) mmap(NULL, sizeof(struct meta), PROT_READ | PROT_WRITE, MAP_SHARED, blerg->meta_fd, 0);
209 if (blerg->meta == MAP_FAILED) {
210 perror("Could not map metadata");
211 goto open_failed_meta_mmap;
214 /* Open and map index and data for the current segment */
215 blerg->current_segment = blerg_get_record_count(blerg) / RECORDS_PER_SEGMENT;
216 if (!blerg_segment_switch(blerg, blerg->current_segment)) {
217 fprintf(stderr, "Could not switch segment\n");
218 goto open_failed_segment_switch;
223 open_failed_segment_switch:
224 munmap((void *)blerg->meta, sizeof(struct meta));
225 open_failed_meta_mmap:
226 close(blerg->meta_fd);
227 open_failed_meta_open:
230 open_failed_blerg_malloc:
234 int blerg_close(struct blerg *blerg) {
236 blerg_segment_close(blerg);
237 munmap((void *)blerg->meta, sizeof(struct meta));
238 close(blerg->meta_fd);
239 free(blerg->base_path);
245 uint64_t blerg_store(struct blerg *blerg, const char *data, int length) {
249 CHECK_VALID_BLERG(BLERG_INVALID_RECORD)
251 if (length > MAX_RECORD_SIZE || length <= 0) {
252 fprintf(stderr, "length out of bounds\n");
253 return BLERG_INVALID_RECORD;
256 flock(blerg->index_fd, LOCK_EX);
257 flock(blerg->data_fd, LOCK_EX);
259 uint64_t record = blerg_get_record_count(blerg);
260 if (record == -1) { /* Intentional signed-unsigned coercion */
261 fprintf(stderr, "Could not find free record\n");
262 return BLERG_INVALID_RECORD;
264 int segment = record / RECORDS_PER_SEGMENT;
265 if (segment != blerg->current_segment)
266 blerg_segment_switch(blerg, segment);
267 int seg_rec = record % RECORDS_PER_SEGMENT;
269 /* Get the position for the new data */
270 fstat(blerg->data_fd, &st);
271 int curpos = st.st_size;
273 /* Write data to the data log */
274 n = write(blerg->data_fd, data, length);
276 perror("Could not write data");
277 /* Truncate anything we may have written */
278 ftruncate(blerg->data_fd, curpos);
279 return BLERG_INVALID_RECORD;
282 /* Update the index */
283 blerg->index[seg_rec].flags = 0x0001;
284 blerg->index[seg_rec].offset = curpos;
285 blerg->index[seg_rec].length = length;
286 blerg->index[seg_rec].timestamp = time(NULL);
288 /* And finally increment the record count */
289 blerg_increment_record_count(blerg);
291 flock(blerg->data_fd, LOCK_UN);
292 flock(blerg->index_fd, LOCK_UN);
294 if (!blerg_get_mute(blerg)) {
295 /* Now do those dirty microblogging deeds */
296 tag_scan(blerg->name, data, length, record);
297 subscription_notify(blerg->name, record);
303 int blerg_fetch(struct blerg *blerg, uint64_t record, char **data, int *length) {
305 if (record == BLERG_INVALID_RECORD || record >= blerg_get_record_count(blerg)) {
306 fprintf(stderr, "Invalid record\n");
309 if (data == NULL || length == NULL) {
310 fprintf(stderr, "data or length is null\n");
314 int segment = record / RECORDS_PER_SEGMENT;
315 if (segment != blerg->current_segment)
316 blerg_segment_switch(blerg, segment);
317 int seg_rec = record % RECORDS_PER_SEGMENT;
319 if ((blerg->index[seg_rec].flags & 0x1) == 0) {
320 fprintf(stderr, "Invalid record\n");
324 int rec_offset = blerg->index[seg_rec].offset;
325 int rec_length = blerg->index[seg_rec].length;
326 if (rec_offset >= blerg->data_size) {
327 /* We're accessing an out-of-bounds record in our mmap. Remap
329 if (!blerg_remap_data(blerg)) {
332 if (rec_offset >= blerg->data_size) {
333 fprintf(stderr, "Record offset outside of data!?");
338 *data = malloc(rec_length);
340 perror("Could not allocate string in fetch");
344 memcpy(*data, blerg->data + rec_offset, rec_length);
346 *length = rec_length;
351 time_t blerg_get_timestamp(struct blerg *blerg, uint64_t record) {
353 if (record == BLERG_INVALID_RECORD || record >= blerg_get_record_count(blerg)) {
354 fprintf(stderr, "Invalid record\n");
358 int segment = record / RECORDS_PER_SEGMENT;
359 if (segment != blerg->current_segment)
360 blerg_segment_switch(blerg, segment);
361 int seg_rec = record % RECORDS_PER_SEGMENT;
363 if ((blerg->index[seg_rec].flags & 0x1) == 0) {
364 fprintf(stderr, "Invalid record\n");
368 return blerg->index[seg_rec].timestamp;
371 int blerg_set_subscription_mark(struct blerg *blerg) {
373 blerg->meta->subscription_mark = subscription_count_items(blerg->name);
377 uint64_t blerg_get_subscription_mark(struct blerg *blerg) {
379 return blerg->meta->subscription_mark;
382 int blerg_set_mute(struct blerg *blerg, int v) {
385 blerg->meta->status |= BLERGMETA_MUTED;
387 blerg->meta->status &= ~BLERGMETA_MUTED;
392 int blerg_get_mute(struct blerg *blerg) {
394 return (blerg->meta->status & BLERGMETA_MUTED) > 0;