tests.test_events

  1import os
  2import time
  3
  4import pytest
  5
  6import cv2
  7import numpy as np
  8import pandas as pd
  9
 10from csi_images.csi_scans import Scan
 11from csi_images.csi_tiles import Tile
 12from csi_images.csi_events import Event, EventArray
 13from csi_images import csi_images
 14
 15if os.environ.get("DEBIAN_FRONTEND") == "noninteractive":
 16    SHOW_PLOTS = False
 17else:
 18    # Change this to your preference for local testing, but commit as True
 19    SHOW_PLOTS = True
 20
 21
 22@pytest.fixture
 23def bzscan():
 24    return Scan.load_txt("tests/data")
 25
 26
 27@pytest.fixture
 28def axscan():
 29    return Scan.load_yaml("tests/data")
 30
 31
 32@pytest.fixture
 33def circle():
 34    circle = np.zeros((100, 100, 3), dtype=np.uint8)
 35    circle = cv2.circle(circle, (50, 50), 20, (1, 0, 0), -1)
 36    circle = circle[:, :, 0]
 37    circle = circle.astype(np.uint8)
 38    return circle
 39
 40
 41def test_get_crops(bzscan):
 42    tile = Tile(bzscan, 1000)
 43    event = Event(tile, 1086, 342)
 44
 45    # Test a regular event
 46    images = event.get_crops()
 47    assert len(images) == 5
 48    images = event.get_crops(crop_size=50)
 49    assert images[0].shape == (50, 50)
 50    images = event.get_crops(crop_size=100)
 51    assert images[0].shape == (100, 100)
 52
 53    if SHOW_PLOTS:
 54        for image in images:
 55            cv2.imshow("Bright DAPI event in the center", image)
 56            cv2.waitKey(0)
 57        cv2.destroyAllWindows()
 58
 59    # Test a corner event
 60    event = Event(tile, 1350, 2)
 61    images = event.get_crops()
 62    assert len(images) == 5
 63    images = event.get_crops(crop_size=200)
 64    assert images[0].shape == (200, 200)
 65    images = event.get_crops(crop_size=100)
 66    assert images[0].shape == (100, 100)
 67
 68    if SHOW_PLOTS:
 69        for image in images:
 70            cv2.imshow("Events in the top-right corner of a tile", image)
 71            cv2.waitKey(0)
 72        cv2.destroyAllWindows()
 73
 74    # Test many events
 75    tile2 = Tile(bzscan, 500)
 76    events = [
 77        Event(tile, 515, 411),
 78        Event(tile2, 2, 1000),
 79        Event(tile, 1000, 1000),
 80        Event(tile, 87, 126),
 81        Event(tile, 1000, 2),
 82        Event(tile2, 800, 800),
 83        Event(tile, 1000, 662),
 84    ]
 85
 86    # Test time to extract images sequentially
 87    start_time = time.time()
 88    images_1 = []
 89    for event in events:
 90        images_1.append(event.get_crops())
 91    sequential_time = time.time() - start_time
 92
 93    # Test time to extract images in parallel
 94    start_time = time.time()
 95    images_2 = Event.get_many_crops(events, crop_size=100)
 96    parallel_time = time.time() - start_time
 97    assert parallel_time < sequential_time
 98    for list_a, list_b in zip(images_1, images_2):
 99        assert len(list_a) == len(list_b)
100        for image_a, image_b in zip(list_a, list_b):
101            assert np.array_equal(image_a, image_b)
102
103    # Test that it works after converting to EventArray and back
104    event_array = EventArray.from_events(events)
105    remade_events = event_array.to_events(
106        [bzscan], ignore_metadata=True, ignore_features=True
107    )
108    images_3 = Event.get_many_crops(remade_events, crop_size=100)
109    for list_a, list_b in zip(images_1, images_3):
110        assert len(list_a) == len(list_b)
111        for image_a, image_b in zip(list_a, list_b):
112            assert np.array_equal(image_a, image_b)
113
114
def test_event_coordinates_for_bzscanner(bzscan):
    """Check scan-frame and slide-frame positions of events on ``bzscan``.

    Verifies the absolute position of the origin and of the opposite corner,
    and how both positions move relative to the origin when stepping within a
    tile, to the next tile in the row, and to the next row.
    """
    # Origin
    origin_tile = Tile(bzscan, (0, 0))
    probe = Event(origin_tile, 0, 0)
    scan_origin = probe.get_scan_position()
    assert 2500 <= scan_origin[0] <= 3500
    assert 2500 <= scan_origin[1] <= 3500
    slide_origin = probe.get_slide_position()
    assert 71500 <= slide_origin[0] <= 72500
    assert 21500 <= slide_origin[1] <= 22500

    # Within the same tile, "top-right corner": scan x grows and scan y is
    # unchanged; on the slide, x is unchanged and y shrinks (flipped)
    probe = Event(origin_tile, 1000, 0)
    scan_xy = probe.get_scan_position()
    assert scan_xy[0] > scan_origin[0]
    assert scan_xy[1] == scan_origin[1]
    slide_xy = probe.get_slide_position()
    assert slide_xy[0] == slide_origin[0]
    assert slide_xy[1] < slide_origin[1]

    # Within the same tile, "bottom-left corner": scan x is unchanged and
    # scan y grows; on the slide, x shrinks and y is unchanged
    probe = Event(origin_tile, 0, 1000)
    scan_xy = probe.get_scan_position()
    assert scan_xy[0] == scan_origin[0]
    assert scan_xy[1] > scan_origin[1]
    slide_xy = probe.get_slide_position()
    assert slide_xy[0] < slide_origin[0]
    assert slide_xy[1] == slide_origin[1]

    # Next tile, same row
    probe = Event(Tile(bzscan, (1, 0)), 0, 0)
    scan_xy = probe.get_scan_position()
    assert scan_xy[0] > scan_origin[0]
    assert scan_xy[1] == scan_origin[1]
    slide_xy = probe.get_slide_position()
    assert slide_xy[0] == slide_origin[0]
    assert slide_xy[1] < slide_origin[1]

    # Next row, same column
    probe = Event(Tile(bzscan, (0, 1)), 0, 0)
    scan_xy = probe.get_scan_position()
    assert scan_xy[0] == scan_origin[0]
    assert scan_xy[1] > scan_origin[1]
    slide_xy = probe.get_slide_position()
    assert slide_xy[0] < slide_origin[0]
    assert slide_xy[1] == slide_origin[1]

    # Opposite corner
    last_col = bzscan.roi[0].tile_cols - 1
    last_row = bzscan.roi[0].tile_rows - 1
    probe = Event(Tile(bzscan, (last_col, last_row)), 1361, 1003)
    scan_xy = probe.get_scan_position()
    assert 21500 <= scan_xy[0] <= 22500
    assert 58500 <= scan_xy[1] <= 60500
    slide_xy = probe.get_slide_position()
    assert 14500 <= slide_xy[0] <= 15500
    assert 2500 <= slide_xy[1] <= 3500
