# functions.py
from datetime import datetime, timedelta
from collections import OrderedDict
import numpy as np

def GetFistListLength(input_dict):
  """Return the length of the first list stored in ``input_dict``.

  "First" is the first key in the dict's iteration order.

  Raises:
    IndexError: if ``input_dict`` has no keys.
  """
  first_key = list(input_dict.keys())[0]
  return len(input_dict[first_key])


#error function
def AE(t1, series):
  """Return the absolute error ``abs(t1 - t2)`` for every ``t2`` in ``series``.

  Args:
    t1: reference value; ``t1 - t2`` must support ``abs``.
    series: iterable of values to compare against ``t1``.

  Returns:
    A list with one absolute difference per element of ``series``,
    in the same order.
  """
  return [ abs(t1 - t2) for t2 in series ]
#absolute error, clipped at 30*6 days (~ 6 months) by default
def clipedAE(t1, series, max_error=timedelta(days=30*6)):
  """Return ``abs(t1 - t2)`` for every ``t2`` in ``series``, capped at ``max_error``.

  Args:
    t1: reference value; ``abs(t1 - t2)`` must be comparable with ``max_error``.
    series: iterable of values to compare against ``t1``.
    max_error: upper bound applied to each absolute difference.
      Defaults to ``timedelta(days=30*6)``.

  Returns:
    A list with one clipped absolute difference per element of ``series``,
    in the same order.
  """
  return [ min(abs(t1 - t2), max_error) for t2 in series ]

def CheckTimeOrder(input_dict):
  """Verify that every series in ``input_dict`` is strictly increasing.

  Prints one line per key, then compares each element with its
  predecessor. Equal neighbours count as a violation.

  Args:
    input_dict: mapping of key -> sequence of mutually comparable values
      (the sequence must support slicing).

  Raises:
    RuntimeError: if any element is not strictly greater than the one
      before it.
  """
  for key, series in input_dict.items():
    #str() so that non-string keys (e.g. the integer keys built by Transpose) do not raise TypeError
    print('CheckTimeOrder: key: ' + str(key))
    for earlier, later in zip(series, series[1:]):
      if not earlier < later:
        raise RuntimeError('CheckTimeOrder wrong time order')
      

def CopyDict(input_dict):
  """Return an OrderedDict holding a fresh list for every key of ``input_dict``.

  Each value is copied one level deep: the lists are new objects, the
  elements inside them are shared with the input. Key order follows the
  iteration order of ``input_dict``.
  """
  duplicate = OrderedDict()

  for key, values in input_dict.items():
    #new list object, same elements
    duplicate[key] = list(values)

  return duplicate

def ToDateTime(input_dict):
  """Convert every 'YYYY/MM/DD' string in ``input_dict`` to a ``datetime``.

  The input is copied first, so the caller's lists are left untouched.
  Each series is split on '/' and converted to integers as a whole before
  any ``datetime`` object is built.

  Returns:
    The copied dict with each series replaced by a list of ``datetime``
    objects built from the first three integer fields of every entry.

  Raises:
    ValueError: if a field is not an integer, or if ``datetime`` rejects
      the values; in the latter case the offending integer list is
      printed before the error is re-raised.
  """
  #work on a copy of the input lists
  out_dict = CopyDict(input_dict)

  for key in out_dict:
    raw_dates = out_dict[key]
    #'YYYY/MM/DD' -> ['YYYY', 'MM', 'DD']
    split_dates = [ text.split('/') for text in raw_dates ]
    #['YYYY', 'MM', 'DD'] -> [YYYY, MM, DD]
    int_dates = [ list(map(int, fields)) for fields in split_dates ]

    converted = []
    out_dict[key] = converted
    for fields in int_dates:
      year, month, day = fields[0], fields[1], fields[2]
      try:
        converted.append(datetime(year, month, day))
      except ValueError:
        #show which date was rejected, then let the error propagate
        print(fields)
        raise

  return out_dict

def ToDateTimeDiff(input_dict):
  """Express every series relative to its own first element.

  For each key, elements equal to the first element are kept unchanged;
  every other element is replaced by ``element - first``. The input dict
  and its lists are not modified.

  NOTE(review): the result therefore mixes the anchor value itself with
  differences (e.g. a datetime followed by timedeltas). This looks
  deliberate (the anchor stays recoverable) - confirm with callers
  before changing it to a zero difference.

  Args:
    input_dict: mapping of key -> non-empty iterable of values that
      support subtraction.

  Returns:
    An OrderedDict with the same keys and one new list per key.

  Raises:
    RuntimeError: if a series is empty.
  """
  out_dict = OrderedDict()

  for key, series in input_dict.items():
    series = list(series)
    if len(series) == 0:
      #str() keeps the RuntimeError intact for non-string keys instead of failing with TypeError
      raise RuntimeError("ToDateTimeDiff: len(series)==0, for key: " + str(key))
    first_time_stemp = series[0]
    out_dict[key] = [ first_time_stemp if date == first_time_stemp else (date - first_time_stemp) for date in series ]

  return out_dict

