WIP: Works!

This commit is contained in:
AJ ONeal 2019-06-23 19:02:31 -06:00
parent d87b197cc0
commit 7e60e39a11
6 changed files with 358 additions and 41 deletions

View File

@ -19,6 +19,18 @@ type Schedule struct {
Webhooks []Webhook `json:"webhooks" db"webhooks"` Webhooks []Webhook `json:"webhooks" db"webhooks"`
} }
type Schedules []*Schedule
func (s Schedules) Len() int {
return len(s)
}
func (s Schedules) Less(i, j int) bool {
return s[i].NextRunAt.Sub(s[j].NextRunAt) < 0
}
func (s Schedules) Swap(i, j int) {
s[j], s[i] = s[i], s[j]
}
// https://yourbasic.org/golang/time-change-convert-location-timezone/ // https://yourbasic.org/golang/time-change-convert-location-timezone/
// https://sebest.github.io/post/create-a-small-docker-image-for-a-golang-binary/ // https://sebest.github.io/post/create-a-small-docker-image-for-a-golang-binary/
// https://github.com/FKSE/docker-golang-base // https://github.com/FKSE/docker-golang-base
@ -48,7 +60,7 @@ func Run() {
[]int{2019, 11, 10, 23, 59, 59, 0}, []int{2019, 11, 10, 23, 59, 59, 0},
[]int{2019, 11, 31, 23, 0, 0, 0}, []int{2019, 11, 31, 23, 0, 0, 0},
} { } {
err := Exists(st, "America/Denver") _, err := Exists(st, "America/Denver")
if nil != err { if nil != err {
fmt.Println(err) fmt.Println(err)
} }
@ -117,58 +129,58 @@ func (err ErrNoExist) Error() string {
// fmt.Println(time.Date(2016, time.December, 31, 23, 59, 60, 0, time.UTC)) // fmt.Println(time.Date(2016, time.December, 31, 23, 59, 60, 0, time.UTC))
// "2020-12-02 02:00:00 +0000 UTC" // should be "2016-12-31 23:59:60 +0000 UTC" // "2020-12-02 02:00:00 +0000 UTC" // should be "2016-12-31 23:59:60 +0000 UTC"
// //
func Exists(st []int, tzstr string) error { func Exists(st []int, tzstr string) (*time.Time, error) {
tz, err := time.LoadLocation(tzstr) tz, err := time.LoadLocation(tzstr)
if nil != err { if nil != err {
return err return nil, err
} }
m := time.Month(st[1]) m := time.Month(st[1])
t1 := time.Date(st[0], m, st[2], st[3], st[4], st[5], st[6], tz) t1 := time.Date(st[0], m, st[2], st[3], st[4], st[5], 0, tz)
if st[5] != t1.Second() { if st[5] != t1.Second() {
return ErrNoExist{ return nil, ErrNoExist{
t: st, t: st,
z: tzstr, z: tzstr,
e: "invalid second, probably just bad math on your part", e: "invalid second, probably just bad math on your part",
} }
} }
if st[4] != t1.Minute() { if st[4] != t1.Minute() {
return ErrNoExist{ return nil, ErrNoExist{
t: st, t: st,
z: tzstr, z: tzstr,
e: "invalid minute, probably just bad math on your part, but perhaps a half-hour daylight savings or summer time", e: "invalid minute, probably just bad math on your part, but perhaps a half-hour daylight savings or summer time",
} }
} }
if st[3] != t1.Hour() { if st[3] != t1.Hour() {
return ErrNoExist{ return nil, ErrNoExist{
t: st, t: st,
z: tzstr, z: tzstr,
e: "invalid hour, possibly a Daylight Savings or Summer Time error, or perhaps bad math on your part", e: "invalid hour, possibly a Daylight Savings or Summer Time error, or perhaps bad math on your part",
} }
} }
if st[2] != t1.Day() { if st[2] != t1.Day() {
return ErrNoExist{ return nil, ErrNoExist{
t: st, t: st,
z: tzstr, z: tzstr,
e: "invalid day of month, most likely bad math on your part. Remember: 31 28¼ 31 30 31 30 31 31 30 31 30 31", e: "invalid day of month, most likely bad math on your part. Remember: 31 28¼ 31 30 31 30 31 31 30 31 30 31",
} }
} }
if st[1] != int(t1.Month()) { if st[1] != int(t1.Month()) {
return ErrNoExist{ return nil, ErrNoExist{
t: st, t: st,
z: tzstr, z: tzstr,
e: "invalid month, most likely bad math on your part. Remember: Decemberween isn't until next year", e: "invalid month, most likely bad math on your part. Remember: Decemberween isn't until next year",
} }
} }
if st[0] != t1.Year() { if st[0] != t1.Year() {
return ErrNoExist{ return nil, ErrNoExist{
t: st, t: st,
z: tzstr, z: tzstr,
e: "invalid year, must have reached the end of time...", e: "invalid year, must have reached the end of time...",
} }
} }
return nil return &t1, nil
} }
// Check if the time happens more than once in a given timezone. // Check if the time happens more than once in a given timezone.
@ -210,13 +222,13 @@ func IsAmbiguous(st []int, tzstr string) error {
} }
m := time.Month(st[1]) m := time.Month(st[1])
t1 := time.Date(st[0], m, st[2], st[3], st[4], st[5], st[6], tz) t1 := time.Date(st[0], m, st[2], st[3], st[4], st[5], 0, tz)
u1 := t1.UTC() u1 := t1.UTC()
// Australia/Lord_Howe has a 30-minute DST // Australia/Lord_Howe has a 30-minute DST
// 60-minute DST is common // 60-minute DST is common
// Antarctica/Troll has a 120-minute DST // Antarctica/Troll has a 120-minute DST
for _, n := range []int{30, 60, 120} { for _, n := range []int{30, 60, 120} {
t2 := time.Date(st[0], m, st[2], st[3], st[4]+n, st[5], st[6], tz) t2 := time.Date(st[0], m, st[2], st[3], st[4]+n, st[5], 0, tz)
u2 := t2.UTC() u2 := t2.UTC()
if u1.Equal(u2) { if u1.Equal(u2) {
return fmt.Errorf("Ambiguous: %s, %s, %+d\n", t1, t2, n) return fmt.Errorf("Ambiguous: %s, %s, %+d\n", t1, t2, n)

View File

@ -6,6 +6,7 @@ import (
"flag" "flag"
"fmt" "fmt"
"log" "log"
"math/rand"
"net/http" "net/http"
"os" "os"
"strconv" "strconv"
@ -14,6 +15,7 @@ import (
again "git.rootprojects.org/root/go-again" again "git.rootprojects.org/root/go-again"
"git.rootprojects.org/root/go-again/data/jsondb" "git.rootprojects.org/root/go-again/data/jsondb"
webhooks "git.rootprojects.org/root/go-again/webhooks"
) )
func main() { func main() {
@ -82,6 +84,8 @@ func main() {
// TODO Filebox FS // TODO Filebox FS
mux.Handle("/", http.FileServer(http.Dir("./public"))) mux.Handle("/", http.FileServer(http.Dir("./public")))
go s.RunTasks()
fmt.Println("Listening on", server.Addr) fmt.Println("Listening on", server.Addr)
log.Fatal(server.ListenAndServe()) log.Fatal(server.ListenAndServe())
} }
@ -90,12 +94,68 @@ type ScheduleDB interface {
List(string) ([]*again.Schedule, error) List(string) ([]*again.Schedule, error)
Set(again.Schedule) (*again.Schedule, error) Set(again.Schedule) (*again.Schedule, error)
Delete(accessID string, id string) (*again.Schedule, error) Delete(accessID string, id string) (*again.Schedule, error)
Upcoming(min time.Time, max time.Time) ([]*again.Schedule, error)
} }
type scheduler struct { type scheduler struct {
DB ScheduleDB DB ScheduleDB
} }
func (s *scheduler) RunTasks() {
log.Println("[info] Task Queue Started")
// Tick every 4 minutes,
// but run tasks for up to 5 minutes before getting more.
ticker := time.NewTicker(4 * time.Minute)
// TODO some way to add things to the live queue
// (maybe a select between the ticker and an incoming channel)
// 'min' should be >= 'last' at least one second
last := time.Now()
for {
min := time.Now()
if last.Unix() > min.Unix() {
min = last
}
max := min.Add(5 * time.Minute)
scheds, err := s.DB.Upcoming(min, max)
if nil != err {
// this seems pretty unrecoverable
// TODO check DB, reconnect
os.Exit(911)
return
}
log.Printf("[info] Got %d upcoming tasks", len(scheds))
log.Println(time.Now())
log.Println(min)
log.Println(max)
log.Println()
log.Println(time.Now().UTC())
log.Println(min.UTC())
log.Println(max.UTC())
for i := range scheds {
sched := scheds[i]
fmt.Println("it's in the queue")
sleep := sched.NextRunAt.Sub(time.Now())
// TODO create ticker to select on instead
time.Sleep(sleep)
fmt.Println("it's happening")
for i := range sched.Webhooks {
h := sched.Webhooks[i]
h.TZ = sched.TZ
webhooks.Run(webhooks.Webhook(h))
}
// we only deal in second resulotion
last = sched.NextRunAt.Add(1 * time.Second)
}
<-ticker.C
}
}
func (s *scheduler) Handle(w http.ResponseWriter, r *http.Request) { func (s *scheduler) Handle(w http.ResponseWriter, r *http.Request) {
// note: no go-routines reading body in handlers to follow // note: no go-routines reading body in handlers to follow
defer r.Body.Close() defer r.Body.Close()
@ -164,6 +224,79 @@ func (s *scheduler) Create(w http.ResponseWriter, r *http.Request) {
fmt.Printf("New Schedule:\n%#v\n", sched) fmt.Printf("New Schedule:\n%#v\n", sched)
// TODO validate and modify // TODO validate and modify
dateParts := strings.Split(sched.Date, "-")
if 3 != len(dateParts) {
http.Error(w, "Invalid date", http.StatusBadRequest)
return
}
timeParts := strings.Split(sched.Time, ":")
if 2 == len(timeParts) {
timeParts = append(timeParts, "00")
}
if 3 != len(timeParts) {
http.Error(w, "Invalid time", http.StatusBadRequest)
return
}
// sub-minute resolution not supported yet
timeParts[2] = "00"
dtParts := []int{0, 0, 0, 0, 0, 0}
for i := range dateParts {
n, err := strconv.Atoi(dateParts[i])
if nil != err {
http.Error(w, fmt.Sprintf("Invalid date part '%s'", n), http.StatusBadRequest)
return
}
dtParts[i] = n
}
for i := range timeParts {
n, err := strconv.Atoi(timeParts[i])
if nil != err {
http.Error(w, fmt.Sprintf("Invalid time part '%s'", n), http.StatusBadRequest)
return
}
dtParts[i+3] = n
}
_, err = again.Exists(dtParts, time.UTC.String())
if nil != err {
http.Error(w, fmt.Sprintf("Invalid datetime: %s", err), http.StatusBadRequest)
return
}
// TODO warn on non-existant / ambiguous timing
loc, err := time.LoadLocation(sched.TZ)
if nil != err {
http.Error(w, fmt.Sprintf("Invalid timezone: %s", err), http.StatusBadRequest)
return
}
t := time.Date(dtParts[0], time.Month(dtParts[1]), dtParts[2], dtParts[3], dtParts[4], dtParts[5], 0, loc).UTC()
now := time.Now().UTC()
// 4.5 minutes (about 5 minutes)
ahead := t.Sub(now)
if ahead < 270 {
http.Error(w,
fmt.Sprintf("Invalid datetime: should be 5+ minutes into the future, not just %d seconds", ahead),
http.StatusBadRequest)
return
}
fmt.Println("Time in UTC:", t)
// stagger
t = t.Add(time.Duration(rand.Intn(300*1000)-150*1000) * time.Millisecond)
// Avoid the Leap Second
if 23 == t.Hour() && 59 == t.Minute() && 59 == t.Second() {
j := rand.Intn(1) - 2
n := rand.Intn(3)
// +/- 3 seconds
t = t.Add(time.Duration(j*n) * time.Second)
}
fmt.Println("Staggered Time:", t)
// TODO add to immediate queue if soon enough
sched.NextRunAt = t
sched.AccessID = accessID sched.AccessID = accessID
sched2, err := s.DB.Set(*sched) sched2, err := s.DB.Set(*sched)
if nil != err { if nil != err {

View File

@ -15,6 +15,7 @@ import (
"net/url" "net/url"
"os" "os"
"path/filepath" "path/filepath"
"sort"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -103,10 +104,11 @@ type Schedule struct {
} }
func (db *JSONDB) List(accessID string) ([]*again.Schedule, error) { func (db *JSONDB) List(accessID string) ([]*again.Schedule, error) {
nowish := time.Now().Add(time.Duration(30) * time.Second)
schedules := []*again.Schedule{} schedules := []*again.Schedule{}
for i := range db.json.Schedules { for i := range db.json.Schedules {
s := db.json.Schedules[i] s := db.json.Schedules[i]
if !s.Disabled && ctcmp(accessID, s.AccessID) { if !s.Disabled && ctcmp(accessID, s.AccessID) && s.NextRunAt.Sub(nowish) > 0 {
schedules = append(schedules, &again.Schedule{ schedules = append(schedules, &again.Schedule{
ID: s.ID, ID: s.ID,
AccessID: s.AccessID, AccessID: s.AccessID,
@ -204,6 +206,26 @@ func (db *JSONDB) Delete(accessID string, id string) (*again.Schedule, error) {
}, nil }, nil
} }
func (db *JSONDB) Upcoming(min time.Time, max time.Time) ([]*again.Schedule, error) {
schedules := []*again.Schedule{}
for i := range db.json.Schedules {
s := db.json.Schedules[i]
if !s.Disabled && s.NextRunAt.Sub(min) > 0 && max.Sub(s.NextRunAt) > 0 {
schedules = append(schedules, &again.Schedule{
ID: s.ID,
AccessID: s.AccessID,
Date: s.Date,
Time: s.Time,
TZ: s.TZ,
NextRunAt: s.NextRunAt,
Webhooks: s.Webhooks,
})
}
}
sort.Sort(again.Schedules(schedules))
return []*again.Schedule(schedules), nil
}
func ctcmp(x string, y string) bool { func ctcmp(x string, y string) bool {
return 1 == subtle.ConstantTimeCompare([]byte(x), []byte(y)) return 1 == subtle.ConstantTimeCompare([]byte(x), []byte(y))
} }

View File

@ -7,9 +7,7 @@
var state = { account: { schedules: [] } }; var state = { account: { schedules: [] } };
var $grantTpl; var $schedTpl;
var $devTpl;
var $updateTpl;
var $headerTpl; var $headerTpl;
var $webhookTpl; var $webhookTpl;
var $webhookHeaderTpl; var $webhookHeaderTpl;
@ -31,7 +29,7 @@
$('.js-schedule .js-webhooks').innerHTML = ''; $('.js-schedule .js-webhooks').innerHTML = '';
// after blanking all inner templates // after blanking all inner templates
$devTpl = $('.js-schedule').outerHTML; $schedTpl = $('.js-schedule').outerHTML;
var $form = $('.js-new-schedule'); var $form = $('.js-new-schedule');
// Pick a date and time on an even number // Pick a date and time on an even number
@ -70,11 +68,10 @@
$('body').addEventListener('submit', function(ev) { $('body').addEventListener('submit', function(ev) {
if (ev.target.matches('.js-new-schedule')) { if (ev.target.matches('.js-new-schedule')) {
console.log('new schedule');
newSchedule(ev.target); newSchedule(ev.target);
} else if (ev.target.matches('.js-schedules-list')) { } else if (ev.target.matches('.js-schedules-list')) {
doLogin(); doLogin();
} else if (ev.target.matches('.js-schedules-new')) {
scheduleTask();
} else { } else {
return; return;
} }
@ -108,7 +105,23 @@
hook.headers[key] = val; hook.headers[key] = val;
} }
}); });
hook.body = $('.js-body-template', $hook).value;
var auth = {
user: $('.js-http-user', $hook).value || '',
pass: $('.js-http-pass', $hook).value || ''
};
if (auth.user || auth.pass) {
hook.auth = auth;
}
var body = $('.js-body-template', $hook).value;
if ('json' === $('.js-body-type:checked', $hook).value) {
hook.json = (body && JSON.parse(body)) || undefined;
} else {
// TODO try query parse
hook.form = (body && JSON.parse(body)) || undefined;
// TODO raw string as well
}
// TODO update on template change and show preview // TODO update on template change and show preview
var opts = { var opts = {
@ -202,7 +215,7 @@
$devs.innerHTML = ''; $devs.innerHTML = '';
data.schedules.forEach(function(d) { data.schedules.forEach(function(d) {
console.log('schedule', d); console.log('schedule', d);
var $dev = $.create($devTpl); var $dev = $.create($schedTpl);
$('.js-id', $dev).value = d.id; $('.js-id', $dev).value = d.id;
$('.js-date', $dev).value = d.date; $('.js-date', $dev).value = d.date;
$('.js-time', $dev).value = d.time; $('.js-time', $dev).value = d.time;
@ -330,25 +343,6 @@
document.querySelector('.js-schedules-output').innerText = JSON.stringify(schedules, null, 2); document.querySelector('.js-schedules-output').innerText = JSON.stringify(schedules, null, 2);
} }
function scheduleTask() {
return window
.fetch('/api/v0/schedules/new', {
method: 'POST',
headers: {
Authorization: getToken(),
'Content-Type': 'application/json'
},
body: JSON.stringify(task)
})
.then(function(resp) {
return resp.json().then(function(schedule) {
console.log('New Schedule:', schedule);
allSchedules.push(schedule);
renderSchedules(allSchedules);
});
});
}
console.log('whatever'); console.log('whatever');
$('.js-auth-token').value = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'; $('.js-auth-token').value = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx';
//window.addEventListener('load', run); //window.addEventListener('load', run);

View File

@ -87,6 +87,10 @@
<option value="PUT">PUT</option> <option value="PUT">PUT</option>
</select> </select>
<input placeholder="https://example.com/api/v1/updates" class="js-url" type="url" required /> <input placeholder="https://example.com/api/v1/updates" class="js-url" type="url" required />
<br />
HTTP Basic Auth (optional):
<input placeholder="username" class="js-http-user" type="text" />
<input placeholder="password" class="js-http-pass" type="text" />
<div class="js-headers"> <div class="js-headers">
<div class="js-header"> <div class="js-header">
<input placeholder="Header" class="js-key" type="text" /> <input placeholder="Header" class="js-key" type="text" />
@ -96,6 +100,10 @@
</div> </div>
</div> </div>
<div class="js-body"> <div class="js-body">
Request Body Type:
<label><input name="-body-type" class="js-body-type" type="radio" value="json" checked /> JSON</label>
<label><input name="-body-type" class="js-body-type" type="radio" value="form" /> Form</label>
<br />
<textarea <textarea
placeholder="Body template, use '{{ keyname }}' for template values." placeholder="Body template, use '{{ keyname }}' for template values."
class="js-body-template" class="js-body-template"

View File

@ -1,13 +1,161 @@
package webooks package webooks
import (
"encoding/json"
"fmt"
"log"
"net"
"net/http"
"net/url"
"strings"
"time"
)
var logger chan string
func init() {
logger = make(chan string, 10)
go func() {
for {
msg := <-logger
log.Println(msg)
}
}()
}
type Webhook struct { type Webhook struct {
ID string `json:"id,omitempty"` ID string `json:"id,omitempty"`
Comment string `json:"comment"` Comment string `json:"comment"`
Method string `json:"method"` Method string `json:"method"`
URL string `json:"url"` URL string `json:"url"`
TZ string `json:"-"`
Auth map[string]string `json:"auth,omitempty"` Auth map[string]string `json:"auth,omitempty"`
Headers map[string]string `json:"headers,omitempty"` Headers map[string]string `json:"headers,omitempty"`
Form map[string]string `json:"form,omitempty"` Form map[string]string `json:"form,omitempty"`
JSON map[string]string `json:"json,omitempty"` JSON map[string]string `json:"json,omitempty"`
Config map[string]string `json:"config,omitempty"` Config map[string]string `json:"config,omitempty"`
} }
func Log(str string, args ...interface{}) {
logger <- fmt.Sprintf(str, args...)
}
func Run(h Webhook) {
// TODO do this in main on config init
if "" == h.Method {
h.Method = "POST"
}
var body *strings.Reader
var err error
// TODO real templates
loc, err := time.LoadLocation(h.TZ)
if nil != err {
Log("Bad timezone", h.TZ)
loc, _ = time.LoadLocation("UTC")
}
t := time.Now().In(loc)
z, _ := t.Zone()
if 0 != len(h.Form) {
form := url.Values{}
for k := range h.Form {
v := h.Form[k]
// because `{{` gets urlencoded
//v = strings.Replace(v, "{{ .Name }}", d.Name, -1)
v = strings.Replace(v, "{{ .Datetime }}", t.Format("2006-01-02 3:04:05 MST"), -1)
v = strings.Replace(v, "{{ .Date }}", t.Format("2006-01-02"), -1)
v = strings.Replace(v, "{{ .Time }}", t.Format(time.Kitchen), -1)
v = strings.Replace(v, "{{ .Zone }}", z, -1)
Log("[HEADER] %s: %s", k, v)
form.Set(k, v)
}
body = strings.NewReader(form.Encode())
} else if 0 != len(h.JSON) {
bodyBuf, err := json.Marshal(h.JSON)
if nil != err {
Log("[Notify] JSON Marshal Error for '%s': %s", h.Comment, err)
return
}
// `{{` is left alone in the body
bodyStr := string(bodyBuf)
bodyStr = strings.Replace(bodyStr, "{{ .Datetime }}", t.Format("2006-01-02 3:04:05 MST"), -1)
bodyStr = strings.Replace(bodyStr, "{{ .Date }}", t.Format("2006-01-02"), -1)
bodyStr = strings.Replace(bodyStr, "{{ .Time }}", t.Format("3:04:05PM"), -1)
bodyStr = strings.Replace(bodyStr, "{{ .Zone }}", z, -1)
body = strings.NewReader(bodyStr)
//body = strings.NewReader(string(bodyBuf))
}
if nil == body {
body = strings.NewReader("")
}
client := NewHTTPClient()
fmt.Println("bd?", h.Method, h.URL, body)
req, err := http.NewRequest(h.Method, h.URL, body)
if nil != err {
Log("[Notify] HTTP Client Network Error for '%s': %s", h.Comment, err)
return
}
if 0 != len(h.Form) {
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
} else if 0 != len(h.JSON) {
req.Header.Set("Content-Type", "application/json")
}
if 0 != len(h.Auth) {
user := h.Auth["user"]
if "" == user {
user = h.Auth["username"]
}
pass := h.Auth["pass"]
if "" == user {
pass = h.Auth["password"]
}
req.SetBasicAuth(user, pass)
}
req.Header.Set("User-Agent", "Watchdog/1.0")
for k := range h.Headers {
req.Header.Set(k, h.Headers[k])
}
resp, err := client.Do(req)
if nil != err {
Log("[Notify] HTTP Client Error for '%s': %s", h.Comment, err)
return
}
if !(resp.StatusCode >= 200 && resp.StatusCode < 300) {
Log("[Notify] Response Error for '%s': %s", h.Comment, resp.Status)
return
}
// TODO json vs xml vs txt
var data map[string]interface{}
req.Header.Add("Accept", "application/json")
decoder := json.NewDecoder(resp.Body)
err = decoder.Decode(&data)
if err != nil {
Log("[Notify] Response Body Error for '%s': %s", h.Comment, resp.Status)
return
}
// TODO some sort of way to determine if data is successful (keywords)
Log("[Notify] Success? %#v", data)
}
// The default http client uses unsafe defaults
func NewHTTPClient() *http.Client {
transport := &http.Transport{
Dial: (&net.Dialer{
Timeout: 10 * time.Second,
}).Dial,
TLSHandshakeTimeout: 5 * time.Second,
}
client := &http.Client{
Timeout: time.Second * 5,
Transport: transport,
}
return client
}