欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

基于epoll的tcp-socket通信服务模块

程序员文章站 2022-07-14 09:19:05
...

最近一直研究对自己的开源物联网边缘服务瘦身,以适应资源更短缺的设备上运行,因此需要裁减第三方库的依赖,由于原先tcp-socket通信是调用第三方库的,因此准备动手为开源项目写一个简要适合项目需要的服务模块。

由于服务端需要针对每个客户端有定制下行通讯需求,因此采用epoll以便于明确知道句柄,进行特定通信。

epoll相关函数不多,容易理解,相关知识文章一大把,我就不阐述了。这里讲述一下我的项目要求:

通信服务模块作为一个独立运行线程,构建一个epoll,监听客户端连接,然后进入循环,获取事件描述信息,并处理相关事务;

新连接进来后,除了注册fd到epoll外,我们还需要定义有一个额外的容器自行管理fd;

在写入、读取异常时除了从epoll删除fd外,同时也需要从容器删除fd;

另外由于业务通信是有次序的,构建了两个缓存队列存储读取、写入的业务数据。

下面来看具体实现源码:

epoll_socket.h

#ifndef _EPOLL_SOCKET_H_
#define _EPOLL_SOCKET_H_
/**********************************************************************************
  *Copyright 2020-05-06, pyfree
  *
  *File Name       : epoll_socket.h
  *File Mark       : 
  *Summary         : 
  *
  *Current Version : 1.00
  *Author          : pyfree
  *FinishDate      :
  *
  *Replace Version :
  *Author          :
  *FinishDate      :

 ***********************************************************************************/
#include <sys/socket.h>  
#include <sys/epoll.h>  
#include <netinet/in.h>  
#include <arpa/inet.h>  
#include <string>
#include <set>

#include "thread_py.h"
#include "queuedata.h"
#include "Mutex.h"

/**
 * One buffered message plus the client fd it belongs to.
 * fd == -1 means "no specific client" (default-constructed / untagged item).
 */
struct ItemCache
{
	// One constructor with defaults replaces the former three overloads:
	// (), (data) and (data, fd) all keep compiling unchanged.
	ItemCache(std::string data_ = std::string(), int fd_ = -1)
		: data(data_), fd(fd_)
	{
	};
	std::string data;	// payload bytes
	int fd;				// owning client fd, -1 if unspecified
};

/**
 * epoll-based TCP server that runs as its own thread (via Thread_py).
 * Inbound payloads are queued in buffer_read (drain with get()); outbound
 * payloads are queued in buffer_write (fill with send()).
 */
class epoll_socket : public Thread_py
{
public:
	epoll_socket();
	~epoll_socket();
	/**
	 * Create, bind and listen on the server socket.
	 * @param port {int} TCP port to listen on
	 * @param isblock_listen {bool} keep the listening socket blocking if true
	 * @return {bool} true on success; on failure see get_errmsg()
	 */
	bool open(int port, bool isblock_listen = false);

	int get_socketfd();
	/**
	 * Thread entry point: creates the epoll instance and pumps events until
	 * the object is destroyed (running goes false).
	 * @return {int} 0 on normal exit, -1 if the epoll instance creation failed
	 */
	int run();
	/**
	 * Queue a string for delivery to a client.
	 * @param item {string} payload
	 * @param fd {int} target client fd; -1 broadcasts to all connected clients
	 * @return {void}
	 */
	void send(std::string item, int fd=-1);
	/**
	 * Pop one received message from the read buffer.
	 * @param item {string&} receives the payload
	 * @param fd {int&} receives the client fd it came from
	 * @return {bool} true if a message was available
	 */
	bool get(std::string &item,int &fd);
	/**
	 * Latest recorded error text.
	 * @return {char*} pointer to the internal buffer (overwritten on next error)
	 */
	char* get_errmsg();
	/**
	 * Enable/disable console echo of read/written payloads.
	 * @param flag {bool} print if true
	 * @return {void}
	 */
	void setPrintFlag(bool flag);
	/**
	 * Choose whether newly accepted client sockets stay blocking.
	 * @param flag {bool} blocking if true
	 * @return {void}
	 */
	void setClientBlock(bool flag);
private:
	// epoll_ctl wrappers (ADD / DEL / MOD) for one fd + event mask.
	void add_event(int fd, int state);
	void del_event(int fd, int state);
	void mod_event(int fd, int state);
	// Mutex-protected bookkeeping of connected client fds.
	void add_client(int fd);
	void del_client(int fd);

	// Dispatch the events returned by one epoll_wait() call.
	void handle_events(struct epoll_event *events, int num);

	// Accept one pending connection on the listening socket.
	bool handle_accept();

	// Read from / write to one client; on error the fd is closed and dropped.
	bool do_read(int fd);