171
172
def test_event_coordinates_for_axioscan(axscan):
    """Check scan-frame and slide-frame positions of events on ``axscan``.

    Verifies the expected coordinate ranges at the origin and at the opposite
    corner, and that the y coordinate is identical in both frames.
    """
    # Origin
    probe = Event(Tile(axscan, 0), 0, 0)
    scan_xy = probe.get_scan_position()
    assert -59000 <= scan_xy[0] < -55000
    assert 0 <= scan_xy[1] < 4000
    slide_xy = probe.get_slide_position()
    assert 16000 <= slide_xy[0] < 20000
    assert scan_xy[1] == slide_xy[1]

    # Opposite corner
    last_col = axscan.roi[0].tile_cols - 1
    last_row = axscan.roi[0].tile_rows - 1
    probe = Event(Tile(axscan, (last_col, last_row)), 2000, 2000)
    scan_xy = probe.get_scan_position()
    assert -4000 <= scan_xy[0] <= 0
    assert 21000 <= scan_xy[1] <= 25000
    slide_xy = probe.get_slide_position()
    assert 71000 <= slide_xy[0] <= 75000
    assert scan_xy[1] == slide_xy[1]
193
194
def test_eventarray_conversions(axscan):
    """Check conversions between ``Event`` lists, ``EventArray`` and files.

    Covers building an ``EventArray`` with and without metadata, rejecting a
    list where only some events carry metadata, the DataFrame round trip, the
    round trip through ``to_events`` with features attached, and CSV/HDF5
    save and load.
    """
    # Origin
    tile = Tile(axscan, 0)
    event0 = Event(tile, 0, 0)
    event1 = Event(tile, 1000, 1000)
    event2 = Event(tile, 2000, 2000)

    event_array = EventArray.from_events([event0, event1, event2])

    assert len(event_array) == 3
    assert event_array.metadata is None
    assert event_array.features is None

    event0.metadata = pd.Series({"event0": 0})

    # Only one of the three events has metadata: this should throw an error
    with pytest.raises(ValueError):
        EventArray.from_events([event0, event1, event2])

    event1.metadata = pd.Series({"event0": 1})
    event2.metadata = pd.Series({"event0": 2})

    event_array = EventArray.from_events([event0, event1, event2])

    assert len(event_array) == 3

    events_df = event_array.to_dataframe()

    assert len(events_df) == 3

    assert event_array == EventArray.from_dataframe(events_df)

    # Test adding different dtypes and converting back and forth
    event_array.features = pd.DataFrame(
        {"feature1": [1, 2, 3], "feature2": [4.0, 5.0, 6.0]}
    )
    remade_event_list = event_array.to_events([axscan])
    assert len(remade_event_list) == 3
    remade_event_array = EventArray.from_events(remade_event_list)
    assert event_array == remade_event_array

    # Test saving and loading; clean up in ``finally`` so that a failed
    # assertion does not leave generated files in tests/data
    csv_path = "tests/data/events.csv"
    try:
        assert event_array.save_csv(csv_path)
        assert event_array == EventArray.load_csv(csv_path)
    finally:
        if os.path.exists(csv_path):
            os.remove(csv_path)

    hdf5_path = "tests/data/events.h5"
    try:
        assert event_array.save_hdf5(hdf5_path)
        assert event_array == EventArray.load_hdf5(hdf5_path)
    finally:
        if os.path.exists(hdf5_path):
            os.remove(hdf5_path)
