Elasticsearch - Transformers 임베딩 입력 저장 (Local Mode)

Page content

개요

  • 엘라스틱서치에서 밀집 벡터 위한 매핑 구성 방법 살펴보기
  • 밀집 벡터가 저장될 embedding 이라는 필드 정의
  • Python 코드로 엘라스틱서치 코드 구현

코드 전체 시나리오

  • Elasticsearch에 연결 및 인덱스 초기화
  • dense_vector 타입으로 매핑 정의
  • 문서 배열 정의
  • BERT 모델을 이용해 각 문서를 벡터 임베딩
  • 벡터 포함 문서를 Elasticsearch에 색인

Elasticsearch 클라이언트 연결

  • 로컬에서 실행 중인 Elasticsearch 서버에 접속
  • basic_auth: 로그인 자격 (ID: elastic, PW: 123456)
  • verify_certs=False: 인증서 검증 생략 (로컬에서 SSL 없이 사용 시 편의용)
es_admin = Elasticsearch("http://localhost:9200", 
                         basic_auth=("elastic", "123456"), 
                         verify_certs=False)

Mapping 정의 및 인덱스 생성

  • dense_vector: 벡터 검색용 필드 (벡터 유사도 기반 검색 가능)
  • dims: BERT의 출력 벡터는 기본적으로 768차원이므로 그에 맞춤
mapping = {
    "properties": {
        "embedding": {
            "type": "dense_vector",
            "dims": 768  # BERT의 출력 벡터 차원 수
        }
    }
}

기존 인덱스 삭제 후 새로 생성

  • 기존에 있던 chapter-2 인덱스를 삭제 (중복 방지)
  • 새로운 인덱스를 위에서 정의한 벡터 매핑으로 생성
try:
    es_admin.indices.delete(index="chapter-2")
    print("기존 chapter-2 인덱스를 삭제했습니다.")
except:
    print("chapter-2 인덱스가 존재하지 않습니다.")

es_admin.indices.create(index="chapter-2", body={'mappings': mapping})
print("새로운 chapter-2 인덱스를 생성했습니다.")

색인할 문서 데이터 구성

  • titletext로 구성된 단순 문서 리스트
  • text는 BERT 임베딩의 입력값이 된다
docs = [
    {"title": "Document 1", "text": "This is the first document"},
    {"title": "Document 2", "text": "This is the second document"},
    {"title": "Document 3", "text": "This is the third document"}
]

BERT 모델과 토크나이저 초기화

  • bert-base-uncased: Hugging Face에서 사전 학습된 BERT 모델
  • AutoTokenizer: 입력 텍스트를 BERT가 이해할 수 있는 토큰으로 변환
  • AutoModel: 텍스트에 대한 BERT 임베딩 추출
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
model = AutoModel.from_pretrained("bert-base-uncased")

BERT 임베딩 생성

  • tokenizer(...): 텍스트를 토큰화하고 PyTorch 텐서로 변환
  • model(**inputs): BERT 실행 → 각 토큰에 대한 임베딩 벡터 반환
  • last_hidden_state.mean(dim=1): 문장의 전체 임베딩을 mean pooling으로 하나의 벡터로 압축 (1×768 벡터)
  • squeeze(0).numpy(): 불필요한 batch 차원 제거 후 NumPy로 변환
  • tolist(): Elasticsearch에 저장 가능하게 리스트 형태로 변환
for doc in docs:
    text = doc["text"]
    inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True)
    with torch.no_grad():
        outputs = model(**inputs).last_hidden_state.mean(dim=1).squeeze(0).numpy() 
        doc["embedding"] = outputs.tolist()

Elasticsearch에 색인

  • 각 문서를 chapter-2 인덱스에 색인
  • Elasticsearch는 embedding 필드를 dense_vector로 저장하며, 향후 벡터 검색에도 사용 가능
for doc in docs:
    es_admin.index(index="chapter-2", body=doc)

확인

  • Kibana | Management | Dev Tools에서 색인된 문서 조회
GET chapter-2/_search

image.png

사용자 설정 (Elasticsearch 9.0.3 버전)

  • 코드 변경
  • 해당 부분은 각자 확인 필요
# Elasticsearch 클라이언트 라이브러리를 가져옵니다
from elasticsearch import Elasticsearch

# Elasticsearch에 관리자 계정(elastic)으로 연결합니다
# localhost:9200은 Elasticsearch의 기본 주소입니다
# basic_auth를 사용하여 사용자명과 비밀번호로 인증합니다
es_admin = Elasticsearch("http://localhost:9200", basic_auth=("elastic", "123456"))