def Padding(input_dict, nMax=-1):
  """Pad every series to a common length by repeating its last element.

  The input dict and its lists are not modified; series that already
  have ``nMax`` or more elements are copied unchanged.

  Args:
    input_dict: mapping of key -> iterable of values.
    nMax: target length. A negative value (the default) means "length of
      the longest series in ``input_dict``" (0 for an empty dict).

  Returns:
    An OrderedDict with the same keys and one new, padded list per key.

  Raises:
    IndexError: if a series that needs padding is empty (there is no
      last element to repeat).
    RuntimeError: if a series needs padding and ``nMax`` exceeds the
      safety limit ``MaxIter`` (999).
  """
  MaxIter = 999

  if nMax < 0:
    lengths = [ len(series) for series in input_dict.values() ]
    #an empty dict has nothing to pad; avoid max() on an empty list
    nMax = max(lengths) if lengths else 0

  out_dict = OrderedDict()
  for key, series in input_dict.items():
    padded = list(series)

    if len(padded) < nMax:
      if not padded:
        raise IndexError("Padding: cannot pad an empty series, for key: " + str(key))
      #padding up to nMax would push the series past the safety limit
      if nMax > MaxIter:
        raise RuntimeError("Padding: len(series) > MaxIter")

      last_element = padded[-1]
      while len(padded) < nMax:
        padded.append(last_element) #copy last element

    out_dict[key] = padded

  return out_dict
      
  
def TruncateTimeSeries(input_dict, time_start, time_end):
  """Restrict every series to the window between ``time_start`` and ``time_end``.

  Each output series starts with ``time_start``, ends with ``time_end``
  and keeps, in their original order, only the elements strictly between
  the two bounds. The input lists are not modified.

  Returns:
    A copy of ``input_dict`` with each series replaced by its truncated
    version.
  """
  #work on a copy of the input lists
  out_dict = CopyDict(input_dict)

  for key in out_dict:
    #bounds are excluded here because they are added explicitly below
    inside_window = [ stamp for stamp in out_dict[key] if stamp > time_start and stamp < time_end ]
    out_dict[key] = [ time_start ] + inside_window + [ time_end ]

  return out_dict

def Transpose(input_dict):
  """Swap the two axes of a dict of lists.

  The result maps each position ``0 .. n-1`` to the list of values found
  at that position in every series, in the key order of ``input_dict``.
  ``n`` is the length of the first series; all series are expected to
  have that same length.

  Returns:
    An OrderedDict keyed by integer position.
  """
  #the first list decides how many positions there are
  n_positions = GetFistListLength(input_dict)

  transposed = OrderedDict()
  for position in range(n_positions):
    transposed[position] = [ series[position] for series in input_dict.values() ]

  return transposed

def ClosestTimeDiff(input_dict, iter_time, errFunctor=AE):
  """For every date in ``iter_time``, find the smallest error to each series.

  Args:
    input_dict: mapping of key -> series handed to ``errFunctor``.
    iter_time: dates to evaluate; iterated once per key.
    errFunctor: callable ``(date, series) -> list of errors``. The
      smallest error must expose a ``.days`` attribute (as a
      ``timedelta`` does).

  Returns:
    A dict mapping each key to a list holding, for every date in
    ``iter_time``, the ``.days`` value of the smallest error.
  """
  def smallest_error_in_days(given_date, series):
    errors = np.array(errFunctor(given_date, series))
    return errors[np.argmin(errors)].days

  out_dict = {}
  #one list of minimal errors per series, one entry per evaluated date
  for key, series in input_dict.items():
    out_dict[key] = [ smallest_error_in_days(given_date, series) for given_date in iter_time ]

  return out_dict


def EstimateError(input_dict):
  """Compute, position by position, the mean over all series and an asymmetric spread.

  For each position ``i`` (``0 .. len(first series) - 1``) the values
  ``series[i]`` of all keys are collected. Their mean is stored, and the
  spread is taken from the quantiles of the deviations from that mean.

  Args:
    input_dict: non-empty mapping of key -> sequence of numbers; every
      sequence must be at least as long as the first one.

  Returns:
    A tuple ``(means, [lower_errors, upper_errors])`` of lists with one
    entry per position. Both error lists hold absolute values. For
    zero-length series the result is ``([], [[], []])``.

  Raises:
    IndexError: if ``input_dict`` is empty or a series is shorter than
      the first one.
  """
  #quantile levels used for the lower / upper "1 sigma" error
  #NOTE(review): the exact Gaussian +-1 sigma levels are ~0.1587 / ~0.8413; values kept as-is so existing results do not change
  lower_level = 0.1577
  upper_level = 0.842

  series_list = list(input_dict.values())
  #the first series decides how many positions are evaluated
  list_length = len(series_list[0])

  out_list_mean = []
  out_list_lower_std = []
  out_list_upper_std = []
  for i in range(list_length):
    values_over_keys = np.array([ series[i] for series in series_list ])
    mean_over_keys = np.mean(values_over_keys) # sample mean
    deviations = values_over_keys - mean_over_keys
    lower_error, upper_error = np.quantile(deviations, [lower_level, upper_level])

    out_list_mean.append(mean_over_keys)
    out_list_lower_std.append( np.abs(lower_error) ) # 1 sigma gaussian quantile
    out_list_upper_std.append( np.abs(upper_error) ) # 1 sigma gaussian quantile

  #built outside the loop so the [lower, upper] shape also holds for zero-length series
  out_list_std = [out_list_lower_std, out_list_upper_std]

  return out_list_mean, out_list_std
# Last modified date