246
247
248# @pytest.mark.skip(reason="No longer required.")
def test_ocular_conversions():
    """Round-trip OCULAR results through ``save_ocular`` and ``load_ocular``.

    Loads results from a fixed input directory, saves them to ``tests/data``,
    reloads them and compares, first with the default event type and then with
    ``event_type="others"``. The test is skipped when the input directory is
    not available, and generated files are removed even if a comparison fails.
    """
    input_path = "/mnt/HDSCA_Development/DZ/0B68818/ocular"
    if not os.path.isdir(input_path):
        # The input lives outside the repository; without it there is nothing to test
        pytest.skip(f"OCULAR test data not available at {input_path}")

    def _remove_outputs(file_names):
        # Delete whichever of the generated files exist in tests/data
        for file_name in file_names:
            path = os.path.join("tests/data", file_name)
            if os.path.exists(path):
                os.remove(path)

    try:
        result = EventArray.load_ocular(input_path)
        # For the purposes of this test, we will manually relabel "clust" == nan to 0
        # These come from ocular_interesting.rds, which does not have clusters
        result.metadata["clust"] = result.metadata["clust"].fillna(0)
        result.metadata["hcpc"] = result.metadata["hcpc"].fillna(0)
        result.save_ocular("tests/data")
        new_result = EventArray.load_ocular("tests/data")
        # Sort them so that they are in the same order
        result = result.sort(["tile", "x", "y"])
        new_result = new_result.sort(["tile", "x", "y"])
        # Note: hcpc method within ocularr and here are different
        result.metadata["hcpc"] = new_result.metadata["hcpc"].copy()
        assert result == new_result
    finally:
        # Clean up
        _remove_outputs(
            [
                "ocular_interesting.csv",
                "ocular_interesting.rds",
                "rc-final.csv",
                "rc-final.rds",
                "rc-final1.rds",
                "rc-final2.rds",
                "rc-final3.rds",
                "rc-final4.rds",
            ]
        )

    # Try it with "others" files
    try:
        result = EventArray.load_ocular(input_path, event_type="others")
        result.save_ocular("tests/data", event_type="others")
        new_result = EventArray.load_ocular("tests/data", event_type="others")
        result = result.sort(["tile", "x", "y"])
        new_result = new_result.sort(["tile", "x", "y"])
        # Note: hcpc method within ocularr and here are different
        result.metadata["hcpc"] = new_result.metadata["hcpc"].copy()
        assert result == new_result
    finally:
        # Clean up
        _remove_outputs(
            [
                "others-final.csv",
                "others-final.rds",
                "others-final1.rds",
                "others-final2.rds",
                "others-final3.rds",
                "others-final4.rds",
            ]
        )
290
291
def test_copy_sort_rows_get(axscan):
    """Check ``EventArray.copy``, ``sort``, ``get`` and ``rows``.

    Five events on one tile are used to verify that a copy is independent of
    its source, that multi-key sorting orders rows as expected, that ``get``
    returns the requested columns, and that ``rows`` accepts both integer
    positions and a boolean mask.
    """

    def u16(values):
        # Expected column content, in the dtype the assertions compare against
        return pd.Series(values, dtype=np.uint16)

    # Origin
    tile = Tile(axscan, 0)
    event_array = EventArray.from_events(
        [
            Event(tile, 0, 100),
            Event(tile, 0, 0),
            Event(tile, 1000, 1000),
            Event(tile, 1000, 1),
            Event(tile, 2000, 2000),
        ]
    )

    # Copy
    duplicate = event_array.copy()
    duplicate.info["x"] = np.uint16(1)
    # Check that changes to the copy did not change the original
    assert duplicate.info["x"].equals(u16([1, 1, 1, 1, 1]))
    assert event_array.info["x"].equals(u16([0, 0, 1000, 1000, 2000]))

    # Sort: x descending, then y ascending
    ordered = event_array.sort(["x", "y"], ascending=[False, True])
    assert ordered.info["x"].equals(u16([2000, 1000, 1000, 0, 0]))
    assert ordered.info["y"].equals(u16([2000, 1, 1000, 0, 100]))

    # Get
    columns = ordered.get(["x", "y"])
    assert columns["x"].equals(u16([2000, 1000, 1000, 0, 0]))
    assert columns["y"].equals(u16([2000, 1, 1000, 0, 100]))
    assert columns.columns.equals(pd.Index(["x", "y"]))

    # Rows, selected by integer position
    subset = ordered.rows([0, 1, 3])
    assert len(subset) == 3
    assert subset.info["x"].equals(u16([2000, 1000, 0]))
    assert subset.info["y"].equals(u16([2000, 1, 0]))
    # Rows, selected by boolean mask
    subset = ordered.rows([True, False, False, True, True])
    assert len(subset) == 3
    assert subset.info["x"].equals(u16([2000, 0, 0]))
    assert subset.info["y"].equals(u16([2000, 0, 100]))
332
333
def test_adding_metadata_features(axscan):
    """Check that ``add_metadata`` and ``add_features`` add and overwrite columns.

    Each column is added once and then added again with different values;
    after every call ``get`` must return exactly the values just supplied.
    """
    # Origin
    tile = Tile(axscan, 0)
    event_array = EventArray.from_events(
        [
            Event(tile, 0, 100),
            Event(tile, 0, 0),
            Event(tile, 1000, 1000),
            Event(tile, 1000, 1),
            Event(tile, 2000, 2000),
        ]
    )

    # Metadata first, then features; each is added and then added again
    for column, kind in (("test", "metadata"), ("test2", "features")):
        for values in ([1, 2, 3, 4, 5], [5, 4, 3, 2, 1]):
            if kind == "metadata":
                event_array.add_metadata(pd.DataFrame({column: values}))
            else:
                event_array.add_features(pd.DataFrame({column: values}))
            assert event_array.get(column).equals(pd.DataFrame({column: values}))
