// A JSON storage strategy for Go Again // SQL would be a better choice, but... meh // // Note that we use mutexes instead of channels // because everything is both synchronous and // sequential. Meh. package jsondb import ( "crypto/rand" "crypto/subtle" "encoding/hex" "encoding/json" "fmt" "net/url" "os" "path/filepath" "sort" "strings" "sync" "time" "git.rootprojects.org/root/go-again" ) type JSONDB struct { dburl string path string json *dbjson mux sync.Mutex fmux sync.Mutex } type dbjson struct { Schedules []Schedule `json:"schedules"` } func Connect(dburl string) (*JSONDB, error) { u, err := url.Parse(dburl) if nil != err { return nil, err } // json:/abspath/to/db.json path := u.Opaque if "" == path { // json:///abspath/to/db.json path = u.Path if "" == path { // json:relpath/to/db.json // json://relpath/to/db.json path = strings.TrimSuffix(u.Host+"/"+u.Path, "/") } } f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0700) if nil != err { return nil, fmt.Errorf("Couldn't open %q: %s", path, err) } stat, err := f.Stat() if 0 == stat.Size() { _, err := f.Write([]byte(`{"schedules":[]}`)) f.Close() if nil != err { return nil, err } f, err = os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0700) if nil != err { return nil, err } } decoder := json.NewDecoder(f) db := &dbjson{} err = decoder.Decode(db) f.Close() if nil != err { return nil, fmt.Errorf("Couldn't parse %q as JSON: %s", path, err) } wd, _ := os.Getwd() fmt.Println("jsondb:", filepath.Join(wd, path)) return &JSONDB{ dburl: dburl, path: path, json: db, mux: sync.Mutex{}, fmux: sync.Mutex{}, }, nil } // A copy of again.Schedule, but with access_id json-able type Schedule struct { ID string `json:"id" db:"id"` AccessID string `json:"access_id" db:"access_id"` Date string `json:"date" db:"date"` Time string `json:"time" db:"time"` TZ string `json:"tz" db:"tz"` NextRunAt time.Time `json:"next_run_at" db:"next_run_at"` Disabled bool `json:"disabled" db:"disabled"` Webhooks []again.Webhook `json:"webhooks" db"webhooks"` } func (db *JSONDB) List(accessID string) ([]*again.Schedule, error) { nowish := time.Now().Add(time.Duration(30) * time.Second) schedules := []*again.Schedule{} for i := range db.json.Schedules { s := db.json.Schedules[i] if !s.Disabled && ctcmp(accessID, s.AccessID) && s.NextRunAt.Sub(nowish) > 0 { schedules = append(schedules, &again.Schedule{ ID: s.ID, AccessID: s.AccessID, Date: s.Date, Time: s.Time, TZ: s.TZ, NextRunAt: s.NextRunAt, Webhooks: s.Webhooks, }) } } return schedules, nil } func (db *JSONDB) Set(s again.Schedule) (*again.Schedule, error) { exists := false index := -1 if "" == s.ID { id, err := genID(16) if nil != err { return nil, err } s.ID = id } else { i, old := db.get(s.ID) index = i exists = nil != old // TODO constant time bail if !exists || !ctcmp(old.AccessID, s.AccessID) { return nil, fmt.Errorf("invalid id") } } schedule := Schedule{ ID: s.ID, AccessID: s.AccessID, Date: s.Date, Time: s.Time, TZ: s.TZ, NextRunAt: s.NextRunAt, Webhooks: s.Webhooks, } if exists { db.mux.Lock() db.json.Schedules[index] = schedule db.mux.Unlock() } else { db.mux.Lock() db.json.Schedules = append(db.json.Schedules, schedule) db.mux.Unlock() } err := db.save(s.AccessID) if nil != err { return nil, err } return &s, nil } func (db *JSONDB) Delete(accessID string, id string) (*again.Schedule, error) { _, old := db.get(id) exists := nil != old // TODO constant time bail if !exists || !ctcmp(old.AccessID, accessID) { return nil, fmt.Errorf("invalid id") } // Copy everything we keep into its own array newSchedules := []Schedule{} for i := range db.json.Schedules { schedule := db.json.Schedules[i] if old.ID != schedule.ID { newSchedules = append(newSchedules, schedule) } } db.mux.Lock() db.json.Schedules = newSchedules db.mux.Unlock() err := db.save(accessID) if nil != err { return nil, err } return &again.Schedule{ ID: old.ID, AccessID: old.AccessID, Date: old.Date, Time: old.Time, TZ: old.TZ, NextRunAt: old.NextRunAt, Webhooks: old.Webhooks, }, nil } func (db *JSONDB) Upcoming(min time.Time, max time.Time) ([]*again.Schedule, error) { schedules := []*again.Schedule{} for i := range db.json.Schedules { s := db.json.Schedules[i] if !s.Disabled && s.NextRunAt.Sub(min) > 0 && max.Sub(s.NextRunAt) > 0 { schedules = append(schedules, &again.Schedule{ ID: s.ID, AccessID: s.AccessID, Date: s.Date, Time: s.Time, TZ: s.TZ, NextRunAt: s.NextRunAt, Webhooks: s.Webhooks, }) } } sort.Sort(again.Schedules(schedules)) return []*again.Schedule(schedules), nil } func ctcmp(x string, y string) bool { return 1 == subtle.ConstantTimeCompare([]byte(x), []byte(y)) } func (db *JSONDB) get(id string) (int, *Schedule) { db.mux.Lock() scheds := db.json.Schedules db.mux.Unlock() for i := range scheds { schedule := scheds[i] if ctcmp(id, schedule.ID) { return i, &schedule } } return -1, nil } func genID(n int) (string, error) { b := make([]byte, n) _, err := rand.Read(b) if nil != err { return "", err } return hex.EncodeToString(b), nil } func (db *JSONDB) save(accessID string) error { // TODO per-user files, maybe // or probably better to spend that time building the postgres adapter rnd, err := genID(4) tmppath := db.path + "." + rnd + ".tmp" bakpath := db.path + ".bak" os.Remove(tmppath) // ignore error f, err := os.OpenFile(tmppath, os.O_RDWR|os.O_CREATE, 0700) if nil != err { return err } encoder := json.NewEncoder(f) err = encoder.Encode(db.json) f.Close() if nil != err { return err } // TODO could make async and debounce... // or spend that time on something useful db.fmux.Lock() defer db.fmux.Unlock() os.Remove(bakpath) // ignore error err = os.Rename(db.path, bakpath) if nil != err { return err } err = os.Rename(tmppath, db.path) if nil != err { return err } return nil }