멀티 프로세스용 세마포어 적용 방식
증권 정보분배 체결데이터 환경에서 세마포어가 왜 필요한지를, 운영 상황을 고려해 체계적으로 정리해 드리겠습니다.
1. 문제 상황: 체결 데이터의 동시성 문제
1.1 데이터 특성
- 체결 데이터는 초당 수십~수백 건 이상 들어올 수 있음
- 동일 종목에 대한 체결 데이터가 거의 동시에 들어올 수 있음
- 데이터는 정확한 순서와 완전성이 유지되어야 함 (체결 누락, 순서 꼬임 불가)
1.2 처리 방식
- 공유 메모리(struct ELWJONG_MEMREC)에 종목별 최신 체결 정보 저장
- 여러 프로세스가 동일 공유 메모리에 접근
- 수신 프로세스: 네트워크에서 데이터 받아서 공유 메모리에 쓰기
- 배포/가공 프로세스: 공유 메모리에서 읽어서 후처리
1.3 발생 가능한 문제
세마포어 없이 동작할 경우:
- 경합(Race Condition)
- 프로세스 A가 쓰기 중인데, 프로세스 B가 읽거나 또 다른 쓰기를 시도
- 일부 필드만 수정된 중간 상태를 다른 프로세스가 읽는 경우 발생
- 데이터 손실
- 두 프로세스가 동시에 같은 메모리 영역을 덮어써서 일부 체결 데이터가 사라짐
- 순서 왜곡
- 09:00:00 체결 → 09:00:01 체결 순서여야 하는데, 동시에 쓰여 순서가 뒤바뀜
2. 세마포어가 필요한 이유
2.1 임계영역 보호
- 임계영역(Critical Section): 공유 메모리의 한 종목 데이터(예: ELWJONG_MEMREC)를 갱신하는 구간
- 세마포어를 사용하면 한 번에 하나의 프로세스만 해당 종목 데이터에 접근 가능
- 예: sem_wait() → 쓰기 → sem_post()
2.2 순차 처리 보장
- 동일 종목에 대한 체결 데이터를 순서대로 처리할 수 있음
- 이전 체결 데이터 처리가 끝나기 전까지 다음 체결 처리를 대기시킴
2.3 멀티 프로세스 안전성
- sem_t의 pshared=1 옵션을 사용하면 프로세스 간 동기화 가능
- 공유 메모리 상에 세마포어 변수를 배치하면 모든 프로세스가 동일한 동기화 객체 사용
3. 세마포어 사용의 장점
데이터 무결성 | 동시 접근 차단 → 데이터 손상 방지 | Race Condition으로 데이터 깨짐 |
처리 순서 | 한 건씩 순차 처리 | 순서 뒤바뀜 가능성 |
CPU 효율 | 커널 대기 상태로 기다림 (block) | Spin lock 사용 시 CPU 낭비 |
확장성 | 프로세스 수가 늘어나도 안전 | 프로세스 많아질수록 충돌 심화 |
4. 적용 시 고려 사항
- 세마포어의 범위
- 종목 단위 세마포어: 해당 종목 데이터만 잠금
- 전체 공유 메모리 단위 세마포어: 모든 종목 데이터 접근 차단
→ 고속 환경에서는 종목 단위가 훨씬 유리
- 초기화 시점
- 공유 메모리 최초 생성 시 sem_init(&sem, 1, 1)
- 재시작 시에는 기존 세마포어 그대로 사용
- 비정상 종료 복구
- 강제 종료 시 세마포어 상태가 잠금 상태로 남을 수 있음
→ 재기동 시 sem_trywait() 또는 sem_getvalue()로 상태 확인 및 복구
- 강제 종료 시 세마포어 상태가 잠금 상태로 남을 수 있음
5. 결론
증권 체결 데이터 시스템에서는 세마포어 없이 안정적인 데이터 처리 불가합니다.
특히 멀티 프로세스 환경에서 세마포어는
- 동일 종목 데이터에 대한 동시 접근을 방지하고,
- 처리 순서 보장하며,
- 데이터 무결성을 유지하는 핵심 역할을 합니다.
기존)
struct ELWJONG_MEMREC { Ubyte lock_flag; /* 메모리사용상태 0 : 사용가능 1 : 사용중 */ Ubyte seq[4]; /* 주소 */ Ubyte seq_no[4]; /* 신코드Seq */ Ubyte full_code[12]; /* 풀코드 */ Ubyte code[7]; /* 단축코드 */ Ubyte jmast[SZ_ELWJMAST +1]; /* ELW 종목마스터 */ Ubyte jsise[SZ_JSISE +1]; /* ELW 종목시세 */ Ubyte info[SZ_INFO +1]; /* ELW 거래원정보 */ Ubyte tuja[SZ_ELWTUJA +1]; /* ELW 투자지표 */ Ubyte tujagleeks[SZ_ELWTUJAGLEEKS +1]; /* ELW 투자지표민감도 */ }; |
void RecvElwTuja(char *rbuf, int rlen) { struct ELWTUJA elwtuja; struct ELWJONG_MEMREC *elwjongp; int lock_count = 0; memset(&resend, 0x00, SZ_ELWTUJA); memcpy(&resend, &rbuf[FRAMEID_SIZE], SZ_ELWTUJA); elwjongp = (struct ELWJONG_MEMREC *)GetElwCodeLocation((struct ELWJONG_MEMREC *)elwjong_addr, elwtuja.full_code); if(elwjongp == 0) { sprintf(Lbuf, "ELW TUJA ERROR[%.5s][%.12s]", rbuf, elwtuja.full_code); LogOut(Logf, Lbuf, 1); return(-1); } while(1) { if(elwjongp->lock_flag == '0' || lock_count == 10) { elwjongp->lock_flag = '1'; break; } usleep(10000); lock_count++; } if(strncmp(elwtuja.time, "JUNJJJ", sizeof(elwtuja.time)) == 0) { memcpy(elwtuja.time, managep->end_time, sizeof(elwtuja.time)); } memcpy(elwjongp->tuja, &elwtuja, SZ_ELWTUJA); elwjongp->lock_flag = '0'; } |
1. 구조체에 sem_t 추가
struct ELWJONG_MEMREC에 lock_sem을 넣어서 각 종목마다 별도 세마포어를 갖게 합니다.
#include <semaphore.h> struct ELWJONG_MEMREC { sem_t lock_sem; /* 멀티 프로세스 간 동기화용 세마포어 */ Ubyte seq[4]; Ubyte seq_no[4]; Ubyte full_code[12]; Ubyte code[7]; Ubyte jmast[SZ_ELWJMAST + 1]; Ubyte jsise[SZ_JSISE + 1]; Ubyte info[SZ_INFO + 1]; Ubyte tuja[SZ_ELWTUJA + 1]; Ubyte tujagleeks[SZ_ELWTUJAGLEEKS + 1]; }; |
2. 공유 메모리 초기화 시 sem_init (pshared=1)
공유 메모리 생성 직후, 프로세스 간 공유 가능하도록 pshared=1로 초기화합니다.
struct ELWJONG_MEMREC *elwjong_shm = mmap(NULL, sizeof(struct ELWJONG_MEMREC) * 종목수, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0); // 최초 1회 초기화 시 for (int i = 0; i < 종목수; i++) { sem_init(&elwjong_shm[i].lock_sem, 1, 1); // pshared=1, 초기값=1 } |
주의
- pshared=1 → 프로세스 간 공유 가능
- MAP_SHARED로 매핑해야 다른 프로세스에서 접근 가능
- 이미 초기화된 세마포어를 다른 프로세스에서 또 sem_init 하면 안 됩니다.
(초기화는 한 번만)
3. RecvElwTuja 변경
lock_flag 대신 세마포어로 접근 제어
void RecvElwTuja(char *rbuf, int rlen) { struct ELWTUJA elwtuja; struct ELWJONG_MEMREC *elwjongp; memset(&elwtuja, 0x00, SZ_ELWTUJA); memcpy(&elwtuja, &rbuf[FRAMEID_SIZE], SZ_ELWTUJA); elwjongp = (struct ELWJONG_MEMREC *) GetElwCodeLocation((struct ELWJONG_MEMREC *)elwjong_addr, elwtuja.full_code); if (elwjongp == NULL) { sprintf(Lbuf, "ELW TUJA ERROR[%.5s][%.12s]", rbuf, elwtuja.full_code); LogOut(Logf, Lbuf, 1); return; } // 세마포어 잠금 (다른 프로세스가 해당 종목을 처리 중이면 대기) sem_wait(&elwjongp->lock_sem); if (strncmp(elwtuja.time, "JUNJJJ", sizeof(elwtuja.time)) == 0) { memcpy(elwtuja.time, managep->end_time, sizeof(elwtuja.time)); } memcpy(elwjongp->tuja, &elwtuja, SZ_ELWTUJA); // 세마포어 해제 sem_post(&elwjongp->lock_sem); } |
팁)
24시간 돌아가는 프로세스이므로, 새벽에 강제로 종료와 재시작이 있을시에,
그전에 공유메모리와 세마포어를 동시에 제거한다.
즉
1. 프로세스 종료
2. 공유메모리 삭제
3. 세마포어 삭제
4. 프로세스 재시작(프로세스 시작과 동시에 공유메모리를 새로 만들고, 세마포어도 새로 만든다.)
'증권정보' 카테고리의 다른 글
증권TCP.DATA 수신(In Windows,Linux) (0) | 2024.12.13 |
---|---|
원장관리시스템 (0) | 2021.05.28 |
정보분배 데이타 수신 (0) | 2021.04.14 |
증권정보홈페이지를 만들어보자(3) (0) | 2021.04.01 |
증권정보홈페이지를 만들어보자(2) (0) | 2021.03.27 |