#include <stdio.h>
#include <sys/timeb.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <time.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <sys/wait.h>
#include <sys/poll.h>
#include <stdbool.h>
#include <fcntl.h>
#include <sys/msg.h>
#include <stdlib.h>
#include <pthread.h>
#include <errno.h>
#include "mysql.h"
/* Capacity in bytes of one queue message payload; also the maximum size
 * handed to msgrcv() in ____rcvmsgqueue(). */
#define SZ_QUEUETEXT 4096
/* Receive buffer that ____rcvmsgqueue() passes straight to msgrcv(). */
struct __MESSAGE_QUEUE
{
long mtype; /* message type of the received message */
char mtext[SZ_QUEUETEXT]; /* raw payload; main() copies it out and parses it as '|'-delimited text */
};
/* Client handle created by mysql_init() in main(); s_insert() and s_commit()
 * run their statements through it. */
MYSQL *mysql=NULL;
/* Value returned by mysql_real_connect() in main(); checked for NULL there
 * and later passed to mysql_close(). */
MYSQL *connection=NULL;
/* Error-text buffer that s_insert() hands to ____l_myInsert() and prints on failure. */
char mesg[1024];
/* Forward declarations of the helpers defined further down in this file. */
static int s_insert(char *buff);
static int s_commit(void);
static int ____l_myInsert(MYSQL *mysql, char *query, char *errmsg);
static int extract_message_from_strchr(char *resultstr, char msg[8][100]);
static int insert_into_rb_order_test(char *rbuf, int rlen, char *queue_name, int kk);
static int ____getmsgqueue(key_t qkey);
static int ____rcvmsgqueue(int q_id, struct __MESSAGE_QUEUE *msg_buf, int msg_type);
static char *____get_semiconlon_date_time(void);
/* NOTE(review): neither rb_memory_buf (200000 x 999 bytes of static storage)
 * nor rb_memory_cnt is referenced anywhere in the visible part of this file.
 * Confirm no other translation unit uses them before removing. */
char rb_memory_buf[200000][999];
int rb_memory_cnt=0;
/*
 * Entry point: drains one System V message queue into MySQL.
 *
 * argv[1] is the queue key written in hexadecimal. Every message of type 1
 * received from that queue is handed to insert_into_rb_order_test().
 *
 * Returns:
 *   -100  wrong argument count or a key that is not valid hexadecimal
 *   -1    the queue could not be opened, mysql_init() failed, or an insert failed
 *   -2    mysql_real_connect() failed
 *    0    the receive loop stopped because ____rcvmsgqueue() reported an error
 *         (exit status kept as it was historically)
 *
 * NOTE(review): the database host, user, password and schema are hard-coded
 * below; they should come from configuration or the environment.
 */
int main(int argc, char *argv[])
{
    struct __MESSAGE_QUEUE msg_buf;
    /* One byte larger than the payload so that a maximum-size message is
     * still NUL-terminated when the parser runs strchr() over it. */
    char rbuf[sizeof(msg_buf.mtext) + 1];
    unsigned int qkey = 0;
    int q_id = -1;
    int mtype = 1;
    int kk = 0;          /* count of messages stored so far */
    int status = 0;
    int rc, rlen;

    if (argc != 2) return(-100);
    /* %x requires an unsigned int target; reject keys that do not parse. */
    if (sscanf(argv[1], "%x", &qkey) != 1) return(-100);

    if ((q_id = ____getmsgqueue((int)qkey)) < 0)
    {
        printf( "____getmsgqueue q_id[%d] error[%d]\n", q_id, errno);
        return(-1);
    }

    mysql = mysql_init(NULL);
    if (mysql == NULL)
    {
        return(-1);
    }
    connection = mysql_real_connect(mysql, "127.0.0.1", "heroG", "duddnd702!", "HEROG", 0, NULL, 0);
    if (connection == NULL)
    {
        /* Release the handle allocated by mysql_init(). */
        mysql_close(mysql);
        mysql = NULL;
        return(-2);
    }

    /*READ LOOP------------------------------------------*/
    for (;;)
    {
        memset(rbuf, 0x00, sizeof(rbuf));
        memset(&msg_buf, 0x00, sizeof(msg_buf));
        errno = 0;

        rlen = ____rcvmsgqueue(q_id, &msg_buf, mtype);
        /* Decide on the return value, not on errno: the helper returns 0 for
         * "no message" and may leave errno set in that case. */
        if (rlen < 0)
        {
            printf( "____rcvmsgqueue error[%d]\n", errno);
            break;
        }
        if (rlen == 0)
        {
            usleep(10000);
            continue;
        }
        /* Defensive clamp; the receive size is already limited to the payload size. */
        if ((size_t)rlen > sizeof(msg_buf.mtext))
        {
            rlen = (int)sizeof(msg_buf.mtext);
        }

        memcpy(rbuf, msg_buf.mtext, (size_t)rlen);
        rbuf[rlen] = '\0';

        rc = insert_into_rb_order_test(rbuf, rlen, argv[1], kk);
        if (rc)
        {
            status = -1;
            break;
        }
        kk++;
    }

    /* Single exit path so the connection is closed on insert failure too. */
    mysql_close(connection);
    return(status);
}
/* Returns 1 when `field` can be placed between single quotes in the SQL text
 * without ending or escaping the literal, i.e. it holds no quote or backslash. */
