tests.test_events
"""Tests for ``csi_images.csi_events`` (``Event`` and ``EventArray``).

Several tests open blocking OpenCV windows when ``SHOW_PLOTS`` is true; the
flag is switched off automatically when ``DEBIAN_FRONTEND=noninteractive``.
"""
import os
import time

import pytest

import cv2
import numpy as np
import pandas as pd

from csi_images.csi_scans import Scan
from csi_images.csi_tiles import Tile
from csi_images.csi_events import Event, EventArray
from csi_images import csi_images

if os.environ.get("DEBIAN_FRONTEND") == "noninteractive":
    SHOW_PLOTS = False
else:
    # Change this to your preference for local testing, but commit as True
    SHOW_PLOTS = True


@pytest.fixture
def bzscan():
    """Return the scan loaded with ``Scan.load_txt`` from ``tests/data``."""
    return Scan.load_txt("tests/data")


@pytest.fixture
def axscan():
    """Return the scan loaded with ``Scan.load_yaml`` from ``tests/data``."""
    return Scan.load_yaml("tests/data")


@pytest.fixture
def circle():
    """Return a 100x100 ``uint8`` mask: filled circle (value 1, radius 20) at (50, 50)."""
    circle = np.zeros((100, 100, 3), dtype=np.uint8)
    # Thickness -1 asks OpenCV to fill the circle; only channel 0 gets value 1
    circle = cv2.circle(circle, (50, 50), 20, (1, 0, 0), -1)
    circle = circle[:, :, 0]
    circle = circle.astype(np.uint8)
    return circle


def test_get_crops(bzscan):
    """Check crop extraction for single events, batches, and round-tripped events.

    Covers an event inside a tile, an event near a tile corner, batch
    extraction through ``Event.get_many_crops`` (expected to be faster than
    the sequential loop), and extraction after converting the events to an
    ``EventArray`` and back.
    """
    tile = Tile(bzscan, 1000)
    event = Event(tile, 1086, 342)

    # Test a regular event
    images = event.get_crops()
    assert len(images) == 5
    images = event.get_crops(crop_size=50)
    assert images[0].shape == (50, 50)
    images = event.get_crops(crop_size=100)
    assert images[0].shape == (100, 100)

    if SHOW_PLOTS:
        for image in images:
            cv2.imshow("Bright DAPI event in the center", image)
            cv2.waitKey(0)
        cv2.destroyAllWindows()

    # Test a corner event
    event = Event(tile, 1350, 2)
    images = event.get_crops()
    assert len(images) == 5
    images = event.get_crops(crop_size=200)
    assert images[0].shape == (200, 200)
    images = event.get_crops(crop_size=100)
    assert images[0].shape == (100, 100)

    if SHOW_PLOTS:
        for image in images:
            cv2.imshow("Events in the top-right corner of a tile", image)
            cv2.waitKey(0)
        cv2.destroyAllWindows()

    # Test many events
    tile2 = Tile(bzscan, 500)
    events = [
        Event(tile, 515, 411),
        Event(tile2, 2, 1000),
        Event(tile, 1000, 1000),
        Event(tile, 87, 126),
        Event(tile, 1000, 2),
        Event(tile2, 800, 800),
        Event(tile, 1000, 662),
    ]

    # Time sequential extraction. perf_counter() is monotonic, so the measured
    # interval cannot be distorted by wall-clock adjustments like time.time().
    start_time = time.perf_counter()
    images_1 = [event.get_crops() for event in events]
    sequential_time = time.perf_counter() - start_time

    # Time batch extraction
    start_time = time.perf_counter()
    images_2 = Event.get_many_crops(events, crop_size=100)
    parallel_time = time.perf_counter() - start_time
    assert parallel_time < sequential_time
    # zip() stops at the shorter input, so compare the counts explicitly
    assert len(images_1) == len(images_2)
    for list_a, list_b in zip(images_1, images_2):
        assert len(list_a) == len(list_b)
        for image_a, image_b in zip(list_a, list_b):
            assert np.array_equal(image_a, image_b)

    # Test that it works after converting to EventArray and back
    event_array = EventArray.from_events(events)
    remade_events = event_array.to_events(
        [bzscan], ignore_metadata=True, ignore_features=True
    )
    images_3 = Event.get_many_crops(remade_events, crop_size=100)
    assert len(images_1) == len(images_3)
    for list_a, list_b in zip(images_1, images_3):
        assert len(list_a) == len(list_b)
        for image_a, image_b in zip(list_a, list_b):
            assert np.array_equal(image_a, image_b)


