request_stream.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. package rest
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "time"
  8. )
  9. func NewRequestStream(timeout time.Time, req *Request) *RequestStream {
  10. return &RequestStream{
  11. Request: req,
  12. Timeout: timeout,
  13. }
  14. }
  15. func ReadRequestStream(r io.Reader) (*RequestStream, IErrorArgs) {
  16. // type
  17. req := RequestStream{
  18. Request: &Request{},
  19. }
  20. if err := ReadByte(r, "type", (*byte)(&req.Type)); err != nil {
  21. return nil, err
  22. }
  23. if req.Type == RequestTypeIn || req.Type == RequestTypeOut {
  24. // id
  25. if err := ReadInt64(r, 2, "id", &req.ID); err != nil {
  26. return nil, err
  27. }
  28. if req.Type == RequestTypeIn {
  29. // timeout
  30. var timeout int64
  31. if err := ReadInt64(r, 8, "timeout", &timeout); err != nil {
  32. return nil, err
  33. }
  34. req.Timeout = time.Unix(timeout, 0)
  35. }
  36. }
  37. // command
  38. if err := ReadString(r, 2, "command", &req.Command); err != nil {
  39. return nil, err
  40. }
  41. // data
  42. dataBuf, err := ReadBufSize(r, 8, "data")
  43. if err != nil {
  44. return nil, err
  45. }
  46. if len(dataBuf) > 0 {
  47. if err := json.Unmarshal(dataBuf, &req.Data); err != nil {
  48. return nil, ioError("data", err)
  49. }
  50. }
  51. // files count
  52. var filesCount int64
  53. if err := ReadInt64(r, 2, "files_count", &filesCount); err != nil {
  54. return nil, err
  55. }
  56. // files
  57. if filesCount > 0 {
  58. req.Files = make(RequestFiles)
  59. }
  60. for i := 0; i < int(filesCount); i++ {
  61. filePrefix := fmt.Sprintf("file[%v]", i)
  62. var fileName string
  63. if err := ReadString(r, 2, filePrefix+".name", &fileName); err != nil {
  64. return nil, err
  65. }
  66. var fileSize int64
  67. if err := ReadInt64(r, 8, filePrefix+".size", &fileSize); err != nil {
  68. return nil, err
  69. }
  70. if fileSize < 1024*1024 {
  71. // RAM
  72. fileData, err := ReadBuf(r, int(fileSize), filePrefix+".data")
  73. if err != nil {
  74. return nil, err
  75. }
  76. buf := NewReadCloserLen(
  77. io.NopCloser(bytes.NewBuffer(fileData)),
  78. int64(len(fileData)),
  79. )
  80. req.Files[fileName] = buf
  81. } else {
  82. // временной файл
  83. tmpF, err := NewTemporaryFile(fileSize, r)
  84. if err != nil {
  85. return nil, err
  86. }
  87. req.Files[fileName] = tmpF
  88. }
  89. }
  90. return &req, nil
  91. }
  92. ////////////////////////////////////////
  93. type RequestStream struct {
  94. ID int64
  95. Timeout time.Time
  96. *Request
  97. }
  98. func (s *RequestStream) Write(w io.Writer) IErrorArgs {
  99. // type
  100. if err := WriteByte(w, byte(s.Type), "type"); err != nil {
  101. return err
  102. }
  103. if s.Type == RequestTypeIn || s.Type == RequestTypeOut {
  104. // id
  105. if err := WriteInt64(w, s.ID, 2, "id"); err != nil {
  106. return err
  107. }
  108. if s.Type == RequestTypeIn {
  109. // timeout
  110. if err := WriteInt64(w, s.Timeout.Unix(), 8, "timeout"); err != nil {
  111. return err
  112. }
  113. }
  114. }
  115. // command
  116. if err := WriteString(w, s.Command, "command", 2); err != nil {
  117. return err
  118. }
  119. // data
  120. if err := WriteBufSize(w, s.Data.JSON(), 8, "data"); err != nil {
  121. return err
  122. }
  123. // files count
  124. if err := WriteInt64(w, int64(len(s.Files)), 2, "files_count"); err != nil {
  125. return err
  126. }
  127. // files
  128. for name, file := range s.Files {
  129. prefix := fmt.Sprintf("file[%s]", name)
  130. // file name
  131. if err := WriteString(w, name, prefix+".name", 2); err != nil {
  132. return err
  133. }
  134. // file body size
  135. if err := WriteInt64(w, file.Len(), 8, prefix+".size"); err != nil {
  136. return err
  137. }
  138. // file body
  139. if _, err := io.Copy(w, file); err != nil {
  140. return ioError(prefix+".data", err)
  141. }
  142. }
  143. return nil
  144. }