此项目有对应的Android端demo有需求请联系下方QQ
TCP粘包和拆包产生的原因
- 应用程序写入数据的字节大小大于套接字发送缓冲区的大小
- 进行MSS大小的TCP分段。MSS是最大报文段长度的缩写。MSS是TCP报文段中的数据字段的最大长度。数据字段加上TCP首部才等于整个的TCP报文段。所以MSS并不是TCP报文段的最大长度,而是:MSS=TCP报文段长度-TCP首部长度
- 以太网的payload大于MTU进行IP分片。MTU指:一种通信协议的某一层上面所能通过的最大数据包大小。如果IP层有一个数据包要传,而且数据的长度比链路层的MTU大,那么IP层就会进行分片,把数据包分成托干片,让每一片都不超过MTU。注意,IP分片可以发生在原始发送端主机上,也可以发生在中间路由器上。
TCP粘包和拆包的解决策略
- 消息定长。例如100字节。
- 在包尾部增加回车或者空格符等特殊字符进行分割,典型的如FTP协议
- 将消息分为消息头和消息尾。(len+data模式)
- 其它复杂的协议,如RTMP协议等。
- 将消息分为消息头+消息类型+消息体。(len+type+data 模式)
废话不多说直接上代码
服务端
package network
import (
"net"
"github.com/hwholiday/libs/logtool"
"go.uber.org/zap"
"os"
"time"
"io"
"fmt"
)
func InitTcp() {
addr, err := net.ResolveTCPAddr("tcp", "192.168.2.28:8111")
if err != nil {
logtool.Zap.Error("create addr", zap.Error(err))
os.Exit(0)
}
listener, err := net.ListenTCP("tcp", addr)
if err != nil {
logtool.Zap.Error("listen tcp", zap.Error(err))
os.Exit(0)
}
logtool.Zap.Info("listen tcp", zap.String("地址", addr.String()))
go acceptTcp(listener)
}
func acceptTcp(listener *net.TCPListener) {
for {
var (
conn *net.TCPConn
err error
)
if conn, err = listener.AcceptTCP(); err != nil {
logtool.Zap.Info("listener.Accept err", zap.Any(listener.Addr().String(), err))
return
}
if err = conn.SetKeepAlive(false); err != nil {
logtool.Zap.Info("conn.SetKeepAlive err", zap.Error(err))
return
}
if err = conn.SetReadBuffer(1024); err != nil {
logtool.Zap.Info("conn.SetReadBuffer err", zap.Error(err))
return
}
if err = conn.SetWriteBuffer(1024); err != nil {
logtool.Zap.Info("conn.SetWriteBuffer err", zap.Error(err))
return
}
go serveTCP(conn)
}
}
func serveTCP(conn *net.TCPConn) {
client := NewTcpClint(conn, 4, 4)
logtool.Zap.Debug("链接上来的用户", zap.Any("地址", client.RemoteAddr().String()))
go func() {
for {
tag, data, err := client.Read()
if err != nil {
if err == io.EOF {
logtool.Zap.Debug("用户断开链接", zap.Any("地址", client.RemoteAddr().String()))
}
client.conn.Close()
return
}
logtool.Zap.Info(fmt.Sprintf("客户端 : %s 传入类型", client.RemoteAddr().String()), zap.String(fmt.Sprintf("类型 : %d", tag), fmt.Sprintf("数据 : %s", string(data))))
message := make(chan int32)
//心跳
go HeartBeating(client, message, 40)
//判断是否有信息发送上来
go HeartChannel(tag, message)
//做自己的业务逻辑
}
}()
}
func HeartChannel(tag int32, mess chan int32) {
mess <- tag
close(mess)
}
func HeartBeating(client *TcpClient, tag chan int32, timeout int) {
select {
case _ = <-tag:
client.conn.SetDeadline(time.Now().Add(time.Duration(timeout) * time.Second))
break
case <-time.After(40 * time.Second):
logtool.Zap.Debug("主动断开用户链接", zap.Any("地址", client.RemoteAddr().String()))
if err := client.conn.Close(); err != nil {
break
}
break
}
}
package network
import (
"net"
"bufio"
"bytes"
"encoding/binary"
)
type TcpClient struct {
conn net.Conn
r *bufio.Reader
w *bufio.Writer
head int32
tag int32
}
func NewTcpClint(conn net.Conn, headLen int32, tagLen int32) *TcpClient {
return &TcpClient{conn: conn, r: bufio.NewReader(conn), w: bufio.NewWriter(conn), head: headLen, tag: tagLen}
}
func (c *TcpClient) LocalAddr() net.Addr {
return c.conn.LocalAddr()
}
func (c *TcpClient) RemoteAddr() net.Addr {
return c.conn.RemoteAddr()
}
func (c *TcpClient) Close() error {
return c.conn.Close()
}
func (c *TcpClient) Write(message []byte,tag int32) (int, error) {
// 读取消息的长度
var length = int32(len(message))
var pkg = new(bytes.Buffer)
//写入消息头
err := binary.Write(pkg, binary.BigEndian, length)
if err != nil {
return 0, err
}
// 写入消息类型
err = binary.Write(pkg, binary.BigEndian, tag)
if err != nil {
return 0, err
}
//写入消息体
err = binary.Write(pkg, binary.BigEndian, message)
if err != nil {
return 0, err
}
nn, err := c.w.Write(pkg.Bytes())
if err != nil {
return 0, err
}
err = c.w.Flush()
if err != nil {
return 0, err
}
return nn, nil
}
func (c *TcpClient) Read() (int32, []byte, error) {
// Peek 返回缓存的一个切片,该切片引用缓存中前 n 个字节的数据,
// 该操作不会将数据读出,只是引用,引用的数据在下一次读取操作之
// 前是有效的。如果切片长度小于 n,则返回一个错误信息说明原因。
// 如果 n 大于缓存的总大小,则返回 ErrBufferFull。
lengthByte, err := c.r.Peek(int(c.head + c.tag))
if err != nil {
return 0, nil, err
}
//创建 Buffer缓冲器
var length int32
lengthBuff := bytes.NewBuffer(lengthByte[:c.head])
// 通过Read接口可以将buf中得内容填充到data参数表示的数据结构中
err = binary.Read(lengthBuff, binary.BigEndian, &length)
if err != nil {
return 0, nil, err
}
var tag int32
tagBuff := bytes.NewBuffer(lengthByte[c.head:])
// 通过Read接口可以将buf中得内容填充到data参数表示的数据结构中
err = binary.Read(tagBuff, binary.BigEndian, &tag)
if err != nil {
return 0, nil, err
}
// Buffered 返回缓存中未读取的数据的长度
if int32(c.r.Buffered()) < length+c.head+c.tag {
return 0, nil, err
}
// 读取消息真正的内容
pack := make([]byte, int(c.head+length+c.tag))
// Read 从 b 中读出数据到 p 中,返回读出的字节数和遇到的错误。
// 如果缓存不为空,则只能读出缓存中的数据,不会从底层 io.Reader
// 中提取数据,如果缓存为空,则:
// 1、len(p) >= 缓存大小,则跳过缓存,直接从底层 io.Reader 中读
// 出到 p 中。
// 2、len(p) < 缓存大小,则先将数据从底层 io.Reader 中读取到缓存
// 中,再从缓存读取到 p 中。
_, err = c.r.Read(pack)
if err != nil {
return 0, nil, err
}
return tag, pack[c.head+c.tag:], nil
}
客户端
package main
import (
"testing"
"net"
"log"
"encoding/binary"
"bytes"
"time"
"fmt"
"bufio"
)
func Test(t *testing.T) {
conn, err := net.Dial("tcp", "192.168.2.28:8111")
if err != nil {
log.Println("dial error:", err)
return
}
defer conn.Close()
go func() {
/*for {*/
data, err := Encode("2")
if err == nil {
time.Sleep(time.Second * 4)
_, err := conn.Write(data)
if err != nil {
fmt.Println(err)
}
}
/*}*/
}()
reader := bufio.NewReader(conn)
for {
tag ,data, err := Read(reader)
if err != nil {
fmt.Println(err)
return
}
fmt.Println(tag)
fmt.Println(string(data))
}
}
func Encode(message string) ([]byte, error) {
// 读取消息的长度
var length = int32(len(message))
var pkg = new(bytes.Buffer)
// 写入消息头
err := binary.Write(pkg, binary.BigEndian, length)
if err != nil {
return nil, err
}
// 写入消息类型 最大为 0xFFFFFFF
err = binary.Write(pkg, binary.BigEndian, int32(0x2))
if err != nil {
return nil, err
}
// 写入消息实体
err = binary.Write(pkg, binary.BigEndian, []byte(message))
if err != nil {
return nil, err
}
return pkg.Bytes(), nil
}
func Read(c *bufio.Reader) (int32, []byte, error) {
var headLen int32 =4
var tagLen int32 =4
lengthByte, err := c.Peek(int(headLen + tagLen))
if err != nil {
return 0, nil, err
}
var length int32
lengthBuff := bytes.NewBuffer(lengthByte[:headLen])
err = binary.Read(lengthBuff, binary.BigEndian, &length)
if err != nil {
return 0, nil, err
}
var tag int32
tagBuff := bytes.NewBuffer(lengthByte[headLen:])
err = binary.Read(tagBuff, binary.BigEndian, &tag)
if err != nil {
return 0, nil, err
}
if int32(c.Buffered()) < length+headLen+tagLen {
return 0, nil, err
}
pack := make([]byte, int(headLen+length+tagLen))
_, err = c.Read(pack)
if err != nil {
return 0, nil, err
}
return tag, pack[headLen+tagLen:], nil
}
联系 QQ: 3355168235