	bool do_write(int fd);
private:
	bool running;					// event loop keeps spinning while true
	bool print_flag;				// echo payloads to stdout
	bool isblock_client;			// accepted clients stay blocking if true
	int socketfd;					// listening socket
	struct sockaddr_in servaddr;	// bound local address
	int epollfd;					// epoll instance created in run()
	char err_msg[256];				// last error text, see get_errmsg()
	std::set<int> fds_client;		// connected client fds
	PYMutex	fds_mutex;				// guards fds_client
	QueueData<ItemCache> buffer_read;	// inbound messages, fd-tagged
	QueueData<ItemCache> buffer_write;	// outbound messages, fd-tagged
};

#endif

epoll_socket.cpp

#include "epoll_socket.h"

#include <fcntl.h>  
#include <unistd.h>  
#include <stdio.h>  
#include <string.h>
#include <errno.h>  
#include <iostream> 

#define MAXBUFFSIZE	1024
#define MAXEVENTS	500
#define FDSIZE		1000

/**
 * Initialise flags and descriptors. err_msg is cleared and both fds start at
 * -1 so get_errmsg()/get_socketfd() never expose uninitialised data (the old
 * code left err_msg and epollfd uninitialised and used 0 — stdin — for
 * socketfd).
 */
epoll_socket::epoll_socket() 
	: running(true)
	, print_flag(false)
	, isblock_client(false)
{
	socketfd = -1;		// conventional invalid-fd sentinel
	epollfd = -1;		// created later in run()
	err_msg[0] = '\0';	// get_errmsg() before any failure returns ""
	memset(&servaddr, 0, sizeof(servaddr));
}

/**
 * Signal the event loop in run() to exit.
 * NOTE(review): this does not join the worker thread or close socketfd here;
 * run() closes the epoll fd itself when the loop ends — confirm the thread
 * has actually stopped before the object is destroyed.
 */
epoll_socket::~epoll_socket()
{
	running = false;
}

/**
 * Create, configure, bind and listen on the server socket.
 * On any failure the error is recorded in err_msg and the socket (if already
 * created) is closed, so failed/repeated open() calls do not leak fds — the
 * old code leaked socketfd on the setsockopt/bind/listen paths and never put
 * the socket() failure into err_msg.
 * @param port           TCP port to listen on
 * @param isblock_listen keep the listening socket blocking when true
 * @return true on success
 */
