605 lines
13 KiB
Go
605 lines
13 KiB
Go
|
package wireproto
|
||
|
|
||
|
import (
|
||
|
`bytes`
|
||
|
`encoding/binary`
|
||
|
`fmt`
|
||
|
`io`
|
||
|
`net`
|
||
|
`slices`
|
||
|
)
|
||
|
|
||
|
// GetByteOrder returns the byte order ("endianness") used by the library.
|
||
|
func GetByteOrder() (order binary.ByteOrder) {
|
||
|
|
||
|
order = byteOrder
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// GetHdrs returns a map of a header name/status indicator and its byte sequence.
|
||
|
func GetHdrs() (hdrs map[string][]byte) {
|
||
|
|
||
|
hdrs = map[string][]byte{
|
||
|
"OK": respStatusOK,
|
||
|
"ERR": respStatusErr,
|
||
|
"CKSUM": hdrCKSUM,
|
||
|
"MSGSTART": hdrMSGSTART,
|
||
|
"BODYSTART": hdrBODYSTART,
|
||
|
"BODYEND": hdrBODYEND,
|
||
|
"MSGEND": hdrMSGEND,
|
||
|
}
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// ReadConnRequest returns a Request from a net.Conn.
|
||
|
func ReadConnRequest(conn net.Conn) (req *Request, err error) {
|
||
|
|
||
|
var b []byte
|
||
|
var size int
|
||
|
var buf *bytes.Buffer = new(bytes.Buffer)
|
||
|
|
||
|
// First check for a checksum.
|
||
|
b = make([]byte, len(hdrMSGSTART))
|
||
|
if _, err = conn.Read(b); err != nil {
|
||
|
return
|
||
|
}
|
||
|
if _, err = buf.Write(b); err != nil {
|
||
|
return
|
||
|
}
|
||
|
if bytes.Equal(b, hdrCKSUM) {
|
||
|
// A checksum was found. Continue reading in the checksum and MSGSTART.
|
||
|
if _, err = io.CopyN(buf, conn, int64(CksumPackedSize+len(hdrMSGSTART))); err != nil {
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
// Otherwise no checksum was found.
|
||
|
|
||
|
// Protocol version, BODYSTART, RG count.
|
||
|
if _, err = io.CopyN(buf, conn, int64(PackedNumSize+len(hdrBODYSTART)+PackedNumSize)); err != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// RG size.
|
||
|
b = make([]byte, PackedNumSize)
|
||
|
if _, err = conn.Read(b); err != nil {
|
||
|
return
|
||
|
}
|
||
|
if _, err = buf.Write(b); err != nil {
|
||
|
return
|
||
|
}
|
||
|
size = UnpackInt(b)
|
||
|
|
||
|
// Now copy in the RG, BODYEND, MSGEND.
|
||
|
if _, err = io.CopyN(buf, conn, int64(size+len(hdrBODYEND)+len(hdrMSGEND))); err != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// Unmarshal.
|
||
|
req = new(Request)
|
||
|
if err = req.UnmarshalBinary(buf.Bytes()); err != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// ReadConnResponse returns a Response from a net.Conn.
|
||
|
func ReadConnResponse(conn net.Conn) (resp *Response, err error) {
|
||
|
|
||
|
var b []byte
|
||
|
var size int
|
||
|
var buf *bytes.Buffer = new(bytes.Buffer)
|
||
|
|
||
|
// First get the checksum.
|
||
|
b = make([]byte, len(hdrCKSUM))
|
||
|
if _, err = conn.Read(b); err != nil {
|
||
|
return
|
||
|
}
|
||
|
if _, err = buf.Write(b); err != nil {
|
||
|
return
|
||
|
}
|
||
|
if !bytes.Equal(b, hdrCKSUM) {
|
||
|
err = ErrBadHdr
|
||
|
return
|
||
|
}
|
||
|
// A checksum was found. Continue reading in the checksum, MSGSTART, protocol version, BODYSTART, and RG count.
|
||
|
if _, err = io.CopyN(
|
||
|
buf, conn, int64(CksumPackedSize+len(hdrMSGSTART)+PackedNumSize+len(hdrBODYSTART)+PackedNumSize),
|
||
|
); err != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// RG size.
|
||
|
b = make([]byte, PackedNumSize)
|
||
|
if _, err = conn.Read(b); err != nil {
|
||
|
return
|
||
|
}
|
||
|
if _, err = buf.Write(b); err != nil {
|
||
|
return
|
||
|
}
|
||
|
size = UnpackInt(b)
|
||
|
|
||
|
// Now copy in the RG, BODYEND, MSGEND.
|
||
|
if _, err = io.CopyN(buf, conn, int64(size+len(hdrBODYEND)+len(hdrMSGEND))); err != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// Unmarshal.
|
||
|
resp = new(Response)
|
||
|
if err = resp.UnmarshalBinary(buf.Bytes()); err != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// PackInt is a more generic form of PackUint32 as a convenience wrapper.
|
||
|
func PackInt(i int) (b []byte) {
|
||
|
|
||
|
b = make([]byte, PackedNumSize)
|
||
|
|
||
|
byteOrder.PutUint32(b, uint32(i))
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// PackUint32 uses the package-wide byteOrder to pack a uint32 into a series of bytes.
|
||
|
func PackUint32(u uint32) (b []byte) {
|
||
|
|
||
|
b = make([]byte, PackedNumSize)
|
||
|
|
||
|
byteOrder.PutUint32(b, u)
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// UnpackInt is a more generic form of UnpackUint32 as a convenience wrapper. The same caveat applies for UnpackUint32.
|
||
|
func UnpackInt(b []byte) (i int) {
|
||
|
|
||
|
var u uint32
|
||
|
|
||
|
u = UnpackUint32(b)
|
||
|
i = int(u)
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// UnpackUint32 returns a uint32 from byteslice b. The byteslice *MUST* be PackedNumSize (4) bytes long, or u will always be 0.
|
||
|
func UnpackUint32(b []byte) (u uint32) {
|
||
|
|
||
|
if b == nil || len(b) != PackedNumSize {
|
||
|
return
|
||
|
}
|
||
|
u = byteOrder.Uint32(b)
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
/*
|
||
|
WriteConnRequest writes a Request to a net.Conn.
|
||
|
|
||
|
If chunkSize == 0, no chunking will be performed. This may be unreliable on some connections.
|
||
|
|
||
|
If chunkSize > 0, chunking will be performed based on chunkSize.
|
||
|
|
||
|
If chunkSize < 0, chunking will be done based on WriteChunkSize number of bytes, which should be sensible for most connections.
|
||
|
*/
|
||
|
func WriteConnRequest(req *Request, conn net.Conn, chunkSize int) (err error) {
|
||
|
|
||
|
if err = WriteRequest(req, conn, chunkSize); err != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
/*
|
||
|
WriteConnRequestSegmented writes a Request to a net.Conn, chunking it by each logical section.
|
||
|
|
||
|
This is slower than a continuous stream/fixed-chunk stream to the end (see WriteConnRequest),
|
||
|
but will be more manageable over high-latency connections and will prevent clients from being
|
||
|
overwhelmed on a large Request.
|
||
|
*/
|
||
|
func WriteConnRequestSegmented(req *Request, conn net.Conn) (err error) {
|
||
|
|
||
|
if err = WriteRequestSegmented(req, conn); err != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
/*
|
||
|
WriteConnResponse writes a Response to a net.Conn.
|
||
|
|
||
|
If chunkSize == 0, no chunking will be performed. This may be unreliable on some connections.
|
||
|
|
||
|
If chunkSize > 0, chunking will be performed based on chunkSize.
|
||
|
|
||
|
If chunkSize < 0, chunking will be done based on WriteChunkSize number of bytes, which should be sensible for most connections.
|
||
|
*/
|
||
|
func WriteConnResponse(resp *Response, conn net.Conn, chunkSize int) (err error) {
|
||
|
|
||
|
if err = WriteResponse(resp, conn, chunkSize); err != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
/*
|
||
|
WriteConnResponseSegmented writes a Response to a net.Conn, chunking it by each logical section.
|
||
|
|
||
|
This is slower than a continuous stream/fixed-chunk stream to the end (see WriteConnResponse),
|
||
|
but will be more manageable over high-latency connections and will prevent clients from being
|
||
|
overwhelmed on a large Response.
|
||
|
*/
|
||
|
func WriteConnResponseSegmented(resp *Response, conn net.Conn) (err error) {
|
||
|
|
||
|
if err = WriteResponseSegmented(resp, conn); err != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
/*
|
||
|
WriteRequest writes a Request to an io.Writer.
|
||
|
|
||
|
If chunkSize == 0, no chunking will be performed. This may be unreliable for some io.Writers.
|
||
|
|
||
|
If chunkSize > 0, chunking will be performed based on chunkSize.
|
||
|
|
||
|
If chunkSize < 0, chunking will be done based on WriteChunkSize number of bytes, which should be sensible for most io.Writers.
|
||
|
*/
|
||
|
func WriteRequest(req *Request, w io.Writer, chunkSize int) (err error) {
|
||
|
|
||
|
var b []byte
|
||
|
var buf *bytes.Buffer
|
||
|
|
||
|
if req == nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
if b, err = req.MarshalBinary(); err != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
if chunkSize == 0 {
|
||
|
if _, err = w.Write(b); err != nil {
|
||
|
return
|
||
|
}
|
||
|
return
|
||
|
}
|
||
|
|
||
|
if chunkSize < 0 {
|
||
|
chunkSize = WriteChunkSize
|
||
|
}
|
||
|
|
||
|
buf = bytes.NewBuffer(b)
|
||
|
|
||
|
for buf.Len() != 0 {
|
||
|
if buf.Len() < chunkSize {
|
||
|
chunkSize = buf.Len()
|
||
|
}
|
||
|
if _, err = io.CopyN(w, buf, int64(chunkSize)); err != nil {
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
/*
|
||
|
WriteRequestSegmented writes a Request to an io.Writer, chunking it by each logical section.
|
||
|
|
||
|
This is slower than a continuous stream/fixed-chunk stream to the end (see WriteRequest),
|
||
|
but will be more manageable for slower io.Writers.
|
||
|
*/
|
||
|
func WriteRequestSegmented(req *Request, w io.Writer) (err error) {
|
||
|
|
||
|
var size int
|
||
|
|
||
|
if req == nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
for _, rg := range req.RecordGroups {
|
||
|
size += rg.Size()
|
||
|
}
|
||
|
|
||
|
if req.Checksum != nil {
|
||
|
if _, err = w.Write(hdrCKSUM); err != nil {
|
||
|
return
|
||
|
}
|
||
|
if _, err = w.Write(PackUint32(*req.Checksum)); err != nil {
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
if _, err = w.Write(hdrMSGSTART); err != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
if _, err = w.Write(PackUint32(req.ProtocolVersion)); err != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
if _, err = w.Write(PackInt(len(req.RecordGroups))); err != nil {
|
||
|
return
|
||
|
}
|
||
|
if _, err = w.Write(PackInt(size)); err != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
for _, rg := range req.RecordGroups {
|
||
|
size = 0
|
||
|
for _, rec := range rg.Records {
|
||
|
size += rec.Size()
|
||
|
}
|
||
|
if _, err = w.Write(PackInt(len(rg.Records))); err != nil {
|
||
|
return
|
||
|
}
|
||
|
if _, err = w.Write(PackInt(size)); err != nil {
|
||
|
return
|
||
|
}
|
||
|
for _, rec := range rg.Records {
|
||
|
size = 0
|
||
|
for _, kvp := range rec.Pairs {
|
||
|
size += kvp.Size()
|
||
|
}
|
||
|
if _, err = w.Write(PackInt(len(rec.Pairs))); err != nil {
|
||
|
return
|
||
|
}
|
||
|
if _, err = w.Write(PackInt(size)); err != nil {
|
||
|
return
|
||
|
}
|
||
|
for _, kvp := range rec.Pairs {
|
||
|
if _, err = w.Write(PackInt(kvp.Name.Size())); err != nil {
|
||
|
return
|
||
|
}
|
||
|
if _, err = w.Write(PackInt(kvp.Value.Size())); err != nil {
|
||
|
return
|
||
|
}
|
||
|
if _, err = w.Write(kvp.Name); err != nil {
|
||
|
return
|
||
|
}
|
||
|
if _, err = w.Write(kvp.Value); err != nil {
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if _, err = w.Write(hdrBODYEND); err != nil {
|
||
|
return
|
||
|
}
|
||
|
if _, err = w.Write(hdrMSGEND); err != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
/*
|
||
|
WriteResponse writes a Response to an io.Writer.
|
||
|
|
||
|
If chunkSize == 0, no chunking will be performed. This may be unreliable for some io.Writers.
|
||
|
|
||
|
If chunkSize > 0, chunking will be performed based on chunkSize.
|
||
|
|
||
|
If chunkSize < 0, chunking will be done based on WriteChunkSize number of bytes, which should be sensible for most io.Writers.
|
||
|
*/
|
||
|
func WriteResponse(resp *Response, w io.Writer, chunkSize int) (err error) {
|
||
|
|
||
|
var b []byte
|
||
|
var buf *bytes.Buffer
|
||
|
|
||
|
if resp == nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
if b, err = resp.MarshalBinary(); err != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
if chunkSize == 0 {
|
||
|
if _, err = w.Write(b); err != nil {
|
||
|
return
|
||
|
}
|
||
|
return
|
||
|
}
|
||
|
|
||
|
if chunkSize < 0 {
|
||
|
chunkSize = WriteChunkSize
|
||
|
}
|
||
|
|
||
|
buf = bytes.NewBuffer(b)
|
||
|
|
||
|
for buf.Len() != 0 {
|
||
|
if buf.Len() < chunkSize {
|
||
|
chunkSize = buf.Len()
|
||
|
}
|
||
|
if _, err = io.CopyN(w, buf, int64(chunkSize)); err != nil {
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
/*
|
||
|
WriteResponseSegmented writes a Response to an io.Writer, chunking it by each logical section.
|
||
|
|
||
|
This is slower than a continuous stream/fixed-chunk stream to the end (see WriteResponse),
|
||
|
but will be more manageable for slower io.Writers.
|
||
|
*/
|
||
|
func WriteResponseSegmented(resp *Response, w io.Writer) (err error) {
|
||
|
|
||
|
var size int
|
||
|
|
||
|
if resp == nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
for _, rg := range resp.RecordGroups {
|
||
|
size += rg.Size()
|
||
|
}
|
||
|
|
||
|
if _, err = w.Write([]byte{resp.Status}); err != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
if _, err = w.Write(hdrCKSUM); err != nil {
|
||
|
return
|
||
|
}
|
||
|
if _, err = w.Write(PackUint32(resp.Checksum)); err != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
if _, err = w.Write(hdrMSGSTART); err != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
if _, err = w.Write(PackUint32(resp.ProtocolVersion)); err != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
if _, err = w.Write(PackInt(len(resp.RecordGroups))); err != nil {
|
||
|
return
|
||
|
}
|
||
|
if _, err = w.Write(PackInt(size)); err != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
for _, rg := range resp.RecordGroups {
|
||
|
size = 0
|
||
|
for _, rec := range rg.Records {
|
||
|
size += rec.Size()
|
||
|
}
|
||
|
if _, err = w.Write(PackInt(len(rg.Records))); err != nil {
|
||
|
return
|
||
|
}
|
||
|
if _, err = w.Write(PackInt(size)); err != nil {
|
||
|
return
|
||
|
}
|
||
|
for _, rec := range rg.Records {
|
||
|
size = 0
|
||
|
for _, kvp := range rec.Pairs {
|
||
|
size += kvp.Size()
|
||
|
}
|
||
|
if _, err = w.Write(PackInt(len(rec.Pairs))); err != nil {
|
||
|
return
|
||
|
}
|
||
|
if _, err = w.Write(PackInt(size)); err != nil {
|
||
|
return
|
||
|
}
|
||
|
if _, err = w.Write(PackInt(rec.OriginalRecord.Size())); err != nil {
|
||
|
return
|
||
|
}
|
||
|
for _, kvp := range rec.Pairs {
|
||
|
if _, err = w.Write(PackInt(kvp.Name.Size())); err != nil {
|
||
|
return
|
||
|
}
|
||
|
if _, err = w.Write(PackInt(kvp.Value.Size())); err != nil {
|
||
|
return
|
||
|
}
|
||
|
if _, err = w.Write(kvp.Name); err != nil {
|
||
|
return
|
||
|
}
|
||
|
if _, err = w.Write(kvp.Value); err != nil {
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
size = 0
|
||
|
for _, kvp := range rec.OriginalRecord.Pairs {
|
||
|
size += kvp.Size()
|
||
|
}
|
||
|
if _, err = w.Write(PackInt(len(rec.OriginalRecord.Pairs))); err != nil {
|
||
|
return
|
||
|
}
|
||
|
if _, err = w.Write(PackInt(size)); err != nil {
|
||
|
return
|
||
|
}
|
||
|
for _, kvp := range rec.OriginalRecord.Pairs {
|
||
|
if _, err = w.Write(PackInt(kvp.Name.Size())); err != nil {
|
||
|
return
|
||
|
}
|
||
|
if _, err = w.Write(PackInt(kvp.Value.Size())); err != nil {
|
||
|
return
|
||
|
}
|
||
|
if _, err = w.Write(kvp.Name); err != nil {
|
||
|
return
|
||
|
}
|
||
|
if _, err = w.Write(kvp.Value); err != nil {
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if _, err = w.Write(hdrBODYEND); err != nil {
|
||
|
return
|
||
|
}
|
||
|
if _, err = w.Write(hdrMSGEND); err != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// chunkByteLine splits b into a chunked slice of no more than maxByteLine per 1st-level element.
|
||
|
func chunkByteLine(b []byte) (chunked [][]byte) {
|
||
|
|
||
|
chunked = make([][]byte, 0, (len(b)+maxByteLine-1)/maxByteLine)
|
||
|
|
||
|
// slices.Chunk requires Golang 1.23+
|
||
|
for chunk := range slices.Chunk(b, maxByteLine) {
|
||
|
chunked = append(chunked, chunk)
|
||
|
}
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// cksumBytes returns a byte-packed representation of the checksum.
|
||
|
func cksumBytes(cksum uint32) (packed []byte) {
|
||
|
|
||
|
packed = make([]byte, CksumPackedSize)
|
||
|
byteOrder.PutUint32(packed, cksum)
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
/*
padBytesRight is used when rendering Model objects.

It renders val as lowercase hex and left-justifies the result in a field of length characters,
padding on the right with spaces. Output longer than length is not truncated.

val is assumed to *not* be in hex format already; it should be the raw []byte representation.
*/
func padBytesRight(val []byte, length int) (out string) {
	// Hex-encode first, then pad the resulting text out to the requested width.
	hexed := fmt.Sprintf("%x", val)
	return fmt.Sprintf("%-*s", length, hexed)
}
|
||
|
|
||
|
/*
|
||
|
padIntRight is used when rendering Model objects.
|
||
|
*/
|
||
|
func padIntRight(val int, length int) (out string) {
|
||
|
|
||
|
out = padBytesRight(PackUint32(uint32(val)), length)
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
/*
|
||
|
padStrRight is used when rendering Model objects.
|
||
|
*/
|
||
|
func padStrRight(val string, length int) (out string) {
|
||
|
|
||
|
out = padBytesRight([]byte(val), length)
|
||
|
|
||
|
return
|
||
|
}
|