client.go 8.5 KB


  1. package ami
  2. import (
  3. "container/list"
  4. "context"
  5. "errors"
  6. "fmt"
  7. "log"
  8. "net"
  9. "runtime"
  10. "sync"
  11. "time"
  12. )
  13. type State byte
  14. const (
  15. StateStopped State = iota
  16. StateConnection
  17. StateConnected
  18. StateAuth
  19. StateAvailable
  20. StateBusy
  21. )
  22. func (s State) String() string {
  23. switch s {
  24. case StateStopped:
  25. return "Stopped"
  26. case StateConnection:
  27. return "Connection"
  28. case StateConnected:
  29. return "Connected"
  30. case StateAuth:
  31. return "Auth"
  32. case StateAvailable:
  33. return "Available"
  34. case StateBusy:
  35. return "Busy"
  36. default:
  37. return ""
  38. }
  39. }
  40. func New(host, login, password string, ctxGlobal context.Context, stateChanged func(State, error)) (cl *Client) {
  41. cl = &Client{
  42. &client{
  43. host: host,
  44. login: login,
  45. password: password,
  46. stateChanged: stateChanged,
  47. state: StateStopped,
  48. request: make(chan Request),
  49. response: make(chan Response),
  50. event: make(chan Event),
  51. requestsWork: list.New(),
  52. socketClosed: make(chan error),
  53. actionIDPrefix: fmt.Sprint(time.Now().UnixNano()),
  54. eventListeners: make(map[int64]*EventListener),
  55. locker: new(sync.RWMutex),
  56. },
  57. }
  58. if ctxGlobal == nil {
  59. cl.ctx, cl.ctxCancel = context.WithCancel(context.Background())
  60. } else {
  61. cl.ctx, cl.ctxCancel = context.WithCancel(ctxGlobal)
  62. }
  63. go cl.eventListenersCleaner()
  64. runtime.SetFinalizer(cl, destroyClient)
  65. return
  66. }
  67. // Client object
  68. type Client struct {
  69. *client
  70. }
  71. type client struct {
  72. ctx context.Context
  73. ctxCancel context.CancelFunc
  74. host string
  75. login string
  76. password string
  77. conn net.Conn
  78. request chan Request
  79. response chan Response
  80. event chan Event
  81. clientSideEvent chan Event
  82. stateChanged func(State, error)
  83. state State
  84. requestsWork *list.List
  85. socketClosed chan error
  86. actionIDPrefix string
  87. actionUUID uint64
  88. eventListeners map[int64]*EventListener
  89. locker *sync.RWMutex
  90. }
  91. func (s *client) State() State {
  92. return s.state
  93. }
  94. func (s *client) removeEventListener(uuid int64) {
  95. s.locker.Lock()
  96. if listener, check := s.eventListeners[uuid]; check {
  97. listener.close()
  98. delete(s.eventListeners, uuid)
  99. }
  100. s.locker.Unlock()
  101. }
  102. func (s *client) eventListenersCleaner() {
  103. ctx, _ := context.WithCancel(s.ctx)
  104. for {
  105. select {
  106. case <-time.After(time.Minute * 30):
  107. {
  108. now := time.Now()
  109. s.locker.Lock()
  110. for uuid, v := range s.eventListeners {
  111. if now.After(v.timeActual) {
  112. v.close()
  113. delete(s.eventListeners, uuid)
  114. }
  115. }
  116. s.locker.Unlock()
  117. }
  118. case <-ctx.Done():
  119. {
  120. for _, v := range s.eventListeners {
  121. v.close()
  122. }
  123. return
  124. }
  125. }
  126. }
  127. }
  128. func (s *client) registerEventListener(uuid int64) <-chan Event {
  129. listener := &EventListener{
  130. uuid: uuid,
  131. eventChan: make(chan Event),
  132. }
  133. s.locker.Lock()
  134. s.eventListeners[uuid] = listener
  135. s.locker.Unlock()
  136. return listener.eventChan
  137. }
  138. func (s *client) initActionID() (res string) {
  139. res = fmt.Sprintf("%v%v", s.actionIDPrefix, s.actionUUID)
  140. if s.actionUUID < max_client_uuid {
  141. s.actionUUID++
  142. } else {
  143. s.actionUUID = 0
  144. }
  145. return
  146. }
  147. func (s *client) requestByActionID(actionID string) (req Request, elem *list.Element, check bool) {
  148. for elem = s.requestsWork.Front(); elem != nil; elem = elem.Next() {
  149. req = elem.Value.(Request)
  150. if req.ActionID() == actionID {
  151. check = true
  152. return
  153. }
  154. }
  155. return
  156. }
  157. func (s *client) setState(state State, err error) {
  158. oldState := s.state
  159. s.state = state
  160. if s.stateChanged != nil && (state != oldState || err != nil) {
  161. s.stateChanged(state, err)
  162. }
  163. s.state = state
  164. }
  165. func (s *client) Event() chan Event {
  166. if s.clientSideEvent == nil {
  167. s.clientSideEvent = make(chan Event)
  168. }
  169. return s.clientSideEvent
  170. }
  171. func (s *client) eventAccepted(event Event) {
  172. switch event.Name() {
  173. case "FullyBooted":
  174. {
  175. if s.state == StateAuth {
  176. for elem := s.requestsWork.Front(); elem != nil; elem.Next() {
  177. s.sendQueueRequest()
  178. }
  179. }
  180. }
  181. }
  182. // send event to client side
  183. if s.clientSideEvent != nil {
  184. s.clientSideEvent <- event
  185. }
  186. if event.uuid > 0 {
  187. var check bool
  188. var listener *EventListener
  189. s.locker.RLock()
  190. if listener, check = s.eventListeners[event.uuid]; check {
  191. listener.incomingEvent(event)
  192. }
  193. s.locker.RUnlock()
  194. if check && event.Name() == "Hangup" {
  195. s.locker.Lock()
  196. listener.close()
  197. delete(s.eventListeners, event.uuid)
  198. s.locker.Unlock()
  199. }
  200. }
  201. }
  202. // start open connection to asterisk ami server
  203. func (s *client) Start() {
  204. var err error
  205. // check state. StateStopped needed
  206. if s.state != StateStopped {
  207. err = errors.New("AMI start error: client already started")
  208. s.stateChanged(s.state, err)
  209. return
  210. }
  211. defer func() {
  212. s.setState(StateStopped, err)
  213. }()
  214. s.setState(StateConnection, nil)
  215. // connection and read ami greetings message
  216. if s.conn, err = net.Dial("tcp", s.host); err != nil {
  217. err = fmt.Errorf("AMI connection socket connection error: %v", err.Error())
  218. return
  219. }
  220. s.setState(StateConnected, nil)
  221. // socket connected. receive greetings text
  222. if _, err = s.receiveSingle(); err != nil {
  223. err = fmt.Errorf("AMI greetings receive error: %v", err.Error())
  224. return
  225. }
  226. // greetings received, make attempt to auth
  227. auth := InitRequest("Login")
  228. auth.SetParam("UserName", s.login)
  229. auth.SetParam("Secret", s.password)
  230. actionCallback := func(action ActionData) {
  231. if action.isEvent() {
  232. s.eventAccepted(Event{action, 0})
  233. } else {
  234. response := Response{action}
  235. if !response.IsError() {
  236. s.setState(StateAuth, nil)
  237. } else {
  238. err = fmt.Errorf("AMI authentication error: %v", action["Message"])
  239. return
  240. }
  241. }
  242. }
  243. if socketErr := s.sendSingleRequest(auth, actionCallback); socketErr != nil || err != nil {
  244. if err == nil {
  245. err = socketErr
  246. }
  247. return
  248. }
  249. go s.receiveLoop()
  250. loop:
  251. for {
  252. select {
  253. case request := <-s.request:
  254. {
  255. actionID := s.initActionID()
  256. request.ActionData["ActionID"] = actionID
  257. s.requestsWork.PushFront(request)
  258. if s.state == StateAuth {
  259. if err := s.sendRequest(request); err != nil {
  260. log.Println("SendRequestERROR")
  261. }
  262. }
  263. }
  264. case event := <-s.event:
  265. s.eventAccepted(event)
  266. case response := <-s.response:
  267. {
  268. if req, elem, check := s.requestByActionID(response.ActionID()); check {
  269. req.chanResponse <- response
  270. close(req.chanResponse)
  271. s.requestsWork.Remove(elem)
  272. }
  273. }
  274. case err = <-s.socketClosed:
  275. break loop
  276. }
  277. }
  278. return
  279. }
  280. func (s *client) sendQueueRequest() error {
  281. for elem := s.requestsWork.Front(); elem != nil; elem.Next() {
  282. req := elem.Value.(Request)
  283. if req.sended {
  284. s.requestsWork.Remove(elem)
  285. } else if err := s.sendRequest(req); err != nil {
  286. return err
  287. }
  288. }
  289. return nil
  290. }
  291. func (s *client) receiveSingle() (data []byte, err error) {
  292. count, buf := 0, make([]byte, 1024)
  293. if count, err = s.conn.Read(buf); err == nil {
  294. data = buf[:count]
  295. }
  296. return
  297. }
  298. func (s *client) sendSingleRequest(request Request, acceptCallback func(ActionData)) (err error) {
  299. // send action
  300. if err = s.sendRequest(request); err != nil {
  301. return
  302. }
  303. // receive answer
  304. var data []byte
  305. for {
  306. count, buf := 0, make([]byte, 1024)
  307. if count, err = s.conn.Read(buf); err != nil {
  308. return
  309. }
  310. if data = actionsFromRaw(append(data, buf[:count]...), acceptCallback); len(data) == 0 {
  311. return
  312. }
  313. }
  314. }
  315. func (s *client) sendRequest(req Request) (err error) {
  316. if _, err := s.conn.Write(req.raw()); err != nil {
  317. err = fmt.Errorf("AMI socket send data error: %v", err.Error())
  318. } else {
  319. req.sended = true
  320. }
  321. return
  322. }
  323. func (s *client) receiveLoop() {
  324. var (
  325. data []byte
  326. count int
  327. err error
  328. )
  329. buf := make([]byte, 1024)
  330. for {
  331. if count, err = s.conn.Read(buf); err != nil {
  332. err = fmt.Errorf("AMI socket receive data error: %v", err.Error())
  333. s.socketClosed <- err
  334. return
  335. }
  336. data = actionsFromRaw(
  337. append(data, buf[:count]...),
  338. func(action ActionData) {
  339. if action.isEvent() {
  340. s.event <- initEvent(action)
  341. } else {
  342. s.response <- Response{action}
  343. }
  344. },
  345. )
  346. }
  347. }
  348. func (s *client) Request(req Request, timeout time.Duration) (resp Response, accepted bool) {
  349. req.chanResponse = make(chan Response)
  350. s.request <- req
  351. if timeout == 0 {
  352. resp, accepted = <-req.chanResponse
  353. } else {
  354. select {
  355. case resp, accepted = <-req.chanResponse:
  356. accepted = true
  357. case <-time.After(timeout):
  358. break
  359. }
  360. }
  361. return
  362. }
  363. // Close finish work with client
  364. func (s *client) Close() {
  365. if s.state > StateStopped {
  366. s.conn.Close()
  367. }
  368. s.ctxCancel()
  369. }
  370. // destructor for finalizer
  371. func destroyClient(cl *Client) {
  372. cl.Close()
  373. }