client.go 8.6 KB


  1. package ami
  2. import (
  3. "container/list"
  4. "context"
  5. "errors"
  6. "fmt"
  7. "log"
  8. "net"
  9. "runtime"
  10. "sync"
  11. "time"
  12. )
  // State identifies a phase in the life cycle of a client connection.
  type State byte

  // Client states, numbered consecutively from zero in declaration order.
  const (
  	// StateStopped is the state a freshly constructed client starts in.
  	StateStopped State = iota
  	// StateConnection is entered just before the TCP dial is attempted.
  	StateConnection
  	// StateConnected is entered once the TCP dial has succeeded.
  	StateConnected
  	// StateAuth is entered once the login request was answered without error.
  	StateAuth
  	// StateAvailable is declared for callers; nothing in this file sets it.
  	StateAvailable
  	// StateBusy is declared for callers; nothing in this file sets it.
  	StateBusy
  )

  // String returns the human-readable name of the state, or the empty
  // string for a value outside the declared constants.
  func (s State) String() string {
  	// Indexed literal keeps every label tied to its constant.
  	labels := [...]string{
  		StateStopped:    "Stopped",
  		StateConnection: "Connection",
  		StateConnected:  "Connected",
  		StateAuth:       "Auth",
  		StateAvailable:  "Available",
  		StateBusy:       "Busy",
  	}
  	if int(s) >= len(labels) {
  		return ""
  	}
  	return labels[s]
  }
  40. func New(host, login, password string, ctxGlobal context.Context, stateChanged func(State, error)) (cl *Client) {
  41. cl = &Client{
  42. &client{
  43. host: host,
  44. login: login,
  45. password: password,
  46. stateChanged: stateChanged,
  47. state: StateStopped,
  48. request: make(chan Request),
  49. response: make(chan Response),
  50. event: make(chan Event),
  51. requestsWork: list.New(),
  52. socketClosed: make(chan error),
  53. actionIDPrefix: fmt.Sprint(time.Now().UnixNano()),
  54. eventListeners: make(map[int64]*EventListener),
  55. locker: new(sync.RWMutex),
  56. },
  57. }
  58. if ctxGlobal == nil {
  59. cl.ctx, cl.ctxCancel = context.WithCancel(context.Background())
  60. } else {
  61. cl.ctx, cl.ctxCancel = context.WithCancel(ctxGlobal)
  62. }
  63. go cl.eventListenersCleaner()
  64. runtime.SetFinalizer(cl, destroyClient)
  65. return
  66. }
  67. // Client object
  68. type Client struct {
  69. *client
  70. }
  71. type client struct {
  72. ctx context.Context
  73. ctxCancel context.CancelFunc
  74. host string
  75. login string
  76. password string
  77. conn net.Conn
  78. request chan Request
  79. response chan Response
  80. event chan Event
  81. clientSideEvent chan Event
  82. stateChanged func(State, error)
  83. state State
  84. requestsWork *list.List
  85. socketClosed chan error
  86. actionIDPrefix string
  87. actionUUID uint64
  88. eventListeners map[int64]*EventListener
  89. locker *sync.RWMutex
  90. }
  91. func (s *client) State() State {
  92. return s.state
  93. }
  94. func (s *client) removeEventListener(uuid int64) {
  95. s.locker.Lock()
  96. if listener, check := s.eventListeners[uuid]; check {
  97. listener.close()
  98. delete(s.eventListeners, uuid)
  99. }
  100. s.locker.Unlock()
  101. }
  102. func (s *client) eventListenersCleaner() {
  103. ctx, _ := context.WithCancel(s.ctx)
  104. for {
  105. select {
  106. case <-time.After(time.Minute * 30):
  107. {
  108. now := time.Now()
  109. s.locker.Lock()
  110. for uuid, v := range s.eventListeners {
  111. if now.After(v.timeActual) {
  112. v.close()
  113. delete(s.eventListeners, uuid)
  114. }
  115. }
  116. s.locker.Unlock()
  117. }
  118. case <-ctx.Done():
  119. {
  120. for _, v := range s.eventListeners {
  121. v.close()
  122. }
  123. return
  124. }
  125. }
  126. }
  127. }
  128. func (s *client) registerEventListener(uuid int64) <-chan Event {
  129. listener := &EventListener{
  130. uuid: uuid,
  131. eventChan: make(chan Event, 1),
  132. }
  133. s.locker.Lock()
  134. s.eventListeners[uuid] = listener
  135. s.locker.Unlock()
  136. return listener.eventChan
  137. }
  138. func (s *client) initActionID() (res string) {
  139. res = fmt.Sprintf("%v%v", s.actionIDPrefix, s.actionUUID)
  140. if s.actionUUID < max_client_uuid {
  141. s.actionUUID++
  142. } else {
  143. s.actionUUID = 0
  144. }
  145. return
  146. }
  147. func (s *client) requestByActionID(actionID string) (req Request, elem *list.Element, check bool) {
  148. for elem = s.requestsWork.Front(); elem != nil; elem = elem.Next() {
  149. req = elem.Value.(Request)
  150. if req.ActionID() == actionID {
  151. check = true
  152. return
  153. }
  154. }
  155. return
  156. }
  157. func (s *client) setState(state State, err error) {
  158. oldState := s.state
  159. s.state = state
  160. if s.stateChanged != nil && (state != oldState || err != nil) {
  161. s.stateChanged(state, err)
  162. }
  163. s.state = state
  164. }
  165. func (s *client) Event() chan Event {
  166. if s.clientSideEvent == nil {
  167. s.clientSideEvent = make(chan Event)
  168. }
  169. return s.clientSideEvent
  170. }
  171. func (s *client) eventAccepted(event Event) {
  172. log.Println("... event-accepted", event, event.uuid, s.clientSideEvent)
  173. switch event.Name() {
  174. case "FullyBooted":
  175. {
  176. if s.state == StateAuth {
  177. for elem := s.requestsWork.Front(); elem != nil; elem.Next() {
  178. s.sendQueueRequest()
  179. }
  180. }
  181. }
  182. }
  183. // send event to client side
  184. if s.clientSideEvent != nil {
  185. s.clientSideEvent <- event
  186. }
  187. if event.uuid > 0 {
  188. var check bool
  189. var listener *EventListener
  190. s.locker.RLock()
  191. log.Println("EEEEEEEEEEEEEEEE>>>>>>>>>>>>>>>>>>>>>>>>>>>", s.eventListeners[event.uuid])
  192. if listener, check = s.eventListeners[event.uuid]; check {
  193. listener.incomingEvent(event)
  194. }
  195. s.locker.RUnlock()
  196. if check && event.Name() == "Hangup" {
  197. s.locker.Lock()
  198. listener.close()
  199. delete(s.eventListeners, event.uuid)
  200. s.locker.Unlock()
  201. }
  202. }
  203. }
  204. // start open connection to asterisk ami server
  205. func (s *client) Start() {
  206. var err error
  207. // check state. StateStopped needed
  208. if s.state != StateStopped {
  209. err = errors.New("AMI start error: client already started")
  210. s.stateChanged(s.state, err)
  211. return
  212. }
  213. defer func() {
  214. s.setState(StateStopped, err)
  215. }()
  216. s.setState(StateConnection, nil)
  217. // connection and read ami greetings message
  218. if s.conn, err = net.Dial("tcp", s.host); err != nil {
  219. err = fmt.Errorf("AMI connection socket connection error: %v", err.Error())
  220. return
  221. }
  222. s.setState(StateConnected, nil)
  223. // socket connected. receive greetings text
  224. if _, err = s.receiveSingle(); err != nil {
  225. err = fmt.Errorf("AMI greetings receive error: %v", err.Error())
  226. return
  227. }
  228. // greetings received, make attempt to auth
  229. auth := InitRequest("Login")
  230. auth.SetParam("UserName", s.login)
  231. auth.SetParam("Secret", s.password)
  232. actionCallback := func(action ActionData) {
  233. if action.isEvent() {
  234. s.eventAccepted(Event{action, 0})
  235. } else {
  236. response := Response{action}
  237. if !response.IsError() {
  238. s.setState(StateAuth, nil)
  239. } else {
  240. err = fmt.Errorf("AMI authentication error: %v", action["Message"])
  241. return
  242. }
  243. }
  244. }
  245. if socketErr := s.sendSingleRequest(auth, actionCallback); socketErr != nil || err != nil {
  246. if err == nil {
  247. err = socketErr
  248. }
  249. return
  250. }
  251. go s.receiveLoop()
  252. loop:
  253. for {
  254. select {
  255. case request := <-s.request:
  256. {
  257. actionID := s.initActionID()
  258. request.ActionData["ActionID"] = actionID
  259. s.requestsWork.PushFront(request)
  260. if s.state == StateAuth {
  261. if err := s.sendRequest(request); err != nil {
  262. log.Println("SendRequestERROR")
  263. }
  264. }
  265. }
  266. case event := <-s.event:
  267. s.eventAccepted(event)
  268. case response := <-s.response:
  269. {
  270. if req, elem, check := s.requestByActionID(response.ActionID()); check {
  271. req.chanResponse <- response
  272. close(req.chanResponse)
  273. s.requestsWork.Remove(elem)
  274. }
  275. }
  276. case err = <-s.socketClosed:
  277. break loop
  278. }
  279. }
  280. return
  281. }
  282. func (s *client) sendQueueRequest() error {
  283. for elem := s.requestsWork.Front(); elem != nil; elem.Next() {
  284. req := elem.Value.(Request)
  285. if req.sended {
  286. s.requestsWork.Remove(elem)
  287. } else if err := s.sendRequest(req); err != nil {
  288. return err
  289. }
  290. }
  291. return nil
  292. }
  293. func (s *client) receiveSingle() (data []byte, err error) {
  294. count, buf := 0, make([]byte, 1024)
  295. if count, err = s.conn.Read(buf); err == nil {
  296. data = buf[:count]
  297. }
  298. return
  299. }
  300. func (s *client) sendSingleRequest(request Request, acceptCallback func(ActionData)) (err error) {
  301. // send action
  302. if err = s.sendRequest(request); err != nil {
  303. return
  304. }
  305. // receive answer
  306. var data []byte
  307. for {
  308. count, buf := 0, make([]byte, 1024)
  309. if count, err = s.conn.Read(buf); err != nil {
  310. return
  311. }
  312. if data = actionsFromRaw(append(data, buf[:count]...), acceptCallback); len(data) == 0 {
  313. return
  314. }
  315. }
  316. }
  317. func (s *client) sendRequest(req Request) (err error) {
  318. if _, err := s.conn.Write(req.raw()); err != nil {
  319. err = fmt.Errorf("AMI socket send data error: %v", err.Error())
  320. } else {
  321. req.sended = true
  322. }
  323. return
  324. }
  325. func (s *client) receiveLoop() {
  326. var (
  327. data []byte
  328. count int
  329. err error
  330. )
  331. buf := make([]byte, 1024)
  332. for {
  333. if count, err = s.conn.Read(buf); err != nil {
  334. err = fmt.Errorf("AMI socket receive data error: %v", err.Error())
  335. s.socketClosed <- err
  336. return
  337. }
  338. data = actionsFromRaw(
  339. append(data, buf[:count]...),
  340. func(action ActionData) {
  341. if action.isEvent() {
  342. s.event <- initEvent(action)
  343. } else {
  344. s.response <- Response{action}
  345. }
  346. },
  347. )
  348. }
  349. }
  350. func (s *client) Request(req Request, timeout time.Duration) (resp Response, accepted bool) {
  351. req.chanResponse = make(chan Response)
  352. s.request <- req
  353. if timeout == 0 {
  354. resp, accepted = <-req.chanResponse
  355. } else {
  356. select {
  357. case resp, accepted = <-req.chanResponse:
  358. accepted = true
  359. case <-time.After(timeout):
  360. break
  361. }
  362. }
  363. return
  364. }
  365. // Close finish work with client
  366. func (s *client) Close() {
  367. if s.state > StateStopped {
  368. s.conn.Close()
  369. }
  370. s.ctxCancel()
  371. }
  372. // destructor for finalizer
  373. func destroyClient(cl *Client) {
  374. cl.Close()
  375. }