bool epoll_socket::open(int port, bool isblock_listen)
{
	if ((socketfd = socket(AF_INET, SOCK_STREAM, 0)) == -1){
		snprintf(err_msg, sizeof(err_msg)
			, "create socket error: %s(errno: %d)\n"
			, strerror(errno), errno);
		printf("%s", err_msg);
		return false;
	}
	servaddr.sin_family = AF_INET;
	servaddr.sin_addr.s_addr = htonl(INADDR_ANY);	// accept on every local interface
	servaddr.sin_port = htons(port);
	if (!isblock_listen) {
		int flags = fcntl(socketfd, F_GETFL, 0);
		fcntl(socketfd, F_SETFL, flags | O_NONBLOCK);	// non-blocking listen socket
	}

	// Allow quick restart of the server (avoid "Address already in use").
	int on = 1;
	if (setsockopt(socketfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1){
		snprintf(err_msg, sizeof(err_msg)
			, "set reuse addr error: %s(errno: %d)\n"
			, strerror(errno), errno);
		close(socketfd);	// was leaked on this path before
		socketfd = -1;
		return false;
	}

	// Bind the local address to the freshly created socket.
	if (bind(socketfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) == -1){
		snprintf(err_msg, sizeof(err_msg)
			, "bind socket error: %s(errno: %d)\n"
			, strerror(errno), errno);
		close(socketfd);
		socketfd = -1;
		return false;
	}
	// Start listening for client connections (small backlog is fine here).
	if (listen(socketfd, 5) == -1) {
		snprintf(err_msg, sizeof(err_msg)
			, "listen socket error: %s(errno: %d)\n"
			, strerror(errno), errno);
		close(socketfd);
		socketfd = -1;
		return false;
	}
	std::cout << "create socket success\n";
	return true;
}

int epoll_socket::get_socketfd()
{
	return socketfd;
}

/**
 * Thread body: create the epoll instance, register the listening socket and
 * pump events until 'running' turns false (set by the destructor).
 * @return 0 on shutdown, -1 if epoll_create failed
 */
int epoll_socket::run()
{
	// Create the epoll instance (the FDSIZE hint is ignored by modern kernels).
	if ((epollfd = epoll_create(FDSIZE)) == -1){
		snprintf(err_msg, sizeof(err_msg)
			, "epoll_create error: %s(errno: %d)\n"	// was mislabelled "listen socket error"
			, strerror(errno), errno);
		return -1;
	}
	// Watch the listening socket for incoming connections.
	add_event(socketfd, EPOLLIN);
	struct epoll_event events[MAXEVENTS];
	int ret;
	while (running) {
		// 1 ms timeout keeps the loop responsive to 'running' going false.
		ret = epoll_wait(epollfd, events, MAXEVENTS, 1);
		if (ret == -1) {
			// BUGFIX: the old loop ignored epoll_wait failures entirely.
			if (errno == EINTR) {
				continue;	// interrupted by a signal: benign, retry
			}
			snprintf(err_msg, sizeof(err_msg)
				, "epoll_wait error: %s(errno: %d)\n"
				, strerror(errno), errno);
			break;
		}
		handle_events(events, ret);
	}
	close(epollfd);
	epollfd = -1;
	return 0;
}

/**
 * Queue 'item' for delivery and arm EPOLLOUT on the target fd(s).
 * @param item payload
 * @param fd   target client fd; -1 broadcasts to every connected client
 */
void epoll_socket::send(std::string item,int fd/*=-1*/)
{
	if(fd<0){
		// Snapshot the client set so the lock is not held while queueing.
		fds_mutex.Lock();
		std::set<int> fds_ = fds_client;
		fds_mutex.Unlock();
		std::set<int>::iterator it = fds_.begin();
		for (; it!=fds_.end(); ++it)
		{
			// BUGFIX: tag the queued item with the actual client fd (*it).
			// The old code stored 'fd' (-1) here, so every broadcast item
			// failed do_write()'s fd-match check.
			buffer_write.add(ItemCache(item,*it));
			mod_event(*it,EPOLLOUT);
		}
	}else{
		// Only queue for fds we still track; stale fds are silently dropped.
		fds_mutex.Lock();
		bool f_ = (fds_client.find(fd)!=fds_client.end());
		fds_mutex.Unlock();
		if(f_)
		{
			buffer_write.add(ItemCache(item,fd));
			mod_event(fd,EPOLLOUT);
		}
	}
}

bool epoll_socket::get(std::string &item,int &fd)
{
	bool ret =false;
	ItemCache it;
	if( buffer_read.pop(it))
	{
		item = it.data;
		fd = it.fd;
		ret = true;
	}
	return ret;
}

/**
 * Register fd with the epoll instance for the given event mask.  Unless
 * isblock_client is set, the fd is switched to non-blocking mode first.
 * Level-triggered mode is used; OR EPOLLET into 'state' for edge-triggered.
 */
void epoll_socket::add_event(int fd, int state)
{
	if (!isblock_client) {
		int fl = fcntl(fd, F_GETFL);
		fcntl(fd, F_SETFL, fl | O_NONBLOCK);
	}
	struct epoll_event ev;
	ev.data.fd = fd;
	ev.events = state;
	epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev);
}

/**
 * Remove fd from the epoll set.  The event struct is ignored by DEL on
 * modern kernels but is supplied for pre-2.6.9 compatibility.
 */
void epoll_socket::del_event(int fd, int state)
{
	struct epoll_event ev;
	ev.data.fd = fd;
	ev.events = state;
	epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev);
}

/**
 * Change the event mask an already-registered fd is watched for.
 */
void epoll_socket::mod_event(int fd, int state)
{
	struct epoll_event ev;
	ev.data.fd = fd;
	ev.events = state;
	epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &ev);
}

// Remember an accepted client fd.  fds_client is shared with send(), so the
// set is mutex-guarded.
void epoll_socket::add_client(int fd)
{
	fds_mutex.Lock();
	fds_client.insert(fd);
	fds_mutex.Unlock();
}
// Forget a disconnected client fd (mutex-guarded; erase of a missing fd is a
// harmless no-op).
void epoll_socket::del_client(int fd)
{
	fds_mutex.Lock();
	fds_client.erase(fd);
	fds_mutex.Unlock();
}

void epoll_socket::handle_events(epoll_event * events, int num)
{
	int i;
	int fd;
	//进行选好遍历
	for (i = 0; i < num; i++) {
		fd = events[i].data.fd;
		//根据描述符的类型和事件类型进行处理
		if ((fd == socketfd) && (events[i].events& EPOLLIN)){
			handle_accept();
		}else if (events[i].events & EPOLLIN){
			do_read(fd);
		}else if (events[i].events & EPOLLOUT){
			do_write(fd);
		}else{
			del_client(fd);
			close(fd);
		}
	}
}

/**
 * Accept one pending connection, register it with epoll and the client set.
 * @return true when a client was accepted, false on accept() failure
 */