def test_event_coordinates_for_bzscanner(bzscan):
    """Check scan and slide positions of events on the ``bzscan`` fixture."""
    # Origin
    tile = Tile(bzscan, (0, 0))
    event = Event(tile, 0, 0)
    scan_origin = event.get_scan_position()
    assert 2500 <= scan_origin[0] <= 3500
    assert 2500 <= scan_origin[1] <= 3500
    scan_origin_on_slide = event.get_slide_position()
    assert 71500 <= scan_origin_on_slide[0] <= 72500
    assert 21500 <= scan_origin_on_slide[1] <= 22500
    # Within the same tile, "top-right corner": slide x is unchanged and
    # slide y decreases, while scan x increases
    event = Event(tile, 1000, 0)
    scan_position = event.get_scan_position()
    assert scan_origin[0] < scan_position[0]
    assert scan_origin[1] == scan_position[1]
    slide_position = event.get_slide_position()
    assert scan_origin_on_slide[0] == slide_position[0]
    assert scan_origin_on_slide[1] > slide_position[1]
    # Within the same tile, "bottom-left corner": slide x decreases and
    # slide y is unchanged, while scan y increases
    event = Event(tile, 0, 1000)
    scan_position = event.get_scan_position()
    assert scan_origin[0] == scan_position[0]
    assert scan_origin[1] < scan_position[1]
    slide_position = event.get_slide_position()
    assert scan_origin_on_slide[0] > slide_position[0]
    assert scan_origin_on_slide[1] == slide_position[1]

    # Next tile, same row
    tile = Tile(bzscan, (1, 0))
    event = Event(tile, 0, 0)
    scan_position = event.get_scan_position()
    assert scan_origin[0] < scan_position[0]
    assert scan_origin[1] == scan_position[1]
    slide_position = event.get_slide_position()
    assert slide_position[0] == scan_origin_on_slide[0]
    assert slide_position[1] < scan_origin_on_slide[1]

    # Next row, same column
    tile = Tile(bzscan, (0, 1))
    event = Event(tile, 0, 0)
    scan_position = event.get_scan_position()
    assert scan_origin[0] == scan_position[0]
    assert scan_origin[1] < scan_position[1]
    slide_position = event.get_slide_position()
    assert slide_position[0] < scan_origin_on_slide[0]
    assert slide_position[1] == scan_origin_on_slide[1]

    # Opposite corner
    tile = Tile(bzscan, (bzscan.roi[0].tile_cols - 1, bzscan.roi[0].tile_rows - 1))
    event = Event(tile, 1361, 1003)
    scan_position = event.get_scan_position()
    assert 21500 <= scan_position[0] <= 22500
    assert 58500 <= scan_position[1] <= 60500
    slide_position = event.get_slide_position()
    assert 14500 <= slide_position[0] <= 15500
    assert 2500 <= slide_position[1] <= 3500


def test_event_coordinates_for_axioscan(axscan):
    """Check scan and slide positions of events on the ``axscan`` fixture."""
    # Origin
    tile = Tile(axscan, 0)
    event = Event(tile, 0, 0)
    scan_position = event.get_scan_position()
    assert -59000 <= scan_position[0] < -55000
    assert 0 <= scan_position[1] < 4000
    slide_position = event.get_slide_position()
    assert 16000 <= slide_position[0] < 20000
    assert scan_position[1] == slide_position[1]

    # Opposite corner
    tile = Tile(axscan, (axscan.roi[0].tile_cols - 1, axscan.roi[0].tile_rows - 1))
    event = Event(tile, 2000, 2000)
    scan_position = event.get_scan_position()
    assert -4000 <= scan_position[0] <= 0
    assert 21000 <= scan_position[1] <= 25000
    slide_position = event.get_slide_position()
    assert 71000 <= slide_position[0] <= 75000
    assert scan_position[1] == slide_position[1]


def test_eventarray_conversions(axscan):
    """Check conversions between events, ``EventArray``, DataFrames and files."""
    tile = Tile(axscan, 0)
    event0 = Event(tile, 0, 0)
    event1 = Event(tile, 1000, 1000)
    event2 = Event(tile, 2000, 2000)

    event_array = EventArray.from_events([event0, event1, event2])

    assert len(event_array) == 3
    assert event_array.metadata is None
    assert event_array.features is None

    event0.metadata = pd.Series({"event0": 0})

    # Only event0 carries metadata here; building the array is expected to fail
    with pytest.raises(ValueError):
        EventArray.from_events([event0, event1, event2])

    event1.metadata = pd.Series({"event0": 1})
    event2.metadata = pd.Series({"event0": 2})

    event_array = EventArray.from_events([event0, event1, event2])

    assert len(event_array) == 3

    events_df = event_array.to_dataframe()

    assert len(events_df) == 3

    assert event_array == EventArray.from_dataframe(events_df)

    # Test adding different dtypes and converting back and forth
    event_array.features = pd.DataFrame(
        {"feature1": [1, 2, 3], "feature2": [4.0, 5.0, 6.0]}
    )
    remade_event_list = event_array.to_events([axscan])
    assert len(remade_event_list) == 3
    remade_event_array = EventArray.from_events(remade_event_list)
    assert event_array == remade_event_array

    # Test saving and loading; clean up even when an assertion fails
    csv_path = "tests/data/events.csv"
    hdf5_path = "tests/data/events.h5"
    try:
        assert event_array.save_csv(csv_path)
        assert event_array == EventArray.load_csv(csv_path)

        assert event_array.save_hdf5(hdf5_path)
        assert event_array == EventArray.load_hdf5(hdf5_path)
    finally:
        for path in (csv_path, hdf5_path):
            if os.path.exists(path):
                os.remove(path)