360
361
def test_event_montages(bzscan, circle):
    """Build montages from one event's crops, with and without a mask.

    ``csi_images.make_montage`` is called three times with the same crops,
    order, colors and labels: without a mask, with the ``circle`` mask, and
    with the ``circle`` mask and ``mask_mode="hard"``. Each result is shown in
    a window when ``SHOW_PLOTS`` is set.
    """
    tile = Tile(bzscan, 1000)
    event = Event(tile, 1086, 342)
    images = event.get_crops(crop_size=100)

    # Window title and the extra keyword arguments for each montage variant
    variants = (
        ("Full, classic montage with labels", {}),
        ("Full, classic montage with labels and mask overlay", {"mask": circle}),
        (
            "Full, classic montage with labels and hard-masking",
            {"mask": circle, "mask_mode": "hard"},
        ),
    )
    for title, mask_options in variants:
        # The shared arguments are rebuilt for every call so that no call can
        # observe changes made to them by a previous one
        montage = csi_images.make_montage(
            images,
            [0, 1, 4, 2, 3],
            {0: (0, 0, 1), 1: (1, 0, 0), 2: (0, 1, 0), 4: (1, 1, 1)},
            labels=["RGB", "DAPI", "AF555", "AF488", "AF647", "BRIGHT"],
            **mask_options,
        )
        if not SHOW_PLOTS:
            continue
        cv2.imshow(title, cv2.cvtColor(montage, cv2.COLOR_RGB2BGR))
        cv2.waitKey(0)
        cv2.destroyAllWindows()
411
412
def test_saving_crops_and_montages(bzscan):
    """Check that saved crops and montages match those generated in memory.

    Crops and montages are generated per event, then saved to and reloaded
    from the ``temp`` directory. Reloaded crops must be identical; reloaded
    montages only need the same shape. The ``temp`` directory is removed even
    if an assertion fails.
    """
    tile = Tile(bzscan, 1000)
    tile2 = Tile(bzscan, 0)
    events = [
        Event(tile, 1086, 342),
        Event(tile2, 2, 1000),
        Event(tile, 1000, 1000),
        Event(tile2, 800, 800),
        Event(tile, 1000, 662),
    ]

    # Get all crops and montages
    serial_crops = []
    serial_montages = []
    for event in events:
        serial_crops.append(event.get_crops())
        serial_montages.append(event.get_montage())

    try:
        # Save crops and montages
        Event.get_and_save_many_crops(events, "temp", bzscan.get_channel_names())
        Event.get_and_save_many_montages(events, "temp")

        saved_crops = []
        saved_montages = []
        for event in events:
            crops = event.load_crops("temp")
            saved_crops.append([crops[c] for c in bzscan.get_channel_names()])
            saved_montages.append(event.load_montage("temp"))

        # Make sure crops are identical
        for a, b in zip(serial_crops, saved_crops):
            for a_img, b_img in zip(a, b):
                assert np.array_equal(a_img, b_img)

        # Montages got JPEG compressed, so
        # Size comparison:
        for a, b in zip(serial_montages, saved_montages):
            assert a.shape == b.shape

        # Visual inspection
        if SHOW_PLOTS:
            cv2.imshow("Original", cv2.cvtColor(serial_montages[0], cv2.COLOR_RGB2BGR))
            cv2.imshow("Saved", cv2.cvtColor(saved_montages[0], cv2.COLOR_RGB2BGR))
            cv2.waitKey(0)
            cv2.destroyAllWindows()
    finally:
        # Clean up, but only if the directory was actually created
        if os.path.isdir("temp"):
            for file in os.listdir("temp"):
                os.remove(os.path.join("temp", file))
            os.rmdir("temp")
463
464
def test_saving_and_loading(axscan):
    """Check CSV, JSON and HDF5 round trips of an ``EventArray``.

    The array carries one metadata column and one feature column. Each format
    must report a successful save and reload to an equal ``EventArray``.
    Generated files are removed even if an assertion fails.
    """
    # Origin
    tile = Tile(axscan, 0)
    events = [
        Event(tile, 0, 100),
        Event(tile, 0, 0),
        Event(tile, 1000, 1000),
        Event(tile, 1000, 1),
        Event(tile, 2000, 2000),
    ]

    events = EventArray.from_events(events)

    # Add content
    events.add_metadata(pd.DataFrame({"test": [1, 2, 3, 4, 5]}))

    # Add features
    events.add_features(pd.DataFrame({"test2": [1, 2, 3, 4, 5]}))

    output_paths = (
        "tests/data/events.csv",
        "tests/data/events.json",
        "tests/data/events.h5",
    )
    try:
        # Save and load
        assert events.save_csv("tests/data/events.csv")
        assert events == EventArray.load_csv("tests/data/events.csv")

        assert events.save_json("tests/data/events.json")
        assert events == EventArray.load_json("tests/data/events.json")

        assert events.save_hdf5("tests/data/events.h5")
        assert events == EventArray.load_hdf5("tests/data/events.h5")
    finally:
        # Clean up whichever files were written before a failure, if any
        for path in output_paths:
            if os.path.exists(path):
                os.remove(path)
@pytest.fixture
def bzscan():
    """Provide the scan that ``Scan.load_txt`` reads from ``tests/data``."""
    scan = Scan.load_txt("tests/data")
    return scan
