Coverage for encodermap/encodermap_tf1/misc.py: 11%

135 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-12-31 16:54 +0100

1# Standard Library Imports 

2import os 

3from math import pi 

4 

5# Third Party Imports 

6import numpy as np 

7import tensorflow.compat.v1 as tf 

8from tensorboard.backend.event_processing.event_accumulator import EventAccumulator 

9 

10 

def add_layer_summaries(layer, debug=False):
    """
    Attach TensorBoard summaries to the first two variables of a layer.

    ``layer.variables[0]`` is summarised under ``<layer.name>_weights`` and
    ``layer.variables[1]`` under ``<layer.name>_biases`` (presumably the
    kernel and the bias of a dense layer -- confirm against the callers).

    :param layer: object exposing ``name`` and an indexable ``variables``
        attribute with at least two entries
    :param debug: forwarded unchanged to ``variable_summaries``
    :return: None
    """
    # Fetch both variables up front so a layer with fewer than two variables
    # fails before any summary is registered.
    kernel, bias = layer.variables[0], layer.variables[1]
    for suffix, tensor in (("_weights", kernel), ("_biases", bias)):
        variable_summaries(layer.name + suffix, tensor, debug)

21 

22 

def periodic_distance(a, b, periodicity=2 * pi):
    """
    Element-wise distance between ``a`` and ``b`` on a periodic axis.

    The result is the smaller of ``|b - a|`` and ``periodicity - |b - a|``,
    built from TensorFlow ops. The inputs are not wrapped into one period
    first, so the result is only non-negative while ``|b - a|`` does not
    exceed ``periodicity``.

    :param a: first operand of the difference
    :param b: second operand of the difference
    :param periodicity: length of one period, defaults to ``2 * pi``
    :return: tensor holding the element-wise periodic distance
    """
    direct = tf.abs(b - a)
    around = periodicity - direct
    return tf.minimum(direct, around)

33 

34 

def periodic_distance_np(a, b, periodicity=2 * pi):
    """
    NumPy counterpart of ``periodic_distance``.

    Returns the smaller of ``|b - a|`` and ``periodicity - |b - a|``,
    element-wise. The inputs are not wrapped into one period first, so the
    result is only non-negative while ``|b - a|`` does not exceed
    ``periodicity``.

    :param a: first operand of the difference (scalar or array)
    :param b: second operand of the difference (scalar or array)
    :param periodicity: length of one period, defaults to ``2 * pi``
    :return: element-wise periodic distance
    """
    direct = np.abs(b - a)
    around = periodicity - direct
    return np.minimum(direct, around)

45 

46 

def variable_summaries(name, variables, debug=False):
    """
    Attach several summaries to one or more tensors for TensorBoard visualization.

    For every tensor the mean, standard deviation, maximum and minimum are
    recorded as scalar summaries, together with a histogram and a tensor
    summary of the raw values.

    :param name: base name of the ``tf.name_scope`` the summaries live in.
        When more than one tensor is given, the position of the tensor in
        ``variables`` is appended to it (``name0``, ``name1``, ...).
    :param variables: a single tensor or a list of tensors
    :param debug: currently unused; accepted so existing callers keep working
    :return: None
    """
    if not isinstance(variables, list):
        variables = [variables]

    # Scope names are only indexed when there is something to tell apart.
    add_index = len(variables) > 1

    for i, var in enumerate(variables):
        # Derive the scope name from the untouched base name on every pass.
        # Appending to ``name`` itself would pile the indices up
        # ("name0", "name01", "name012", ...).
        scope_name = name + str(i) if add_index else name
        with tf.name_scope(scope_name):
            mean = tf.reduce_mean(var)
            tf.summary.scalar("mean", mean)
            with tf.name_scope("stddev"):
                stddev = tf.sqrt(tf.reduce_mean(tf.square(var - mean)))
            tf.summary.scalar("stddev", stddev)
            tf.summary.scalar("max", tf.reduce_max(var))
            tf.summary.scalar("min", tf.reduce_min(var))
            tf.summary.histogram("histogram", var)
            tf.summary.tensor_summary("values", var)

75 

76 