def test_ocular_conversions():
    """Round-trip OCULAR results through ``save_ocular`` and ``load_ocular``.

    The input lives at a fixed mount point, so the test is skipped when that
    location does not exist on the current machine.
    """
    input_path = "/mnt/HDSCA_Development/DZ/0B68818/ocular"
    if not os.path.exists(input_path):
        pytest.skip(f"OCULAR test data not available at {input_path}")
    output_path = "tests/data"
    default_outputs = [
        "ocular_interesting.csv",
        "ocular_interesting.rds",
        "rc-final.csv",
        "rc-final.rds",
        "rc-final1.rds",
        "rc-final2.rds",
        "rc-final3.rds",
        "rc-final4.rds",
    ]
    others_outputs = [
        "others-final.csv",
        "others-final.rds",
        "others-final1.rds",
        "others-final2.rds",
        "others-final3.rds",
        "others-final4.rds",
    ]

    def remove_outputs(names):
        # Tolerant removal so cleanup never masks the real test failure
        for name in names:
            path = f"{output_path}/{name}"
            if os.path.exists(path):
                os.remove(path)

    try:
        result = EventArray.load_ocular(input_path)
        # For the purposes of this test, we will manually relabel "clust" == nan to 0
        # These come from ocular_interesting.rds, which does not have clusters
        result.metadata["clust"] = result.metadata["clust"].fillna(0)
        result.metadata["hcpc"] = result.metadata["hcpc"].fillna(0)
        result.save_ocular(output_path)
        new_result = EventArray.load_ocular(output_path)
        # Sort them so that they are in the same order
        result = result.sort(["tile", "x", "y"])
        new_result = new_result.sort(["tile", "x", "y"])
        # Note: hcpc method within ocularr and here are different
        result.metadata["hcpc"] = new_result.metadata["hcpc"].copy()
        assert result == new_result
        # The unconditional os.remove() calls used to fail on a missing
        # output file; keep that check explicit
        for name in default_outputs:
            assert os.path.isfile(f"{output_path}/{name}"), name
    finally:
        remove_outputs(default_outputs)

    # Try it with "others" files
    try:
        result = EventArray.load_ocular(input_path, event_type="others")
        result.save_ocular(output_path, event_type="others")
        new_result = EventArray.load_ocular(output_path, event_type="others")
        result = result.sort(["tile", "x", "y"])
        new_result = new_result.sort(["tile", "x", "y"])
        # Note: hcpc method within ocularr and here are different
        result.metadata["hcpc"] = new_result.metadata["hcpc"].copy()
        assert result == new_result
        for name in others_outputs:
            assert os.path.isfile(f"{output_path}/{name}"), name
    finally:
        remove_outputs(others_outputs)


def test_copy_sort_rows_get(axscan):
    """Check ``EventArray.copy``, ``sort``, ``get`` and ``rows``."""
    tile = Tile(axscan, 0)
    events = [
        Event(tile, 0, 100),
        Event(tile, 0, 0),
        Event(tile, 1000, 1000),
        Event(tile, 1000, 1),
        Event(tile, 2000, 2000),
    ]

    events = EventArray.from_events(events)

    # Copy
    events_copy = events.copy()
    events_copy.info["x"] = np.uint16(1)
    # Check that changes to the copy did not change the original
    assert events_copy.info["x"].equals(pd.Series([1, 1, 1, 1, 1], dtype=np.uint16))
    assert events.info["x"].equals(pd.Series([0, 0, 1000, 1000, 2000], dtype=np.uint16))

    # Sort
    events = events.sort(["x", "y"], ascending=[False, True])
    assert events.info["x"].equals(pd.Series([2000, 1000, 1000, 0, 0], dtype=np.uint16))
    assert events.info["y"].equals(pd.Series([2000, 1, 1000, 0, 100], dtype=np.uint16))

    # Get
    events_get = events.get(["x", "y"])
    assert events_get["x"].equals(pd.Series([2000, 1000, 1000, 0, 0], dtype=np.uint16))
    assert events_get["y"].equals(pd.Series([2000, 1, 1000, 0, 100], dtype=np.uint16))
    assert events_get.columns.equals(pd.Index(["x", "y"]))

    # Rows
    events_get = events.rows([0, 1, 3])
    assert len(events_get) == 3
    assert events_get.info["x"].equals(pd.Series([2000, 1000, 0], dtype=np.uint16))
    assert events_get.info["y"].equals(pd.Series([2000, 1, 0], dtype=np.uint16))
    events_get = events.rows([True, False, False, True, True])
    assert len(events_get) == 3
    assert events_get.info["x"].equals(pd.Series([2000, 0, 0], dtype=np.uint16))
    assert events_get.info["y"].equals(pd.Series([2000, 0, 100], dtype=np.uint16))


def test_adding_metadata_features(axscan):
    """Check that metadata and feature columns can be added and then replaced."""
    tile = Tile(axscan, 0)
    events = [
        Event(tile, 0, 100),
        Event(tile, 0, 0),
        Event(tile, 1000, 1000),
        Event(tile, 1000, 1),
        Event(tile, 2000, 2000),
    ]

    events = EventArray.from_events(events)

    # Add metadata
    events.add_metadata(pd.DataFrame({"test": [1, 2, 3, 4, 5]}))
    assert events.get("test").equals(pd.DataFrame({"test": [1, 2, 3, 4, 5]}))
    # Add again
    events.add_metadata(pd.DataFrame({"test": [5, 4, 3, 2, 1]}))
    assert events.get("test").equals(pd.DataFrame({"test": [5, 4, 3, 2, 1]}))

    # Add features
    events.add_features(pd.DataFrame({"test2": [1, 2, 3, 4, 5]}))
    assert events.get("test2").equals(pd.DataFrame({"test2": [1, 2, 3, 4, 5]}))
    # Add again
    events.add_features(pd.DataFrame({"test2": [5, 4, 3, 2, 1]}))
    assert events.get("test2").equals(pd.DataFrame({"test2": [5, 4, 3, 2, 1]}))


