AgentServiceV1.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. package main
  2. import (
  3. "bufio"
  4. "forward-core/Constant"
  5. "forward-core/NetUtils"
  6. "github.com/astaxie/beego/logs"
  7. "io"
  8. "net"
  9. "strconv"
  10. "time"
  11. )
  // AgentServiceV1 holds the agent-side state of the forwarding tunnel: the
  // address of the Magic server, whether the last dial to it succeeded, and the
  // local connections opened on behalf of tunnel sessions.
  type AgentServiceV1 struct {
  	// localConnMap maps a tunnel session id to the local connection dialed for it.
  	localConnMap map[int]net.Conn
  	// MagicServerAddr is the TCP address ("host:port") dialed to reach the Magic server.
  	MagicServerAddr string
  	// AgentOnline is set to true after a successful dial to MagicServerAddr and
  	// to false when that dial fails.
  	AgentOnline bool
  }
  17. func (_self *AgentServiceV1) ConnToMagicServer() {
  18. serviceConn, err := net.DialTimeout("tcp", _self.MagicServerAddr, 30*time.Second)
  19. if err != nil {
  20. logs.Error("try dial err", err)
  21. _self.AgentOnline = false
  22. return
  23. }
  24. callback := func(conn net.Conn, sessionId int, cmd byte, payload []byte) {
  25. //payload 收到的消息内容
  26. _self.OnTunnelRecv(conn, sessionId, cmd, payload)
  27. }
  28. logs.Debug("开始接收服务端返回指令或数据...")
  29. _self.AgentOnline = true
  30. go NetUtils.ReadConn(serviceConn, callback)
  31. }
  32. func (_self *AgentServiceV1) OnTunnelRecv(conn net.Conn, sessionId int, cmd byte, payload []byte) {
  33. logs.Debug("收到一条给 sessionId:", sessionId, " 客户端的数据,指令是:", cmd)
  34. switch cmd {
  35. case Constant.MagicCmd_AgentListenerOpen:
  36. targetAddr := string(payload)
  37. go _self.ListenForClient(targetAddr, _self.MagicServerAddr)
  38. case Constant.MagicCmd_AgentConnOpen:
  39. targetAddr := string(payload)
  40. logs.Debug("sessionId:", sessionId, " 收到 AgentConnOpen 指令是,打开本地连接:", targetAddr)
  41. //AgentConnOpen 让连接进来的客户端,在它的本地创建一个连接,并关联好sessionId
  42. localConn, err := net.DialTimeout("tcp", targetAddr, 30*time.Second)
  43. if err != nil {
  44. logs.Error("try dial err", err)
  45. return
  46. }
  47. _self.localConnMap[sessionId] = localConn
  48. //接收 localConn 返回数据,并将返回的数据,写回给 conn,带上 sessionId
  49. go _self.ReadRawConn(localConn, conn, sessionId, Constant.MagicCmd_DataToMagic)
  50. case Constant.MagicCmd_DataToAgent:
  51. logs.Debug("sessionId:", sessionId, " 收到 MsgToAgent 指令")
  52. localConn := _self.localConnMap[sessionId]
  53. localConn.Write(payload)
  54. logs.Debug("sessionId:", sessionId, " 数据已写入本地目标连接")
  55. case Constant.MagicCmd_Refused:
  56. //client := string(payload)
  57. logs.Debug("Magic服务端拒绝本次连接")
  58. }
  59. }
  60. func (_self *AgentServiceV1) ReadRawConn(from net.Conn, magic_client_Conn net.Conn, sessionId int, cmd byte) {
  61. arr := make([]byte, 5000)
  62. reader := bufio.NewReader(from)
  63. for {
  64. size, err := reader.Read(arr)
  65. if err != nil {
  66. break
  67. }
  68. err = NetUtils.WriteConn(magic_client_Conn, sessionId, cmd, arr[0:size])
  69. if err != nil {
  70. //有异常
  71. logs.Error(err)
  72. break
  73. }
  74. }
  75. }
  76. func (_self *AgentServiceV1) ListenForClient(localListenAddr, toAddr string) {
  77. client_listener, err := net.Listen("tcp", localListenAddr)
  78. if err != nil {
  79. logs.Error("ListenForClient err:", err)
  80. return
  81. }
  82. for {
  83. logs.Debug("ListenForClient Ready to Accept ...")
  84. client_Conn, err := client_listener.Accept()
  85. if err != nil {
  86. logs.Error("Accept err:", err)
  87. break
  88. }
  89. //连接到远程服务
  90. serviceConn, err := net.DialTimeout("tcp", toAddr, 30*time.Second)
  91. if err != nil {
  92. logs.Error("try dial err", err)
  93. return
  94. }
  95. go func() {
  96. _, err = io.Copy(serviceConn, client_Conn)
  97. if err != nil {
  98. logs.Error("to magic_client 网络连接异常:", err)
  99. }
  100. }()
  101. go func() {
  102. _, err = io.Copy(client_Conn, serviceConn)
  103. if err != nil {
  104. logs.Error("to magic_client 网络连接异常2:", err)
  105. }
  106. }()
  107. }
  108. }
  109. func (_self *AgentServiceV1) ConnectToMagicLoop() {
  110. //客户端与服务端建立连接
  111. if !_self.AgentOnline {
  112. _self.ConnToMagicServer()
  113. delay := 3
  114. time.AfterFunc(time.Second*time.Duration(delay), func() {
  115. _self.ConnectToMagicLoop()
  116. })
  117. logs.Debug("reConnect after " + strconv.Itoa(delay) + " seconds")
  118. }
  119. }
粤ICP备19079148号