diff --git a/again.go b/again.go index b5bf757..11a03e3 100644 --- a/again.go +++ b/again.go @@ -19,6 +19,18 @@ type Schedule struct { Webhooks []Webhook `json:"webhooks" db"webhooks"` } +type Schedules []*Schedule + +func (s Schedules) Len() int { + return len(s) +} +func (s Schedules) Less(i, j int) bool { + return s[i].NextRunAt.Sub(s[j].NextRunAt) < 0 +} +func (s Schedules) Swap(i, j int) { + s[j], s[i] = s[i], s[j] +} + // https://yourbasic.org/golang/time-change-convert-location-timezone/ // https://sebest.github.io/post/create-a-small-docker-image-for-a-golang-binary/ // https://github.com/FKSE/docker-golang-base @@ -48,7 +60,7 @@ func Run() { []int{2019, 11, 10, 23, 59, 59, 0}, []int{2019, 11, 31, 23, 0, 0, 0}, } { - err := Exists(st, "America/Denver") + _, err := Exists(st, "America/Denver") if nil != err { fmt.Println(err) } @@ -117,58 +129,58 @@ func (err ErrNoExist) Error() string { // fmt.Println(time.Date(2016, time.December, 31, 23, 59, 60, 0, time.UTC)) // "2020-12-02 02:00:00 +0000 UTC" // should be "2016-12-31 23:59:60 +0000 UTC" // -func Exists(st []int, tzstr string) error { +func Exists(st []int, tzstr string) (*time.Time, error) { tz, err := time.LoadLocation(tzstr) if nil != err { - return err + return nil, err } m := time.Month(st[1]) - t1 := time.Date(st[0], m, st[2], st[3], st[4], st[5], st[6], tz) + t1 := time.Date(st[0], m, st[2], st[3], st[4], st[5], 0, tz) if st[5] != t1.Second() { - return ErrNoExist{ + return nil, ErrNoExist{ t: st, z: tzstr, e: "invalid second, probably just bad math on your part", } } if st[4] != t1.Minute() { - return ErrNoExist{ + return nil, ErrNoExist{ t: st, z: tzstr, e: "invalid minute, probably just bad math on your part, but perhaps a half-hour daylight savings or summer time", } } if st[3] != t1.Hour() { - return ErrNoExist{ + return nil, ErrNoExist{ t: st, z: tzstr, e: "invalid hour, possibly a Daylight Savings or Summer Time error, or perhaps bad math on your part", } } if st[2] != t1.Day() { - return ErrNoExist{ + return nil, ErrNoExist{ t: st, z: tzstr, e: "invalid day of month, most likely bad math on your part. Remember: 31 28ΒΌ 31 30 31 30 31 31 30 31 30 31", } } if st[1] != int(t1.Month()) { - return ErrNoExist{ + return nil, ErrNoExist{ t: st, z: tzstr, e: "invalid month, most likely bad math on your part. Remember: Decemberween isn't until next year", } } if st[0] != t1.Year() { - return ErrNoExist{ + return nil, ErrNoExist{ t: st, z: tzstr, e: "invalid year, must have reached the end of time...", } } - return nil + return &t1, nil } // Check if the time happens more than once in a given timezone. @@ -210,13 +222,13 @@ func IsAmbiguous(st []int, tzstr string) error { } m := time.Month(st[1]) - t1 := time.Date(st[0], m, st[2], st[3], st[4], st[5], st[6], tz) + t1 := time.Date(st[0], m, st[2], st[3], st[4], st[5], 0, tz) u1 := t1.UTC() // Australia/Lord_Howe has a 30-minute DST // 60-minute DST is common // Antarctica/Troll has a 120-minute DST for _, n := range []int{30, 60, 120} { - t2 := time.Date(st[0], m, st[2], st[3], st[4]+n, st[5], st[6], tz) + t2 := time.Date(st[0], m, st[2], st[3], st[4]+n, st[5], 0, tz) u2 := t2.UTC() if u1.Equal(u2) { return fmt.Errorf("Ambiguous: %s, %s, %+d\n", t1, t2, n) diff --git a/cmd/again/again.go b/cmd/again/again.go index cc7329d..eac82d4 100644 --- a/cmd/again/again.go +++ b/cmd/again/again.go @@ -6,6 +6,7 @@ import ( "flag" "fmt" "log" + "math/rand" "net/http" "os" "strconv" @@ -14,6 +15,7 @@ import ( again "git.rootprojects.org/root/go-again" "git.rootprojects.org/root/go-again/data/jsondb" + webhooks "git.rootprojects.org/root/go-again/webhooks" ) func main() { @@ -82,6 +84,8 @@ func main() { // TODO Filebox FS mux.Handle("/", http.FileServer(http.Dir("./public"))) + go s.RunTasks() + fmt.Println("Listening on", server.Addr) log.Fatal(server.ListenAndServe()) } @@ -90,12 +94,68 @@ type ScheduleDB interface { List(string) ([]*again.Schedule, error) Set(again.Schedule) (*again.Schedule, error) Delete(accessID string, id string) (*again.Schedule, error) + Upcoming(min time.Time, max time.Time) ([]*again.Schedule, error) } type scheduler struct { DB ScheduleDB } +func (s *scheduler) RunTasks() { + log.Println("[info] Task Queue Started") + // Tick every 4 minutes, + // but run tasks for up to 5 minutes before getting more. + ticker := time.NewTicker(4 * time.Minute) + + // TODO some way to add things to the live queue + // (maybe a select between the ticker and an incoming channel) + + // 'min' should be >= 'last' at least one second + last := time.Now() + for { + min := time.Now() + if last.Unix() > min.Unix() { + min = last + } + max := min.Add(5 * time.Minute) + scheds, err := s.DB.Upcoming(min, max) + if nil != err { + // this seems pretty unrecoverable + // TODO check DB, reconnect + os.Exit(911) + return + } + log.Printf("[info] Got %d upcoming tasks", len(scheds)) + log.Println(time.Now()) + log.Println(min) + log.Println(max) + log.Println() + log.Println(time.Now().UTC()) + log.Println(min.UTC()) + log.Println(max.UTC()) + + for i := range scheds { + sched := scheds[i] + fmt.Println("it's in the queue") + sleep := sched.NextRunAt.Sub(time.Now()) + // TODO create ticker to select on instead + time.Sleep(sleep) + + fmt.Println("it's happening") + for i := range sched.Webhooks { + h := sched.Webhooks[i] + h.TZ = sched.TZ + webhooks.Run(webhooks.Webhook(h)) + } + + // we only deal in second resulotion + last = sched.NextRunAt.Add(1 * time.Second) + } + + <-ticker.C + } +} + func (s *scheduler) Handle(w http.ResponseWriter, r *http.Request) { // note: no go-routines reading body in handlers to follow defer r.Body.Close() @@ -164,6 +224,79 @@ func (s *scheduler) Create(w http.ResponseWriter, r *http.Request) { fmt.Printf("New Schedule:\n%#v\n", sched) // TODO validate and modify + dateParts := strings.Split(sched.Date, "-") + if 3 != len(dateParts) { + http.Error(w, "Invalid date", http.StatusBadRequest) + return + } + + timeParts := strings.Split(sched.Time, ":") + if 2 == len(timeParts) { + timeParts = append(timeParts, "00") + } + if 3 != len(timeParts) { + http.Error(w, "Invalid time", http.StatusBadRequest) + return + } + // sub-minute resolution not supported yet + timeParts[2] = "00" + + dtParts := []int{0, 0, 0, 0, 0, 0} + for i := range dateParts { + n, err := strconv.Atoi(dateParts[i]) + if nil != err { + http.Error(w, fmt.Sprintf("Invalid date part '%s'", n), http.StatusBadRequest) + return + } + dtParts[i] = n + } + for i := range timeParts { + n, err := strconv.Atoi(timeParts[i]) + if nil != err { + http.Error(w, fmt.Sprintf("Invalid time part '%s'", n), http.StatusBadRequest) + return + } + dtParts[i+3] = n + } + + _, err = again.Exists(dtParts, time.UTC.String()) + if nil != err { + http.Error(w, fmt.Sprintf("Invalid datetime: %s", err), http.StatusBadRequest) + return + } + + // TODO warn on non-existant / ambiguous timing + loc, err := time.LoadLocation(sched.TZ) + if nil != err { + http.Error(w, fmt.Sprintf("Invalid timezone: %s", err), http.StatusBadRequest) + return + } + + t := time.Date(dtParts[0], time.Month(dtParts[1]), dtParts[2], dtParts[3], dtParts[4], dtParts[5], 0, loc).UTC() + now := time.Now().UTC() + // 4.5 minutes (about 5 minutes) + ahead := t.Sub(now) + if ahead < 270 { + http.Error(w, + fmt.Sprintf("Invalid datetime: should be 5+ minutes into the future, not just %d seconds", ahead), + http.StatusBadRequest) + return + } + + fmt.Println("Time in UTC:", t) + // stagger + t = t.Add(time.Duration(rand.Intn(300*1000)-150*1000) * time.Millisecond) + // Avoid the Leap Second + if 23 == t.Hour() && 59 == t.Minute() && 59 == t.Second() { + j := rand.Intn(1) - 2 + n := rand.Intn(3) + // +/- 3 seconds + t = t.Add(time.Duration(j*n) * time.Second) + } + fmt.Println("Staggered Time:", t) + + // TODO add to immediate queue if soon enough + sched.NextRunAt = t sched.AccessID = accessID sched2, err := s.DB.Set(*sched) if nil != err { diff --git a/data/jsondb/jsondb.go b/data/jsondb/jsondb.go index b84877a..0542fc0 100644 --- a/data/jsondb/jsondb.go +++ b/data/jsondb/jsondb.go @@ -15,6 +15,7 @@ import ( "net/url" "os" "path/filepath" + "sort" "strings" "sync" "time" @@ -103,10 +104,11 @@ type Schedule struct { } func (db *JSONDB) List(accessID string) ([]*again.Schedule, error) { + nowish := time.Now().Add(time.Duration(30) * time.Second) schedules := []*again.Schedule{} for i := range db.json.Schedules { s := db.json.Schedules[i] - if !s.Disabled && ctcmp(accessID, s.AccessID) { + if !s.Disabled && ctcmp(accessID, s.AccessID) && s.NextRunAt.Sub(nowish) > 0 { schedules = append(schedules, &again.Schedule{ ID: s.ID, AccessID: s.AccessID, @@ -204,6 +206,26 @@ func (db *JSONDB) Delete(accessID string, id string) (*again.Schedule, error) { }, nil } +func (db *JSONDB) Upcoming(min time.Time, max time.Time) ([]*again.Schedule, error) { + schedules := []*again.Schedule{} + for i := range db.json.Schedules { + s := db.json.Schedules[i] + if !s.Disabled && s.NextRunAt.Sub(min) > 0 && max.Sub(s.NextRunAt) > 0 { + schedules = append(schedules, &again.Schedule{ + ID: s.ID, + AccessID: s.AccessID, + Date: s.Date, + Time: s.Time, + TZ: s.TZ, + NextRunAt: s.NextRunAt, + Webhooks: s.Webhooks, + }) + } + } + sort.Sort(again.Schedules(schedules)) + return []*again.Schedule(schedules), nil +} + func ctcmp(x string, y string) bool { return 1 == subtle.ConstantTimeCompare([]byte(x), []byte(y)) } diff --git a/public/app.js b/public/app.js index 95be2d2..8ee989f 100644 --- a/public/app.js +++ b/public/app.js @@ -7,9 +7,7 @@ var state = { account: { schedules: [] } }; - var $grantTpl; - var $devTpl; - var $updateTpl; + var $schedTpl; var $headerTpl; var $webhookTpl; var $webhookHeaderTpl; @@ -31,7 +29,7 @@ $('.js-schedule .js-webhooks').innerHTML = ''; // after blanking all inner templates - $devTpl = $('.js-schedule').outerHTML; + $schedTpl = $('.js-schedule').outerHTML; var $form = $('.js-new-schedule'); // Pick a date and time on an even number @@ -70,11 +68,10 @@ $('body').addEventListener('submit', function(ev) { if (ev.target.matches('.js-new-schedule')) { + console.log('new schedule'); newSchedule(ev.target); } else if (ev.target.matches('.js-schedules-list')) { doLogin(); - } else if (ev.target.matches('.js-schedules-new')) { - scheduleTask(); } else { return; } @@ -108,7 +105,23 @@ hook.headers[key] = val; } }); - hook.body = $('.js-body-template', $hook).value; + + var auth = { + user: $('.js-http-user', $hook).value || '', + pass: $('.js-http-pass', $hook).value || '' + }; + if (auth.user || auth.pass) { + hook.auth = auth; + } + + var body = $('.js-body-template', $hook).value; + if ('json' === $('.js-body-type:checked', $hook).value) { + hook.json = (body && JSON.parse(body)) || undefined; + } else { + // TODO try query parse + hook.form = (body && JSON.parse(body)) || undefined; + // TODO raw string as well + } // TODO update on template change and show preview var opts = { @@ -202,7 +215,7 @@ $devs.innerHTML = ''; data.schedules.forEach(function(d) { console.log('schedule', d); - var $dev = $.create($devTpl); + var $dev = $.create($schedTpl); $('.js-id', $dev).value = d.id; $('.js-date', $dev).value = d.date; $('.js-time', $dev).value = d.time; @@ -330,25 +343,6 @@ document.querySelector('.js-schedules-output').innerText = JSON.stringify(schedules, null, 2); } - function scheduleTask() { - return window - .fetch('/api/v0/schedules/new', { - method: 'POST', - headers: { - Authorization: getToken(), - 'Content-Type': 'application/json' - }, - body: JSON.stringify(task) - }) - .then(function(resp) { - return resp.json().then(function(schedule) { - console.log('New Schedule:', schedule); - allSchedules.push(schedule); - renderSchedules(allSchedules); - }); - }); - } - console.log('whatever'); $('.js-auth-token').value = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'; //window.addEventListener('load', run); diff --git a/public/index.html b/public/index.html index 74953b6..1d519b3 100644 --- a/public/index.html +++ b/public/index.html @@ -87,6 +87,10 @@ +
+ HTTP Basic Auth (optional): + +
@@ -96,6 +100,10 @@
+ Request Body Type: + + +