/* main/file.c */
#include <stdio.h>
#include <stdbool.h>
#include <stdlib.h>
#include "freertos/FreeRTOS.h"
#include "freertos/queue.h"
#include "freertos/semphr.h"
#include "freertos/stream_buffer.h"
#include "esp_log.h"
#include "file.h"
/* Size in bytes of the read buffer the reader task allocates and fills with fread(). */
#define BUFSIZE 4096
/* Tag passed to the ESP_LOGx macros in this file. */
#define TAG "file reader"
/* Commands that can be posted to the reader task through file_reader_command_queue. */
typedef enum {
/* Open the file named by open_filename, closing any file that is already open. */
FILE_READER_OPEN,
/* Set the stream buffer (subscribe_sb) that file data is sent to. */
FILE_READER_SUBSCRIBE,
/* Begin streaming file data into the subscribed stream buffer. */
FILE_READER_START,
/* Stop streaming; the file stays open. */
FILE_READER_STOP,
/* fseek() the open file using the seek arguments and reset the stream buffer. */
FILE_READER_SEEK,
/* Close the file and stop streaming. */
FILE_READER_CLOSE,
} file_reader_command_t;
/* Arguments of a FILE_READER_SEEK command; both are handed to fseek() unchanged. */
typedef struct {
/* Offset argument of fseek(). */
long offset;
/* Origin argument of fseek() (e.g. SEEK_SET, SEEK_CUR, SEEK_END). */
int whence;
} file_reader_seek_args_t;
/*
 * Message format of file_reader_command_queue: a command plus the argument
 * that command needs. Which union member is meaningful depends on `command`;
 * commands without arguments (START, STOP, CLOSE) leave the union unused.
 */
typedef struct {
file_reader_command_t command;
union {
/* FILE_READER_OPEN: path passed to fopen(). Only the pointer is queued, not a copy. */
const char* open_filename;
/* FILE_READER_SUBSCRIBE: stream buffer that receives the file data. */
StreamBufferHandle_t subscribe_sb;
/* FILE_READER_SEEK: fseek() arguments. */
file_reader_seek_args_t seek;
};
} file_reader_command_args_t;
/* Command queue consumed by file_reader_task; created in file_init() with room for one command. */
QueueHandle_t file_reader_command_queue = NULL;
/* Handle of the reader task, filled in by xTaskCreate() in file_init(). */
TaskHandle_t file_reader_task_handle = NULL;
/* Stream buffer that file data is sent to; set by the FILE_READER_SUBSCRIBE command. */
StreamBufferHandle_t file_reader_streambuffer = NULL;
/* Binary semaphore the reader task gives after handling a seek; file_reader_seek() waits on it. */
SemaphoreHandle_t seek_sync_sem = NULL;
/**
 * Reader task: services commands from file_reader_command_queue and, while
 * reading is enabled, streams the open file into file_reader_streambuffer in
 * chunks of up to BUFSIZE bytes.
 *
 * While idle the task blocks on the command queue; while reading it polls the
 * queue without blocking between chunks so commands are still picked up.
 *
 * When the end of the file is reached the file is closed and the task deletes
 * itself, so commands posted afterwards are no longer serviced.
 *
 * @param pvParameters unused.
 */
