http://www.mysqlkorea.co.kr
한글매뉴얼 5.0 , 한글매뉴얼 5.1 , MySQL 5.1 HA , 사용자매뉴얼
Advanced Knowle...  
엔지니어 노트  
블로그존  
글로벌 MySQL  
MySQL 5.5 GA  
MySQL 5.6 Developer  
최신글
mysql 리플리케이…
스레드 내에서 LO…
Mysql Workbench…
한글 우선 정렬 ,…
ODBC 5.3 64비트 …
 
바이너리 로그 API와 리플리케이션 리스너
글쓴이 : taeguni   날짜 : 11-10-11 14:54   조회수 : 2598

바이너리 로그 API와 리플리케이션 리스너

이 블로그는 MySQL 리플리케이션 리스너 패키지에 있는 바이너리 로그 API에 대한 것입니다. 바이너리 로그 API와 리플리케이션 리스너가 MySQL 데이터베이스의 직관적인 검색과 무슨 상관이 있는지 의아합니까? Change-Data-Capture(CDC)라고 불리는 기술을 이용하여, 어떤 외부인덱스에 업데이트를 할 때, 당신은 바이너리 로그를 사용할 수 있기 때문입니다.
이 블로그를 읽고 있는 독자가 절대로 초보자가 아니라고 가정합니다. MySQL 리플리케이션에 대하여 이해하고, 약간의 C++에 대한 지식이 필요합니다.
다음 동영상을 보면, 바이너리 로그 API의 반복적인 업데이트를 사용하여 MySQL의 테이블에 테스트 필드를 찾을 수 있는 인기있는 엔터프라이즈 검색 프로그램인 'SOLR'로 만들어진 간단한 응용프로그램의 개념을 보여줄 것입니다.
동영상의 예제는 행기반의 복제(리플리케이션) 이벤트를 사용합니다. 행기반 복제는 일반적으로 특별한 용도의 데이터 저장과 함께 MySQL을 통합하는것을 쉽게 만들어줍니다. 나는 바이너리 로그의 이벤트가 발생할 때, 캡쳐하고 분석하는 것이 쉽게 바이너리 로그 API로 접목될 수 있다고 생각했습니다. 아직 더 지원해야하고, 복제 및 데이터 통합의 사용 사례는 많지만 아마도 아직 이것이 바른 뱡향을 찾아가기에는 이른 단계일 수 있습니다.

전제조건

데모를 복사하기 전에, 시스템 구성을 알아 봅시다.
  • 최신 리플리케이션 리스너를 복사합니다. MySQL Labs 또는 복제 소스로 부터 바이너리 파일을 다운로드 합니다.
  • 최신 g++ (4.4 /4.5가 좋습니다.)와 Boost 1.45 또는 그 이상 버전
  • OpenSSL
  • Cmake
  • Netbeans (선택사항)
  • Clucence
  • SOLR
  • Linux (선택사항)
  • MySQL 5.6
  • 대담한 해커의 마음가짐!

기본 예제

