#include <stdio.h>
#include <sys/timeb.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <time.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <sys/wait.h>
#include <sys/poll.h>
#include <stdbool.h>
#include <fcntl.h>
#include <sys/msg.h>
#include <stdlib.h>
#include <errno.h>

 

#include "isam.h"
#include "mysql.h"

 

#define SZ_QUEUETEXT 4096
#define ____MAX_QUEUE_MAX_NUMER 10
#define ____QUEUE_MAX_NUMER 10

 

int isfd=0, isrc=0, ismd=0;
static char *____get_semiconlon_date_time(void);
static int read_memory_from_file(void);
static int insert_into_rb_order_test(void);

char rb_memory_buf[200000][999];
int rb_memory_cnt=0;

int g_msg_id[____MAX_QUEUE_MAX_NUMER];

 

int main(void)
{
    int kk, rc, qkey;
    char begin_time[100],end_time[100];
    char queue_number[____MAX_QUEUE_MAX_NUMER][10] =    {"46560","46561","46562","46563","46564",

                        "46565","46566","46567","46568","46569"};

    for(kk=0; kk<____MAX_QUEUE_MAX_NUMER; kk++)
    {
        sscanf(queue_number[kk], "%x", &qkey);
        g_msg_id[kk] = msgget(qkey, IPC_CREAT | 0666 );

        if(g_msg_id[kk] == -1)
        {
            printf("%s queue ipc_creat fail\n", queue_number[kk]);
            return(-1);
        }
    }

    memset(begin_time,0x00,sizeof(begin_time));
    memset(end_time,0x00,sizeof(end_time));

 

    rc = read_memory_from_file();

    printf("send_msg_to_queue Insert>>[%s]\n", ____get_semiconlon_date_time());
    sprintf(begin_time, "%s", ____get_semiconlon_date_time());

 

    rc = insert_into_rb_order_test();
    if(rc)
    {
        printf("insert_into_rb_order_test>>error(%d)\n", rc);
        return(-1);
    }
    printf("send_msg_to_queue Insert>>[%s]\n", ____get_semiconlon_date_time());
    printf("send_msg_to_queue Insert>>CNT[%d]\n",rb_memory_cnt);

    sprintf(end_time, "%s", ____get_semiconlon_date_time());

    printf("send_msg_to_queue>>begin[%s],end[%s]\n", begin_time,end_time);


    return(0);
}

int read_memory_from_file(void)
{
    char jfile[40];
    char    command[7024];
    char buff[1024], tmpb[32];
    FILE  *jfp=NULL;

 

    memset(rb_memory_buf,0x00,sizeof(rb_memory_buf));

    sprintf(jfile, "%s/%s", "/sdat/xdat/iFIS/dat", "ocus_pos.dat");

    jfp = fopen(jfile, "r");
    if (jfp == NULL)
    {
        return(-1);
    }

    for(;;)
    {
        memset(buff, 0x00, sizeof(buff));
        if(fgets(buff, sizeof(buff), jfp)==NULL) break;

        strcpy(rb_memory_buf[rb_memory_cnt], buff);
        rb_memory_cnt++;
    }
    if(jfp != NULL) fclose(jfp);
    return(0);
}

int insert_into_rb_order_test(void)
{
    int     divide_queue_number;
    int     kk, rc;
    struct ocus_pos_o ocus_pos_o;
    char    command[7024];
    char buff[1024], tmpb[32];

    struct queue_message_buffer {
        long mtype;
        char mtext[SZ_QUEUETEXT];
    };
    struct queue_message_buffer msg_buf;

   

    kk=0;
    for(;;)
    {
        memcpy(&ocus_pos_o, rb_memory_buf[kk], sizeof(struct ocus_pos_o));
        memset(command,0x00,sizeof(command));

 

       sprintf(command, "%d|"
                        "%.10s|"
                        "%.20s|"
                        "%.10s|"
                        "%.13s|"
                        "%.13s|"
                        "%.2s|"
                        "%.2s|",
                        kk + 1,
                        ocus_pos_o.account_no,
                        ocus_pos_o.stock_symbol,
                        ocus_pos_o.start_volume,
                        ocus_pos_o.start_amount,
                        ocus_pos_o.start_price,
                        ocus_pos_o.stock_type,
                        ocus_pos_o.trustee_id); 

 

        msg_buf.mtype = 1;
        memcpy(&msg_buf.mtext[0], command, strlen(command));

        divide_queue_number = kk % ____QUEUE_MAX_NUMER;

        if(msgsnd(g_msg_id[divide_queue_number], (struct queue_message_buffer *)&msg_buf, strlen(command), 0) < 0)
        {
            if(errno==EAGAIN) 
            {
                printf(">>msg_push_thread>>[QueueDebug]queue is full, and IPC_NOWAIT was asserted\n");
            }
            else if(errno==EACCES) 
            {
                printf(">>msg_push_thread>>[QueueDebug]permission denied, no write permission\n");
            }
            else if(errno==EFAULT) 
            {
                printf(">>msg_push_thread>>[QueueDebug]msgp address isn't accessable - invalid\n");
            }
            else if(errno==EIDRM) 
            {
                printf(">>msg_push_thread>>[QueueDebug]The message queue has been removed\n");
            }
            else if(errno==EINTR) 
            {
                printf(">>msg_push_thread>>[QueueDebug]Received a signal while waiting to write\n");
            }
            else if(errno==EINVAL) 
            {
                printf(">>msg_push_thread>>[QueueDebug]Invalid message queue identifier, nonpositive message type,

                        or invalid message size\n");
            }
            else if(errno==ENOMEM) 
            {
                printf(">>msg_push_thread>>[QueueDebug]Not enough memory to copy message buffer\n");
            }
        }

        kk++;
        if(kk == rb_memory_cnt) break;

        if(kk/100*100==kk) printf(">>QUEUE[%d]>>TOT[%.7d],NOW[%.7d],DATA[%s]\n", divide_queue_number, rb_memory_cnt,

                           kk, command); 

        usleep(50);
    } 

    printf(">>QUEUE[%d]>>TOT[%.7d],NOW[%.7d],DATA[%s]\n", divide_queue_number, rb_memory_cnt, kk, command); 

    return(0);
}

char *____get_semiconlon_date_time(void)
{
    struct timeb itb;
    struct tm *lt;
    static char dt[100];

    ftime(&itb);

    lt = localtime(&itb.time);

    memset(dt , 0x00 , sizeof(dt));

    sprintf(dt, "%04d%02d%02d %02d:%02d:%02d(%03d)"
             , lt->tm_year + 1900, lt->tm_mon + 1, lt->tm_mday
             , lt->tm_hour, lt->tm_min, lt->tm_sec
             , itb.millitm);
    return dt;
}

 

+ Recent posts