bool epoll_socket::handle_accept()
{
	struct sockaddr_in cliaddr;
	socklen_t cliaddrlen = sizeof(cliaddr);
	int clifd = accept(socketfd, (struct sockaddr*)&cliaddr, &cliaddrlen);
	if (clifd == -1) {
		snprintf(err_msg, sizeof(err_msg)
			, "accept error: %s(errno: %d)\n"	// was mislabelled "listen socket error"
			, strerror(errno), errno);
		return false;
	}
	char msg[128] = { 0 };
	// BUGFIX: convert the port to host byte order with ntohs(); the old code
	// printed the raw sin_port value (the "wrong port" the comment admitted).
	// snprintf also bounds the write (sprintf did not).
	snprintf(msg, sizeof(msg), "accept a new client(%d):%s:%d\n"
		, clifd, inet_ntoa(cliaddr.sin_addr)
		, ntohs(cliaddr.sin_port));
	std::cout << msg;
	// Register the new client fd with epoll and our own bookkeeping.
	add_event(clifd, EPOLLIN);
	add_client(clifd);
	return true;	// BUGFIX: the old code fell off the end of a bool function (UB)
}

/**
 * Read up to MAXBUFFSIZE bytes from fd and queue them in buffer_read.
 * 0 bytes => peer closed; -1 => error.  In both close paths the fd is first
 * removed from epoll, then from the client set, then closed.
 * @return false only on a fatal read error
 */
bool epoll_socket::do_read(int fd)
{
	char buf[MAXBUFFSIZE];
	int buflen = read(fd, buf, MAXBUFFSIZE);
	if (buflen == -1) {
		// BUGFIX: EAGAIN/EWOULDBLOCK/EINTR on a non-blocking socket are not
		// fatal — the old code dropped the client on a spurious wakeup.
		if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
			return true;
		}
		snprintf(err_msg, sizeof(err_msg)
			, "read error(%d): %s(errno: %d)\n"
			, fd, strerror(errno), errno);
		std::cout << err_msg;
		// BUGFIX: deregister BEFORE close(); epoll_ctl on a closed fd fails.
		del_event(fd, EPOLLIN);
		del_client(fd);
		close(fd);
		return false;
	}
	else if (buflen == 0) {
		// Orderly shutdown by the peer.
		char msg[128] = { 0 };
		snprintf(msg, sizeof(msg), "client(%d) close.\n", fd);
		std::cout << msg;
		del_event(fd, EPOLLIN);
		del_client(fd);
		close(fd);
		return true;
	}
	else {
		// Length-bounded copy: the payload may contain NULs and a full read
		// leaves buf without a terminator.
		std::string payload(buf, buflen);
		if(print_flag){
			// BUGFIX: the old sprintf of buf (up to 1024 bytes, possibly not
			// NUL-terminated) into a 1024-byte msg could overflow the stack.
			std::cout << "read message is:" << payload << "\n";
		}
		buffer_read.add(ItemCache(payload, fd));
	}
	return true;
}

/**
 * Pop one queued outbound item and write it to fd, then re-arm EPOLLIN.
 * EAGAIN requeues the item instead of dropping the client; a real error
 * deregisters/closes the fd.  NOTE(review): a short write is not resumed —
 * the unwritten tail is dropped, same as the original code.
 * @return false when nothing was queued or the write failed
 */
bool epoll_socket::do_write(int fd)
{
	ItemCache it;
	if(!buffer_write.pop(it))
	{
		// BUGFIX: nothing queued — stop watching writability, otherwise the
		// level-triggered EPOLLOUT fires continuously and busy-loops.
		mod_event(fd, EPOLLIN);
		return false;
	}
	if(it.fd!=fd){
		snprintf(err_msg, sizeof(err_msg)
			, "write error,fd(%d,%d)\n",it.fd,fd);
		std::cout << err_msg;
		// BUGFIX: do not deliver another client's payload to this fd (the
		// old code logged the mismatch and wrote it anyway).  Requeue at the
		// head so the owning fd's EPOLLOUT picks it up.
		buffer_write.add_front(it);
		return true;
	}
	int nwrite = write(fd, it.data.c_str(), it.data.length());
	if (nwrite == -1)
	{
		// BUGFIX: a full socket buffer is not fatal — retry on next EPOLLOUT.
		if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
			buffer_write.add_front(it);
			return true;
		}
		snprintf(err_msg, sizeof(err_msg)
			, "write error: %s(errno: %d)\n"
			, strerror(errno), errno);
		std::cout << err_msg;
		// BUGFIX: deregister BEFORE close(); epoll_ctl on a closed fd fails.
		del_event(fd, EPOLLOUT);
		del_client(fd);
		close(fd);
		return false;
	}
	if(print_flag){
		std::cout << "write message is:" << it.data << "\n";
	}
	// Go back to waiting for inbound data on this client.
	mod_event(fd, EPOLLIN);
	return true;
}

