欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

[Golang] packetbeat 二次开发-移植(二)

程序员文章站 2022-06-21 20:59:15
因为我们这个代码目的是和HIDS结合。我们这里已经算是完成了协议解析的部分内容。然后我们就需要把读取数据的逻辑直接取出来,方便后面进行融合。寻找逻辑入口从main.go入手// Setups and Runs Packetbeatfunc main() {if err := cmd.RootCmd.Execute(); err != nil {os.Exit(1)}}追踪到cmd/packetbeat.go// Beater object. Contains all obj....

因为我们这个代码目的是和HIDS结合。我们这里已经算是完成了协议解析的部分内容。然后我们就需要把读取数据的逻辑直接取出来,方便后面进行融合。

寻找逻辑入口

从main.go入手

// Setups and Runs Packetbeat
func main() {
	if err := cmd.RootCmd.Execute(); err != nil {
		os.Exit(1)
	}
}

追踪到cmd/packetbeat.go

// Beater object. Contains all objects needed to run the beat
type packetbeat struct {
	config      config.Config //设置目标 协议/端口
	cmdLineArgs flags
	sniff       *sniffer.Sniffer

	// publisher/pipeline
	pipeline beat.Pipeline
	transPub *publish.TransactionPublisher
	flows    *flows.Flows
}

packetbeat/beater/packetbeat.go

func (pb *packetbeat) Run(b *beat.Beat) error {
	······

	var wg sync.WaitGroup
	errC := make(chan error, 1)

	// Run the sniffer in background
	wg.Add(1)
	go func() {
		defer wg.Done()

		err := pb.sniff.Run() //主逻辑
		if err != nil {
			errC <- fmt.Errorf("Sniffer main loop failed: %v", err)
		}
	}()

	logp.Debug("main", "Waiting for the sniffer to finish")
	wg.Wait()
	select {
	default:
	case err := <-errC:
		return err
	}

	return nil
}

packetbeat/sniffer/sniffer.go

func (s *Sniffer) Run() error {
	······
		counter++
		logp.Debug("sniffer", "Packet number: %d", counter)
		worker.OnPacket(data, &ci) // 主逻辑
	}

	return nil
}

packetbeat/decoder/decoder.go

func (d *Decoder) OnPacket(data []byte, ci *gopacket.CaptureInfo) {
	······
		processed, err = d.process(&packet, currentType) //主逻辑
		if err != nil {
			logp.Info("Error processing packet: %v", err)
			break
		}
		if processed {
			break
		}

		// choose next decoding layer
		next, ok := d.decoders[nextType]
		if !ok {
			break
		}

		// jump to next layer
		current = next
		currentType = nextType
	}

······
}


//用于分类数据类型
func (d *Decoder) process(
	packet *protos.Packet,
	layerType gopacket.LayerType,
) (bool, error) {
	withFlow := d.flowID != nil

	switch layerType {
	case layers.LayerTypeEthernet:
		if withFlow {
			d.flowID.AddEth(d.eth.SrcMAC, d.eth.DstMAC)
		}

	case layers.LayerTypeDot1Q:
		d1q := &d.d1q[d.stD1Q.i]
		d.stD1Q.next()
		if withFlow {
			d.flowID.AddVLan(d1q.VLANIdentifier)
		}

	case layers.LayerTypeIPv4:
		debugf("IPv4 packet")
		ip4 := &d.ip4[d.stIP4.i]
		d.stIP4.next()

		if withFlow {
			d.flowID.AddIPv4(ip4.SrcIP, ip4.DstIP)
		}

		packet.Tuple.SrcIP = ip4.SrcIP
		packet.Tuple.DstIP = ip4.DstIP
		packet.Tuple.IPLength = 4

	case layers.LayerTypeIPv6:
		debugf("IPv6 packet")
		ip6 := &d.ip6[d.stIP6.i]
		d.stIP6.next()

		if withFlow {
			d.flowID.AddIPv6(ip6.SrcIP, ip6.DstIP)
		}

		packet.Tuple.SrcIP = ip6.SrcIP
		packet.Tuple.DstIP = ip6.DstIP
		packet.Tuple.IPLength = 16

	case layers.LayerTypeICMPv4:
		debugf("ICMPv4 packet")
		d.onICMPv4(packet)
		return true, nil

	case layers.LayerTypeICMPv6:
		debugf("ICMPv6 packet")
		d.onICMPv6(packet)
		return true, nil

	case layers.LayerTypeUDP:
		debugf("UDP packet")
		d.onUDP(packet)
		return true, nil

	case layers.LayerTypeTCP:
		debugf("TCP packet")
		d.onTCP(packet)
		return true, nil
	}

	return false, nil
}

移植

需要一个获取当前Client活跃的网卡用来读取数据。

func getDevice(localIP string)(string,error)  {
	intf, err := net.Interfaces()
	if err != nil {
		return "",err
	}
	for _, v := range intf {
		ips, err := v.Addrs()
		if err != nil{
			return "" ,fmt.Errorf("<packetbeat.getDevice>",err)
		}
		for _, ip := range ips {
			if ip.String()[:len(ip.String())-3] == localIP{
				return v.Name,nil
			}
			continue
		}

	}
	return "",fmt.Errorf("<packetbeat.getDevice> No run")
}

本文地址:https://blog.csdn.net/Aixixxx/article/details/107903167