1 /* Blerg is (C) 2011 The Dominion of Awesome, and is distributed under a
2 * BSD-style license. Please see the COPYING file for details.
11 #include <sys/types.h>
16 #include "subscription.h"
20 uint64_t blerg_get_record_count(struct blerg *blerg) {
22 flock(blerg->meta_fd, LOCK_SH);
23 count = blerg->meta->sequence;
24 flock(blerg->meta_fd, LOCK_UN);
28 /* Returns last usable record */
29 uint64_t blerg_increment_record_count(struct blerg *blerg) {
31 flock(blerg->meta_fd, LOCK_EX);
32 count = blerg->meta->sequence++;
33 flock(blerg->meta_fd, LOCK_UN);
37 void blerg_segment_close(struct blerg *blerg) {
38 if (blerg->data != NULL)
39 munmap((void *)blerg->data, blerg->data_size);
40 if (blerg->data_fd != -1)
41 close(blerg->data_fd);
42 if (blerg->index != NULL)
43 munmap((void *)blerg->index, RECORDS_PER_SEGMENT * sizeof(struct record));
44 if (blerg->index_fd != -1)
45 close(blerg->index_fd);
48 int blerg_segment_switch(struct blerg *blerg, int new_segment) {
50 uint64_t max_sequence = blerg_get_record_count(blerg);
53 if (new_segment > max_sequence / RECORDS_PER_SEGMENT) {
54 fprintf(stderr, "Cannot switch to sequence beyond last record\n");
58 blerg_segment_close(blerg);
60 /* Load and map the index */
61 snprintf(filename, 512, "%s/index%d", blerg->base_path, new_segment);
62 blerg->index_fd = open(filename, O_RDWR | O_CREAT, 0600);
63 if (blerg->index_fd == -1) {
64 perror("Could not open index");
65 goto open_failed_index_open;
67 flock(blerg->index_fd, LOCK_EX);
68 fstat(blerg->index_fd, &st);
69 if (st.st_size == 0) {
72 memset((void *)&r, 0, sizeof(struct record));
73 for (i = 0; i < RECORDS_PER_SEGMENT; i++) {
74 write(blerg->index_fd, &r, sizeof(struct record));
77 flock(blerg->index_fd, LOCK_UN);
79 blerg->index = (struct record *) mmap(NULL, RECORDS_PER_SEGMENT * sizeof(struct record), PROT_READ | PROT_WRITE, MAP_SHARED, blerg->index_fd, 0);
80 if (blerg->index == MAP_FAILED) {
81 perror("Could not mmap index");
82 goto open_failed_index_mmap;
86 sprintf(filename, "%s/data%d", blerg->base_path, new_segment);
87 blerg->data_fd = open(filename, O_RDWR | O_APPEND | O_CREAT, 0600);
88 fstat(blerg->data_fd, &st);
89 blerg->data_size = st.st_size;
90 if (blerg->data_fd == -1) {
91 perror("Could not open data");
92 goto open_failed_data_open;
95 if (blerg->data_size > 0) {
96 blerg->data = (char *) mmap(NULL, blerg->data_size, PROT_READ, MAP_SHARED, blerg->data_fd, 0);
97 if (blerg->data == MAP_FAILED) {
98 perror("Could not mmap data");
99 goto open_failed_data_mmap;
105 open_failed_data_mmap:
106 close(blerg->data_fd);
107 open_failed_data_open:
108 munmap((void *)blerg->index, sizeof(RECORDS_PER_SEGMENT * sizeof(struct record)));
109 open_failed_index_mmap:
110 close(blerg->index_fd);
111 open_failed_index_open:
115 int blerg_exists(const char *name) {
116 int namelen = strlen(name);
119 if (!valid_name(name)) {
120 fprintf(stderr, "Invalid name\n");
124 snprintf(filename, 512, "%s/%s", DATA_PATH, name);
125 if (access(filename, F_OK) == -1)
131 struct blerg *blerg_open(const char *name) {
132 int namelen = strlen(name);
137 if (!valid_name(name)) {
138 fprintf(stderr, "Invalid name\n");
141 struct blerg *blerg = malloc(sizeof(struct blerg));
143 perror("Cannot allocate memory for blerg");
144 goto open_failed_blerg_malloc;
146 blerg->name = malloc(namelen + 1);
147 memcpy(blerg->name, name, namelen + 1);
148 blerg->meta_fd = blerg->index_fd = blerg->data_fd = -1;
153 /* Make the directory if it doesn't exist */
154 blerg->base_path = malloc(512);
155 snprintf(blerg->base_path, 512, "%s/%s", DATA_PATH, name);
156 if (access(blerg->base_path, F_OK) == -1)
157 mkdir(blerg->base_path, 0755);
159 /* Open and map metadata */
160 snprintf(filename, 512, "%s/meta", blerg->base_path);
161 blerg->meta_fd = open(filename, O_RDWR | O_CREAT, 0600);
162 if (blerg->meta_fd == -1) {
163 perror("Could not open metadata");
164 goto open_failed_meta_open;
166 fstat(blerg->meta_fd, &st);
167 if (st.st_size < sizeof(struct meta)) {
168 // Fill the difference in size between sizeof(struct meta) and
169 // the file size with nulls. This allows seamless upgrades as
170 // long as struct meta only adds members.
171 int len = sizeof(struct meta) - st.st_size;
172 char *buf = (char *) malloc(len);
174 int tmpfd = dup(blerg->meta_fd);
175 FILE* tmp = fdopen(tmpfd, "a");
176 fwrite(buf, len, 1, tmp);
180 blerg->meta = (struct meta *) mmap(NULL, sizeof(struct meta), PROT_READ | PROT_WRITE, MAP_SHARED, blerg->meta_fd, 0);
181 if (blerg->meta == MAP_FAILED) {
182 perror("Could not map metadata");
183 goto open_failed_meta_mmap;
186 /* Open and map index and data for the current segment */
187 blerg->current_segment = blerg_get_record_count(blerg) / RECORDS_PER_SEGMENT;
188 if (!blerg_segment_switch(blerg, blerg->current_segment)) {
189 fprintf(stderr, "Could not switch segment\n");
190 goto open_failed_segment_switch;
195 open_failed_segment_switch:
196 munmap((void *)blerg->meta, sizeof(struct meta));
197 open_failed_meta_mmap:
198 close(blerg->meta_fd);
199 open_failed_meta_open:
202 open_failed_blerg_malloc:
206 int blerg_close(struct blerg *blerg) {
207 blerg_segment_close(blerg);
208 munmap((void *)blerg->meta, sizeof(struct meta));
209 close(blerg->meta_fd);
210 free(blerg->base_path);
216 int blerg_store(struct blerg *blerg, const char *data, int len) {
217 if (len > MAX_RECORD_SIZE) {
218 fprintf(stderr, "len > 64K\n");
222 flock(blerg->index_fd, LOCK_EX);
223 flock(blerg->data_fd, LOCK_EX);
225 uint64_t record = blerg_increment_record_count(blerg);
227 fprintf(stderr, "Could not find free record\n");
230 int segment = record / RECORDS_PER_SEGMENT;
231 if (segment != blerg->current_segment)
232 blerg_segment_switch(blerg, segment);
233 int seg_rec = record % RECORDS_PER_SEGMENT;
235 /* Get the position for the new data */
236 FILE *datafile = fdopen(dup(blerg->data_fd), "a");
237 fseek(datafile, 0, SEEK_END);
238 int curpos = ftell(datafile);
243 int n = write(blerg->data_fd, data + bytes, len);
245 perror("Could not write data");
246 /* Truncate anything we may have written */
247 ftruncate(blerg->data_fd, curpos);
251 } while (bytes < len);
252 blerg->index[seg_rec].flags = 0x0001;
253 blerg->index[seg_rec].offset = curpos;
254 blerg->index[seg_rec].length = len;
255 blerg->index[seg_rec].timestamp = time(NULL);
257 flock(blerg->data_fd, LOCK_UN);
258 flock(blerg->index_fd, LOCK_UN);
260 tag_scan(blerg->name, data, len, record);
261 subscription_notify(blerg->name, record);
266 int blerg_fetch(struct blerg *blerg, int record, char **data, int *length) {
268 fprintf(stderr, "Invalid record\n");
272 int segment = record / RECORDS_PER_SEGMENT;
273 if (segment != blerg->current_segment)
274 blerg_segment_switch(blerg, segment);
275 int seg_rec = record % RECORDS_PER_SEGMENT;
277 if ((blerg->index[seg_rec].flags & 0x1) == 0) {
278 fprintf(stderr, "Invalid record\n");
282 int rec_offset = blerg->index[seg_rec].offset;
283 int rec_length = blerg->index[seg_rec].length;
284 if (rec_offset >= blerg->data_size) {
285 /* We're accessing an out-of-bounds record in our mmap.
286 Recheck size and remap. */
288 fstat(blerg->data_fd, &st);
289 blerg->data_size = st.st_size;
290 if (rec_offset > blerg->data_size) {
291 fprintf(stderr, "Record offset outside of data!?");
295 munmap(blerg->data, blerg->data_size);
296 blerg->data = (char *) mmap(NULL, blerg->data_size, PROT_READ, MAP_SHARED, blerg->data_fd, 0);
297 if (blerg->data == MAP_FAILED) {
298 perror("Could not remap data");
303 *data = malloc(rec_length);
305 perror("Could not allocate string in fetch");
309 memcpy(*data, blerg->data + rec_offset, rec_length);
311 *length = rec_length;
316 time_t blerg_get_timestamp(struct blerg *blerg, int record) {
318 fprintf(stderr, "Invalid record\n");
322 int segment = record / RECORDS_PER_SEGMENT;
323 if (segment != blerg->current_segment)
324 blerg_segment_switch(blerg, segment);
325 int seg_rec = record % RECORDS_PER_SEGMENT;
327 if ((blerg->index[seg_rec].flags & 0x1) == 0) {
328 fprintf(stderr, "Invalid record\n");
332 return blerg->index[seg_rec].timestamp;
335 int blerg_set_subscription_mark(struct blerg *blerg) {
336 blerg->meta->subscription_mark = subscription_count_items(blerg->name);
339 uint64_t blerg_get_subscription_mark(struct blerg *blerg) {
340 return blerg->meta->subscription_mark;