@pytest.fixture
def axscan():
    """Provide the scan that ``Scan.load_yaml`` reads from ``tests/data``."""
    scan = Scan.load_yaml("tests/data")
    return scan
@pytest.fixture
def circle():
    """Provide a 100x100 ``uint8`` mask holding a filled circle.

    The circle has radius 20 and is centered at (50, 50); it is drawn with
    value 1 on an all-zero canvas.
    """
    canvas = np.zeros((100, 100, 3), dtype=np.uint8)
    # Color (1, 0, 0) writes only to the first channel; thickness -1 fills the shape
    canvas = cv2.circle(canvas, (50, 50), 20, (1, 0, 0), -1)
    # Keep just the channel that was drawn on
    mask = canvas[:, :, 0].astype(np.uint8)
    return mask
def test_get_crops(bzscan):
    """Exercise ``Event.get_crops`` and ``Event.get_many_crops`` on ``bzscan``.

    Checks the number of crops and their shapes for a regular event and a
    corner event, that ``Event.get_many_crops`` returns the same crops as (and
    runs faster than) calling ``get_crops`` per event, and that the crops are
    unchanged after converting the events to an ``EventArray`` and back.
    """
    tile = Tile(bzscan, 1000)
    event = Event(tile, 1086, 342)

    # Test a regular event
    images = event.get_crops()
    assert len(images) == 5
    images = event.get_crops(crop_size=50)
    assert images[0].shape == (50, 50)
    images = event.get_crops(crop_size=100)
    assert images[0].shape == (100, 100)

    if SHOW_PLOTS:
        for image in images:
            cv2.imshow("Bright DAPI event in the center", image)
            cv2.waitKey(0)
        cv2.destroyAllWindows()

    # Test a corner event
    event = Event(tile, 1350, 2)
    images = event.get_crops()
    assert len(images) == 5
    images = event.get_crops(crop_size=200)
    assert images[0].shape == (200, 200)
    images = event.get_crops(crop_size=100)
    assert images[0].shape == (100, 100)

    if SHOW_PLOTS:
        for image in images:
            cv2.imshow("Events in the top-right corner of a tile", image)
            cv2.waitKey(0)
        cv2.destroyAllWindows()

    # Test many events
    tile2 = Tile(bzscan, 500)
    events = [
        Event(tile, 515, 411),
        Event(tile2, 2, 1000),
        Event(tile, 1000, 1000),
        Event(tile, 87, 126),
        Event(tile, 1000, 2),
        Event(tile2, 800, 800),
        Event(tile, 1000, 662),
    ]

    # Test time to extract images sequentially.
    # perf_counter() is a monotonic, high-resolution clock, so the measured
    # durations cannot be skewed by wall-clock adjustments like time.time() can.
    start_time = time.perf_counter()
    images_1 = [event.get_crops() for event in events]
    sequential_time = time.perf_counter() - start_time

    # Test time to extract images in parallel
    start_time = time.perf_counter()
    images_2 = Event.get_many_crops(events, crop_size=100)
    parallel_time = time.perf_counter() - start_time
    # NOTE(review): a single timing comparison may be sensitive to machine load
    assert parallel_time < sequential_time
    for list_a, list_b in zip(images_1, images_2):
        assert len(list_a) == len(list_b)
        for image_a, image_b in zip(list_a, list_b):
            assert np.array_equal(image_a, image_b)

    # Test that it works after converting to EventArray and back
    event_array = EventArray.from_events(events)
    remade_events = event_array.to_events(
        [bzscan], ignore_metadata=True, ignore_features=True
    )
    images_3 = Event.get_many_crops(remade_events, crop_size=100)
    for list_a, list_b in zip(images_1, images_3):
        assert len(list_a) == len(list_b)
        for image_a, image_b in zip(list_a, list_b):
            assert np.array_equal(image_a, image_b)
def test_event_coordinates_for_bzscanner(bzscan):
    """Check scan-frame and slide-frame positions of events on ``bzscan``.

    Verifies the absolute position of the origin and of the opposite corner,
    and how both positions move relative to the origin when stepping within a
    tile, to the next tile in the row, and to the next row.
    """
    # Origin
    origin_tile = Tile(bzscan, (0, 0))
    probe = Event(origin_tile, 0, 0)
    scan_origin = probe.get_scan_position()
    assert 2500 <= scan_origin[0] <= 3500
    assert 2500 <= scan_origin[1] <= 3500
    slide_origin = probe.get_slide_position()
    assert 71500 <= slide_origin[0] <= 72500
    assert 21500 <= slide_origin[1] <= 22500

    # Within the same tile, "top-right corner": scan x grows and scan y is
    # unchanged; on the slide, x is unchanged and y shrinks (flipped)
    probe = Event(origin_tile, 1000, 0)
    scan_xy = probe.get_scan_position()
    assert scan_xy[0] > scan_origin[0]
    assert scan_xy[1] == scan_origin[1]
    slide_xy = probe.get_slide_position()
    assert slide_xy[0] == slide_origin[0]
    assert slide_xy[1] < slide_origin[1]

    # Within the same tile, "bottom-left corner": scan x is unchanged and
    # scan y grows; on the slide, x shrinks and y is unchanged
    probe = Event(origin_tile, 0, 1000)
    scan_xy = probe.get_scan_position()
    assert scan_xy[0] == scan_origin[0]
    assert scan_xy[1] > scan_origin[1]
    slide_xy = probe.get_slide_position()
    assert slide_xy[0] < slide_origin[0]
    assert slide_xy[1] == slide_origin[1]

    # Next tile, same row
    probe = Event(Tile(bzscan, (1, 0)), 0, 0)
    scan_xy = probe.get_scan_position()
    assert scan_xy[0] > scan_origin[0]
    assert scan_xy[1] == scan_origin[1]
    slide_xy = probe.get_slide_position()
    assert slide_xy[0] == slide_origin[0]
    assert slide_xy[1] < slide_origin[1]

    # Next row, same column
    probe = Event(Tile(bzscan, (0, 1)), 0, 0)
    scan_xy = probe.get_scan_position()
    assert scan_xy[0] == scan_origin[0]
    assert scan_xy[1] > scan_origin[1]
    slide_xy = probe.get_slide_position()
    assert slide_xy[0] < slide_origin[0]
    assert slide_xy[1] == slide_origin[1]

    # Opposite corner
    last_col = bzscan.roi[0].tile_cols - 1
    last_row = bzscan.roi[0].tile_rows - 1
    probe = Event(Tile(bzscan, (last_col, last_row)), 1361, 1003)
    scan_xy = probe.get_scan_position()
    assert 21500 <= scan_xy[0] <= 22500
    assert 58500 <= scan_xy[1] <= 60500
    slide_xy = probe.get_slide_position()
    assert 14500 <= slide_xy[0] <= 15500
    assert 2500 <= slide_xy[1] <= 3500