binlog API를 사용하려면 헤더파일에 "binlog_api.h"와, "libreplication" 라이브러리의 링크를 포함해야 합니다.
API는 네트워크 또는 파일 시스템 통해서 바이너리 로그에 연결합니다. CDC의 구현을 위해서, 네트워크를 통해서 서버에 직접연결하면, 정적 파일을 읽는 것보다 훨씬 더 재미있는 것이 되도록 시작할 수 있습니다.
아래 예제는 단순히 지정된 위치에서 바이너리 로그에 연결하는 프로그램 장성 방법을 보여줍니다.
/* example1.cpp */
#include 
#include "binlog_api.h"
int main(int argc, char** argv){
  if (argc != 2)  {
    fprintf(stderr,"Usage:\n\treplaybinlog URL\n\nExample:\n\treplaybinlog mysql://root:mypasswd@127.0.0.1:3306\n\n");
    return (EXIT_FAILURE);
  }
  Binary_log binlog(system::create_transport(argv[1]));
  if (binlog.connect())
  {
    fprintf(stderr,"Can't connect to the master.\n");
    return (EXIT_FAILURE);
  }
  if (binlog.set_position(4) != ERR_OK)
  {
    fprintf(stderr,"Can't reposition the binary log reader.\n");
    return (EXIT_FAILURE);
  }
return EXIT_SUCCESS;
}
컴파일 코드 사용
% g++ example1.cpp -I/path/to/mysql-replication-listener/include/ -L/path/to/mysql-replication-listener/include/ -lreplication -lboost_system -o example1 
create_transport() helper 함수는 URL을 인수로 하여, Binary_log_driver 객체를 반환합니다. 이것은 나중에 바이너리 로그에 연결할 수 있는 Binary_log 객체에서 사용됩니다. 온라인 네트워크 연결을 위해서 "mysql://username:pass@localhost:3365"과 같은 것을 입력하고, 정적 파일을 열람하기 위해서 "file:///tmp/binlog.000001"과 같은 형식을 입력합니다.
다음을 인식하는 두 가지 중요한 개념이 있습니다.
  • 이벤트 루프.
  • 콘텐츠 핸들러 파이프라인.
이벤트 루프는 단순히 일반적인 GUI 이벤트 루프와 같이 모든 프로그래머에게 상당히 표준화 되어야합니다. 새로운 이벤트가 게시되어 있는지 확인하고 그것을 실행하여 이벤트 대기열(큐)에서 밀어냅니다.
만약에 오른쪽으로 밀어내는 함수가 있고 그 함수이 실행되었을때, 끌어내기(Pulling)는 항상 편리한 개발 패턴이나 더 좋은 방법이 될 수 없습니다. 이벤트 중심의 (또는 비동기식) API나 콘튼츠 핸드러 파이프라인의 라이브러리 안에서 실현되는 프로그램 방법론은 종종 언급됩니다.
각각의 콘텐츠 핸들러는 스택에 밀어 넣고 하나의 핸들러가 재작성 하는 것을 가능하게 하거나, 또는 스택에서 핸들러를 처리 하기 위해 행동하는 새로운 이벤트를 만듭니다.
아래의 동영상 예제에서 쿼리 분석을 위한 콘텐츠 핸들러 이벤트가 스택에 밀어 넣습니다. 구문 기반 복제에 대한 쿼리이벤트가 사용되고, 서버와 다른것 사이에서 실행된 시제구문이 페이로드에 포함되어 있습니다.
다음 예제에서는 SOLR 엔터프라이즈 검색 엔진에 대해서 알아보겠습니다. 여기서는 의도적으로 세세한 내용을 제외하겠지만, 주 목적은 API의 일반적인 개념과 기능을 신속하게 제공하는 것입니다.

고급예제

