tavenli 5 gadi atpakaļ
vecāks
revīzija
5e234cd97b
1 mainītis faili ar 1285 papildinājumiem un 0 dzēšanām
  1. 1285 0
      forward-core/Common/KcpPipe.go

+ 1285 - 0
forward-core/Common/KcpPipe.go

@@ -0,0 +1,1285 @@
+package Common
+
+import (
+	"encoding/binary"
+	"errors"
+	"github.com/astaxie/beego/logs"
+	"github.com/vzex/zappy"
+	"math/rand"
+	"net"
+	"sync"
+	"forward-core/ikcp"
+	"github.com/klauspost/reedsolomon"
+	"time"
+)
+
+const WriteBufferSize = 5000               //udp writer will add some data for checksum or encrypt
+const ReadBufferSize = WriteBufferSize * 2 //so reader must be larger
+
+const dataLimit = WriteBufferSize
+
+const mainV = 0
+const subV = 1
+
+func init() {
+
+}
+
+func NewKCP(addr string, setting *KcpSetting) (*UDPListener, error) {
+	udpAddr, err := net.ResolveUDPAddr("udp", addr)
+	if err != nil {
+		return nil, err
+	}
+	udpSocket, _err := net.ListenUDP("udp", udpAddr)
+	if _err != nil {
+		return nil, _err
+	}
+
+	listener := &UDPListener{connChan: make(chan *UDPMakeSession), quitChan: make(chan struct{}), sock: udpSocket, readBuffer: make([]byte, ReadBufferSize*2), sessions: make(map[string]*UDPMakeSession), setting: setting}
+	go listener.loop()
+	return listener, nil
+}
+
+
+func DefaultKcpSetting() *KcpSetting {
+	return &KcpSetting{Nodelay: 1, Interval: 10, Resend: 2, Nc: 1, Sndwnd: 1024, Rcvwnd: 1024, Mtu: 1400}
+}
+
+func DialKcpTimeout(addr string, timeout int) (*UDPMakeSession, error) {
+	return DialKcpTimeoutWithSetting(addr, timeout, DefaultKcpSetting(), 0, 0, false, false)
+}
+
+func DialKcpTimeoutWithSetting(addr string, timeout int, setting *KcpSetting, ds, ps int, comp, confuse bool) (*UDPMakeSession, error) {
+	bReset := false
+	if timeout < 5 {
+		bReset = true
+		timeout = 5
+	} else if timeout > 255 {
+		bReset = true
+		timeout = 255
+	}
+	if bReset {
+		logs.Debug("timeout should in [5, 255], force reset timeout to", timeout)
+	}
+	udpAddr, err := net.ResolveUDPAddr("udp", addr)
+	if err != nil {
+		return nil, err
+	}
+	sock, _err := net.ListenUDP("udp", &net.UDPAddr{})
+	if _err != nil {
+		logs.Debug("dial addr fail", _err.Error())
+		return nil, _err
+	}
+	session := &UDPMakeSession{readBuffer: make([]byte, ReadBufferSize*2), do: make(chan Action), do2: make(chan Action), quitChan: make(chan struct{}), recvChan: make(chan KcpCache), processBuffer: make([]byte, ReadBufferSize), closeChan: make(chan struct{}), encodeBuffer: make([]byte, 7), checkCanWrite: make(chan (chan struct{})), xor: setting.Xor}
+	session.remote = udpAddr
+	session.sock = sock
+	session.status = "firstsyn"
+	session.timeout = int64(timeout)
+	if confuse {
+		session.confuseSeed = rand.Intn(int(WriteBufferSize/2)) + 10
+	}
+	if ds != 0 && ps != 0 {
+		er := session.SetFec(ds, ps)
+		if er != nil {
+			logs.Debug("set fec error:", er.Error())
+		} else {
+			logs.Debug("set fec ok:", ds, ps)
+		}
+	}
+	if comp {
+		session.compressCache = make([]byte, zappy.MaxEncodedLen(ReadBufferSize))
+	}
+	_timeout := int(timeout / 2)
+	if _timeout < 5 {
+		timeout = 5
+	}
+	arg := int(int32(timeout) + int32(mainV<<24) + int32(subV<<16))
+	arg2 := int16((ds << 8) | ps)
+	if comp {
+		arg2 = arg2 | (1 << 7)
+	} else {
+		arg2 = arg2 & ^(1 << 7)
+	}
+	info := makeEncode(session.encodeBuffer, FirstSYN, arg, arg2, session.xor)
+	code := session.doAndWait(func() {
+		sock.WriteToUDP(info, udpAddr)
+	}, _timeout, func(status byte, arg int32, arg2 int16) int {
+		if status == ResetAck {
+			_mainV, _subV := int(byte(arg>>24)), int(byte(arg>>16))
+			logs.Debug("pipe version not eq,%d.%d=>%d.%d", mainV, subV, _mainV, _subV)
+			return 1
+		}
+		if status != FirstACK {
+			logs.Debug("recv not FirstACK", status)
+			return -1
+		} else {
+			session.status = "firstack"
+			session.id = int(arg)
+			return 0
+		}
+	})
+	if code != 0 {
+		logs.Debug("handshakefail with code", code)
+		return nil, errors.New("handshake fail,1")
+	}
+	code = session.doAndWait(func() {
+		if session.confuseSeed > 0 {
+			sock.WriteToUDP(makeEncode(session.encodeBuffer, SndSYN, session.id, 1, session.xor), udpAddr)
+		} else {
+			sock.WriteToUDP(makeEncode(session.encodeBuffer, SndSYN, session.id, 0, session.xor), udpAddr)
+		}
+	}, _timeout, func(status byte, arg int32, arg2 int16) int {
+		if status != SndACK {
+			return -1
+		} else if session.id != int(arg) {
+			return 2
+		} else {
+			session.status = "ok"
+			return 0
+		}
+	})
+	if code != 0 {
+		return nil, errors.New("handshake fail,2")
+	}
+	session.kcp = ikcp.Ikcp_create(uint32(session.id), session)
+	session.kcp.Output = udp_output
+	ikcp.Ikcp_wndsize(session.kcp, setting.Sndwnd, setting.Rcvwnd)
+	ikcp.Ikcp_nodelay(session.kcp, setting.Nodelay, setting.Interval, setting.Resend, setting.Nc)
+	ikcp.Ikcp_setmtu(session.kcp, setting.Mtu)
+	go session.loop()
+	return session, nil
+}
+
+type UDPListener struct {
+	connChan     chan *UDPMakeSession
+	quitChan     chan struct{}
+	sock         *net.UDPConn
+	readBuffer   []byte
+	sessions     map[string]*UDPMakeSession
+	sessionsLock sync.RWMutex
+	setting      *KcpSetting
+}
+
+
+type UDPMakeSession struct {
+	id                int
+	idstr             string
+	status            string
+	overTime          int64
+	quitChan          chan struct{}
+	closeChan         chan struct{}
+	recvChan          chan KcpCache
+	handShakeChan     chan string
+	handShakeChanQuit chan struct{}
+	sock              *net.UDPConn
+	remote            *net.UDPAddr
+	kcp               *ikcp.Ikcpcb
+	do                chan Action
+	do2               chan Action
+	checkCanWrite     chan (chan struct{})
+	listener          *UDPListener
+	closed            bool
+
+	readBuffer    []byte
+	processBuffer []byte
+	encodeBuffer  []byte
+	timeout       int64
+
+	xor string
+
+	compressCache   []byte
+	fecDataShards   int
+	fecParityShards int
+	fecW            *reedsolomon.Encoder
+	fecR            *reedsolomon.Encoder
+	fecRCacheTbl    map[uint]*fecInfo
+	fecWCacheTbl    *fecInfo
+	fecWriteId      uint //uint16
+	fecSendC        uint
+	fecSendL        int
+	fecRecvId       uint
+
+	confuseSeed int
+}
+
+type KcpSetting struct {
+	Nodelay  int32
+	Interval int32 //not for set
+	Resend   int32
+	Nc       int32
+
+	Sndwnd int32
+	Rcvwnd int32
+
+	Mtu int32
+
+	Xor string
+}
+
+
+type KcpCache struct {
+	b []byte
+	l int
+	c chan int
+}
+
+
+type Action struct {
+	t    string
+	args []interface{}
+}
+
+type fecInfo struct {
+	bytes    [][]byte
+	overTime int64
+}
+
+func iclock() int32 {
+	return int32((time.Now().UnixNano() / 1000000) & 0xffffffff)
+}
+
+func udp_output(buf []byte, _len int32, kcp *ikcp.Ikcpcb, user interface{}) int32 {
+	c := user.(*UDPMakeSession)
+	//logs.Debug("send udp", _len, c.remote.String())
+	c.output(buf[:_len])
+	return 0
+}
+
+
+func (l *UDPListener) Accept() (net.Conn, error) {
+	var c *UDPMakeSession
+	var err error
+	select {
+	case c = <-l.connChan:
+	case <-l.quitChan:
+	}
+	if c == nil {
+		err = errors.New("listener quit")
+	}
+	return net.Conn(c), err
+}
+
+func (l *UDPListener) Dump() {
+	l.sessionsLock.RLock()
+	defer l.sessionsLock.RUnlock()
+	for addr, session := range l.sessions {
+		logs.Debug("listener", addr, session.status)
+	}
+}
+
+func (l *UDPListener) inner_loop() {
+	sock := l.sock
+	for {
+		n, from, err := sock.ReadFromUDP(l.readBuffer)
+		if err == nil {
+			//logs.Debug("recv", n, from)
+			addr := from.String()
+			l.sessionsLock.RLock()
+			session, bHave := l.sessions[addr]
+			l.sessionsLock.RUnlock()
+			if bHave {
+				if session.status == "ok" {
+					if session.remote.String() == from.String() && (n >= int(ikcp.IKCP_OVERHEAD) || session.compressCache != nil) {
+						__xor(l.readBuffer, session.xor)
+						var buf []byte
+						if n <= 7 || session.compressCache == nil {
+							buf = make([]byte, n)
+							copy(buf, l.readBuffer[:n])
+						} else {
+							_b, _er := zappy.Decode(nil, l.readBuffer[:n])
+							if _er != nil {
+								logs.Debug("decompress fail", _er.Error())
+								//go session.Close()
+								//don't close pipe, just drop this data
+								continue
+							}
+							buf = _b
+							//logs.Debug("decompress", n, len(_b))
+						}
+						session.DoAction2("input", buf, len(buf))
+					}
+					continue
+				} else {
+					session.serverDo(string(l.readBuffer[:n]))
+				}
+			} else {
+				status, _, fec := makeDecode(l.readBuffer[:n], l.setting.Xor)
+				if status != FirstSYN {
+					go sock.WriteToUDP([]byte("0"), from)
+					logs.Debug("invalid package,reset", from, status)
+					continue
+				}
+				sessionId := GetId("udp")
+				session = &UDPMakeSession{status: "init", overTime: time.Now().Unix() + 10, remote: from, sock: sock, recvChan: make(chan KcpCache), quitChan: make(chan struct{}), readBuffer: make([]byte, ReadBufferSize*2), processBuffer: make([]byte, ReadBufferSize), timeout: 30, do: make(chan Action), do2: make(chan Action), id: sessionId, handShakeChan: make(chan string), handShakeChanQuit: make(chan struct{}), listener: l, closeChan: make(chan struct{}), encodeBuffer: make([]byte, 7), checkCanWrite: make(chan (chan struct{})), xor: l.setting.Xor}
+				if fec & ^(1<<7) != 0 {
+					er := session.SetFec((int(fec)>>8)&0xff, int(fec&^(1<<7)&0xff))
+					if er != nil {
+						logs.Debug("set fec error:", fec, er.Error())
+					} else {
+						logs.Debug("set fec ok:", (int(fec)>>8)&0xff, int(fec&^(1<<7)&0xff))
+					}
+				}
+				if int(fec)&(1<<7) != 0 {
+					session.compressCache = make([]byte, zappy.MaxEncodedLen(ReadBufferSize))
+				}
+				l.sessionsLock.Lock()
+				l.sessions[addr] = session
+				l.sessionsLock.Unlock()
+				session.serverInit(l, l.setting)
+				session.serverDo(string(l.readBuffer[:n]))
+			}
+			//logs.Debug("debug out.........")
+		} else {
+			e, ok := err.(net.Error)
+			if !ok || !e.Timeout() {
+				logs.Debug("recv error", err.Error(), from)
+				l.remove(from.String())
+				//time.Sleep(time.Second)
+				break
+			}
+		}
+	}
+}
+
+func (l *UDPListener) remove(addr string) {
+	logs.Debug("listener remove", addr)
+	l.sessionsLock.Lock()
+	defer l.sessionsLock.Unlock()
+	session, bHave := l.sessions[addr]
+	if bHave {
+		RmId("udp", session.id)
+	}
+	delete(l.sessions, addr)
+}
+
+func (l *UDPListener) Close() error {
+	if l.sock != nil {
+		l.sock.Close()
+		l.sock = nil
+	} else {
+		return nil
+	}
+	close(l.quitChan)
+	return nil
+}
+
+func (l *UDPListener) Addr() net.Addr {
+	return nil
+}
+
+func (l *UDPListener) loop() {
+	l.inner_loop()
+}
+
+
+
+const (
+	Reset     byte = 0
+	FirstSYN  byte = 6
+	FirstACK  byte = 1
+	SndSYN    byte = 2
+	SndACK    byte = 2
+	Data      byte = 4
+	Ping      byte = 5
+	Close     byte = 7
+	CloseBack byte = 8
+	ResetAck  byte = 9
+
+	Fake byte = 50
+)
+
+func __xor(s []byte, xor string) []byte {
+	if len(xor) == 0 {
+		return s
+	}
+	encodingData := []byte(xor)
+	encodingLen := len(encodingData)
+	n := len(s)
+	if n == 0 {
+		return s
+	}
+	for i := 0; i < n; i++ {
+		s[i] = s[i] ^ encodingData[i%encodingLen]
+	}
+	return s
+}
+
+func _xor(s []byte, xor string) []byte {
+	if len(xor) == 0 {
+		return s
+	}
+	encodingData := []byte(xor)
+	encodingLen := len(encodingData)
+	n := len(s)
+	if n == 0 {
+		return s
+	}
+	r := make([]byte, n)
+	for i := 0; i < n; i++ {
+		r[i] = s[i] ^ encodingData[i%encodingLen]
+	}
+	return r
+}
+
+func makeEncode(buf []byte, status byte, arg int, arg2 int16, xor string) []byte {
+	buf[0] = status
+	binary.LittleEndian.PutUint32(buf[1:], uint32(arg))
+	binary.LittleEndian.PutUint16(buf[5:], uint16(arg2))
+	return _xor(buf, xor)
+}
+
+func makeDecode(data []byte, xor string) (status byte, arg int32, arg2 int16) {
+	if len(data) < 7 {
+		return Reset, 0, 0
+	}
+	data = _xor(data, xor)
+	status = data[0]
+	arg = int32(binary.LittleEndian.Uint32(data[1:5]))
+	arg2 = int16(binary.LittleEndian.Uint16(data[5:7]))
+	return
+}
+
+
+
+
+func (session *UDPMakeSession) SetFec(DataShards, ParityShards int) (er error) {
+	session.fecDataShards = DataShards
+	session.fecParityShards = ParityShards
+	var fec reedsolomon.Encoder
+	fec, er = reedsolomon.New(DataShards, ParityShards)
+	if er != nil {
+		return
+	}
+	session.fecR = &fec
+	fec, er = reedsolomon.New(DataShards, ParityShards)
+	if er == nil {
+		session.fecRCacheTbl = make(map[uint]*fecInfo)
+		session.fecWCacheTbl = nil
+		session.fecW = &fec
+	} else {
+		session.fecR = nil
+	}
+	return
+}
+
+func (session *UDPMakeSession) doAndWait(f func(), sec int, readf func(status byte, arg int32, arg2 int16) int) (code int) {
+	t := time.NewTicker(50 * time.Millisecond)
+	currT := time.Now().Unix()
+	f()
+out:
+	for {
+		select {
+		case <-t.C:
+			if time.Now().Unix()-currT >= int64(sec) {
+				logs.Debug("session timeout")
+				code = -1
+				break out
+			}
+			session.sock.SetReadDeadline(time.Now().Add(2 * time.Second))
+			n, from, err := session.sock.ReadFromUDP(session.readBuffer)
+			if err != nil {
+				e, ok := err.(net.Error)
+				if !ok || !e.Timeout() {
+					logs.Debug("recv error", err.Error(), from)
+					code = -2
+					break out
+				}
+			} else {
+				code = readf(makeDecode(session.readBuffer[:n], session.xor))
+				if code >= 0 {
+					break out
+				}
+			}
+			go f()
+		}
+	}
+	t.Stop()
+	if code > 0 {
+		logs.Debug("handshake fail,got code", code)
+	}
+	return
+}
+
+func (session *UDPMakeSession) writeTo(b []byte) {
+	if session.compressCache != nil && len(b) > 7 {
+		enc, er := zappy.Encode(session.compressCache, b)
+		if er != nil {
+			logs.Debug("compress error", er.Error())
+			go session.Close()
+			return
+		}
+		//logs.Debug("compress", len(b), len(enc))
+		session.sock.WriteTo(__xor(enc, session.xor), session.remote)
+	} else {
+		session.sock.WriteTo(__xor(b, session.xor), session.remote)
+	}
+}
+
+func (session *UDPMakeSession) output(b []byte) {
+	if session.fecW == nil {
+		session.writeTo(b)
+	} else {
+		id := session.fecWriteId
+		session.fecSendC++
+
+		info := session.fecWCacheTbl
+		if info == nil {
+			info = &fecInfo{make([][]byte, session.fecDataShards+session.fecParityShards), time.Now().Unix() + 15}
+			session.fecWCacheTbl = info
+		}
+		_b := make([]byte, len(b)+7)
+		_len := len(b)
+		_b[0] = byte(_len & 0xff)
+		_b[1] = byte((_len >> 8) & 0xff)
+		_b[2] = byte(id & 0xff)
+		_b[3] = byte((id >> 8) & 0xff)
+		_b[4] = byte((id >> 16) & 0xff)
+		_b[5] = byte((id >> 32) & 0xff)
+		_b[6] = byte(session.fecSendC - 1)
+		copy(_b[7:], b)
+		info.bytes[session.fecSendC-1] = _b
+		if session.fecSendL < len(_b) {
+			session.fecSendL = len(_b)
+		}
+		session.writeTo(_b)
+		if session.fecSendC >= uint(session.fecDataShards) {
+			for i := 0; i < session.fecDataShards; i++ {
+				if session.fecSendL > len(info.bytes[i]) {
+					__b := make([]byte, session.fecSendL)
+					copy(__b, info.bytes[i])
+					info.bytes[i] = __b
+				}
+			}
+			for i := 0; i < session.fecParityShards; i++ {
+				info.bytes[i+session.fecDataShards] = make([]byte, session.fecSendL)
+			}
+			er := (*session.fecW).Encode(info.bytes)
+			if er != nil {
+				//logs.Debug("wocao encode err", er.Error())
+				go session.Close()
+				return
+			}
+			for i := session.fecDataShards; i < session.fecDataShards+session.fecParityShards; i++ {
+				//if rand.Intn(100) >= 15 {
+				_info := info.bytes[i]
+				session.writeTo(_info)
+				//_len := int(_info[0]) | (int(_info[1]) << 8)
+				//logs.Debug("output udp id", id, i, _len, len(_info))
+				//} else {
+				//	logs.Debug("drop output udp id", id, i, _len, len(_info))
+				//}
+			}
+			session.fecWCacheTbl = nil
+			session.fecSendC = 0
+			session.fecSendL = 0
+			session.fecWriteId++
+			//logs.Debug("flush id", id)
+		}
+		//logs.Debug("output sn", c.fecWriteId, c.fecSendC, _len)
+	}
+}
+
+func (session *UDPMakeSession) serverDo(s string) {
+	go func() {
+		//logs.Debug("prepare handshake", session.remote)
+		select {
+		case <-session.handShakeChanQuit:
+		case session.handShakeChan <- s:
+		}
+	}()
+}
+func (session *UDPMakeSession) serverInit(l *UDPListener, setting *KcpSetting) {
+	go func() {
+		c := time.NewTicker(50 * time.Millisecond)
+		defer func() {
+			close(session.handShakeChanQuit)
+			c.Stop()
+			if session.status != "ok" {
+				session.Close()
+			}
+		}()
+		overTime := time.Now().Unix() + session.timeout
+		for {
+			select {
+			case s := <-session.handShakeChan:
+				//logs.Debug("process handshake", session.remote)
+				status, arg, arg2 := makeDecode([]byte(s), session.xor)
+				switch session.status {
+				case "init":
+					if status != FirstSYN {
+						logs.Debug("status != FirstSYN, reset", session.remote, status)
+						session.sock.WriteToUDP(makeEncode(session.encodeBuffer, Reset, 0, 0, session.xor), session.remote)
+						return
+					}
+					_mainV, _subV := int(byte(arg>>24)), int(byte(arg>>16))
+					if _mainV != mainV || _subV != subV {
+						session.sock.WriteToUDP(makeEncode(session.encodeBuffer, ResetAck, (mainV<<24)+(subV<<16), 0, session.xor), session.remote)
+						logs.Debug("pipe version not eq,kickout,%d.%d=>%d.%d", mainV, subV, _mainV, _subV)
+						return
+					}
+					session.status = "firstack"
+					session.timeout = int64(arg & 0xff)
+					session.sock.WriteToUDP(makeEncode(session.encodeBuffer, FirstACK, session.id, 0, session.xor), session.remote)
+					overTime = time.Now().Unix() + session.timeout
+				case "firstack":
+					if status != SndSYN {
+						logs.Debug("status != SndSYN, nothing", session.remote, status)
+						/*
+							if status == FirstSYN {
+								session.sock.WriteToUDP(makeEncode(session.encodeBuffer, FirstACK, session.id), session.remote, session.xor)
+							}*/
+						return
+					}
+					if arg2 > 0 {
+						session.confuseSeed = rand.Intn(int(WriteBufferSize/2)) + 10
+						logs.Debug("confuse!")
+					}
+					session.status = "ok"
+					session.kcp = ikcp.Ikcp_create(uint32(session.id), session)
+					session.kcp.Output = udp_output
+					ikcp.Ikcp_wndsize(session.kcp, setting.Sndwnd, setting.Rcvwnd)
+					ikcp.Ikcp_nodelay(session.kcp, setting.Nodelay, setting.Interval, setting.Resend, setting.Nc)
+					ikcp.Ikcp_setmtu(session.kcp, setting.Mtu)
+					go session.loop()
+					go func() {
+						select {
+						case l.connChan <- session:
+						case <-l.quitChan:
+						}
+					}()
+					session.sock.WriteToUDP(makeEncode(session.encodeBuffer, SndACK, session.id, 0, session.xor), session.remote)
+					overTime = time.Now().Unix() + session.timeout
+				}
+			case <-c.C:
+				if time.Now().Unix() > overTime {
+					return
+				}
+				switch session.status {
+				case "firstack":
+					session.sock.WriteToUDP(makeEncode(session.encodeBuffer, FirstACK, session.id, 0, session.xor), session.remote)
+				case "ok":
+					buf := make([]byte, 7)
+					session.sock.WriteToUDP(makeEncode(buf, SndACK, session.id, 0, session.xor), session.remote)
+				}
+			}
+		}
+	}()
+}
+
+func (session *UDPMakeSession) loop() {
+	curr := time.Now().Unix()
+	session.overTime = curr + session.timeout
+	ping := make(chan struct{})
+	pingC := 0
+	callUpdate := false
+	updateC := make(chan struct{})
+	if session.listener == nil {
+		go func() {
+			tmp := session.readBuffer
+			t := time.Time{}
+			session.sock.SetReadDeadline(t)
+			for {
+				n, from, err := session.sock.ReadFromUDP(tmp)
+				if err != nil {
+					e, ok := err.(net.Error)
+					if !ok || !e.Timeout() {
+						break
+					}
+				}
+				if session.remote.String() == from.String() {
+					//logs.Debug("===", n, len(session.compressCache))
+					if n >= int(ikcp.IKCP_OVERHEAD) || n <= 7 || session.compressCache != nil {
+						__xor(session.readBuffer[:n], session.xor)
+						var buf []byte
+						if n <= 7 || session.compressCache == nil {
+							buf = make([]byte, n)
+							copy(buf, session.readBuffer[:n])
+						} else {
+							_b, _er := zappy.Decode(nil, session.readBuffer[:n])
+							if _er != nil {
+								logs.Debug("decompress fail", _er.Error())
+								//go session.Close()
+								//don't close pipe, just drop data
+								continue
+							}
+							buf = _b
+							//logs.Debug("decompress", n, len(_b))
+						}
+						session.DoAction2("input", buf, len(buf))
+					}
+				}
+			}
+		}()
+	}
+	updateF := func(n time.Duration) {
+		if !callUpdate {
+			callUpdate = true
+			time.AfterFunc(n*time.Millisecond, func() {
+				select {
+				case updateC <- struct{}{}:
+				case <-session.quitChan:
+				}
+			})
+		}
+	}
+
+	fastCheck := false
+	waitList := [](chan struct{}){}
+	recoverChan := make(chan struct{})
+
+	var waitRecvCache *KcpCache
+	var forceWait int64 = 0
+	go func() {
+	out:
+		for {
+			select {
+			//session.wait.Done()
+			case <-ping:
+				updateF(50)
+				pingC++
+				if pingC >= 4 {
+					pingC = 0
+					if ikcp.Ikcp_waitsnd(session.kcp) <= dataLimit/2 {
+						go session.DoWrite(makeEncode(session.encodeBuffer, Ping, 0, 0, session.xor))
+					}
+
+					if session.fecR != nil {
+						curr := time.Now().Unix()
+						for id, info := range session.fecRCacheTbl {
+							if curr >= info.overTime {
+								delete(session.fecRCacheTbl, id)
+								if session.fecRecvId <= id {
+									session.fecRecvId = id + 1
+								}
+								//logs.Debug("timeout after del", id, len(c.fecRCacheTbl))
+							}
+						}
+					}
+					if forceWait > 0 {
+						if time.Now().Unix() > forceWait && ikcp.Ikcp_waitsnd(session.kcp) <= dataLimit/2 {
+							forceWait = 0
+							go func() {
+								select {
+								case <-session.quitChan:
+								case recoverChan <- struct{}{}:
+								}
+							}()
+						}
+					}
+				}
+				if time.Now().Unix() > session.overTime {
+					logs.Debug("overtime close", session.LocalAddr().String(), session.RemoteAddr().String())
+					go session.Close()
+				} else {
+					time.AfterFunc(300*time.Millisecond, func() {
+						select {
+						case ping <- struct{}{}:
+						case <-session.quitChan:
+						}
+					})
+				}
+			case <-recoverChan:
+				fastCheck = false
+				for _, r := range waitList {
+					//logs.Debug("recover writing data")
+					select {
+					case r <- struct{}{}:
+					case <-session.quitChan:
+					}
+				}
+				waitList = [](chan struct{}){}
+			case c := <-session.checkCanWrite:
+				if session.confuseSeed > 0 {
+					if WriteBufferSize/2-rand.Intn(session.confuseSeed) < 500 || rand.Intn(100) < 5 {
+						//make a sleep
+						forceWait = time.Now().Add(time.Millisecond * time.Duration(rand.Intn(1000))).Unix()
+					}
+				}
+				if ikcp.Ikcp_waitsnd(session.kcp) > dataLimit {
+					//logs.Debug("wait for data limit")
+					waitList = append(waitList, c)
+					if !fastCheck {
+						fastCheck = true
+						var f func()
+						f = func() {
+							n := ikcp.Ikcp_waitsnd(session.kcp)
+							//logs.Debug("fast check!", n, len(waitList))
+							if n <= dataLimit/2 {
+								select {
+								case <-session.quitChan:
+									logs.Debug("recover writing data quit")
+								case recoverChan <- struct{}{}:
+								}
+							} else {
+								updateF(20)
+								time.AfterFunc(40*time.Millisecond, f)
+							}
+						}
+						time.AfterFunc(20*time.Millisecond, f)
+					}
+				} else if forceWait > 0 && time.Now().Unix() < forceWait {
+					waitList = append(waitList, c)
+				} else {
+					select {
+					case c <- struct{}{}:
+					case <-session.quitChan:
+					}
+				}
+			case ca := <-session.recvChan:
+				tmp := session.processBuffer
+				for {
+					hr := ikcp.Ikcp_recv(session.kcp, tmp, ReadBufferSize)
+					if hr > 0 {
+						status := tmp[0]
+						if status == Data {
+							n := 1
+							l := hr
+							if session.confuseSeed > 0 {
+								n = 3
+								l = (int32(tmp[1]) | (int32(tmp[2]) << 8)) + int32(n)
+							}
+							//logs.Debug("try recv", hr, n, l)
+							copy(ca.b, tmp[n:int(l)])
+							select {
+							case ca.c <- int(int(l) - n):
+							case <-session.quitChan:
+							}
+							break
+						} else if status == Fake {
+						} else {
+							session.DoAction("recv", status)
+						}
+					} else {
+						waitRecvCache = &ca
+						break
+					}
+				}
+			case action := <-session.do2:
+				switch action.t {
+				case "input":
+					session.overTime = time.Now().Unix() + session.timeout
+					args := action.args
+					s := args[0].([]byte)
+					n := args[1].(int)
+
+					if session.fecR != nil {
+						if len(s) <= 7 {
+							if n < 7 {
+								logs.Debug("recv reset")
+								go session._Close(false)
+								break
+							} else if n == 7 {
+								status, _, _ := makeDecode(s, session.xor)
+								if status == Reset || status == ResetAck {
+									logs.Debug("recv reset2", status)
+									go session._Close(false)
+								}
+								break
+							}
+							break
+						}
+						id := uint(int(s[2]) | (int(s[3]) << 8) | (int(s[4]) << 16) | (int(s[5]) << 24))
+						var seq uint = uint(s[6])
+						_len := int(s[0]) | (int(s[1]) << 8)
+
+						//binary.Read(head[:4], binary.LittleEndian, &id)
+						if id < session.fecRecvId {
+							//logs.Debug("drop id for noneed", id, seq)
+							break
+						}
+						if seq < uint(session.fecDataShards) {
+							ikcp.Ikcp_input(session.kcp, s[7:], _len)
+							//logs.Debug("direct input udp", id, seq, _len)
+						}
+						if seq >= uint(session.fecDataShards+session.fecParityShards) {
+							logs.Debug("-ds and -ps must be equal on both sides")
+							go session.Close()
+							break
+						}
+
+						tbl, have := session.fecRCacheTbl[id]
+						if !have {
+							tbl = &fecInfo{make([][]byte, session.fecDataShards+session.fecParityShards), time.Now().Unix() + 3}
+							session.fecRCacheTbl[id] = tbl
+						}
+						//logs.Debug("got", id, seq, n, _len)
+						if tbl.bytes[seq] != nil {
+							//dup, drop
+							break
+						} else {
+							tbl.bytes[seq] = s
+						}
+						count := 0
+						reaL := 0
+						for _, v := range tbl.bytes {
+							if v != nil {
+								count++
+								if reaL < len(v) {
+									reaL = len(v)
+								}
+							}
+						}
+						if count >= session.fecDataShards {
+							markTbl := make(map[int]bool, len(tbl.bytes))
+							for _seq, _b := range tbl.bytes {
+								if _b != nil {
+									markTbl[_seq] = true
+								}
+							}
+							bNeedRebuild := false
+							for i, v := range tbl.bytes {
+								if v != nil {
+									if i >= session.fecDataShards {
+										bNeedRebuild = true
+									}
+									if len(v) < reaL {
+										_b := make([]byte, reaL)
+										copy(_b, v)
+										tbl.bytes[i] = _b
+									}
+								}
+							}
+
+							if bNeedRebuild {
+								er := (*session.fecR).Reconstruct(tbl.bytes)
+								if er != nil {
+									//logs.Debug("2Reconstruct fail, close pipe", count, session.fecDataShards, session.fecParityShards, er.Error())
+									//break //broken data, may be should be closed, now just keep input
+								} else {
+									//logs.Debug("Reconstruct ok, input", id)
+									for i := 0; i < session.fecDataShards; i++ {
+										if _, have := markTbl[i]; !have {
+											_len := int(tbl.bytes[i][0]) | (int(tbl.bytes[i][1]) << 8)
+											ikcp.Ikcp_input(session.kcp, tbl.bytes[i][7:], int(_len))
+											//logs.Debug("fec input for mark ok", i, id, _len)
+										}
+									}
+								}
+							}
+							delete(session.fecRCacheTbl, id)
+							//logs.Debug("after del", id, len(c.fecRCacheTbl))
+							if session.fecRecvId <= id {
+								session.fecRecvId = id + 1
+							}
+						}
+					} else {
+						if n < 7 {
+							logs.Debug("recv reset")
+							go session._Close(false)
+							break
+						} else if n == 7 {
+							status, _, _ := makeDecode(s, session.xor)
+							if status == Reset || status == ResetAck {
+								logs.Debug("recv reset2", status)
+								go session._Close(false)
+							}
+							break
+						}
+						ikcp.Ikcp_input(session.kcp, s, n)
+					}
+
+					if waitRecvCache != nil {
+						ca := *waitRecvCache
+						tmp := session.processBuffer
+						for {
+							hr := ikcp.Ikcp_recv(session.kcp, tmp, ReadBufferSize)
+							if hr > 0 {
+								status := tmp[0]
+								if status == Data {
+									n := 1
+									l := hr
+									if session.confuseSeed > 0 {
+										n = 3
+										l = (int32(tmp[1]) | (int32(tmp[2]) << 8)) + int32(n)
+									}
+									//logs.Debug("try recv", n, l)
+									waitRecvCache = nil
+									copy(ca.b, tmp[n:int(l)])
+									select {
+									case ca.c <- (int(l) - n):
+									case <-session.quitChan:
+									}
+									break
+								} else if status == Fake {
+								} else {
+									session.DoAction("recv", status)
+								}
+							} else {
+								break
+							}
+						}
+					}
+					updateF(10)
+				case "write":
+					b := action.args[0].([]byte)
+					ikcp.Ikcp_send(session.kcp, b, len(b))
+					updateF(10)
+				}
+			case <-session.quitChan:
+				break out
+			case <-updateC:
+				now := uint32(iclock())
+				ikcp.Ikcp_update(session.kcp, now)
+				callUpdate = false
+			}
+		}
+	}()
+	select {
+	case ping <- struct{}{}:
+	case <-session.quitChan:
+	}
+out:
+	for {
+		select {
+		case action := <-session.do:
+			switch action.t {
+			case "quit":
+				if session.closed {
+					break
+				}
+				session._Close(true)
+			case "closebegin":
+				//A tell B to close and wait
+				time.AfterFunc(time.Millisecond*500, func() {
+					//logs.Debug("close over, step3", session.LocalAddr().String(), session.RemoteAddr().String())
+					go session.DoAction("closeover")
+				})
+				buf := make([]byte, 7)
+				go session.DoWrite(makeEncode(buf, Close, 0, 0, session.xor))
+			case "closeover":
+				//A call timeover
+				close(session.closeChan)
+				break out
+			case "closeend":
+				//B call close
+				session._Close(false)
+				break out
+			case "recv":
+				status := (action.args[0]).(byte)
+				switch status {
+				case CloseBack:
+					//A call from B
+					//logs.Debug("recv back close, step2", session.LocalAddr().String(), session.RemoteAddr().String())
+					go session.DoAction("closeover")
+				case Close:
+					if session.closed {
+						break
+					}
+					if session.status != "ok" {
+						session._Close(false)
+					} else {
+						//logs.Debug("recv remote close, step1", session.LocalAddr().String(), session.RemoteAddr().String())
+						buf := make([]byte, 7)
+						go session.DoWrite(makeEncode(buf, CloseBack, 0, 0, session.xor))
+						time.AfterFunc(time.Millisecond*500, func() {
+							//logs.Debug("close remote over, step4", session.LocalAddr().String(), session.RemoteAddr().String())
+							if session.closed {
+								return
+							}
+							go session.DoAction("closeend")
+						})
+					}
+				case Reset:
+					logs.Debug("recv reset")
+					go session._Close(false)
+				case Ping:
+				default:
+					if session.status != "ok" {
+						session.Close()
+					}
+				}
+			}
+		}
+	}
+}
+
+func (session *UDPMakeSession) _Close(bFirstCall bool) {
+	if session.closed {
+		return
+	}
+	session.closed = true
+	//session.wait.Wait()
+	go func() {
+		//logs.Debug("pipe begin close", session.LocalAddr().String(), session.RemoteAddr().String())
+		if bFirstCall {
+			go session.DoAction("closebegin", session.LocalAddr().String())
+			<-session.closeChan
+		} else {
+			close(session.closeChan)
+		}
+		//logs.Debug("pipe end close", session.id)
+		close(session.quitChan)
+		if session.listener != nil {
+			session.listener.remove(session.remote.String())
+		} else {
+			if session.sock != nil {
+				session.sock.Close()
+			}
+		}
+	}()
+}
+
+func (session *UDPMakeSession) Close() error {
+	if session.closed {
+		return nil
+	}
+	if session.status != "ok" {
+		session._Close(false)
+		return nil
+	}
+	go session.DoAction("quit")
+	<-session.closeChan
+	return nil
+}
+
+func (session *UDPMakeSession) DoAction2(action string, args ...interface{}) {
+	//session.wait.Add(1)
+	//logs.Debug(action, len(args))
+	select {
+	case session.do2 <- Action{t: action, args: args}:
+	case <-session.quitChan:
+		//session.wait.Done()
+	}
+}
+func (session *UDPMakeSession) DoAction(action string, args ...interface{}) {
+	//session.wait.Add(1)
+	//logs.Debug(action, len(args))
+	select {
+	case session.do <- Action{t: action, args: args}:
+	case <-session.quitChan:
+		//session.wait.Done()
+	}
+}
+
+func (session *UDPMakeSession) processInput(s string, n int) {
+}
+
+func (session *UDPMakeSession) LocalAddr() net.Addr {
+	return session.sock.LocalAddr()
+}
+
+func (session *UDPMakeSession) RemoteAddr() net.Addr {
+	return net.Addr(session.remote)
+}
+
+func (session *UDPMakeSession) SetDeadline(t time.Time) error {
+	return session.sock.SetDeadline(t)
+}
+
+func (session *UDPMakeSession) SetReadDeadline(t time.Time) error {
+	return session.sock.SetReadDeadline(t)
+}
+
+func (session *UDPMakeSession) SetWriteDeadline(t time.Time) error {
+	return session.sock.SetWriteDeadline(t)
+}
+
+func (session *UDPMakeSession) DoWrite(s []byte) bool {
+	wc := make(chan struct{})
+	select {
+	case session.checkCanWrite <- wc:
+		select {
+		case <-wc:
+		case <-session.quitChan:
+			return false
+		}
+		session.DoAction2("write", s)
+		return true
+	case <-session.quitChan:
+		return false
+	}
+}
+
+func (session *UDPMakeSession) Write(b []byte) (n int, err error) {
+	sendL := len(b)
+	if sendL == 0 || session.status != "ok" {
+		return 0, nil
+	}
+	var data []byte
+	if session.confuseSeed <= 0 {
+		data = make([]byte, sendL+1)
+		data[0] = Data
+		copy(data[1:], b)
+	} else {
+		remain := WriteBufferSize - sendL - 3
+		if remain > 0 && (WriteBufferSize/2-rand.Intn(session.confuseSeed) < 500 || rand.Intn(100) < 5) {
+			if remain > 1000 {
+				remain = 1000
+			}
+			remain = rand.Intn(remain / 2)
+		} else {
+			remain = 0
+		}
+		data = make([]byte, sendL+remain+3)
+		data[0] = Data
+		data[1] = byte(sendL & 0xff)
+		data[2] = byte((sendL >> 8) & 0xff)
+		copy(data[3:], b)
+		//logs.Debug("try send", len(data), sendL)
+		//copy(data[3+sendL:], b)
+	}
+	ok := session.DoWrite(data)
+	if !ok {
+		return 0, errors.New("closed")
+	}
+	if session.confuseSeed > 0 {
+		if rand.Intn(WriteBufferSize/2) > session.confuseSeed/2 {
+			n := rand.Intn(session.confuseSeed)
+			if n > 10 {
+				d := make([]byte, n)
+				d[0] = Fake
+				session.DoWrite(d)
+			}
+		}
+	}
+	return sendL, err
+}
+
+//udp read does not relay on the len(p), please make a big enough array to cache data
+func (session *UDPMakeSession) Read(p []byte) (n int, err error) {
+	wc := KcpCache{p, 0, make(chan int)}
+	select {
+	case session.recvChan <- wc:
+		select {
+		case n = <-wc.c:
+		case <-session.quitChan:
+			n = -1
+		}
+	case <-session.quitChan:
+		n = -1
+	}
+	//logs.Debug("real recv", l, string(b[:l]))
+	if n == -1 {
+		return 0, errors.New("force quit for read error")
+	} else {
+		return n, nil
+	}
+}
+
+
+
+type _reuseTbl struct {
+	tbl map[int]bool
+}
+
+var currIdMap map[string]int
+var reuseTbl map[string]*_reuseTbl
+
+func GetId(name string) int {
+	if currIdMap == nil {
+		currIdMap = make(map[string]int)
+		currIdMap[name] = 0
+	}
+	i, _ := currIdMap[name]
+	i++
+	if i >= 2147483647 {
+		i = 0
+	}
+	currIdMap[name] = i
+	//	println("gen new id", currIdMap[name])
+	return i
+}
+
+func RmId(name string, id int) {
+	return
+}

粤ICP备19079148号