def test_event_montages(bzscan, circle):
    """Build montages without a mask, with a mask, and with ``mask_mode="hard"``."""
    tile = Tile(bzscan, 1000)
    event = Event(tile, 1086, 342)
    images = event.get_crops(crop_size=100)

    montage = csi_images.make_montage(
        images,
        [0, 1, 4, 2, 3],
        {0: (0, 0, 1), 1: (1, 0, 0), 2: (0, 1, 0), 4: (1, 1, 1)},
        labels=["RGB", "DAPI", "AF555", "AF488", "AF647", "BRIGHT"],
    )
    if SHOW_PLOTS:
        cv2.imshow(
            "Full, classic montage with labels",
            cv2.cvtColor(montage, cv2.COLOR_RGB2BGR),
        )
        cv2.waitKey(0)
        cv2.destroyAllWindows()

    montage = csi_images.make_montage(
        images,
        [0, 1, 4, 2, 3],
        {0: (0, 0, 1), 1: (1, 0, 0), 2: (0, 1, 0), 4: (1, 1, 1)},
        labels=["RGB", "DAPI", "AF555", "AF488", "AF647", "BRIGHT"],
        mask=circle,
    )
    if SHOW_PLOTS:
        cv2.imshow(
            "Full, classic montage with labels and mask overlay",
            cv2.cvtColor(montage, cv2.COLOR_RGB2BGR),
        )
        cv2.waitKey(0)
        cv2.destroyAllWindows()

    montage = csi_images.make_montage(
        images,
        [0, 1, 4, 2, 3],
        {0: (0, 0, 1), 1: (1, 0, 0), 2: (0, 1, 0), 4: (1, 1, 1)},
        labels=["RGB", "DAPI", "AF555", "AF488", "AF647", "BRIGHT"],
        mask=circle,
        mask_mode="hard",
    )
    if SHOW_PLOTS:
        cv2.imshow(
            "Full, classic montage with labels and hard-masking",
            cv2.cvtColor(montage, cv2.COLOR_RGB2BGR),
        )
        cv2.waitKey(0)
        cv2.destroyAllWindows()


def test_saving_crops_and_montages(bzscan):
    """Check that saved crops and montages match the ones computed in memory."""
    tile = Tile(bzscan, 1000)
    tile2 = Tile(bzscan, 0)
    events = [
        Event(tile, 1086, 342),
        Event(tile2, 2, 1000),
        Event(tile, 1000, 1000),
        Event(tile2, 800, 800),
        Event(tile, 1000, 662),
    ]
    output_dir = "temp"
    channel_names = bzscan.get_channel_names()

    # Get all crops and montages
    serial_crops = []
    serial_montages = []
    for event in events:
        serial_crops.append(event.get_crops())
        serial_montages.append(event.get_montage())

    try:
        # Save crops and montages
        Event.get_and_save_many_crops(events, output_dir, channel_names)
        Event.get_and_save_many_montages(events, output_dir)

        saved_crops = []
        saved_montages = []
        for event in events:
            crops = event.load_crops(output_dir)
            saved_crops.append([crops[c] for c in channel_names])
            saved_montages.append(event.load_montage(output_dir))

        # Make sure crops are identical
        for a, b in zip(serial_crops, saved_crops):
            for a_img, b_img in zip(a, b):
                assert np.array_equal(a_img, b_img)

        # Montages got JPEG compressed, so only their sizes are compared
        for a, b in zip(serial_montages, saved_montages):
            assert a.shape == b.shape

        # Visual inspection
        if SHOW_PLOTS:
            cv2.imshow("Original", cv2.cvtColor(serial_montages[0], cv2.COLOR_RGB2BGR))
            cv2.imshow("Saved", cv2.cvtColor(saved_montages[0], cv2.COLOR_RGB2BGR))
            cv2.waitKey(0)
            cv2.destroyAllWindows()
    finally:
        # Clean up, also when an assertion above failed
        if os.path.isdir(output_dir):
            for file in os.listdir(output_dir):
                os.remove(os.path.join(output_dir, file))
            os.rmdir(output_dir)


def test_saving_and_loading(axscan):
    """Round-trip an ``EventArray`` with metadata and features via CSV, JSON, HDF5."""
    tile = Tile(axscan, 0)
    events = [
        Event(tile, 0, 100),
        Event(tile, 0, 0),
        Event(tile, 1000, 1000),
        Event(tile, 1000, 1),
        Event(tile, 2000, 2000),
    ]

    events = EventArray.from_events(events)

    # Add content
    events.add_metadata(pd.DataFrame({"test": [1, 2, 3, 4, 5]}))

    # Add features
    events.add_features(pd.DataFrame({"test2": [1, 2, 3, 4, 5]}))

    csv_path = "tests/data/events.csv"
    json_path = "tests/data/events.json"
    hdf5_path = "tests/data/events.h5"
    try:
        # Save and load
        assert events.save_csv(csv_path)
        assert events == EventArray.load_csv(csv_path)

        assert events.save_json(json_path)
        assert events == EventArray.load_json(json_path)

        assert events.save_hdf5(hdf5_path)
        assert events == EventArray.load_hdf5(hdf5_path)
    finally:
        # Clean up, also when an assertion above failed
        for path in (csv_path, json_path, hdf5_path):
            if os.path.exists(path):
                os.remove(path)
