Elasticsearch - Transformers 임베딩 입력 저장 (Local Mode)
Page content
개요
- 엘라스틱서치에서 밀집 벡터 위한 매핑 구성 방법 살펴보기
- 밀집 벡터가 저장될 embedding 이라는 필드 정의
- Python 코드로 엘라스틱서치 코드 구현
코드 전체 시나리오
Elasticsearch
에 연결 및 인덱스 초기화dense_vector
타입으로 매핑 정의- 문서 배열 정의
BERT
모델을 이용해 각 문서를 벡터 임베딩- 벡터 포함 문서를 Elasticsearch에 색인
Elasticsearch 클라이언트 연결
- 로컬에서 실행 중인
Elasticsearch
서버에 접속 basic_auth
: 로그인 자격 (ID:elastic
, PW:123456
)verify_certs=False
: 인증서 검증 생략 (로컬에서 SSL 없이 사용 시 편의용)
es_admin = Elasticsearch("http://localhost:9200",
basic_auth=("elastic", "123456"),
verify_certs=False)
Mapping 정의 및 인덱스 생성
dense_vector
: 벡터 검색용 필드 (벡터 유사도 기반 검색 가능)dims
: BERT의 출력 벡터는 기본적으로 768차원이므로 그에 맞춤
mapping = {
"properties": {
"embedding": {
"type": "dense_vector",
"dims": 768 # BERT의 출력 벡터 차원 수
}
}
}
기존 인덱스 삭제 후 새로 생성
- 기존에 있던
chapter-2
인덱스를 삭제 (중복 방지) - 새로운 인덱스를 위에서 정의한 벡터 매핑으로 생성
try:
es_admin.indices.delete(index="chapter-2")
print("기존 chapter-2 인덱스를 삭제했습니다.")
except:
print("chapter-2 인덱스가 존재하지 않습니다.")
es_admin.indices.create(index="chapter-2", body={'mappings': mapping})
print("새로운 chapter-2 인덱스를 생성했습니다.")
색인할 문서 데이터 구성
title
과text
로 구성된 단순 문서 리스트text
는 BERT 임베딩의 입력값이 된다
docs = [
{"title": "Document 1", "text": "This is the first document"},
{"title": "Document 2", "text": "This is the second document"},
{"title": "Document 3", "text": "This is the third document"}
]
BERT 모델과 토크나이저 초기화
bert-base-uncased
: Hugging Face에서 사전 학습된 BERT 모델AutoTokenizer
: 입력 텍스트를 BERT가 이해할 수 있는 토큰으로 변환AutoModel
: 텍스트에 대한 BERT 임베딩 추출
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
model = AutoModel.from_pretrained("bert-base-uncased")
BERT 임베딩 생성
tokenizer(...)
: 텍스트를 토큰화하고 PyTorch 텐서로 변환model(**inputs)
: BERT 실행 → 각 토큰에 대한 임베딩 벡터 반환last_hidden_state.mean(dim=1)
: 문장의 전체 임베딩을 mean pooling으로 하나의 벡터로 압축 (1×768 벡터)squeeze(0).numpy()
: 불필요한 batch 차원 제거 후 NumPy로 변환tolist()
: Elasticsearch에 저장 가능하게 리스트 형태로 변환
for doc in docs:
text = doc["text"]
inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True)
with torch.no_grad():
outputs = model(**inputs).last_hidden_state.mean(dim=1).squeeze(0).numpy()
doc["embedding"] = outputs.tolist()
Elasticsearch에 색인
- 각 문서를
chapter-2
인덱스에 색인 - Elasticsearch는
embedding
필드를 dense_vector로 저장하며, 향후 벡터 검색에도 사용 가능
for doc in docs:
es_admin.index(index="chapter-2", body=doc)
확인
- Kibana | Management | Dev Tools에서 색인된 문서 조회
GET chapter-2/_search
사용자 설정 (Elasticsearch 9.0.3 버전)
- 코드 변경
- 해당 부분은 각자 확인 필요
# Elasticsearch 클라이언트 라이브러리를 가져옵니다
from elasticsearch import Elasticsearch
# Elasticsearch에 관리자 계정(elastic)으로 연결합니다
# localhost:9200은 Elasticsearch의 기본 주소입니다
# basic_auth를 사용하여 사용자명과 비밀번호로 인증합니다
es_admin = Elasticsearch("http://localhost:9200", basic_auth=("elastic", "123456"))
def check_user_permissions():
"""
username 사용자의 현재 권한과 역할을 확인하는 함수
Returns:
list: username 사용자가 가진 역할들의 리스트
"""
try:
# Security API를 사용하여 username 사용자의 정보를 조회합니다
response = es_admin.security.get_user(username="username")
user_info = response.get('username', {}) # 응답에서 username 사용자 정보 추출
roles = user_info.get('roles', []) # 사용자가 가진 역할들 추출
# 사용자 정보를 콘솔에 출력합니다
print("username 사용자 정보:")
print(f" - 역할: {roles}")
print(f" - 전체 정보: {user_info}")
return roles # 역할 리스트를 반환합니다
except Exception as e:
# 사용자 정보 조회 중 오류가 발생한 경우 예외 처리
print(f"사용자 정보 조회 실패: {e}")
return [] # 빈 리스트 반환
def update_username_permissions():
"""
username 사용자에게 모든 인덱스에 접근할 수 있는 권한을 부여하는 함수
superuser 역할을 추가하여 Elasticsearch의 모든 기능에 접근 가능하게 합니다
"""
try:
# Security API를 사용하여 username 사용자의 정보를 업데이트합니다
es_admin.security.put_user(
username="username", # 업데이트할 사용자명
body={
"password": "password", # 기존 비밀번호 유지
"roles": [
"kibana_admin", # Kibana 관리자 권한 (대시보드, 시각화 관리)
"kibana_system", # Kibana 시스템 권한 (시스템 설정 접근)
"quotes_writer", # quotes 인덱스에 대한 읽기/쓰기 권한
"superuser" # 모든 권한 (모든 인덱스 접근 가능, 임시 권한)
],
"full_name": "Username" # 사용자 전체 이름
}
)
# 권한 업데이트 성공 메시지 출력
print("username 사용자 권한이 업데이트되었습니다.")
print(" - superuser 역할이 추가되어 모든 인덱스에 접근 가능합니다.")
except Exception as e:
# 권한 업데이트 중 오류가 발생한 경우 예외 처리
print(f"사용자 권한 업데이트 실패: {e}")
def list_all_indices():
"""
Elasticsearch 클러스터에 존재하는 모든 인덱스의 목록을 조회하는 함수
각 인덱스의 문서 수와 저장 크기 정보도 함께 표시합니다
"""
try:
# CAT API를 사용하여 모든 인덱스 정보를 JSON 형태로 조회합니다
response = es_admin.cat.indices(format="json")
# 인덱스 목록 출력 시작
print("\n 현재 Elasticsearch의 모든 인덱스:")
print("-" * 50)
# 각 인덱스의 정보를 순회하면서 출력합니다
for index in response:
index_name = index.get('index', '') # 인덱스 이름
docs_count = index.get('docs.count', '0') # 문서 수
store_size = index.get('store.size', '0b') # 저장 크기
# 인덱스 정보를 포맷팅하여 출력
print(f" {index_name}")
print(f" - 문서 수: {docs_count}")
print(f" - 크기: {store_size}")
print()
except Exception as e:
# 인덱스 목록 조회 중 오류가 발생한 경우 예외 처리
print(f" 인덱스 목록 조회 실패: {e}")
def main():
"""
메인 실행 함수
username 사용자의 권한을 확인하고, 필요한 권한을 부여한 후,
Elasticsearch의 모든 인덱스 목록을 확인하는 전체 프로세스를 실행합니다
"""
# 프로그램 시작 메시지 출력
print("🔍 username 사용자 권한 확인 및 수정")
print("=" * 50)
# Step 1: 현재 username 사용자의 권한 상태를 확인합니다
print("\n 현재 username 사용자 권한 확인...")
current_roles = check_user_permissions()
# Step 2: username 사용자에게 필요한 모든 권한을 부여합니다
print("\n username 사용자 권한 업데이트...")
update_username_permissions()
# Step 3: 권한 업데이트 후 변경된 권한을 다시 확인합니다
print("\n 업데이트된 권한 확인...")
updated_roles = check_user_permissions()
# Step 4: Elasticsearch에 존재하는 모든 인덱스 목록을 확인합니다
print("\n 모든 인덱스 목록 확인...")
list_all_indices()
# 프로그램 완료 메시지와 다음 단계 안내
print("\n🎉 권한 확인 및 수정 완료!")
print("\n📝 다음 단계:")
print("1. Kibana에서 username/password로 다시 로그인해보세요.")
print("2. Stack Management > Index Patterns에서 인덱스를 확인해보세요.")
print("3. 모든 인덱스가 보이지 않으면 브라우저 캐시를 삭제해보세요.")
# 스크립트가 직접 실행될 때만 main() 함수를 호출합니다
if __name__ == "__main__":
main()