ForWardJob.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. package Service
  2. import (
  3. "fmt"
  4. "forward-core/Constant"
  5. "forward-core/Models"
  6. "forward-core/NetUtils"
  7. "forward-core/Utils"
  8. "net"
  9. "sync"
  10. "time"
  11. "github.com/astaxie/beego/logs"
  12. )
// ForWardJob holds the state of a single port-forwarding job: its
// configuration, the listener or UDP forwarder it owns, and the TCP clients
// currently being forwarded.
type ForWardJob struct {
	// Config supplies the source/destination address, port and protocol
	// that the job's methods read.
	Config *Models.ForwardConfig
	// ClientMap tracks the TCP forwarding clients of this job, keyed by the
	// string form of the client connection's remote address.
	ClientMap map[string]*ForWardClient
	// ClientMapLock is held by RegistryClient and UnRegistryClient while
	// they modify ClientMap.
	ClientMapLock sync.Mutex
	// Status is set by StartJob/StopJob and compared against
	// Constant.RunStatus_Running by IsJobRunning.
	Status byte
	// PortListener is the listener StartJob creates for non-UDP jobs.
	PortListener net.Listener
	// UdpForwardJob carries out the forwarding when the job is a UDP job.
	UdpForwardJob *UdpForward
}
  21. func (_self *ForWardJob) StartJob(result chan Models.FuncResult) {
  22. sourceAddr := fmt.Sprint(_self.Config.SrcAddr, ":", _self.Config.SrcPort)
  23. destAddr := fmt.Sprint(_self.Config.DestAddr, ":", _self.Config.DestPort)
  24. resultData := &Models.FuncResult{Code: 0, Msg: ""}
  25. var err error
  26. if _self.IsUdpJob() {
  27. //_self.PortListener, err = NetUtils.NewKCP(sourceAddr, Common.DefaultKcpSetting())
  28. //_self.UdpForwardJob.UdpListenerConn, err = NetUtils.NewUDP(sourceAddr)
  29. err = _self.UdpForwardJob.DoUdpForward(sourceAddr, destAddr)
  30. if err != nil {
  31. logs.Error("启动UDP监听 ", sourceAddr, " 出错:", err)
  32. resultData.Code = 1
  33. resultData.Msg = fmt.Sprint("启动UDP监听 ", sourceAddr, " 出错:", err)
  34. result <- *resultData
  35. return
  36. }
  37. _self.Status = Constant.RunStatus_Running
  38. logs.Debug("启动UDP端口转发,从 ", sourceAddr, " 到 ", destAddr)
  39. result <- *resultData
  40. } else {
  41. _self.PortListener, err = NetUtils.NewTCP(sourceAddr)
  42. if err != nil {
  43. logs.Error("启动监听 ", sourceAddr, " 出错:", err)
  44. resultData.Code = 1
  45. resultData.Msg = fmt.Sprint("启动监听 ", sourceAddr, " 出错:", err)
  46. result <- *resultData
  47. return
  48. }
  49. _self.Status = Constant.RunStatus_Running
  50. logs.Debug("启动端口转发,从 ", sourceAddr, " 到 ", destAddr)
  51. result <- *resultData
  52. _self.doTcpForward(destAddr)
  53. }
  54. }
  55. func (_self *ForWardJob) doTcpForward(destAddr string) {
  56. for {
  57. realClientConn, err := _self.PortListener.Accept()
  58. if err != nil {
  59. logs.Error("Forward Accept err:", err.Error())
  60. logs.Error(fmt.Sprint("转发出现异常:", _self.Config.SrcAddr, ":", _self.Config.SrcPort, "->", destAddr))
  61. _self.StopJob()
  62. break
  63. }
  64. if ForWardDebug == true {
  65. logs.Info("新用户 ", realClientConn.RemoteAddr().String(), " 数据转发规则:", fmt.Sprint(_self.Config.SrcAddr, ":", _self.Config.SrcPort), "->", destAddr)
  66. }
  67. var destConn net.Conn
  68. if _self.Config.Protocol == "UDP" {
  69. //destConn, err = Common.DialKcpTimeout(destAddr, 100)
  70. destConn, err = net.DialTimeout("UDP", destAddr, 30*time.Second)
  71. } else {
  72. destConn, err = net.DialTimeout("tcp", destAddr, 30*time.Second)
  73. }
  74. if err != nil {
  75. if ForWardDebug == true {
  76. logs.Warn("转发出现异常 Forward to Dest Addr err:", err.Error())
  77. }
  78. //break
  79. continue
  80. }
  81. forwardClient := &ForWardClient{realClientConn, destConn, _self.ClosedCallBack}
  82. go forwardClient.StartForward()
  83. _self.RegistryClient(_self.GetClientId(realClientConn), forwardClient)
  84. //_self.RegistryClient(fmt.Sprint(sourceAddr, "_", "TCP", "_", id), forwardClient)
  85. }
  86. }
  87. func (_self *ForWardJob) ClosedCallBack(srcConn net.Conn, destConn net.Conn) {
  88. _self.UnRegistryClient(_self.GetClientId(srcConn))
  89. }
  90. func (_self *ForWardJob) GetClientId(conn net.Conn) string {
  91. return conn.RemoteAddr().String()
  92. }
  93. func (_self *ForWardJob) RegistryClient(srcAddr string, forwardClient *ForWardClient) {
  94. _self.ClientMapLock.Lock()
  95. defer _self.ClientMapLock.Unlock()
  96. _self.ClientMap[srcAddr] = forwardClient
  97. }
  98. func (_self *ForWardJob) UnRegistryClient(srcAddr string) {
  99. _self.ClientMapLock.Lock()
  100. defer _self.ClientMapLock.Unlock()
  101. delete(_self.ClientMap, srcAddr)
  102. if ForWardDebug == true {
  103. logs.Debug("UnRegistryClient srcAddr: ", srcAddr)
  104. }
  105. }
  106. func (_self *ForWardJob) IsJobRunning() bool {
  107. return _self.Status == Constant.RunStatus_Running
  108. }
  109. func (_self *ForWardJob) IsUdpJob() bool {
  110. return Utils.ToUpper(_self.Config.Protocol) == "UDP"
  111. }
  112. func (_self *ForWardJob) StopJob() {
  113. if _self.IsUdpJob() {
  114. _self.stopUdpJob()
  115. } else {
  116. _self.stopTcpJob()
  117. }
  118. _self.Status = Constant.RunStatus_Stoped
  119. }
  120. func (_self *ForWardJob) stopTcpJob() {
  121. _self.PortListener.Close()
  122. for srcAddr, client := range _self.ClientMap {
  123. if ForWardDebug == true {
  124. logs.Debug("停止真实用户连接:", srcAddr)
  125. }
  126. client.StopForward()
  127. }
  128. _self.ClientMap = nil
  129. }
  130. func (_self *ForWardJob) stopUdpJob() {
  131. _self.UdpForwardJob.Close()
  132. }
粤ICP备19079148号