다음 예제에서는 SOLR 엔터프라이즈 검색 엔진에 대해서 알아보겠습니다.
행 기반 복제에 대한 옵션과 함께 MySQL 서버가 시작되고, clucene 라이브러리는 MySQL 서버에서 실시간으로 데이터를 루씬 인덱스에 업데이트 하기위해서 사용됩니다.
나는 clucene 사용방법에 대한 자세한 내용을 포함하지는 않지만, 소스에 관심이 있다면, 저장소에 있는 예제 파일의 일부를 사용할 수 있습니다.
아래의 프로그램 구조는 앞에서 설명한 패턴을 따릅니다: binlog 객체가 있고, 이벤트 루프가 있고, 어떤 선태된 이벤트에 대한 트리거를 가진 콘텐츠 핸들러가 있습니다.
#include <stdlib.h>
#include <boost/foreach.hpp>
#include "binlog_api.h"
#include "table_update.h"
#include "table_delete.h"
#include "table_insert.h"
#include "table_index.h"
using mysql::system::create_transport;
using mysql::Binary_log;
std::string cl_index_file;
class Incident_handler : public mysql::Content_handler
{
public:
 Incident_handler() : mysql::Content_handler() {}
 Binary_log_event *process_event(mysql::Incident_event *incident)
 {
   std::cout << "Event type: "
             << mysql::system::get_event_type_str(incident->get_event_type())
             << " length: " << incident->header()->event_length
             << " next pos: " << incident->header()->next_position
             << std::endl;
   std::cout << "type= "
             << (unsigned)incident->type
             << " message= "
             << incident->message
             <<  std::endl
             <<  std::endl;
   /* Consume the event */
   delete incident;
   return 0;
 }
};
class Applier : public mysql::Content_handler
{
public:
 Applier(Table_index *index)
 {
   m_table_index= index;
 }
 mysql::Binary_log_event *process_event(mysql::Row_event *rev)
 {
   boost::uint64_t table_id= rev->table_id;
   Int2event_map::iterator ti_it= m_table_index->find(table_id);
   if (ti_it == m_table_index->end ())
   {
     std::cout << "Table id "
               << table_id
               << " was not registered by any preceding table map event."
               << std::endl;
     return rev;
   }
   /*
    Each row event contains multiple rows and fields. The Row_iterator
    allows us to iterate one row at a time.
   */
   mysql::Row_event_set rows(rev, ti_it->second);
   /*
    Create a fuly qualified table name
   */
   std::ostringstream os;
   os << ti_it->second->db_name << '.' << ti_it->second->table_name;
   mysql::Row_event_set::iterator it= rows.begin();
   do {
     mysql::Row_of_fields fields= *it;
     if (rev->get_event_type() == mysql::WRITE_ROWS_EVENT)
       table_insert(os.str(),fields);
     if (rev->get_event_type() == mysql::UPDATE_ROWS_EVENT)
     {
       ++it;
       mysql::Row_of_fields fields2= *it;
       table_update(os.str(),fields,fields2);
     }
     if (rev->get_event_type() == mysql::DELETE_ROWS_EVENT)
       table_delete(os.str(),fields);
     } while (++it != rows.end());
     /* Consume the event */
     delete rev;
     return 0;
  }
private:
  Table_index *m_table_index;
};
int main(int argc, char** argv)
{
  if (argc != 3)
  {
    fprintf(stderr,"Usage:\n\nmysql2lucene URL\n\nExample:\n\nmysql2lucene mysql://root@127.0.0.1:3306 myindexfile\n\n");
    return (EXIT_FAILURE);
  }
  Binary_log binlog(create_transport(argv[1]));

  cl_index_file.append (argv[2]);
  /*
    Attach a custom event content handlers
  */
  Incident_handler incident_hndlr;
  Table_index table_event_hdlr;
  Applier replay_hndlr(&table_event_hdlr);
  binlog.content_handler_pipeline()->push_back(&table_event_hdlr);
  binlog.content_handler_pipeline()->push_back(&incident_hndlr);
  binlog.content_handler_pipeline()->push_back(&replay_hndlr);
  if (binlog.connect())
  {
    fprintf(stderr,"Can't connect to the master.\n");
    return (EXIT_FAILURE);
  }
  binlog.set_position("searchbin.000001", 4);
  bool quit= false;
  while(!quit)
  {
    /*
     Pull events from the master. This is the heart beat of the event listener.
    */
    Binary_log_event  *event;
    binlog.wait_for_next_event(&event);
    /*
     Print the event
    */
    std::cout << "Event type: "
              << mysql::system::get_event_type_str(event->get_event_type())
              << " length: " << event->header()->event_length
              << " next pos: " << event->header()->next_position
              << std::endl;
    /*
     Perform a special action based on event type
    */
    switch(event->header()->type_code)
    {
    case mysql::QUERY_EVENT:
      {
        const mysql::Query_event *qev= static_cast(event);
        std::cout << "query= "
                  << qev->query
                  << " db= "
                  << qev->db_name
                  <<  std::endl
                  <<  std::endl;
        if (qev->query.find("DROP TABLE REPLICATION_LISTENER") != std::string::npos)
        {
          quit= true;
        }
      }
      break;
    case mysql::ROTATE_EVENT:
      {
        mysql::Rotate_event *rot= static_cast(event);
        std::cout << "filename= "
                  << rot->binlog_file.c_str()
                  << " pos= "
                  << rot->binlog_pos
                  << std::endl
                  << std::endl;
      }
      break;
    } // end switch
    delete event;
  } // end loop
  return (EXIT_SUCCESS);
}
이 코드를 유용하게 사용할려면, SOLR 웹 서비스와 MySQL 5.6서버와 데이터베이스에 입력을 공급하는 몇몇 MySQL client을 실행해야 합니다. 또한 lucene 인덱스를 저장할 위치를 지정하야 합니다.
행기반 리플리케이션(--binlog_format=row)과 "searchbin"과 같은 바이너리 로그의 기본명(--log_bin=searchbin)을 사용하여 MySQL 서버를 실행할 수 있는지 확인해야합니다. MySQL 5.6 소스코드를 다운로드 하였을 경우에는 mysql-test 디렉토리에 있는 테스트 도구인 'MTR'을 사용하여 쉽게 알 수 있습니다. 단순한 유형:
% ./mtr --start alias --mysqld=--log_bin=searchbin --mysqld=--binary_log=row
콜솔 실행중에서 SOLR 플랫폼을 얻을려면, SOLR 매뉴얼이나 소스패키지의 예제디렉토리에서 이것을 실행합니다.
% java -jar start.jar
또한 clucene 인덱스의 위치를 지정하는 것을 있지 말아야 합니다.
Applier 클래스는 몇가지 추가 설명이 필요합니다. 행 기반으로 이벤트를 구문 분석하는 큭히 이 부분이다. 그 아래 더 많은 것들:

