1 /* Blerg is (C) 2011 The Dominion of Awesome, and is distributed under a
2 * BSD-style license. Please see the COPYING file for details.
11 #include <sys/types.h>
16 #include "subscription.h"
20 uint64_t blerg_get_record_count(struct blerg *blerg) {
22 flock(blerg->meta_fd, LOCK_SH);
23 count = blerg->meta->sequence;
24 flock(blerg->meta_fd, LOCK_UN);
28 /* Returns last usable record */
29 uint64_t blerg_increment_record_count(struct blerg *blerg) {
31 flock(blerg->meta_fd, LOCK_EX);
32 count = blerg->meta->sequence++;
33 flock(blerg->meta_fd, LOCK_UN);
37 void blerg_segment_close(struct blerg *blerg) {
38 if (blerg->data != NULL)
39 munmap((void *)blerg->data, blerg->data_size);
40 if (blerg->data_fd != -1)
41 close(blerg->data_fd);
42 if (blerg->index != NULL)
43 munmap((void *)blerg->index, RECORDS_PER_SEGMENT * sizeof(struct record));
44 if (blerg->index_fd != -1)
45 close(blerg->index_fd);
48 int blerg_remap_data(struct blerg *blerg) {
51 if (blerg->data != NULL)
52 munmap(blerg->data, blerg->data_size);
53 fstat(blerg->data_fd, &st);
54 if (st.st_size == 0) {
55 /* Can't map an empty data file. */
58 blerg->data = (char *) mmap(NULL, st.st_size, PROT_READ, MAP_SHARED, blerg->data_fd, 0);
59 if (blerg->data == MAP_FAILED) {
60 perror("Could not remap data");
63 blerg->data_size = st.st_size;
67 int blerg_segment_switch(struct blerg *blerg, int new_segment) {
69 uint64_t max_sequence_no = blerg_get_record_count(blerg);
72 if (blerg->index != NULL && blerg->data != NULL && new_segment == blerg->current_segment) {
76 if (new_segment > max_sequence_no / RECORDS_PER_SEGMENT) {
77 fprintf(stderr, "Cannot switch to sequence beyond last record\n");
80 if (new_segment < 0) {
81 fprintf(stderr, "Cannot switch to negative segment\n");
85 blerg_segment_close(blerg);
87 /* Load and map the index */
88 snprintf(filename, 512, "%s/index%d", blerg->base_path, new_segment);
89 blerg->index_fd = open(filename, O_RDWR | O_CREAT, 0600);
90 if (blerg->index_fd == -1) {
91 perror("Could not open index");
92 goto open_failed_index_open;
94 flock(blerg->index_fd, LOCK_EX);
95 fstat(blerg->index_fd, &st);
96 if (st.st_size == 0) {
97 /* ftruncate() means never having to say you're sorry. Sorry
98 in this case meaning "allocating disk space for a 1MB file
100 ftruncate(blerg->index_fd, RECORDS_PER_SEGMENT * sizeof(struct record));
102 flock(blerg->index_fd, LOCK_UN);
104 blerg->index = (struct record *) mmap(NULL, RECORDS_PER_SEGMENT * sizeof(struct record), PROT_READ | PROT_WRITE, MAP_SHARED, blerg->index_fd, 0);
105 if (blerg->index == MAP_FAILED) {
106 perror("Could not mmap index");
107 goto open_failed_index_mmap;
111 sprintf(filename, "%s/data%d", blerg->base_path, new_segment);
112 blerg->data_fd = open(filename, O_RDWR | O_APPEND | O_CREAT, 0600);
113 if (blerg->data_fd == -1) {
114 perror("Could not open data");
115 goto open_failed_data_open;
118 if (!blerg_remap_data(blerg)) {
119 goto open_failed_data_mmap;
122 blerg->current_segment = new_segment;
126 open_failed_data_mmap:
127 close(blerg->data_fd);
128 open_failed_data_open:
129 munmap((void *)blerg->index, RECORDS_PER_SEGMENT * sizeof(struct record));
130 open_failed_index_mmap:
131 close(blerg->index_fd);
132 open_failed_index_open:
136 int blerg_exists(const char *name) {
137 int namelen = strlen(name);
140 if (!valid_name(name)) {
141 fprintf(stderr, "Invalid name\n");
145 snprintf(filename, 512, "%s/%s", DATA_PATH, name);
146 if (access(filename, F_OK) == -1)
152 struct blerg *blerg_open(const char *name) {
153 int namelen = strlen(name);
158 if (!valid_name(name)) {
159 fprintf(stderr, "Invalid name\n");
162 struct blerg *blerg = malloc(sizeof(struct blerg));
164 perror("Cannot allocate memory for blerg");
165 goto open_failed_blerg_malloc;
167 blerg->name = malloc(namelen + 1);
168 memcpy(blerg->name, name, namelen + 1);
169 blerg->meta_fd = blerg->index_fd = blerg->data_fd = -1;
174 /* Make the directory if it doesn't exist */
175 blerg->base_path = malloc(512);
176 snprintf(blerg->base_path, 512, "%s/%s", DATA_PATH, name);
177 if (access(blerg->base_path, F_OK) == -1)
178 mkdir(blerg->base_path, 0755);
180 /* Open and map metadata */
181 snprintf(filename, 512, "%s/meta", blerg->base_path);
182 blerg->meta_fd = open(filename, O_RDWR | O_CREAT, 0600);
183 if (blerg->meta_fd == -1) {
184 perror("Could not open metadata");
185 goto open_failed_meta_open;
187 fstat(blerg->meta_fd, &st);
188 if (st.st_size < sizeof(struct meta)) {
189 /* Extend the file if sizeof(struct meta) is larger than the
190 file. This allows seamless upgrades as long as struct meta
191 only adds members. */
192 posix_fallocate(blerg->meta_fd, 0, sizeof(struct meta));
194 blerg->meta = (struct meta *) mmap(NULL, sizeof(struct meta), PROT_READ | PROT_WRITE, MAP_SHARED, blerg->meta_fd, 0);
195 if (blerg->meta == MAP_FAILED) {
196 perror("Could not map metadata");
197 goto open_failed_meta_mmap;
200 /* Open and map index and data for the current segment */
201 blerg->current_segment = blerg_get_record_count(blerg) / RECORDS_PER_SEGMENT;
202 if (!blerg_segment_switch(blerg, blerg->current_segment)) {
203 fprintf(stderr, "Could not switch segment\n");
204 goto open_failed_segment_switch;
209 open_failed_segment_switch:
210 munmap((void *)blerg->meta, sizeof(struct meta));
211 open_failed_meta_mmap:
212 close(blerg->meta_fd);
213 open_failed_meta_open:
216 open_failed_blerg_malloc:
220 int blerg_close(struct blerg *blerg) {
221 blerg_segment_close(blerg);
222 munmap((void *)blerg->meta, sizeof(struct meta));
223 close(blerg->meta_fd);
224 free(blerg->base_path);
230 int blerg_store(struct blerg *blerg, const char *data, int len) {
234 if (len > MAX_RECORD_SIZE) {
235 fprintf(stderr, "len > 64K\n");
239 flock(blerg->index_fd, LOCK_EX);
240 flock(blerg->data_fd, LOCK_EX);
242 uint64_t record = blerg_get_record_count(blerg);
243 if (record == -1) { /* Intentional signed-unsigned coercion */
244 fprintf(stderr, "Could not find free record\n");
247 int segment = record / RECORDS_PER_SEGMENT;
248 if (segment != blerg->current_segment)
249 blerg_segment_switch(blerg, segment);
250 int seg_rec = record % RECORDS_PER_SEGMENT;
252 /* Get the position for the new data */
253 fstat(blerg->data_fd, &st);
254 int curpos = st.st_size;
256 /* Write data to the data log */
257 n = write(blerg->data_fd, data, len);
259 perror("Could not write data");
260 /* Truncate anything we may have written */
261 ftruncate(blerg->data_fd, curpos);
265 /* Update the index */
266 blerg->index[seg_rec].flags = 0x0001;
267 blerg->index[seg_rec].offset = curpos;
268 blerg->index[seg_rec].length = len;
269 blerg->index[seg_rec].timestamp = time(NULL);
271 /* And finally increment the record count */
272 blerg_increment_record_count(blerg);
274 flock(blerg->data_fd, LOCK_UN);
275 flock(blerg->index_fd, LOCK_UN);
277 /* Now do those dirty microblogging deeds */
278 tag_scan(blerg->name, data, len, record);
279 subscription_notify(blerg->name, record);
284 int blerg_fetch(struct blerg *blerg, int record, char **data, int *length) {
286 fprintf(stderr, "Invalid record\n");
289 if (record >= blerg_get_record_count(blerg)) {
290 fprintf(stderr, "Invalid record\n");
294 int segment = record / RECORDS_PER_SEGMENT;
295 if (segment != blerg->current_segment)
296 blerg_segment_switch(blerg, segment);
297 int seg_rec = record % RECORDS_PER_SEGMENT;
299 if ((blerg->index[seg_rec].flags & 0x1) == 0) {
300 fprintf(stderr, "Invalid record\n");
304 int rec_offset = blerg->index[seg_rec].offset;
305 int rec_length = blerg->index[seg_rec].length;
306 if (rec_offset >= blerg->data_size) {
307 /* We're accessing an out-of-bounds record in our mmap. Remap
309 if (!blerg_remap_data(blerg)) {
312 if (rec_offset >= blerg->data_size) {
313 fprintf(stderr, "Record offset outside of data!?");
318 *data = malloc(rec_length);
320 perror("Could not allocate string in fetch");
324 memcpy(*data, blerg->data + rec_offset, rec_length);
326 *length = rec_length;
331 time_t blerg_get_timestamp(struct blerg *blerg, int record) {
333 fprintf(stderr, "Invalid record\n");
336 if (record >= blerg_get_record_count(blerg)) {
337 fprintf(stderr, "Invalid record\n");
341 int segment = record / RECORDS_PER_SEGMENT;
342 if (segment != blerg->current_segment)
343 blerg_segment_switch(blerg, segment);
344 int seg_rec = record % RECORDS_PER_SEGMENT;
346 if ((blerg->index[seg_rec].flags & 0x1) == 0) {
347 fprintf(stderr, "Invalid record\n");
351 return blerg->index[seg_rec].timestamp;
354 int blerg_set_subscription_mark(struct blerg *blerg) {
355 blerg->meta->subscription_mark = subscription_count_items(blerg->name);
358 uint64_t blerg_get_subscription_mark(struct blerg *blerg) {
359 return blerg->meta->subscription_mark;