TCP_transfer/tcpputfiles.cpp
2024-09-12 18:32:49 +08:00

353 lines
14 KiB
C++
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/*
* 程序名tcpputfiles.cpp采用tcp协议实现文件上传的客户端。
*/
#include "_el.h"
using namespace eviwbh;
// 程序运行的参数结构体。
struct st_arg
{
int clienttype; // 客户端类型1-上传文件2-下载文件本程序固定填1。
char ip[31]; // 服务端的IP地址。
int port; // 服务端的端口。
char clientpath[256]; // 本地文件存放的根目录。 /data /data/aaa /data/bbb
int ptype; // 文件上传成功后本地文件的处理方式1-删除文件2-移动到备份目录。
char clientpathbak[256]; // 文件成功上传后本地文件备份的根目录当ptype==2时有效。
bool andchild; // 是否上传clientpath目录下各级子目录的文件true-是false-否。
char matchname[256]; // 待上传文件名的匹配规则,如"*.TXT,*.XML"。
char srvpath[256]; // 服务端文件存放的根目录。/data1 /data1/aaa /data1/bbb
int timetvl; // 扫描本地目录文件的时间间隔(执行文件上传任务的时间间隔),单位:秒。
int timeout; // 进程心跳的超时时间。
char pname[51]; // 进程名,建议用"tcpputfiles_后缀"的方式。
} starg;
// 帮助文档。
void _help();
// 把xml解析到参数starg结构中。
bool _xmltoarg(const char *strxmlbuffer);
clogfile logfile; // 日志对象。
ctcpclient tcpclient; // 创建tcp通讯的客户端对象。
// 程序退出和信号2、15的处理函数。
void EXIT(int sig);
bool activetest(); // 心跳。
string strsendbuffer; // 发送报文的buffer。
string strrecvbuffer; // 接收报文的buffer。
// 向服务端发送登录报文,把客户端程序的参数传递给服务端。
bool login(const char *argv);
// 文件上传的主函数,执行一次文件上传的任务。
bool _tcpputfiles(bool &bcontinue);
// 处理传输文件的响应报文(删除或者转存本地的文件)。
bool ackmessage(const string &strrecvbuffer);
// 把文件的内容发送给对端。
bool sendfile(const string &filename,const int filesize);
cpactive pactive; // 进程心跳。
int main(int argc,char *argv[])
{
if (argc!=3) { _help(); return -1; }
// 关闭全部的信号和输入输出。
// 设置信号,在shell状态下可用 "kill + 进程号" 正常终止些进程。
// 但请不要用 "kill -9 +进程号" 强行终止。
// 在网络通讯程序中一般不关IO因为某些函数可能会往1和2中输出信息
// 如果关了1和2那么1和2会被socket重用向1和2输出的信息会发送到网络中。
//closeioandsignal(false);
signal(SIGINT,EXIT); signal(SIGTERM,EXIT);
// 打开日志文件。
if (logfile.open(argv[1])==false)
{
printf("打开日志文件失败(%s\n",argv[1]); return -1;
}
// 解析xml得到程序运行的参数。
if (_xmltoarg(argv[2])==false) return -1;
pactive.addpinfo(starg.timeout,starg.pname); // 把进程的心跳信息写入共享内存。
// 向服务端发起连接请求。
if (tcpclient.connect(starg.ip,starg.port)==false)
{
logfile.write("tcpclient.connect(%s,%d) failed.\n",starg.ip,starg.port); EXIT(-1);
}
// 向服务端发送登录报文,把客户端程序的参数传递给服务端。
if (login(argv[2])==false) { logfile.write("login() failed.\n"); EXIT(-1); }
bool bcontinue=true; // 如果调用_tcpputfiles()发送了文件bcontinue为true否则为false。
while (true)
{
// 调用文件上传的主函数,执行一次文件上传的任务。
if (_tcpputfiles(bcontinue)==false) { logfile.write("_tcpputfiles() failed.\n"); EXIT(-1); }
// 如果刚才执行文件上传任务的时候上传了文件,在上传的过程中,可能有新的文件陆续已生成,
// 那么,为保证文件被尽快上传,进程不体眠。(只有在刚才执行文件上传任务的时候没有上传文件的情况下才休眠)
if (bcontinue==false)
{
sleep(starg.timetvl);
// 发送心跳报文。
if (activetest()==false) break;
}
pactive.uptatime();
}
EXIT(0);
}
// 心跳。
bool activetest()
{
strsendbuffer="<activetest>ok</activetest>";
// xxxxxxxxxx logfile.write("发送:%s\n",strsendbuffer.c_str());
if (tcpclient.write(strsendbuffer)==false) return false; // 向服务端发送请求报文。
if (tcpclient.read(strrecvbuffer,10)==false) return false; // 接收服务端的回应报文。
// xxxxxxxxxx logfile.write("接收:%s\n",strrecvbuffer.c_str());
// 心跳机制的代码可简单化处理,只需要收到对端的回应就行了,不必判断回应的内容。
return true;
}
void EXIT(int sig)
{
logfile.write("程序退出sig=%d\n\n",sig);
exit(0);
}
// 程序的帮助文档。
void _help()
{
printf("\n");
printf("Using:/project/tools/bin/tcpputfiles logfilename xmlbuffer\n\n");
printf("Sample:/project/tools/bin/procctl 20 /project/tools/bin/tcpputfiles /log/idc/tcpputfiles_surfdata.log "\
"\"<ip>192.168.150.128</ip><port>5005</port>"\
"<clientpath>/tmp/client</clientpath><ptype>1</ptype>"
"<srvpath>/tmp/server</srvpath>"\
"<andchild>true</andchild><matchname>*.xml,*.txt,*.csv</matchname><timetvl>10</timetvl>"\
"<timeout>50</timeout><pname>tcpputfiles_surfdata</pname>\"\n\n");
printf("本程序是数据中心的公共功能模块采用tcp协议把文件上传给服务端。\n");
printf("logfilename 本程序运行的日志文件。\n");
printf("xmlbuffer 本程序运行的参数,如下:\n");
printf("ip 服务端的IP地址。\n");
printf("port 服务端的端口。\n");
printf("ptype 文件上传成功后的处理方式1-删除文件2-移动到备份目录。\n");
printf("clientpath 本地文件存放的根目录。\n");
printf("clientpathbak 文件成功上传后本地文件备份的根目录当ptype==2时有效。\n");
printf("andchild 是否上传clientpath目录下各级子目录的文件true-是false-否缺省为false。\n");
printf("matchname 待上传文件名的匹配规则,如\"*.TXT,*.XML\"\n");
printf("srvpath 服务端文件存放的根目录。\n");
printf("timetvl 扫描本地目录文件的时间间隔单位取值在1-30之间。\n");
printf("timeout 本程序的超时时间单位视文件大小和网络带宽而定建议设置50以上。\n");
printf("pname 进程名,尽可能采用易懂的、与其它进程不同的名称,方便故障排查。\n\n");
}
// 把xml解析到参数starg结构。
bool _xmltoarg(const char *strxmlbuffer)
{
memset(&starg,0,sizeof(struct st_arg));
getxmlbuffer(strxmlbuffer,"ip",starg.ip);
if (strlen(starg.ip)==0) { logfile.write("ip is null.\n"); return false; }
getxmlbuffer(strxmlbuffer,"port",starg.port);
if ( starg.port==0) { logfile.write("port is null.\n"); return false; }
getxmlbuffer(strxmlbuffer,"ptype",starg.ptype);
if ((starg.ptype!=1)&&(starg.ptype!=2)) { logfile.write("ptype not in (1,2).\n"); return false; }
getxmlbuffer(strxmlbuffer,"clientpath",starg.clientpath);
if (strlen(starg.clientpath)==0) { logfile.write("clientpath is null.\n"); return false; }
getxmlbuffer(strxmlbuffer,"clientpathbak",starg.clientpathbak);
if ((starg.ptype==2)&&(strlen(starg.clientpathbak)==0)) { logfile.write("clientpathbak is null.\n"); return false; }
getxmlbuffer(strxmlbuffer,"andchild",starg.andchild);
getxmlbuffer(strxmlbuffer,"matchname",starg.matchname);
if (strlen(starg.matchname)==0) { logfile.write("matchname is null.\n"); return false; }
getxmlbuffer(strxmlbuffer,"srvpath",starg.srvpath);
if (strlen(starg.srvpath)==0) { logfile.write("srvpath is null.\n"); return false; }
getxmlbuffer(strxmlbuffer,"timetvl",starg.timetvl);
if (starg.timetvl==0) { logfile.write("timetvl is null.\n"); return false; }
// 扫描本地目录文件的时间间隔(执行上传任务的时间间隔),单位:秒。
// starg.timetvl没有必要超过30秒。
if (starg.timetvl>30) starg.timetvl=30;
// 进程心跳的超时时间一定要大于starg.timetvl。
getxmlbuffer(strxmlbuffer,"timeout",starg.timeout);
if (starg.timeout==0) { logfile.write("timeout is null.\n"); return false; }
if (starg.timeout<=starg.timetvl) { logfile.write("starg.timeout(%d) <= starg.timetvl(%d).\n",starg.timeout,starg.timetvl); return false; }
getxmlbuffer(strxmlbuffer,"pname",starg.pname,50);
//if (strlen(starg.pname)==0) { logfile.write("pname is null.\n"); return false; }
return true;
}
// 向服务端发送登录报文,把客户端程序的参数传递给服务端。
bool login(const char *argv)
{
sformat(strsendbuffer,"%s<clienttype>1</clienttype>",argv);
// xxxxxxxxxx logfile.write("发送:%s\n",strsendbuffer.c_str());
if (tcpclient.write(strsendbuffer)==false) return false; // 向服务端发送请求报文。
if (tcpclient.read(strrecvbuffer,10)==false) return false; // 接收服务端的回应报文。
// xxxxxxxxxx logfile.write("接收:%s\n",strrecvbuffer.c_str());
logfile.write("登录(%s:%d)成功。\n",starg.ip,starg.port);
return true;
}
// 文件上传的主函数,执行一次文件上传的任务。
bool _tcpputfiles(bool &bcontinue)
{
bcontinue=false;
cdir dir;
// 打开starg.clientpath目录。
if (dir.opendir(starg.clientpath,starg.matchname,10000,starg.andchild)==false)
{
logfile.write("dir.opendir(%s) 失败。\n",starg.clientpath); return false;
}
int delayed=0; // 未收到对端确认报文的文件数量发送了一个文件就加1接收到了一个回应就减1。
// 遍历目录中的每个文件
while (dir.readdir())
{
bcontinue=true;
// 把文件名、修改时间、文件大小组成报文,发送给对端。
sformat(strsendbuffer,"<filename>%s</filename><mtime>%s</mtime><size>%d</size>",
dir.m_ffilename.c_str(),dir.m_mtime.c_str(),dir.m_filesize);
// xxxxxxxxxxxxxxx logfile.write("strsendbuffer=%s\n",strsendbuffer.c_str());
if (tcpclient.write(strsendbuffer)==false)
{
logfile.write("tcpclient.write() failed.\n"); return false;
}
// 发送文件内容。
logfile.write("send %s(%d) ...",dir.m_ffilename.c_str(),dir.m_filesize);
if (sendfile(dir.m_ffilename,dir.m_filesize)==true)
{
logfile << "ok.\n"; delayed++;
}
else
{
logfile << "failed.\n"; tcpclient.close(); return false;
}
pactive.uptatime();
// 接收服务端的确认报文。
while (delayed>0)
{
if (tcpclient.read(strrecvbuffer,-1)==false) break;
// xxxxxxxxxxxxxxx logfile.write("strrecvbuffer=%s\n",strrecvbuffer.c_str());
// 处理服务端的确认报文(删除本地文件或把本地文件移动到备份目录)。
ackmessage(strrecvbuffer);
}
}
// 继续接收对端的确认报文。
while (delayed>0)
{
if (tcpclient.read(strrecvbuffer,10)==false) break;
// xxxxxxxxxxxxxxx logfile.write("strrecvbuffer=%s\n",strrecvbuffer.c_str());
// 处理传输文件的响应报文(删除或者转存本地的文件)。
delayed--;
ackmessage(strrecvbuffer);
}
return true;
}
// 把文件的内容发送给对端。
bool sendfile(const string &filename,const int filesize)
{
int onread=0; // 每次打算从文件中读取的字节数。
char buffer[1000]; // 存放读取数据的bufferbuffer的大小可参考硬盘一次读取数据量4K为宜
int totalbytes=0; // 从文件中已读取的字节总数。
cifile ifile; // 读取文件的对象。
// 必须以二进制的方式操作文件。
if (ifile.open(filename,ios::in|ios::binary)==false) return false;
while (true)
{
memset(buffer,0,sizeof(buffer));
// 计算本次应该读取的字节数如果剩余的数据超过1000字节就读1000字节。
if (filesize-totalbytes>1000) onread=1000;
else onread=filesize-totalbytes;
// 从文件中读取数据。
ifile.read(buffer,onread);
// 把读取到的数据发送给对端。
if (tcpclient.write(buffer,onread)==false) { return false; }
// 计算文件已读取的字节总数,如果文件已读完,跳出循环。
totalbytes=totalbytes+onread;
if (totalbytes==filesize) break;
}
return true;
}
// 处理传输文件的响应报文(删除或者转存本地的文件)。
bool ackmessage(const string &strrecvbuffer)
{
// <filename>/tmp/client/2.xml</filename><result>ok</result>
string filename; // 本地文件名。
string result; // 对端接收文件的结果。
getxmlbuffer(strrecvbuffer,"filename",filename);
getxmlbuffer(strrecvbuffer,"result",result);
// 如果服务端接收文件不成功,直接返回(下次执行文件传输任务时将会重传)。
if (result!="ok") return true;
// 如果starg.ptype==1删除文件。
if (starg.ptype==1)
{
if (remove(filename.c_str())!=0) { logfile.write("remove(%s) failed.\n",filename.c_str()); return false; }
}
// 如果starg.ptype==2移动到备份目录。
if (starg.ptype==2)
{
// 生成转存后的备份目录文件名。 例如:/tmp/client/2.xml /tmp/clientbak/2.xml
string bakfilename=filename;
replacestr(bakfilename,starg.clientpath,starg.clientpathbak,false); // 注意第三个参数一定要填false。
if (renamefile(filename,bakfilename)==false)
{ logfile.write("renamefile(%s,%s) failed.\n",filename.c_str(),bakfilename.c_str()); return false; }
}
return true;
}