def test_event_coordinates_for_axioscan(axscan):
    """Check scan-frame and slide-frame positions of events on ``axscan``.

    Verifies the expected coordinate ranges at the origin and at the opposite
    corner, and that the y coordinate is identical in both frames.
    """
    # Origin
    probe = Event(Tile(axscan, 0), 0, 0)
    scan_xy = probe.get_scan_position()
    assert -59000 <= scan_xy[0] < -55000
    assert 0 <= scan_xy[1] < 4000
    slide_xy = probe.get_slide_position()
    assert 16000 <= slide_xy[0] < 20000
    assert scan_xy[1] == slide_xy[1]

    # Opposite corner
    last_col = axscan.roi[0].tile_cols - 1
    last_row = axscan.roi[0].tile_rows - 1
    probe = Event(Tile(axscan, (last_col, last_row)), 2000, 2000)
    scan_xy = probe.get_scan_position()
    assert -4000 <= scan_xy[0] <= 0
    assert 21000 <= scan_xy[1] <= 25000
    slide_xy = probe.get_slide_position()
    assert 71000 <= slide_xy[0] <= 75000
    assert scan_xy[1] == slide_xy[1]
def test_eventarray_conversions(axscan):
    """Check conversions between ``Event`` lists, ``EventArray`` and files.

    Covers building an ``EventArray`` with and without metadata, rejecting a
    list where only some events carry metadata, the DataFrame round trip, the
    round trip through ``to_events`` with features attached, and CSV/HDF5
    save and load.
    """
    # Origin
    tile = Tile(axscan, 0)
    event0 = Event(tile, 0, 0)
    event1 = Event(tile, 1000, 1000)
    event2 = Event(tile, 2000, 2000)

    event_array = EventArray.from_events([event0, event1, event2])

    assert len(event_array) == 3
    assert event_array.metadata is None
    assert event_array.features is None

    event0.metadata = pd.Series({"event0": 0})

    # Only one of the three events has metadata: this should throw an error
    with pytest.raises(ValueError):
        EventArray.from_events([event0, event1, event2])

    event1.metadata = pd.Series({"event0": 1})
    event2.metadata = pd.Series({"event0": 2})

    event_array = EventArray.from_events([event0, event1, event2])

    assert len(event_array) == 3

    events_df = event_array.to_dataframe()

    assert len(events_df) == 3

    assert event_array == EventArray.from_dataframe(events_df)

    # Test adding different dtypes and converting back and forth
    event_array.features = pd.DataFrame(
        {"feature1": [1, 2, 3], "feature2": [4.0, 5.0, 6.0]}
    )
    remade_event_list = event_array.to_events([axscan])
    assert len(remade_event_list) == 3
    remade_event_array = EventArray.from_events(remade_event_list)
    assert event_array == remade_event_array

    # Test saving and loading; clean up in ``finally`` so that a failed
    # assertion does not leave generated files in tests/data
    csv_path = "tests/data/events.csv"
    try:
        assert event_array.save_csv(csv_path)
        assert event_array == EventArray.load_csv(csv_path)
    finally:
        if os.path.exists(csv_path):
            os.remove(csv_path)

    hdf5_path = "tests/data/events.h5"
    try:
        assert event_array.save_hdf5(hdf5_path)
        assert event_array == EventArray.load_hdf5(hdf5_path)
    finally:
        if os.path.exists(hdf5_path):
            os.remove(hdf5_path)