char * epoll_socket::get_errmsg()
{
	return err_msg;
}

void epoll_socket::setPrintFlag(bool flag)
{
	print_flag = flag;
}

void epoll_socket::setClientBlock(bool flag)
{
	isblock_client = flag;
}

其他依赖的源码以及编译一并给出:

线程锁相关:

#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000

#ifndef _PYMUTEX_H_
#define _PYMUTEX_H_

/***********************************************************************
  *Copyright 2020-03-06, pyfree
  *
  *File Name       : Mutex.h
  *File Mark       : 
  *Summary         : 线程锁
  *
  *Current Version : 1.00
  *Author          : pyfree
  *FinishDate      :
  *
  *Replace Version :
  *Author          :
  *FinishDate      :

 ************************************************************************/

#ifdef WIN32
//#include <windows.h>
#else
#include <pthread.h>
#endif

typedef void *HANDLE;

/**
 * Abstract mutex interface.  Lock/TryLock/Unlock are const so that logically
 * read-only methods of owning classes can still take the lock.
 */
class IMutex
{
public:
	virtual ~IMutex() {}

	/**
	 * Acquire the lock, blocking until it is available.
	 * @return {void}
	 */
	virtual void Lock() const = 0;
	/**
	 * Try to acquire the lock without blocking.
	 * @return {bool} true when the lock was acquired
	 */
	virtual bool TryLock() const = 0;
	/**
	 * Release the lock.
	 * @return {void}
	 */
	virtual void Unlock() const = 0;
};

/**
 * Concrete mutex built on a Win32 kernel mutex or a pthread_mutex_t.
 * NOTE(review): the surrounding headers mix the WIN32 and _WIN32 guard
 * macros — if only one of the two is defined the build selects mismatched
 * branches; unify on _WIN32 (always defined by MSVC).
 */
class PYMutex : public IMutex
{
public:
	PYMutex();
	~PYMutex();

	virtual void Lock() const;
	virtual bool TryLock() const;
	virtual void Unlock() const;
private:
#ifdef _WIN32
	HANDLE m_mutex;
#else
	// mutable: the const Lock/Unlock methods must still mutate the mutex.
	mutable pthread_mutex_t m_mutex;
#endif
};

#endif //_PYMUTEX_H_
#include "Mutex.h"

#ifdef WIN32
#include <windows.h>
#endif
//#include <iostream>
#include <stdio.h>

// Create the underlying mutex (unnamed, initially unowned on Win32;
// default attributes on pthreads).
PYMutex::PYMutex()
{
#ifdef _WIN32
	m_mutex = ::CreateMutex(NULL, FALSE, NULL);
#else
	pthread_mutex_init(&m_mutex, NULL);
#endif
}


// Destroy the underlying mutex (must not be held by any thread at this point).
PYMutex::~PYMutex()
{
#ifdef _WIN32
	::CloseHandle(m_mutex);
#else
	pthread_mutex_destroy(&m_mutex);
#endif
}


// Blocking acquire; wait result is deliberately unchecked (see todo).
void PYMutex::Lock() const
{
#ifdef _WIN32
	//DWORD d = WaitForSingleObject(m_mutex, INFINITE);
	WaitForSingleObject(m_mutex, INFINITE);
	/// \todo check 'd' for result
#else
	pthread_mutex_lock(&m_mutex);
#endif
}

// Non-blocking acquire; returns true only when the lock was obtained.
bool PYMutex::TryLock() const
{
#ifdef _WIN32
	DWORD rc = WaitForSingleObject(m_mutex, 0);
	if (rc != WAIT_OBJECT_0 && rc != WAIT_TIMEOUT) {
		// Abandoned mutex or failure — warn but still report "not acquired".
		printf("thread WARNING: bad result from try-locking mutex\n");
	}
	return rc == WAIT_OBJECT_0;
#else
	return pthread_mutex_trylock(&m_mutex) == 0;
#endif	
};

// Release the lock (caller must currently hold it).
void PYMutex::Unlock() const
{
#ifdef _WIN32
	::ReleaseMutex(m_mutex);
#else
	pthread_mutex_unlock(&m_mutex);
#endif
}

容器队列相关:

#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000

#ifndef _QUEUE_DATA_H_
#define _QUEUE_DATA_H_

/***********************************************************************
  *Copyright 2020-03-06, pyfree
  *
  *File Name       : queuedata.h
  *File Mark       : 
  *Summary         : 
  *数据队列类,线程安全
  *
  *Current Version : 1.00
  *Author          : pyfree
  *FinishDate      :
  *
  *Replace Version :
  *Author          :
  *FinishDate      :

 ************************************************************************/
#include <queue>
#include <deque>
#include <stdio.h>
#include <string.h>