@pytest.fixture
def
bzscan():
@pytest.fixture
def
axscan():
@pytest.fixture
def
circle():
def
test_get_crops(bzscan):
def test_get_crops(bzscan):
    """Check crop extraction for single events, batches, and round-tripped events.

    Covers an event inside a tile, an event near a tile corner, batch
    extraction through ``Event.get_many_crops`` (expected to be faster than
    the sequential loop), and extraction after converting the events to an
    ``EventArray`` and back.
    """
    tile = Tile(bzscan, 1000)
    event = Event(tile, 1086, 342)

    # Test a regular event
    images = event.get_crops()
    assert len(images) == 5
    images = event.get_crops(crop_size=50)
    assert images[0].shape == (50, 50)
    images = event.get_crops(crop_size=100)
    assert images[0].shape == (100, 100)

    if SHOW_PLOTS:
        for image in images:
            cv2.imshow("Bright DAPI event in the center", image)
            cv2.waitKey(0)
        cv2.destroyAllWindows()

    # Test a corner event
    event = Event(tile, 1350, 2)
    images = event.get_crops()
    assert len(images) == 5
    images = event.get_crops(crop_size=200)
    assert images[0].shape == (200, 200)
    images = event.get_crops(crop_size=100)
    assert images[0].shape == (100, 100)

    if SHOW_PLOTS:
        for image in images:
            cv2.imshow("Events in the top-right corner of a tile", image)
            cv2.waitKey(0)
        cv2.destroyAllWindows()

    # Test many events
    tile2 = Tile(bzscan, 500)
    events = [
        Event(tile, 515, 411),
        Event(tile2, 2, 1000),
        Event(tile, 1000, 1000),
        Event(tile, 87, 126),
        Event(tile, 1000, 2),
        Event(tile2, 800, 800),
        Event(tile, 1000, 662),
    ]

    # Time sequential extraction. perf_counter() is monotonic, so the measured
    # interval cannot be distorted by wall-clock adjustments like time.time().
    start_time = time.perf_counter()
    images_1 = [event.get_crops() for event in events]
    sequential_time = time.perf_counter() - start_time

    # Time batch extraction
    start_time = time.perf_counter()
    images_2 = Event.get_many_crops(events, crop_size=100)
    parallel_time = time.perf_counter() - start_time
    assert parallel_time < sequential_time
    # zip() stops at the shorter input, so compare the counts explicitly
    assert len(images_1) == len(images_2)
    for list_a, list_b in zip(images_1, images_2):
        assert len(list_a) == len(list_b)
        for image_a, image_b in zip(list_a, list_b):
            assert np.array_equal(image_a, image_b)

    # Test that it works after converting to EventArray and back
    event_array = EventArray.from_events(events)
    remade_events = event_array.to_events(
        [bzscan], ignore_metadata=True, ignore_features=True
    )
    images_3 = Event.get_many_crops(remade_events, crop_size=100)
    assert len(images_1) == len(images_3)
    for list_a, list_b in zip(images_1, images_3):
        assert len(list_a) == len(list_b)
        for image_a, image_b in zip(list_a, list_b):
            assert np.array_equal(image_a, image_b)
def
test_event_coordinates_for_bzscanner(bzscan):
def test_event_coordinates_for_bzscanner(bzscan):
    """Check scan-frame and slide-frame positions for the ``bzscan`` fixture.

    Anchors on pixel (0, 0) of tile (0, 0), then verifies how both positions
    respond to moving inside that tile and to stepping to neighbouring tiles,
    and finally checks an event in the last tile of the first ROI.
    """
    # Reference point: pixel (0, 0) of tile (0, 0)
    first_tile = Tile(bzscan, (0, 0))
    anchor = Event(first_tile, 0, 0)
    origin_scan = anchor.get_scan_position()
    assert 2500 <= origin_scan[0] <= 3500
    assert 2500 <= origin_scan[1] <= 3500
    origin_slide = anchor.get_slide_position()
    assert 71500 <= origin_slide[0] <= 72500
    assert 21500 <= origin_slide[1] <= 22500

    # Same tile, first pixel coordinate raised to 1000:
    # scan x grows and scan y holds; slide x holds and slide y shrinks
    shifted = Event(first_tile, 1000, 0)
    scan_xy = shifted.get_scan_position()
    assert origin_scan[0] < scan_xy[0]
    assert origin_scan[1] == scan_xy[1]
    slide_xy = shifted.get_slide_position()
    assert origin_slide[0] == slide_xy[0]
    assert origin_slide[1] > slide_xy[1]

    # Same tile, second pixel coordinate raised to 1000:
    # scan y grows and scan x holds; slide x shrinks and slide y holds
    shifted = Event(first_tile, 0, 1000)
    scan_xy = shifted.get_scan_position()
    assert origin_scan[0] == scan_xy[0]
    assert origin_scan[1] < scan_xy[1]
    slide_xy = shifted.get_slide_position()
    assert origin_slide[0] > slide_xy[0]
    assert origin_slide[1] == slide_xy[1]

    # Neighbouring tile (1, 0): behaves like the first in-tile shift
    neighbour = Event(Tile(bzscan, (1, 0)), 0, 0)
    scan_xy = neighbour.get_scan_position()
    assert origin_scan[0] < scan_xy[0]
    assert origin_scan[1] == scan_xy[1]
    slide_xy = neighbour.get_slide_position()
    assert slide_xy[0] == origin_slide[0]
    assert slide_xy[1] < origin_slide[1]

    # Neighbouring tile (0, 1): behaves like the second in-tile shift
    neighbour = Event(Tile(bzscan, (0, 1)), 0, 0)
    scan_xy = neighbour.get_scan_position()
    assert origin_scan[0] == scan_xy[0]
    assert origin_scan[1] < scan_xy[1]
    slide_xy = neighbour.get_slide_position()
    assert slide_xy[0] < origin_slide[0]
    assert slide_xy[1] == origin_slide[1]

    # Last tile of the first ROI, far pixel of that tile
    last_tile_index = (
        bzscan.roi[0].tile_cols - 1,
        bzscan.roi[0].tile_rows - 1,
    )
    far_event = Event(Tile(bzscan, last_tile_index), 1361, 1003)
    scan_xy = far_event.get_scan_position()
    assert 21500 <= scan_xy[0] <= 22500
    assert 58500 <= scan_xy[1] <= 60500
    slide_xy = far_event.get_slide_position()
    assert 14500 <= slide_xy[0] <= 15500
    assert 2500 <= slide_xy[1] <= 3500
