Commit a3a0761a authored by Ashkan

Add scheduler. Remove some comments.

parent 6aaed882
Pipeline #4328 failed with stages in 1 second
@@ -18,7 +18,7 @@ class HECSearch:
         self.searchType = searchType
         pass
-    # @profile
     def search(self):
         ri_opt = math.inf
         k_opt = 0
@@ -103,7 +103,6 @@ class HECSearch:
         return math.ceil((self.prrtApplicationParameters.max_latency -
                           self.prrtChannelParameters.rtt_prop_fwd -
                           n_c * arq_delay_i) / max(self.prrtSystemParameters.redundancy_packet_transmission_delay, self.prrtSystemParameters.source_packet_interval))
-    # @profile
     def get_fec_delay_min(self):
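Side note, not part of the commit: the `return` expression above computes how many redundancy transmission slots fit into the remaining latency budget. Written out, with $D_{\max}$ for `max_latency`, $RTT_{fwd}$ for `rtt_prop_fwd`, $d_{ARQ,i}$ for `arq_delay_i`, $d_{red}$ for `redundancy_packet_transmission_delay`, and $T_{src}$ for `source_packet_interval`:

$$\left\lceil \frac{D_{\max} - RTT_{fwd} - n_c \cdot d_{ARQ,i}}{\max(d_{red},\; T_{src})} \right\rceil$$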
@@ -21,7 +21,7 @@ cpdef float hypergeometric_distribution(int n, int k, int i, int j):
 cpdef float residual_packet_erasure_rate(int k, int n, float ch_loss_rate):
     cdef:
         float total_packet_erasure = 0
-        int i, j
+        unsigned int i, j
     for i in range(1, k + 1):
         for j in range(max(n - k + 1, i), n - k + i + 1):
             total_packet_erasure += i * hypergeometric_distribution(n, k, i, j) \
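For reference, combining the loop above with the $1/k$ normalization visible in `is_maximum_loss_fulfilled` below, the residual packet erasure rate of an $(n, k)$ block code over a channel with loss rate $p$ reads:

$$P_r(k, n) = \frac{1}{k} \sum_{i=1}^{k} \; \sum_{j=\max(n-k+1,\, i)}^{n-k+i} i \cdot P_{hg}(n, k, i, j) \cdot P_{err}(j, n, p)$$

where $P_{hg}$ is `hypergeometric_distribution` and $P_{err}$ is `get_error_prob`, both defined elsewhere in this file.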
@@ -52,14 +52,16 @@ cpdef int estimate_n_for_k(k, n_max, loss_rate_fwd, max_residual_loss_rate):
     return n
 
 @cdivision(True)
-cpdef bint is_maximum_loss_fulfilled(k, n, loss_rate_fwd, max_residual_loss_rate):
-    cdef float total_packet_erasure = 0
+cpdef bint is_maximum_loss_fulfilled(int k, int n, float loss_rate_fwd, float max_residual_loss_rate):
+    cdef:
+        float total_packet_erasure = 0
+        int i, j
     for i in range(1, k):
         for j in range(max(n - k + 1, i), n - k + i):
             packet_erasure_at_i = i * hypergeometric_distribution(n, k, i, j) * \
                                   get_error_prob(j, n, loss_rate_fwd)
             total_packet_erasure += packet_erasure_at_i
-    residual_packet_erasure_rate = (1 / k) * total_packet_erasure # Pr(k, n)
+    cdef float residual_packet_erasure_rate = total_packet_erasure / k # Pr(k, n)
     if (residual_packet_erasure_rate <= max_residual_loss_rate):
         return True
     return False
\ No newline at end of file
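Two observations on the rewritten function, offered as review notes rather than facts from the commit. First, once `k` is a C `int` under `@cdivision(True)`, the old `(1 / k)` would evaluate as C integer division (0 for k > 1), which is presumably why the normalization was rewritten as `total_packet_erasure / k`. Second, the loops use `range(1, k)` and `range(..., n - k + i)`, one short of the bounds in `residual_packet_erasure_rate` above (`k + 1`, `n - k + i + 1`); if that is unintentional, the `i = k` and `j = n - k + i` terms are silently dropped. A minimal sketch of how the predicate is typically used (hypothetical helper, not in this commit):

```python
def smallest_valid_n(k, n_max, loss_rate_fwd, max_residual_loss_rate):
    # Scan code lengths n >= k and return the first one whose residual
    # erasure rate Pr(k, n) meets the application's loss target.
    for n in range(k, n_max + 1):
        if is_maximum_loss_fulfilled(k, n, loss_rate_fwd, max_residual_loss_rate):
            return n
    return None  # no feasible block length within n_max
```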
 import os
+import sys
 import prrt
 import time
+import datetime
 import hec_search
 import numpy as np
 import pandas as pd
@@ -8,21 +10,21 @@ from multiprocessing import Pool
 # --------------------------------------------------------------
 # Generate dataset
-# ds_all_input = set(os.listdir("../../hecps/code/ML/dataset1/"))
-# print(len(ds_all_input))
-# ds_calculated_output = set(os.listdir("documents/bigdata/"))
-# ds_calculated_output = {s.replace("out", "in") for s in ds_calculated_output}
-# print(len(ds_calculated_output))
-# ds_basename = list(ds_all_input - ds_calculated_output)
-# print(len(ds_basename))
-# ds_rel_input_path = "../../hecps/code/ML/dataset1/"
-# ds_rel_output_path = "documents/bigdata/"
+ds_all_input = set(os.listdir("../../hecps/code/ML/dataset1/"))  # TODO: Adjust dataset dir
+print(len(ds_all_input))
+ds_calculated_output = set(os.listdir("documents/bigdata/"))
+ds_calculated_output = {s.replace("out", "in") for s in ds_calculated_output}
+print(len(ds_calculated_output))
+ds_basename = list(ds_all_input - ds_calculated_output)
+print(len(ds_basename))
+ds_rel_input_path = "../../hecps/code/ML/dataset1/"  # TODO: Adjust dataset dir
+ds_rel_output_path = "documents/bigdata/"
 #----------------------------------------------------------------
-ds_basename = os.listdir("documents/input/")
-ds_basename = "LONG_in_12_param_4_sz_zzelqt"
-ds_rel_input_path = "documents/input/"
-ds_rel_output_path = "documents/output/"
+# ds_basename = os.listdir("documents/input/")
+# ds_basename = "in_12_param_4_sz_zzbodx"
+# ds_rel_input_path = "documents/input/"
+# ds_rel_output_path = "documents/output/"
 columns_order = ["app_max_latency", "app_max_residual_loss_rate", "app_data_rate", "app_pkt_length",
@@ -38,34 +40,25 @@ def evaluate(searchAlgorithm, appParams, channelParams, systemParams):
     n_p_min = 1
     n_p_max = np.round(get_n_p_max(channelParams.rtt_prop_fwd, appParams.pkt_length, channelParams.data_rate_btl_fwd), 0)
     if n_p_min <= n_p_max:
-        # print("n_p_max: " + str(n_p_max))
         start = time.time()
         search = hec_search.HECSearch(searchAlgorithm, n_p_min, n_p_max, appParams, channelParams, systemParams)
         search_result = search.search()
         duration = time.time() - start
         print("Duration: " + str(duration))
         return [search_result, duration]
     else:
         return []
 
 def test_case(dataset_basename):
     counter = 0
     save_result_to = pd.DataFrame()
     # Load dataset in chunks
     for df_in_chunk in pd.read_csv(ds_rel_input_path + dataset_basename, sep=',', chunksize=100):
         print(dataset_basename + " started.")
         for index, row in df_in_chunk.iterrows():
-            # if index == 1:
-            #     print(str(index))
             if index != -1:
                 appParams = prrt.PrrtApplicationParameters(row['app_max_latency'], row['app_max_residual_loss_rate'], row['app_data_rate'], row['app_pkt_length'])
                 chnlParams = prrt.PrrtChannelParameters(row['ch_loss_rate'], 0, row['ch_rtt_prop_fwd'], 0, row['ch_data_rate_btl_fwd'], 0)
                 sysParams = prrt.PrrtSystemParameters(0, np.round(row['sys_red_pkt_trans_dly'], 5), 0, 0, row['sys_src_pkt_interval'])
                 for searchAlgorithm in ["GreedySearch"]:
-                    # print("index: " + str(index))
-                    # print(str(row['app_max_latency']) + ":" + str(row['app_max_residual_loss_rate']) + ":" + str(row['app_data_rate']) + ":" + str(row['app_pkt_length']) + ":" +
-                    #       str(row['ch_loss_rate']) + ":" + str(row['ch_rtt_prop_fwd']) + str(row['ch_data_rate_btl_fwd']) + ":" + str(np.round(row['sys_red_pkt_trans_dly'], 5)) + ":" +
-                    #       str(np.round(row['sys_red_pkt_trans_dly'], 5)) + ":" + str(row['sys_src_pkt_interval']))
                     search_result = evaluate(searchAlgorithm, appParams, chnlParams, sysParams)
                     if len(search_result) != 0:
                         config = search_result[0]
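`get_n_p_max`, called in `evaluate` above, is not shown in this diff. A plausible reading, flagged as an assumption rather than the project's actual definition, is the forward bandwidth-delay product expressed in packets:

```python
def get_n_p_max(rtt_prop_fwd, pkt_length, data_rate_btl_fwd):
    # Assumed semantics: how many packets fit into one forward propagation
    # delay at the bottleneck data rate (bandwidth-delay product).
    return rtt_prop_fwd * data_rate_btl_fwd / pkt_length
```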
@@ -76,13 +69,9 @@ def test_case(dataset_basename):
                             'ch_loss_rate' : row['ch_loss_rate'],
                             'ch_rtt_prop_fwd' : row['ch_rtt_prop_fwd'],
                             'ch_data_rate_btl_fwd' : row['ch_data_rate_btl_fwd'],
-                            # 'sys_block_coding_dly' : row['sys_block_coding_dly'],
                             'sys_red_pkt_trans_dly' : np.round(row['sys_red_pkt_trans_dly'], 5),
-                            # 'sys_proc_dly' : row['sys_proc_dly'],
-                            # 'sys_pkt_loss_detection_dly' : row['sys_pkt_loss_detection_dly'],
                             'sys_src_pkt_interval' : row['sys_src_pkt_interval'],
                             'time': np.around(search_result[1], 0),
-                            # 'search': searchAlgorithm,
                             'config': [config[0], config[1], config[2]],
                             'fec_balance': config[3]}, ignore_index=True, sort=False)
                     else:
@@ -93,10 +82,7 @@ def test_case(dataset_basename):
                             'ch_loss_rate' : row['ch_loss_rate'],
                             'ch_rtt_prop_fwd' : row['ch_rtt_prop_fwd'],
                             'ch_data_rate_btl_fwd' : row['ch_data_rate_btl_fwd'],
-                            # 'sys_block_coding_dly' : row['sys_block_coding_dly'],
                             'sys_red_pkt_trans_dly' : np.round(row['sys_red_pkt_trans_dly'], 5),
-                            # 'sys_proc_dly' : row['sys_proc_dly'],
-                            # 'sys_pkt_loss_detection_dly' : row['sys_pkt_loss_detection_dly'],
                             'sys_src_pkt_interval' : row['sys_src_pkt_interval'],
-                            # 'search': searchAlgorithm,
                             'time' : 0,
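A side note on `test_case`, not part of this commit: `save_result_to` is apparently grown with `DataFrame.append` inside the loop (the `ignore_index=True, sort=False` calls above), which is quadratic and was removed in pandas 2.0. The usual replacement pattern, sketched with a hypothetical `result_rows` list of per-row dicts:

```python
import pandas as pd

result_rows = []                  # one plain dict per evaluated row
# inside the loop: result_rows.append({...}) instead of DataFrame.append
save_result_to = pd.DataFrame(result_rows, columns=columns_order)
save_result_to.to_csv(output_path, sep=',', index=False)  # output_path: hypothetical
```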
@@ -104,14 +90,28 @@
                             'fec_balance': config[3]}, ignore_index=True, sort=False)
             counter += 1
-            #print("Chunk round: " + str(counter))
     save_result_to.to_csv(ds_rel_output_path + dataset_basename.replace("in", "out", 1), sep=',', index=False, columns=columns_order)
     print(dataset_basename + " finished.")
 
-# test_case(ds_basename)
 if __name__ == '__main__':
-    pool = Pool(processes=7)
-    pool.map(test_case, ds_basename, chunksize=1)
+    job_started = False
+    pool = Pool(processes=7)  # TODO: Adjust number of cores
+    while True:
+        time.sleep(60)
+        current_DT = datetime.datetime.now()
+        start_job_at = datetime.datetime(current_DT.year, current_DT.month, current_DT.day, 16, 44, 30)  # TODO: Adjust start time
+        end_job_at = datetime.datetime(current_DT.year, current_DT.month, current_DT.day, 16, 44, 40)  # TODO: Adjust end time
+        if end_job_at > current_DT > start_job_at:
+            if not job_started:
+                job_started = True
+                someList = pool.imap_unordered(test_case, ds_basename, chunksize=1)
+                print("Job has started at: " + str(current_DT))
+        else:
+            if job_started:
+                print("Job has ended at: " + str(current_DT))
+                job_started = False
+                pool.terminate()
+                pool.close()
+                sys.exit("Time to go!")
\ No newline at end of file
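Review note on the added scheduler, not a fix applied to the commit: the poll interval (`time.sleep(60)`) is longer than the example 10-second window between `start_job_at` and `end_job_at`, so the window can be missed entirely, and `pool.terminate()` kills workers mid-write, which can leave truncated output CSVs. A sketch of a simpler shape under the same assumptions (`test_case` and `ds_basename` as defined above): sleep once until the start time, then let the pool drain normally.

```python
import datetime
import time
from multiprocessing import Pool

def run_at(start_job_at, worker, items, processes=7):
    # Sleep straight through to the start time instead of polling every 60 s.
    delay = (start_job_at - datetime.datetime.now()).total_seconds()
    if delay > 0:
        time.sleep(delay)
    with Pool(processes=processes) as pool:
        # Consume the iterator so every task is dispatched and completes
        # before the pool shuts down cleanly.
        for _ in pool.imap_unordered(worker, items, chunksize=1):
            pass
```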