def create_dir(path):
    """
    Make sure the directory ``path`` exists and hand the path back.

    Missing parent directories are created as well. An already existing
    directory is left untouched.

    :param path: directory to create
    :return: ``path``, unchanged
    :raises FileExistsError: if ``path`` exists but is not a directory
    """
    # exist_ok avoids the check-then-create race of a separate isdir() test:
    # another process creating the directory in between is not an error.
    os.makedirs(path, exist_ok=True)
    return path

86 

87 

def distance_cost(r_h, r_l, sig_h, a_h, b_h, sig_l, a_l, b_l, periodicity):
    """
    Mean squared difference between the sigmoid-transformed pairwise
    distances of two point sets.

    Pairwise distances of ``r_h`` are computed with ``pairwise_dist`` when
    ``periodicity`` is infinite and with ``pairwise_dist_periodic``
    otherwise. Distances of ``r_l`` always use ``pairwise_dist``. Both
    distance sets are passed through ``sigmoid`` with their own parameters
    before they are compared.

    :param r_h: positions of the first point set (presumably the
        high-dimensional input -- confirm against the callers)
    :param r_l: positions of the second point set (presumably the
        low-dimensional embedding -- confirm against the callers)
    :param sig_h: ``sig`` parameter of the sigmoid applied to ``r_h`` distances
    :param a_h: ``a`` parameter of the sigmoid applied to ``r_h`` distances
    :param b_h: ``b`` parameter of the sigmoid applied to ``r_h`` distances
    :param sig_l: ``sig`` parameter of the sigmoid applied to ``r_l`` distances
    :param a_l: ``a`` parameter of the sigmoid applied to ``r_l`` distances
    :param b_l: ``b`` parameter of the sigmoid applied to ``r_l`` distances
    :param periodicity: period of the ``r_h`` space; ``float("inf")`` selects
        the non-periodic distance
    :return: scalar tensor with the mean squared difference
    """
    high_dists = (
        pairwise_dist(r_h)
        if periodicity == float("inf")
        else pairwise_dist_periodic(r_h, periodicity)
    )
    low_dists = pairwise_dist(r_l)

    high_mapped = sigmoid(high_dists, sig_h, a_h, b_h)
    low_mapped = sigmoid(low_dists, sig_l, a_l, b_l)

    return tf.reduce_mean(tf.square(high_mapped - low_mapped))

113 

114 

def sigmoid(r, sig, a, b):
    """
    Evaluate ``1 - (1 + (2 ** (a / b) - 1) * (r / sig) ** a) ** (-b / a)``.

    For positive ``a`` the value is 0 at ``r == 0`` and exactly one half at
    ``r == sig``.

    :param r: value(s) to transform, e.g. distances
    :param sig: value of ``r`` at which the result is one half
    :param a: exponent applied to ``r / sig``
    :param b: together with ``a`` sets the outer exponent ``-b / a``
    :return: transformed value(s)
    """
    prefactor = 2 ** (a / b) - 1
    inner = 1 + prefactor * (r / sig) ** a
    return 1 - inner ** (-b / a)

125 

126 

def pairwise_dist_periodic(positions, periodicity):
    """
    Pairwise distances between the rows of ``positions`` in a periodic space.

    Every coordinate difference is taken with ``periodic_distance``; the
    per-coordinate distances are then combined with a norm over the last
    axis.

    :param positions: rank-2 tensor, one point per row
    :param periodicity: period handed to ``periodic_distance``
    :return: tensor of pairwise distances between the rows
    """
    assert len(positions.shape) == 2
    with tf.name_scope("pairwise_dist_periodic"):
        as_rows = tf.expand_dims(positions, axis=1)
        as_cols = tf.expand_dims(positions, axis=0)
        deltas = periodic_distance(as_rows, as_cols, periodicity)
        # The gradient of the norm is infinite at exactly zero, so nudge
        # zero entries by a tiny epsilon.
        is_zero = tf.to_float(tf.equal(deltas, 0.0))
        deltas = deltas + is_zero * 1e-16
        return tf.norm(deltas, axis=2)

139 

140 