def
test_event_coordinates_for_axioscan(axscan):
def test_event_coordinates_for_axioscan(axscan):
    """Check scan-frame and slide-frame positions for the ``axscan`` fixture.

    Verifies the expected ranges for the first pixel of tile 0 and for an
    event in the last tile of the first ROI; in both cases the second
    coordinate is expected to be identical in the two frames.
    """
    # First pixel of tile 0
    first_event = Event(Tile(axscan, 0), 0, 0)
    scan_xy = first_event.get_scan_position()
    assert -59000 <= scan_xy[0] < -55000
    assert 0 <= scan_xy[1] < 4000
    slide_xy = first_event.get_slide_position()
    assert 16000 <= slide_xy[0] < 20000
    assert scan_xy[1] == slide_xy[1]

    # Event in the last tile of the first ROI
    last_tile_index = (
        axscan.roi[0].tile_cols - 1,
        axscan.roi[0].tile_rows - 1,
    )
    far_event = Event(Tile(axscan, last_tile_index), 2000, 2000)
    scan_xy = far_event.get_scan_position()
    assert -4000 <= scan_xy[0] <= 0
    assert 21000 <= scan_xy[1] <= 25000
    slide_xy = far_event.get_slide_position()
    assert 71000 <= slide_xy[0] <= 75000
    assert scan_xy[1] == slide_xy[1]
def
test_eventarray_conversions(axscan):
def test_eventarray_conversions(axscan):
    """Check conversions between events, ``EventArray``, DataFrames and files.

    Also verifies that building an ``EventArray`` from events with
    inconsistent metadata raises ``ValueError``.
    """
    tile = Tile(axscan, 0)
    event0 = Event(tile, 0, 0)
    event1 = Event(tile, 1000, 1000)
    event2 = Event(tile, 2000, 2000)

    event_array = EventArray.from_events([event0, event1, event2])

    assert len(event_array) == 3
    assert event_array.metadata is None
    assert event_array.features is None

    event0.metadata = pd.Series({"event0": 0})

    # Only event0 carries metadata here; building the array is expected to fail
    with pytest.raises(ValueError):
        EventArray.from_events([event0, event1, event2])

    event1.metadata = pd.Series({"event0": 1})
    event2.metadata = pd.Series({"event0": 2})

    event_array = EventArray.from_events([event0, event1, event2])

    assert len(event_array) == 3

    events_df = event_array.to_dataframe()

    assert len(events_df) == 3

    assert event_array == EventArray.from_dataframe(events_df)

    # Test adding different dtypes and converting back and forth
    event_array.features = pd.DataFrame(
        {"feature1": [1, 2, 3], "feature2": [4.0, 5.0, 6.0]}
    )
    remade_event_list = event_array.to_events([axscan])
    assert len(remade_event_list) == 3
    remade_event_array = EventArray.from_events(remade_event_list)
    assert event_array == remade_event_array

    # Test saving and loading; clean up even when an assertion fails
    csv_path = "tests/data/events.csv"
    hdf5_path = "tests/data/events.h5"
    try:
        assert event_array.save_csv(csv_path)
        assert event_array == EventArray.load_csv(csv_path)

        assert event_array.save_hdf5(hdf5_path)
        assert event_array == EventArray.load_hdf5(hdf5_path)
    finally:
        for path in (csv_path, hdf5_path):
            if os.path.exists(path):
                os.remove(path)
