283 lines
7.4 KiB
Go
283 lines
7.4 KiB
Go
package mailgun
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/mailgun/mailgun-go/v3/events"
|
|
"github.com/mailru/easyjson"
|
|
)
|
|
|
|
// ListEventOptions tunes the request built by ListEvents and the polling
// performed by PollEvents. The zero value adds no query parameters.
type ListEventOptions struct {
	// Begin, when non-zero, is sent as the "begin" parameter and bounds the
	// start of the time range.
	Begin time.Time
	// End, when non-zero, is sent as the "end" parameter and bounds the end
	// of the time range.
	End time.Time
	// ForceAscending asks Mailgun to traverse events oldest-first
	// ("ascending=yes"). It takes precedence over ForceDescending when both
	// are set.
	ForceAscending bool
	// ForceDescending asks Mailgun to traverse events newest-first
	// ("ascending=no"). When neither flag is set no ordering parameter is
	// sent and Mailgun infers the order from Begin and End.
	ForceDescending bool
	// Compact, if true, requests non-pretty-printed JSON ("pretty=no") to
	// minimize transmission bandwidth.
	Compact bool
	// Limit caps the number of results returned per page. Values <= 0 are
	// not sent; Mailgun then assumes 100.
	Limit int
	// Filter supplies additional query parameters for more specialized
	// filtering. Each key/value pair is added to the request verbatim.
	// Consult the Mailgun documentation for the supported keys.
	Filter map[string]string
	// PollInterval is how long Poll waits between requests when no new
	// events are available. PollEvents substitutes 15 seconds when it is
	// left at zero. ListEvents does not read it.
	PollInterval time.Duration
}
|
|
|
|
// EventIterator maintains the state necessary for paging though small parcels of a larger set of events.
|
|
type EventIterator struct {
|
|
events.Response
|
|
mg Mailgun
|
|
err error
|
|
}
|
|
|
|
// Create an new iterator to fetch a page of events from the events api
|
|
func (mg *MailgunImpl) ListEvents(opts *ListEventOptions) *EventIterator {
|
|
req := newHTTPRequest(generateApiUrl(mg, eventsEndpoint))
|
|
if opts != nil {
|
|
if opts.Limit > 0 {
|
|
req.addParameter("limit", fmt.Sprintf("%d", opts.Limit))
|
|
}
|
|
if opts.Compact {
|
|
req.addParameter("pretty", "no")
|
|
}
|
|
if opts.ForceAscending {
|
|
req.addParameter("ascending", "yes")
|
|
} else if opts.ForceDescending {
|
|
req.addParameter("ascending", "no")
|
|
}
|
|
if !opts.Begin.IsZero() {
|
|
req.addParameter("begin", formatMailgunTime(opts.Begin))
|
|
}
|
|
if !opts.End.IsZero() {
|
|
req.addParameter("end", formatMailgunTime(opts.End))
|
|
}
|
|
if opts.Filter != nil {
|
|
for k, v := range opts.Filter {
|
|
req.addParameter(k, v)
|
|
}
|
|
}
|
|
}
|
|
url, err := req.generateUrlWithParameters()
|
|
return &EventIterator{
|
|
mg: mg,
|
|
Response: events.Response{Paging: events.Paging{Next: url, First: url}},
|
|
err: err,
|
|
}
|
|
}
|
|
|
|
// If an error occurred during iteration `Err()` will return non nil
|
|
func (ei *EventIterator) Err() error {
|
|
return ei.err
|
|
}
|
|
|
|
// Next retrieves the next page of events from the api. Returns false when there
|
|
// no more pages to retrieve or if there was an error. Use `.Err()` to retrieve
|
|
// the error
|
|
func (ei *EventIterator) Next(ctx context.Context, events *[]Event) bool {
|
|
if ei.err != nil {
|
|
return false
|
|
}
|
|
ei.err = ei.fetch(ctx, ei.Paging.Next)
|
|
if ei.err != nil {
|
|
return false
|
|
}
|
|
*events, ei.err = ParseEvents(ei.Items)
|
|
if ei.err != nil {
|
|
return false
|
|
}
|
|
if len(ei.Items) == 0 {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
// First retrieves the first page of events from the api. Returns false if there
|
|
// was an error. It also sets the iterator object to the first page.
|
|
// Use `.Err()` to retrieve the error.
|
|
func (ei *EventIterator) First(ctx context.Context, events *[]Event) bool {
|
|
if ei.err != nil {
|
|
return false
|
|
}
|
|
ei.err = ei.fetch(ctx, ei.Paging.First)
|
|
if ei.err != nil {
|
|
return false
|
|
}
|
|
*events, ei.err = ParseEvents(ei.Items)
|
|
return true
|
|
}
|
|
|
|
// Last retrieves the last page of events from the api.
|
|
// Calling Last() is invalid unless you first call First() or Next()
|
|
// Returns false if there was an error. It also sets the iterator object
|
|
// to the last page. Use `.Err()` to retrieve the error.
|
|
func (ei *EventIterator) Last(ctx context.Context, events *[]Event) bool {
|
|
if ei.err != nil {
|
|
return false
|
|
}
|
|
ei.err = ei.fetch(ctx, ei.Paging.Last)
|
|
if ei.err != nil {
|
|
return false
|
|
}
|
|
*events, ei.err = ParseEvents(ei.Items)
|
|
return true
|
|
}
|
|
|
|
// Previous retrieves the previous page of events from the api. Returns false when there
|
|
// no more pages to retrieve or if there was an error. Use `.Err()` to retrieve
|
|
// the error if any
|
|
func (ei *EventIterator) Previous(ctx context.Context, events *[]Event) bool {
|
|
if ei.err != nil {
|
|
return false
|
|
}
|
|
if ei.Paging.Previous == "" {
|
|
return false
|
|
}
|
|
ei.err = ei.fetch(ctx, ei.Paging.Previous)
|
|
if ei.err != nil {
|
|
return false
|
|
}
|
|
*events, ei.err = ParseEvents(ei.Items)
|
|
if len(ei.Items) == 0 {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (ei *EventIterator) fetch(ctx context.Context, url string) error {
|
|
r := newHTTPRequest(url)
|
|
r.setClient(ei.mg.Client())
|
|
r.setBasicAuth(basicAuthUser, ei.mg.APIKey())
|
|
|
|
resp, err := makeRequest(ctx, r, "GET", nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := easyjson.Unmarshal(resp.Data, &ei.Response); err != nil {
|
|
return fmt.Errorf("failed to un-marshall event.Response: %s", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// EventPoller maintains the state necessary for polling events
|
|
type EventPoller struct {
|
|
it *EventIterator
|
|
opts ListEventOptions
|
|
thresholdTime time.Time
|
|
beginTime time.Time
|
|
sleepUntil time.Time
|
|
mg Mailgun
|
|
err error
|
|
}
|
|
|
|
// Poll the events api and return new events as they occur
|
|
// it = mg.PollEvents(&ListEventOptions{
|
|
// // Only events with a timestamp after this date/time will be returned
|
|
// Begin: time.Now().Add(time.Second * -3),
|
|
// // How often we poll the api for new events
|
|
// PollInterval: time.Second * 4
|
|
// })
|
|
//
|
|
// var events []Event
|
|
// ctx, cancel := context.WithCancel(context.Background())
|
|
//
|
|
// // Blocks until new events appear or context is cancelled
|
|
// for it.Poll(ctx, &events) {
|
|
// for _, event := range(events) {
|
|
// fmt.Printf("Event %+v\n", event)
|
|
// }
|
|
// }
|
|
// if it.Err() != nil {
|
|
// log.Fatal(it.Err())
|
|
// }
|
|
func (mg *MailgunImpl) PollEvents(opts *ListEventOptions) *EventPoller {
|
|
now := time.Now()
|
|
// ForceAscending must be set
|
|
opts.ForceAscending = true
|
|
|
|
// Default begin time is 30 minutes ago
|
|
if opts.Begin.IsZero() {
|
|
opts.Begin = now.Add(time.Minute * -30)
|
|
}
|
|
|
|
// Set a 15 second poll interval if none set
|
|
if opts.PollInterval.Nanoseconds() == 0 {
|
|
opts.PollInterval = time.Duration(time.Second * 15)
|
|
}
|
|
|
|
return &EventPoller{
|
|
it: mg.ListEvents(opts),
|
|
opts: *opts,
|
|
mg: mg,
|
|
}
|
|
}
|
|
|
|
// If an error occurred during polling `Err()` will return non nil
|
|
func (ep *EventPoller) Err() error {
|
|
return ep.err
|
|
}
|
|
|
|
func (ep *EventPoller) Poll(ctx context.Context, events *[]Event) bool {
|
|
var currentPage string
|
|
var results []Event
|
|
|
|
if ep.opts.Begin.IsZero() {
|
|
ep.beginTime = time.Now().UTC()
|
|
}
|
|
|
|
for {
|
|
// Remember our current page url
|
|
currentPage = ep.it.Paging.Next
|
|
|
|
// Attempt to get a page of events
|
|
var page []Event
|
|
if ep.it.Next(ctx, &page) == false {
|
|
if ep.it.Err() == nil && len(page) == 0 {
|
|
// No events, sleep for our poll interval
|
|
goto SLEEP
|
|
}
|
|
ep.err = ep.it.Err()
|
|
return false
|
|
}
|
|
|
|
for _, e := range page {
|
|
// If any events on the page are older than our being time
|
|
if e.GetTimestamp().After(ep.beginTime) {
|
|
results = append(results, e)
|
|
}
|
|
}
|
|
|
|
// If we have events to return
|
|
if len(results) != 0 {
|
|
*events = results
|
|
results = nil
|
|
return true
|
|
}
|
|
|
|
SLEEP:
|
|
// Since we didn't find an event older than our
|
|
// threshold, fetch this same page again
|
|
ep.it.Paging.Next = currentPage
|
|
|
|
// Sleep the rest of our duration
|
|
tick := time.NewTicker(ep.opts.PollInterval)
|
|
select {
|
|
case <-ctx.Done():
|
|
return false
|
|
case <-tick.C:
|
|
tick.Stop()
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
// TimeToFloat converts t into the float64 form used by Mailgun event
// timestamps: seconds since the Unix epoch, with the fractional part carrying
// microsecond precision (anything finer is truncated).
func TimeToFloat(t time.Time) float64 {
	const microsPerSecond = 1000000

	wholeSeconds := float64(t.Unix())
	micros := t.Nanosecond() / int(time.Microsecond)
	return wholeSeconds + float64(micros)/float64(microsPerSecond)
}
|