#include "Mutex.h"

/**
 * Thread-safe bounded FIFO queue.  When the bound (QSize) is reached the
 * oldest element is dropped to make room, and every 10 such overflows one
 * diagnostic line is printed.
 */
template <class T>
class QueueData
{
public:
	QueueData(std::string desc = "thread_queue");
	~QueueData();
	//////////////////////////////////////////////////////////////
	/**
	 * Number of queued elements.
	 * @return {int} queue size
	 */
	int size();
	/**
	 * Whether the queue is empty.
	 * @return {bool}
	 */
	bool isEmpty();
	/**
	 * Copy the front element without removing it.
	 * @param it {T&} receives the front element
	 * @return {bool} false when the queue is empty
	 */
	bool getFirst(T &it);
	/**
	 * Remove the front element.
	 * @return {bool} false when the queue is empty
	 */
	bool removeFirst();
	/**
	 * Pop the front element.
	 * @param it {T&} receives the front element
	 * @return {bool} false when the queue is empty
	 */
	bool pop(T &it);
	/**
	 * Pop up to 'sizel' elements from the front into 'its'.
	 * @param its {queue<T>&} receives the popped elements
	 * @param sizel {int} maximum number to transfer in one call
	 * @return {bool} true when 'its' is non-empty afterwards
	 */
	bool getList(std::queue<T> &its,unsigned int sizel=5);
	/**
	 * Append an element at the tail (dropping the oldest when full).
	 */
	void add(T it);
	/**
	 * Insert an element at the head (dropping the oldest when full).
	 */
	void add_front(T it);
	/**
	 * Drop all elements and reset the overflow counter.
	 */
	void clear();
private:
	void init();
	// BUGFIX: must return a reference, not a pointer — the old
	// 'return this;' would not compile if this private (copy-blocking)
	// assignment operator were ever instantiated.
	QueueData& operator=(const QueueData&) {return *this;};
protected:
	std::string queue_desc;			// label used in overflow diagnostics
private:
	std::deque<T> datacache_queue;	// underlying container
	PYMutex m_Mutex;				// guards datacache_queue and queue_overS
	// Size bound shared by every queue of this element type.
	static unsigned int QSize;
	int queue_overS;				// overflow counter, reset after each report
};
template <class T>
unsigned int  QueueData<T>::QSize = 100;

// Construct with a human-readable description used in overflow reports.
template <class T>
QueueData<T>::QueueData(std::string desc)
	: queue_desc(desc)
{
	init();
};

// Reset the overflow counter (construction-time helper).
template <class T>
void QueueData<T>::init() 
{
	queue_overS = 0;
};

// Nothing to release explicitly: the deque and mutex clean themselves up.
template <class T>
QueueData<T>::~QueueData()
{

}

//////////////////////////////////////////////////////////
// Current element count (mutex-guarded snapshot).
template <class T>
int QueueData<T>::size()
{
	m_Mutex.Lock();
	const int count = static_cast<int>(datacache_queue.size());
	m_Mutex.Unlock();
	return count;
}

// True when no elements are queued (mutex-guarded snapshot).
template <class T>
bool QueueData<T>::isEmpty()
{
	m_Mutex.Lock();
	const bool empty_now = datacache_queue.empty();
	m_Mutex.Unlock();
	return empty_now;
}

// Peek at the head element without removing it; false on an empty queue.
template <class T>
bool QueueData<T>::getFirst(T &it) 
{
	m_Mutex.Lock();
	const bool has_front = !datacache_queue.empty();
	if (has_front)
	{
		it = datacache_queue.front();
	}
	m_Mutex.Unlock();
	return has_front;
}

// Discard the head element; false on an empty queue.
template <class T>
bool QueueData<T>::removeFirst() 
{
	m_Mutex.Lock();
	const bool has_front = !datacache_queue.empty();
	if (has_front)
	{
		datacache_queue.pop_front();
	}
	m_Mutex.Unlock();
	return has_front;
}

// Remove and return the head element; false on an empty queue.
template <class T>
bool QueueData<T>::pop(T &it)
{
	m_Mutex.Lock();
	const bool has_front = !datacache_queue.empty();
	if (has_front)
	{
		it = datacache_queue.front();
		datacache_queue.pop_front();
	}
	m_Mutex.Unlock();
	return has_front;
};

// Transfer head elements into 'its' one at a time, stopping once 'its'
// holds at least 'sizel' elements or the queue runs dry.
template <class T>
bool QueueData<T>::getList(std::queue<T> &its,unsigned int sizel)
{
	m_Mutex.Lock();
	for (;;)
	{
		if (datacache_queue.empty())
		{
			break;
		}
		its.push(datacache_queue.front());
		datacache_queue.pop_front();
		if (its.size() >= sizel)
		{
			break;
		}
	}
	m_Mutex.Unlock();
	return !its.empty();
};

