123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155 |
- package rest
- import (
- "bytes"
- "encoding/json"
- "fmt"
- "io"
- "log"
- "time"
- )
- func NewRequestStream(timeout time.Time, req *Request) *RequestStream {
- return &RequestStream{
- Request: req,
- Timeout: timeout,
- }
- }
- func ReadRequestStream(r io.Reader) (*RequestStream, IErrorArgs) {
- // type
- req := RequestStream{
- Request: &Request{},
- }
- if err := ReadByte(r, "type", (*byte)(&req.Type)); err != nil {
- return nil, err
- }
- log.Println("TYPE", req.Type)
- if req.Type == RequestTypeIn || req.Type == RequestTypeOut {
- // id
- if err := ReadInt64(r, 2, "id", &req.ID); err != nil {
- return nil, err
- }
- log.Println("ID", req.ID)
- if req.Type == RequestTypeIn {
- // timeout
- var timeout int64
- if err := ReadInt64(r, 8, "timeout", &timeout); err != nil {
- return nil, err
- }
- req.Timeout = time.Unix(timeout, 0)
- log.Println("TIMEOUT", req.Type)
- }
- }
- // command
- if err := ReadString(r, 2, "command", &req.Command); err != nil {
- return nil, err
- }
- // data
- dataBuf, err := ReadBufSize(r, 8, "data")
- if err != nil {
- return nil, err
- }
- if len(dataBuf) > 0 {
- if err := json.Unmarshal(dataBuf, &req.Data); err != nil {
- return nil, ioError("data", err)
- }
- }
- // files count
- var filesCount int64
- if err := ReadInt64(r, 2, "files_count", &filesCount); err != nil {
- return nil, err
- }
- // files
- if filesCount > 0 {
- req.Files = make(RequestFiles)
- }
- for i := 0; i < int(filesCount); i++ {
- filePrefix := fmt.Sprintf("file[%v]", i)
- var fileName string
- if err := ReadString(r, 2, filePrefix+".name", &fileName); err != nil {
- return nil, err
- }
- var fileSize int64
- if err := ReadInt64(r, 8, filePrefix+".size", &fileSize); err != nil {
- return nil, err
- }
- if fileSize < 1024*1024 {
- // RAM
- fileData, err := ReadBuf(r, int(fileSize), filePrefix+".data")
- if err != nil {
- return nil, err
- }
- buf := NewReadCloserLen(
- io.NopCloser(bytes.NewBuffer(fileData)),
- int64(len(fileData)),
- )
- req.Files[fileName] = buf
- } else {
- // временной файл
- tmpF, err := NewTemporaryFile(fileSize, r)
- if err != nil {
- return nil, err
- }
- req.Files[fileName] = tmpF
- }
- }
- return &req, nil
- }
- ////////////////////////////////////////
- type RequestStream struct {
- ID int64
- Timeout time.Time
- *Request
- }
- func (s *RequestStream) Write(w io.Writer) IErrorArgs {
- // type
- if err := WriteByte(w, byte(s.Type), "type"); err != nil {
- return err
- }
- if s.Type == RequestTypeIn || s.Type == RequestTypeOut {
- // id
- if err := WriteInt64(w, s.ID, 2, "id"); err != nil {
- return err
- }
- if s.Type == RequestTypeIn {
- // timeout
- if err := WriteInt64(w, s.Timeout.Unix(), 8, "timeout"); err != nil {
- return err
- }
- }
- }
- // command
- if err := WriteString(w, s.Command, "command", 2); err != nil {
- return err
- }
- // data
- if err := WriteBufSize(w, s.Data.JSON(), 8, "data"); err != nil {
- return err
- }
- // files count
- if err := WriteInt64(w, int64(len(s.Files)), 2, "files_count"); err != nil {
- return err
- }
- // files
- for name, file := range s.Files {
- prefix := fmt.Sprintf("file[%s]", name)
- // file name
- if err := WriteString(w, name, prefix+".name", 2); err != nil {
- return err
- }
- // file body size
- if err := WriteInt64(w, file.Len(), 8, prefix+".size"); err != nil {
- return err
- }
- // file body
- if _, err := io.Copy(w, file); err != nil {
- return ioError(prefix+".data", err)
- }
- }
- return nil
- }
|