MLflow with Helm, and Serving Trained Models on Kubernetes

Dounpct
Feb 3, 2024

End-to-End Diagram and Architecture

After finishing an ML project that trains and serves models to predict threats from firewall and login logs, I would like to share the end-to-end setup I used in this project.

Diagram

Project

  • Predict threats from the destination country and port in firewall logs
  • Predict threats from the login time in access logs
  • Handle a high volume of traffic

For model training we tested both supervised logistic regression and unsupervised learning. Supervised learning needs a target that marks each record as a threat, but it is not easy for a person to label raw logs as threats. So we chose unsupervised anomaly detection with “LocalOutlierFactor”, which returns which records are anomalies.
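To make the anomaly/normal convention concrete, here is a minimal sketch of novelty detection with scikit-learn's LocalOutlierFactor on made-up 2-D data (the numbers are illustrative only, not taken from the project's logs). With novelty=True, the fitted detector can score new records through predict(), which returns 1 for a normal record and -1 for an anomaly; the gateway's "Normally" / "Anomaly" labels presumably map onto these two values.

```python
import numpy as np
from sklearn.neighbors import LocalOutlierFactor

# Made-up training data: 300 points clustered around (10, 0), standing in for
# the encoded (country code, port level) pairs seen in normal traffic.
rng = np.random.default_rng(0)
train = rng.normal(loc=[10.0, 0.0], scale=[2.0, 0.5], size=(300, 2))

# novelty=True enables predict()/decision_function() on unseen records.
# contamination is the share of training points treated as outliers when the threshold is set.
detector = LocalOutlierFactor(n_neighbors=20, contamination=0.1, novelty=True)
detector.fit(train)

new_records = np.array([
    [10.0, 0.0],   # sits in the middle of the training cluster
    [80.0, 40.0],  # far from anything seen during training
])
labels = detector.predict(new_records)            # 1 = normal (inlier), -1 = anomaly (outlier)
scores = detector.decision_function(new_records)  # negative values fall below the threshold
print(labels, scores)
```

The contamination value only moves the decision threshold; it does not change how the local density of each point is measured.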

To handle a lot of traffic: as a DevOps engineer, I designed everything to be deployed on a Kubernetes cluster that can scale pods out to serve more traffic. I also like to automate deployment as much as possible, so I use ArgoCD, Jenkins, and GitHub Actions.

Develop Model

I started developing the model with the Jupyter extension for Visual Studio Code (https://marketplace.visualstudio.com/items?itemName=ms-toolsai.jupyter). It makes it easy to run only part of the code instead of the whole Python file.

The Python training script consists of these steps:

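  • Load test JSON data: only 2–3 files of raw JSON data
# Imports used across all the training steps below
import glob
import os
import pickle
import sys

import matplotlib.pyplot as plt
import mlflow
import mlflow.sklearn
import numpy as np
import pandas as pd
from sklearn import metrics
from sklearn.neighbors import LocalOutlierFactor

if __name__ == "__main__":
    df = pd.DataFrame()
    path_to_json = 'rawdata'
    json_pattern = os.path.join(path_to_json, '*.txt')
    file_list = glob.glob(json_pattern)

    xcount = 0

    for file in file_list:
        xcount = xcount + 1
        print("xcount: ", xcount)
        print(file)
        # Each .txt file holds one JSON record per line
        data = pd.read_json(file, lines=True)
        df = pd.concat([df, data], ignore_index=True)
        # mlflowTrainingFileLimit (an int) caps how many files are loaded
        if xcount == mlflowTrainingFileLimit:
            break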
  • Data preprocessing: replace unknown values with OTHER and encode the features
np.set_printoptions(threshold=sys.maxsize)

# df_categories = df[df["ads_country_dst"].str.startswith(('1', '2', '3', '4', '5', '6', '7', '8', '9', '0')) == False]
df_categories = pd.concat([df["ads_country_dst"]], axis=1, sort=False,)
print("-------------- Count Record --------------")
print(df_categories.shape[0])
print("-------------- Count Record --------------")
print("-------------- Count Destination Country --------------")
print(df_categories.value_counts().to_string())
print("-------------- Count Destination Country --------------")

countryMap = mapOfCountryDst()
portMap = mapOfPort()
print("-------------- Number of Country in Encoding --------------")
print("country_key : ", len(countryMap.keys()))
print("country_count : ", len(set(countryMap.values())))
print("-------------- Number of Country in Encoding --------------")
print("-------------- Show Country Not in list --------------")
print(df_categories[~df_categories['ads_country_dst'].isin(countryMap.keys())].value_counts().to_string())
print("-------------- Show Country Not in list --------------")

df_categories = df_categories.mask(~df_categories.isin(countryMap.keys()),'OTHER')
print("Mask OTHER done")
X = df_categories.replace({'ads_country_dst': countryMap})
print("Frequency encoding done")
print("-------------- Show X --------------")
print(X)
print("-------------- Show X --------------")
# x_scaler = StandardScaler().fit(X)
# print("Mean of x is:", x_scaler.mean_)
# print("Variance of x is:", x_scaler.var_)
# print("Standard deviation of x is:", x_scaler.scale_)
# x_scaled = x_scaler.transform(X)
# print(x_scaled)
print("-------------- Port --------------")
df_categories = pd.concat([df_categories,df["ads_dst_port"]], axis=1, sort=False,)
ads_dst_port = pd.concat([df["ads_dst_port"]], axis=1, sort=False,).astype(str)
ads_dst_port = ads_dst_port.mask(~ads_dst_port.isin(portMap.keys()), 'OTHER')
ads_dst_port = ads_dst_port.replace({'ads_dst_port': portMap})
print("-------------- Port --------------")

X = pd.concat([X,ads_dst_port], axis=1, sort=False,)
print("-------------- Show X --------------")
print(X.to_string())
print("-------------- Show X --------------")

normalPoint = 30
mask = (X['ads_country_dst'] <= normalPoint) & (X['ads_dst_port'] == 0)
X_Test = pd.DataFrame({'test': [-1] * len(df)})
X_Test.loc[mask, 'test'] = 1

print("-------------- Show X_Test --------------")
print(X_Test)
print("-------------- Show X_Test --------------")
  • Training the model
# Call and fit the Local Outlier Factor detector
# lof_detector = LocalOutlierFactor(n_neighbors=30, contamination=0.001,novelty=True).fit(x_scaled)
# Roughly one neighbour per 300 records, but n_neighbors must be at least 1
setNNeighbors = max(1, df_categories.shape[0] // 300)
print("set n_neighbors : ", setNNeighbors)
# novelty=True so the saved model can predict() on new records when it is served
lof_detector = LocalOutlierFactor(n_neighbors=setNNeighbors, contamination=0.1, novelty=True).fit(X.values)

print("-------------- Model Size (MB) --------------")
print("{:.2f}".format(sys.getsizeof(pickle.dumps(lof_detector)) / (1024 * 1024)))
print("-------------- Model Size (MB) --------------")

# lof_detect = lof_detector.predict(x_scaled)
# Predict on the same unnamed array layout used for fit() to avoid a feature-name mismatch
lof_detect = lof_detector.predict(X.values)
  • Evaluating the model and making predictions
recordDetect, countDetect = np.unique(lof_detect, return_counts=True)
print("--------------Count Anomaly VS Normal-------------")
print(recordDetect)
print(countDetect)

# If only one label was predicted, assume the missing one is the anomaly (-1) and give it a count of 0
if len(countDetect) == 1:
    row_to_be_added = countDetect
    countDetect = np.append(np.array([0]), row_to_be_added, axis=0)

print("Anomaly = ", countDetect[0], "record with ", (countDetect[0]) * 100 / (countDetect[0] + countDetect[1]), " %")
print("Normal = ", countDetect[1], "record with ", (countDetect[1]) * 100 / (countDetect[0] + countDetect[1]), " %")
print("--------------Count Anomaly VS Normal-------------")

print("-------------- List Destination Country Port with Prediction -------------")
# print(type(df_categories.value_counts()))
# print(type(df_categories))

htmlItem = ""

# One prediction per distinct (country, port) pair, together with how often the pair occurs
for index, value in df_categories.value_counts().items():
    encode = countryMap[index[0]]

    portEncode = 0
    if str(index[1]) in portMap:
        portEncode = portMap[str(index[1])]

    predictData = lof_detector.predict([[encode, portEncode]])

    if portEncode > 0:
        print(index[0], " | code =", encode, " | Port Level = ", index[1], "/", str(portEncode), " | count =", value, " | result =", dataPredictionToString(predictData[0]))

    htmlItem = htmlItem + "<tr><td>" + index[0] + "</td><td class=right-aligned>" + str(encode) + "</td><td class=right-aligned>" + str(index[1]) + "</td><td class=right-aligned>" + str(portEncode) + "</td><td class=right-aligned>" + str(value) + "</td><td>" + dataPredictionToString(predictData[0]) + "</td></tr>"
print("-------------- List Destination Country Port with Prediction -------------")
  • Visualize the data
plt.figure(figsize=(7, 7))
# plt.scatter(x_scaled[:, 0], x_scaled[:, 0], c=lof_detect, cmap="flag", alpha=0.5)
plt.scatter(X.to_numpy()[:, 0], X.to_numpy()[:, 1], c=lof_detect, cmap="flag", alpha=0.1)
plt.title("train-ads-anomaly-dest-country-port")
plt.savefig('images/train-ads-anomaly-dest-country-port.png')
plt.show()
print("-------------- Machine Learning - Confusion Matrix -------------")
# X_Test holds the rule-based reference labels; pass its column as a 1-D Series
y_true = X_Test['test']
Accuracy = metrics.accuracy_score(y_true, lof_detect)
print("Accuracy : ", Accuracy)
Precision = metrics.precision_score(y_true, lof_detect)
print("Precision : ", Precision)
Sensitivity_recall = metrics.recall_score(y_true, lof_detect)
print("Sensitivity_recall : ", Sensitivity_recall)
Specificity = metrics.recall_score(y_true, lof_detect, pos_label=-1)
print("Specificity : ", Specificity)
F1_score = metrics.f1_score(y_true, lof_detect)
print("F1_score : ", F1_score)
print("-------------- Machine Learning - Confusion Matrix -------------")
confusion_matrix = metrics.confusion_matrix(y_true, lof_detect)
cm_display = metrics.ConfusionMatrixDisplay(confusion_matrix=confusion_matrix, display_labels=["Anomaly", "Normally"])
cm_display.plot()
plt.savefig('images/train-ads-anomaly-dest-country-port-confusion-matrix.png')
plt.show()
  • Generate Report
htmlAnomalyVSNormally = '''
<table class="table table-striped">
<th>Type</th><th class=right-aligned>Record</th><th class=right-aligned>%Record</th>
<tr><td>Anomaly</td><td class=right-aligned>''' + str(countDetect[0]) + '''</td><td class=right-aligned> ''' + str((countDetect[0])*100/(countDetect[0]+countDetect[1])) + '''</td></tr>
<tr><td>Normal</td><td class=right-aligned>''' + str(countDetect[1]) + '''</td><td class=right-aligned> ''' + str((countDetect[1])*100/(countDetect[0]+countDetect[1])) + '''</td></tr>
</table>
'''

htmlCountryPrediction = '''
<table class="table table-striped">
<th>Country</th><th class=right-aligned>CODE</th><th class=right-aligned>Port</th><th class=right-aligned>Malware DST Ports Level</th><th class=right-aligned>Amount</th><th>Prediction</th>
''' + htmlItem + '''
</table>
'''

htmlMatrix = '''
<table class="table table-striped">
<th>Type</th><th>Meaning</th><th class=right-aligned>Score</th>
<tr><td>Accuracy</td><td>The proportion of correctly predicted cases</td><td class=right-aligned>''' + str(Accuracy) + '''</td></tr>
<tr><td>Precision</td><td>Positive Predictive Value</td><td class=right-aligned>''' + str(Precision) + '''</td></tr>
<tr><td>Sensitivity_recall</td><td> True Positive Rate</td><td class=right-aligned>''' + str(Sensitivity_recall) + '''</td></tr>
<tr><td>Specificity</td><td>True Negative Rate</td><td class=right-aligned>''' + str(Specificity) + '''</td></tr>
<tr><td>F1_score</td><td>Balances precision and recall</td><td class=right-aligned>''' + str(F1_score) + '''</td></tr>
</table>
'''

summary_table = '''
<p>Count Record : ''' + str(df_categories.shape[0]) + '''</p>
<h2>Local Outlier Factor (LOF)</h2>
<p>n_neighbors : ''' + str(setNNeighbors) + '''</p>
''' + htmlAnomalyVSNormally + '''
''' + htmlCountryPrediction + '''
<table class="table table-striped">
<th>Local Outlier Factor (LOF)</th>
<tr>
<td><img src="train-ads-anomaly-dest-country-port.png" alt="train-ads-anomaly-dest-country-port.png"></td>
</tr>
</table>
<table class="table table-striped">
<th>confusion-matrix</th>
<tr>
<td><img src="train-ads-anomaly-dest-country-port-confusion-matrix.png" alt="confusion-matrix"></td>
</tr>
</table>
''' + htmlMatrix + '''
'''

html_string = mainReportHTML("train-ads-anomaly-dest-country-port",summary_table)
with open('report.html', 'w') as f:
    f.write(html_string)
  • Store the model in MLflow
%set_env MLFLOW_TRACKING_URI=http://127.0.0.1:5000
# %set_env MLFLOW_TRACKING_URI=http://mlflow.rtarf-ml.its-software-services.com/
tracking_uri = os.environ["MLFLOW_TRACKING_URI"]
# export MLFLOW_TRACKING_USERNAME=user
# export MLFLOW_TRACKING_PASSWORD=pwd

# MLflow reads these timeouts from environment variables, so set them in os.environ
# (assigning to mlflow.environment_variables.* only replaces a Python attribute)
os.environ["MLFLOW_ARTIFACT_UPLOAD_DOWNLOAD_TIMEOUT"] = "5000"
os.environ["MLFLOW_HTTP_REQUEST_TIMEOUT"] = "5000"

# Point the client at the tracking server before creating or looking up the experiment
mlflow.set_tracking_uri(tracking_uri)
experiment = mlflow.set_experiment(experiment_name='ads-anomaly-dest-country-port')
experiment_id = experiment.experiment_id

reportURL = "https://minio-api.rtarf-ml.its-software-services.com/ml-report/train-ads-anomaly-dest-country-port/" + jenkinsBuildID + "/report.html"

run_description = f"""
### Note
**All information** * about Training * ~~ML here~~
Jenkins URL: [{jenkinsURL}]({jenkinsURL})
Report: [{reportURL}]({reportURL})
"""

with mlflow.start_run(experiment_id=experiment_id, description=run_description):
    print("Artifact Location: {}".format(experiment.artifact_location))
    print("artifact uri : " + mlflow.get_artifact_uri())

    mlflow.doctor()
    mlflow.log_param("MlflowMinioFolder", mlflowMinioFolder)
    mlflow.log_param("country_key", len(countryMap.keys()))
    mlflow.log_param("country_count", len(set(countryMap.values())))

    mlflow.log_param("SampleFile", xcount)
    mlflow.log_param("SampleRows", X.shape[0])
    # mlflow.set_tag("JenkinsURL",jenkinsURL)

    # log_metric expects a number, not a string
    total = countDetect[0] + countDetect[1]
    mlflow.log_metric("Anomaly", float(countDetect[0] * 100 / total))
    mlflow.log_metric("Normal", float(countDetect[1] * 100 / total))
    mlflow.sklearn.log_model(lof_detector, "model", registered_model_name="ads-anomaly-by-dest-country")
    print("Model saved in run %s" % mlflow.active_run().info.run_id)

print("-------------- Model save success -------------")
  • Serve the model locally
!mlflow models serve -m mlflow-artifacts:/229825231912337594/f8757959b00442f2a2f807d4fe9000a8/artifacts/model -p 9999 --no-conda
  • Test the model locally
curl -X POST -H "Content-Type:application/json"                     \
--data "{\"dataframe_split\": {\"data\":[ \
[0 , 0]
]}}" \
http://127.0.0.1:9999/invocations | jq
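The confusion-matrix step uses scikit-learn's default pos_label=1, so "positive" means normal here, and Specificity is the recall of the anomaly class (-1). Note also that the reference labels in X_Test come from a hand-written rule (country code ≤ 30 and port level 0), not from human labelling. A small pure-Python sketch of the same arithmetic, with made-up labels, shows how each score follows from the four confusion-matrix counts:

```python
def binary_scores(y_true, y_pred, positive=1, negative=-1):
    """Accuracy, precision, recall, specificity and F1 from paired label lists."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(1 for t, p in pairs if t == positive and p == positive)
    tn = sum(1 for t, p in pairs if t == negative and p == negative)
    fp = sum(1 for t, p in pairs if t == negative and p == positive)
    fn = sum(1 for t, p in pairs if t == positive and p == negative)

    def ratio(num, den):
        # Undefined ratios (zero denominator) are reported as 0.0 in this sketch
        return num / den if den else 0.0

    precision = ratio(tp, tp + fp)
    recall = ratio(tp, tp + fn)  # "Sensitivity_recall" in the training script
    return {
        "accuracy": ratio(tp + tn, len(pairs)),
        "precision": precision,
        "recall": recall,
        "specificity": ratio(tn, tn + fp),  # recall of the negative (anomaly) class
        "f1": ratio(2 * precision * recall, precision + recall),
    }


if __name__ == "__main__":
    # Made-up reference labels (rule-based, like X_Test) and detector output
    y_true = [1, 1, 1, -1, -1]
    y_pred = [1, 1, -1, -1, 1]
    print(binary_scores(y_true, y_pred))
```

With these labels there are 2 true positives, 1 false negative, 1 true negative and 1 false positive, so accuracy is 0.6, precision and recall are both 2/3, and specificity is 0.5.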

CI/CD

  • I created a Jenkins job.
  • I created a Jenkinsfile that runs the pipeline to store the model in MLflow and generate the report:
MLFLOW_MINIO_FOLDER = env.MLFLOW_MINIO_FOLDER
MLFLOW_TRAINING_FILE_LIMIT = env.MLFLOW_TRAINING_FILE_LIMIT

pipeline
{
    agent
    {
        node
        {
            label "jenkins-python"
        }
    }

    environment {
        MLFLOW_TRACKING_USERNAME = credentials('MLFLOW_USER')
        MLFLOW_TRACKING_PASSWORD = credentials('MLFLOW_PASSWORD')
    }

    stages
    {
        stage('PrePare ENV')
        {
            steps
            {
                script
                {
                    container("python")
                    {
                        sh "python --version"
                        sh "pip install -r requirements.txt"
                    }
                }
            }
        }
        stage('PrePare DataSet')
        {
            steps
            {
                script
                {
                    container("minio-mc")
                    {
                        sh "mc -v"
                        withCredentials([usernamePassword(credentialsId: 'ADS_AWS_CREDENTIAL', passwordVariable: 'password', usernameVariable: 'username')])
                        {
                            // Single quotes: let the shell expand the secrets instead of Groovy string interpolation
                            sh 'mc alias set myminio https://minio.rtarf-prod.its-software-services.com/ "$username" "$password"'
                            sh "mkdir rawdata"
                            sh "mkdir images"

                            // List the object keys in the training folder
                            cmd = """mc ls myminio/rtarf-ml/${MLFLOW_MINIO_FOLDER}/ --json | jq .key | sed 's/"//g'"""
                            listFile = sh(script: cmd, returnStdout: true)

                            // Copy at most MLFLOW_TRAINING_FILE_LIMIT files into ./rawdata
                            def lines = listFile.split('\n')
                            def i = 0
                            def limit = MLFLOW_TRAINING_FILE_LIMIT.trim().toInteger()
                            for (String line: lines)
                            {
                                i++
                                sh "mc cp myminio/rtarf-ml/${MLFLOW_MINIO_FOLDER}/${line} ./rawdata/"
                                if (i >= limit)
                                {
                                    break
                                }
                            }

                            // sh "mc cp --recursive myminio/rtarf-ml/${MLFLOW_MINIO_FOLDER}/ ./rawdata/"
                            sh "ls -alrt ./rawdata/"

                            cmd = "ls ./rawdata/ | wc -l"
                            countFile = sh(script: cmd, returnStdout: true)
                            println("countFile: ${countFile}")
                            currentBuild.description = "folder:${MLFLOW_MINIO_FOLDER}/${MLFLOW_TRAINING_FILE_LIMIT}/${countFile}"
                        }
                    }
                }
            }
        }
        stage('Training Model ADS-ANOMALY-DEST-COUNTRY-PORT')
        {
            steps
            {
                script
                {
                    container("python")
                    {
                        sh "python -u train-ads-anomaly-dest-country-port.py ${env.BUILD_URL}console ${MLFLOW_MINIO_FOLDER} ${MLFLOW_TRAINING_FILE_LIMIT} ${env.BUILD_ID}"
                    }
                    container("minio-mc")
                    {
                        withCredentials([usernamePassword(credentialsId: 'AWS_CREDENTIAL', passwordVariable: 'password', usernameVariable: 'username')])
                        {
                            sh 'mc alias set ml-minio https://minio-api.rtarf-ml.its-software-services.com/ "$username" "$password"'

                            sh "mc cp --recursive images/ ml-minio/ml-report/train-ads-anomaly-dest-country-port/${env.BUILD_ID}/"

                            sh "mc cp report.html ml-minio/ml-report/train-ads-anomaly-dest-country-port/${env.BUILD_ID}/"

                            println("report : https://minio-api.rtarf-ml.its-software-services.com/ml-report/train-ads-anomaly-dest-country-port/${env.BUILD_ID}/report.html")
                        }
                    }
                }
            }
        }
    }
}
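The training stage passes four positional arguments to the script (console URL, MinIO folder, file limit, build ID), and the training snippets use them as jenkinsURL, mlflowMinioFolder, mlflowTrainingFileLimit and jenkinsBuildID. The post does not show how the script reads them, so the following is only a sketch of one way to do it; TrainingArgs and parse_training_args are hypothetical names. Converting the limit to int matters: the loading loop compares xcount == mlflowTrainingFileLimit, and an int never equals a string, so a string limit would silently load every file.

```python
import sys
from typing import NamedTuple, Sequence


class TrainingArgs(NamedTuple):
    jenkins_url: str   # ${env.BUILD_URL}console
    minio_folder: str  # MLFLOW_MINIO_FOLDER
    file_limit: int    # MLFLOW_TRAINING_FILE_LIMIT, converted to a number
    build_id: str      # ${env.BUILD_ID}


def parse_training_args(argv: Sequence[str]) -> TrainingArgs:
    """Read the positional arguments in the order the Jenkinsfile passes them."""
    if len(argv) != 5:
        raise SystemExit(
            "usage: train-ads-anomaly-dest-country-port.py JENKINS_URL MINIO_FOLDER FILE_LIMIT BUILD_ID"
        )
    _, jenkins_url, minio_folder, file_limit, build_id = argv
    # int() so that `xcount == mlflowTrainingFileLimit` can actually become True
    return TrainingArgs(jenkins_url, minio_folder, int(file_limit.strip()), build_id)


if __name__ == "__main__":
    args = parse_training_args(sys.argv)
    # Hypothetical mapping onto the variable names used in the training snippets
    jenkinsURL, mlflowMinioFolder, mlflowTrainingFileLimit, jenkinsBuildID = args
    print(args)
```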

Dockerfile for Serving Model

FROM python:3.9

ENV HOME="/root"
WORKDIR ${HOME}

RUN pip install google-cloud-storage

COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt

ENV MODEL_URI ${MODEL_URI}
ENV SERVING_PORT ${SERVING_PORT}

COPY serving.sh /serving.sh

CMD [ "/bin/bash", "/serving.sh" ]
# CMD [ "sh", "-c", "mlflow models serve --model-uri $MODEL_URI -h 0.0.0.0 -p $SERVING_PORT --no-conda"]
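The image runs serving.sh, whose contents are not shown; the commented-out CMD suggests it launches mlflow models serve with MODEL_URI and SERVING_PORT taken from the environment that the Kubernetes Deployment sets. A hypothetical Python equivalent, using exactly the flags from that commented CMD, might look like this. Binding to 0.0.0.0 matters in a container: mlflow models serve listens on 127.0.0.1 by default, which the Service could not reach. The default port of 8082 is an assumption taken from the manifests.

```python
import os
import shlex


def build_serve_command(env):
    """Build the `mlflow models serve` argv from MODEL_URI and SERVING_PORT."""
    model_uri = env.get("MODEL_URI")
    if not model_uri:
        # The Deployment is expected to provide MODEL_URI for each model
        raise ValueError("MODEL_URI is not set")
    port = env.get("SERVING_PORT", "8082")  # assumption: the port used in the manifests
    return [
        "mlflow", "models", "serve",
        "--model-uri", model_uri,
        "-h", "0.0.0.0",  # listen on all interfaces so the Service can reach the pod
        "-p", str(port),
        "--no-conda",     # same flag as the commented-out CMD in the Dockerfile
    ]


if __name__ == "__main__":
    cmd = build_serve_command(os.environ)
    print("Starting:", shlex.join(cmd))
    # Replace this process with mlflow so it receives the container's signals directly
    os.execvp(cmd[0], cmd)
```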

I have a GitHub Actions workflow that automatically builds the MLflow model-serving images and pushes them to GCR:

name: Genuine

on:
  push:
    branches: [ develop, master ]
    tags: ['v*.*.*']

env:
  GH_TOKEN: ${{ secrets.GH_TOKEN }}

jobs:
  deploy:
    needs: [ build1 ]
    runs-on: ubuntu-latest
    steps:
      - name: Line notification start
        uses: snow-actions/line-notify@v1.0.0
        with:
          access_token: ${{ secrets.LINE_ACCESS_TOKEN }}
          message: "Start deploying [genuine] [${{ needs.build1.outputs.imageTag }}] to [${{ needs.build1.outputs.autoDeployEnv }}]..."

      - name: show job status
        if: always()
        uses: snow-actions/line-notify@v1.0.0
        with:
          access_token: ${{ secrets.LINE_ACCESS_TOKEN }}
          message: "Done deploying [genuine] [${{ needs.build1.outputs.imageTag }}] to [${{ needs.build1.outputs.autoDeployEnv }}]..."

  build1:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        service: [ ml-training-api ]
    outputs:
      imageTag: ${{ steps.prep.outputs.version }}
      autoDeployEnv: ${{ steps.prep.outputs.deployEnv }}
      autoDeployBranch: ${{ steps.prep.outputs.autoDeployBranch }}
    steps:
      - name: Checkout repo
        uses: actions/checkout@v2

      ...
      ...
      ...

      - name: Build and push images
        uses: docker/build-push-action@v3
        with:
          file: ${{ steps.prep.outputs.dockerFile }}
          push: true
          load: false
          tags: ${{ steps.prep.outputs.gcr_tags }}
          build-args: version=${{ steps.prep.outputs.version }}

      - name: show job status
        if: always()
        uses: snow-actions/line-notify@v1.0.0
        with:
          access_token: ${{ secrets.LINE_ACCESS_TOKEN }}
          message: "Done building [${{matrix.service}}] [${{ steps.prep.outputs.version }}] with status [${{job.status}}]"

Serve Model in Cluster

apiVersion: apps/v1
kind: Deployment
metadata:
  name: mlflow-ads-anomaly-dest-country-port
  labels:
    app: mlflow-ads-anomaly-dest-country-port
spec:
  selector:
    matchLabels:
      app: mlflow-ads-anomaly-dest-country-port
  template:
    metadata:
      labels:
        app: mlflow-ads-anomaly-dest-country-port
    spec:
      containers:
        - name: mlflow-serving
          image: <gcr-url>:<gcr-tag>
          env:
            - name: MODEL_URI
              value: "s3://mlflow-artifacts/1/xxxxxxxxxxxxxx/artifacts/model" #change here
            - name: SERVING_PORT
              value: "8082"
            - name: MLFLOW_S3_ENDPOINT_URL
              value: "https://minio-ml-hl.minio-ml.svc.cluster.local:9000"
            - name: MLFLOW_S3_IGNORE_TLS
              value: "true"
          resources:
            limits:
              cpu: 2000m
              memory: 4000Mi
            requests:
              cpu: 500m
              memory: 500Mi
          envFrom:
            - secretRef:
                name: mlflow-app-secrets
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: mlflow-ads-anomaly-dest-country-port
  name: mlflow-ads-anomaly-dest-country-port
spec:
  ports:
    - name: http-web
      port: 8082
      protocol: TCP
      targetPort: 8082
  selector:
    app: mlflow-ads-anomaly-dest-country-port
  type: ClusterIP
---
apiVersion: autoscaling/v1
kind: HorizontalPodAutoscaler
metadata:
  name: mlflow-ads-anomaly-dest-country-port
spec:
  minReplicas: 3
  maxReplicas: 10
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: mlflow-ads-anomaly-dest-country-port
  targetCPUUtilizationPercentage: 60

That covers serving the destination country and port model. The other model, anomaly time, is deployed the same way:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: mlflow-ads-anomaly-time
  labels:
    app: mlflow-ads-anomaly-time
spec:
  selector:
    matchLabels:
      app: mlflow-ads-anomaly-time
  template:
    metadata:
      labels:
        app: mlflow-ads-anomaly-time
    spec:
      containers:
        - name: mlflow-serving
          image: <gcr-url>:<gcr-tag>
          env:
            - name: MODEL_URI
              value: "s3://mlflow-artifacts/2/xxxxxxxxxxxxxxx/artifacts/model" #change here selective-rat-864
            - name: SERVING_PORT
              value: "8082"
            - name: MLFLOW_S3_ENDPOINT_URL
              value: "https://minio-ml-hl.minio-ml.svc.cluster.local:9000"
            - name: MLFLOW_S3_IGNORE_TLS
              value: "true"
          resources:
            limits:
              cpu: 2000m
              memory: 2000Mi
            requests:
              cpu: 800m
              memory: 800Mi
          envFrom:
            - secretRef:
                name: mlflow-app-secrets
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: mlflow-ads-anomaly-time
  name: mlflow-ads-anomaly-time
spec:
  ports:
    - name: http-web
      port: 8082
      protocol: TCP
      targetPort: 8082
  selector:
    app: mlflow-ads-anomaly-time
  type: ClusterIP
---
apiVersion: autoscaling/v1
kind: HorizontalPodAutoscaler
metadata:
  name: mlflow-ads-anomaly-time
spec:
  minReplicas: 1
  maxReplicas: 3
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: mlflow-ads-anomaly-time
  targetCPUUtilizationPercentage: 60

Gateway

I created a gateway that re-engineers each request before it reaches model serving. For example, the gateway decides which model a request should be routed to, and it encodes the request the same way as in model training.

# Excerpt from server.py: app, the host/port settings and the helper functions are defined elsewhere
import json

import requests
from flask import jsonify, request

@app.route('/v5/gateway', methods=['POST'])
def get_invocationsV5():
    headers = {
        "Content-Type": "application/json",
    }

    predictionList = []
    content = request.json

    # A model is skipped when the request sets its disable flag to the string 'true'
    runAdsCountryDst = ('disable_predict_anomaly_dest_country' not in content) or (content['disable_predict_anomaly_dest_country'] != 'true')
    runAdsTime = ('disable_predict_anomaly_time' not in content) or (content['disable_predict_anomaly_time'] != 'true')
    ads_country_dst_log = ""
    ads_ts_hh_log = ""
    foundFlag = False
    if runAdsCountryDst and ('ads_country_dst' in content):
        # Encode country and port the same way as in training
        content_data = createDataAdsAnomalyDestCountryPort(content['ads_country_dst'], content['ads_dst_port'])
        print(content_data)
        ads_country_dst_log = "ads_country_dst : %s:%s" % (content['ads_country_dst'], content['ads_dst_port'])
        print(ads_country_dst_log)
        foundFlag = True
        try:
            resp = requests.post(
                url="http://%s:%s/invocations" % (host_anomaly_des_country_port, port_anomaly_des_country_port),
                data=json.dumps({"dataframe_split": content_data}), headers=headers,
            )
            responseData = {
                "subject": "unsupervised_dst_country_anomaly",
                "result": dataPredictionToString(resp.json()["predictions"][0])
            }
            predictionList.append(responseData)
            print(ads_country_dst_log, "-", resp.status_code)
        except Exception as e:
            errmsg = "Caught exception attempting to call model endpoint: %s" % e
            print(errmsg)
            # resp is undefined if the request itself failed, so report the error directly
            return jsonify({"error": errmsg}), 502

    if runAdsTime and ('ads_ts_hh' in content):
        # The time model takes the login hour as its only feature
        content_data = {"data": [[content['ads_ts_hh']]]}
        ads_ts_hh_log = "ads_ts_hh : %s" % content['ads_ts_hh']
        print(ads_ts_hh_log)
        foundFlag = True
        try:
            resp = requests.post(
                url="http://%s:%s/invocations" % (host_anomaly_time, port_anomaly_time),
                data=json.dumps({"dataframe_split": content_data}), headers=headers,
            )
            responseData = {
                "subject": "unsupervised_login_anomaly",
                "result": dataPredictionToString(resp.json()["predictions"][0])
            }
            predictionList.append(responseData)
            print(ads_ts_hh_log, "-", resp.status_code)
        except Exception as e:
            errmsg = "Caught exception attempting to call model endpoint: %s" % e
            print(errmsg)
            return jsonify({"error": errmsg}), 502

    if not foundFlag:
        print("request did not invoke any ML model")

    responsePredictData = {"results": predictionList}
    print(ads_country_dst_log, "|", ads_ts_hh_log, "|", responsePredictData)
    return jsonify(responsePredictData)
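The gateway relies on two helpers the post does not show: createDataAdsAnomalyDestCountryPort, which has to encode a request exactly like the training preprocessing, and dataPredictionToString, which turns the model's 1 / -1 output into the "Normally" / "Anomaly" strings seen in the gateway responses. The sketch below is a hypothetical reconstruction, not the author's code: COUNTRY_MAP and PORT_MAP are tiny made-up stand-ins for mapOfCountryDst() and mapOfPort(), and it assumes both maps contain an 'OTHER' entry, as the training code's mask-then-replace step implies.

```python
import json

# Hypothetical encodings standing in for mapOfCountryDst() / mapOfPort()
COUNTRY_MAP = {"Thailand": 1, "United States": 5, "OTHER": 100}
PORT_MAP = {"53": 0, "445": 3, "OTHER": 0}


def createDataAdsAnomalyDestCountryPort(country, port):
    """Encode one (country, port) pair the way the training preprocessing does."""
    # Unknown values fall back to the 'OTHER' code, like mask(..., 'OTHER') then replace()
    country_code = COUNTRY_MAP.get(country, COUNTRY_MAP["OTHER"])
    port_level = PORT_MAP.get(str(port), PORT_MAP["OTHER"])
    # Shape expected inside {"dataframe_split": ...}: a list of rows
    return {"data": [[country_code, port_level]]}


def dataPredictionToString(prediction):
    """Map LocalOutlierFactor output (-1 / 1) to the labels returned by the gateway."""
    return "Anomaly" if prediction == -1 else "Normally"


if __name__ == "__main__":
    payload = {"dataframe_split": createDataAdsAnomalyDestCountryPort("United States", "53")}
    print(json.dumps(payload))  # {"dataframe_split": {"data": [[5, 0]]}}
```

Keeping this encoding in a module shared by the training script and the gateway is one way to guarantee both sides stay in sync.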

Dockerfile

FROM python:3.9

ENV PYTHONUNBUFFERED=1
ENV PYTHONIOENCODING=UTF-8

# Create app directory
WORKDIR /app

# Install app dependencies
COPY requirements-flask.txt ./

RUN pip install -r requirements-flask.txt

# Bundle app source
COPY server.py ./
COPY sharelib.py ./

EXPOSE 5000
CMD ["python", "server.py"]

GitHub Actions workflow

name: Gateway

on:
  push:
    branches: [ master ]
    tags: ['v*.*.*']


env:
  GH_TOKEN: ${{ secrets.GH_TOKEN }}

jobs:
  build1:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        service: [ ml-training-gateway ]
    outputs:
      imageTag: ${{ steps.prep.outputs.version }}
      autoDeployEnv: ${{ steps.prep.outputs.deployEnv }}
      autoDeployBranch: ${{ steps.prep.outputs.autoDeployBranch }}
    steps:
      - name: Checkout repo
        uses: actions/checkout@v2

      ...
      ...

Deploy to Cluster

apiVersion: apps/v1
kind: Deployment
metadata:
  name: mlflow-gateway
  labels:
    app: mlflow-gateway
spec:
  selector:
    matchLabels:
      app: mlflow-gateway
  template:
    metadata:
      labels:
        app: mlflow-gateway
    spec:
      containers:
        - name: mlflow-gateway
          image: <gcr-url>:<gcr-tag>
          env:
            - name: gateway_port_ml
              value: "5000"
          # command: ["/bin/sleep", "3650d"]
          resources:
            limits:
              cpu: 1000m
              memory: 700Mi
            requests:
              cpu: 200m
              memory: 300Mi
          livenessProbe:
            failureThreshold: 3
            httpGet:
              path: /healthcheck
              port: 5000
              scheme: HTTP
            initialDelaySeconds: 0
            periodSeconds: 10
            successThreshold: 1
            timeoutSeconds: 2
          readinessProbe:
            failureThreshold: 3
            httpGet:
              path: /healthcheck
              port: 5000
              scheme: HTTP
            initialDelaySeconds: 0
            periodSeconds: 10
            successThreshold: 1
            timeoutSeconds: 2
          envFrom:
            - configMapRef:
                name: gateway-cfm
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: mlflow-gateway
  name: mlflow-gateway
spec:
  ports:
    - name: http-web
      port: 5000
      protocol: TCP
      targetPort: 5000
  selector:
    app: mlflow-gateway
  type: ClusterIP
---
apiVersion: autoscaling/v1
kind: HorizontalPodAutoscaler
metadata:
  name: mlflow-gateway
spec:
  minReplicas: 30
  maxReplicas: 40
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: mlflow-gateway
  targetCPUUtilizationPercentage: 60
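All three Deployments are scaled by a CPU-based HorizontalPodAutoscaler. As a rough guide to how these settings behave, the Kubernetes HPA documentation gives the core rule desiredReplicas = ceil(currentReplicas × currentUtilization / targetUtilization), where utilization is measured against the container's CPU request; no change is made when the ratio is within a tolerance (0.1 by default), and the result is kept between minReplicas and maxReplicas. The sketch below applies only that rule to the settings above; it deliberately ignores stabilization windows and other controller details.

```python
import math


def desired_replicas(current_replicas, current_cpu_pct, target_cpu_pct,
                     min_replicas, max_replicas, tolerance=0.1):
    """Core HPA rule: scale by the usage/target ratio, then clamp to [min, max]."""
    ratio = current_cpu_pct / target_cpu_pct
    if abs(ratio - 1.0) <= tolerance:
        desired = current_replicas  # close enough to the target: no change
    else:
        desired = math.ceil(current_replicas * ratio)
    return max(min_replicas, min(max_replicas, desired))


if __name__ == "__main__":
    # Gateway: minReplicas 30, maxReplicas 40, target 60% of the 200m CPU request
    print(desired_replicas(30, 90, 60, 30, 40))  # ceil(30 * 1.5) = 45, capped at maxReplicas: 40
    print(desired_replicas(30, 20, 60, 30, 40))  # wants about 10 pods, held at minReplicas: 30
    # Anomaly-time model: minReplicas 1, maxReplicas 3
    print(desired_replicas(2, 120, 60, 1, 3))    # ceil(2 * 2.0) = 4, capped at maxReplicas: 3
```

Because utilization is relative to the CPU request, a small request such as the gateway's 200m makes the HPA react to fairly little absolute CPU use.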

Serving with Gateway

curl -X POST -H "Content-Type:application/json" \
--data \
'{"@timestamp":"2023-10-10T03:20:21.727444Z","ads_dst_port":"53",...,"ads_country_dst":"United States","ads_ts_hh":"1"}' \
http://<GatewayURL>/v5/gateway
{"results":
[
{"result":"Normally","subject":"unsupervised_dst_country_anomaly"},
{"result":"Anomaly","subject":"unsupervised_login_anomaly"}
]
}

When we ran test requests from the Metalog system, the setup handled them for a while, but under heavy traffic the HPAs scaled the deployments up high and consumed a lot of resources, and pods restarted often because their health-check probes could not respond in time.

After discussing it as a team, we decided to cache results in the Metalog system for 5 minutes before sending requests to the API gateway. On a cache hit (memcache already holds a result for the request), Metalog uses that result directly without sending a request to the ML services. After that, everything ran fine: resources scaled down to the minimum, and the gateway pods restarted only occasionally.
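Caching works well here because each model's answer depends only on a few request fields (destination country, port and login hour), so many log lines share the same answer. The real cache lives in Metalog with memcache; the following is only an in-process Python sketch of the same 5-minute time-to-live idea. The TTLCache class and the choice of cache key are assumptions, not the team's implementation, and a shared cache such as memcache is what lets many clients reuse the same entries.

```python
import time
from typing import Callable, Dict, Hashable, Optional, Tuple


class TTLCache:
    """Tiny time-to-live cache: an entry is reused until ttl_seconds after it was stored."""

    def __init__(self, ttl_seconds: float = 300.0, clock: Callable[[], float] = time.monotonic):
        self.ttl_seconds = ttl_seconds
        self.clock = clock  # injectable so expiry can be tested without sleeping
        self._entries: Dict[Hashable, Tuple[float, object]] = {}

    def get_or_compute(self, key: Hashable, compute: Callable[[], object]) -> object:
        now = self.clock()
        entry = self._entries.get(key)
        if entry is not None and entry[0] > now:
            return entry[1]  # cache hit: no request to the gateway
        value = compute()    # miss or expired: ask the gateway (expired entries are simply overwritten)
        self._entries[key] = (now + self.ttl_seconds, value)
        return value


def cache_key(record: dict) -> Tuple[Optional[str], Optional[str], Optional[str]]:
    """Assumption: only the fields the models use go into the key (disable flags are ignored here)."""
    return (record.get("ads_country_dst"), record.get("ads_dst_port"), record.get("ads_ts_hh"))


if __name__ == "__main__":
    cache = TTLCache(ttl_seconds=300)
    record = {"ads_country_dst": "United States", "ads_dst_port": "53", "ads_ts_hh": "1"}
    # The lambda is a stand-in for the HTTP call to /v5/gateway
    print(cache.get_or_compute(cache_key(record), lambda: {"results": []}))
```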

  • Have fun!
  • I hope this post gives someone some ideas.

— — — — — — — — — — — — — — — — — — — — — — — — — — — — —

Credit : TrueDigitalGroup

— — — — — — — — — — — — — — — — — — — — — — — — — — — — —


Written by Dounpct

I work for TrueDigitalGroup in DevOps x Automation Team