void file_reader_task(void* pvParameters) {
    (void) pvParameters;
    /* Bytes read between two throughput log lines (1 MiB). */
    const size_t report_interval_bytes = 1048576;
    BaseType_t err;
    bool reading = false;
    /* bytes_read: valid bytes currently in buf; bytes_sent: how many of them
     * have already been handed to the stream buffer. */
    size_t bytes_read = 0, bytes_sent = 0, total_bytes = 0;
    FILE* f = NULL;
    int64_t start = 0;
    uint8_t *buf = malloc(BUFSIZE);
    if (buf == NULL) {
        ESP_LOGE(TAG, "Could not allocate read buffer");
        /* A FreeRTOS task function must never return; delete the task instead. */
        vTaskDelete(NULL);
        return; /* not reached */
    }
    ESP_LOGI(TAG, "Reader task started");
    while (1) {
        file_reader_command_args_t a;
        /* Poll while streaming, otherwise sleep until a command arrives. */
        err = xQueueReceive(file_reader_command_queue, &a, reading ? 0 : portMAX_DELAY);
        if (err == pdTRUE) {
            ESP_LOGI(TAG, "Received command %d", a.command);
            switch (a.command) {
            case FILE_READER_OPEN:
                if (f) {
                    fclose(f);
                }
                f = fopen(a.open_filename, "r");
                if (f == NULL) {
                    ESP_LOGE(TAG, "Could not open %s", a.open_filename);
                }
                break;
            case FILE_READER_SUBSCRIBE:
                file_reader_streambuffer = a.subscribe_sb;
                break;
            case FILE_READER_START:
                start = esp_timer_get_time();
                reading = true;
                break;
            case FILE_READER_STOP:
                reading = false;
                break;
            case FILE_READER_SEEK:
                if (f == NULL) {
                    ESP_LOGE(TAG, "Cannot seek without file");
                } else {
                    if (fseek(f, a.seek.offset, a.seek.whence) != 0) {
                        ESP_LOGE(TAG, "fseek failed");
                    }
                    /* Drop whatever is still buffered from the old position. */
                    bytes_sent = bytes_read = 0;
                    if (file_reader_streambuffer) {
                        err = xStreamBufferReset(file_reader_streambuffer);
                        if (err != pdPASS) {
                            ESP_LOGE(TAG, "Could not reset streambuffer during seek");
                        }
                    }
                }
                /* file_reader_seek() blocks on this semaphore, so it must be
                 * given on every path, including the error path above. */
                xSemaphoreGive(seek_sync_sem);
                break;
            case FILE_READER_CLOSE:
                /* fclose(NULL) is undefined, so only close an open file. */
                if (f != NULL) {
                    fclose(f);
                    f = NULL;
                }
                reading = false;
                bytes_sent = bytes_read = 0;
                break;
            default:
                ESP_LOGW(TAG, "Ignoring unknown command %d", a.command);
                break;
            }
        }
        if (reading) {
            if (file_reader_streambuffer == NULL) {
                /* START arrived before any SUBSCRIBE: there is nowhere to send data. */
                ESP_LOGE(TAG, "No stream buffer subscribed while reading");
                reading = false;
                continue;
            }
            if (bytes_sent == bytes_read) {
                /* Everything buffered has been sent: refill from the file. */
                if (f == NULL) {
                    ESP_LOGE(TAG, "FILE pointer null while reading");
                    reading = false;
                } else if (feof(f)) {
                    ESP_LOGI(TAG, "end of file");
                    fclose(f);
                    f = NULL;
                    break; /* leaves the while loop; the task ends below */
                } else {
                    bytes_sent = 0;
                    bytes_read = fread(buf, 1, BUFSIZE, f);
                    if (bytes_read == 0 && ferror(f)) {
                        /* Without this the loop would spin forever: feof() stays
                         * false after a read error. */
                        ESP_LOGE(TAG, "Read error");
                        reading = false;
                    }
                    total_bytes += bytes_read;
                    if (total_bytes >= report_interval_bytes) {
                        int64_t now = esp_timer_get_time();
                        /* Assuming esp_timer_get_time() counts microseconds,
                         * bytes/us * 1000 gives the KB/sec in the label. */
                        ESP_LOGI(TAG, "%0.1f KB/sec", 1000.0 * (double)total_bytes / (double)(now - start));
                        start = now;
                        total_bytes = 0;
                    }
                }
            }
            /* Skip the call when nothing is pending (e.g. fread returned 0)
             * so no zero-length send is issued. A partial send is resumed on
             * the next iteration from buf + bytes_sent. */
            if (bytes_sent < bytes_read) {
                bytes_sent += xStreamBufferSend(
                    file_reader_streambuffer,
                    buf + bytes_sent,
                    bytes_read - bytes_sent,
                    500 / portTICK_PERIOD_MS
                );
            }
        }
    }
    /* Release the read buffer before the task goes away. */
    free(buf);
    vTaskDelete(NULL);
}
/**
 * Create the seek semaphore, the command queue and the reader task.
 *
 * If any step fails, the objects created so far are deleted again and their
 * global handles are reset to NULL.
 *
 * @return 1 on success, 0 if the semaphore, queue or task could not be created.
 */
int file_init() {
    BaseType_t err;
    seek_sync_sem = xSemaphoreCreateBinary();
    if (seek_sync_sem == NULL) {
        ESP_LOGE(TAG, "could not create seek semaphore");
        return 0;
    }
    /* Length 1: a sender blocks until the reader task has taken the previous command. */
    file_reader_command_queue = xQueueCreate(1, sizeof(file_reader_command_args_t));
    if (file_reader_command_queue == NULL) {
        ESP_LOGE(TAG, "could not create command queue");
        vSemaphoreDelete(seek_sync_sem);
        seek_sync_sem = NULL;
        return 0;
    }
    err = xTaskCreate(file_reader_task, "file reader", 4096, NULL, 2, &file_reader_task_handle);
    if (err != pdPASS) {
        ESP_LOGE(TAG, "could not initialize file reader task: %d", err);
        vQueueDelete(file_reader_command_queue);
        file_reader_command_queue = NULL;
        vSemaphoreDelete(seek_sync_sem);
        seek_sync_sem = NULL;
        return 0;
    }
    return 1;
}
void file_reader_open(const char* filename) {
file_reader_command_args_t a;
a.command = FILE_READER_OPEN;
a.open_filename = filename;
xQueueSend(file_reader_command_queue, &a, portMAX_DELAY);
}
/**
 * Post a FILE_READER_SUBSCRIBE command to the reader task.
 *
 * Blocks until there is room in the command queue.
 *
 * @param bh stream buffer the reader task should send file data to.
 */
void file_reader_subscribe(StreamBufferHandle_t bh) {
    file_reader_command_args_t cmd = {
        .command = FILE_READER_SUBSCRIBE,
        .subscribe_sb = bh,
    };
    xQueueSend(file_reader_command_queue, &cmd, portMAX_DELAY);
}
void file_reader_start() {
file_reader_command_args_t a;
a.command = FILE_READER_START;
xQueueSend(file_reader_command_queue, &a, portMAX_DELAY);
}
void file_reader_stop() {
file_reader_command_args_t a;
a.command = FILE_READER_STOP;
xQueueSend(file_reader_command_queue, &a, portMAX_DELAY);
}
void file_reader_seek(long offset, int whence) {
file_reader_command_args_t a;
a.command = FILE_READER_SEEK;
a.seek.offset = offset;
a.seek.whence = whence;
xQueueSend(file_reader_command_queue, &a, portMAX_DELAY);
xSemaphoreTake(seek_sync_sem, portMAX_DELAY);
}
void file_reader_close() {
file_reader_command_args_t a;
a.command = FILE_READER_CLOSE;
xQueueSend(file_reader_command_queue, &a, portMAX_DELAY);
}