c0ec591553b8ff0170ce0e6d4d71c59d82da345a
[blerg.git] / database / database.c
1 /* Blerg is (C) 2011 The Dominion of Awesome, and is distributed under a
2  * BSD-style license.  Please see the COPYING file for details.
3  */
4 #include <stdint.h>
5 #include <stdlib.h>
6 #include <string.h>
7 #include <stdio.h>
8 #include <time.h>
9 #include <unistd.h>
10 #include <sys/stat.h>
11 #include <sys/types.h>
12 #include <sys/mman.h>
13 #include <sys/file.h>
14 #include <fcntl.h>
15 #include "database.h"
16 #include "subscription.h"
17 #include "util.h"
18 #include "config.h"
19
20 uint64_t blerg_get_record_count(struct blerg *blerg) {
21         uint64_t count;
22         flock(blerg->meta_fd, LOCK_SH);
23         count = blerg->meta->sequence;
24         flock(blerg->meta_fd, LOCK_UN);
25         return count;
26 }
27
28 /* Returns last usable record */
29 uint64_t blerg_increment_record_count(struct blerg *blerg) {
30         uint64_t count;
31         flock(blerg->meta_fd, LOCK_EX);
32         count = blerg->meta->sequence++;
33         flock(blerg->meta_fd, LOCK_UN);
34         return count;
35 }
36
37 void blerg_segment_close(struct blerg *blerg) {
38         if (blerg->data != NULL)
39                 munmap((void *)blerg->data, blerg->data_size);
40         if (blerg->data_fd != -1)
41                 close(blerg->data_fd);
42         if (blerg->index != NULL)
43                 munmap((void *)blerg->index, RECORDS_PER_SEGMENT * sizeof(struct record));
44         if (blerg->index_fd != -1)
45                 close(blerg->index_fd);
46 }
47
48 int blerg_remap_data(struct blerg *blerg) {
49         struct stat st;
50
51         if (blerg->data != NULL)
52                 munmap(blerg->data, blerg->data_size);
53         fstat(blerg->data_fd, &st);
54         if (st.st_size == 0) {
55                 /* Can't map an empty data file. */
56                 return 1;
57         }
58         blerg->data = (char *) mmap(NULL, st.st_size, PROT_READ, MAP_SHARED, blerg->data_fd, 0);
59         if (blerg->data == MAP_FAILED) {
60                 perror("Could not remap data");
61                 return 0;
62         }
63         blerg->data_size = st.st_size;
64         return 1;
65 }
66
67 int blerg_segment_switch(struct blerg *blerg, int new_segment) {
68         char filename[512];
69         uint64_t max_sequence_no = blerg_get_record_count(blerg);
70         struct stat st;
71
72         if (blerg->index != NULL && blerg->data != NULL && new_segment == blerg->current_segment) {
73                 return 1;
74         }
75
76         if (new_segment > max_sequence_no / RECORDS_PER_SEGMENT) {
77                 fprintf(stderr, "Cannot switch to sequence beyond last record\n");
78                 return 0;
79         }
80         if (new_segment < 0) {
81                 fprintf(stderr, "Cannot switch to negative segment\n");
82                 return 0;
83         }
84
85         blerg_segment_close(blerg);
86
87         /* Load and map the index */
88         snprintf(filename, 512, "%s/index%d", blerg->base_path, new_segment);
89         blerg->index_fd = open(filename, O_RDWR | O_CREAT, 0600);
90         if (blerg->index_fd == -1) {
91                 perror("Could not open index");
92                 goto open_failed_index_open;
93         }
94         flock(blerg->index_fd, LOCK_EX);
95         fstat(blerg->index_fd, &st);
96         if (st.st_size == 0) {
97                 /* ftruncate() means never having to say you're sorry.  Sorry
98                    in this case meaning "allocating disk space for a 1MB file
99                    full or zeroes". */
100                 ftruncate(blerg->index_fd, RECORDS_PER_SEGMENT * sizeof(struct record));
101         }
102         flock(blerg->index_fd, LOCK_UN);
103
104         blerg->index = (struct record *) mmap(NULL, RECORDS_PER_SEGMENT * sizeof(struct record), PROT_READ | PROT_WRITE, MAP_SHARED, blerg->index_fd, 0);
105         if (blerg->index == MAP_FAILED) {
106                 perror("Could not mmap index");
107                 goto open_failed_index_mmap;
108         }
109
110         /* Load data file */
111         sprintf(filename, "%s/data%d", blerg->base_path, new_segment);
112         blerg->data_fd = open(filename, O_RDWR | O_APPEND | O_CREAT, 0600);
113         if (blerg->data_fd == -1) {
114                 perror("Could not open data");
115                 goto open_failed_data_open;
116         }
117
118         if (!blerg_remap_data(blerg)) {
119                 goto open_failed_data_mmap;
120         }
121
122         blerg->current_segment = new_segment;
123
124         return 1;
125
126 open_failed_data_mmap:
127         close(blerg->data_fd);
128 open_failed_data_open:
129         munmap((void *)blerg->index, RECORDS_PER_SEGMENT * sizeof(struct record));
130 open_failed_index_mmap:
131         close(blerg->index_fd);
132 open_failed_index_open:
133         return 0;
134 }
135
136 int blerg_exists(const char *name) {
137         int namelen = strlen(name);
138         char filename[512];
139
140         if (!valid_name(name)) {
141                 fprintf(stderr, "Invalid name\n");
142                 return 0;
143         }
144
145         snprintf(filename, 512, "%s/%s", DATA_PATH, name);
146         if (access(filename, F_OK) == -1)
147                 return 0;
148         else
149                 return 1;
150 }
151
152 struct blerg *blerg_open(const char *name) {
153         int namelen = strlen(name);
154         char filename[512];
155         struct stat st;
156         uint64_t sequence;
157
158         if (!valid_name(name)) {
159                 fprintf(stderr, "Invalid name\n");
160                 return NULL;
161         }
162         struct blerg *blerg = malloc(sizeof(struct blerg));
163         if (!blerg) {
164                 perror("Cannot allocate memory for blerg");
165                 goto open_failed_blerg_malloc;
166         }
167         blerg->name = malloc(namelen + 1);
168         memcpy(blerg->name, name, namelen + 1);
169         blerg->meta_fd = blerg->index_fd = blerg->data_fd = -1;
170         blerg->meta = NULL;
171         blerg->index = NULL;
172         blerg->data = NULL;
173
174         /* Make the directory if it doesn't exist */
175         blerg->base_path = malloc(512);
176         snprintf(blerg->base_path, 512, "%s/%s", DATA_PATH, name);
177         if (access(blerg->base_path, F_OK) == -1)
178                 mkdir(blerg->base_path, 0755);
179
180         /* Open and map metadata */
181         snprintf(filename, 512, "%s/meta", blerg->base_path);
182         blerg->meta_fd = open(filename, O_RDWR | O_CREAT, 0600);
183         if (blerg->meta_fd == -1) {
184                 perror("Could not open metadata");
185                 goto open_failed_meta_open;
186         }
187         fstat(blerg->meta_fd, &st);
188         if (st.st_size < sizeof(struct meta)) {
189                 /* Extend the file if sizeof(struct meta) is larger than the
190                    file. This allows seamless upgrades as long as struct meta
191                    only adds members. */
192                 posix_fallocate(blerg->meta_fd, 0, sizeof(struct meta));
193         }
194         blerg->meta = (struct meta *) mmap(NULL, sizeof(struct meta), PROT_READ | PROT_WRITE, MAP_SHARED, blerg->meta_fd, 0);
195         if (blerg->meta == MAP_FAILED) {
196                 perror("Could not map metadata");
197                 goto open_failed_meta_mmap;
198         }
199
200         /* Open and map index and data for the current segment */
201         blerg->current_segment = blerg_get_record_count(blerg) / RECORDS_PER_SEGMENT;
202         if (!blerg_segment_switch(blerg, blerg->current_segment)) {
203                 fprintf(stderr, "Could not switch segment\n");
204                 goto open_failed_segment_switch;
205         }
206
207         return blerg;
208
209 open_failed_segment_switch:
210         munmap((void *)blerg->meta, sizeof(struct meta));
211 open_failed_meta_mmap:
212         close(blerg->meta_fd);
213 open_failed_meta_open:
214         free(blerg->name);
215         free(blerg);
216 open_failed_blerg_malloc:
217         return NULL;
218 }
219
220 int blerg_close(struct blerg *blerg) {
221         blerg_segment_close(blerg);
222         munmap((void *)blerg->meta, sizeof(struct meta));
223         close(blerg->meta_fd);
224         free(blerg->base_path);
225         free(blerg->name);
226         free(blerg);
227         return 1;
228 }
229
230 int blerg_store(struct blerg *blerg, const char *data, int len) {
231         struct stat st;
232         int n;
233
234         if (len > MAX_RECORD_SIZE) {
235                 fprintf(stderr, "len > 64K\n");
236                 return -1;
237         }
238
239         flock(blerg->index_fd, LOCK_EX);
240         flock(blerg->data_fd, LOCK_EX);
241
242         uint64_t record = blerg_get_record_count(blerg);
243         if (record == -1) {  /* Intentional signed-unsigned coercion */
244                 fprintf(stderr, "Could not find free record\n");
245                 return -1;
246         }
247         int segment = record / RECORDS_PER_SEGMENT;
248         if (segment != blerg->current_segment)
249                 blerg_segment_switch(blerg, segment);
250         int seg_rec = record % RECORDS_PER_SEGMENT;
251
252         /* Get the position for the new data */
253         fstat(blerg->data_fd, &st);
254         int curpos = st.st_size;
255
256         /* Write data to the data log */
257         n = write(blerg->data_fd, data, len);
258         if (n < len) {
259                 perror("Could not write data");
260                 /* Truncate anything we may have written */
261                 ftruncate(blerg->data_fd, curpos);
262                 return -1;
263         }
264
265         /* Update the index */
266         blerg->index[seg_rec].flags = 0x0001;
267         blerg->index[seg_rec].offset = curpos;
268         blerg->index[seg_rec].length = len;
269         blerg->index[seg_rec].timestamp = time(NULL);
270
271         /* And finally increment the record count */
272         blerg_increment_record_count(blerg);
273
274         flock(blerg->data_fd, LOCK_UN);
275         flock(blerg->index_fd, LOCK_UN);
276
277         /* Now do those dirty microblogging deeds */
278         tag_scan(blerg->name, data, len, record);
279         subscription_notify(blerg->name, record);
280
281         return record;
282 }
283
284 int blerg_fetch(struct blerg *blerg, int record, char **data, int *length) {
285         if (record < 0) {
286                 fprintf(stderr, "Invalid record\n");
287                 return 0;
288         }
289         if (record >= blerg_get_record_count(blerg)) {
290                 fprintf(stderr, "Invalid record\n");
291                 return 0;
292         }
293
294         int segment = record / RECORDS_PER_SEGMENT;
295         if (segment != blerg->current_segment)
296                 blerg_segment_switch(blerg, segment);
297         int seg_rec = record % RECORDS_PER_SEGMENT;
298
299         if ((blerg->index[seg_rec].flags & 0x1) == 0) {
300                 fprintf(stderr, "Invalid record\n");
301                 return 0;
302         }
303
304         int rec_offset = blerg->index[seg_rec].offset;
305         int rec_length = blerg->index[seg_rec].length;
306         if (rec_offset >= blerg->data_size) {
307                 /* We're accessing an out-of-bounds record in our mmap.  Remap
308                    and recheck. */
309                 if (!blerg_remap_data(blerg)) {
310                         return 0;
311                 }
312                 if (rec_offset >= blerg->data_size) {
313                         fprintf(stderr, "Record offset outside of data!?");
314                         return 0;
315                 }
316         }
317
318         *data = malloc(rec_length);
319         if (*data == NULL) {
320                 perror("Could not allocate string in fetch");
321                 return 0;
322         }
323
324         memcpy(*data, blerg->data + rec_offset, rec_length);
325
326         *length = rec_length;
327
328         return 1;
329 }
330
331 time_t blerg_get_timestamp(struct blerg *blerg, int record) {
332         if (record < 0) {
333                 fprintf(stderr, "Invalid record\n");
334                 return 0;
335         }
336         if (record >= blerg_get_record_count(blerg)) {
337                 fprintf(stderr, "Invalid record\n");
338                 return 0;
339         }
340
341         int segment = record / RECORDS_PER_SEGMENT;
342         if (segment != blerg->current_segment)
343                 blerg_segment_switch(blerg, segment);
344         int seg_rec = record % RECORDS_PER_SEGMENT;
345
346         if ((blerg->index[seg_rec].flags & 0x1) == 0) {
347                 fprintf(stderr, "Invalid record\n");
348                 return 0;
349         }
350
351         return blerg->index[seg_rec].timestamp;
352 }
353
354 int blerg_set_subscription_mark(struct blerg *blerg) {
355         blerg->meta->subscription_mark = subscription_count_items(blerg->name);
356 }
357
358 uint64_t blerg_get_subscription_mark(struct blerg *blerg) {
359         return blerg->meta->subscription_mark;
360 }