static int rb_field_is_quotable(const char *field)
{
    for (; *field != '\0'; field++)
    {
        if (*field == '\'' || *field == '\\') return 0;
    }
    return 1;
}

/* Returns 1 when `field` holds at least one digit and otherwise only spaces,
 * signs or a decimal point, so it is safe to interpolate unquoted. */
static int rb_field_is_numeric(const char *field)
{
    int digits = 0;

    for (; *field != '\0'; field++)
    {
        if (*field >= '0' && *field <= '9')
        {
            digits++;
        }
        else if (*field != ' ' && *field != '+' && *field != '-' && *field != '.')
        {
            return 0;
        }
    }
    return digits > 0;
}

/*
 * Stores one queue record in RB_OCUS_POS_TEST2.
 *
 * rbuf        NUL-terminated '|'-delimited record (seqno, account_no,
 *             stock_symbol, start_volume, start_amount, start_price,
 *             stock_type, trustee_id)
 * rlen        number of payload bytes in rbuf
 * queue_name  text printed in the progress line
 * kk          running message counter; a progress line is printed whenever
 *             it is a multiple of 500
 *
 * Returns 0 on success and -1 when the record is malformed, contains
 * characters that would alter the SQL statement, does not fit the statement
 * buffer, or the insert/commit fails.
 */
int insert_into_rb_order_test(char *rbuf, int rlen, char *queue_name, int kk)
{
    char command[4096];
    char msg[8][100];
    int rc;

    /* Keep a NUL-terminated copy of the raw record: it is what the progress
     * line shows if the SQL section below is compiled out, and what the
     * error lines print. */
    memset(command, 0x00, sizeof(command));
    if (rlen > 0)
    {
        size_t raw_len = (size_t)rlen;
        if (raw_len > sizeof(command) - 1) raw_len = sizeof(command) - 1;
        memcpy(command, rbuf, raw_len);
    }
#if(1)
    {
        int i, n;

        rc = extract_message_from_strchr(rbuf, msg);
        if (rc)
        {
            printf("(error)(malformed message)(%s)\n", command);
            return(-1);
        }
        /* Guarantee termination of every field before it is used with %s. */
        for (i = 0; i < 8; i++) msg[i][sizeof(msg[i]) - 1] = '\0';

        /* The record comes from a message queue, so treat it as untrusted:
         * seqno is interpolated unquoted and must look numeric; the other
         * fields must not be able to break out of their quotes. */
        if (!rb_field_is_numeric(msg[0]))
        {
            printf("(error)(unsafe field)(%s)\n", command);
            return(-1);
        }
        for (i = 1; i < 8; i++)
        {
            if (!rb_field_is_quotable(msg[i]))
            {
                printf("(error)(unsafe field)(%s)\n", command);
                return(-1);
            }
        }

        n = snprintf(command, sizeof(command), "REPLACE INTO"
            " RB_OCUS_POS_TEST2(seqno,account_no,stock_symbol,start_volume,start_amount,"
            "start_price,stock_type,trustee_id,regtime) VALUES("
            "%s,"
            "'%s',"
            "'%s',"
            "'%s',"
            "'%s',"
            "'%s',"
            "'%s',"
            "'%s',"
            "'%s');",
            msg[0],
            msg[1],
            msg[2],
            msg[3],
            msg[4],
            msg[5],
            msg[6],
            msg[7],
            ____get_semiconlon_date_time());
        if (n < 0 || (size_t)n >= sizeof(command))
        {
            printf("(error)(statement too long)\n");
            return(-1);
        }

        rc = s_insert(command);
        if (rc)
        {
            printf("(error)(%s)\n", command);
            return(-1);
        }
        if (s_commit())
        {
            printf("(error)(commit)(%s)\n", command);
            return(-1);
        }
    }
#endif
    if (kk % 500 == 0) printf(">>QUEUE[%s],TIME(%s),SQ(%s)\n", queue_name, ____get_semiconlon_date_time(), command);
    return(0);
}
/*
 * Splits a '|'-delimited record into at most eight fields.
 *
 * resultstr  NUL-terminated input; only text that is followed by a '|' counts
 *            as a field, so an unterminated tail after the last '|' is
 *            ignored (as before)
 * msg        receives the fields as NUL-terminated strings; entries beyond
 *            the number of fields found are left empty
 *
 * Fields after the eighth are ignored. Returns 0 on success. Returns -1,
 * with every entry of msg cleared, when resultstr is NULL, contains no '|'
 * at all, or has a field too long to fit in 99 characters plus terminator.
 */