def pairwise_dist(positions, squared=False, flat=False):
    """
    Euclidean distances between all pairs of points, per batch entry.

    Adapted from https://omoindrot.github.io/triplet-loss

    :param positions: tensor, or a value convertible to one. A rank-2 input
        gets a leading axis of size one before the computation.
    :param squared: if true, return squared distances and skip the square root
    :param flat: if true, keep only the entries strictly above the diagonal
        of each pairwise matrix, collapsed into a single axis
    :return: tensor of (squared) pairwise distances
    """
    with tf.name_scope("pairwise_dist"):
        if not tf.is_numeric_tensor(positions):
            positions = tf.convert_to_tensor(positions)
        if len(positions.get_shape()) == 2:
            positions = tf.expand_dims(positions, 0)

        # Dot product between all points of one batch entry.
        gram = tf.matmul(positions, tf.transpose(positions, [0, 2, 1]))

        # The squared L2 norm of each point is the diagonal of the dot
        # product matrix. Taking it from there makes the diagonal of the
        # result exactly 0.
        sq_norms = tf.linalg.diag_part(gram)

        # ||a - b||^2 = ||a||^2 - 2 <a, b> + ||b||^2
        sq_dists = (
            tf.expand_dims(sq_norms, 1) - 2.0 * gram + tf.expand_dims(sq_norms, 2)
        )

        # Rounding errors can push some entries slightly below zero.
        sq_dists = tf.maximum(sq_dists, 0.0)

        if flat:
            n_points = int(positions.shape[1])
            upper = np.ones((n_points, n_points), dtype=bool)
            upper[np.tril_indices(n_points)] = False
            sq_dists = tf.boolean_mask(sq_dists, upper, axis=1)

        if squared:
            return sq_dists

        # The gradient of sqrt is infinite at 0 (e.g. on the diagonal), so
        # add a small epsilon to exact zeros before taking the root ...
        zero_mask = tf.to_float(tf.equal(sq_dists, 0.0))
        dists = tf.sqrt(sq_dists + zero_mask * 1e-16)

        # ... and set those entries back to exactly 0.0 afterwards.
        return dists * (1.0 - zero_mask)

189 

190 

def search_and_replace(
    file_path, search_pattern, replacement, out_path=None, backup=True
):
    """
    Searches for a pattern in a text file and replaces it with the replacement

    The pattern is matched literally (``str.replace``), not as a regular
    expression, and every occurrence is replaced.

    :param file_path: (str)
        path to the file to search
    :param search_pattern: (str)
        pattern to search for
    :param replacement: (str)
        string that replaces the search_pattern in the output file
    :param out_path: (str)
        path where to write the output file. If no path is given the original file will be replaced.
    :param backup: (bool)
        if backup is true the original file is renamed to filename.bak before it is overwritten.
        Only applies when no out_path is given.

    :return: None

    """
    with open(file_path, "r") as source:
        file_data = source.read()

    file_data = file_data.replace(search_pattern, replacement)

    if not out_path:
        out_path = file_path
        if backup:
            # os.replace overwrites a backup left over from an earlier call
            # on every platform; os.rename raises on Windows in that case.
            os.replace(file_path, file_path + ".bak")

    with open(out_path, "w") as target:
        target.write(file_data)

223 

224 

def run_path(path):
    """
    Creates a directory at "path/run{i}" where the i is corresponding to the smallest not yet existing path

    :param path: (str)
        directory in which the run folder is created; missing parents are
        created as well
    :return: (str)
        path of the created folder

    """
    i = 0
    while True:
        current_path = os.path.join(path, "run{}".format(i))
        try:
            # Create first and react to the failure instead of checking
            # os.path.exists() beforehand: this stays correct when another
            # process claims the same run index in between.
            os.makedirs(current_path)
        except FileExistsError:
            i += 1
        else:
            return current_path

244 

245 

