123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415 |
- package ami
- import (
- "container/list"
- "context"
- "errors"
- "fmt"
- "log"
- "net"
- "runtime"
- "sync"
- "time"
- )
- type State byte
- const (
- StateStopped State = iota
- StateConnection
- StateConnected
- StateAuth
- StateAvailable
- StateBusy
- )
- func (s State) String() string {
- switch s {
- case StateStopped:
- return "Stopped"
- case StateConnection:
- return "Connection"
- case StateConnected:
- return "Connected"
- case StateAuth:
- return "Auth"
- case StateAvailable:
- return "Available"
- case StateBusy:
- return "Busy"
- default:
- return ""
- }
- }
- func New(host, login, password string, ctxGlobal context.Context, stateChanged func(State, error)) (cl *Client) {
- cl = &Client{
- &client{
- host: host,
- login: login,
- password: password,
- stateChanged: stateChanged,
- state: StateStopped,
- request: make(chan Request),
- response: make(chan Response),
- event: make(chan Event),
- requestsWork: list.New(),
- socketClosed: make(chan error),
- actionIDPrefix: fmt.Sprint(time.Now().UnixNano()),
- eventListeners: make(map[int64]*EventListener),
- locker: new(sync.RWMutex),
- },
- }
- if ctxGlobal == nil {
- cl.ctx, cl.ctxCancel = context.WithCancel(context.Background())
- } else {
- cl.ctx, cl.ctxCancel = context.WithCancel(ctxGlobal)
- }
- go cl.eventListenersCleaner()
- runtime.SetFinalizer(cl, destroyClient)
- return
- }
- // Client object
- type Client struct {
- *client
- }
- type client struct {
- ctx context.Context
- ctxCancel context.CancelFunc
- host string
- login string
- password string
- conn net.Conn
- request chan Request
- response chan Response
- event chan Event
- clientSideEvent chan Event
- stateChanged func(State, error)
- state State
- requestsWork *list.List
- socketClosed chan error
- actionIDPrefix string
- actionUUID uint64
- eventListeners map[int64]*EventListener
- locker *sync.RWMutex
- }
- func (s *client) State() State {
- return s.state
- }
- func (s *client) removeEventListener(uuid int64) {
- s.locker.Lock()
- if listener, check := s.eventListeners[uuid]; check {
- listener.close()
- delete(s.eventListeners, uuid)
- }
- s.locker.Unlock()
- }
- func (s *client) eventListenersCleaner() {
- ctx, _ := context.WithCancel(s.ctx)
- for {
- select {
- case <-time.After(time.Minute * 30):
- {
- now := time.Now()
- s.locker.Lock()
- for uuid, v := range s.eventListeners {
- if now.After(v.timeActual) {
- v.close()
- delete(s.eventListeners, uuid)
- }
- }
- s.locker.Unlock()
- }
- case <-ctx.Done():
- {
- for _, v := range s.eventListeners {
- v.close()
- }
- return
- }
- }
- }
- }
- func (s *client) registerEventListener(uuid int64) <-chan Event {
- listener := &EventListener{
- uuid: uuid,
- eventChan: make(chan Event, 1),
- }
- s.locker.Lock()
- s.eventListeners[uuid] = listener
- s.locker.Unlock()
- return listener.eventChan
- }
- func (s *client) initActionID() (res string) {
- res = fmt.Sprintf("%v%v", s.actionIDPrefix, s.actionUUID)
- if s.actionUUID < max_client_uuid {
- s.actionUUID++
- } else {
- s.actionUUID = 0
- }
- return
- }
- func (s *client) requestByActionID(actionID string) (req Request, elem *list.Element, check bool) {
- for elem = s.requestsWork.Front(); elem != nil; elem = elem.Next() {
- req = elem.Value.(Request)
- if req.ActionID() == actionID {
- check = true
- return
- }
- }
- return
- }
- func (s *client) setState(state State, err error) {
- oldState := s.state
- s.state = state
- if s.stateChanged != nil && (state != oldState || err != nil) {
- s.stateChanged(state, err)
- }
- s.state = state
- }
- func (s *client) Event() chan Event {
- if s.clientSideEvent == nil {
- s.clientSideEvent = make(chan Event)
- }
- return s.clientSideEvent
- }
- func (s *client) eventAccepted(event Event) {
- log.Println("... event-accepted", event, event.uuid, s.clientSideEvent)
- switch event.Name() {
- case "FullyBooted":
- {
- if s.state == StateAuth {
- for elem := s.requestsWork.Front(); elem != nil; elem.Next() {
- s.sendQueueRequest()
- }
- }
- }
- }
- // send event to client side
- if s.clientSideEvent != nil {
- s.clientSideEvent <- event
- }
- if event.uuid > 0 {
- var check bool
- var listener *EventListener
- s.locker.RLock()
- log.Println("EEEEEEEEEEEEEEEE>>>>>>>>>>>>>>>>>>>>>>>>>>>", s.eventListeners[event.uuid])
- if listener, check = s.eventListeners[event.uuid]; check {
- listener.incomingEvent(event)
- }
- s.locker.RUnlock()
- if check && event.Name() == "Hangup" {
- s.locker.Lock()
- listener.close()
- delete(s.eventListeners, event.uuid)
- s.locker.Unlock()
- }
- }
- }
- // start open connection to asterisk ami server
- func (s *client) Start() {
- var err error
- // check state. StateStopped needed
- if s.state != StateStopped {
- err = errors.New("AMI start error: client already started")
- s.stateChanged(s.state, err)
- return
- }
- defer func() {
- s.setState(StateStopped, err)
- }()
- s.setState(StateConnection, nil)
- // connection and read ami greetings message
- if s.conn, err = net.Dial("tcp", s.host); err != nil {
- err = fmt.Errorf("AMI connection socket connection error: %v", err.Error())
- return
- }
- s.setState(StateConnected, nil)
- // socket connected. receive greetings text
- if _, err = s.receiveSingle(); err != nil {
- err = fmt.Errorf("AMI greetings receive error: %v", err.Error())
- return
- }
- // greetings received, make attempt to auth
- auth := InitRequest("Login")
- auth.SetParam("UserName", s.login)
- auth.SetParam("Secret", s.password)
- actionCallback := func(action ActionData) {
- if action.isEvent() {
- s.eventAccepted(Event{action, 0})
- } else {
- response := Response{action}
- if !response.IsError() {
- s.setState(StateAuth, nil)
- } else {
- err = fmt.Errorf("AMI authentication error: %v", action["Message"])
- return
- }
- }
- }
- if socketErr := s.sendSingleRequest(auth, actionCallback); socketErr != nil || err != nil {
- if err == nil {
- err = socketErr
- }
- return
- }
- go s.receiveLoop()
- loop:
- for {
- select {
- case request := <-s.request:
- {
- actionID := s.initActionID()
- request.ActionData["ActionID"] = actionID
- s.requestsWork.PushFront(request)
- if s.state == StateAuth {
- if err := s.sendRequest(request); err != nil {
- log.Println("SendRequestERROR")
- }
- }
- }
- case event := <-s.event:
- s.eventAccepted(event)
- case response := <-s.response:
- {
- if req, elem, check := s.requestByActionID(response.ActionID()); check {
- req.chanResponse <- response
- close(req.chanResponse)
- s.requestsWork.Remove(elem)
- }
- }
- case err = <-s.socketClosed:
- break loop
- }
- }
- return
- }
- func (s *client) sendQueueRequest() error {
- for elem := s.requestsWork.Front(); elem != nil; elem.Next() {
- req := elem.Value.(Request)
- if req.sended {
- s.requestsWork.Remove(elem)
- } else if err := s.sendRequest(req); err != nil {
- return err
- }
- }
- return nil
- }
- func (s *client) receiveSingle() (data []byte, err error) {
- count, buf := 0, make([]byte, 1024)
- if count, err = s.conn.Read(buf); err == nil {
- data = buf[:count]
- }
- return
- }
- func (s *client) sendSingleRequest(request Request, acceptCallback func(ActionData)) (err error) {
- // send action
- if err = s.sendRequest(request); err != nil {
- return
- }
- // receive answer
- var data []byte
- for {
- count, buf := 0, make([]byte, 1024)
- if count, err = s.conn.Read(buf); err != nil {
- return
- }
- if data = actionsFromRaw(append(data, buf[:count]...), acceptCallback); len(data) == 0 {
- return
- }
- }
- }
- func (s *client) sendRequest(req Request) (err error) {
- if _, err := s.conn.Write(req.raw()); err != nil {
- err = fmt.Errorf("AMI socket send data error: %v", err.Error())
- } else {
- req.sended = true
- }
- return
- }
- func (s *client) receiveLoop() {
- var (
- data []byte
- count int
- err error
- )
- buf := make([]byte, 1024)
- for {
- if count, err = s.conn.Read(buf); err != nil {
- err = fmt.Errorf("AMI socket receive data error: %v", err.Error())
- s.socketClosed <- err
- return
- }
- data = actionsFromRaw(
- append(data, buf[:count]...),
- func(action ActionData) {
- if action.isEvent() {
- s.event <- initEvent(action)
- } else {
- s.response <- Response{action}
- }
- },
- )
- }
- }
- func (s *client) Request(req Request, timeout time.Duration) (resp Response, accepted bool) {
- req.chanResponse = make(chan Response)
- s.request <- req
- if timeout == 0 {
- resp, accepted = <-req.chanResponse
- } else {
- select {
- case resp, accepted = <-req.chanResponse:
- accepted = true
- case <-time.After(timeout):
- break
- }
- }
- return
- }
- // Close finish work with client
- func (s *client) Close() {
- if s.state > StateStopped {
- s.conn.Close()
- }
- s.ctxCancel()
- }
- // destructor for finalizer
- func destroyClient(cl *Client) {
- cl.Close()
- }
|