#include <stdio.h>
#include <sys/timeb.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <time.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <sys/wait.h>
#include <sys/poll.h>
#include <stdbool.h>
#include <fcntl.h>
#include <sys/msg.h>
#include <stdlib.h>
#include <pthread.h>
#include <errno.h>
#include "mysql.h"

 

/* Size in bytes of the message payload buffer (mtext) and the maximum
 * payload length requested from msgrcv(). */
#define SZ_QUEUETEXT 4096

 

/*
 * Record layout handed to msgrcv(): a leading long message type followed
 * by the raw payload bytes.
 */
struct __MESSAGE_QUEUE
{
    long mtype;                 /* message type; the receiver asks for a specific type */
    char mtext[SZ_QUEUETEXT];   /* payload bytes; not necessarily NUL-terminated */
};

 

/* Client handle created by mysql_init() in main(); used for every query and commit. */
MYSQL *mysql=NULL;
/* Result of mysql_real_connect() in main(); NULL means the connect failed. */
MYSQL *connection=NULL;
/* Scratch buffer that receives the error text written by ____l_myInsert(). */
char mesg[1024];

 

/* Forward declarations of the file-local helpers defined below. */
static int s_insert(char *buff);                                                          /* run one SQL statement on the global connection */
static int s_commit(void);                                                                /* commit on the global connection */
static int ____l_myInsert(MYSQL *mysql, char *query, char *errmsg);                       /* execute query; fills errmsg on failure */
static int extract_message_from_strchr(char *resultstr, char msg[8][100]);                /* split a '|'-delimited record into fields */
static int insert_into_rb_order_test(char *rbuf, int rlen, char *queue_name, int kk);     /* build and run the REPLACE for one record */
static int ____getmsgqueue(key_t qkey);                                                   /* msgget() wrapper */
static int ____rcvmsgqueue(int q_id, struct __MESSAGE_QUEUE *msg_buf, int msg_type);      /* msgrcv() wrapper */
static char *____get_semiconlon_date_time(void);                                          /* current local time as text (static buffer) */

/*
 * NOTE(review): neither of these is referenced anywhere in the visible
 * source, and rb_memory_buf reserves 200000 * 999 bytes (~190 MiB) of
 * static storage. Confirm whether another translation unit uses them
 * before removing.
 */
char rb_memory_buf[200000][999];
int rb_memory_cnt=0;

 

int main(int argc, char *argv[])
{
    int kk, rc, rlen;
    struct __MESSAGE_QUEUE msg_buf;
    int qkey, q_id = -1, mtype=1;
    char rbuf[4096];

    if(argc != 2) return(-100);

 

    sscanf(argv[1], "%x", &qkey);
    if((q_id = ____getmsgqueue(qkey)) < 0)
    {
        printf( "____getmsgqueue q_id[%d] error[%d]\n", q_id, errno);
        return(-1);
    }

    mysql = mysql_init(NULL);
    if(mysql == NULL) {
        return(-1);
    }

    connection = mysql_real_connect(mysql, "127.0.0.1", "heroG", "duddnd702!", "HEROG", 0, NULL, 0);
    if (connection == NULL)
    {
        return(-2);
    }

      /*READ LOOP------------------------------------------*/

    kk=0;
    while(1) 
    {
        rlen = 0;
        errno = 0;
        memset(&rbuf, 0x00, sizeof(rbuf));
        memset(&msg_buf, 0x00, sizeof(msg_buf));

        rlen = ____rcvmsgqueue(q_id, &msg_buf, mtype);
        if (errno > 0)
        {
            printf( "____rcvmsgqueue error[%d]\n", errno);
            break;
        }
        else  
        {
            if(rlen == 0)
            {
                usleep(10000);
                continue;
            }
        }
        memcpy(&rbuf[0], (char *)&msg_buf.mtext[0], rlen);

        rc = insert_into_rb_order_test(rbuf, rlen, argv[1], kk);
        if(rc)
        {
            return(-1);
        }

        kk++;
    }
    mysql_close(connection);
    return(0);
}

int insert_into_rb_order_test(char *rbuf, int rlen, char *queue_name, int kk)
{
    char command[4096];
    char msg[8][100];
    int rc;

 

    memset(command, 0x00, sizeof(command));
    memcpy(command, rbuf, rlen);

#if(1)
    rc = extract_message_from_strchr(rbuf, msg);
    if(rc)
    {
        //ERROR
    }
    sprintf(command, "REPLACE INTO"

                     " RB_OCUS_POS_TEST2(seqno,account_no,stock_symbol,start_volume,start_amount,"

                     "start_price,stock_type,trustee_id,regtime) VALUES("
                     "%s,"
                     "'%s',"
                     "'%s',"
                     "'%s',"
                     "'%s',"
                     "'%s',"
                     "'%s',"
                     "'%s',"
                     "'%s');",
                     msg[0],
                     msg[1],
                     msg[2],
                     msg[3],
                     msg[4],
                     msg[5],
                     msg[6],
                     msg[7],
                     ____get_semiconlon_date_time());
    rc = s_insert(command);
    if(rc)
    {
        printf("(error)(%s)\n", command);
        return(-1);
    }
    else
    {
        s_commit();
    }
#endif

    if(kk / 500 * 500 == kk) printf(">>QUEUE[%s],TIME(%s),SQ(%s)\n", queue_name, ____get_semiconlon_date_time(), command);

    return(0);
}