def random_on_cube_edges(n_points, sigma=0):
    """
    Draw random points on the twelve edges of the unit cube.

    One uniform number per point selects both the edge (which twelfth of
    ``[0, 1)`` it falls into) and the position along that edge.

    :param n_points: number of points to draw
    :param sigma: if non-zero, standard deviation of Gaussian noise added to
        every coordinate
    :return: tuple ``(coordinates, ids)`` where ``coordinates`` has shape
        ``(n_points, 3)`` and ``ids`` is a float array of shape
        ``(n_points,)`` holding the edge index (0 to 11) of every point
    """
    x = y = z = 1
    r = np.random.uniform(size=n_points)

    coordinates = np.zeros((n_points, 3))
    ids = np.zeros(n_points)

    # Start corner of every edge: four corners with three edges each.
    a = np.array([[0, 0, 0]] * 3 + [[x, y, 0]] * 3 + [[0, y, z]] * 3 + [[x, 0, z]] * 3)

    # Direction of every edge, pointing away from its start corner.
    b = np.array(
        [
            [x, 0, 0],
            [0, y, 0],
            [0, 0, z],
            [-x, 0, 0],
            [0, -y, 0],
            [0, 0, z],
            [x, 0, 0],
            [0, -y, 0],
            [0, 0, -z],
            [-x, 0, 0],
            [0, y, 0],
            [0, 0, -z],
        ]
    )

    for i in range(12):
        # The lower bound is inclusive so that a draw landing exactly on a
        # boundary (e.g. 0.25, 0.5, 0.75) is assigned to the start of edge i
        # instead of matching no edge and staying at the origin with id 0.
        mask = (i / 12 <= r) & (r < (i + 1) / 12)
        coordinates[mask] = (
            a[i] + (np.expand_dims(r[mask], axis=1) - i / 12) * 12 * b[i]
        )
        ids[mask] = i

    if sigma:
        coordinates += np.random.normal(scale=sigma, size=(n_points, 3))

    return coordinates, ids

284 

285 

def rotation_matrix(axis_unit_vec, angle):
    """
    Build rotation matrices from rotation axes and angles.

    Evaluates ``cos(angle) * I + sin(angle) * K + (1 - cos(angle)) * u u^T``
    for every axis ``u``, where ``K`` is the cross-product matrix of ``u``
    (Rodrigues' rotation formula).

    :param axis_unit_vec: rank-2 tensor with one axis per row and three
        components per axis. The axes are not normalised here, so they are
        presumably expected to be unit vectors already -- confirm against the
        callers.
    :param angle: rotation angles; two trailing axes of size one are
        appended so they broadcast against the matrices (presumably one
        angle per axis -- confirm against the callers)
    :return: tensor of rotation matrices, one 3x3 matrix per axis
    """
    angle = tf.expand_dims(tf.expand_dims(angle, axis=-1), axis=-1)
    identity = tf.expand_dims(tf.eye(3), 0)
    zeros = tf.zeros(tf.shape(axis_unit_vec)[0])

    ux = axis_unit_vec[:, 0]
    uy = axis_unit_vec[:, 1]
    uz = axis_unit_vec[:, 2]

    # Assembled as (3, 3, n) and then moved to (n, 3, 3).
    skew = tf.convert_to_tensor(
        [
            [zeros, -uz, uy],
            [uz, zeros, -ux],
            [-uy, ux, zeros],
        ]
    )
    skew = tf.transpose(skew, [2, 0, 1])

    column = tf.expand_dims(axis_unit_vec, 2)
    outer = tf.matmul(column, tf.transpose(column, [0, 2, 1]))

    return (
        tf.cos(angle) * identity
        + tf.sin(angle) * skew
        + (1 - tf.cos(angle)) * outer
    )

305 

306 

def potential_energy(angles, dihedrals, distances):
    """
    Repulsive energy computed from a tensor of distances.

    Every distance below 2 contributes ``min(10 - d, (1 / d) ** 12)``; all
    other distances contribute nothing. The contributions are summed over
    axis 1 and the sums are averaged.

    :param angles: not used by the current implementation
    :param dihedrals: not used by the current implementation
    :param distances: tensor of distances with at least two axes
    :return: scalar tensor with the mean energy
    """
    repulsion = tf.minimum(10 - distances, (1 / distances) ** 12)
    no_contribution = tf.zeros(tf.shape(distances))
    per_distance = tf.where(distances < 2, repulsion, no_contribution)
    return tf.reduce_mean(tf.reduce_sum(per_distance, axis=1))

316 

317 

def read_from_log(run_path, names):
    """
    Read scalar series from the TensorBoard event data found at ``run_path``.

    :param run_path: path handed to ``EventAccumulator``
    :param names: iterable of scalar tags to read
    :return: list with one ``np.array`` per entry of ``names``, built from
        what ``EventAccumulator.Scalars`` returns for that tag
    """
    accumulator = EventAccumulator(run_path)
    accumulator.Reload()

    results = []
    for name in names:
        results.append(np.array(accumulator.Scalars(name)))
    return results