UdpForward.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. package Service
  2. import (
  3. "github.com/astaxie/beego/logs"
  4. "net"
  5. "sync"
  6. "time"
  7. )
  8. type UdpForward struct {
  9. SrcAddr *net.UDPAddr
  10. DestAddr *net.UDPAddr
  11. LClientAddr *net.UDPAddr
  12. UdpListenerConn *net.UDPConn
  13. UdpConns map[string]UdpConn
  14. UdpConnsMutex *sync.RWMutex
  15. ChkActTime time.Duration
  16. Closed bool
  17. ConnectedEvent func(addr string)
  18. DisConnectedEvent func(addr string)
  19. }
  // UdpConn is the per-client bookkeeping record stored in UdpForward.UdpConns.
  type UdpConn struct {
  	// udp is the local socket used to talk to the destination on behalf of
  	// one client.
  	udp *net.UDPConn

  	// lastActive is the time the client entry was created or last refreshed;
  	// the idle sweep compares it against the configured timeout.
  	lastActive time.Time
  }
  // bufferSize is the size in bytes of the buffer allocated for reading one
  // datagram.
  const bufferSize = 4096

  // chkActTime is the default value NewUdpForward assigns to
  // UdpForward.ChkActTime.
  var chkActTime = time.Minute
  26. func NewUdpForward() *UdpForward {
  27. return &UdpForward{
  28. UdpConns:make(map[string]UdpConn),
  29. UdpConnsMutex:new(sync.RWMutex),
  30. ChkActTime:chkActTime,
  31. ConnectedEvent:func(addr string) {},
  32. DisConnectedEvent:func(addr string){},
  33. }
  34. }
  35. func (_self *UdpForward) DoUdpForward (srcAddr string, destAddr string) error {
  36. var err error
  37. _self.SrcAddr, err = net.ResolveUDPAddr("udp", srcAddr)
  38. if err != nil {
  39. logs.Error("ResolveUDPAddr ", srcAddr, " 出错:", err)
  40. return err
  41. }
  42. _self.DestAddr, err = net.ResolveUDPAddr("udp", destAddr)
  43. if err != nil {
  44. logs.Error("ResolveUDPAddr ", destAddr, " 出错:", err)
  45. return err
  46. }
  47. _self.LClientAddr = &net.UDPAddr{
  48. IP: _self.SrcAddr.IP,
  49. Port: 0,
  50. Zone: _self.SrcAddr.Zone,
  51. }
  52. _self.UdpListenerConn, err = net.ListenUDP("udp", _self.SrcAddr)
  53. if err != nil {
  54. logs.Error("启动UDP监听 ", srcAddr, " 出错:", err)
  55. return err
  56. }
  57. go _self.checkAlive()
  58. go _self.runForward()
  59. return nil
  60. }
  61. func (_self *UdpForward) runForward() {
  62. for {
  63. buf := make([]byte, bufferSize)
  64. n, addr, err := _self.UdpListenerConn.ReadFromUDP(buf)
  65. if err != nil {
  66. return
  67. }
  68. go _self.forwardHandler(buf[:n], addr)
  69. }
  70. }
  71. func (_self *UdpForward) forwardHandler(data []byte, addr *net.UDPAddr) {
  72. _self.UdpConnsMutex.RLock()
  73. udpConn, found := _self.UdpConns[addr.String()]
  74. _self.UdpConnsMutex.RUnlock()
  75. if found {
  76. udpConn.udp.WriteTo(data, _self.DestAddr)
  77. }else{
  78. conn, err := net.ListenUDP("udp", _self.LClientAddr)
  79. if err != nil {
  80. logs.Error("udp-forwader: failed to dial:", err)
  81. return
  82. }
  83. _self.UdpConnsMutex.Lock()
  84. _self.UdpConns[addr.String()] = UdpConn{
  85. udp: conn,
  86. lastActive: time.Now(),
  87. }
  88. _self.UdpConnsMutex.Unlock()
  89. _self.ConnectedEvent(addr.String())
  90. conn.WriteTo(data, _self.DestAddr)
  91. for {
  92. buf := make([]byte, bufferSize)
  93. n, _, err := conn.ReadFromUDP(buf)
  94. if err != nil {
  95. _self.UdpConnsMutex.Lock()
  96. conn.Close()
  97. delete(_self.UdpConns, addr.String())
  98. _self.UdpConnsMutex.Unlock()
  99. return
  100. }
  101. go func(data []byte, conn *net.UDPConn, addr *net.UDPAddr) {
  102. _self.UdpListenerConn.WriteTo(data, addr)
  103. }(buf[:n], conn, addr)
  104. }
  105. }
  106. _self.updateActiveTime(addr)
  107. }
  108. func (_self *UdpForward) updateActiveTime(addr *net.UDPAddr) {
  109. needUpdateTime := false
  110. _self.UdpConnsMutex.RLock()
  111. if _, found := _self.UdpConns[addr.String()]; found {
  112. if _self.UdpConns[addr.String()].lastActive.Before(
  113. time.Now().Add(_self.ChkActTime / 4)) {
  114. needUpdateTime = true
  115. //logs.Debug("needUpdateTime")
  116. }
  117. }
  118. _self.UdpConnsMutex.RUnlock()
  119. if needUpdateTime {
  120. _self.UdpConnsMutex.Lock()
  121. //
  122. if _, found := _self.UdpConns[addr.String()]; found {
  123. connWrapper := _self.UdpConns[addr.String()]
  124. connWrapper.lastActive = time.Now()
  125. _self.UdpConns[addr.String()] = connWrapper
  126. }
  127. _self.UdpConnsMutex.Unlock()
  128. }
  129. }
  130. func (_self *UdpForward) checkAlive() {
  131. for !_self.Closed {
  132. time.Sleep(_self.ChkActTime)
  133. var keysToDelete []string
  134. _self.UdpConnsMutex.RLock()
  135. for k, conn := range _self.UdpConns {
  136. if conn.lastActive.Before(time.Now().Add(-_self.ChkActTime)) {
  137. keysToDelete = append(keysToDelete, k)
  138. //logs.Debug("need delete udp conn")
  139. }
  140. }
  141. _self.UdpConnsMutex.RUnlock()
  142. _self.UdpConnsMutex.Lock()
  143. for _, k := range keysToDelete {
  144. _self.UdpConns[k].udp.Close()
  145. delete(_self.UdpConns, k)
  146. }
  147. _self.UdpConnsMutex.Unlock()
  148. for _, k := range keysToDelete {
  149. _self.DisConnectedEvent(k)
  150. }
  151. }
  152. }
  153. func (_self *UdpForward) Close() {
  154. _self.UdpConnsMutex.Lock()
  155. _self.Closed = true
  156. for _, conn := range _self.UdpConns {
  157. conn.udp.Close()
  158. }
  159. _self.UdpListenerConn.Close()
  160. _self.UdpConnsMutex.Unlock()
  161. }
  162. func (_self *UdpForward) GetConnsInfo() []string {
  163. _self.UdpConnsMutex.Lock()
  164. defer _self.UdpConnsMutex.Unlock()
  165. results := make([]string, 0, len(_self.UdpConns))
  166. for key := range _self.UdpConns {
  167. results = append(results, key)
  168. }
  169. return results
  170. }
粤ICP备19079148号