go-mockid/vendor/github.com/mailgun/mailgun-go/v3/events.go

283 lines
7.4 KiB
Go

package mailgun
import (
"context"
"fmt"
"time"
"github.com/mailgun/mailgun-go/v3/events"
"github.com/mailru/easyjson"
)
// ListEventOptions{} modifies the behavior of ListEvents()
type ListEventOptions struct {
// Limits the results to a specific start and end time
Begin, End time.Time
// ForceAscending and ForceDescending are used to force Mailgun to use a given
// traversal order of the events. If both ForceAscending and ForceDescending are
// true, an error will result. If none, the default will be inferred from the Begin
// and End parameters.
ForceAscending, ForceDescending bool
// Compact, if true, compacts the returned JSON to minimize transmission bandwidth.
Compact bool
// Limit caps the number of results returned. If left unspecified, MailGun assumes 100.
Limit int
// Filter allows the caller to provide more specialized filters on the query.
// Consult the Mailgun documentation for more details.
Filter map[string]string
PollInterval time.Duration
}
// EventIterator maintains the state necessary for paging though small parcels of a larger set of events.
type EventIterator struct {
events.Response
mg Mailgun
err error
}
// Create an new iterator to fetch a page of events from the events api
func (mg *MailgunImpl) ListEvents(opts *ListEventOptions) *EventIterator {
req := newHTTPRequest(generateApiUrl(mg, eventsEndpoint))
if opts != nil {
if opts.Limit > 0 {
req.addParameter("limit", fmt.Sprintf("%d", opts.Limit))
}
if opts.Compact {
req.addParameter("pretty", "no")
}
if opts.ForceAscending {
req.addParameter("ascending", "yes")
} else if opts.ForceDescending {
req.addParameter("ascending", "no")
}
if !opts.Begin.IsZero() {
req.addParameter("begin", formatMailgunTime(opts.Begin))
}
if !opts.End.IsZero() {
req.addParameter("end", formatMailgunTime(opts.End))
}
if opts.Filter != nil {
for k, v := range opts.Filter {
req.addParameter(k, v)
}
}
}
url, err := req.generateUrlWithParameters()
return &EventIterator{
mg: mg,
Response: events.Response{Paging: events.Paging{Next: url, First: url}},
err: err,
}
}
// If an error occurred during iteration `Err()` will return non nil
func (ei *EventIterator) Err() error {
return ei.err
}
// Next retrieves the next page of events from the api. Returns false when there
// no more pages to retrieve or if there was an error. Use `.Err()` to retrieve
// the error
func (ei *EventIterator) Next(ctx context.Context, events *[]Event) bool {
if ei.err != nil {
return false
}
ei.err = ei.fetch(ctx, ei.Paging.Next)
if ei.err != nil {
return false
}
*events, ei.err = ParseEvents(ei.Items)
if ei.err != nil {
return false
}
if len(ei.Items) == 0 {
return false
}
return true
}
// First retrieves the first page of events from the api. Returns false if there
// was an error. It also sets the iterator object to the first page.
// Use `.Err()` to retrieve the error.
func (ei *EventIterator) First(ctx context.Context, events *[]Event) bool {
if ei.err != nil {
return false
}
ei.err = ei.fetch(ctx, ei.Paging.First)
if ei.err != nil {
return false
}
*events, ei.err = ParseEvents(ei.Items)
return true
}
// Last retrieves the last page of events from the api.
// Calling Last() is invalid unless you first call First() or Next()
// Returns false if there was an error. It also sets the iterator object
// to the last page. Use `.Err()` to retrieve the error.
func (ei *EventIterator) Last(ctx context.Context, events *[]Event) bool {
if ei.err != nil {
return false
}
ei.err = ei.fetch(ctx, ei.Paging.Last)
if ei.err != nil {
return false
}
*events, ei.err = ParseEvents(ei.Items)
return true
}
// Previous retrieves the previous page of events from the api. Returns false when there
// no more pages to retrieve or if there was an error. Use `.Err()` to retrieve
// the error if any
func (ei *EventIterator) Previous(ctx context.Context, events *[]Event) bool {
if ei.err != nil {
return false
}
if ei.Paging.Previous == "" {
return false
}
ei.err = ei.fetch(ctx, ei.Paging.Previous)
if ei.err != nil {
return false
}
*events, ei.err = ParseEvents(ei.Items)
if len(ei.Items) == 0 {
return false
}
return true
}
func (ei *EventIterator) fetch(ctx context.Context, url string) error {
r := newHTTPRequest(url)
r.setClient(ei.mg.Client())
r.setBasicAuth(basicAuthUser, ei.mg.APIKey())
resp, err := makeRequest(ctx, r, "GET", nil)
if err != nil {
return err
}
if err := easyjson.Unmarshal(resp.Data, &ei.Response); err != nil {
return fmt.Errorf("failed to un-marshall event.Response: %s", err)
}
return nil
}
// EventPoller maintains the state necessary for polling events
type EventPoller struct {
it *EventIterator
opts ListEventOptions
thresholdTime time.Time
beginTime time.Time
sleepUntil time.Time
mg Mailgun
err error
}
// Poll the events api and return new events as they occur
// it = mg.PollEvents(&ListEventOptions{
// // Only events with a timestamp after this date/time will be returned
// Begin: time.Now().Add(time.Second * -3),
// // How often we poll the api for new events
// PollInterval: time.Second * 4
// })
//
// var events []Event
// ctx, cancel := context.WithCancel(context.Background())
//
// // Blocks until new events appear or context is cancelled
// for it.Poll(ctx, &events) {
// for _, event := range(events) {
// fmt.Printf("Event %+v\n", event)
// }
// }
// if it.Err() != nil {
// log.Fatal(it.Err())
// }
func (mg *MailgunImpl) PollEvents(opts *ListEventOptions) *EventPoller {
now := time.Now()
// ForceAscending must be set
opts.ForceAscending = true
// Default begin time is 30 minutes ago
if opts.Begin.IsZero() {
opts.Begin = now.Add(time.Minute * -30)
}
// Set a 15 second poll interval if none set
if opts.PollInterval.Nanoseconds() == 0 {
opts.PollInterval = time.Duration(time.Second * 15)
}
return &EventPoller{
it: mg.ListEvents(opts),
opts: *opts,
mg: mg,
}
}
// If an error occurred during polling `Err()` will return non nil
func (ep *EventPoller) Err() error {
return ep.err
}
func (ep *EventPoller) Poll(ctx context.Context, events *[]Event) bool {
var currentPage string
var results []Event
if ep.opts.Begin.IsZero() {
ep.beginTime = time.Now().UTC()
}
for {
// Remember our current page url
currentPage = ep.it.Paging.Next
// Attempt to get a page of events
var page []Event
if ep.it.Next(ctx, &page) == false {
if ep.it.Err() == nil && len(page) == 0 {
// No events, sleep for our poll interval
goto SLEEP
}
ep.err = ep.it.Err()
return false
}
for _, e := range page {
// If any events on the page are older than our being time
if e.GetTimestamp().After(ep.beginTime) {
results = append(results, e)
}
}
// If we have events to return
if len(results) != 0 {
*events = results
results = nil
return true
}
SLEEP:
// Since we didn't find an event older than our
// threshold, fetch this same page again
ep.it.Paging.Next = currentPage
// Sleep the rest of our duration
tick := time.NewTicker(ep.opts.PollInterval)
select {
case <-ctx.Done():
return false
case <-tick.C:
tick.Stop()
}
}
}
// Given time.Time{} return a float64 as given in mailgun event timestamps
func TimeToFloat(t time.Time) float64 {
return float64(t.Unix()) + (float64(t.Nanosecond()/int(time.Microsecond)) / float64(1000000))
}