/**
 * Append an element at the tail.  If the queue is already at the QSize bound
 * the oldest element is dropped first; every 10 such overflows one
 * diagnostic line is printed.
 */
template <class T>
void QueueData<T>::add(T it) 
{
	bool report = false;
	m_Mutex.Lock();
	// BUGFIX: '>= QSize' enforces the documented bound; the old '>' let the
	// queue grow to QSize + 1 elements.
	if (datacache_queue.size() >= QSize) 
	{
		queue_overS++;
		datacache_queue.pop_front();
	}
	datacache_queue.push_back(it);
	// BUGFIX: test/reset the counter while still holding the lock — the old
	// code read and reset queue_overS unlocked, racing with other writers.
	if (queue_overS >= 10) 
	{
		report = true;
		queue_overS = 0;
	}
	m_Mutex.Unlock();
	if (report) 
	{
		// %u: QSize is unsigned (was printed with %d).
		printf("add item to queue %s at end,but the size of QueueData is up to limmit size: %u .[%s %s %d]\n"
			, queue_desc.c_str(), QSize
			, __FILE__, __FUNCTION__, __LINE__);
	}
}

/**
 * Insert an element at the head (used to requeue items for a retry).  Same
 * bound/overflow handling as add().
 */
template <class T>
void QueueData<T>::add_front(T it)
{
	bool report = false;
	m_Mutex.Lock();
	// BUGFIX: '>= QSize' enforces the documented bound ('>' allowed QSize+1).
	if (datacache_queue.size() >= QSize) 
	{
		queue_overS++;
		datacache_queue.pop_front();
	}
	datacache_queue.push_front(it);
	// BUGFIX: counter is tested/reset under the lock (was racy).
	if (queue_overS >= 10) 
	{
		report = true;
		queue_overS = 0;
	}
	m_Mutex.Unlock();
	if (report) 
	{
		// %u: QSize is unsigned (was printed with %d).
		printf("add item to queue %s at first,but the size of QueueData is up to limmit size: %u .[%s %s %d]\n"
			, queue_desc.c_str(), QSize
			, __FILE__, __FUNCTION__, __LINE__);
	}
}

/**
 * Drop all queued elements and reset the overflow counter.
 */
template <class T>
void QueueData<T>::clear()
{
	m_Mutex.Lock();
	datacache_queue.clear();
	// BUGFIX: reset the counter under the lock — it is shared with add()
	// and add_front(), which also mutate it while holding m_Mutex.
	queue_overS = 0;
	m_Mutex.Unlock();
}

#endif //_QUEUE_DATA_H_

线程相关:

#ifndef _THREAD_PY_H_
#define _THREAD_PY_H_
/**********************************************************************************
  *Copyright 2020-05-06, pyfree
  *
  *File Name       : thread_py.h
  *File Mark       : 
  *Summary         : 
  *
  *Current Version : 1.00
  *Author          : pyfree
  *FinishDate      :
  *
  *Replace Version :
  *Author          :
  *FinishDate      :

 ***********************************************************************************/
#include <pthread.h>
#include <unistd.h>

/**
 * Minimal pthread wrapper: derive, implement run(), then call start() to
 * spawn the thread.
 */
class Thread_py
{
private:
    //id of the spawned thread (0 while not running)
    pthread_t tid;
    //one of the THREAD_STATUS_* constants below
    int threadStatus;
    //static trampoline handed to pthread_create
    static void* run0(void* pVoid);
    //instance-side bootstrap invoked by run0
    void* run1();
public:
    //threadStatus - newly created, not started yet
    static const int THREAD_STATUS_NEW = 0;
    //threadStatus - run() currently executing
    static const int THREAD_STATUS_RUNNING = 1;
    //threadStatus - run() finished
    static const int THREAD_STATUS_EXIT = -1;
    // constructor
    Thread_py();
    ~Thread_py();
    //thread body supplied by subclasses (pure virtual; note the .cpp still
    //provides an out-of-line default definition, which is legal C++)
    virtual int run()=0;
    //spawn the thread; true when pthread_create succeeds
    bool start();
    //get thread ID
    pthread_t getThreadID();
    //get thread status
    int getState();
    //wait for thread end
    void join();
    //wait for thread end, up to the given number of milliseconds
    void join(unsigned long millisTime);
};

#endif /* _Thread_py_H */
#include "thread_py.h"

#include <stdio.h>

// Static trampoline passed to pthread_create: forwards to the instance's
// run1().  pVoid is the Thread_py* supplied by start().
void* Thread_py::run0(void* pVoid)
{
    Thread_py* p = (Thread_py*) pVoid;
    p->run1();
    return p;
}

