R 에서 IRIS DB에 저장된 데이터 가져오기

목차

  • 설명

  • RJDBC 를 이용하여 IRIS DB 접속하기

  • IRIS Global 테이블 생성하기

  • IRIS Global 테이블에 데이터를 insert / select 하기

  • IRIS Local 테이블 생성하기

  • IRIS Local 테이블에 데이터를 insert / select 하기


설명

R-Studio 에서 RJDBC 패키지를 이용하여 IRIS DB 에 테이블을 create 하고, 데이터를 insert, select 하는 예제입니다.

IRIS 의 global 테이블과 local 테이블을 특성에 맞게 각각 생성해 보고 데이터를 insert / select 해 봅니다.



RJDBC 를 이용하여 IRIS DB 접속하기

  • RJDBC 패키지를 이용합니다.
    • id / passwd = id / password

    • iris DB 접속 정보 : xxx.xxx.xxx.xxx/xxxx

    • JDBC path 지정

library(RJDBC)

.jinit()
.jaddClassPath("/usr/share/R/library/jettison-1.3.2.jar")
.jaddClassPath("/usr/share/R/library/log4j-1.2.17.jar")
.jaddClassPath("/docker/tools/Spark-on-IRIS/lib/java/mobigen-iris-jdbc-2.1.0.1.jar")

  • java class path 확인

print(.jclassPath())

[1] "/usr/lib64/R/library/rJava/java"
[2] "/docker/tools/Spark-on-IRIS/lib/java/mobigen-iris-jdbc-2.1.0.1.jar"
[3] "/usr/lib64/R/library/RJDBC/java/RJDBC.jar"

  • IRIS DB 접속 connector object( conn ) 생성하는 코드입니다.
    • myiris 계정 예제

    • jdbc driver 버전 : mobigen-iris-jdbc-2.1.0.1.jar

drv <- RJDBC::JDBC("com.mobigen.iris.jdbc.IRISDriver",
                   "/docker/tools/Spark-on-IRIS/lib/java/mobigen-iris-jdbc-2.1.0.1.jar",
                    identifier.quote= "`")
conn <- RJDBC::dbConnect(drv, "jdbc:iris://xxx.xxx.xxx.xxx:xxx/myiris", "id", "password")

IRIS Global 테이블 생성하기

  • 붓꽃(iris) 데이터를 기준으로 한 Global 테이블을 생성해 봅니다.

  • Global 테이블 생성 SQL

sql_create <-  "CREATE TABLE IRIS_GLOBAL_TEST_1 (
                irisid INTEGER,
                sepal_length REAL,
                sepal_width REAL,
                petal_length REAL,
                petal_width REAL,
                species TEXT)
                datascope       GLOBAL
                ramexpire       0
                diskexpire      0
                partitionkey    None
                partitiondate   None
                partitionrange  0 ; "

  • dbSendUpdate 로 sql 문을 실행합니다.
    • return 되는 값은 없으니 주의하세요!!!!

# 테이블이 있다면 DROP 하고 생성(drop 문은 항상 주의해 주세요!!) 합니다.
dbSendUpdate(conn, 'DROP TABLE IF EXISTS MYIRIS.IRIS_GLOBAL_TEST_1 ;')


# CREATE GLOBAL TABLE : dbSendUpdate 를 이용하며, SQL 문 끝에 ; 를 꼭 넣어야 합니다.
# return값은 없습니다.
dbSendUpdate(conn, sql_create)

  • 테이블이 생성되었는 지 확인합니다.
    • IRIS DB 명령어인 “table list” 사용합니다.

table_list <- dbGetQuery(conn, "table list")
print(table_list)

IRIS Global 테이블에 Insert / select

  • Insert into Global table
    • INSERT INTO 테이블명 VALUES (,,,,) 예제

    • dbSendUpdate

ins_sql <- sprintf( "INSERT INTO IRIS_GLOBAL_TEST_1 (irisid, sepal_length,sepal_width, petal_length,petal_width,  species) VALUES (1, 1.0, 2.0, 3.0, 4.0, 'test') ; ")

dbSendUpdate(conn, ins_sql)

  • Select from Global table
    • dbGetQuery

