some cleanup

This commit is contained in:
AJ ONeal 2018-08-02 00:23:55 -06:00
parent d84d2e63e6
commit 0f7580954e
1 changed files with 132 additions and 124 deletions

View File

@ -44,6 +44,7 @@ type tcpUser struct {
bufConn bufferedConn bufConn bufferedConn
userCount chan int userCount chan int
email string email string
newMsg chan string
} }
// So we can peek at net.Conn, which we can't do natively // So we can peek at net.Conn, which we can't do natively
@ -83,20 +84,24 @@ type myMsg struct {
User string `json:"user"` User string `json:"user"`
} }
type JsonMsg struct { type JsonMsg struct {
Messages []myMsg `json:"messages"` Messages []*myMsg `json:"messages"`
} }
//var firstMsgs chan myMsg type chatHist struct {
//var myRooms map[string](chan myMsg) msgs []*myMsg
var msgHistory []myMsg i int
c int
}
var myChatHist chatHist
var broadcastMsg chan myMsg var broadcastMsg chan myMsg
var newConns chan net.Conn var newConns chan net.Conn
var newTcpChat chan bufferedConn var wantsServerHello chan bufferedConn
var authTcpChat chan tcpUser var authTcpChat chan tcpUser
var delTcpChat chan bufferedConn var delTcpChat chan tcpUser
var newHttpChat chan bufferedConn var gotClientHello chan bufferedConn
var newHttpClient chan bufferedConn var demuxHttpClient chan bufferedConn
var delHttpChat chan bufferedConn var delHttpChat chan bufferedConn
var newAuthReqs chan authReq var newAuthReqs chan authReq
var valAuthReqs chan authReq var valAuthReqs chan authReq
@ -122,7 +127,13 @@ func genAuthCode() (string, error) {
return base64.URLEncoding.EncodeToString(b), nil return base64.URLEncoding.EncodeToString(b), nil
} }
func handleRaw(bufConn bufferedConn) { // Trying to keep it slim with just one goroutine per client for reads and one goroutine per client for writes.
// Initially I was spawning a goroutine per write, but my guess is that constantly allocating and cleaning up 4k
// of memory (or perhaps less these days https://blog.nindalf.com/posts/how-goroutines-work/) is probably not
// very efficient for small tweet-sized network writes
// Auth & Reads
func handleTelnetConn(bufConn bufferedConn) {
// TODO // TODO
// What happens if this is being read from range // What happens if this is being read from range
// when it's being added here (data race)? // when it's being added here (data race)?
@ -135,6 +146,7 @@ func handleRaw(bufConn bufferedConn) {
// Handle all subsequent packets // Handle all subsequent packets
buffer := make([]byte, 1024) buffer := make([]byte, 1024)
var u *tcpUser
for { for {
//fmt.Fprintf(os.Stdout, "[raw] Waiting for message...\n") //fmt.Fprintf(os.Stdout, "[raw] Waiting for message...\n")
count, err := bufConn.Read(buffer) count, err := bufConn.Read(buffer)
@ -144,15 +156,24 @@ func handleRaw(bufConn bufferedConn) {
} }
fmt.Fprintf(os.Stdout, "Ending socket\n") fmt.Fprintf(os.Stdout, "Ending socket\n")
delTcpChat <- bufConn if nil != u {
delTcpChat <- *u
}
break break
} }
buf := buffer[:count] buf := buffer[:count]
// Rate Limit: Reasonable poor man's DoS prevention (Part 1)
// A human does not send messages super fast and blocking the
// writes of other incoming messages to this client for this long
// won't hinder the user experience (and may in fact enhance it)
// TODO: should do this for HTTP as well (or, better yet, implement hashcash)
time.Sleep(150 * time.Millisecond)
// Fun fact: if the buffer's current length (not capacity) is 0 // Fun fact: if the buffer's current length (not capacity) is 0
// then the Read returns 0 without error // then the Read returns 0 without error
if 0 == count { if 0 == count {
fmt.Fprintf(os.Stdout, "Weird") fmt.Fprintf(os.Stdout, "[SANITY FAIL] using a 0-length buffer")
break break
} }
@ -181,7 +202,7 @@ func handleRaw(bufConn bufferedConn) {
// This little ditty is meant to act as a psuedo-progress bar to engage the user // This little ditty is meant to act as a psuedo-progress bar to engage the user
// Aside: a keystroke typically takes >=50s to type (probably closer to 200ms between words) // Aside: a keystroke typically takes >=50s to type (probably closer to 200ms between words)
// https://stackoverflow.com/questions/22505698/what-is-a-typical-keypress-duration // https://stackoverflow.com/questions/22505698/what-is-a-typical-keypress-duration
var wg sync.WaitGroup wg := sync.WaitGroup{}
wg.Add(1) wg.Add(1)
go func() { go func() {
time.Sleep(50 * time.Millisecond) time.Sleep(50 * time.Millisecond)
@ -225,21 +246,28 @@ func handleRaw(bufConn bufferedConn) {
authn = true authn = true
time.Sleep(150 * time.Millisecond) time.Sleep(150 * time.Millisecond)
fmt.Fprintf(bufConn, "\n") fmt.Fprintf(bufConn, "\n")
u := tcpUser{ u = &tcpUser{
bufConn: bufConn, bufConn: bufConn,
email: email, email: email,
userCount: make(chan int, 1), userCount: make(chan int, 1),
newMsg: make(chan string, 10), // reasonably sized
} }
authTcpChat <- u authTcpChat <- *u
// prevent data race on len(myRawConns) // prevent data race on len(myRawConns)
// XXX (there can't be a race between these two lines, right?) // XXX (there can't be a race between these two lines, right?)
count := <-u.userCount count := <-u.userCount
close(u.userCount) close(u.userCount)
u.userCount = nil u.userCount = nil
// Note: There's a 500ms gap between when we accept the client
// and when it can start receiving messages and when it begins
// to handle them, however, it's unlikely that >= 10 messages
// will simultaneously flood in during that time
time.Sleep(50 * time.Millisecond) time.Sleep(50 * time.Millisecond)
fmt.Fprintf(bufConn, "\n") fmt.Fprintf(bufConn, "\n")
time.Sleep(50 * time.Millisecond) time.Sleep(50 * time.Millisecond)
fmt.Fprintf(bufConn, "Welcome to #general (%d users)!", count) fmt.Fprintf(bufConn, "\033[1;32m"+"Welcome to #general (%d users)!"+"\033[22;39m", count)
time.Sleep(50 * time.Millisecond) time.Sleep(50 * time.Millisecond)
fmt.Fprintf(bufConn, "\n") fmt.Fprintf(bufConn, "\n")
time.Sleep(50 * time.Millisecond) time.Sleep(50 * time.Millisecond)
@ -248,9 +276,11 @@ func handleRaw(bufConn bufferedConn) {
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
fmt.Fprintf(bufConn, "\n") fmt.Fprintf(bufConn, "\n")
// this would be cool, but won't work since other messages will come // Would be cool to write a prompt...
// in before the person responds // I wonder if I could send the correct ANSI codes for that...
//fmt.Fprintf(bufConn, "\n%s> ", email) //fmt.Fprintf(bufConn, "\n%s> ", email)
go handleTelnetBroadcast(u)
} }
continue continue
} }
@ -268,7 +298,23 @@ func handleRaw(bufConn bufferedConn) {
} }
} }
func handleSorted(conn bufferedConn) { // Writes (post Auth)
func handleTelnetBroadcast(u *tcpUser) {
for {
msg := <-u.newMsg
// Disallow Reverse Rate Limit: Reasonable poor man's DoS prevention (Part 3)
// https://blog.cloudflare.com/the-complete-guide-to-golang-net-http-timeouts/
timeoutDuration := 2 * time.Second
u.bufConn.SetWriteDeadline(time.Now().Add(timeoutDuration))
_, err := fmt.Fprintf(u.bufConn, msg)
if nil != err {
delTcpChat <- *u
break
}
}
}
func muxTcp(conn bufferedConn) {
// Wish List for protocol detection // Wish List for protocol detection
// * PROXY protocol (and loop) // * PROXY protocol (and loop)
// * HTTP CONNECT (proxy) (and loop) // * HTTP CONNECT (proxy) (and loop)
@ -316,7 +362,7 @@ func handleSorted(conn bufferedConn) {
if "" == protocol { if "" == protocol {
fmt.Fprintf(conn, "\n\nWelcome to Sample Chat! You're not an HTTP client, assuming Telnet.\nYou must authenticate via email to participate\n\nEmail: ") fmt.Fprintf(conn, "\n\nWelcome to Sample Chat! You're not an HTTP client, assuming Telnet.\nYou must authenticate via email to participate\n\nEmail: ")
newTcpChat <- conn wantsServerHello <- conn
return return
} else if "HTTP" != protocol { } else if "HTTP" != protocol {
defer conn.Close() defer conn.Close()
@ -324,48 +370,7 @@ func handleSorted(conn bufferedConn) {
return return
} }
newHttpClient <- conn demuxHttpClient <- conn
/*
firstMsgs <- myMsg{
ReceivedAt: time.Now(),
sender: conn,
Message: firstMsg,
Channel: "general",
}
// TODO
// * TCP-CHAT
// * HTTP
// * TLS
// Handle all subsequent packets
buf := make([]byte, 1024)
for {
fmt.Fprintf(os.Stdout, "[sortable] Waiting for message...\n")
count, err := conn.Read(buf)
if nil != err {
if io.EOF != err {
fmt.Fprintf(os.Stderr, "Non-EOF socket error: %s\n", err)
}
fmt.Fprintf(os.Stdout, "Ending socket\n")
break
}
// Fun fact: if the buffer's current length (not capacity) is 0
// then the Read returns 0 without error
if 0 == count {
// fmt.Fprintf(os.Stdout, "Weird")
continue
}
//myRooms["general"] <- myMsg{
broadcastMsg <- myMsg{
ReceivedAt: time.Now(),
sender: conn,
Message: string(buf[0:count]),
Channel: "general",
}
}
*/
} }
func handleConnection(netConn net.Conn) { func handleConnection(netConn net.Conn) {
@ -388,9 +393,9 @@ func handleConnection(netConn net.Conn) {
m.Lock() m.Lock()
if virgin { if virgin {
virgin = false virgin = false
newHttpChat <- bufConn gotClientHello <- bufConn
} else { } else {
newTcpChat <- bufConn wantsServerHello <- bufConn
} }
m.Unlock() m.Unlock()
}() }()
@ -462,18 +467,18 @@ func sendAuthCode(cnf ConfMailer, to string) (string, error) {
return code, nil return code, nil
} }
type myServer struct { type myHttpServer struct {
chans chan bufferedConn chans chan bufferedConn
net.Listener net.Listener
} }
func (m *myServer) Accept() (net.Conn, error) { func (m *myHttpServer) Accept() (net.Conn, error) {
bufConn := <-m.chans bufConn := <-m.chans
return bufConn, nil return bufConn, nil
} }
func newMyServer(l net.Listener) *myServer { func newHttpServer(l net.Listener) *myHttpServer {
return &myServer{make(chan bufferedConn), l} return &myHttpServer{make(chan bufferedConn), l}
} }
var config Conf var config Conf
@ -655,7 +660,7 @@ func listMsgs(req *restful.Request, resp *restful.Response) {
// Also, data race? the list could be added to while this is iterating? // Also, data race? the list could be added to while this is iterating?
// For now we'll just let the client sort the list // For now we'll just let the client sort the list
resp.WriteEntity(&JsonMsg{ resp.WriteEntity(&JsonMsg{
Messages: msgHistory, Messages: myChatHist.msgs[:myChatHist.c],
}) })
} }
func postMsg(req *restful.Request, resp *restful.Response) { func postMsg(req *restful.Request, resp *restful.Response) {
@ -716,8 +721,8 @@ func main() {
newConns = make(chan net.Conn, 128) newConns = make(chan net.Conn, 128)
// TCP & Authentication // TCP & Authentication
myRawConns := make(map[bufferedConn]bool) myRawConns := make(map[bufferedConn]tcpUser)
newTcpChat = make(chan bufferedConn, 128) wantsServerHello = make(chan bufferedConn, 128)
authTcpChat = make(chan tcpUser, 128) authTcpChat = make(chan tcpUser, 128)
// HTTP & Authentication // HTTP & Authentication
@ -725,12 +730,11 @@ func main() {
newAuthReqs = make(chan authReq, 128) newAuthReqs = make(chan authReq, 128)
valAuthReqs = make(chan authReq, 128) valAuthReqs = make(chan authReq, 128)
delAuthReqs = make(chan authReq, 128) delAuthReqs = make(chan authReq, 128)
newHttpChat = make(chan bufferedConn, 128) gotClientHello = make(chan bufferedConn, 128)
newHttpClient = make(chan bufferedConn, 128) demuxHttpClient = make(chan bufferedConn, 128)
// cruft to delete // cruft to delete
//myRooms = make(map[string](chan myMsg)) //myRooms = make(map[string](chan myMsg))
//firstMsgs = make(chan myMsg, 128)
//myRooms["general"] = make(chan myMsg, 128) //myRooms["general"] = make(chan myMsg, 128)
// Note: I had considered dynamically select on channels for rooms. // Note: I had considered dynamically select on channels for rooms.
@ -739,7 +743,7 @@ func main() {
broadcastMsg = make(chan myMsg, 128) broadcastMsg = make(chan myMsg, 128)
// Poor-Man's container/ring (circular buffer) // Poor-Man's container/ring (circular buffer)
msgHistory = make([]myMsg, 128) myChatHist.msgs = make([]*myMsg, 128)
msgIndex := 0 msgIndex := 0
var addr string var addr string
@ -796,7 +800,7 @@ func main() {
Addr: addr, Addr: addr,
Handler: container, Handler: container,
} }
myHttpServer := newMyServer(sock) myHttpServer := newHttpServer(sock)
go func() { go func() {
server.Serve(myHttpServer) server.Serve(myHttpServer)
}() }()
@ -810,7 +814,7 @@ func main() {
case u := <-authTcpChat: case u := <-authTcpChat:
// allow to receive messages // allow to receive messages
// (and be counted among the users) // (and be counted among the users)
myRawConns[u.bufConn] = true myRawConns[u.bufConn] = u
// is chan chan the right way to handle this? // is chan chan the right way to handle this?
u.userCount <- len(myRawConns) u.userCount <- len(myRawConns)
broadcastMsg <- myMsg{ broadcastMsg <- myMsg{
@ -835,26 +839,29 @@ func main() {
} }
case ar := <-delAuthReqs: case ar := <-delAuthReqs:
delete(myAuthReqs, ar.Cid) delete(myAuthReqs, ar.Cid)
case bufConn := <-newTcpChat: case bufConn := <-wantsServerHello:
go handleRaw(bufConn) go handleTelnetConn(bufConn)
case bufConn := <-delTcpChat: case u := <-delTcpChat:
// we can safely ignore this error // we can safely ignore this error, if any
bufConn.Close() u.bufConn.Close()
delete(myRawConns, bufConn) delete(myRawConns, u.bufConn)
case bufConn := <-newHttpChat: case bufConn := <-gotClientHello:
go handleSorted(bufConn) go muxTcp(bufConn)
//case msg := <- myRooms["general"]: case bufConn := <-demuxHttpClient:
//delete(myRooms["general"], bufConn) // this will be Accept()ed immediately by the go-restful container
case bufConn := <-newHttpClient:
// this will be Accept()ed immediately by restful
// NOTE: we don't store these HTTP connections for broadcast // NOTE: we don't store these HTTP connections for broadcast
// as we manage the session by HTTP Auth Bearer rather than TCP // since we manage the session by HTTP Auth Bearer rather than TCP
myHttpServer.chans <- bufConn myHttpServer.chans <- bufConn
case msg := <-broadcastMsg: case msg := <-broadcastMsg:
msgHistory[msgIndex] = msg // copy comes in, pointer gets saved (and not GC'd, I hope)
msgIndex += 1 myChatHist.msgs[msgIndex] = &msg
msgIndex %= len(msgHistory) myChatHist.i += 1
if myChatHist.c < cap(myChatHist.msgs) {
myChatHist.c += 1
}
myChatHist.i %= len(myChatHist.msgs)
// print the system message (the "log")
t := msg.ReceivedAt t := msg.ReceivedAt
tf := "%d-%02d-%02d %02d:%02d:%02d (%s)" tf := "%d-%02d-%02d %02d:%02d:%02d (%s)"
var sender string var sender string
@ -867,49 +874,50 @@ func main() {
// I wonder if we could use IP detection to get the client's tz // I wonder if we could use IP detection to get the client's tz
// ... could probably make time for this in the authentication loop // ... could probably make time for this in the authentication loop
zone, _ := msg.ReceivedAt.Zone() zone, _ := msg.ReceivedAt.Zone()
// TODO put logging here
//ts, err := msg.ReceivedAt.MarshalJSON()
fmt.Fprintf(os.Stdout, tf+" [%s] (%s):\n\t%s", fmt.Fprintf(os.Stdout, tf+" [%s] (%s):\n\t%s",
t.Year(), t.Month(), t.Day(), t.Year(), t.Month(), t.Day(),
t.Hour(), t.Minute(), t.Second(), zone, t.Hour(), t.Minute(), t.Second(), zone,
sender, sender,
msg.User, msg.Message) msg.User, msg.Message)
for conn, _ := range myRawConns { for _, u := range myRawConns {
// Don't echo back to the original client // Don't echo back to the original client
if msg.sender == conn { if msg.sender == u.bufConn {
continue continue
} }
// To ask: Why do I have to pass in conn to prevent a data race? Is it garbage collection? msg := fmt.Sprintf(tf+" [%s]: %s", t.Year(), t.Month(), t.Day(), t.Hour(),
// Don't block the rest of the loop t.Minute(), t.Second(), zone, msg.User, msg.Message)
// TODO maybe use a chan to send to the socket's event loop select {
go func(conn bufferedConn) { case u.newMsg <- msg:
// Protect against malicious clients to prevent DoS // all is well, client was ready to receive
// https://blog.cloudflare.com/the-complete-guide-to-golang-net-http-timeouts/ default:
timeoutDuration := 5 * time.Second // Rate Limit: Reasonable poor man's DoS prevention (Part 2)
conn.SetWriteDeadline(time.Now().Add(timeoutDuration)) // This client's send channel buffer is full.
_, err := fmt.Fprintf(conn, tf+" [%s]: %s", // It is consuming data too slowly. It may be malicious.
t.Year(), t.Month(), t.Day(), // In the case that it's experiencing network issues,
t.Hour(), t.Minute(), t.Second(), zone, // well, these things happen when you're having network issues.
msg.User, msg.Message) // It can reconnect.
if nil != err { delTcpChat <- u
delTcpChat <- conn }
}
}(conn) /*
// To ask: Why do I have to pass in conn to prevent a data race? Is it garbage collection?
// Don't block the rest of the loop
// TODONE maybe use a chan to send to the socket's event loop
// (left this in to remind myself to ask questions)
go func(conn bufferedConn) {
// Protect against malicious clients to prevent DoS
// https://blog.cloudflare.com/the-complete-guide-to-golang-net-http-timeouts/
timeoutDuration := 2 * time.Second
conn.SetWriteDeadline(time.Now().Add(timeoutDuration))
_, err := fmt.Fprintf(conn, msg)
if nil != err {
delTcpChat <- u
}
}(conn)
*/
} }
/*
case msg := <-firstMsgs:
fmt.Fprintf(os.Stdout, "f [First Message]\n")
ts, err := msg.ReceivedAt.MarshalJSON()
if nil != err {
fmt.Fprintf(os.Stderr, "f [Error] %s\n", err)
}
fmt.Fprintf(os.Stdout, "f [Timestamp] %s\n", ts)
fmt.Fprintf(os.Stdout, "f [Remote] %s\n", msg.sender.RemoteAddr().String())
fmt.Fprintf(os.Stdout, "f [Message] %s\n", msg.Message)
*/
} }
} }
} }