MagicServiceV1.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
  1. package Service
  2. import (
  3. "bufio"
  4. "fmt"
  5. "forward-core/Constant"
  6. "forward-core/Models"
  7. "forward-core/NetUtils"
  8. "github.com/astaxie/beego/logs"
  9. "io"
  10. "net"
  11. "sync"
  12. "time"
  13. )
  14. type MagicServiceV1 struct {
  15. MagicClientMap map[string]net.Conn
  16. MagicClientMapLock sync.Mutex
  17. MagicListener net.Listener
  18. agentRunType int
  19. MagicTargetAddr string
  20. sessionId int
  21. idLock sync.Mutex
  22. sessionConnMap map[int]net.Conn
  23. ForwardInfo *Models.PortForward
  24. }
  25. func NewMagicServiceV1() *MagicServiceV1 {
  26. return &MagicServiceV1{
  27. MagicClientMap:make(map[string]net.Conn,200),
  28. sessionConnMap:make(map[int]net.Conn,200),
  29. }
  30. }
  31. func (_self *MagicServiceV1) GetNewSessionId() int {
  32. _self.idLock.Lock()
  33. defer _self.idLock.Unlock()
  34. _self.sessionId++
  35. return _self.sessionId
  36. }
  37. func (_self *MagicServiceV1) GetKeyByEntity(entity *Models.PortForward) string {
  38. fromAddr := fmt.Sprint(entity.Addr, ":", entity.Port)
  39. toAddr := fmt.Sprint(entity.TargetAddr, ":", entity.TargetPort)
  40. key := _self.GetKey(fromAddr, toAddr, entity.FType)
  41. return key
  42. }
  43. func (_self *MagicServiceV1) GetKey(sourcePort, targetPort string, fType int) string {
  44. return fmt.Sprint(sourcePort, "_", fType, "_TCP_", targetPort)
  45. }
  46. func (_self *MagicServiceV1) RegistryMagicClient(key string, conn net.Conn) {
  47. _self.MagicClientMapLock.Lock()
  48. defer _self.MagicClientMapLock.Unlock()
  49. _self.MagicClientMap[key] = conn
  50. }
  51. func (_self *MagicServiceV1) GetTopMagicClient() net.Conn {
  52. _self.MagicClientMapLock.Lock()
  53. defer _self.MagicClientMapLock.Unlock()
  54. for _, v := range _self.MagicClientMap {
  55. return v
  56. }
  57. return nil
  58. }
  59. func (_self *MagicServiceV1) GetMagicListener() net.Listener {
  60. return _self.MagicListener
  61. }
  62. func (_self *MagicServiceV1) UnRegistryMagicClient(key string) {
  63. _self.MagicClientMapLock.Lock()
  64. defer _self.MagicClientMapLock.Unlock()
  65. delete(_self.MagicClientMap, key)
  66. logs.Debug("UnRegistryMagicClient key: ", key)
  67. }
  68. func (_self *MagicServiceV1) CountMagicClient() int {
  69. _self.MagicClientMapLock.Lock()
  70. defer _self.MagicClientMapLock.Unlock()
  71. return len(_self.MagicClientMap)
  72. }
  73. func (_self *MagicServiceV1) GetMagicClient() map[string]net.Conn {
  74. return _self.MagicClientMap
  75. }
  76. func (_self *MagicServiceV1) StartMagicService(addr string, result chan Models.FuncResult) {
  77. //启动穿透服务端
  78. resultData := &Models.FuncResult{Code: 0, Msg: ""}
  79. var err error
  80. _self.MagicListener, err = net.Listen("tcp", addr)
  81. if err != nil {
  82. logs.Error("Magic Listen err:", err)
  83. resultData.Code = 1
  84. resultData.Msg = err.Error()
  85. result <- *resultData
  86. return
  87. }
  88. result <- *resultData
  89. for {
  90. logs.Debug("Magic Ready to Accept ...")
  91. magic_client_Conn, err := _self.MagicListener.Accept()
  92. if err != nil {
  93. logs.Error("Accept err:", err)
  94. break
  95. }
  96. if _self.CountMagicClient() > 0 && _self.CurrentAgentRunType() != 1 {
  97. logs.Debug("目前版本只支持一个Agent连接,后续会增加多个的支持")
  98. NetUtils.WriteConn(magic_client_Conn, -1, Constant.MagicCmd_Refused, []byte(""))
  99. magic_client_Conn.Close()
  100. continue
  101. }
  102. if _self.CountMagicClient() == 0 {
  103. magicId := magic_client_Conn.RemoteAddr().String()
  104. _self.RegistryMagicClient(magicId, magic_client_Conn)
  105. } else {
  106. if _self.CurrentAgentRunType() == 1 {
  107. _self.MagicJustCopy(magic_client_Conn, _self.MagicTargetAddr)
  108. }
  109. }
  110. }
  111. }
  112. func (_self *MagicServiceV1) StopMagicService(result chan Models.FuncResult) {
  113. resultData := &Models.FuncResult{Code: 0, Msg: ""}
  114. for k, conn := range _self.MagicClientMap {
  115. conn.Close()
  116. _self.UnRegistryMagicClient(k)
  117. }
  118. _self.MagicListener.Close()
  119. _self.MagicListener = nil
  120. result <- *resultData
  121. }
  122. func (_self *MagicServiceV1) StartMagicForward(portForward *Models.PortForward, result chan Models.FuncResult) {
  123. resultData := &Models.FuncResult{Code: 0, Msg: ""}
  124. agentConn := _self.GetTopMagicClient()
  125. if agentConn == nil {
  126. resultData.Code = 1
  127. resultData.Msg = "未检测到Agent连接"
  128. result <- *resultData
  129. return
  130. }
  131. if _self.CurrentAgentRunType() != 0 {
  132. resultData.Code = 1
  133. resultData.Msg = "有正在执行的Agent连接,开启转发失败"
  134. result <- *resultData
  135. return
  136. }
  137. if portForward.FType == 2 {
  138. //执行反向映射
  139. go _self.ReverseListenForClient(portForward, agentConn, result)
  140. callback := func(conn net.Conn, sessionId int, cmd byte, payload []byte) {
  141. //payload 收到的消息内容
  142. _self.OnTunnelRecv(_self.sessionConnMap[sessionId], sessionId, cmd, payload)
  143. }
  144. logs.Debug("从 magic_client_Conn 读,写入到 client_Conn")
  145. go NetUtils.ReadConn(agentConn, callback)
  146. } else {
  147. //发送指令
  148. localListenAddr := fmt.Sprint(portForward.Addr, ":", portForward.Port)
  149. NetUtils.WriteConn(agentConn, -1, Constant.MagicCmd_AgentListenerOpen, []byte(localListenAddr))
  150. result <- *resultData
  151. _self.agentRunType = 1
  152. _self.MagicTargetAddr = fmt.Sprint(portForward.TargetAddr, ":", portForward.TargetPort)
  153. //key := _self.GetKeyByEntity(portForward)
  154. //_self.RegistryPort(key, nil)
  155. }
  156. _self.ForwardInfo = portForward
  157. }
  158. func (_self *MagicServiceV1) StopMagicForward() error {
  159. return nil
  160. }
  161. func (_self *MagicServiceV1) MagicJustCopy(toConn net.Conn, targetAddr string) {
  162. localConn, err := net.DialTimeout("tcp", targetAddr, 30*time.Second)
  163. if err != nil {
  164. logs.Error("try dial err", err)
  165. return
  166. }
  167. go func() {
  168. _, err = io.Copy(localConn, toConn)
  169. if err != nil {
  170. logs.Error("JustCopy to local 网络连接异常:", err)
  171. localConn.Close()
  172. }
  173. }()
  174. go func() {
  175. _, err = io.Copy(toConn, localConn)
  176. if err != nil {
  177. logs.Error("JustCopy to local 网络连接异常2:", err)
  178. toConn.Close()
  179. }
  180. }()
  181. }
  182. func (_self *MagicServiceV1) ReverseListenForClient(portForward *Models.PortForward, magic_client_Conn net.Conn, result chan Models.FuncResult) {
  183. resultData := &Models.FuncResult{Code: 0, Msg: ""}
  184. localListenAddr := fmt.Sprint(portForward.Addr, ":", portForward.Port)
  185. //让客户端在本地建立连接与目标端口的连接
  186. remote := fmt.Sprint(portForward.TargetAddr, ":", portForward.TargetPort)
  187. //fType := portForward.FType
  188. client_listener, err := net.Listen("tcp", localListenAddr)
  189. if err != nil {
  190. logs.Error("ListenForClient err:", err)
  191. resultData.Code = 1
  192. resultData.Msg = err.Error()
  193. result <- *resultData
  194. return
  195. }
  196. result <- *resultData
  197. _self.agentRunType = 2
  198. //key := _self.GetKeyByEntity(portForward)
  199. //_self.RegistryPort(key, client_listener)
  200. //从 client_Conn 读,写入到 magic_client_Conn
  201. //从 magic_client_Conn 读,写入到 client_Conn
  202. for {
  203. logs.Debug("ListenForClient Ready to Accept ...")
  204. client_Conn, err := client_listener.Accept()
  205. if err != nil {
  206. logs.Error("Accept err:", err)
  207. break
  208. }
  209. //id := client_Conn.RemoteAddr().String()
  210. //_self.RegistryClient(fmt.Sprint(localListenAddr, "_", fType, "_", id), client_Conn)
  211. //有连接进来了,就创建一个sessionId
  212. sessionId := _self.GetNewSessionId()
  213. _self.sessionConnMap[sessionId] = client_Conn
  214. logs.Debug("进来了一个连接,sessionId:", sessionId)
  215. NetUtils.WriteConn(magic_client_Conn, sessionId, Constant.MagicCmd_AgentConnOpen, []byte(remote))
  216. logs.Debug("向 sessionId:", sessionId, " 发送 AgentConnOpen 指令")
  217. logs.Debug("从 client_Conn 读,写入到 magic_client_Conn sessionId:", sessionId)
  218. go _self.ReadRawConn(client_Conn, magic_client_Conn, sessionId, Constant.MagicCmd_DataToAgent)
  219. }
  220. }
  221. func (_self *MagicServiceV1) OnTunnelRecv(client_Conn net.Conn, sessionId int, cmd byte, payload []byte) {
  222. logs.Debug("收到一条给 sessionId:", sessionId, " 客户端的数据,指令是:", cmd)
  223. switch cmd {
  224. case Constant.MagicCmd_DataToMagic:
  225. client_Conn.Write(payload)
  226. }
  227. }
  228. func (_self *MagicServiceV1) ReadRawConn(from net.Conn, magic_client_Conn net.Conn, sessionId int, cmd byte) {
  229. arr := make([]byte, 5000)
  230. reader := bufio.NewReader(from)
  231. for {
  232. size, err := reader.Read(arr)
  233. if err != nil {
  234. break
  235. }
  236. err = NetUtils.WriteConn(magic_client_Conn, sessionId, cmd, arr[0:size])
  237. if err != nil {
  238. //有异常
  239. logs.Error(err)
  240. break
  241. }
  242. }
  243. }
  244. func (_self *MagicServiceV1) CurrentAgentRunType() int {
  245. // 0:空闲,1:服务端映射到内网中,2:内网映射到服务端中
  246. return _self.agentRunType
  247. }
粤ICP备19079148号