// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program.  If not, see .
package s3select
import (
	"bytes"
	"encoding/binary"
	"fmt"
	"hash/crc32"
	"net/http"
	"strconv"
	"sync/atomic"
	"time"
)
// A message is in the format specified in
// https://docs.aws.amazon.com/AmazonS3/latest/API/images/s3select-frame-diagram-frame-overview.png
// hence the calculation is made accordingly.
func totalByteLength(headerLength, payloadLength int) int {
	return 4 + 4 + 4 + headerLength + payloadLength + 4
}
func genMessage(header, payload []byte) []byte {
	headerLength := len(header)
	payloadLength := len(payload)
	totalLength := totalByteLength(headerLength, payloadLength)
	buf := new(bytes.Buffer)
	binary.Write(buf, binary.BigEndian, uint32(totalLength))
	binary.Write(buf, binary.BigEndian, uint32(headerLength))
	prelude := buf.Bytes()
	binary.Write(buf, binary.BigEndian, crc32.ChecksumIEEE(prelude))
	buf.Write(header)
	if payload != nil {
		buf.Write(payload)
	}
	message := buf.Bytes()
	binary.Write(buf, binary.BigEndian, crc32.ChecksumIEEE(message))
	return buf.Bytes()
}
// Refer genRecordsHeader().
var recordsHeader = []byte{
	13, ':', 'm', 'e', 's', 's', 'a', 'g', 'e', '-', 't', 'y', 'p', 'e', 7, 0, 5, 'e', 'v', 'e', 'n', 't',
	13, ':', 'c', 'o', 'n', 't', 'e', 'n', 't', '-', 't', 'y', 'p', 'e', 7, 0, 24, 'a', 'p', 'p', 'l', 'i', 'c', 'a', 't', 'i', 'o', 'n', '/', 'o', 'c', 't', 'e', 't', '-', 's', 't', 'r', 'e', 'a', 'm',
	11, ':', 'e', 'v', 'e', 'n', 't', '-', 't', 'y', 'p', 'e', 7, 0, 7, 'R', 'e', 'c', 'o', 'r', 'd', 's',
}
const (
	// Chosen for compatibility with AWS JAVA SDK
	// It has a a buffer size of 128K:
	// https://github.com/aws/aws-sdk-java/blob/master/aws-java-sdk-s3/src/main/java/com/amazonaws/services/s3/internal/eventstreaming/MessageDecoder.java#L26
	// but we must make sure there is always space to add 256 bytes:
	// https://github.com/aws/aws-sdk-java/blob/master/aws-java-sdk-s3/src/main/java/com/amazonaws/services/s3/model/SelectObjectContentEventStream.java#L197
	maxRecordMessageLength = (128 << 10) - 256
)
var (
	bufLength = payloadLenForMsgLen(maxRecordMessageLength)
)
// newRecordsMessage - creates new Records Message which can contain a single record, partial records,
// or multiple records. Depending on the size of the result, a response can contain one or more of these messages.
//
// Header specification
// Records messages contain three headers, as follows:
// https://docs.aws.amazon.com/AmazonS3/latest/API/images/s3select-frame-diagram-record.png
//
// Payload specification
// Records message payloads can contain a single record, partial records, or multiple records.
func newRecordsMessage(payload []byte) []byte {
	return genMessage(recordsHeader, payload)
}
// payloadLenForMsgLen computes the length of the payload in a record
// message given the total length of the message.
func payloadLenForMsgLen(messageLength int) int {
	headerLength := len(recordsHeader)
	payloadLength := messageLength - 4 - 4 - 4 - headerLength - 4
	return payloadLength
}
// continuationMessage - S3 periodically sends this message to keep the TCP connection open.
// These messages appear in responses at random. The client must detect the message type and process accordingly.
//
// Header specification:
// Continuation messages contain two headers, as follows:
// https://docs.aws.amazon.com/AmazonS3/latest/API/images/s3select-frame-diagram-cont.png
//
// Payload specification:
// Continuation messages have no payload.
var continuationMessage = []byte{
	0, 0, 0, 57, // total byte-length.
	0, 0, 0, 41, // headers byte-length.
	139, 161, 157, 242, // prelude crc.
	13, ':', 'm', 'e', 's', 's', 'a', 'g', 'e', '-', 't', 'y', 'p', 'e', 7, 0, 5, 'e', 'v', 'e', 'n', 't', // headers.
	11, ':', 'e', 'v', 'e', 'n', 't', '-', 't', 'y', 'p', 'e', 7, 0, 4, 'C', 'o', 'n', 't', // headers.
	156, 134, 74, 13, // message crc.
}
// Refer genProgressHeader().
var progressHeader = []byte{
	13, ':', 'm', 'e', 's', 's', 'a', 'g', 'e', '-', 't', 'y', 'p', 'e', 7, 0, 5, 'e', 'v', 'e', 'n', 't',
	13, ':', 'c', 'o', 'n', 't', 'e', 'n', 't', '-', 't', 'y', 'p', 'e', 7, 0, 8, 't', 'e', 'x', 't', '/', 'x', 'm', 'l',
	11, ':', 'e', 'v', 'e', 'n', 't', '-', 't', 'y', 'p', 'e', 7, 0, 8, 'P', 'r', 'o', 'g', 'r', 'e', 's', 's',
}
// newProgressMessage - creates new Progress Message. S3 periodically sends this message, if requested.
// It contains information about the progress of a query that has started but has not yet completed.
//
// Header specification:
// Progress messages contain three headers, as follows:
// https://docs.aws.amazon.com/AmazonS3/latest/API/images/s3select-frame-diagram-progress.png
//
// Payload specification:
// Progress message payload is an XML document containing information about the progress of a request.
//   * BytesScanned => Number of bytes that have been processed before being uncompressed (if the file is compressed).
//   * BytesProcessed => Number of bytes that have been processed after being uncompressed (if the file is compressed).
//   * BytesReturned => Current number of bytes of records payload data returned by S3.
//
// For uncompressed files, BytesScanned and BytesProcessed are equal.
//
// Example:
//
// 
// 
//
func newProgressMessage(bytesScanned, bytesProcessed, bytesReturned int64) []byte {
	payload := []byte(`