// Per-thread bootstrap: record state/tid, execute the subclass's run(),
// then mark the thread exited.  pthread_exit() never returns, which is why
// there is no return statement after it.
void* Thread_py::run1()
{
    threadStatus = THREAD_STATUS_RUNNING;
    tid = pthread_self();
    run();
    threadStatus = THREAD_STATUS_EXIT;
    tid = 0;
    pthread_exit(NULL);
}

// A fresh Thread_py starts in the NEW state with no pthread attached yet.
Thread_py::Thread_py()
    : tid(0), threadStatus(THREAD_STATUS_NEW)
{
}

// NOTE(review): join(10) polls for the thread's exit with a short timeout;
// if run() is still active when the timeout elapses, the object is destroyed
// while the thread is executing — confirm callers stop the thread first.
Thread_py::~Thread_py()
{
	join(10);
}

// Out-of-line body for the pure-virtual run() (legal in C++: a pure virtual
// may still have a definition).  Acts as a heartbeat placeholder if invoked
// explicitly; subclasses override it.
int Thread_py::run()
{
    while(true){
        printf("thread is running!\n");
        sleep(100);
    }
    return 0;
}

// Launch the worker thread; true when pthread_create succeeds.
bool Thread_py::start()
{
    int rc = pthread_create(&tid, NULL, run0, this);
    return rc == 0;
}

pthread_t Thread_py::getThreadID()
{
    return tid;
}

int Thread_py::getState()
{
    return threadStatus;
}

// Block until the worker thread exits.
// NOTE(review): comparing pthread_t with '> 0' assumes an integral
// pthread_t; that holds on Linux but is not portable — confirm if the code
// is ever built elsewhere.
void Thread_py::join()
{
    if (tid > 0)
    {
        pthread_join(tid, NULL);
    }
}

/**
 * Wait for the thread to exit, but no longer than 'millisTime' milliseconds
 * (0 waits indefinitely via join()).
 */
void Thread_py::join(unsigned long millisTime)
{
    if (tid == 0)
    {
        return; // no thread was ever started
    }
    if (millisTime == 0)
    {
        join();
        return;
    }
    // BUGFIX: poll in 100us slices, ten per millisecond.  The old loop did
    // one 100us sleep per counted "millisecond" and therefore waited only a
    // tenth of the requested time.
    unsigned long slices = millisTime * 10UL;
    for (unsigned long k = 0; threadStatus != THREAD_STATUS_EXIT && k < slices; ++k)
    {
        usleep(100);
    }
}
 

demo,main.cpp以及Makefile

#include <iostream>  
#include "epoll_socket.h"
using namespace std;

/**
 * Demo: start the epoll server on port 5000 and reply to every received
 * payload with a running message counter.
 */
int main(int argc, char **argv)
{
	epoll_socket myepoll;
	if(!myepoll.open(5000,true)){
		cout << myepoll.get_errmsg();
		return 1;	// BUGFIX: do not start the worker thread when open() failed
	}
	myepoll.setPrintFlag(true);	// configure before the thread starts using it
	myepoll.start();
	std::string item;
	int fd = 0;
	int count = 0;	// BUGFIX: was named 'cout', shadowing std::cout under 'using namespace std'
	while (true)
	{
		if(myepoll.get(item,fd))
		{
			char buf[32]={0};
			// snprintf bounds the write (sprintf did not).
			snprintf(buf, sizeof(buf), "rec_count=%d", count++);
			myepoll.send(std::string(buf),fd);
		}
		usleep(10);	// avoid a pure busy-wait on the read queue
	}
	return 0;	// unreachable: the loop above never exits
}
# Build the epoll TCP demo.  'make' builds ./epolltest, 'make clean' removes it.
CX=g++

BIN 		:= .
TARGET      := epolltest

source		:= Mutex.cpp thread_py.cpp  epoll_socket.cpp main.cpp

# Single-step compile+link; -gdwarf-2 keeps debug info, -lpthread is needed
# by thread_py/Mutex.
$(TARGET) :
	$(CX) -gdwarf-2 $(source) -o $(BIN)/$(TARGET) -lpthread

# BUGFIX: mark 'clean' phony (a file named clean would shadow it) and use
# rm -f so cleaning an already-clean tree is not an error.
.PHONY: clean
clean:
	rm -f $(BIN)/$(TARGET)

编译后我本人是采用sscom5工具模拟客户端,启动3个,设置间隔分别为10、20、30ms发送间隔进行压力测试,跑了一个下午,数据读写依然稳定,有兴趣的朋友可以继续测试优化,例如增加客户端数量,降低CPU资源消耗。

基于epoll的tcp-socket通信服务模块

 坚持分享工程化,产品化的知识为主,更便捷实现技术落地,谢谢