client.go 8.5 KB


  1. package ami
  2. import (
  3. "container/list"
  4. "context"
  5. "errors"
  6. "fmt"
  7. "log"
  8. "net"
  9. "runtime"
  10. "sync"
  11. "time"
  12. )
  13. type State byte
  14. const (
  15. StateStopped State = iota
  16. StateConnection
  17. StateConnected
  18. StateAuth
  19. StateAvailable
  20. StateBusy
  21. )
  22. func (s State) String() string {
  23. switch s {
  24. case StateStopped:
  25. return "Stopped"
  26. case StateConnection:
  27. return "Connection"
  28. case StateConnected:
  29. return "Connected"
  30. case StateAuth:
  31. return "Auth"
  32. case StateAvailable:
  33. return "Available"
  34. case StateBusy:
  35. return "Busy"
  36. default:
  37. return ""
  38. }
  39. }
  40. func New(host, login, password string, ctxGlobal context.Context, stateChanged func(State, error)) (cl *Client) {
  41. cl = &Client{
  42. &client{
  43. host: host,
  44. login: login,
  45. password: password,
  46. stateChanged: stateChanged,
  47. state: StateStopped,
  48. request: make(chan Request),
  49. response: make(chan Response),
  50. event: make(chan Event),
  51. requestsWork: list.New(),
  52. socketClosed: make(chan error),
  53. actionIDPrefix: fmt.Sprint(time.Now().UnixNano()),
  54. eventListeners: make(map[int64]*EventListener),
  55. locker: new(sync.RWMutex),
  56. },
  57. }
  58. if ctxGlobal == nil {
  59. cl.ctx, cl.ctxCancel = context.WithCancel(context.Background())
  60. } else {
  61. cl.ctx, cl.ctxCancel = context.WithCancel(ctxGlobal)
  62. }
  63. go cl.eventListenersCleaner()
  64. runtime.SetFinalizer(cl, destroyClient)
  65. return
  66. }
  67. // Client object
  68. type Client struct {
  69. *client
  70. }
  71. type client struct {
  72. ctx context.Context
  73. ctxCancel context.CancelFunc
  74. host string
  75. login string
  76. password string
  77. conn net.Conn
  78. request chan Request
  79. response chan Response
  80. event chan Event
  81. clientSideEvent chan Event
  82. stateChanged func(State, error)
  83. state State
  84. requestsWork *list.List
  85. socketClosed chan error
  86. actionIDPrefix string
  87. actionUUID uint64
  88. eventListeners map[int64]*EventListener
  89. locker *sync.RWMutex
  90. }
  91. func (s *client) State() State {
  92. return s.state
  93. }
  94. func (s *client) removeEventListener(uuid int64) {
  95. s.locker.Lock()
  96. if listener, check := s.eventListeners[uuid]; check {
  97. listener.close()
  98. delete(s.eventListeners, uuid)
  99. }
  100. s.locker.Unlock()
  101. }
  102. func (s *client) eventListenersCleaner() {
  103. ctx, _ := context.WithCancel(s.ctx)
  104. for {
  105. select {
  106. case <-time.After(time.Minute * 30):
  107. {
  108. now := time.Now()
  109. s.locker.Lock()
  110. for uuid, v := range s.eventListeners {
  111. if now.After(v.timeActual) {
  112. v.close()
  113. delete(s.eventListeners, uuid)
  114. }
  115. }
  116. s.locker.Unlock()
  117. }
  118. case <-ctx.Done():
  119. {
  120. for _, v := range s.eventListeners {
  121. v.close()
  122. }
  123. return
  124. }
  125. }
  126. }
  127. }
  128. func (s *client) registerEventListener(uuid int64) <-chan Event {
  129. listener := &EventListener{
  130. uuid: uuid,
  131. eventChan: make(chan Event, 1),
  132. }
  133. s.locker.Lock()
  134. s.eventListeners[uuid] = listener
  135. s.locker.Unlock()
  136. return listener.eventChan
  137. }
  138. func (s *client) initActionID() (res string) {
  139. res = fmt.Sprintf("%v%v", s.actionIDPrefix, s.actionUUID)
  140. if s.actionUUID < max_client_uuid {
  141. s.actionUUID++
  142. } else {
  143. s.actionUUID = 0
  144. }
  145. return
  146. }
  147. func (s *client) requestByActionID(actionID string) (req Request, elem *list.Element, check bool) {
  148. for elem = s.requestsWork.Front(); elem != nil; elem = elem.Next() {
  149. req = elem.Value.(Request)
  150. if req.ActionID() == actionID {
  151. check = true
  152. return
  153. }
  154. }
  155. return
  156. }
  157. func (s *client) setState(state State, err error) {
  158. oldState := s.state
  159. s.state = state
  160. if s.stateChanged != nil && (state != oldState || err != nil) {
  161. s.stateChanged(state, err)
  162. }
  163. s.state = state
  164. }
  165. func (s *client) Event() chan Event {
  166. if s.clientSideEvent == nil {
  167. s.clientSideEvent = make(chan Event)
  168. }
  169. return s.clientSideEvent
  170. }
  171. func (s *client) eventAccepted(event Event) {
  172. log.Println("... event-accepted", event)
  173. switch event.Name() {
  174. case "FullyBooted":
  175. {
  176. if s.state == StateAuth {
  177. for elem := s.requestsWork.Front(); elem != nil; elem.Next() {
  178. s.sendQueueRequest()
  179. }
  180. }
  181. }
  182. }
  183. // send event to client side
  184. if s.clientSideEvent != nil {
  185. s.clientSideEvent <- event
  186. }
  187. if event.uuid > 0 {
  188. var check bool
  189. var listener *EventListener
  190. s.locker.RLock()
  191. if listener, check = s.eventListeners[event.uuid]; check {
  192. listener.incomingEvent(event)
  193. }
  194. s.locker.RUnlock()
  195. if check && event.Name() == "Hangup" {
  196. s.locker.Lock()
  197. listener.close()
  198. delete(s.eventListeners, event.uuid)
  199. s.locker.Unlock()
  200. }
  201. }
  202. }
  203. // start open connection to asterisk ami server
  204. func (s *client) Start() {
  205. var err error
  206. // check state. StateStopped needed
  207. if s.state != StateStopped {
  208. err = errors.New("AMI start error: client already started")
  209. s.stateChanged(s.state, err)
  210. return
  211. }
  212. defer func() {
  213. s.setState(StateStopped, err)
  214. }()
  215. s.setState(StateConnection, nil)
  216. // connection and read ami greetings message
  217. if s.conn, err = net.Dial("tcp", s.host); err != nil {
  218. err = fmt.Errorf("AMI connection socket connection error: %v", err.Error())
  219. return
  220. }
  221. s.setState(StateConnected, nil)
  222. // socket connected. receive greetings text
  223. if _, err = s.receiveSingle(); err != nil {
  224. err = fmt.Errorf("AMI greetings receive error: %v", err.Error())
  225. return
  226. }
  227. // greetings received, make attempt to auth
  228. auth := InitRequest("Login")
  229. auth.SetParam("UserName", s.login)
  230. auth.SetParam("Secret", s.password)
  231. actionCallback := func(action ActionData) {
  232. if action.isEvent() {
  233. s.eventAccepted(Event{action, 0})
  234. } else {
  235. response := Response{action}
  236. if !response.IsError() {
  237. s.setState(StateAuth, nil)
  238. } else {
  239. err = fmt.Errorf("AMI authentication error: %v", action["Message"])
  240. return
  241. }
  242. }
  243. }
  244. if socketErr := s.sendSingleRequest(auth, actionCallback); socketErr != nil || err != nil {
  245. if err == nil {
  246. err = socketErr
  247. }
  248. return
  249. }
  250. go s.receiveLoop()
  251. loop:
  252. for {
  253. select {
  254. case request := <-s.request:
  255. {
  256. actionID := s.initActionID()
  257. request.ActionData["ActionID"] = actionID
  258. s.requestsWork.PushFront(request)
  259. if s.state == StateAuth {
  260. if err := s.sendRequest(request); err != nil {
  261. log.Println("SendRequestERROR")
  262. }
  263. }
  264. }
  265. case event := <-s.event:
  266. s.eventAccepted(event)
  267. case response := <-s.response:
  268. {
  269. if req, elem, check := s.requestByActionID(response.ActionID()); check {
  270. req.chanResponse <- response
  271. close(req.chanResponse)
  272. s.requestsWork.Remove(elem)
  273. }
  274. }
  275. case err = <-s.socketClosed:
  276. break loop
  277. }
  278. }
  279. return
  280. }
  281. func (s *client) sendQueueRequest() error {
  282. for elem := s.requestsWork.Front(); elem != nil; elem.Next() {
  283. req := elem.Value.(Request)
  284. if req.sended {
  285. s.requestsWork.Remove(elem)
  286. } else if err := s.sendRequest(req); err != nil {
  287. return err
  288. }
  289. }
  290. return nil
  291. }
  292. func (s *client) receiveSingle() (data []byte, err error) {
  293. count, buf := 0, make([]byte, 1024)
  294. if count, err = s.conn.Read(buf); err == nil {
  295. data = buf[:count]
  296. }
  297. return
  298. }
  299. func (s *client) sendSingleRequest(request Request, acceptCallback func(ActionData)) (err error) {
  300. // send action
  301. if err = s.sendRequest(request); err != nil {
  302. return
  303. }
  304. // receive answer
  305. var data []byte
  306. for {
  307. count, buf := 0, make([]byte, 1024)
  308. if count, err = s.conn.Read(buf); err != nil {
  309. return
  310. }
  311. if data = actionsFromRaw(append(data, buf[:count]...), acceptCallback); len(data) == 0 {
  312. return
  313. }
  314. }
  315. }
  316. func (s *client) sendRequest(req Request) (err error) {
  317. if _, err := s.conn.Write(req.raw()); err != nil {
  318. err = fmt.Errorf("AMI socket send data error: %v", err.Error())
  319. } else {
  320. req.sended = true
  321. }
  322. return
  323. }
  324. func (s *client) receiveLoop() {
  325. var (
  326. data []byte
  327. count int
  328. err error
  329. )
  330. buf := make([]byte, 1024)
  331. for {
  332. if count, err = s.conn.Read(buf); err != nil {
  333. err = fmt.Errorf("AMI socket receive data error: %v", err.Error())
  334. s.socketClosed <- err
  335. return
  336. }
  337. data = actionsFromRaw(
  338. append(data, buf[:count]...),
  339. func(action ActionData) {
  340. if action.isEvent() {
  341. s.event <- initEvent(action)
  342. } else {
  343. s.response <- Response{action}
  344. }
  345. },
  346. )
  347. }
  348. }
  349. func (s *client) Request(req Request, timeout time.Duration) (resp Response, accepted bool) {
  350. req.chanResponse = make(chan Response)
  351. s.request <- req
  352. if timeout == 0 {
  353. resp, accepted = <-req.chanResponse
  354. } else {
  355. select {
  356. case resp, accepted = <-req.chanResponse:
  357. accepted = true
  358. case <-time.After(timeout):
  359. break
  360. }
  361. }
  362. return
  363. }
  364. // Close finish work with client
  365. func (s *client) Close() {
  366. if s.state > StateStopped {
  367. s.conn.Close()
  368. }
  369. s.ctxCancel()
  370. }
  371. // destructor for finalizer
  372. func destroyClient(cl *Client) {
  373. cl.Close()
  374. }