def
test_ocular_conversions():
def test_ocular_conversions():
    """Round-trip OCULAR results through ``save_ocular`` and ``load_ocular``.

    The input lives at a fixed mount point, so the test is skipped when that
    location does not exist on the current machine. Files written to
    ``tests/data`` are removed even when an assertion fails.
    """
    input_path = "/mnt/HDSCA_Development/DZ/0B68818/ocular"
    if not os.path.exists(input_path):
        pytest.skip(f"OCULAR test data not available at {input_path}")
    output_path = "tests/data"
    default_outputs = [
        "ocular_interesting.csv",
        "ocular_interesting.rds",
        "rc-final.csv",
        "rc-final.rds",
        "rc-final1.rds",
        "rc-final2.rds",
        "rc-final3.rds",
        "rc-final4.rds",
    ]
    others_outputs = [
        "others-final.csv",
        "others-final.rds",
        "others-final1.rds",
        "others-final2.rds",
        "others-final3.rds",
        "others-final4.rds",
    ]

    def remove_outputs(names):
        # Tolerant removal so cleanup never masks the real test failure
        for name in names:
            path = f"{output_path}/{name}"
            if os.path.exists(path):
                os.remove(path)

    try:
        result = EventArray.load_ocular(input_path)
        # For the purposes of this test, we will manually relabel "clust" == nan to 0
        # These come from ocular_interesting.rds, which does not have clusters
        result.metadata["clust"] = result.metadata["clust"].fillna(0)
        result.metadata["hcpc"] = result.metadata["hcpc"].fillna(0)
        result.save_ocular(output_path)
        new_result = EventArray.load_ocular(output_path)
        # Sort them so that they are in the same order
        result = result.sort(["tile", "x", "y"])
        new_result = new_result.sort(["tile", "x", "y"])
        # Note: hcpc method within ocularr and here are different
        result.metadata["hcpc"] = new_result.metadata["hcpc"].copy()
        assert result == new_result
        # The unconditional os.remove() calls used to fail on a missing
        # output file; keep that check explicit
        for name in default_outputs:
            assert os.path.isfile(f"{output_path}/{name}"), name
    finally:
        remove_outputs(default_outputs)

    # Try it with "others" files
    try:
        result = EventArray.load_ocular(input_path, event_type="others")
        result.save_ocular(output_path, event_type="others")
        new_result = EventArray.load_ocular(output_path, event_type="others")
        result = result.sort(["tile", "x", "y"])
        new_result = new_result.sort(["tile", "x", "y"])
        # Note: hcpc method within ocularr and here are different
        result.metadata["hcpc"] = new_result.metadata["hcpc"].copy()
        assert result == new_result
        for name in others_outputs:
            assert os.path.isfile(f"{output_path}/{name}"), name
    finally:
        remove_outputs(others_outputs)
def
test_copy_sort_rows_get(axscan):
def test_copy_sort_rows_get(axscan):
    """Exercise ``copy``, ``sort``, ``get`` and ``rows`` on a five-event array."""

    def as_u16(values):
        # Expected columns are compared as uint16 series
        return pd.Series(values, dtype=np.uint16)

    tile = Tile(axscan, 0)
    events = EventArray.from_events(
        [
            Event(tile, 0, 100),
            Event(tile, 0, 0),
            Event(tile, 1000, 1000),
            Event(tile, 1000, 1),
            Event(tile, 2000, 2000),
        ]
    )

    # copy(): overwriting a column of the duplicate must leave the source intact
    duplicate = events.copy()
    duplicate.info["x"] = np.uint16(1)
    assert duplicate.info["x"].equals(as_u16([1, 1, 1, 1, 1]))
    assert events.info["x"].equals(as_u16([0, 0, 1000, 1000, 2000]))

    # sort(): x descending, ties broken by y ascending
    events = events.sort(["x", "y"], ascending=[False, True])
    assert events.info["x"].equals(as_u16([2000, 1000, 1000, 0, 0]))
    assert events.info["y"].equals(as_u16([2000, 1, 1000, 0, 100]))

    # get(): returns just the requested columns
    columns = events.get(["x", "y"])
    assert columns["x"].equals(as_u16([2000, 1000, 1000, 0, 0]))
    assert columns["y"].equals(as_u16([2000, 1, 1000, 0, 100]))
    assert columns.columns.equals(pd.Index(["x", "y"]))

    # rows(): selection by integer positions
    subset = events.rows([0, 1, 3])
    assert len(subset) == 3
    assert subset.info["x"].equals(as_u16([2000, 1000, 0]))
    assert subset.info["y"].equals(as_u16([2000, 1, 0]))

    # rows(): selection by boolean mask
    subset = events.rows([True, False, False, True, True])
    assert len(subset) == 3
    assert subset.info["x"].equals(as_u16([2000, 0, 0]))
    assert subset.info["y"].equals(as_u16([2000, 0, 100]))
def
test_adding_metadata_features(axscan):
def test_adding_metadata_features(axscan):
    """Add a metadata column and a feature column, then overwrite each one."""

    def frame(column, values):
        # Build a fresh single-column frame for every call and comparison
        return pd.DataFrame({column: values})

    tile = Tile(axscan, 0)
    events = EventArray.from_events(
        [
            Event(tile, 0, 100),
            Event(tile, 0, 0),
            Event(tile, 1000, 1000),
            Event(tile, 1000, 1),
            Event(tile, 2000, 2000),
        ]
    )

    # Metadata: first insertion, then a second add under the same column name
    events.add_metadata(frame("test", [1, 2, 3, 4, 5]))
    assert events.get("test").equals(frame("test", [1, 2, 3, 4, 5]))
    events.add_metadata(frame("test", [5, 4, 3, 2, 1]))
    assert events.get("test").equals(frame("test", [5, 4, 3, 2, 1]))

    # Features: same two-step check
    events.add_features(frame("test2", [1, 2, 3, 4, 5]))
    assert events.get("test2").equals(frame("test2", [1, 2, 3, 4, 5]))
    events.add_features(frame("test2", [5, 4, 3, 2, 1]))
    assert events.get("test2").equals(frame("test2", [5, 4, 3, 2, 1]))
