go_wireproto/funcs.go

605 lines
13 KiB
Go
Raw Normal View History

2024-07-09 23:40:20 -04:00
package wireproto
import (
`bytes`
`encoding/binary`
`fmt`
`io`
`net`
`slices`
)
// GetByteOrder reports the binary.ByteOrder ("endianness") held in the package-level byteOrder value.
func GetByteOrder() (order binary.ByteOrder) {
	return byteOrder
}
/*
GetHdrs returns a freshly built map of header/status indicator names to their byte sequences.

The map itself is new on every call, but its values are the package-level byte slices
themselves (they are not copied).
*/
func GetHdrs() (hdrs map[string][]byte) {

	hdrs = make(map[string][]byte, 7)

	// Response status indicators.
	hdrs["OK"] = respStatusOK
	hdrs["ERR"] = respStatusErr

	// Message framing headers.
	hdrs["CKSUM"] = hdrCKSUM
	hdrs["MSGSTART"] = hdrMSGSTART
	hdrs["BODYSTART"] = hdrBODYSTART
	hdrs["BODYEND"] = hdrBODYEND
	hdrs["MSGEND"] = hdrMSGEND

	return hdrs
}
/*
ReadConnRequest reads a single Request from a net.Conn and returns it.

The raw message is accumulated into a buffer in stages (optional checksum, MSGSTART, protocol version,
BODYSTART, record group count, record group size, the record groups themselves, BODYEND, MSGEND)
and the accumulated bytes are then handed to Request.UnmarshalBinary.

Fixed-size pieces are read with io.ReadFull/io.CopyN so that a short read on the connection
cannot leave a header partially filled; if the peer closes mid-message, err will be
io.EOF or io.ErrUnexpectedEOF.

If err is non-nil, req should not be used.
*/
func ReadConnRequest(conn net.Conn) (req *Request, err error) {

	var b []byte
	var size int
	var buf *bytes.Buffer = new(bytes.Buffer)

	// First check for a checksum.
	// The first header is either CKSUM or MSGSTART.
	// NOTE(review): this relies on hdrCKSUM and hdrMSGSTART having the same length - confirm.
	b = make([]byte, len(hdrMSGSTART))
	// io.ReadFull, not conn.Read: a single Read may legally return fewer bytes than requested.
	if _, err = io.ReadFull(conn, b); err != nil {
		return
	}
	if _, err = buf.Write(b); err != nil {
		return
	}

	if bytes.Equal(b, hdrCKSUM) {
		// A checksum was found. Continue reading in the checksum and MSGSTART.
		if _, err = io.CopyN(buf, conn, int64(CksumPackedSize+len(hdrMSGSTART))); err != nil {
			return
		}
	}
	// Otherwise no checksum was found, and the header just read was MSGSTART.

	// Protocol version, BODYSTART, RG count.
	if _, err = io.CopyN(buf, conn, int64(PackedNumSize+len(hdrBODYSTART)+PackedNumSize)); err != nil {
		return
	}

	// RG size.
	b = make([]byte, PackedNumSize)
	if _, err = io.ReadFull(conn, b); err != nil {
		return
	}
	if _, err = buf.Write(b); err != nil {
		return
	}
	// NOTE(review): size comes straight off the wire and is not bounded here; a peer can
	// make this function buffer up to the maximum uint32 number of bytes.
	size = UnpackInt(b)

	// Now copy in the RG, BODYEND, MSGEND.
	if _, err = io.CopyN(buf, conn, int64(size+len(hdrBODYEND)+len(hdrMSGEND))); err != nil {
		return
	}

	// Unmarshal.
	req = new(Request)
	if err = req.UnmarshalBinary(buf.Bytes()); err != nil {
		return
	}

	return
}
/*
ReadConnResponse reads a single Response from a net.Conn and returns it.

The raw message is accumulated into a buffer in stages (CKSUM header, checksum, MSGSTART, protocol version,
BODYSTART, record group count, record group size, the record groups themselves, BODYEND, MSGEND)
and the accumulated bytes are then handed to Response.UnmarshalBinary.

Unlike a Request, the checksum is mandatory: if the first header read is not the CKSUM header,
ErrBadHdr is returned.

Fixed-size pieces are read with io.ReadFull/io.CopyN so that a short read on the connection
cannot leave a header partially filled; if the peer closes mid-message, err will be
io.EOF or io.ErrUnexpectedEOF.

If err is non-nil, resp should not be used.

NOTE(review): WriteResponseSegmented emits a status byte *before* the CKSUM header, whereas this
function expects the CKSUM header to be the very first thing on the wire. Confirm which of the two
matches Response.MarshalBinary/UnmarshalBinary.
*/
func ReadConnResponse(conn net.Conn) (resp *Response, err error) {

	var b []byte
	var size int
	var buf *bytes.Buffer = new(bytes.Buffer)

	// First get the checksum header.
	b = make([]byte, len(hdrCKSUM))
	// io.ReadFull, not conn.Read: a single Read may legally return fewer bytes than requested.
	if _, err = io.ReadFull(conn, b); err != nil {
		return
	}
	if _, err = buf.Write(b); err != nil {
		return
	}
	if !bytes.Equal(b, hdrCKSUM) {
		err = ErrBadHdr
		return
	}

	// A checksum was found. Continue reading in the checksum, MSGSTART, protocol version, BODYSTART, and RG count.
	if _, err = io.CopyN(
		buf, conn, int64(CksumPackedSize+len(hdrMSGSTART)+PackedNumSize+len(hdrBODYSTART)+PackedNumSize),
	); err != nil {
		return
	}

	// RG size.
	b = make([]byte, PackedNumSize)
	if _, err = io.ReadFull(conn, b); err != nil {
		return
	}
	if _, err = buf.Write(b); err != nil {
		return
	}
	// NOTE(review): size comes straight off the wire and is not bounded here; a peer can
	// make this function buffer up to the maximum uint32 number of bytes.
	size = UnpackInt(b)

	// Now copy in the RG, BODYEND, MSGEND.
	if _, err = io.CopyN(buf, conn, int64(size+len(hdrBODYEND)+len(hdrMSGEND))); err != nil {
		return
	}

	// Unmarshal.
	resp = new(Response)
	if err = resp.UnmarshalBinary(buf.Bytes()); err != nil {
		return
	}

	return
}
/*
PackInt packs an int into PackedNumSize bytes using the package-wide byteOrder.
It is the int-accepting counterpart of PackUint32.

i is converted with uint32(i) before packing, so a negative i or one that does not fit in 32 bits will wrap.
*/
func PackInt(i int) (b []byte) {

	var u uint32 = uint32(i)

	b = make([]byte, PackedNumSize)
	byteOrder.PutUint32(b, u)

	return b
}
// PackUint32 returns a newly allocated byteslice of PackedNumSize bytes holding u, encoded with the package-wide byteOrder.
func PackUint32(u uint32) (b []byte) {

	var packed []byte = make([]byte, PackedNumSize)

	byteOrder.PutUint32(packed, u)

	return packed
}
/*
UnpackInt returns the result of UnpackUint32 converted to an int, as a convenience.
The same length requirement on b that applies to UnpackUint32 applies here.
*/
func UnpackInt(b []byte) (i int) {
	return int(UnpackUint32(b))
}
/*
UnpackUint32 decodes byteslice b into a uint32 using the package-wide byteOrder.

b *MUST* be exactly PackedNumSize (4) bytes long. For any other length (including a nil b), u is 0.
*/
func UnpackUint32(b []byte) (u uint32) {

	// A nil slice has length 0, so the length comparison alone also rejects nil.
	if len(b) == PackedNumSize {
		u = byteOrder.Uint32(b)
	}

	return u
}
/*
WriteConnRequest writes a Request to a net.Conn by passing req, conn, and chunkSize through to WriteRequest.

chunkSize is interpreted as follows:

  - chunkSize == 0: no chunking will be performed. This may be unreliable on some connections.
  - chunkSize > 0: chunking will be performed based on chunkSize.
  - chunkSize < 0: chunking will be done based on WriteChunkSize number of bytes, which should be sensible for most connections.
*/
func WriteConnRequest(req *Request, conn net.Conn, chunkSize int) (err error) {
	return WriteRequest(req, conn, chunkSize)
}
/*
WriteConnRequestSegmented writes a Request to a net.Conn one logical section at a time
by passing req and conn through to WriteRequestSegmented.

Compared to a continuous stream/fixed-chunk stream to the end (see WriteConnRequest) this is slower,
but it will be more manageable over high-latency connections and will prevent clients from being
overwhelmed on a large Request.
*/
func WriteConnRequestSegmented(req *Request, conn net.Conn) (err error) {
	return WriteRequestSegmented(req, conn)
}
/*
WriteConnResponse writes a Response to a net.Conn by passing resp, conn, and chunkSize through to WriteResponse.

chunkSize is interpreted as follows:

  - chunkSize == 0: no chunking will be performed. This may be unreliable on some connections.
  - chunkSize > 0: chunking will be performed based on chunkSize.
  - chunkSize < 0: chunking will be done based on WriteChunkSize number of bytes, which should be sensible for most connections.
*/
func WriteConnResponse(resp *Response, conn net.Conn, chunkSize int) (err error) {
	return WriteResponse(resp, conn, chunkSize)
}
/*
WriteConnResponseSegmented writes a Response to a net.Conn one logical section at a time
by passing resp and conn through to WriteResponseSegmented.

Compared to a continuous stream/fixed-chunk stream to the end (see WriteConnResponse) this is slower,
but it will be more manageable over high-latency connections and will prevent clients from being
overwhelmed on a large Response.
*/
func WriteConnResponseSegmented(resp *Response, conn net.Conn) (err error) {
	return WriteResponseSegmented(resp, conn)
}
/*
WriteRequest marshals a Request with Request.MarshalBinary and writes the result to an io.Writer.

If chunkSize == 0, no chunking will be performed (a single Write call). This may be unreliable for some io.Writers.
If chunkSize > 0, chunking will be performed based on chunkSize.
If chunkSize < 0, chunking will be done based on WriteChunkSize number of bytes, which should be sensible for most io.Writers.

When chunking, each chunk is handed to w in exactly one Write call of at most chunkSize bytes;
the final chunk may be shorter.

A nil req is a no-op and returns a nil error.
If w reports writing fewer bytes than a chunk contains without an error, io.ErrShortWrite is returned.
*/
func WriteRequest(req *Request, w io.Writer, chunkSize int) (err error) {

	var b []byte
	var n int
	var written int

	if req == nil {
		return
	}

	if b, err = req.MarshalBinary(); err != nil {
		return
	}

	if chunkSize < 0 {
		chunkSize = WriteChunkSize
	}
	// chunkSize == 0 means "no chunking". The <= also covers a non-positive WriteChunkSize,
	// which would otherwise never make progress in the loop below.
	if chunkSize <= 0 {
		_, err = w.Write(b)
		return
	}

	// Slice the marshaled bytes directly rather than going through io.CopyN, which allocates
	// a scratch buffer per call and may split or delegate the write, ignoring chunkSize.
	for len(b) > 0 {
		n = chunkSize
		if len(b) < n {
			n = len(b)
		}
		if written, err = w.Write(b[:n]); err != nil {
			return
		}
		if written < n {
			err = io.ErrShortWrite
			return
		}
		b = b[n:]
	}

	return
}
/*
WriteRequestSegmented writes a Request to an io.Writer, chunking it by each logical section
(each header, count, size, name, and value is its own Write call).
This is slower than a continuous stream/fixed-chunk stream to the end (see WriteRequest),
but will be more manageable for slower io.Writers.

Sections are written in this order:

  - CKSUM header and checksum (only if req.Checksum is not nil)
  - MSGSTART header
  - protocol version
  - BODYSTART header
  - record group count, then combined size of all record groups
  - for each record group: record count, combined size of its records
  - for each record: pair count, combined size of its pairs
  - for each pair: name size, value size, name, value
  - BODYEND header
  - MSGEND header

A nil req is a no-op and returns a nil error. Writing stops at the first error from w,
which is returned as-is.
*/
func WriteRequestSegmented(req *Request, w io.Writer) (err error) {

	var size int

	if req == nil {
		return
	}

	// Combined size of every record group; written right after the record group count.
	for _, rg := range req.RecordGroups {
		size += rg.Size()
	}

	// The checksum is optional on a Request.
	if req.Checksum != nil {
		if _, err = w.Write(hdrCKSUM); err != nil {
			return
		}
		if _, err = w.Write(PackUint32(*req.Checksum)); err != nil {
			return
		}
	}

	if _, err = w.Write(hdrMSGSTART); err != nil {
		return
	}

	if _, err = w.Write(PackUint32(req.ProtocolVersion)); err != nil {
		return
	}

	// BODYSTART sits between the protocol version and the record group count;
	// ReadConnRequest reads it at that position, and BODYEND below closes it.
	if _, err = w.Write(hdrBODYSTART); err != nil {
		return
	}

	if _, err = w.Write(PackInt(len(req.RecordGroups))); err != nil {
		return
	}
	if _, err = w.Write(PackInt(size)); err != nil {
		return
	}

	for _, rg := range req.RecordGroups {
		// size is reused at each nesting level; here it is the combined size of this group's records.
		size = 0
		for _, rec := range rg.Records {
			size += rec.Size()
		}
		if _, err = w.Write(PackInt(len(rg.Records))); err != nil {
			return
		}
		if _, err = w.Write(PackInt(size)); err != nil {
			return
		}

		for _, rec := range rg.Records {
			// Combined size of this record's pairs.
			size = 0
			for _, kvp := range rec.Pairs {
				size += kvp.Size()
			}
			if _, err = w.Write(PackInt(len(rec.Pairs))); err != nil {
				return
			}
			if _, err = w.Write(PackInt(size)); err != nil {
				return
			}

			// Each pair: both sizes first, then the name and value bytes.
			for _, kvp := range rec.Pairs {
				if _, err = w.Write(PackInt(kvp.Name.Size())); err != nil {
					return
				}
				if _, err = w.Write(PackInt(kvp.Value.Size())); err != nil {
					return
				}
				if _, err = w.Write(kvp.Name); err != nil {
					return
				}
				if _, err = w.Write(kvp.Value); err != nil {
					return
				}
			}
		}
	}

	if _, err = w.Write(hdrBODYEND); err != nil {
		return
	}

	if _, err = w.Write(hdrMSGEND); err != nil {
		return
	}

	return
}
/*
WriteResponse marshals a Response with Response.MarshalBinary and writes the result to an io.Writer.

If chunkSize == 0, no chunking will be performed (a single Write call). This may be unreliable for some io.Writers.
If chunkSize > 0, chunking will be performed based on chunkSize.
If chunkSize < 0, chunking will be done based on WriteChunkSize number of bytes, which should be sensible for most io.Writers.

When chunking, each chunk is handed to w in exactly one Write call of at most chunkSize bytes;
the final chunk may be shorter.

A nil resp is a no-op and returns a nil error.
If w reports writing fewer bytes than a chunk contains without an error, io.ErrShortWrite is returned.
*/
func WriteResponse(resp *Response, w io.Writer, chunkSize int) (err error) {

	var b []byte
	var n int
	var written int

	if resp == nil {
		return
	}

	if b, err = resp.MarshalBinary(); err != nil {
		return
	}

	if chunkSize < 0 {
		chunkSize = WriteChunkSize
	}
	// chunkSize == 0 means "no chunking". The <= also covers a non-positive WriteChunkSize,
	// which would otherwise never make progress in the loop below.
	if chunkSize <= 0 {
		_, err = w.Write(b)
		return
	}

	// Slice the marshaled bytes directly rather than going through io.CopyN, which allocates
	// a scratch buffer per call and may split or delegate the write, ignoring chunkSize.
	for len(b) > 0 {
		n = chunkSize
		if len(b) < n {
			n = len(b)
		}
		if written, err = w.Write(b[:n]); err != nil {
			return
		}
		if written < n {
			err = io.ErrShortWrite
			return
		}
		b = b[n:]
	}

	return
}
/*
WriteResponseSegmented writes a Response to an io.Writer, chunking it by each logical section
(each header, count, size, name, and value is its own Write call).
This is slower than a continuous stream/fixed-chunk stream to the end (see WriteResponse),
but will be more manageable for slower io.Writers.

Sections are written in this order:

  - status byte
  - CKSUM header and checksum
  - MSGSTART header
  - protocol version
  - BODYSTART header
  - record group count, then combined size of all record groups
  - for each record group: record count, combined size of its records
  - for each record: pair count, combined size of its pairs, size of its original record,
    its pairs, then the original record's pair count, combined pair size, and pairs
  - for each pair: name size, value size, name, value
  - BODYEND header
  - MSGEND header

A nil resp is a no-op and returns a nil error. Writing stops at the first error from w,
which is returned as-is.

NOTE(review): the status byte is written *before* the CKSUM header here, whereas ReadConnResponse
expects the CKSUM header to be the very first thing on the wire. Confirm which of the two
matches Response.MarshalBinary/UnmarshalBinary.
*/
func WriteResponseSegmented(resp *Response, w io.Writer) (err error) {

	var size int

	if resp == nil {
		return
	}

	// Combined size of every record group; written right after the record group count.
	for _, rg := range resp.RecordGroups {
		size += rg.Size()
	}

	if _, err = w.Write([]byte{resp.Status}); err != nil {
		return
	}

	// Unlike a Request, the checksum is always written for a Response.
	if _, err = w.Write(hdrCKSUM); err != nil {
		return
	}
	if _, err = w.Write(PackUint32(resp.Checksum)); err != nil {
		return
	}

	if _, err = w.Write(hdrMSGSTART); err != nil {
		return
	}

	if _, err = w.Write(PackUint32(resp.ProtocolVersion)); err != nil {
		return
	}

	// BODYSTART sits between the protocol version and the record group count;
	// ReadConnResponse reads it at that position, and BODYEND below closes it.
	if _, err = w.Write(hdrBODYSTART); err != nil {
		return
	}

	if _, err = w.Write(PackInt(len(resp.RecordGroups))); err != nil {
		return
	}
	if _, err = w.Write(PackInt(size)); err != nil {
		return
	}

	for _, rg := range resp.RecordGroups {
		// size is reused at each nesting level; here it is the combined size of this group's records.
		size = 0
		for _, rec := range rg.Records {
			size += rec.Size()
		}
		if _, err = w.Write(PackInt(len(rg.Records))); err != nil {
			return
		}
		if _, err = w.Write(PackInt(size)); err != nil {
			return
		}

		for _, rec := range rg.Records {
			// Combined size of this record's pairs.
			size = 0
			for _, kvp := range rec.Pairs {
				size += kvp.Size()
			}
			if _, err = w.Write(PackInt(len(rec.Pairs))); err != nil {
				return
			}
			if _, err = w.Write(PackInt(size)); err != nil {
				return
			}
			// The original record's size is announced before this record's own pairs.
			if _, err = w.Write(PackInt(rec.OriginalRecord.Size())); err != nil {
				return
			}

			// Each pair: both sizes first, then the name and value bytes.
			for _, kvp := range rec.Pairs {
				if _, err = w.Write(PackInt(kvp.Name.Size())); err != nil {
					return
				}
				if _, err = w.Write(PackInt(kvp.Value.Size())); err != nil {
					return
				}
				if _, err = w.Write(kvp.Name); err != nil {
					return
				}
				if _, err = w.Write(kvp.Value); err != nil {
					return
				}
			}

			// The original record follows, in the same count/size/pairs layout.
			size = 0
			for _, kvp := range rec.OriginalRecord.Pairs {
				size += kvp.Size()
			}
			if _, err = w.Write(PackInt(len(rec.OriginalRecord.Pairs))); err != nil {
				return
			}
			if _, err = w.Write(PackInt(size)); err != nil {
				return
			}
			for _, kvp := range rec.OriginalRecord.Pairs {
				if _, err = w.Write(PackInt(kvp.Name.Size())); err != nil {
					return
				}
				if _, err = w.Write(PackInt(kvp.Value.Size())); err != nil {
					return
				}
				if _, err = w.Write(kvp.Name); err != nil {
					return
				}
				if _, err = w.Write(kvp.Value); err != nil {
					return
				}
			}
		}
	}

	if _, err = w.Write(hdrBODYEND); err != nil {
		return
	}

	if _, err = w.Write(hdrMSGEND); err != nil {
		return
	}

	return
}
/*
chunkByteLine breaks b up into consecutive pieces of at most maxByteLine bytes each
and returns them in order. Only the last piece may be shorter than maxByteLine.

The pieces are sub-slices of b as produced by slices.Chunk, not copies.
*/
func chunkByteLine(b []byte) (chunked [][]byte) {

	// Ceiling division: how many maxByteLine-sized pieces are needed to hold len(b) bytes.
	var numChunks int = (len(b) + maxByteLine - 1) / maxByteLine

	// slices.Chunk and slices.AppendSeq require Golang 1.23+
	chunked = slices.AppendSeq(make([][]byte, 0, numChunks), slices.Chunk(b, maxByteLine))

	return chunked
}
// cksumBytes packs cksum into a newly allocated byteslice of CksumPackedSize bytes using the package-wide byteOrder.
func cksumBytes(cksum uint32) (packed []byte) {

	var b []byte = make([]byte, CksumPackedSize)

	byteOrder.PutUint32(b, cksum)

	return b
}
/*
padBytesRight is used when rendering Model objects.

It renders val as lowercase hexadecimal and left-justifies the result in a field of length
characters, padding with spaces on the right. If the hex text is already length characters or longer,
it is returned unpadded and untruncated.

val is assumed to *not* be in hex format already; it should be the raw []byte representation.
*/
func padBytesRight(val []byte, length int) (out string) {

	// Hex-encode first, then pad the resulting text.
	var hexStr string = fmt.Sprintf("%x", val)

	return fmt.Sprintf("%-*s", length, hexStr)
}
/*
padIntRight is used when rendering Model objects.
*/
func padIntRight(val int, length int) (out string) {
out = padBytesRight(PackUint32(uint32(val)), length)
return
}
/*
padStrRight is used when rendering Model objects.

The bytes of val are rendered via padBytesRight.
*/
func padStrRight(val string, length int) (out string) {

	var raw []byte = []byte(val)

	return padBytesRight(raw, length)
}