def test_ocular_conversions():
    """Round-trip OCULAR results through ``save_ocular`` and ``load_ocular``.

    Loads results from a fixed input directory, saves them to ``tests/data``,
    reloads them and compares, first with the default event type and then with
    ``event_type="others"``. The test is skipped when the input directory is
    not available, and generated files are removed even if a comparison fails.
    """
    input_path = "/mnt/HDSCA_Development/DZ/0B68818/ocular"
    if not os.path.isdir(input_path):
        # The input lives outside the repository; without it there is nothing to test
        pytest.skip(f"OCULAR test data not available at {input_path}")

    def _remove_outputs(file_names):
        # Delete whichever of the generated files exist in tests/data
        for file_name in file_names:
            path = os.path.join("tests/data", file_name)
            if os.path.exists(path):
                os.remove(path)

    try:
        result = EventArray.load_ocular(input_path)
        # For the purposes of this test, we will manually relabel "clust" == nan to 0
        # These come from ocular_interesting.rds, which does not have clusters
        result.metadata["clust"] = result.metadata["clust"].fillna(0)
        result.metadata["hcpc"] = result.metadata["hcpc"].fillna(0)
        result.save_ocular("tests/data")
        new_result = EventArray.load_ocular("tests/data")
        # Sort them so that they are in the same order
        result = result.sort(["tile", "x", "y"])
        new_result = new_result.sort(["tile", "x", "y"])
        # Note: hcpc method within ocularr and here are different
        result.metadata["hcpc"] = new_result.metadata["hcpc"].copy()
        assert result == new_result
    finally:
        # Clean up
        _remove_outputs(
            [
                "ocular_interesting.csv",
                "ocular_interesting.rds",
                "rc-final.csv",
                "rc-final.rds",
                "rc-final1.rds",
                "rc-final2.rds",
                "rc-final3.rds",
                "rc-final4.rds",
            ]
        )

    # Try it with "others" files
    try:
        result = EventArray.load_ocular(input_path, event_type="others")
        result.save_ocular("tests/data", event_type="others")
        new_result = EventArray.load_ocular("tests/data", event_type="others")
        result = result.sort(["tile", "x", "y"])
        new_result = new_result.sort(["tile", "x", "y"])
        # Note: hcpc method within ocularr and here are different
        result.metadata["hcpc"] = new_result.metadata["hcpc"].copy()
        assert result == new_result
    finally:
        # Clean up
        _remove_outputs(
            [
                "others-final.csv",
                "others-final.rds",
                "others-final1.rds",
                "others-final2.rds",
                "others-final3.rds",
                "others-final4.rds",
            ]
        )
def test_copy_sort_rows_get(axscan):
    """Check ``EventArray.copy``, ``sort``, ``get`` and ``rows``.

    Five events on one tile are used to verify that a copy is independent of
    its source, that multi-key sorting orders rows as expected, that ``get``
    returns the requested columns, and that ``rows`` accepts both integer
    positions and a boolean mask.
    """

    def u16(values):
        # Expected column content, in the dtype the assertions compare against
        return pd.Series(values, dtype=np.uint16)

    # Origin
    tile = Tile(axscan, 0)
    event_array = EventArray.from_events(
        [
            Event(tile, 0, 100),
            Event(tile, 0, 0),
            Event(tile, 1000, 1000),
            Event(tile, 1000, 1),
            Event(tile, 2000, 2000),
        ]
    )

    # Copy
    duplicate = event_array.copy()
    duplicate.info["x"] = np.uint16(1)
    # Check that changes to the copy did not change the original
    assert duplicate.info["x"].equals(u16([1, 1, 1, 1, 1]))
    assert event_array.info["x"].equals(u16([0, 0, 1000, 1000, 2000]))

    # Sort: x descending, then y ascending
    ordered = event_array.sort(["x", "y"], ascending=[False, True])
    assert ordered.info["x"].equals(u16([2000, 1000, 1000, 0, 0]))
    assert ordered.info["y"].equals(u16([2000, 1, 1000, 0, 100]))

    # Get
    columns = ordered.get(["x", "y"])
    assert columns["x"].equals(u16([2000, 1000, 1000, 0, 0]))
    assert columns["y"].equals(u16([2000, 1, 1000, 0, 100]))
    assert columns.columns.equals(pd.Index(["x", "y"]))

    # Rows, selected by integer position
    subset = ordered.rows([0, 1, 3])
    assert len(subset) == 3
    assert subset.info["x"].equals(u16([2000, 1000, 0]))
    assert subset.info["y"].equals(u16([2000, 1, 0]))
    # Rows, selected by boolean mask
    subset = ordered.rows([True, False, False, True, True])
    assert len(subset) == 3
    assert subset.info["x"].equals(u16([2000, 0, 0]))
    assert subset.info["y"].equals(u16([2000, 0, 100]))
def test_adding_metadata_features(axscan):
    """Check that ``add_metadata`` and ``add_features`` add and overwrite columns.

    Each column is added once and then added again with different values;
    after every call ``get`` must return exactly the values just supplied.
    """
    # Origin
    tile = Tile(axscan, 0)
    event_array = EventArray.from_events(
        [
            Event(tile, 0, 100),
            Event(tile, 0, 0),
            Event(tile, 1000, 1000),
            Event(tile, 1000, 1),
            Event(tile, 2000, 2000),
        ]
    )

    # Metadata first, then features; each is added and then added again
    for column, kind in (("test", "metadata"), ("test2", "features")):
        for values in ([1, 2, 3, 4, 5], [5, 4, 3, 2, 1]):
            if kind == "metadata":
                event_array.add_metadata(pd.DataFrame({column: values}))
            else:
                event_array.add_features(pd.DataFrame({column: values}))
            assert event_array.get(column).equals(pd.DataFrame({column: values}))