열 기반 이벤트 분석

리플리케이션 스트림으로 부터 데이터를 받아올 때, 데이블 이름, 플드 형식 및 유용한 형테의 실제 데이터와 같은 것들은 어떻게 축출합니까?
바이너리 로그 프로토콜에 대한 설명은 MySQL forge에 위치하고 있습니다. 세부사항에 대하여 자세히 알고 싶다면, MySQL forge는 정보의 원천일 것입니다.
여기서는 세부사항을 생략하고, MySQL의 세종류의 행기반 이벤트 알아 볼 것입니다.
  • UPDATE_ROWS_EVENT
  • DELETE_ROWS_EVENT
  • INSERT_ROWS_EVENT
각각의 레이아웃은 매우 유사합니다. 우선 모든 이벤트에서 동일한 표준 헤더가 있습니다.
class Log_event_header
{
public:
uint8_t reserved;
uint32_t timestamp;
uint8_t type_code;
uint32_t server_id;
uint32_t event_length;
uint32_t next_position;
uint16_t flags;
};
행의 특정한 데이터는 다음과 같습니다.
class Row_event: public Binary_log_event
{
public:
uint64_t table_id;
uint16_t flags;
uint64_t columns_len;
uint32_t null_bits_len;
vector used_columns;
vector row;
[..]
};
알고있듯이, 'row' 백터로 부터 실제 데이터를 가지고 오는 것은 그리 평범하지 않습니다. 우리의 프로그램이 사용할 수 있어서 무언가로 값을 축출할 수 있는 파서와 같은 것이 필요합니다.
리플리케이션에 사용되는 데이터모델은 가벼운 값 객체 주위에 구축되고, 어떤 값은 Row_of_fields로 벡터 컬렉션에 저장됩니다. 각각의 Row_of_fields는 Row_event_set의 일부입니다.
다시한번 처음부터 살펴보겠습니다.
각각의 행은 Row_event_set이라고 불리는 여러 행과 필드로 구성되어 있습니다. Row_iterator는 우리에게 한번에 하나씩 각각의 행을 반복하는 것을 가능하게 합니다.
if (row_event->header()->type_code= UPDATE_ROWS_EVENT)
{
[..]
  Row_event_set rows(row_event, table_map_event);
  Row_event_set::iterator it= rows.begin();
잠깐! table_map_event은 무엇입니까? 그것은, 각각의 트랜젝션과 모든 Row event 전에 서버에 사용되는 테이블에 대한 정보를 선언한 descript-xor events를 전달합니다. 이제 테이블 대신 테이블 이름, 데이터베이스, 모든 단일 row event에 있는 전체 필드가 인덱스로 참조될 수 있기때문에 좋습니다. 물론 우리는 먼저 인덱스를 구축해야 합니다. 물론 먼저 인덱스를 구축해야합니다. 오래 걸리지 않을테니 잠시 기달려 주세요.

table map으로 부터의 인덱스 구축

먼저 map content 콘텐츠 핸들러를 등록하고, 그것을 스텍에 저장합니다. table map 이벤트에 table_id를 표준 STL 지도로 사용합니다.
class Table_map_handler : public Content_handler
{
  public:
  Binary_log_event *process_event(Table_map_event *event)
  {
    m_table_map.insert(Event_index_element(event->table_id,tm));
    /*
      Eat this event instead of passing it
      on to the next handler.
    */
    return NULL;
  }
  std::map m_table_map;
}
[..]
/* create an instance of the handler */
Table_map_handler table_index;
/* push content handler to stack */
binlog.content_handler_pipeline()->push_back(&table_index);
[..]
각각의 row event는 다음 변경 테이블에 대한 인덱스를 포함합니다:
[..]
/* get table id from the recently captured row event */
boost::uint64_t table_id= row_event->table_id;
Table_map_event *table_map_event=
table_index.m_table_map[table_id];
[..]
이제 행 반복을 하기 위해 필요한 table map 이벤트가 생겼습니다!
basic_transaction_parser.cpp 파일은 어떤 트랜젝션 기간 동안의 집계 table map 콘텐츠 핸들러가 포함되어 있습니다. 더 유용한 테이블인덱서를 구축하기 위해서 이와 같은 예제를 사용합니다.

반복의 마지막 행

일단 모든 행을 반복하는 것이 가능한 Row_iterator를 만들었습니다.
Row_event_set rows(row_event, table_map_event);
do {
  Row_of_fields fields= *it;
  if (event->get_event_type() == UPDATE_ROWS_EVENT)
  {
    ++it;
    Row_of_fields fields2= *it;
    table_update(os.str(),fields,fields2);
  }
[..]
} while (++it != rows.end());
Row_of_fields 클래스는 우리에게 한번에 하나의 필드를 반복하는 것을 가능하게 합니다.
void table_update (const string& table_name,
Row_of_fields &old_fields,
Row_of_fields &new_fields
{
  int field_id= 0, string str;
  Row_of_fields::iterator
  field_it= new_fields.begin();
  Converter converter;
  do {
    converter.to(str, *field_it);
    cout << field_id << "= " << str;"
    ++field_it; ++field_id;
  } while(field_it != new_fields.end());
}
Row_of_fields은 어떤 필요로 하는 값에 따라서 다른 형식으로 변환 할 수 있는 값 객체를 포함하고 있습니다. 위의 예제에서는 필드 값을 텍스트로 변환하여 화면으로 인쇄 할 수 있는 표준 변환 객체가 사용됩니다.
우리의 데이터 모델은 아래서 위로나 위에서 아래로 읽어가는 것이다.

관련 링크

이전글
다음글 Multi-threaded Slave에 대한 업데이트 
MySQL Korea 사이트의 컨텐츠 소유권은 (주)상상이비즈에 있으므로 무단전재를 금합니다.
ⓒ 2010-2011 ssebiz All Rights Reserved.