int extract_message_from_strchr(char *resultstr, char msg[8][100])
{
    enum { FIELD_COUNT = 8 };
    const size_t field_size = sizeof(msg[0]);
    const char *start;
    const char *bar;
    int count = 0;

    memset(msg, 0x00, field_size * FIELD_COUNT);
    if (resultstr == NULL) return(-1);

    start = resultstr;
    while (count < FIELD_COUNT && (bar = strchr(start, '|')) != NULL)
    {
        size_t len = (size_t)(bar - start);

        if (len >= field_size)
        {
            /* Would not fit with its terminator: refuse the whole record
             * rather than store a partial or truncated one. */
            memset(msg, 0x00, field_size * FIELD_COUNT);
            return(-1);
        }
        /* msg was zero-filled above, so the copy stays NUL-terminated. */
        memcpy(msg[count], start, len);
        count++;
        start = bar + 1;
    }

    /* No delimiter at all means this is not a record. */
    if (count == 0) return(-1);
    return(0);
}
/*
 * Runs the SQL text in buff through ____l_myInsert() on the global handle.
 * On failure the message left in the global mesg buffer is printed.
 * Returns 0 on success, -1 on failure.
 */
int s_insert(char *buff)
{
    const int outcome = ____l_myInsert(mysql, buff, mesg);

    if (outcome >= 0)
    {
        return(0);
    }
    printf( "%s\n", mesg);
    return(-1);
}
/*
 * Commits the current transaction on the global MySQL handle.
 * Returns 0 on success and -1 when mysql_commit() reports a failure
 * (previously the result was discarded and 0 was always returned).
 */
int s_commit(void)
{
    return mysql_commit(mysql) ? -1 : 0;
}
/*
 * Executes `query` on the given handle.
 *
 * On success returns the value of mysql_affected_rows() converted to int.
 * On failure writes "SQL insert Error[<errno>]." into errmsg and returns -1.
 * errmsg must be large enough for that text; no size is passed in.
 */
int ____l_myInsert(MYSQL *mysql, char *query, char *errmsg)
{
    const int failed = (mysql_real_query(mysql, query, strlen(query)) != 0);

    if (!failed)
    {
        return (mysql_affected_rows(mysql));
    }
    sprintf(errmsg, "SQL insert Error[%d].",mysql_errno(mysql));
    return -1;
}
/*- ------------------------------------------------------------------------- -*
*- Message Queue Get
*-   Calls msgget() for qkey with IPC_CREAT | 0666.
*-   Returns the queue id, or -1 when msgget() fails.
*- ------------------------------------------------------------------------ -*/
int ____getmsgqueue(key_t qkey)
{
    const int queue_id = msgget(qkey, IPC_CREAT | 0666);

    return (queue_id < 0) ? (-1) : queue_id;
}
/*- ------------------------------------------------------------------------- -*
*- Message Queue Receive
*-   Receives one message of type msg_type from q_id into msg_buf, asking
*-   msgrcv() for at most SZ_QUEUETEXT payload bytes with MSG_NOERROR set.
*-   Returns the payload length on success, 0 when msgrcv() reports ENOMSG
*-   (errno is then reset to 0 so callers that inspect errno do not mistake
*-   "no message" for a failure), and -1 on any other error with errno left
*-   as set by msgrcv().
*-   NOTE(review): IPC_NOWAIT is not passed, so per msgrcv(2) the call is
*-   expected to block rather than report ENOMSG - confirm the intent.
*- ------------------------------------------------------------------------ -*/
int ____rcvmsgqueue(int q_id, struct __MESSAGE_QUEUE *msg_buf, int msg_type)
{
    int msg_size;

    msg_size = msgrcv(q_id, msg_buf, SZ_QUEUETEXT, msg_type, MSG_NOERROR);
    if (msg_size >= 0)
    {
        return (msg_size);
    }
    if (errno == ENOMSG)
    {
        errno = 0;
        return(0);
    }
    return(-1);
}
/*
 * Formats the current local time as "YYYYMMDD HH:MM:SS(mmm)".
 *
 * The text lives in a static buffer that is overwritten by every call, so
 * the returned pointer must not be freed and is only valid until the next
 * call.
 *
 * NOTE(review): ftime() is documented as obsolete on current systems;
 * consider clock_gettime() when this file is next reworked.
 */
char *____get_semiconlon_date_time(void)
{
    static char dt[100];
    struct timeb now;
    struct tm *local;
    int year, month, day, hour, minute, second, millis;

    ftime(&now);
    local = localtime(&now.time);

    year   = local->tm_year + 1900;
    month  = local->tm_mon + 1;
    day    = local->tm_mday;
    hour   = local->tm_hour;
    minute = local->tm_min;
    second = local->tm_sec;
    millis = now.millitm;

    memset(dt, 0x00, sizeof(dt));
    sprintf(dt, "%04d%02d%02d %02d:%02d:%02d(%03d)",
            year, month, day, hour, minute, second, millis);
    return dt;
}
'데이타베이스 > MySQL' 카테고리의 다른 글
#LINUX,common.c(MySQL) (0) | 2019.10.30 |
---|---|
#LINUX,send_msg_to_queue.c (2번째) (0) | 2019.10.30 |
#Queue를 이용한 MySQL입력 성능향상에 관한 글(1) (0) | 2019.10.30 |
1달전 자료 지우는 쿼리 (0) | 2019.10.29 |
MySQL LOCK 확인 및 해제(kill) 방법 (0) | 2019.10.29 |