怎么用python快速搭建redis集群
怎么用python快速搭建redis集群
本文小编为大家详细介绍“怎么用python快速搭建redis集群”,内容详细,步骤清晰,细节处理妥当,希望这篇“怎么用python快速搭建redis集群”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。
redis通信协议
列出主要的点,便于对于下面程序的理解。
Redis在TCP端口6379(默认端口,在配置可以修改)上监听到来的连接,在客户端与服务器端之间传输的每个Redis命令或者数据都以rn结尾。
回复(服务端可客户端恢复的协议)
Redis用不同的回复类型回复命令。它可能从服务器发送的第一个字节开始校验回复类型:
* 用单行回复(状态回复),回复的第一个字节将是“+”
* 错误消息,回复的第一个字节将是“-”
* 整型数字,回复的第一个字节将是“:”
* 批量回复,回复的第一个字节将是“$”
* 多个批量回复,回复的第一个字节将是“*”
Bulk Strings(批量回复)
批量回复被服务器用于返回一个单二进制安全字符串。
C: GET mykey
S: $6rnfoobarrn
服务器发送第一行回复,该行以“$”开始后面跟随实际要发送的字节数,随后是CRLF,然后发送实际数据,随后是2个字节的额外数据用于最后的CRLF。服务器发送的准确序列如下:
”$6rnfoobarrn”
如果请求的值不存在,批量回复将使用特殊的值-1来作为数据长度,例如:
C: GET nonexistingkey
S: $-1
当请求的对象不存在时,客户端库API不会返回空字符串,而会返回空对象。例如:Ruby库返回‘nil’,而C库返回NULL(或者在回复的对象里设置指定的标志)等等。
二进制
简单说下二进制,就是会包含,所以C语言在处理的时候,就不能用str函数,像strlen、strcpy等,因为它们都是以来判断字符串结尾的。
redis集群
超简单搭建redis集群
官网也介绍了怎么搭建redis集群,试过比较麻烦,因为用的centos6.5,如果用较新的centos,可能会好点。
Redis 集群的数据分片
Redis 集群没有使用一致性hash, 而是引入了 哈希槽的概念.
Redis 集群有16384个哈希槽,每个key通过CRC16校验后对16384取模来决定放置哪个槽.集群的每个节点负责一部分hash槽,举个例子,比如当前集群有3个节点,那么:
* 节点 A 包含 0 到 5500号哈希槽.
* 节点 B 包含5501 到 11000 号哈希槽.
* 节点 C 包含11001 到 16384号哈希槽.
这种结构很容易添加或者删除节点. 比如如果我想新添加个节点D, 我需要从节点 A, B, C中得部分槽到D上. 如果我想移除节点A,需要将A中的槽移到B和C节点上,然后将没有任何槽的A节点从集群中移除即可. 由于从一个节点将哈希槽移动到另一个节点并不会停止服务,所以无论添加删除或者改变某个节点的哈希槽的数量都不会造成集群不可用的状态.
Redis 集群协议中的客户端和服务器端
在 Redis 集群中,节点负责存储数据、记录集群的状态(包括键值到正确节点的映射)。集群节点同样能自动发现其他节点,检测出没正常工作的节点, 并且在需要的时候在从节点中推选出主节点。
为了执行这些任务,所有的集群节点都通过TCP连接(TCP bus?)和一个二进制协议(集群连接,cluster bus)建立通信。 每一个节点都通过集群连接(cluster bus)与集群上的其余每个节点连接起来。 节点们使用一个 gossip 协议来传播集群的信息,这样可以:发现新的节点、 发送ping包(用来确保所有节点都在正常工作中)、在特定情况发生时发送集群消息。集群连接也用于在集群中发布或订阅消息。
由于集群节点不能代理(proxy)请求,所以客户端在接收到重定向错误(redirections errors) -MOVED 和 -ASK 的时候, 将命令重定向到其他节点。理论上来说,客户端是可以自由地向集群中的所有节点发送请求,在需要的时候把请求重定向到其他节点,所以客户端是不需要保存集群状态。 不过客户端可以缓存键值和节点之间的映射关系,这样能明显提高命令执行的效率。
-MOVED
简单说下返回-MOVED的情况,就是客户端连节点A请求处理key,但其实key其实在节点B,就返回-MOVED,协议如下:-MOVED 3999 127.0.0.1:6381
不用考虑-ASK的情况。
C语言实现redis客户端
代码如下:
#include<string.h>#include<sys/socket.h>#include<arpa/inet.h>#include<errno.h>#include<fcntl.h>#include<netdb.h>#include<sys/poll.h>#include<unistd.h>#include<sys/types.h>#include<stdlib.h>#include<stdio.h>ssize_tsock_write_loop(intfd,constvoid*vptr,size_tn){size_tnleft=0;ssize_tnwritten=0;constchar*ptr;ptr=(char*)vptr;nleft=n;while(nleft>0){if((nwritten=write(fd,ptr,nleft))<=0){if(errno==EINTR){nwritten=0;//再次调用write}else{return-5;}}nleft=nleft-nwritten;ptr=ptr+nwritten;}return(n);}intsock_read_wait(intfd,inttimeout){structpollfdpfd;pfd.fd=fd;pfd.events=POLLIN;pfd.revents=0;timeout*=1000;for(;;){switch(poll(&pfd,1,timeout)){case-1:if(errno!=EINTR){return(-2);}continue;case0:errno=ETIMEDOUT;return(-1);default:if(pfd.revents&POLLIN)return(0);elsereturn(-3);}}}ssize_tsock_read_tmo(intfd,void*vptr,size_tlen,inttimeout){if(timeout>0&&sock_read_wait(fd,timeout)<0)return(-1);elsereturn(read(fd,vptr,len));}intsock_connect_nore(constchar*IPaddr,intport,inttimeout){//chartemp[4096];intsock_fd=0,n=0,errcode=0;structsockaddr_inservaddr;if(IPaddr==NULL){return-1;}if((sock_fd=socket(AF_INET,SOCK_STREAM,0))<0){return-1;}memset(&servaddr,0,sizeof(servaddr));servaddr.sin_family=AF_INET;servaddr.sin_port=htons(port);//changedbynavy2003.3.3forsupportdomainaddr//if((servaddr.sin_addr.s_addr=inet_addr(IPaddr))==-1)if((errcode=inet_pton(AF_INET,IPaddr,&servaddr.sin_addr))<=0){//addedbynavy2003.3.31forsupportdomainaddrstructhostent*pHost=NULL,host;charsBuf[2048],sHostIp[17];inth_errnop=0;memset(&host,0,sizeof(host));memset(sBuf,0,sizeof(sBuf));memset(sHostIp,0,sizeof(sHostIp));pHost=&host;#ifdef_SOLARIS_PLAT//solarisif((gethostbyname_r(IPaddr,pHost,sBuf,sizeof(sBuf),&h_errnop)==NULL)||#else//linuxif((gethostbyname_r(IPaddr,pHost,sBuf,sizeof(sBuf),&pHost,&h_errnop)!=0)||#endif(pHost==NULL)){close(sock_fd);return-1;}if(pHost->h_addrtype!=AF_INET&&pHost->h_addrtype!=AF_INET6){close(sock_fd);return-1;}//目前仅取第一个IP地址if((inet_ntop(pHost->h_addrtype,*(pHost->h_addr_list),sHostIp,sizeof(sHostIp)))==NULL){close(sock_fd);return-1;}if((errcode=inet_pton(AF_INET,sHostIp,&servaddr.sin_addr))<=0){close(sock_fd);return-1;}//endaddedbynavy2003.3.31}if((errcode=sock_timed_connect(sock_fd,(structsockaddr*)&servaddr,sizeof(servaddr),timeout))<0){close(sock_fd);return-1;}returnsock_fd;}intsock_connect(constchar*IPaddr,intport,inttimeout){chartemp[4096];intsock_fd=0,n=0,errcode=0;structsockaddr_inservaddr;if(IPaddr==NULL){return-1;}if((sock_fd=socket(AF_INET,SOCK_STREAM,0))<0){return-1;}memset(&servaddr,0,sizeof(servaddr));servaddr.sin_family=AF_INET;servaddr.sin_port=htons(port);//changedbynavy2003.3.3forsupportdomainaddr//if((servaddr.sin_addr.s_addr=inet_addr(IPaddr))==-1)if((errcode=inet_pton(AF_INET,IPaddr,&servaddr.sin_addr))<=0){//addedbynavy2003.3.31forsupportdomainaddrstructhostent*pHost=NULL,host;charsBuf[2048],sHostIp[17];inth_errnop=0;memset(&host,0,sizeof(host));memset(sBuf,0,sizeof(sBuf));memset(sHostIp,0,sizeof(sHostIp));pHost=&host;#ifdef_SOLARIS_PLAT//solarisif((gethostbyname_r(IPaddr,pHost,sBuf,sizeof(sBuf),&h_errnop)==NULL)||#else//linuxif((gethostbyname_r(IPaddr,pHost,sBuf,sizeof(sBuf),&pHost,&h_errnop)!=0)||#endif(pHost==NULL)){close(sock_fd);return-1;}if(pHost->h_addrtype!=AF_INET&&pHost->h_addrtype!=AF_INET6){close(sock_fd);return-1;}//目前仅取第一个IP地址if((inet_ntop(pHost->h_addrtype,*(pHost->h_addr_list),sHostIp,sizeof(sHostIp)))==NULL){close(sock_fd);return-1;}if((errcode=inet_pton(AF_INET,sHostIp,&servaddr.sin_addr))<=0){close(sock_fd);return-1;}//endaddedbynavy2003.3.31}if((errcode=sock_timed_connect(sock_fd,(structsockaddr*)&servaddr,sizeof(servaddr),timeout))<0){close(sock_fd);return-1;}n=sock_read_tmo(sock_fd,temp,4096,timeout);//一般错误if(n<=0){close(sock_fd);sock_fd=-1;}returnsock_fd;}intsock_non_blocking(intfd,inton){intflags;if((flags=fcntl(fd,F_GETFL,0))<0){return-10;}if(fcntl(fd,F_SETFL,on?flags|O_NONBLOCK:flags&~O_NONBLOCK)<0){return-10;}return0;}intsock_write_wait(intfd,inttimeout){structpollfdpfd;pfd.fd=fd;pfd.events=POLLOUT;pfd.revents=0;timeout*=1000;for(;;){switch(poll(&pfd,1,timeout)){case-1:if(errno!=EINTR){return(-2);}continue;case0:errno=ETIMEDOUT;return(-1);default:if(pfd.revents&POLLOUT)return(0);elsereturn(-3);}}}intsock_timed_connect(intsock,structsockaddr*sa,intlen,inttimeout){interror=0;socklen_terror_len;sock_non_blocking(sock,1);if(connect(sock,sa,len)==0){sock_non_blocking(sock,0);return(0);}if(errno!=EINPROGRESS){sock_non_blocking(sock,0);return(-1);}/**Aconnectionisinprogress.Waitforalimitedamountoftimefor*somethingtohappen.Ifnothinghappens,reportanerror.*/if(sock_write_wait(sock,timeout)!=0){sock_non_blocking(sock,0);return(-2);}/**Somethinghappened.SomeSolaris2versionshavegetsockopt()itself*returntheerror,insteadofreturningitviatheparameterlist.*/error=0;error_len=sizeof(error);if(getsockopt(sock,SOL_SOCKET,SO_ERROR,(char*)&error,&error_len)!=0){sock_non_blocking(sock,0);return(-3);}if(error){errno=error;sock_non_blocking(sock,0);return(-4);}sock_non_blocking(sock,0);/**Noproblems.*/return(0);}staticintcheck_ip_in_list(constchar*ip,char*iplist){char*token=NULL;char*saveptr=NULL;token=strtok_r(iplist,",",&saveptr);while(token!=NULL){char*ptmp=NULL;char*ip_mask=strtok_r(token,"/",&ptmp);if(!ip_mask)return-1;char*ip_bit=strtok_r(NULL,"/",&ptmp);if(ip_bit){intmask_bit=atoi(ip_bit);if(mask_bit<0||mask_bit>32)continue;unsignedlongaddr[4]={0};sscanf(ip_mask,"%lu.%lu.%lu.%lu",addr,addr+1,addr+2,addr+3);unsignedlongvl1=addr[0]<<24|addr[1]<<16|addr[2]<<8|addr[3];sscanf(ip,"%lu.%lu.%lu.%lu",addr,addr+1,addr+2,addr+3);unsignedlongvl2=addr[0]<<24|addr[1]<<16|addr[2]<<8|addr[3];vl1=(vl1>>(32-mask_bit));vl2=(vl2>>(32-mask_bit));if(vl1==vl2)return1;}else{if(strcmp(ip,ip_mask)==0)return1;}token=strtok_r(NULL,",",&saveptr);}return0;}staticintcheck_ip_in_redis(constchar*redis_host,constchar*ip,constchar*rq_pro){charbuf[128];intloops=0;strcpy(buf,redis_host);do{loops++;char*ptmp=NULL;char*host=strtok_r(buf,":",&ptmp);if(!host)return-1;char*s_port=strtok_r(NULL,":",&ptmp);if(!s_port)return-1;intport=atoi(s_port);charrespone[40]={0};intsock_fd=-1;if((sock_fd=sock_connect_nore(host,port,5))<0)return-1;if(sock_write_loop(sock_fd,rq_pro,strlen(rq_pro))!=strlen(rq_pro)){close(sock_fd);return-1;}if(sock_read_tmo(sock_fd,respone,sizeof(respone)-1,5)<=0){close(sock_fd);return-1;}if(strncmp(":0",respone,2)==0){close(sock_fd);return0;}elseif(strncmp(":1",respone,2)==0){close(sock_fd);return1;}elseif(strncmp("$",respone,1)==0){intdata_size=0;intret=0;char*data_line=strstr(respone,"rn");if(!data_line){close(sock_fd);return-1;}data_line=data_line+2;data_size=atoi(respone+1);if(data_size==-1){close(sock_fd);return0;}if(strlen(data_line)==data_size+2){printf("line=%d,data_line=%sn",__LINE__,data_line);ret=check_ip_in_list(ip,data_line);close(sock_fd);returnret;}char*data=calloc(data_size+3,1);if(!data){close(sock_fd);return-1;}strcpy(data,data_line);intread_size=strlen(data);intleft_size=data_size+2-read_size;while(left_size>0){intnread=sock_read_tmo(sock_fd,data+read_size,left_size,5);if(nread<=0){free(data);close(sock_fd);return-1;}read_size+=nread;left_size-=nread;}close(sock_fd);printf("line=%d,data=%sn",__LINE__,data);ret=check_ip_in_list(ip,data);free(data);returnret;}elseif(strncmp("-MOVED",respone,6)==0){close(sock_fd);char*p=strchr(respone,'');if(p==NULL)return-1;p=strchr(p+1,'');if(p==NULL)return-1;strcpy(buf,p+1);}else{close(sock_fd);return-1;}}while(loops<2);return-1;}intmain(intargc,char*argv[]){if(argc!=2){printf("pleaseinputipn");return-1;}constchar*redis_ip="127.0.0.1:7002";constchar*domain="test.com";charexist_pro[128]={0};charget_pro[128]={0};snprintf(exist_pro,sizeof(exist_pro),"EXISTStest|%s|%srn",domain,"127.0.0.1");snprintf(get_pro,sizeof(get_pro),"GETtest_%srn",domain);intloops=0;intret=0;do{loops++;ret=check_ip_in_redis(redis_ip,argv[1],exist_pro);if(ret==0)ret=check_ip_in_redis(redis_ip,argv[1],get_pro);}while(loops<3&&ret<0);printf("line=%d,ret=%dn",__LINE__,ret);returnret;}
c_redis_cli.c
主要看这个check_ip_in_redis函数就行了,其它都是一些socket的封装。
python实现redis客户端
#!/usr/bin/pythonimportsysimportsocketdefmain(argv):if(len(argv)!=3):print"pleaseinputdomainip!"returnhost="192.168.188.47"port=7002while1:s=socket.socket()s.connect((host,port))cmd='set%s_white_ip%srn'%(argv[1],argv[2])s.send(cmd)res=s.recv(32)s.close()ifres[0]=="+":print"setdomainwhiteipsuc!"returnelifres[0:6]=="-MOVED":list=res.split("")ip_list=list[2].split(":")host=ip_list[0]port=int(ip_list[1])else:print"setdomainwhiteiperror!"returnif__name__=="__main__":main(sys.argv)
读到这里,这篇“怎么用python快速搭建redis集群”文章已经介绍完毕,想要掌握这篇文章的知识点还需要大家自己动手实践使用过才能领会,如果想了解更多相关内容的文章,欢迎关注恰卡编程网行业资讯频道。
推荐阅读
-
python(中无效的十进制怎么解决 python怎么转换进制)
python怎么转换进制?Python执行二进制转换:1.十进制到二进制(bin)首先,让让我们看看如何将十进制转换成二进制。我...
-
python怎么清除完全相同的行(python splte如何分隔有多个相同符号的str)
pythonsplte如何分隔有多个相同符号的str?str你的string内容str_(相同的符号)执行完了以后再在相同符号的...
-
python(编程控制电脑关机 如何控制电脑关机)
如何控制电脑关机?可以在电脑的运行窗口中输入输入公式,给电脑可以设置自动关机。1.按开快捷键winr然后打开运行窗口。2.在运行窗...
-
python中的特殊标识符(python 中 标识符中可以有逗号吗)
python中标识符中可以有逗号吗?在python语言中合法的标识符是字母、数字以及_,所以我合法的标识符中肯定不能有逗号if...
-
python(excel 提取数据写入新表 python导入excel数据找不到工作簿)
python导入excel数据找不到工作簿?我可以导入数据后找不到工作,不是因为他的工作没有被转移。什么软件可提取并合并Exce...
-
python中字典定义的四种方法(python global关键字的用法详解)
pythonglobal关键字的用法详解?global标志实际上是目的是提示python讲解器,说被其修饰的变量是全局变量。这样...
-
python(array用法 python如何对两个数组做差处理)
python如何对两个数组做差处理?Python中的列表中的元素肯定不能真接相加,减。t最佳的位置的是将列表装换成Python中的...
-
php如何让Swoole/Pool进程池实现Redis持久连接
php如何让Swoole/Pool进程池实现Redis持久连接本篇...
-
python多行注释符号怎么表示
python多行注释符号怎么表示这篇文章主要介绍“python多行...
-
python支持的操作系统是什么
python支持的操作系统是什么这篇文章主要介绍“python支持...