# SELECT from GLOBAL TABLE
result2 <- dbGetQuery(conn, 'select * from IRIS_GLOBAL_TEST_1')
print(result2)


IRIS Local 테이블 생성하기

  • SYSLOG 데이터를 기준으로 Local 테이블을 생성합니다.
    • partiton range = 60min ( 60분으로 파티션 범위를 정합니다.)

    • partition 구분 컬럼 = DATETIME ( partition 구분기준 컬럼이름. 반드시 해당 시간필드를 YYYYMMDDHHMMSS 14자리 text 형식으로 변환해 놓아야 합니다)

    • partition키 = HOST ( partition key)

cr_table_sql <- 'CREATE TABLE IRIS_LOCAL_TEST_2 (
   DATETIME     TEXT,
   HOST         TEXT,
   FACILITY     TEXT,
   PRIORITY     TEXT,
   LEVEL        TEXT,
   LEVEL_INT    TEXT,
   TAG          TEXT,
   PROGRAM      TEXT )
datascope       LOCAL
ramexpire       60
diskexpire      2102400
partitionkey    HOST
partitiondate   DATETIME
partitionrange  60
; '

dbSendUpdate(conn, cr_table_sql)

IRIS Local 테이블로부터 데이터 Select

  • IRIS DB 에 있는 Local 테이블로부터 데이터를 select 합니다.

  • 가져온 데이터를 R dataframe 에 저장한 후 1000 개 단위로 새로 만든 테이블에 Insert 를 실행합니다.
    • dbSendQuery 로 데이터 select

    • dbFetch 를 반복 해서 데이터를 가져 와서 dataframe 에 저장합니다.

    • dbClearResult

# SELECT from LOCAL TABLE EVA.SYSLOG
# 2019-11-12 15:00:00 ~ 16:59:59 ( 20191112150000, 20191112160000 2개의 파티션에서 데이터를 가지고 옵니다)
# count = 89887

select_sql <- "/*+ LOCATION ( PARTITION >= '20191112150000' AND PARTITION <= '20191112160000' ) */
SELECT
        DATETIME, HOST, FACILITY, PRIORITY, LEVEL, LEVEL_INT, TAG, PROGRAM
FROM
        EVA.SYSLOG
; "

my_dataframe <- data.frame()
rs <- dbSendQuery(conn, select_sql)  # dbSendQuery !!!! ( dbGetQuery 아님 )

nn = 1000   #  1000 개 단위로 fetch
tmp_df <- data.frame()

tmp_df <- dbFetch(rs, n=nn)
my_dataframe <- tmp_df

while ( nrow(tmp_df) ==  nn ) {
  tmp_df <- dbFetch(rs, n=nn)
  my_dataframe <- rbind(my_dataframe, tmp_df)
}
dbClearResult(rs)
# TRUE
print(nrow(tmp_df))   # 마지막 fetch 레코드 수 < nn
# 887


# select 한 전체 레코드 수
print(nrow(my_dataframe))
# 89887

  • R DataFrame 을 IRIS DB 에 Insert 하기
    • my_dataframe( 총 89887 건 ) : 1000 건을 한번에 insert 하는 예제입니다.

    • batch insert SQL 문을 만드는 function 생성 : insert_batch_sql_f

    • dbGetQuery 로 insert한 데이터를 확인합니다.

library(dplyr)

table_name <- 'MYIRIS.IRIS_LOCAL_TEST_2'

insert_batch_sql_f <- function(conn, table, df) {
  batch <- apply(df, 1, FUN = function(x) paste0("'",trimws(x),"'",collapse = ",")) %>% paste0("(",.,")",collapse = ", ")

  colums <-  paste(unlist(colnames(df)), collapse=',')
  query <- paste("INSERT INTO ", table, "(", colums, ") VALUES ", batch, ';')

  dbSendUpdate(conn, query)
}

insert_batch_sql_f(conn, table_name, my_dataframe[1:1000, ])
# 1건씩 인서트는 인서트 sql 을 만들어서 dbSendUpdate(conn, query)

my_count <- dbGetQuery(conn,"select count(*) from IRIS_LOCAL_TEST_2")
print(my_count)