go through smoke test of all API (#12)
* add field progress msg into docinfo; add file processing procedure * go through upload, create kb, add doc to kb * smoke test for all API * smoke test for all API
This commit is contained in:
@@ -14,9 +14,9 @@ class Config:
|
||||
self.env = env
|
||||
if env == "spark":CF.read("./cv.cnf")
|
||||
|
||||
def get(self, key):
|
||||
def get(self, key, default=None):
|
||||
global CF
|
||||
return CF.get(self.env, key)
|
||||
return CF[self.env].get(key, default)
|
||||
|
||||
def init(env):
|
||||
return Config(env)
|
||||
|
||||
@@ -49,7 +49,11 @@ class Postgres(object):
|
||||
cur = self.conn.cursor()
|
||||
cur.execute(sql)
|
||||
updated_rows = cur.rowcount
|
||||
<<<<<<< HEAD
|
||||
self.conn.commit()
|
||||
=======
|
||||
conn.commit()
|
||||
>>>>>>> upstream/main
|
||||
cur.close()
|
||||
return updated_rows
|
||||
except Exception as e:
|
||||
|
||||
@@ -5,10 +5,10 @@ import time
|
||||
import copy
|
||||
import elasticsearch
|
||||
from elasticsearch import Elasticsearch
|
||||
from elasticsearch_dsl import UpdateByQuery, Search, Index
|
||||
from elasticsearch_dsl import UpdateByQuery, Search, Index, Q
|
||||
from util import config
|
||||
|
||||
print("Elasticsearch version: ", elasticsearch.__version__)
|
||||
logging.info("Elasticsearch version: ", elasticsearch.__version__)
|
||||
|
||||
|
||||
def instance(env):
|
||||
@@ -20,7 +20,7 @@ def instance(env):
|
||||
timeout=600
|
||||
)
|
||||
|
||||
print("ES: ", ES_DRESS, ES.info())
|
||||
logging.info("ES: ", ES_DRESS, ES.info())
|
||||
|
||||
return ES
|
||||
|
||||
@@ -31,7 +31,7 @@ class HuEs:
|
||||
self.info = {}
|
||||
self.config = config.init(env)
|
||||
self.conn()
|
||||
self.idxnm = self.config.get("idx_nm","")
|
||||
self.idxnm = self.config.get("idx_nm", "")
|
||||
if not self.es.ping():
|
||||
raise Exception("Can't connect to ES cluster")
|
||||
|
||||
@@ -46,6 +46,7 @@ class HuEs:
|
||||
break
|
||||
except Exception as e:
|
||||
logging.error("Fail to connect to es: " + str(e))
|
||||
time.sleep(1)
|
||||
|
||||
def version(self):
|
||||
v = self.info.get("version", {"number": "5.6"})
|
||||
@@ -121,7 +122,6 @@ class HuEs:
|
||||
acts.append(
|
||||
{"update": {"_id": id, "_index": ids[id]["_index"]}, "retry_on_conflict": 100})
|
||||
acts.append({"doc": d, "doc_as_upsert": "true"})
|
||||
logging.info("bulk upsert: %s" % id)
|
||||
|
||||
res = []
|
||||
for _ in range(100):
|
||||
@@ -148,7 +148,6 @@ class HuEs:
|
||||
return res
|
||||
except Exception as e:
|
||||
logging.warn("Fail to bulk: " + str(e))
|
||||
print(e)
|
||||
if re.search(r"(Timeout|time out)", str(e), re.IGNORECASE):
|
||||
time.sleep(3)
|
||||
continue
|
||||
@@ -229,7 +228,7 @@ class HuEs:
|
||||
return False
|
||||
|
||||
def search(self, q, idxnm=None, src=False, timeout="2s"):
|
||||
print(json.dumps(q, ensure_ascii=False))
|
||||
if not isinstance(q, dict): q = Search().query(q).to_dict()
|
||||
for i in range(3):
|
||||
try:
|
||||
res = self.es.search(index=(self.idxnm if not idxnm else idxnm),
|
||||
@@ -271,9 +270,31 @@ class HuEs:
|
||||
str(e) + "【Q】:" + str(q.to_dict()))
|
||||
if str(e).find("Timeout") > 0 or str(e).find("Conflict") > 0:
|
||||
continue
|
||||
self.conn()
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def updateScriptByQuery(self, q, scripts, idxnm=None):
|
||||
ubq = UpdateByQuery(index=self.idxnm if not idxnm else idxnm).using(self.es).query(q)
|
||||
ubq = ubq.script(source=scripts)
|
||||
ubq = ubq.params(refresh=True)
|
||||
ubq = ubq.params(slices=5)
|
||||
ubq = ubq.params(conflicts="proceed")
|
||||
for i in range(3):
|
||||
try:
|
||||
r = ubq.execute()
|
||||
return True
|
||||
except Exception as e:
|
||||
logging.error("ES updateByQuery exception: " +
|
||||
str(e) + "【Q】:" + str(q.to_dict()))
|
||||
if str(e).find("Timeout") > 0 or str(e).find("Conflict") > 0:
|
||||
continue
|
||||
self.conn()
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def deleteByQuery(self, query, idxnm=""):
|
||||
for i in range(3):
|
||||
try:
|
||||
@@ -307,7 +328,6 @@ class HuEs:
|
||||
routing=routing, refresh=False) # , doc_type="_doc")
|
||||
return True
|
||||
except Exception as e:
|
||||
print(e)
|
||||
logging.error("ES update exception: " + str(e) + " id:" + str(id) + ", version:" + str(self.version()) +
|
||||
json.dumps(script, ensure_ascii=False))
|
||||
if str(e).find("Timeout") > 0:
|
||||
|
||||
73
python/util/minio_conn.py
Normal file
73
python/util/minio_conn.py
Normal file
@@ -0,0 +1,73 @@
|
||||
import logging
|
||||
import time
|
||||
from util import config
|
||||
from minio import Minio
|
||||
from io import BytesIO
|
||||
|
||||
class HuMinio(object):
|
||||
def __init__(self, env):
|
||||
self.config = config.init(env)
|
||||
self.conn = None
|
||||
self.__open__()
|
||||
|
||||
def __open__(self):
|
||||
try:
|
||||
if self.conn:self.__close__()
|
||||
except Exception as e:
|
||||
pass
|
||||
|
||||
try:
|
||||
self.conn = Minio(self.config.get("minio_host"),
|
||||
access_key=self.config.get("minio_usr"),
|
||||
secret_key=self.config.get("minio_pwd"),
|
||||
secure=False
|
||||
)
|
||||
except Exception as e:
|
||||
logging.error("Fail to connect %s "%self.config.get("minio_host") + str(e))
|
||||
|
||||
|
||||
def __close__(self):
|
||||
del self.conn
|
||||
self.conn = None
|
||||
|
||||
|
||||
def put(self, bucket, fnm, binary):
|
||||
for _ in range(10):
|
||||
try:
|
||||
if not self.conn.bucket_exists(bucket):
|
||||
self.conn.make_bucket(bucket)
|
||||
|
||||
r = self.conn.put_object(bucket, fnm,
|
||||
BytesIO(binary),
|
||||
len(binary)
|
||||
)
|
||||
return r
|
||||
except Exception as e:
|
||||
logging.error(f"Fail put {bucket}/{fnm}: "+str(e))
|
||||
self.__open__()
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
def get(self, bucket, fnm):
|
||||
for _ in range(10):
|
||||
try:
|
||||
r = self.conn.get_object(bucket, fnm)
|
||||
return r.read()
|
||||
except Exception as e:
|
||||
logging.error(f"Fail get {bucket}/{fnm}: "+str(e))
|
||||
self.__open__()
|
||||
time.sleep(1)
|
||||
return
|
||||
|
||||
if __name__ == "__main__":
|
||||
conn = HuMinio("infiniflow")
|
||||
fnm = "/opt/home/kevinhu/docgpt/upload/13/11-408.jpg"
|
||||
from PIL import Image
|
||||
img = Image.open(fnm)
|
||||
buff = BytesIO()
|
||||
img.save(buff, format='JPEG')
|
||||
print(conn.put("test", "11-408.jpg", buff.getvalue()))
|
||||
bts = conn.get("test", "11-408.jpg")
|
||||
img = Image.open(BytesIO(bts))
|
||||
img.save("test.jpg")
|
||||
|
||||
Reference in New Issue
Block a user