request_stream.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. package rest
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "log"
  8. "time"
  9. )
  10. func NewRequestStream(timeout time.Time, req *Request) *RequestStream {
  11. return &RequestStream{
  12. Request: req,
  13. Timeout: timeout,
  14. }
  15. }
  16. func ReadRequestStream(r io.Reader) (*RequestStream, IErrorArgs) {
  17. // type
  18. req := RequestStream{
  19. Request: &Request{},
  20. }
  21. if err := ReadByte(r, "type", (*byte)(&req.Type)); err != nil {
  22. return nil, err
  23. }
  24. log.Println("TYPE", req.Type)
  25. if req.Type == RequestTypeIn || req.Type == RequestTypeOut {
  26. // id
  27. if err := ReadInt64(r, 2, "id", &req.ID); err != nil {
  28. return nil, err
  29. }
  30. log.Println("ID", req.ID)
  31. if req.Type == RequestTypeIn {
  32. // timeout
  33. var timeout int64
  34. if err := ReadInt64(r, 8, "timeout", &timeout); err != nil {
  35. return nil, err
  36. }
  37. req.Timeout = time.Unix(timeout, 0)
  38. log.Println("TIMEOUT", req.Type)
  39. }
  40. }
  41. // command
  42. if err := ReadString(r, 2, "command", &req.Command); err != nil {
  43. return nil, err
  44. }
  45. // data
  46. dataBuf, err := ReadBufSize(r, 8, "data")
  47. if err != nil {
  48. return nil, err
  49. }
  50. if len(dataBuf) > 0 {
  51. if err := json.Unmarshal(dataBuf, &req.Data); err != nil {
  52. return nil, ioError("data", err)
  53. }
  54. }
  55. // files count
  56. var filesCount int64
  57. if err := ReadInt64(r, 2, "files_count", &filesCount); err != nil {
  58. return nil, err
  59. }
  60. // files
  61. if filesCount > 0 {
  62. req.Files = make(RequestFiles)
  63. }
  64. for i := 0; i < int(filesCount); i++ {
  65. filePrefix := fmt.Sprintf("file[%v]", i)
  66. var fileName string
  67. if err := ReadString(r, 2, filePrefix+".name", &fileName); err != nil {
  68. return nil, err
  69. }
  70. var fileSize int64
  71. if err := ReadInt64(r, 8, filePrefix+".size", &fileSize); err != nil {
  72. return nil, err
  73. }
  74. if fileSize < 1024*1024 {
  75. // RAM
  76. fileData, err := ReadBuf(r, int(fileSize), filePrefix+".data")
  77. if err != nil {
  78. return nil, err
  79. }
  80. buf := NewReadCloserLen(
  81. io.NopCloser(bytes.NewBuffer(fileData)),
  82. int64(len(fileData)),
  83. )
  84. req.Files[fileName] = buf
  85. } else {
  86. // временной файл
  87. tmpF, err := NewTemporaryFile(fileSize, r)
  88. if err != nil {
  89. return nil, err
  90. }
  91. req.Files[fileName] = tmpF
  92. }
  93. }
  94. return &req, nil
  95. }
  96. ////////////////////////////////////////
  97. type RequestStream struct {
  98. ID int64
  99. Timeout time.Time
  100. *Request
  101. }
  102. func (s *RequestStream) Write(w io.Writer) IErrorArgs {
  103. // type
  104. if err := WriteByte(w, byte(s.Type), "type"); err != nil {
  105. return err
  106. }
  107. if s.Type == RequestTypeIn || s.Type == RequestTypeOut {
  108. // id
  109. if err := WriteInt64(w, s.ID, 2, "id"); err != nil {
  110. return err
  111. }
  112. if s.Type == RequestTypeIn {
  113. // timeout
  114. if err := WriteInt64(w, s.Timeout.Unix(), 8, "timeout"); err != nil {
  115. return err
  116. }
  117. }
  118. }
  119. // command
  120. if err := WriteString(w, s.Command, "command", 2); err != nil {
  121. return err
  122. }
  123. // data
  124. if err := WriteBufSize(w, s.Data.JSON(), 8, "data"); err != nil {
  125. return err
  126. }
  127. // files count
  128. if err := WriteInt64(w, int64(len(s.Files)), 2, "files_count"); err != nil {
  129. return err
  130. }
  131. // files
  132. for name, file := range s.Files {
  133. prefix := fmt.Sprintf("file[%s]", name)
  134. // file name
  135. if err := WriteString(w, name, prefix+".name", 2); err != nil {
  136. return err
  137. }
  138. // file body size
  139. if err := WriteInt64(w, file.Len(), 8, prefix+".size"); err != nil {
  140. return err
  141. }
  142. // file body
  143. if _, err := io.Copy(w, file); err != nil {
  144. return ioError(prefix+".data", err)
  145. }
  146. }
  147. return nil
  148. }