Commit 770e3e8b authored by Ashkan

Latest update.

parent 8cb1e572
@@ -250,65 +250,70 @@ class PrrtCodingConfiguration:
# def __eq__()
# Calculate the redundancy information (RI) this coding configuration introduces on a given channel.
def sum_error_prob_over_sent_packets(self, sent_packets_item, p_e):
def sum_error_prob_over_sent_packets(self, sent_packets_item, channel_loss_rate):
error_prob = 0
for i in range(sent_packets_item + 1, sent_packets_item + self.k + 1):
error_prob += prrt_utils.get_error_prob(i, sent_packets_item + self.k, p_e)
for i in range(sent_packets_item - self.k + 1, sent_packets_item + 1):
error_prob += prrt_utils.get_error_prob(i, sent_packets_item, channel_loss_rate)
return error_prob
def get_redundant_information(self):
p_e = self.prrtChannelParameters.loss_rate_fwd
sent_redundant_packets = np.cumsum(self.n_p) # Cumulative sum of all redundant packets up to each cycle
error_prob_up_to_every_cycle = list(
map(lambda x: self.sum_error_prob_over_sent_packets(x, p_e), sent_redundant_packets[1:]))
ri = 1 / self.k * self.n_p[0] + 1 / self.k * np.dot(error_prob_up_to_every_cycle, self.n_p[1:])
channel_loss_rate = self.prrtChannelParameters.loss_rate_fwd
sent_redundant_packets = np.cumsum(self.n_p) + self.k # Cumulative number of packets sent up to each cycle: repair packets plus the k source packets
#print("sent_redundant_packets=" + str(sent_redundant_packets))
error_prob_up_to_every_cycle = list(map(lambda x: self.sum_error_prob_over_sent_packets(x, channel_loss_rate), sent_redundant_packets[:-1]))
#print(error_prob_up_to_every_cycle)
ri = (self.n_p[0] + np.dot(error_prob_up_to_every_cycle, self.n_p[1:])) / self.k
return ri
def get_ri_min(self, prrtChannelParameters):
p_e = prrtChannelParameters.loss_rate_fwd
return p_e / (1 - p_e)
channel_loss_rate = prrtChannelParameters.loss_rate_fwd
return channel_loss_rate / (1 - channel_loss_rate)
def get_ri_residual(self, prrtChannelParameters, prrtApplicationParameters):
p_e = prrtChannelParameters.loss_rate_fwd
channel_loss_rate = prrtChannelParameters.loss_rate_fwd
p_res = prrtApplicationParameters.max_residual_loss_rate
if (p_e >= p_res):
return (p_e - p_res) / (1 - p_e)
if (channel_loss_rate >= p_res):
return (channel_loss_rate - p_res) / (1 - channel_loss_rate)
else:
return 0
def is_valid_for(self, prrtApplicationParameters, prrtChannelParameters, prrtSystemParameters):
if (self.is_maximum_latency_fulfilled(prrtApplicationParameters, prrtChannelParameters,
prrtSystemParameters) and
self.is_maximum_loss_fulfilled(prrtApplicationParameters, prrtChannelParameters, prrtSystemParameters)):
return True
def is_valid_for(self):
return [self.is_maximum_latency_fulfilled(), self.is_maximum_loss_fulfilled()]
def is_maximum_latency_fulfilled(self, prrtApplicationParameters, prrtChannelParameters, prrtSystemParameters):
fec_delay = prrtSystemParameters.block_coding_delay + self.n_p[
0] * prrtSystemParameters.redundancy_packet_transmission_delay + \
(prrtChannelParameters.rtt_prop_fwd + prrtSystemParameters.processing_delay) / 2 + \
prrtSystemParameters.packet_loss_detection_delay
def is_maximum_latency_fulfilled(self):
fec_delay = self.k * max(self.prrtSystemParameters.redundancy_packet_transmission_delay, self.prrtSystemParameters.source_packet_interval) + \
self.n_p[0] * (self.prrtSystemParameters.block_coding_delay + self.prrtSystemParameters.redundancy_packet_transmission_delay) + \
self.prrtChannelParameters.rtt_prop_fwd
req_delay_list = list(map(lambda c: prrtChannelParameters.rtt_prop_fwd
+ np.dot(self.n_p[c], prrtSystemParameters.redundancy_packet_transmission_delay)
+ prrtSystemParameters.processing_delay))
if (fec_delay + np.dot(self.len(self.n_p), req_delay_list) <= prrtApplicationParameters.max_latency):
if fec_delay + len(self.n_p) * self.prrtChannelParameters.rtt_prop_fwd \
+ sum(map(lambda np : np * self.prrtSystemParameters.redundancy_packet_transmission_delay, self.n_p)) <= self.prrtApplicationParameters.max_latency:
return True
return False
def is_maximum_loss_fulfilled(self, prrtApplicationParameters, prrtChannelParameters):
def is_maximum_loss_fulfilled(self):
total_packet_erasure = 0
for i in range(1, self.k):
for j in range(max(self.n - self.k + 1, i), self.n - self.k + i):
packet_erasure_at_i = i * prrt_utils.hypergeometric_distribution(self.n, self.k, i, j) * prrt_utils.get_error_prob(
j,
self.n,
prrtChannelParameters.loss_rate_fwd)
self.prrtChannelParameters.loss_rate_fwd)
total_packet_erasure += packet_erasure_at_i
residual_packet_erasure_rate = (1 / self.k) * total_packet_erasure # Pr(k, n)
if (residual_packet_erasure_rate <= prrtApplicationParameters.max_residual_loss_rate):
if (residual_packet_erasure_rate <= self.prrtApplicationParameters.max_residual_loss_rate):
return True
return False
# For now this condition is ignored.
def is_app_data_rate_fulfilled(self):
data_rate_constraint = self.prrtChannelParameters.data_rate_btl_fwd / (1 + self.get_redundant_information())
#print("data_rate_constraint=" + str(data_rate_constraint) + "app_data_rate=" + str(self.prrtApplicationParameters.data_rate))
if self.prrtApplicationParameters.data_rate <= data_rate_constraint:
print("DATA_RATE_IS_FULFILLED")
return True
return False
cdef class PrrtSocket:
cdef cprrt.PrrtSocket* _c_socket
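For context on the updated error model: the rewritten sum_error_prob_over_sent_packets adds up, over the last k positions, the probability that too many of the packets sent so far are lost for decoding to succeed. Below is a minimal standalone sketch, assuming prrt_utils.get_error_prob(i, n, p) is the binomial probability of exactly i losses among n independently transmitted packets; that signature is inferred from the call sites in this diff, not confirmed elsewhere.

from math import comb

def get_error_prob(i, n, p):
    # Binomial pmf: probability that exactly i of n packets are lost,
    # each packet being lost independently with probability p.
    return comb(n, i) * p ** i * (1 - p) ** (n - i)

def decoding_failure_prob(sent, k, p):
    # Decoding fails when fewer than k of the `sent` packets arrive,
    # i.e. when more than sent - k packets are lost.
    return sum(get_error_prob(i, sent, p) for i in range(sent - k + 1, sent + 1))

print(decoding_failure_prob(6, 4, 0.1))  # ~0.0159: k=4 source + 2 repair packets at 10% loss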
@@ -38,21 +38,32 @@ def gen_repair_schedules(redundancy, positions, min, max, is_order_ascending):
return arbitrary_schedules
def gen_repair_schedule(redundancy, positions, min, max):
if min > max:
def gen_repair_schedule(redundancy, numberOfCycles, minPossibleRedundancy, maxPossibleRedundancy):
if minPossibleRedundancy > maxPossibleRedundancy:
# raise Exception("Illegal input combinations. Make sure the minPossibleRedundancy < maxPossibleRedundancy.")
return []
# raise Exception(
# "Illegal input combinations. Make sure the min < max.")
if positions == 0:
# Only proactive cycle.
if numberOfCycles == 0:
return [redundancy]
opt_schedule = [min for p in range(positions)]
c_redundancy_left_over = redundancy - min * positions
last_index = positions - 1
while c_redundancy_left_over > 0:
if opt_schedule[last_index] < max:
opt_schedule[last_index] += 1
c_redundancy_left_over -= 1
optRepairSchedule = [0]
positionsToFill = 1
# fill every reactive cycle with the minimum possible redundancy: minPossibleRedundancy
while redundancy > 0 and positionsToFill <= numberOfCycles:
optRepairSchedule += [minPossibleRedundancy]
redundancy -= minPossibleRedundancy
positionsToFill += 1
# insert the remaining redundancy packets into the last cycles, if possible.
while redundancy > 0 and numberOfCycles > 0:
if optRepairSchedule[numberOfCycles] < maxPossibleRedundancy:
optRepairSchedule[numberOfCycles] += 1
redundancy -= 1
else:
last_index -= 1
numberOfCycles -= 1
# if all reactive cycles are already at maxPossibleRedundancy and redundancy is still left, put the rest into the proactive cycle (index 0).
if redundancy > 0:
optRepairSchedule[0] += redundancy
return opt_schedule
\ No newline at end of file
return optRepairSchedule
\ No newline at end of file
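A quick usage sketch of the new greedy schedule: seed every reactive cycle with minPossibleRedundancy, then top up from the last cycle backwards until maxPossibleRedundancy is reached, spilling any remainder into the proactive cycle. The import path below is hypothetical, since the hunk does not show the file this function lives in.

# Hypothetical module name; adjust to wherever gen_repair_schedule is defined.
from repair_schedules import gen_repair_schedule

# Distribute 5 repair packets over 3 reactive cycles, with 1..2 packets
# allowed per reactive cycle; index 0 is the proactive cycle.
# Step 1 seeds the reactive cycles:        [0, 1, 1, 1]
# Step 2 tops up from the back to the max: [0, 1, 2, 2]
print(gen_repair_schedule(5, 3, 1, 2))  # -> [0, 1, 2, 2]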
@@ -6,17 +6,22 @@ import numpy as np
import pandas as pd
from multiprocessing import Pool
# ds_basename = os.listdir("../../hecps/code/ML/data/")
# ds_basename = ["in_12_param_4_sz_zzatql"]
# ds_rel_input_path = "../../hecps/code/ML/data/"
# ds_rel_result_path = "documents/result/"
ds_basename = os.listdir("../../hecps/code/ML/data/")
ds_rel_input_path = "../../hecps/code/ML/data/"
ds_rel_output_path = "documents/bigdata/"
#['documents/1','documents/2','documents/3','documents/4','documents/5','documents/6']
ds_basename = os.listdir("documents/debug/")
# ds_basename = ["in_12_param_4_sz_zzatql"]
ds_rel_input_path = "documents/debug/"
ds_rel_result_path = "documents/debug/"
# ds_basename = os.listdir("documents/input/")
# ds_basename = ["LONG_in_12_param_4_sz_zzelqt"]
# ds_rel_input_path = "documents/input/"
# ds_rel_output_path = "documents/output/"
columns_order = ["app_max_latency", "app_max_residual_loss_rate", "app_data_rate", "app_pkt_length",
"ch_loss_rate", "ch_rtt_prop_fwd", "ch_data_rate_btl_fwd",
"sys_red_pkt_trans_dly", "sys_src_pkt_interval",
"time", "config", "fec_balance"]
def get_n_p_max(rtt_prop_fwd, pkt_length, data_rate_btl_fwd):
return rtt_prop_fwd * data_rate_btl_fwd / pkt_length
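get_n_p_max bounds the number of repair packets that fit into one forward round trip at the bottleneck rate, which in turn bounds n_p in the search. A worked example with assumed units (seconds, bits, and bits per second; the diff itself does not pin the units down):

# With rtt_prop_fwd = 50 ms, a 1 Mbit/s bottleneck and 1250-byte (10000-bit)
# packets, at most 0.05 * 1_000_000 / 10_000 = 5 repair packets fit in flight.
print(get_n_p_max(0.05, 10_000, 1_000_000))  # -> 5.0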
@@ -26,10 +31,12 @@ def evaluate(searchAlgorithm, appParams, channelParams, systemParams):
n_p_min = 1
n_p_max = np.round(get_n_p_max(channelParams.rtt_prop_fwd, appParams.pkt_length, channelParams.data_rate_btl_fwd), 0)
if n_p_min <= n_p_max:
# print("n_p_max: " + str(n_p_max))
start = time.time()
search = hec_search.HECSearch(searchAlgorithm, n_p_min, n_p_max, appParams, channelParams, systemParams)
search_result = search.search()
duration = time.time() - start
# print("Duration: " + str(duration))
return [search_result, duration]
else:
return []
@@ -39,13 +46,18 @@ def test_case(dataset_basename):
save_result_to = pd.DataFrame()
# Load dataset in chunk
for df_in_chunk in pd.read_csv(ds_rel_input_path + dataset_basename, sep=',', chunksize=100):
print(dataset_basename + " started.")
for index, row in df_in_chunk.iterrows():
appParams = prrt.PrrtApplicationParameters(row['app_max_latency'], row['app_max_residual_loss_rate'], row['app_data_rate'], row['app_pkt_length'])
chnlParams = prrt.PrrtChannelParameters(row['ch_loss_rate'], 0, row['ch_rtt_prop_fwd'], 0, row['ch_data_rate_btl_fwd'], 0)
sysParams = prrt.PrrtSystemParameters(row['sys_block_coding_dly'], np.round(row['sys_red_pkt_trans_dly'], 5), row['sys_proc_dly'], row['sys_pkt_loss_detection_dly'], row['sys_src_pkt_interval'])
if index == 98:
if index != -1:
appParams = prrt.PrrtApplicationParameters(row['app_max_latency'], row['app_max_residual_loss_rate'], row['app_data_rate'], row['app_pkt_length'])
chnlParams = prrt.PrrtChannelParameters(row['ch_loss_rate'], 0, row['ch_rtt_prop_fwd'], 0, row['ch_data_rate_btl_fwd'], 0)
sysParams = prrt.PrrtSystemParameters(0, np.round(row['sys_red_pkt_trans_dly'], 5), 0, 0, row['sys_src_pkt_interval'])
for searchAlgorithm in ["GreedySearch"]:
print(str(index))
# print("index: " + str(index))
# print(str(row['app_max_latency']) + ":" + str(row['app_max_residual_loss_rate']) + ":" + str(row['app_data_rate'] )+ ":" + str(row['app_pkt_length']) + ":" +
# str(row['ch_loss_rate']) + ":" + str(row['ch_rtt_prop_fwd']) + str(row['ch_data_rate_btl_fwd']) + ":" + str(np.round(row['sys_red_pkt_trans_dly'], 5)) + ":" +
# str(np.round(row['sys_red_pkt_trans_dly'], 5)) + ":" + str(row['sys_src_pkt_interval']))
search_result = evaluate(searchAlgorithm, appParams, chnlParams, sysParams)
if len(search_result) != 0:
config = search_result[0]
@@ -56,14 +68,15 @@ def test_case(dataset_basename):
'ch_loss_rate' : row['ch_loss_rate'],
'ch_rtt_prop_fwd' : row['ch_rtt_prop_fwd'],
'ch_data_rate_btl_fwd' : row['ch_data_rate_btl_fwd'],
'sys_block_coding_dly' : row['sys_block_coding_dly'],
'sys_red_pkt_trans_dly' : row['sys_red_pkt_trans_dly'],
'sys_proc_dly' : row['sys_proc_dly'],
'sys_pkt_loss_detection_dly' : row['sys_pkt_loss_detection_dly'],
# 'sys_block_coding_dly' : row['sys_block_coding_dly'],
'sys_red_pkt_trans_dly' : np.round(row['sys_red_pkt_trans_dly'], 5),
# 'sys_proc_dly' : row['sys_proc_dly'],
# 'sys_pkt_loss_detection_dly' : row['sys_pkt_loss_detection_dly'],
'sys_src_pkt_interval' : row['sys_src_pkt_interval'],
'search': searchAlgorithm,
'config': [config.k, len(config.n_p), config.n_p],
'duration' : np.around(search_result[1],0)}, ignore_index=True, sort=False)
'time': np.around(search_result[1], 0),
# 'search': searchAlgorithm,
'config': [config[0], config[1], config[2]],
'fec_balance': config[3]}, ignore_index=True, sort=False)
else:
save_result_to = save_result_to.append({'app_max_latency': row['app_max_latency'],
'app_max_residual_loss_rate': row['app_max_residual_loss_rate'],
@@ -72,23 +85,25 @@ def test_case(dataset_basename):
'ch_loss_rate' : row['ch_loss_rate'],
'ch_rtt_prop_fwd' : row['ch_rtt_prop_fwd'],
'ch_data_rate_btl_fwd' : row['ch_data_rate_btl_fwd'],
'sys_block_coding_dly' : row['sys_block_coding_dly'],
'sys_red_pkt_trans_dly' : row['sys_red_pkt_trans_dly'],
'sys_proc_dly' : row['sys_proc_dly'],
'sys_pkt_loss_detection_dly' : row['sys_pkt_loss_detection_dly'],
# 'sys_block_coding_dly' : row['sys_block_coding_dly'],
'sys_red_pkt_trans_dly' : np.round(row['sys_red_pkt_trans_dly'], 5),
# 'sys_proc_dly' : row['sys_proc_dly'],
# 'sys_pkt_loss_detection_dly' : row['sys_pkt_loss_detection_dly'],
'sys_src_pkt_interval' : row['sys_src_pkt_interval'],
'search': searchAlgorithm,
# 'search': searchAlgorithm,
'time' : 0,
'config': ["INV_PRM_NPM"],
'duration' : 0}, ignore_index=True, sort=False)
'fec_balance': 0}, ignore_index=True, sort=False)  # config is undefined when the search fails, so record a placeholder
counter += 1
#print("Chunk round: " + str(counter))
save_result_to.to_csv(ds_rel_result_path + dataset_basename.replace("in", "out", 1), sep=',', index = False)
save_result_to.to_csv(ds_rel_output_path + dataset_basename.replace("in", "out", 1), sep=',', index = False, columns = columns_order)
print(dataset_basename + " finished.")
test_case(ds_basename[0])
# if __name__ == '__main__':
# pool = Pool(processes=8)
# pool.map(test_case, ds_basename, chunksize=1)
# test_case(ds_basename)
if __name__ == '__main__':
pool = Pool(processes=7)
pool.map(test_case, ds_basename, chunksize=1)
@@ -5,12 +5,14 @@ import os, errno
import versioneer
ext = [Extension(name='prrt', language="c", sources=["prrt/prrt.pyx"]),
Extension(name='restricted_integer_composition', language="c", sources=["prrt/restricted_integer_composition.pyx"])
Extension(name='restricted_integer_composition', language="c", sources=["prrt/restricted_integer_composition.pyx"]),
Extension(name='prrt_utils', language="c", sources=["prrt/prrt_utils.pyx"])
]
try:
os.remove(os.path.join(os.path.dirname(os.path.realpath(__file__)), "prrt/prrt.c"))
os.remove(os.path.join(os.path.dirname(os.path.realpath(__file__)), "prrt/restricted_integer_composition.c"))
os.remove(os.path.join(os.path.dirname(os.path.realpath(__file__)), "prrt/prrt_utils.c"))
except OSError as e:
if e.errno != errno.ENOENT:
raise
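For completeness, an extension list like the one above is typically passed through Cython's build machinery. A minimal sketch of the corresponding setup() call follows; the actual call lies outside this hunk, so its exact form here is an assumption (it reuses the ext list and the versioneer import from this file).

from setuptools import setup
from Cython.Build import cythonize

# Assumed wiring; the real setup() call is not shown in this diff.
setup(
    name="prrt",
    version=versioneer.get_version(),
    cmdclass=versioneer.get_cmdclass(),
    ext_modules=cythonize(ext),
)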