<% dim ModuleName,InfoID,ChannelShortName,CorrelativeArticle,InstallDir,ChannelDir,Keyword,PageTitle,ArticleIntro,Articlecontent Keyword=stripHTML("C,rabbitmq-c,使用实例") PageTitle=stripHTML("C rabbitmq-c使用实例") ArticleIntro=stripHTML("") Articlecontent=stripHTML("  C rabbitmq-c使用实例    /*    author: lwh    */    #include <pthread.h>    #includ…") ModuleName = stripHTML("programme") InfoID = stripHTML("194998") ChannelShortName=stripHTML("编程") InstallDir=stripHTML("http://www.77169.com/") ChannelDir=stripHTML("programme") %> C rabbitmq-c使用实例 - 华盟网 - http://www.77169.com
您现在的位置: 华盟网 >> 编程 >> C语言 >> 正文

C rabbitmq-c使用实例

2015/3/17 作者:admin 来源: 网络收集
导读 <% if len(ArticleIntro)<3 then Response.Write Articlecontent 'Response.Write "Articlecontent" else Response.Write ArticleIntro 'Response.Write "ArticleIntro" end if %>

  C rabbitmq-c使用实例
    /*
    author: lwh
    */
    #include <pthread.h>
    #include <stdlib.h>
    #include <stdio.h>
    #include <string.h>
    #include <stdint.h>
    #include <unistd.h>
    #include <amqp.h>
    #include <amqp_tcp_socket.h>
    #include <amqp_framing.h>
    #include <mlcLog.h>
    #include <iniparser.h>
    void *recvMessage(void* args){
    mlcDebug("call recvMessage()\n");
    amqp_connection_state_t conn;
    amqp_socket_t *socket = NULL;
    amqp_rpc_reply_t arrt;
    //创建连接
    conn = amqp_new_connection();
    //打开socket
    socket = amqp_tcp_socket_new(conn);
    if(!socket){
    mlcErrx("new socket failed!\n");
    return 0;
    }
    if(amqp_socket_open(socket,"10.247.58.172",5672) != AMQP_STATUS_OK){
    mlcErrx("open socket failed!\n");
    }
    //登录rabbitMQ
    arrt = amqp_login(conn,"/",0,524288,0,AMQP_SASL_METHOD_PLAIN,"guest","guest");
    if(arrt.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION){
    mlcErrx("login mq failed!\n");
    return 0;
    }
    amqp_channel_open(conn, 10);
    amqp_get_rpc_reply(conn);
    //设置QOS
    short prefetchCount=1000;
    amqp_basic_qos(conn,10,0,prefetchCount,false);
    mlcDebug("rcv data……!\n");
    //接收消息,指定队列
    amqp_basic_consume(conn, 10, amqp_cstring_bytes("queue.test"), amqp_empty_bytes, 0, false, 0, amqp_empty_table);
    amqp_get_rpc_reply(conn);
    amqp_frame_t frame;
    while(true){
    amqp_rpc_reply_t ret;
    amqp_envelope_t envelope;
    amqp_maybe_release_buffers(conn);
    ret = amqp_consume_message(conn, &envelope, NULL, 0);
    if (ret.reply_type == AMQP_RESPONSE_NORMAL ){
    char *messageBody = new char[envelope.message.body.len];
    memcpy(messageBody,envelope.message.body.bytes,envelope.message.body.len);
    //printf("messageBody.len=%i messageBody.bytes=%s\n",envelope.message.body.len,messageBody);
    mlcDebug("messageBody.bytes=");
    int i = 0;
    while(messageBody[i] != EOF){
    //printf("%X ",messageBody[i]);
    i++;
    }
    //printf("\n");
    if(amqp_basic_ack(conn,10,envelope.delivery_tag,false)>0){
    mlcWarnx("failing to send the ack to the broker\n");
    }
    free(messageBody);
    amqp_destroy_envelope(&envelope);
    }else if (ret.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION){
    mlcErrx("amqp_consume_message faild!\n");
    }
    }
    }

 int main(){
    //openlog("mtrmbsd",LOG_NDELAY,0);
    dictionary *ini;
    ini = iniparser_load("cfg.ini");
    if(ini == NULL){
    return -1;
    }
    //iniparser_dump(ini,stderr);
    char *tIp;
    tIp = iniparser_getstring(ini,"mq:ip",NULL);
    printf("ip = %s\n",tIp);
    amqp_connection_state_t conn;
    amqp_socket_t *socket = NULL;
    amqp_rpc_reply_t arrt;
    //创建连接
    conn = amqp_new_connection();
    //打开socket
    socket = amqp_tcp_socket_new(conn);
    if(!socket){
    mlcErrx("new socket failed!\n");
    return 0;
    }
    if(amqp_socket_open(socket,"10.247.58.172",5672) != AMQP_STATUS_OK){
    mlcErrx("open socket failed!\n");
    }
    //登录rabbitMQ
    arrt = amqp_login(conn,"/",0,1310172,0,AMQP_SASL_METHOD_PLAIN,"guest","guest");
    if(arrt.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION){
    mlcErrx("login mq failed!\n");
    return 0;
    }
    //打开隧道
    amqp_channel_open(conn, 1);
    amqp_get_rpc_reply(conn);
    //amqp_confirm_select_ok_t *acsot = amqp_confirm_select(conn , 1);
    //printf("amqp_confirm_select_ok_t= %i",acsot->dummy);
    //amqp_get_rpc_reply(conn);
    /*创建线程*/
    /*
    pthread_t id;
    int thread_result;
    if ((thread_result = pthread_create(&id, NULL, recvMessage, NULL)) != 0){
    mlcErrx("can't create thread:%s\n",strerror(thread_result));
    return -1;
    }*/
    /*发送消息*/
    FILE *fp = NULL;
    if((fp =fopen("test.dat","r")) == NULL){
    mlcErrx("read file error!");
    }
    fseek(fp, 0L, SEEK_END);
    size_t size = ftell(fp);
    rewind(fp);
    size_t t_len =6;
    char message[t_len];
    //fputs(message,fp);
    fclose(fp);
    message[0] = 0xf0;
    message[1] = 0x00;
    message[2] = 0x00;
    message[3] = 0x00;
    message[4] = 0x09;
    message[5] = 0x24;
    while(true){
    amqp_bytes_t message_bytes;
    message_bytes.len =t_len ;
    //strcpy((char*)message_bytes.bytes,message);
    message_bytes.bytes = message;
    amqp_basic_publish(conn,1,amqp_cstring_bytes("mtrmlc.dataIn"),amqp_cstring_bytes("c093"),0,0,NULL,message_bytes);
    sleep(10);
    }
    free(message);
    //pthread_join(id, NULL);
    amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
    amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
    amqp_destroy_connection(conn);
    return 0;
    }



  • 上一篇编程:

  • 下一篇编程: