message.go 5.9 KB


  1. package rest_websocket
  2. import (
  3. "bytes"
  4. "io"
  5. "time"
  6. "git.ali33.ru/fcg-xvii/go-tools/json"
  7. "git.ali33.ru/fcg-xvii/rest"
  8. )
  9. // Int64ToBytes упаковывает int64 в срез байтов заданной длины
  10. func Int64ToBytes(num int64, byteCount int) []byte {
  11. bytes := make([]byte, byteCount)
  12. for i := 0; i < byteCount; i++ {
  13. shift := uint((byteCount - 1 - i) * 8)
  14. bytes[i] = byte(num >> shift)
  15. }
  16. return bytes
  17. }
  18. // BytesToInt64 конвертирует срез байтов в int64
  19. func BytesToInt64(bytes []byte) int64 {
  20. var num int64
  21. for _, b := range bytes {
  22. num = (num << 8) | int64(b)
  23. }
  24. return num
  25. }
  26. func ioError(field string, err error) rest.IErrorArgs {
  27. return rest.NewError(
  28. "ErrIO",
  29. json.Map{
  30. "field": field,
  31. "error": err.Error(),
  32. },
  33. )
  34. }
  35. func ReadMessage(r io.Reader) (*Message, rest.IErrorArgs) {
  36. // todo
  37. // id
  38. sType := make([]byte, 1)
  39. if _, err := r.Read(sType); err != nil {
  40. return nil, ioError("type", err)
  41. }
  42. mes := Message{
  43. mType: rest.RequestType(sType[0]),
  44. }
  45. if mes.mType == rest.RequestTypeMessage || mes.mType == rest.RequestTypeAnswer {
  46. sID := make([]byte, 2)
  47. if _, err := r.Read(sID); err != nil {
  48. return nil, ioError("id", err)
  49. }
  50. mes.id = BytesToInt64(sID)
  51. }
  52. if mes.mType == rest.RequestTypeMessage {
  53. sTimeout := make([]byte, 8)
  54. if _, err := r.Read(sTimeout); err != nil {
  55. return nil, ioError("timeout", err)
  56. }
  57. mes.timeout = time.Unix(BytesToInt64(sTimeout), 0)
  58. }
  59. sCommandSize := make([]byte, 2)
  60. if _, err := r.Read(sCommandSize); err != nil {
  61. return nil, ioError("data_size", err)
  62. }
  63. if BytesToInt64(sCommandSize) > 0 {
  64. sCommand := make([]byte, BytesToInt64(sCommandSize))
  65. if _, err := r.Read(sCommand); err != nil {
  66. return nil, ioError("data", err)
  67. }
  68. mes.command = string(sCommand)
  69. }
  70. sDataSize := make([]byte, 8)
  71. if _, err := r.Read(sDataSize); err != nil {
  72. return nil, ioError("data_size", err)
  73. }
  74. sData := make([]byte, BytesToInt64(sDataSize))
  75. if _, err := r.Read(sData); err != nil {
  76. return nil, ioError("data", err)
  77. }
  78. if len(sData) > 0 {
  79. if err := json.Unmarshal(sData, &mes.data); err != nil {
  80. return nil, ioError("data", err)
  81. }
  82. }
  83. sFilesCount := make([]byte, 2)
  84. if _, err := r.Read(sFilesCount); err != nil {
  85. return nil, ioError("files_count", err)
  86. }
  87. filesCount := BytesToInt64(sFilesCount)
  88. files := make(map[string]rest.IReadCloserLen)
  89. for i := 0; i < int(filesCount); i++ {
  90. sFileNameLen := make([]byte, 2)
  91. if _, err := r.Read(sFileNameLen); err != nil {
  92. return nil, ioError("file_name_length", err)
  93. }
  94. sFileName := make([]byte, BytesToInt64(sFileNameLen))
  95. if _, err := r.Read(sFileName); err != nil {
  96. return nil, ioError("file_name", err)
  97. }
  98. sFileSize := make([]byte, 8)
  99. if _, err := r.Read(sFileSize); err != nil {
  100. return nil, ioError("file_size", err)
  101. }
  102. fileSize := BytesToInt64(sFileSize)
  103. if fileSize < 1024*1024 {
  104. // RAM buffer
  105. sFileData := make([]byte, fileSize)
  106. if _, err := r.Read(sFileData); err != nil {
  107. return nil, ioError("file_data", err)
  108. }
  109. buf := rest.NewReadCloserLen(
  110. io.NopCloser(bytes.NewBuffer(sFileData)),
  111. int64(len(sFileData)),
  112. )
  113. files[string(sFileName)] = buf
  114. } else {
  115. // temporary file
  116. tmpF, err := rest.NewTemporaryFile(fileSize, r)
  117. if err != nil {
  118. return nil, err
  119. }
  120. files[string(sFileName)] = tmpF
  121. }
  122. }
  123. return &mes, nil
  124. }
  125. func NewMessage(command string, data json.Map, files map[string]rest.IReadCloserLen, timeout time.Duration, mType rest.RequestType) *Message {
  126. return &Message{
  127. command: command,
  128. data: data,
  129. files: files,
  130. mType: mType,
  131. timeout: time.Now().Add(timeout),
  132. }
  133. }
  134. type Message struct {
  135. id int64
  136. command string
  137. mType rest.RequestType
  138. timeout time.Time
  139. data json.Map
  140. files map[string]rest.IReadCloserLen
  141. owner *WebSocket
  142. }
  143. func (s *Message) Data() json.Map {
  144. return s.data
  145. }
  146. func (s *Message) File(name string) (rest.IReadCloserLen, bool) {
  147. file, check := s.files[name]
  148. return file, check
  149. }
  150. func (s *Message) FileKeys() []string {
  151. keys := make([]string, 0, len(s.files))
  152. for k := range s.files {
  153. keys = append(keys, k)
  154. }
  155. return keys
  156. }
  157. func (s *Message) Command() string {
  158. return s.command
  159. }
  160. func (s *Message) IsBinagy() bool {
  161. return len(s.files) > 0
  162. }
  163. func (s *Message) Write(w io.Writer) rest.IErrorArgs {
  164. // id
  165. if _, err := w.Write(Int64ToBytes(s.id, 2)); err != nil {
  166. return ioError("id", err)
  167. }
  168. // type
  169. if _, err := w.Write(Int64ToBytes(int64(s.mType), 1)); err != nil {
  170. return ioError("type", err)
  171. }
  172. // timeout
  173. if _, err := w.Write(Int64ToBytes(s.timeout.Unix(), 8)); err != nil {
  174. return ioError("timeout", err)
  175. }
  176. // command length
  177. if _, err := w.Write(Int64ToBytes(int64(len(s.command)), 2)); err != nil {
  178. return ioError("command_length", err)
  179. }
  180. // command
  181. if len(s.command) > 0 {
  182. if _, err := w.Write([]byte(s.command)); err != nil {
  183. return ioError("command", err)
  184. }
  185. }
  186. // data
  187. data := s.data.JSON()
  188. // data size
  189. if _, err := w.Write(Int64ToBytes(int64(len(data)), 8)); err != nil {
  190. return ioError("data_size", err)
  191. }
  192. // data body
  193. if _, err := w.Write(data); err != nil {
  194. return ioError("data_body", err)
  195. }
  196. // files count
  197. filesCount := int64(len(s.files))
  198. if _, err := w.Write(Int64ToBytes(filesCount, 2)); err != nil {
  199. return ioError("files_count", err)
  200. }
  201. // files
  202. for name, file := range s.files {
  203. // file name size
  204. fileNameSize := int64(len(name))
  205. if _, err := w.Write(Int64ToBytes(fileNameSize, 2)); err != nil {
  206. return ioError("file_name_size", err)
  207. }
  208. // file name
  209. if _, err := w.Write([]byte(name)); err != nil {
  210. return ioError("file_name", err)
  211. }
  212. // file body size
  213. if _, err := w.Write(Int64ToBytes(file.Len(), 8)); err != nil {
  214. return ioError("file_body_size", err)
  215. }
  216. // file body
  217. if _, err := io.Copy(w, file); err != nil {
  218. return ioError("file_body", err)
  219. }
  220. }
  221. return nil
  222. }
  223. func (s *Message) Close() {
  224. for _, file := range s.files {
  225. file.Close()
  226. }
  227. }