client.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. package main
  2. import (
  3. "log"
  4. "time"
  5. "git.ali33.ru/fcg-xvii/go-tools/json"
  6. "github.com/gorilla/websocket"
  7. "golang.org/x/net/context"
  8. )
  9. type WSClient struct {
  10. clients *WSClients
  11. conn *websocket.Conn
  12. ctx context.Context
  13. cancel context.CancelFunc
  14. //callClose func(*WSClient)
  15. //callIncoming func(*WSClient, json.Map)
  16. }
  17. func (s *WSClient) SendData(data json.Map) error {
  18. w, err := s.conn.NextWriter(websocket.TextMessage)
  19. if err != nil {
  20. return err
  21. }
  22. _, err = w.Write(data.JSON())
  23. w.Close()
  24. return err
  25. }
  26. func (s *WSClient) SendSingle(data json.Map) error {
  27. jm := json.Map{
  28. "data": data,
  29. }
  30. return s.SendData(jm)
  31. }
  32. func (s *WSClient) SendMessage(data json.Map, deadline time.Duration) (<-chan json.Map, error) {
  33. dl := time.Now().Add(deadline)
  34. id := s.clients.nextID()
  35. jm := json.Map{
  36. "id": id,
  37. "data": data,
  38. }
  39. if err := s.SendData(jm); err != nil {
  40. return nil, err
  41. }
  42. ctx, _ := context.WithDeadline(context.Background(), dl)
  43. ch := make(chan json.Map)
  44. rch := make(chan json.Map)
  45. s.clients.rStore.Store(id, ch)
  46. go func() {
  47. select {
  48. case <-ctx.Done():
  49. s.clients.rStore.Delete(id)
  50. close(rch)
  51. case rData := <-ch:
  52. select {
  53. case rch <- rData:
  54. default:
  55. }
  56. }
  57. }()
  58. return rch, nil
  59. }
  60. func (s *WSClient) Context() context.Context {
  61. return s.ctx
  62. }
  63. func (s *WSClient) Read() <-chan *Message {
  64. ch := make(chan *Message, 10)
  65. go func() {
  66. defer func() {
  67. s.cancel()
  68. close(ch)
  69. }()
  70. for {
  71. t, src, err := s.conn.ReadMessage()
  72. log.Println(t, string(src), err)
  73. if err != nil {
  74. s.clients.clientDisconnected(s)
  75. return
  76. }
  77. var jm json.Map
  78. if err = json.Unmarshal(src, &jm); err == nil {
  79. if jm.Bool("is_response", false) {
  80. //log.Println("AAAAAAAAAAAAAAAAAAA")
  81. if rch, check := s.clients.rStore.LoadAndDelete(jm.Int("id", -1)); check {
  82. rch.(chan json.Map) <- jm.Map("data", json.Map{})
  83. }
  84. } else {
  85. //log.Println("UUUUUUUUUUUUUUUU")
  86. mes := MessageFromMap(jm, s)
  87. select {
  88. case ch <- mes:
  89. default:
  90. }
  91. }
  92. }
  93. }
  94. }()
  95. return ch
  96. }
  97. /////////////////////////