diff --git a/chatserver.go b/chatserver.go
index af769ef..e860090 100644
--- a/chatserver.go
+++ b/chatserver.go
@@ -44,6 +44,7 @@ type tcpUser struct {
     bufConn   bufferedConn
     userCount chan int
     email     string
+    newMsg    chan string
 }
 
 // So we can peek at net.Conn, which we can't do natively
@@ -83,20 +84,24 @@ type myMsg struct {
     User       string `json:"user"`
 }
 type JsonMsg struct {
-    Messages []myMsg `json:"messages"`
+    Messages []*myMsg `json:"messages"`
 }
 
-//var firstMsgs chan myMsg
-//var myRooms map[string](chan myMsg)
-var msgHistory []myMsg
+type chatHist struct {
+    msgs []*myMsg
+    i    int
+    c    int
+}
+
+var myChatHist chatHist
 var broadcastMsg chan myMsg
 var newConns chan net.Conn
-var newTcpChat chan bufferedConn
+var wantsServerHello chan bufferedConn
 var authTcpChat chan tcpUser
-var delTcpChat chan bufferedConn
-var newHttpChat chan bufferedConn
-var newHttpClient chan bufferedConn
+var delTcpChat chan tcpUser
+var gotClientHello chan bufferedConn
+var demuxHttpClient chan bufferedConn
 var delHttpChat chan bufferedConn
 var newAuthReqs chan authReq
 var valAuthReqs chan authReq
@@ -122,7 +127,13 @@ func genAuthCode() (string, error) {
     return base64.URLEncoding.EncodeToString(b), nil
 }
 
-func handleRaw(bufConn bufferedConn) {
+// Trying to keep it slim with just one goroutine per client for reads and one goroutine per client for writes.
+// Initially I was spawning a goroutine per write, but my guess is that constantly allocating and cleaning up 4k
+// of memory (or perhaps less these days https://blog.nindalf.com/posts/how-goroutines-work/) is probably not
+// very efficient for small tweet-sized network writes
+
+// Auth & Reads
+func handleTelnetConn(bufConn bufferedConn) {
     // TODO
     // What happens if this is being read from range
     // when it's being added here (data race)?
@@ -135,6 +146,7 @@ func handleRaw(bufConn bufferedConn) {
     // Handle all subsequent packets
     buffer := make([]byte, 1024)
+    var u *tcpUser
 
     for {
         //fmt.Fprintf(os.Stdout, "[raw] Waiting for message...\n")
         count, err := bufConn.Read(buffer)
@@ -144,15 +156,24 @@
             }
             fmt.Fprintf(os.Stdout, "Ending socket\n")
-            delTcpChat <- bufConn
+            if nil != u {
+                delTcpChat <- *u
+            }
             break
         }
 
         buf := buffer[:count]
 
+        // Rate Limit: Reasonable poor man's DoS prevention (Part 1)
+        // A human does not send messages super fast, and blocking the
+        // writes of other incoming messages to this client for this long
+        // won't hinder the user experience (and may in fact enhance it)
+        // TODO: should do this for HTTP as well (or, better yet, implement hashcash)
+        time.Sleep(150 * time.Millisecond)
+
         // Fun fact: if the buffer's current length (not capacity) is 0
         // then the Read returns 0 without error
         if 0 == count {
-            fmt.Fprintf(os.Stdout, "Weird")
+            fmt.Fprintf(os.Stdout, "[SANITY FAIL] using a 0-length buffer")
             break
         }
@@ -181,7 +202,7 @@
         // This little ditty is meant to act as a psuedo-progress bar to engage the user
         // Aside: a keystroke typically takes >=50s to type (probably closer to 200ms between words)
         // https://stackoverflow.com/questions/22505698/what-is-a-typical-keypress-duration
-        var wg sync.WaitGroup
+        wg := sync.WaitGroup{}
         wg.Add(1)
         go func() {
             time.Sleep(50 * time.Millisecond)
@@ -225,21 +246,28 @@
             authn = true
             time.Sleep(150 * time.Millisecond)
             fmt.Fprintf(bufConn, "\n")
-            u := tcpUser{
+            u = &tcpUser{
                 bufConn:   bufConn,
                 email:     email,
                 userCount: make(chan int, 1),
+                newMsg:    make(chan string, 10), // reasonably sized
             }
-            authTcpChat <- u
+            authTcpChat <- *u
             // prevent data race on len(myRawConns)
             // XXX (there can't be a race between these two lines, right?)
             count := <-u.userCount
             close(u.userCount)
             u.userCount = nil
+
+            // Note: There's a 500ms gap between when we accept the client
+            // and when it actually begins handling incoming messages;
+            // however, it's unlikely that >= 10 messages will flood in
+            // simultaneously during that time
+            time.Sleep(50 * time.Millisecond)
             fmt.Fprintf(bufConn, "\n")
             time.Sleep(50 * time.Millisecond)
-            fmt.Fprintf(bufConn, "Welcome to #general (%d users)!", count)
+            fmt.Fprintf(bufConn, "\033[1;32m"+"Welcome to #general (%d users)!"+"\033[22;39m", count)
             time.Sleep(50 * time.Millisecond)
             fmt.Fprintf(bufConn, "\n")
             time.Sleep(50 * time.Millisecond)
@@ -248,9 +276,11 @@
             time.Sleep(100 * time.Millisecond)
             fmt.Fprintf(bufConn, "\n")
 
-            // this would be cool, but won't work since other messages will come
-            // in before the person responds
+            // Would be cool to write a prompt...
+            // I wonder if I could send the correct ANSI codes for that...
             //fmt.Fprintf(bufConn, "\n%s> ", email)
+
+            go handleTelnetBroadcast(u)
         }
         continue
     }
@@ -268,7 +298,23 @@
     }
 }
 
-func handleSorted(conn bufferedConn) {
+// Writes (post Auth)
+func handleTelnetBroadcast(u *tcpUser) {
+    for {
+        msg := <-u.newMsg
+        // Disallow a reverse rate limit (a slow reader stalling our writes): reasonable poor man's DoS prevention (Part 3)
+        // https://blog.cloudflare.com/the-complete-guide-to-golang-net-http-timeouts/
+        timeoutDuration := 2 * time.Second
+        u.bufConn.SetWriteDeadline(time.Now().Add(timeoutDuration))
+        _, err := fmt.Fprint(u.bufConn, msg)
+        if nil != err {
+            delTcpChat <- *u
+            break
+        }
+    }
+}
+
+func muxTcp(conn bufferedConn) {
     // Wish List for protocol detection
     // * PROXY protocol (and loop)
     // * HTTP CONNECT (proxy) (and loop)
@@ -316,7 +362,7 @@
     if "" == protocol {
         fmt.Fprintf(conn, "\n\nWelcome to Sample Chat! You're not an HTTP client, assuming Telnet.\nYou must authenticate via email to participate\n\nEmail: ")
-        newTcpChat <- conn
+        wantsServerHello <- conn
         return
     } else if "HTTP" != protocol {
         defer conn.Close()
@@ -324,48 +370,7 @@
     }
 
-    newHttpClient <- conn
-
-    /*
-        firstMsgs <- myMsg{
-            ReceivedAt: time.Now(),
-            sender:     conn,
-            Message:    firstMsg,
-            Channel:    "general",
-        }
-
-        // TODO
-        // * TCP-CHAT
-        // * HTTP
-        // * TLS
-
-        // Handle all subsequent packets
-        buf := make([]byte, 1024)
-        for {
-            fmt.Fprintf(os.Stdout, "[sortable] Waiting for message...\n")
-            count, err := conn.Read(buf)
-            if nil != err {
-                if io.EOF != err {
-                    fmt.Fprintf(os.Stderr, "Non-EOF socket error: %s\n", err)
-                }
-                fmt.Fprintf(os.Stdout, "Ending socket\n")
-                break
-            }
-            // Fun fact: if the buffer's current length (not capacity) is 0
-            // then the Read returns 0 without error
-            if 0 == count {
-                // fmt.Fprintf(os.Stdout, "Weird")
-                continue
-            }
-            //myRooms["general"] <- myMsg{
-            broadcastMsg <- myMsg{
-                ReceivedAt: time.Now(),
-                sender:     conn,
-                Message:    string(buf[0:count]),
-                Channel:    "general",
-            }
-        }
-    */
+    demuxHttpClient <- conn
 }
 
 func handleConnection(netConn net.Conn) {
@@ -388,9 +393,9 @@
         m.Lock()
         if virgin {
             virgin = false
-            newHttpChat <- bufConn
+            gotClientHello <- bufConn
         } else {
-            newTcpChat <- bufConn
+            wantsServerHello <- bufConn
         }
         m.Unlock()
     }()
@@ -462,18 +467,18 @@ func sendAuthCode(cnf ConfMailer, to string) (string, error) {
     return code, nil
 }
 
-type myServer struct {
+type myHttpServer struct {
     chans chan bufferedConn
     net.Listener
 }
 
-func (m *myServer) Accept() (net.Conn, error) {
+func (m *myHttpServer) Accept() (net.Conn, error) {
     bufConn := <-m.chans
     return bufConn, nil
 }
 
-func newMyServer(l net.Listener) *myServer {
-    return &myServer{make(chan bufferedConn), l}
+func newHttpServer(l net.Listener) *myHttpServer {
+    return &myHttpServer{make(chan bufferedConn), l}
 }
 
 var config Conf
@@ -655,7 +660,7 @@ func listMsgs(req *restful.Request, resp *restful.Response) {
     // Also, data race? the list could be added to while this is iterating?
     // For now we'll just let the client sort the list
     resp.WriteEntity(&JsonMsg{
-        Messages: msgHistory,
+        Messages: myChatHist.msgs[:myChatHist.c],
     })
 }
 func postMsg(req *restful.Request, resp *restful.Response) {
@@ -716,8 +721,8 @@ func main() {
     newConns = make(chan net.Conn, 128)
 
     // TCP & Authentication
-    myRawConns := make(map[bufferedConn]bool)
-    newTcpChat = make(chan bufferedConn, 128)
+    myRawConns := make(map[bufferedConn]tcpUser)
+    wantsServerHello = make(chan bufferedConn, 128)
     authTcpChat = make(chan tcpUser, 128)
 
     // HTTP & Authentication
@@ -725,12 +730,11 @@
     newAuthReqs = make(chan authReq, 128)
     valAuthReqs = make(chan authReq, 128)
     delAuthReqs = make(chan authReq, 128)
-    newHttpChat = make(chan bufferedConn, 128)
-    newHttpClient = make(chan bufferedConn, 128)
+    gotClientHello = make(chan bufferedConn, 128)
+    demuxHttpClient = make(chan bufferedConn, 128)
 
     // cruft to delete
     //myRooms = make(map[string](chan myMsg))
-    //firstMsgs = make(chan myMsg, 128)
     //myRooms["general"] = make(chan myMsg, 128)
 
     // Note: I had considered dynamically select on channels for rooms.
@@ -739,7 +743,7 @@
     broadcastMsg = make(chan myMsg, 128)
 
     // Poor-Man's container/ring (circular buffer)
-    msgHistory = make([]myMsg, 128)
+    myChatHist.msgs = make([]*myMsg, 128)
     msgIndex := 0
 
     var addr string
@@ -796,7 +800,7 @@
         Addr:    addr,
         Handler: container,
     }
-    myHttpServer := newMyServer(sock)
+    myHttpServer := newHttpServer(sock)
     go func() {
         server.Serve(myHttpServer)
     }()
@@ -810,7 +814,7 @@
         case u := <-authTcpChat:
             // allow to receive messages
             // (and be counted among the users)
-            myRawConns[u.bufConn] = true
+            myRawConns[u.bufConn] = u
             // is chan chan the right way to handle this?
             u.userCount <- len(myRawConns)
             broadcastMsg <- myMsg{
@@ -835,26 +839,29 @@
             }
         case ar := <-delAuthReqs:
             delete(myAuthReqs, ar.Cid)
-        case bufConn := <-newTcpChat:
-            go handleRaw(bufConn)
-        case bufConn := <-delTcpChat:
-            // we can safely ignore this error
-            bufConn.Close()
-            delete(myRawConns, bufConn)
-        case bufConn := <-newHttpChat:
-            go handleSorted(bufConn)
-            //case msg := <- myRooms["general"]:
-            //delete(myRooms["general"], bufConn)
-        case bufConn := <-newHttpClient:
-            // this will be Accept()ed immediately by restful
+        case bufConn := <-wantsServerHello:
+            go handleTelnetConn(bufConn)
+        case u := <-delTcpChat:
+            // we can safely ignore this error, if any
+            u.bufConn.Close()
+            delete(myRawConns, u.bufConn)
+        case bufConn := <-gotClientHello:
+            go muxTcp(bufConn)
+        case bufConn := <-demuxHttpClient:
+            // this will be Accept()ed immediately by the go-restful container
             // NOTE: we don't store these HTTP connections for broadcast
-            // as we manage the session by HTTP Auth Bearer rather than TCP
+            // since we manage the session by HTTP Auth Bearer rather than TCP
             myHttpServer.chans <- bufConn
         case msg := <-broadcastMsg:
-            msgHistory[msgIndex] = msg
-            msgIndex += 1
-            msgIndex %= len(msgHistory)
+            // copy comes in, pointer gets saved (and not GC'd, I hope)
+            myChatHist.msgs[msgIndex] = &msg
+            msgIndex += 1
+            if myChatHist.c < cap(myChatHist.msgs) {
+                myChatHist.c += 1
+            }
+            msgIndex %= len(myChatHist.msgs)
+            // print the system message (the "log")
             t := msg.ReceivedAt
             tf := "%d-%02d-%02d %02d:%02d:%02d (%s)"
             var sender string
@@ -867,49 +874,50 @@
             // I wonder if we could use IP detection to get the client's tz
             // ... could probably make time for this in the authentication loop
             zone, _ := msg.ReceivedAt.Zone()
-
-            // TODO put logging here
-            //ts, err := msg.ReceivedAt.MarshalJSON()
             fmt.Fprintf(os.Stdout, tf+" [%s] (%s):\n\t%s",
                 t.Year(), t.Month(), t.Day(),
                 t.Hour(), t.Minute(), t.Second(), zone,
                 sender, msg.User, msg.Message)
 
-            for conn, _ := range myRawConns {
+            for _, u := range myRawConns {
                 // Don't echo back to the original client
-                if msg.sender == conn {
+                if msg.sender == u.bufConn {
                     continue
                 }
 
-                // To ask: Why do I have to pass in conn to prevent a data race? Is it garbage collection?
-                // Don't block the rest of the loop
-                // TODO maybe use a chan to send to the socket's event loop
-                go func(conn bufferedConn) {
-                    // Protect against malicious clients to prevent DoS
-                    // https://blog.cloudflare.com/the-complete-guide-to-golang-net-http-timeouts/
-                    timeoutDuration := 5 * time.Second
-                    conn.SetWriteDeadline(time.Now().Add(timeoutDuration))
-                    _, err := fmt.Fprintf(conn, tf+" [%s]: %s",
-                        t.Year(), t.Month(), t.Day(),
-                        t.Hour(), t.Minute(), t.Second(), zone,
-                        msg.User, msg.Message)
-                    if nil != err {
-                        delTcpChat <- conn
-                    }
-                }(conn)
+                msg := fmt.Sprintf(tf+" [%s]: %s", t.Year(), t.Month(), t.Day(), t.Hour(),
+                    t.Minute(), t.Second(), zone, msg.User, msg.Message)
+                select {
+                case u.newMsg <- msg:
+                    // all is well, client was ready to receive
+                default:
+                    // Rate Limit: Reasonable poor man's DoS prevention (Part 2)
+                    // This client's send channel buffer is full.
+                    // It is consuming data too slowly. It may be malicious.
+                    // In the case that it's experiencing network issues,
+                    // well, these things happen when you're having network issues.
+                    // It can reconnect.
+                    delTcpChat <- u
+                }
+
+                /*
+                    // To ask: Why do I have to pass in conn to prevent a data race? Is it garbage collection?
+                    // Don't block the rest of the loop
+                    // TODONE maybe use a chan to send to the socket's event loop
+                    // (left this in to remind myself to ask questions)
+                    go func(conn bufferedConn) {
+                        // Protect against malicious clients to prevent DoS
+                        // https://blog.cloudflare.com/the-complete-guide-to-golang-net-http-timeouts/
+                        timeoutDuration := 2 * time.Second
+                        conn.SetWriteDeadline(time.Now().Add(timeoutDuration))
+                        _, err := fmt.Fprintf(conn, msg)
+                        if nil != err {
+                            delTcpChat <- u
+                        }
+                    }(conn)
+                */
             }
-            /*
-                case msg := <-firstMsgs:
-                    fmt.Fprintf(os.Stdout, "f [First Message]\n")
-                    ts, err := msg.ReceivedAt.MarshalJSON()
-                    if nil != err {
-                        fmt.Fprintf(os.Stderr, "f [Error] %s\n", err)
-                    }
-                    fmt.Fprintf(os.Stdout, "f [Timestamp] %s\n", ts)
-                    fmt.Fprintf(os.Stdout, "f [Remote] %s\n", msg.sender.RemoteAddr().String())
-                    fmt.Fprintf(os.Stdout, "f [Message] %s\n", msg.Message)
-            */
         }
     }
 }
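
For reference, the chatHist type introduced above is a hand-rolled circular buffer: msgs is a fixed-size slice, the write index wraps around, and c counts how many slots have been filled so far (listMsgs returns msgs[:c] and leaves ordering to the client). A minimal standalone sketch of that idea follows; the names ring, Add, and Snapshot are illustrative and not from the patch.

package main

import "fmt"

// ring mirrors chatHist: i is the next write index, c is the number
// of slots that have been filled so far (capped at the slice length).
type ring struct {
	msgs []string
	i    int
	c    int
}

func newRing(n int) *ring {
	return &ring{msgs: make([]string, n)}
}

// Add overwrites the oldest entry once the buffer is full.
func (r *ring) Add(m string) {
	r.msgs[r.i] = m
	r.i = (r.i + 1) % len(r.msgs)
	if r.c < len(r.msgs) {
		r.c++
	}
}

// Snapshot returns only the filled slots; like listMsgs, it does not
// rotate the slice into chronological order.
func (r *ring) Snapshot() []string {
	return r.msgs[:r.c]
}

func main() {
	r := newRing(3)
	for _, m := range []string{"a", "b", "c", "d"} {
		r.Add(m)
	}
	fmt.Println(r.Snapshot()) // [d b c]
}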
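Likewise, the broadcast changes boil down to a common Go pattern: one writer goroutine per client draining a buffered channel, plus a non-blocking send in the broadcast loop so a single slow (or malicious) reader cannot stall everyone else. A stripped-down sketch of just that pattern, under the assumption of one io.Writer per client; the client, outbox, writeLoop, and broadcast names here are illustrative, not from the patch, and the real code also sets a write deadline before each write.

package main

import (
	"fmt"
	"io"
	"os"
	"time"
)

// client is an illustrative stand-in for tcpUser: a writer plus a
// buffered outbox drained by its own goroutine.
type client struct {
	w      io.Writer
	outbox chan string
}

// writeLoop is the per-client writer goroutine (cf. handleTelnetBroadcast).
func (c *client) writeLoop(evict chan<- *client) {
	for msg := range c.outbox {
		if _, err := io.WriteString(c.w, msg); err != nil {
			evict <- c // the main loop will drop this client
			return
		}
	}
}

// broadcast fans a message out without blocking: if a client's outbox
// is full, that client is evicted instead of stalling the loop (cf. Part 2).
func broadcast(clients map[*client]bool, evict chan<- *client, msg string) {
	for c := range clients {
		select {
		case c.outbox <- msg:
			// client had buffer room; its writeLoop will deliver the message
		default:
			evict <- c
		}
	}
}

func main() {
	evict := make(chan *client, 16)
	c := &client{w: os.Stdout, outbox: make(chan string, 10)}
	clients := map[*client]bool{c: true}
	go c.writeLoop(evict)

	broadcast(clients, evict, "hello\n")
	time.Sleep(50 * time.Millisecond) // give the writer goroutine a moment
	fmt.Println("done")
}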