def test_event_montages(bzscan, circle):
    """Build montages for one event in three configurations.

    The configurations are: labels only, labels with a mask, and labels with
    a mask in ``"hard"`` mask mode. No assertions are made on the montage
    contents; the test passes as long as ``make_montage`` runs for each one.
    When ``SHOW_PLOTS`` is set, each montage is displayed and the test waits
    for a key press before continuing.
    """
    tile = Tile(bzscan, 1000)
    images = Event(tile, 1086, 342).get_crops(crop_size=100)

    # (window title, extra keyword arguments for make_montage)
    variants = (
        ("Full, classic montage with labels", {}),
        (
            "Full, classic montage with labels and mask overlay",
            {"mask": circle},
        ),
        (
            "Full, classic montage with labels and hard-masking",
            {"mask": circle, "mask_mode": "hard"},
        ),
    )

    for window_title, mask_kwargs in variants:
        # Literals are rebuilt on every iteration so each call receives its
        # own fresh list/dict objects.
        montage = csi_images.make_montage(
            images,
            [0, 1, 4, 2, 3],
            {0: (0, 0, 1), 1: (1, 0, 0), 2: (0, 1, 0), 4: (1, 1, 1)},
            labels=["RGB", "DAPI", "AF555", "AF488", "AF647", "BRIGHT"],
            **mask_kwargs,
        )
        if not SHOW_PLOTS:
            continue
        # The montage is converted from RGB to BGR before being handed to imshow
        cv2.imshow(window_title, cv2.cvtColor(montage, cv2.COLOR_RGB2BGR))
        cv2.waitKey(0)
        cv2.destroyAllWindows()
def test_saving_crops_and_montages(bzscan):
    """Check that crops and montages saved to disk match the in-memory ones.

    Crops and montages are first computed event by event, then saved in bulk
    to the ``temp`` directory and loaded back. Loaded crops must be identical
    to the in-memory crops; loaded montages are only compared by shape.

    The output directory is removed in a ``finally`` block so that a failing
    assertion (or an exception while saving/loading) does not leave files
    behind for later test runs.
    """
    output_dir = "temp"
    tile = Tile(bzscan, 1000)
    tile2 = Tile(bzscan, 0)
    events = [
        Event(tile, 1086, 342),
        Event(tile2, 2, 1000),
        Event(tile, 1000, 1000),
        Event(tile2, 800, 800),
        Event(tile, 1000, 662),
    ]

    # Get all crops and montages
    serial_crops = []
    serial_montages = []
    for event in events:
        serial_crops.append(event.get_crops())
        serial_montages.append(event.get_montage())

    try:
        # Save crops and montages
        Event.get_and_save_many_crops(
            events, output_dir, bzscan.get_channel_names()
        )
        Event.get_and_save_many_montages(events, output_dir)

        saved_crops = []
        saved_montages = []
        for event in events:
            crops = event.load_crops(output_dir)
            # Loaded crops are looked up by channel name; order them like
            # the in-memory list.
            saved_crops.append([crops[c] for c in bzscan.get_channel_names()])
            saved_montages.append(event.load_montage(output_dir))

        # Make sure crops are identical
        for a, b in zip(serial_crops, saved_crops):
            for a_img, b_img in zip(a, b):
                assert np.array_equal(a_img, b_img)

        # Montages got JPEG compressed when saved, so pixel values are not
        # compared; only the shapes are checked.
        for a, b in zip(serial_montages, saved_montages):
            assert a.shape == b.shape

        # Visual inspection
        if SHOW_PLOTS:
            cv2.imshow(
                "Original", cv2.cvtColor(serial_montages[0], cv2.COLOR_RGB2BGR)
            )
            cv2.imshow("Saved", cv2.cvtColor(saved_montages[0], cv2.COLOR_RGB2BGR))
            cv2.waitKey(0)
            cv2.destroyAllWindows()
    finally:
        # Clean up, even when something above failed. The directory check
        # keeps a missing directory from masking the original error.
        if os.path.isdir(output_dir):
            for file in os.listdir(output_dir):
                os.remove(os.path.join(output_dir, file))
            os.rmdir(output_dir)
def test_saving_and_loading(axscan):
    """Check that an EventArray round-trips through CSV, JSON and HDF5 files.

    An EventArray with one metadata column and one feature column is saved in
    each format and loaded back; every save must return a truthy value and
    every loaded array must compare equal to the original.

    The files are written under ``tests/data`` and removed in a ``finally``
    block so that a failing assertion does not leave them behind.
    """
    # Origin
    tile = Tile(axscan, 0)
    events = [
        Event(tile, 0, 100),
        Event(tile, 0, 0),
        Event(tile, 1000, 1000),
        Event(tile, 1000, 1),
        Event(tile, 2000, 2000),
    ]

    events = EventArray.from_events(events)

    # Add content
    events.add_metadata(pd.DataFrame({"test": [1, 2, 3, 4, 5]}))

    # Add features
    events.add_features(pd.DataFrame({"test2": [1, 2, 3, 4, 5]}))

    csv_path = "tests/data/events.csv"
    json_path = "tests/data/events.json"
    hdf5_path = "tests/data/events.h5"

    try:
        # Save and load
        assert events.save_csv(csv_path)
        assert events == EventArray.load_csv(csv_path)

        assert events.save_json(json_path)
        assert events == EventArray.load_json(json_path)

        assert events.save_hdf5(hdf5_path)
        assert events == EventArray.load_hdf5(hdf5_path)
    finally:
        # Clean up, even when something above failed. A file may be missing
        # if an earlier step failed before it was written, so only remove
        # what exists.
        for path in (csv_path, json_path, hdf5_path):
            if os.path.exists(path):
                os.remove(path)