package main import ( "context" "encoding/json" "flag" "fmt" "log" "math/rand" "net/http" "os" "strconv" "strings" "time" again "git.rootprojects.org/root/go-again" "git.rootprojects.org/root/go-again/data/jsondb" webhooks "git.rootprojects.org/root/go-again/webhooks" ) func main() { portEnv := os.Getenv("PORT") dbEnv := os.Getenv("DATABASE_URL") portInt := flag.Int("port", 0, "port on which to serve http") addr := flag.String("addr", "", "address on which to serve http") dburl := flag.String("database-url", "", "For example: json://relative-path/db.json or json:///absolute-path/db.json") flag.Parse() if "" != portEnv { if 0 != *portInt { log.Fatal("You may set PORT or --port, but not both.") return } n, err := strconv.Atoi(portEnv) if nil != err { log.Fatalf("Could not parse PORT=%q.", n) return } *portInt = n } if *portInt < 1024 || *portInt > 65535 { log.Fatalf("`port` should be between 1024 and 65535, not %d.", *portInt) return } portEnv = strconv.Itoa(*portInt) if "" != dbEnv { if "" != *dburl { log.Fatal("You may set DATABASE_URL or --database-url, but not both.") return } *dburl = dbEnv // TODO parse string? // TODO have each connector try in sequence by registering with build tags like go-migrate does? } if "" == *dburl { log.Fatalf("`database-url` must be specified." + " Something like --database-url='json:///var/go-again/db.json' should do nicely.") return } db, err := jsondb.Connect(*dburl) if nil != err { log.Fatalf("Could not connect to database %q: %s", *dburl, err) return } s := &scheduler{ DB: db, } mux := http.NewServeMux() server := &http.Server{ Addr: fmt.Sprintf("%s:%s", *addr, portEnv), Handler: mux, ReadTimeout: 10 * time.Second, WriteTimeout: 10 * time.Second, MaxHeaderBytes: 1 << 20, } //mux.Handle("/api/", http.HandlerFunc(handleFunc)) mux.HandleFunc("/api/v0/schedules", s.Handle) mux.HandleFunc("/api/v0/schedules/", s.Handle) // TODO Filebox FS mux.Handle("/", http.FileServer(http.Dir("./public"))) go s.RunTasks() fmt.Println("Listening on", server.Addr) log.Fatal(server.ListenAndServe()) } type ScheduleDB interface { List(string) ([]*again.Schedule, error) Set(again.Schedule) (*again.Schedule, error) Delete(accessID string, id string) (*again.Schedule, error) Upcoming(min time.Time, max time.Time) ([]*again.Schedule, error) } type scheduler struct { DB ScheduleDB } func (s *scheduler) RunTasks() { log.Println("[info] Task Queue Started") // Tick every 4 minutes, // but run tasks for up to 5 minutes before getting more. ticker := time.NewTicker(4 * time.Minute) // TODO some way to add things to the live queue // (maybe a select between the ticker and an incoming channel) // 'min' should be >= 'last' at least one second last := time.Now() for { min := time.Now() if last.Unix() > min.Unix() { min = last } max := min.Add(5 * time.Minute) scheds, err := s.DB.Upcoming(min, max) if nil != err { // this seems pretty unrecoverable // TODO check DB, reconnect os.Exit(911) return } log.Printf("[info] Got %d upcoming tasks", len(scheds)) log.Println(time.Now()) log.Println(min) log.Println(max) log.Println() log.Println(time.Now().UTC()) log.Println(min.UTC()) log.Println(max.UTC()) for i := range scheds { sched := scheds[i] fmt.Println("it's in the queue") sleep := sched.NextRunAt.Sub(time.Now()) // TODO create ticker to select on instead time.Sleep(sleep) fmt.Println("it's happening") for i := range sched.Webhooks { h := sched.Webhooks[i] h.TZ = sched.TZ webhooks.Run(webhooks.Webhook(h)) } // we only deal in second resulotion last = sched.NextRunAt.Add(1 * time.Second) } <-ticker.C } } func (s *scheduler) Handle(w http.ResponseWriter, r *http.Request) { // note: no go-routines reading body in handlers to follow defer r.Body.Close() token := strings.TrimPrefix(r.Header.Get("Authorization"), "Bearer ") if 32 != len(token) { http.Error(w, "Authorization Header did not contain a valid token", http.StatusForbidden) return } ctx := r.Context() ctx = context.WithValue(ctx, "token", token) r = r.WithContext(ctx) switch r.Method { case http.MethodGet: s.List(w, r) return case http.MethodPost: s.Create(w, r) return case http.MethodDelete: s.Delete(w, r) return default: http.Error(w, "Not Implemented", http.StatusNotImplemented) return } } func (s *scheduler) List(w http.ResponseWriter, r *http.Request) { accessID := r.Context().Value("token").(string) schedules, err := s.DB.List(accessID) if nil != err { http.Error(w, err.Error(), http.StatusInternalServerError) return } buf, err := json.Marshal(schedules) if nil != err { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.Write(buf) } func (s *scheduler) Create(w http.ResponseWriter, r *http.Request) { // TODO validate user accessID := r.Context().Value("token").(string) /* br, bw := io.Pipe() b := io.TeeReader(r.Body, bw) go func() { x, _ := ioutil.ReadAll(b) fmt.Println("[debug] http body", string(x)) bw.Close() }() decoder := json.NewDecoder(br) */ decoder := json.NewDecoder(r.Body) sched := &again.Schedule{} err := decoder.Decode(sched) if nil != err { http.Error(w, err.Error(), http.StatusInternalServerError) return } fmt.Printf("New Schedule:\n%#v\n", sched) // TODO validate and modify dateParts := strings.Split(sched.Date, "-") if 3 != len(dateParts) { http.Error(w, "Invalid date", http.StatusBadRequest) return } timeParts := strings.Split(sched.Time, ":") if 2 == len(timeParts) { timeParts = append(timeParts, "00") } if 3 != len(timeParts) { http.Error(w, "Invalid time", http.StatusBadRequest) return } // sub-minute resolution not supported yet timeParts[2] = "00" dtParts := []int{0, 0, 0, 0, 0, 0} for i := range dateParts { n, err := strconv.Atoi(dateParts[i]) if nil != err { http.Error(w, fmt.Sprintf("Invalid date part '%s'", n), http.StatusBadRequest) return } dtParts[i] = n } for i := range timeParts { n, err := strconv.Atoi(timeParts[i]) if nil != err { http.Error(w, fmt.Sprintf("Invalid time part '%s'", n), http.StatusBadRequest) return } dtParts[i+3] = n } _, err = again.Exists(dtParts, time.UTC.String()) if nil != err { http.Error(w, fmt.Sprintf("Invalid datetime: %s", err), http.StatusBadRequest) return } // TODO warn on non-existant / ambiguous timing loc, err := time.LoadLocation(sched.TZ) if nil != err { http.Error(w, fmt.Sprintf("Invalid timezone: %s", err), http.StatusBadRequest) return } t := time.Date(dtParts[0], time.Month(dtParts[1]), dtParts[2], dtParts[3], dtParts[4], dtParts[5], 0, loc).UTC() now := time.Now().UTC() // 4.5 minutes (about 5 minutes) ahead := t.Sub(now) if ahead < 270 { http.Error(w, fmt.Sprintf("Invalid datetime: should be 5+ minutes into the future, not just %d seconds", ahead), http.StatusBadRequest) return } fmt.Println("Time in UTC:", t) // stagger t = t.Add(time.Duration(rand.Intn(300*1000)-150*1000) * time.Millisecond) // Avoid the Leap Second if 23 == t.Hour() && 59 == t.Minute() && 59 == t.Second() { j := rand.Intn(1) - 2 n := rand.Intn(3) // +/- 3 seconds t = t.Add(time.Duration(j*n) * time.Second) } fmt.Println("Staggered Time:", t) // TODO add to immediate queue if soon enough sched.NextRunAt = t sched.AccessID = accessID sched2, err := s.DB.Set(*sched) if nil != err { http.Error(w, err.Error(), http.StatusInternalServerError) return } buf, err := json.Marshal(sched2) if nil != err { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.Write(buf) } func (s *scheduler) Delete(w http.ResponseWriter, r *http.Request) { // TODO validate user accessID := r.Context().Value("token").(string) parts := strings.Split(r.URL.Path, "/") // ""/"api"/"v0"/"schedules"/":id" if 5 != len(parts) { http.Error(w, "Not Found", http.StatusNotFound) return } id := parts[4] sched2, err := s.DB.Delete(accessID, id) if nil != err { http.Error(w, err.Error(), http.StatusInternalServerError) return } buf, err := json.Marshal(sched2) if nil != err { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.Write(buf) }