def check_user_permissions():
    """
    username 사용자의 현재 권한과 역할을 확인하는 함수
    
    Returns:
        list: username 사용자가 가진 역할들의 리스트
    """
    try:
        # Security API를 사용하여 username 사용자의 정보를 조회합니다
        response = es_admin.security.get_user(username="username")
        user_info = response.get('username', {})  # 응답에서 username 사용자 정보 추출
        roles = user_info.get('roles', [])    # 사용자가 가진 역할들 추출
        
        # 사용자 정보를 콘솔에 출력합니다
        print("username 사용자 정보:")
        print(f"   - 역할: {roles}")
        print(f"   - 전체 정보: {user_info}")
        
        return roles  # 역할 리스트를 반환합니다
        
    except Exception as e:
        # 사용자 정보 조회 중 오류가 발생한 경우 예외 처리
        print(f"사용자 정보 조회 실패: {e}")
        return []  # 빈 리스트 반환

def update_username_permissions():
    """
    username 사용자에게 모든 인덱스에 접근할 수 있는 권한을 부여하는 함수
    superuser 역할을 추가하여 Elasticsearch의 모든 기능에 접근 가능하게 합니다
    """
    try:
        # Security API를 사용하여 username 사용자의 정보를 업데이트합니다
        es_admin.security.put_user(
            username="username",  # 업데이트할 사용자명
            body={
                "password": "password",  # 기존 비밀번호 유지
                "roles": [
                    "kibana_admin",           # Kibana 관리자 권한 (대시보드, 시각화 관리)
                    "kibana_system",          # Kibana 시스템 권한 (시스템 설정 접근)
                    "quotes_writer",          # quotes 인덱스에 대한 읽기/쓰기 권한
                    "superuser"               # 모든 권한 (모든 인덱스 접근 가능, 임시 권한)
                ],
                "full_name": "Username"  # 사용자 전체 이름
            }
        )
        # 권한 업데이트 성공 메시지 출력
        print("username 사용자 권한이 업데이트되었습니다.")
        print("   - superuser 역할이 추가되어 모든 인덱스에 접근 가능합니다.")
        
    except Exception as e:
        # 권한 업데이트 중 오류가 발생한 경우 예외 처리
        print(f"사용자 권한 업데이트 실패: {e}")

def list_all_indices():
    """
    Elasticsearch 클러스터에 존재하는 모든 인덱스의 목록을 조회하는 함수
    각 인덱스의 문서 수와 저장 크기 정보도 함께 표시합니다
    """
    try:
        # CAT API를 사용하여 모든 인덱스 정보를 JSON 형태로 조회합니다
        response = es_admin.cat.indices(format="json")
        
        # 인덱스 목록 출력 시작
        print("\n 현재 Elasticsearch의 모든 인덱스:")
        print("-" * 50)
        
        # 각 인덱스의 정보를 순회하면서 출력합니다
        for index in response:
            index_name = index.get('index', '')      # 인덱스 이름
            docs_count = index.get('docs.count', '0') # 문서 수
            store_size = index.get('store.size', '0b') # 저장 크기
            
            # 인덱스 정보를 포맷팅하여 출력
            print(f" {index_name}")
            print(f"   - 문서 수: {docs_count}")
            print(f"   - 크기: {store_size}")
            print()
            
    except Exception as e:
        # 인덱스 목록 조회 중 오류가 발생한 경우 예외 처리
        print(f" 인덱스 목록 조회 실패: {e}")

def main():
    """
    메인 실행 함수
    username 사용자의 권한을 확인하고, 필요한 권한을 부여한 후,
    Elasticsearch의 모든 인덱스 목록을 확인하는 전체 프로세스를 실행합니다
    """
    # 프로그램 시작 메시지 출력
    print("🔍 username 사용자 권한 확인 및 수정")
    print("=" * 50)
    
    # Step 1: 현재 username 사용자의 권한 상태를 확인합니다
    print("\n 현재 username 사용자 권한 확인...")
    current_roles = check_user_permissions()
    
    # Step 2: username 사용자에게 필요한 모든 권한을 부여합니다
    print("\n username 사용자 권한 업데이트...")
    update_username_permissions()
    
    # Step 3: 권한 업데이트 후 변경된 권한을 다시 확인합니다
    print("\n 업데이트된 권한 확인...")
    updated_roles = check_user_permissions()
    
    # Step 4: Elasticsearch에 존재하는 모든 인덱스 목록을 확인합니다
    print("\n 모든 인덱스 목록 확인...")
    list_all_indices()
    
    # 프로그램 완료 메시지와 다음 단계 안내
    print("\n🎉 권한 확인 및 수정 완료!")
    print("\n📝 다음 단계:")
    print("1. Kibana에서 username/password로 다시 로그인해보세요.")
    print("2. Stack Management > Index Patterns에서 인덱스를 확인해보세요.")
    print("3. 모든 인덱스가 보이지 않으면 브라우저 캐시를 삭제해보세요.")

# 스크립트가 직접 실행될 때만 main() 함수를 호출합니다
if __name__ == "__main__":
    main()