def
test_event_montages(bzscan, circle):
def test_event_montages(bzscan, circle):
    """Build three montages of one event: unmasked, masked, and hard-masked.

    Each montage is shown in a blocking OpenCV window when ``SHOW_PLOTS`` is
    true; otherwise the test only checks that ``make_montage`` runs.
    """
    images = Event(Tile(bzscan, 1000), 1086, 342).get_crops(crop_size=100)

    # (window title, extra keyword arguments for make_montage), in display order
    variants = (
        ("Full, classic montage with labels", {}),
        ("Full, classic montage with labels and mask overlay", {"mask": circle}),
        (
            "Full, classic montage with labels and hard-masking",
            {"mask": circle, "mask_mode": "hard"},
        ),
    )

    for window_title, mask_options in variants:
        # The literals are rebuilt per call so each call gets fresh arguments
        montage = csi_images.make_montage(
            images,
            [0, 1, 4, 2, 3],
            {0: (0, 0, 1), 1: (1, 0, 0), 2: (0, 1, 0), 4: (1, 1, 1)},
            labels=["RGB", "DAPI", "AF555", "AF488", "AF647", "BRIGHT"],
            **mask_options,
        )
        if not SHOW_PLOTS:
            continue
        cv2.imshow(window_title, cv2.cvtColor(montage, cv2.COLOR_RGB2BGR))
        cv2.waitKey(0)
        cv2.destroyAllWindows()
def
test_saving_crops_and_montages(bzscan):
def test_saving_crops_and_montages(bzscan):
    """Check that saved crops and montages match the ones computed in memory.

    Crops must be identical after a save/load round trip; montages are only
    compared by shape. The ``temp`` output directory is removed even when an
    assertion fails.
    """
    tile = Tile(bzscan, 1000)
    tile2 = Tile(bzscan, 0)
    events = [
        Event(tile, 1086, 342),
        Event(tile2, 2, 1000),
        Event(tile, 1000, 1000),
        Event(tile2, 800, 800),
        Event(tile, 1000, 662),
    ]
    output_dir = "temp"
    channel_names = bzscan.get_channel_names()

    # Get all crops and montages
    serial_crops = []
    serial_montages = []
    for event in events:
        serial_crops.append(event.get_crops())
        serial_montages.append(event.get_montage())

    try:
        # Save crops and montages
        Event.get_and_save_many_crops(events, output_dir, channel_names)
        Event.get_and_save_many_montages(events, output_dir)

        saved_crops = []
        saved_montages = []
        for event in events:
            crops = event.load_crops(output_dir)
            saved_crops.append([crops[c] for c in channel_names])
            saved_montages.append(event.load_montage(output_dir))

        # Make sure crops are identical
        for a, b in zip(serial_crops, saved_crops):
            for a_img, b_img in zip(a, b):
                assert np.array_equal(a_img, b_img)

        # Montages got JPEG compressed, so only their sizes are compared
        for a, b in zip(serial_montages, saved_montages):
            assert a.shape == b.shape

        # Visual inspection
        if SHOW_PLOTS:
            cv2.imshow("Original", cv2.cvtColor(serial_montages[0], cv2.COLOR_RGB2BGR))
            cv2.imshow("Saved", cv2.cvtColor(saved_montages[0], cv2.COLOR_RGB2BGR))
            cv2.waitKey(0)
            cv2.destroyAllWindows()
    finally:
        # Clean up, also when an assertion above failed
        if os.path.isdir(output_dir):
            for file in os.listdir(output_dir):
                os.remove(os.path.join(output_dir, file))
            os.rmdir(output_dir)
def
test_saving_and_loading(axscan):
def test_saving_and_loading(axscan):
    """Round-trip an ``EventArray`` with metadata and features via CSV, JSON, HDF5.

    The files written under ``tests/data`` are removed even when an
    assertion fails.
    """
    tile = Tile(axscan, 0)
    events = [
        Event(tile, 0, 100),
        Event(tile, 0, 0),
        Event(tile, 1000, 1000),
        Event(tile, 1000, 1),
        Event(tile, 2000, 2000),
    ]

    events = EventArray.from_events(events)

    # Add content
    events.add_metadata(pd.DataFrame({"test": [1, 2, 3, 4, 5]}))

    # Add features
    events.add_features(pd.DataFrame({"test2": [1, 2, 3, 4, 5]}))

    csv_path = "tests/data/events.csv"
    json_path = "tests/data/events.json"
    hdf5_path = "tests/data/events.h5"
    try:
        # Save and load
        assert events.save_csv(csv_path)
        assert events == EventArray.load_csv(csv_path)

        assert events.save_json(json_path)
        assert events == EventArray.load_json(json_path)

        assert events.save_hdf5(hdf5_path)
        assert events == EventArray.load_hdf5(hdf5_path)
    finally:
        # Clean up, also when an assertion above failed
        for path in (csv_path, json_path, hdf5_path):
            if os.path.exists(path):
                os.remove(path)