/*
 * Split a '|'-delimited record into at most 8 fields.
 *
 * resultstr  NUL-terminated input such as "1|ACC|SYM|...|ID|".
 * msg        output table; every row is zeroed first, then row k receives
 *            the k-th field as a NUL-terminated string.
 *
 * Only text that is followed by a '|' counts as a field, so anything after
 * the last delimiter (or the whole input when it has no delimiter) is
 * ignored. Fields beyond the eighth are ignored as well.
 *
 * Returns 0 on success, or -1 when an argument is NULL or a field did not
 * fit in 99 bytes and was truncated.
 */
int extract_message_from_strchr(char *resultstr, char msg[8][100])
{
    enum { MAX_FIELDS = 8, FIELD_SIZE = 100 };
    const char *field_start;
    int field_no = 0;
    int rc = 0;

    if (msg == NULL)
    {
        return(-1);
    }
    memset(msg, 0x00, (size_t)MAX_FIELDS * FIELD_SIZE);

    if (resultstr == NULL)
    {
        return(-1);
    }

    field_start = resultstr;
    while (field_no < MAX_FIELDS)
    {
        const char *bar = strchr(field_start, '|');
        size_t len;

        if (bar == NULL)
        {
            break;      /* no terminating delimiter: not a complete field */
        }

        len = (size_t)(bar - field_start);
        if (len >= FIELD_SIZE)
        {
            /* Keep room for the terminator and report the truncation. */
            len = FIELD_SIZE - 1;
            rc = -1;
        }
        /* The row was zeroed above, so the copy stays NUL-terminated. */
        memcpy(msg[field_no], field_start, len);

        field_no++;
        field_start = bar + 1;
    }

    return(rc);
}

/*
 * Execute the SQL text in buff on the global connection.
 *
 * Returns 0 when ____l_myInsert() reports a non-negative result. Otherwise
 * prints the error text left in the global mesg buffer and returns -1.
 */
int s_insert(char *buff)
{
    int outcome = ____l_myInsert(mysql, buff, mesg);

    if (outcome >= 0)
    {
        return(0);
    }

    printf( "%s\n", mesg);
    return(-1);
}

int s_commit(void)
{
    mysql_commit(mysql);

    return(0);
}

/*
 * Run one SQL statement on the given connection.
 *
 * mysql   connection to execute on.
 * query   NUL-terminated statement text.
 * errmsg  caller-supplied buffer that receives a short message containing
 *         the client error number when the query fails.
 *
 * Returns the affected-row count converted to int on success, -1 on failure.
 */
int ____l_myInsert(MYSQL *mysql, char *query, char *errmsg)
{
    int status = mysql_real_query(mysql, query, strlen(query));

    if (status == 0)
    {
        return (mysql_affected_rows(mysql));
    }

    sprintf(errmsg, "SQL insert Error[%d].",mysql_errno(mysql));
    return -1;
}

/*- ------------------------------------------------------------------------- -*
 *- Message Queue Get
 *-
 *- Looks up the System V message queue identified by qkey via msgget(),
 *- passing IPC_CREAT with mode 0666.
 *- Returns the queue id, or -1 when msgget() fails (errno is left as set
 *- by msgget()).
 *- ------------------------------------------------------------------------ -*/
int ____getmsgqueue(key_t qkey)
{
    const int queue_id = msgget(qkey, IPC_CREAT | 0666);

    return (queue_id < 0) ? -1 : queue_id;
}

/*- ------------------------------------------------------------------------- -*
 *- Message Queue Receive
 *-
 *- Receives one message of type msg_type from queue q_id into msg_buf,
 *- asking for at most SZ_QUEUETEXT payload bytes. MSG_NOERROR is the only
 *- flag passed (IPC_NOWAIT is not).
 *- Returns the payload length, 0 when msgrcv() fails with ENOMSG, and -1
 *- for any other failure (errno is left as set by msgrcv()).
 *- ------------------------------------------------------------------------ -*/
int ____rcvmsgqueue(int q_id, struct __MESSAGE_QUEUE *msg_buf, int msg_type)
{
    int received = msgrcv(q_id, msg_buf, SZ_QUEUETEXT , msg_type, MSG_NOERROR);

    if (received >= 0)
    {
        return (received);
    }

    /* "No message" is reported as an empty read rather than an error. */
    return (errno == ENOMSG) ? 0 : -1;
}

/*
 * Format the current local time as "YYYYMMDD HH:MM:SS(mmm)", where mmm is
 * the millisecond part.
 *
 * Returns a pointer to a static buffer that is overwritten by the next
 * call, so the result must be used or copied before calling again.
 *
 * Uses timespec_get() instead of the obsolete ftime(). If the clock cannot
 * be read the milliseconds are reported as 000, and if the time cannot be
 * converted to local time the date and time digits are all zero.
 */
char *____get_semiconlon_date_time(void)
{
    static char dt[100];
    struct timespec now;
    struct tm *lt;
    int millis;

    if (timespec_get(&now, TIME_UTC) != TIME_UTC)
    {
        /* Fall back to second resolution. */
        now.tv_sec = time(NULL);
        now.tv_nsec = 0;
    }
    millis = (int)(now.tv_nsec / 1000000L);

    lt = localtime(&now.tv_sec);
    if (lt == NULL)
    {
        snprintf(dt, sizeof(dt), "00000000 00:00:00(%03d)", millis);
        return dt;
    }

    snprintf(dt, sizeof(dt), "%04d%02d%02d %02d:%02d:%02d(%03d)"
             , lt->tm_year + 1900, lt->tm_mon + 1, lt->tm_mday
             , lt->tm_hour, lt->tm_min, lt->tm_sec
             , millis);
    return dt;
}

 

 

/* + Recent posts */