mirror of
https://github.com/PaddlePaddle/FastDeploy.git
synced 2025-12-24 13:28:13 +08:00
1270 lines
47 KiB
Python
1270 lines
47 KiB
Python
"""
|
|
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
"""
|
|
|
|
import os
|
|
import threading
|
|
import time
|
|
import unittest
|
|
from unittest import mock
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
|
|
|
|
from fastdeploy.metrics import trace
|
|
from fastdeploy.metrics.trace import FilteringSpanProcessor, label_span
|
|
|
|
|
|
class TestFilteringSpanProcessor(unittest.TestCase):
    """Unit tests for FilteringSpanProcessor.

    Every test builds a processor around a ConsoleSpanExporter and patches the
    methods of the wrapped processor (``_processor``) to observe delegation.
    """

    def setUp(self):
        """Create a fresh processor and register its shutdown as cleanup."""
        self.exporter = ConsoleSpanExporter()
        self.processor = FilteringSpanProcessor(self.exporter)
        # The wrapped BatchSpanProcessor exports from a background worker
        # (see the OpenTelemetry SDK docs); shut it down after each test so
        # workers do not accumulate over the test session.
        self.addCleanup(self.processor.shutdown)

    def test_initialization(self):
        """The processor wraps a BatchSpanProcessor bound to the given exporter."""
        self.assertIsInstance(self.processor._processor, BatchSpanProcessor)
        self.assertEqual(self.processor._processor.span_exporter, self.exporter)

    def test_on_start_with_parent_span(self):
        """on_start copies the parent's ``stream`` attribute onto the child span."""
        mock_span = MagicMock()
        mock_parent_span = MagicMock()
        mock_parent_span.is_recording.return_value = True
        mock_parent_span.attributes.get.return_value = "test_stream"

        # The recording parent is what get_current_span() reports.
        with patch(
            "fastdeploy.metrics.trace.trace.get_current_span", return_value=mock_parent_span
        ), patch.object(self.processor._processor, "on_start") as mock_parent_on_start:
            self.processor.on_start(mock_span, parent_context=None)

            mock_span.set_attribute.assert_called_once_with("stream", "test_stream")
            mock_parent_on_start.assert_called_once_with(mock_span, None)

    def test_on_start_without_parent_span(self):
        """on_start sets no attribute when there is no current span."""
        mock_span = MagicMock()

        with patch("fastdeploy.metrics.trace.trace.get_current_span", return_value=None), patch.object(
            self.processor._processor, "on_start"
        ) as mock_parent_on_start:
            self.processor.on_start(mock_span, parent_context=None)

            mock_span.set_attribute.assert_not_called()
            mock_parent_on_start.assert_called_once_with(mock_span, None)

    def test_on_start_with_non_recording_parent_span(self):
        """on_start sets no attribute when the parent span is not recording."""
        mock_span = MagicMock()
        mock_parent_span = MagicMock()
        mock_parent_span.is_recording.return_value = False

        with patch(
            "fastdeploy.metrics.trace.trace.get_current_span", return_value=mock_parent_span
        ), patch.object(self.processor._processor, "on_start") as mock_parent_on_start:
            self.processor.on_start(mock_span, parent_context=None)

            mock_span.set_attribute.assert_not_called()
            mock_parent_on_start.assert_called_once_with(mock_span, None)

    def test_on_end_filter_stream_http_response(self):
        """on_end drops 'http send' spans of streaming response bodies."""
        mock_span = MagicMock()
        mock_span.attributes.get.side_effect = lambda key: {
            "asgi.event.type": "http.response.body",
            "stream": "true",
        }.get(key)
        mock_span.name = "http send request"

        with patch.object(self.processor._processor, "on_end") as mock_parent_on_end:
            self.processor.on_end(mock_span)

            # Filtered spans must never reach the wrapped processor.
            mock_parent_on_end.assert_not_called()

    def test_on_end_keep_spans_without_http_send(self):
        """on_end forwards spans whose name does not contain 'http send'."""
        mock_span = MagicMock()
        mock_span.attributes.get.side_effect = lambda key: {
            "asgi.event.type": "http.response.body",
            "stream": "true",
        }.get(key)
        mock_span.name = "other operation"

        with patch.object(self.processor._processor, "on_end") as mock_parent_on_end:
            self.processor.on_end(mock_span)

            mock_parent_on_end.assert_called_once_with(mock_span)

    def test_shutdown(self):
        """shutdown is delegated to the wrapped processor."""
        with patch.object(self.processor._processor, "shutdown") as mock_shutdown:
            self.processor.shutdown()
            mock_shutdown.assert_called_once()

    def test_force_flush(self):
        """force_flush forwards the timeout to the wrapped processor."""
        with patch.object(self.processor._processor, "force_flush") as mock_force_flush:
            self.processor.force_flush(timeout_millis=5000)
            mock_force_flush.assert_called_once_with(5000)
|
|
|
|
|
|
class TestLableSpan(unittest.TestCase):
    """Checks for label_span against mocked requests and current spans."""

    def test_lable_span_with_stream_request(self):
        """A streaming request tags the recording current span with stream=true."""
        request = MagicMock()
        request.stream = True
        current_span = MagicMock()
        current_span.is_recording.return_value = True

        span_patch = patch("fastdeploy.metrics.trace.trace.get_current_span", return_value=current_span)
        with span_patch:
            label_span(request)

            current_span.set_attribute.assert_called_once_with("stream", "true")

    def test_lable_span_without_stream_request(self):
        """A non-streaming request leaves the current span untouched."""
        request = MagicMock()
        request.stream = False
        current_span = MagicMock()
        current_span.is_recording.return_value = True

        span_patch = patch("fastdeploy.metrics.trace.trace.get_current_span", return_value=current_span)
        with span_patch:
            label_span(request)

            current_span.set_attribute.assert_not_called()

    def test_lable_span_without_current_span(self):
        """label_span must not raise when get_current_span() yields None."""
        request = MagicMock()
        request.stream = True

        span_patch = patch("fastdeploy.metrics.trace.trace.get_current_span", return_value=None)
        with span_patch:
            # Passing means no exception escaped.
            label_span(request)

    def test_lable_span_with_non_recording_span(self):
        """A non-recording current span receives no attributes."""
        request = MagicMock()
        request.stream = True
        current_span = MagicMock()
        current_span.is_recording.return_value = False

        span_patch = patch("fastdeploy.metrics.trace.trace.get_current_span", return_value=current_span)
        with span_patch:
            label_span(request)

            current_span.set_attribute.assert_not_called()
|
|
|
|
|
|
class TestTraceComprehensive:
    """Comprehensive tests for the tracing helpers in fastdeploy.metrics.trace."""

    def setup_method(self):
        """Snapshot the environment, set tracing env vars and reset trace globals."""
        self.original_env = os.environ.copy()
        os.environ.update(
            {
                "TRACES_ENABLE": "true",
                "FD_SERVICE_NAME": "test_service",
                "FD_HOST_NAME": "test_host",
                "EXPORTER_OTLP_ENDPOINT": "http://localhost:4317",
                "EXPORTER_OTLP_HEADERS": "key1=value1,key2=value2",
                "FD_OTLP_EXPORTER_SCHEDULE_DELAY_MILLIS": "1000",
                "FD_OTLP_EXPORTER_MAX_EXPORT_BATCH_SIZE": "512",
            }
        )

        # Reset module-level state so tests do not see each other's requests.
        trace.remote_trace_contexts = {}
        trace.threads_info = {}
        trace.reqs_context = {}
        trace.tracing_enabled = False

    def teardown_method(self):
        """Restore the environment captured in setup_method.

        The mapping is restored in place: rebinding ``os.environ`` to the
        plain-dict snapshot would leave the process with an object that no
        longer updates the real process environment.
        """
        os.environ.clear()
        os.environ.update(self.original_env)

    def test_process_tracing_init_with_different_scenarios(self):
        """Test tracing initialization under different scenarios"""
        # Normal initialization
        trace.process_tracing_init()
        assert trace.tracing_enabled is True

        # Tracing disabled through the environment
        os.environ["TRACES_ENABLE"] = "false"
        trace.process_tracing_init()
        assert trace.tracing_enabled is False

        # Re-enable, with an empty endpoint for the protocol checks below
        os.environ["TRACES_ENABLE"] = "true"
        os.environ["EXPORTER_OTLP_ENDPOINT"] = ""

        # Supported protocols
        for protocol in ["grpc", "http/protobuf"]:
            os.environ["OTEL_EXPORTER_OTLP_TRACES_PROTOCOL"] = protocol
            trace.process_tracing_init()
            assert trace.tracing_enabled is True

        # Unsupported protocol
        os.environ["OTEL_EXPORTER_OTLP_TRACES_PROTOCOL"] = "unsupported"
        with pytest.raises(ValueError):
            trace.get_otlp_span_exporter("http://localhost:4317", None)

    def test_thread_info_with_different_ranks(self):
        """Test thread info with TP and DP ranks"""
        trace.process_tracing_init()
        trace.trace_set_thread_info("test_thread_tp", tp_rank=0, dp_rank=1)

        pid = threading.get_native_id()
        info = trace.threads_info[pid]
        assert info.tp_rank == 0
        assert info.dp_rank == 1

        # A second call without ranks, from the same thread
        trace.trace_set_thread_info("test_thread_no_ranks")
        info = trace.threads_info[pid]
        assert info.tp_rank == 0  # previous value is expected to be kept

    def test_advanced_request_scenarios(self):
        """Test advanced request tracing scenarios"""
        # Request with an explicit start timestamp
        rid = "test_request_timestamp"
        ts = int(time.time() * 1e9) - 1000  # 1 microsecond ago

        trace.process_tracing_init()
        trace.trace_set_thread_info("test_thread")

        trace.trace_req_start(rid, "", ts=ts)
        assert rid in trace.reqs_context
        assert trace.reqs_context[rid].start_time_ns == ts

        trace.trace_req_finish(rid, ts=ts + 2000)

        # Request finished with attributes
        rid2 = "test_request_attrs"
        trace.trace_req_start(rid2, "")
        attrs = {"attr1": "value1", "attr2": 123}
        trace.trace_req_finish(rid2, attrs=attrs)

    def test_complex_slice_scenarios(self):
        """Test complex slice operations"""
        rid = "test_complex_slices"

        trace.process_tracing_init()
        trace.trace_set_thread_info("test_thread")
        trace.trace_req_start(rid, "")

        # Nested slices
        trace.trace_slice_start("outer", rid)
        trace.trace_slice_start("inner", rid)
        trace.trace_slice_end("inner", rid)
        trace.trace_slice_end("outer", rid)

        # Anonymous slice, named only when it ends
        trace.trace_slice_start("", rid, anonymous=True)
        trace.trace_slice_end("anonymous_test", rid)

        trace.trace_req_finish(rid)

    def test_trace_report_span_function(self):
        """Test trace_report_span convenience function"""
        rid = "test_report_span"

        trace.process_tracing_init()
        trace.trace_set_thread_info("test_thread")
        trace.trace_req_start(rid, "")

        start_time = int(time.time() * 1e9)
        end_time = start_time + 1000000  # 1ms later
        attrs = {"test_attr": "test_value"}

        trace.trace_report_span("report_test", rid, start_time, end_time, attrs)

        trace.trace_req_finish(rid)

    def test_propagation_advanced_scenarios(self):
        """Test advanced context propagation scenarios"""
        rid = "test_advanced_propagation"

        trace.process_tracing_init()
        trace.trace_set_thread_info("test_thread")
        trace.trace_req_start(rid, "")

        # Complete one slice so a previous-span context exists
        trace.trace_slice_start("slice1", rid)
        trace.trace_slice_end("slice1", rid)

        context_dict = trace.trace_get_proc_propagate_context(rid)
        assert context_dict is not None
        assert "prev_span" in context_dict

        # Propagate into a new request with an explicit timestamp
        new_rid = "test_propagated"
        ts = int(time.time() * 1e9)
        trace.trace_set_proc_propagate_context(new_rid, context_dict, ts=ts)

        assert new_rid in trace.reqs_context
        assert trace.reqs_context[new_rid].is_copy is True
        assert trace.reqs_context[new_rid].start_time_ns == ts

        # Empty or invalid contexts must not raise
        trace.trace_set_proc_propagate_context("invalid_rid", None)
        trace.trace_set_proc_propagate_context("invalid_rid", {})
        trace.trace_set_proc_propagate_context("invalid_rid", {"invalid": "data"})

        trace.trace_req_finish(rid)
        trace.trace_req_finish(new_rid)

    def test_multiple_threads_same_request(self):
        """Test tracing with multiple threads on same request"""
        rid = "test_multi_thread"

        trace.process_tracing_init()

        # Main thread opens the request
        trace.trace_set_thread_info("main_thread")
        trace.trace_req_start(rid, "")

        def worker_thread():
            trace.trace_set_thread_info("worker_thread")
            trace.trace_slice_start("worker_task", rid)
            time.sleep(0.001)  # Simulate work
            trace.trace_slice_end("worker_task", rid)

        thread = threading.Thread(target=worker_thread)
        thread.start()
        thread.join()

        # Main thread continues on the same request
        trace.trace_slice_start("main_task", rid)
        trace.trace_slice_end("main_task", rid)

        trace.trace_req_finish(rid)

    def test_trace_span_enum(self):
        """Test TraceSpanName enum values"""
        assert trace.TraceSpanName.FASTDEPLOY == "FASTDEPLOY"
        assert trace.TraceSpanName.PREPROCESSING == "PREPROCESSING"
        assert trace.TraceSpanName.SCHEDULE == "SCHEDULE"
        assert trace.TraceSpanName.PREFILL == "PREFILL"
        assert trace.TraceSpanName.DECODE == "DECODE"
        assert trace.TraceSpanName.POSTPROCESSING == "POSTPROCESSING"

        # Every expected member exists
        expected_spans = ["FASTDEPLOY", "PREPROCESSING", "SCHEDULE", "PREFILL", "DECODE", "POSTPROCESSING"]
        for span_name in expected_spans:
            assert hasattr(trace.TraceSpanName, span_name)

    def test_host_id_generation(self):
        """Test host ID generation logic"""
        # Host id taken from the environment variable
        os.environ["FD_HOST_NAME"] = "env-host-id"
        trace.process_tracing_init()
        trace.trace_set_thread_info("test_thread")
        pid = threading.get_native_id()
        assert pid in trace.threads_info
        assert trace.threads_info[pid].host_id == "env-host-id"

        # Fallback when the environment variable is absent
        os.environ.pop("FD_HOST_NAME", None)
        trace.threads_info.clear()  # Reset to trigger re-calculation
        trace.trace_set_thread_info("test_thread2")
        pid2 = threading.get_native_id()
        assert pid2 in trace.threads_info
        # Some non-empty host id is expected
        assert trace.threads_info[pid2].host_id is not None
        assert len(trace.threads_info[pid2].host_id) > 0

    def test_edge_case_operations(self):
        """Test edge case operations"""
        trace.process_tracing_init()
        trace.trace_set_thread_info("test_thread")

        rid = "test_edge_cases"
        trace.trace_req_start(rid, "")

        # End a slice that was never started
        trace.trace_slice_end("non_existent", rid)

        # Add an event while no slice is open
        trace.trace_event("test_event", rid)

        trace.trace_req_finish(rid)

        # Operations on an already finished request must not raise
        trace.trace_slice_start("test", rid)
        trace.trace_slice_end("test", rid)
        trace.trace_event("test", rid)

    def test_timing_functions(self):
        """Test timing-related functions"""
        if hasattr(time, "time_ns"):
            trace.process_tracing_init()
            # Two timestamps taken 1ms apart must be ordered and at least 1ms apart
            ts1 = int(time.time() * 1e9)
            time.sleep(0.001)  # 1ms
            ts2 = int(time.time() * 1e9)
            assert ts2 > ts1
            assert ts2 - ts1 >= 1000000  # At least 1ms in nanoseconds

    def test_request_start_with_trace_content(self):
        """Test request start with trace content (upstream context)"""
        rid = "test_upstream_context"

        trace.process_tracing_init()
        trace.trace_set_thread_info("test_thread")

        # Empty upstream context (valid case)
        trace_content = ""
        trace.trace_req_start(rid, trace_content, role="test_role")

        assert rid in trace.reqs_context

        trace.trace_req_finish(rid)

    def test_span_linking_logic(self):
        """Test span linking functionality"""
        rid = "test_span_linking"

        trace.process_tracing_init()
        trace.trace_set_thread_info("test_thread")
        trace.trace_req_start(rid, "")

        # First slice
        trace.trace_slice_start("first_slice", rid)
        trace.trace_slice_end("first_slice", rid)

        # Second slice (should be linked to first)
        trace.trace_slice_start("second_slice", rid)
        trace.trace_slice_end("second_slice", rid)

        trace.trace_req_finish(rid)

    @mock.patch("fastdeploy.metrics.trace.trace")
    def test_active_span_handling(self, mock_trace):
        """Test handling of active spans from FastAPI Instrumentor"""
        rid = "test_active_span"

        # Pretend an instrumented request span is already active
        mock_span = mock.MagicMock()
        mock_span.is_recording.return_value = True
        mock_span.name = "fastapi_request"
        mock_span.get_span_context.return_value = mock.MagicMock(is_valid=True, trace_id=1234567890)
        mock_trace.get_current_span.return_value = mock_span
        mock_trace.set_span_in_context.return_value = "mock_context"

        trace.process_tracing_init()
        trace.trace_set_thread_info("test_thread")

        trace.trace_req_start(rid, "")

        # The active span must have been adopted and labelled
        assert rid in trace.reqs_context
        assert trace.reqs_context[rid].is_copy is True
        mock_span.set_attribute.assert_called_with("rid", rid)
        mock_span.update_name.assert_called_with("fastapi_request (Req: test_active_span)")

        trace.trace_req_finish(rid)

    def test_lable_span_functionality(self):
        """Test label_span function with different scenarios"""

        class MockRequest:
            def __init__(self, stream):
                self.stream = stream

        mock_span = mock.MagicMock()
        mock_span.is_recording.return_value = True

        with mock.patch("fastdeploy.metrics.trace.trace.get_current_span", return_value=mock_span):
            # stream=True labels the span
            trace.label_span(MockRequest(True))
            mock_span.set_attribute.assert_called_with("stream", "true")

            # stream=False must not label it; reset first so the call above
            # does not mask a regression
            mock_span.set_attribute.reset_mock()
            trace.label_span(MockRequest(False))
            mock_span.set_attribute.assert_not_called()

        # Non-recording span: is_recording has to stay callable, so configure
        # its return value instead of replacing the method with a bool.
        idle_span = mock.MagicMock()
        idle_span.is_recording.return_value = False
        with mock.patch("fastdeploy.metrics.trace.trace.get_current_span", return_value=idle_span):
            trace.label_span(MockRequest(False))
            idle_span.set_attribute.assert_not_called()

    def test_error_handling_and_logging(self):
        """Test error handling and logging scenarios"""
        trace.process_tracing_init()
        trace.trace_set_thread_info("test_thread")

        with mock.patch("fastdeploy.metrics.trace.logger"):
            # Operations on an unknown request must not raise; whether a
            # warning is logged is left unchecked here.
            rid = "non_existent"
            trace.trace_slice_start("test", rid)
            trace.trace_slice_end("test", rid)
            trace.trace_event("test", rid)
            trace.trace_slice_add_attr(rid, {"test": "value"})

        # Ending a slice under a different name must log a warning
        rid = "test_mismatch_warning"
        trace.trace_req_start(rid, "")

        with mock.patch("fastdeploy.metrics.trace.logger") as mock_logger:
            trace.trace_slice_start("start_name", rid)
            trace.trace_slice_end("different_name", rid)
            assert mock_logger.warning.called

        trace.trace_req_finish(rid)
|
|
|
|
|
|
class TestPerformanceAndConcurrency:
    """Concurrency and cleanup checks for the tracing helpers."""

    def test_concurrent_requests(self):
        """Ten requests traced from ten threads all run to completion."""
        trace.process_tracing_init()

        def process_request(request_id, results_list):
            """Trace one request end to end and record its completion marker."""
            trace.trace_set_thread_info(f"thread_{request_id}")
            trace.trace_req_start(request_id, "")
            trace.trace_slice_start("process", request_id)
            time.sleep(0.001)  # Simulate work
            trace.trace_slice_end("process", request_id)
            trace.trace_req_finish(request_id)
            outcome = f"request_{request_id}_completed"
            results_list.append(outcome)
            return outcome

        completed = []
        workers = []

        # Each worker is started as soon as it is created.
        for index in range(10):
            worker = threading.Thread(target=process_request, args=(f"req_{index}", completed))
            workers.append(worker)
            worker.start()

        for worker in workers:
            worker.join()

        finished_count = sum(1 for entry in completed if entry.endswith("_completed"))
        assert finished_count == 10

    def test_memory_cleanup(self):
        """Finished requests leave reqs_context empty; thread info remains."""
        trace.process_tracing_init()
        trace.trace_set_thread_info("test_thread")

        for index in range(5):
            request_id = f"test_request_{index}"
            trace.trace_req_start(request_id, "")
            trace.trace_slice_start("test", request_id)
            trace.trace_slice_end("test", request_id)
            trace.trace_req_finish(request_id)

        # No request context may survive its finish call.
        assert len(trace.reqs_context) == 0

        # The per-thread entry is expected to still be registered.
        native_id = threading.get_native_id()
        assert native_id in trace.threads_info
|
|
|
|
|
|
class TestAdditionalCoverage:
|
|
"""Additional test cases for better code coverage"""
|
|
|
|
def setup_method(self):
    """Snapshot the environment, set tracing env vars and reset trace globals."""
    self.original_env = os.environ.copy()
    for env_key, env_value in (
        ("TRACES_ENABLE", "true"),
        ("FD_SERVICE_NAME", "test_service"),
        ("EXPORTER_OTLP_ENDPOINT", "http://localhost:4317"),
    ):
        os.environ[env_key] = env_value

    # Every test starts from empty module-level tracing state.
    trace.remote_trace_contexts, trace.threads_info, trace.reqs_context = {}, {}, {}
    trace.tracing_enabled = False
|
|
|
|
def teardown_method(self):
    """Restore the environment captured in ``self.original_env``.

    The mapping is restored in place: rebinding ``os.environ`` to the
    plain-dict snapshot would leave the process with an object that no
    longer updates the real process environment.
    """
    os.environ.clear()
    os.environ.update(self.original_env)
|
|
|
|
def test_trace_propagate_context_to_dict(self):
    """to_dict exposes root_span plus prev_span ids, or "None" without a previous span."""
    from fastdeploy.metrics.trace import TracePropagateContext

    root_ctx = MagicMock()
    prev_ctx = MagicMock(span_id=12345, trace_id=67890)

    # With a previous span context
    with_prev = TracePropagateContext(root_ctx, prev_ctx).to_dict()
    assert "root_span" in with_prev
    assert "prev_span" in with_prev
    prev_entry = with_prev["prev_span"]
    assert prev_entry["span_id"] == 12345
    assert prev_entry["trace_id"] == 67890

    # Without a previous span context
    without_prev = TracePropagateContext(root_ctx, None).to_dict()
    assert "root_span" in without_prev
    assert without_prev["prev_span"] == "None"
|
|
|
|
def test_trace_propagate_context_instance_from_dict(self):
    """instance_from_dict rebuilds a context from valid dicts and returns None otherwise."""
    from fastdeploy.metrics.trace import TracePropagateContext

    # Dict carrying a previous span
    carrier_with_prev = {"root_span": {"test": "carrier"}, "prev_span": {"span_id": 12345, "trace_id": 67890}}

    with mock.patch(
        "fastdeploy.metrics.trace.propagate.extract", return_value="mock_context"
    ), mock.patch("fastdeploy.metrics.trace.trace.span.SpanContext") as span_context_cls:
        rebuilt_span_context = MagicMock()
        span_context_cls.return_value = rebuilt_span_context

        restored = TracePropagateContext.instance_from_dict(carrier_with_prev)

        assert restored is not None
        assert restored.root_span_context == "mock_context"
        assert restored.prev_span_context == rebuilt_span_context
        span_context_cls.assert_called_once_with(trace_id=67890, span_id=12345, is_remote=True)

    # Dict whose previous span is the "None" marker
    carrier_without_prev = {"root_span": {"test": "carrier"}, "prev_span": "None"}

    with mock.patch("fastdeploy.metrics.trace.propagate.extract", return_value="mock_context"):
        restored = TracePropagateContext.instance_from_dict(carrier_without_prev)

        assert restored is not None
        assert restored.root_span_context == "mock_context"
        assert restored.prev_span_context is None

    # Dict missing the expected keys
    restored = TracePropagateContext.instance_from_dict({"invalid": "data"})
    assert restored is None

    # Empty dict
    restored = TracePropagateContext.instance_from_dict({})
    assert restored is None
|
|
|
|
def test_trace_custom_id_generator(self):
    """Generated trace/span ids are positive ints and differ between calls."""
    from fastdeploy.metrics.trace import TraceCustomIdGenerator

    id_source = TraceCustomIdGenerator()

    first_trace_id = id_source.generate_trace_id()
    assert isinstance(first_trace_id, int)
    assert first_trace_id > 0

    first_span_id = id_source.generate_span_id()
    assert isinstance(first_span_id, int)
    assert first_span_id > 0

    # A second round should not repeat the first (collisions are very unlikely).
    second_trace_id = id_source.generate_trace_id()
    second_span_id = id_source.generate_span_id()
    assert first_trace_id != second_trace_id
    assert first_span_id != second_span_id
|
|
|
|
def test_get_host_id_fallback_methods(self):
    """Test __get_host_id with FD_HOST_NAME set and with its fallbacks forced."""
    import fastdeploy.metrics.trace as trace_module

    # Looked up by string: inside a class body, writing
    # ``trace_module.__get_host_id`` would be name-mangled by Python.
    get_host_id_func = trace_module.__dict__.get("__get_host_id")

    if get_host_id_func is None:
        # pytest.skip raises, so no explicit return is needed after it.
        pytest.skip("__get_host_id function not accessible for testing")

    # FD_HOST_NAME takes effect when set
    os.environ["FD_HOST_NAME"] = "test-host-name"
    host_id = get_host_id_func()
    assert host_id == "test-host-name"

    # Fallback when machine-id files don't exist and MAC is 0
    os.environ.pop("FD_HOST_NAME", None)

    mock_uuid = MagicMock()
    mock_uuid.hex = "test-uuid-hex"

    with mock.patch("builtins.open", side_effect=FileNotFoundError), mock.patch("uuid.getnode", return_value=0):
        with mock.patch("uuid.uuid4", return_value=mock_uuid), mock.patch("os.getpid", return_value=12345):
            host_id = get_host_id_func()
            # The exact value depends on the environment; only require a
            # non-empty string.
            assert isinstance(host_id, str)
            assert len(host_id) > 0
|
|
|
|
def test_get_host_id_exception_handling(self):
    """Test that __get_host_id still returns a string when uuid4 raises."""
    import fastdeploy.metrics.trace as trace_module

    # Looked up by string: inside a class body, writing
    # ``trace_module.__get_host_id`` would be name-mangled by Python.
    get_host_id_func = trace_module.__dict__.get("__get_host_id")

    if get_host_id_func is None:
        # pytest.skip raises, so no explicit return is needed after it.
        pytest.skip("__get_host_id function not accessible for testing")

    os.environ.pop("FD_HOST_NAME", None)

    with mock.patch("builtins.open", side_effect=FileNotFoundError), mock.patch("uuid.getnode", return_value=0):
        with mock.patch("uuid.uuid4", side_effect=Exception("UUID generation failed")):
            host_id = get_host_id_func()
            # Some fallback value must come back. On complete failure it is
            # expected to be "unknown", but other fallbacks may apply
            # depending on the environment, so only the shape is checked.
            assert isinstance(host_id, str)
            assert len(host_id) > 0
|
|
|
|
def test_trace_slice_auto_next_anon(self):
    """Ending a slice with auto_next_anon=True leaves one anonymous slice open."""
    trace.process_tracing_init()
    trace.trace_set_thread_info("test_thread")

    request_id = "test_auto_anon"
    trace.trace_req_start(request_id, "")

    trace.trace_slice_start("first_slice", request_id)
    trace.trace_slice_end("first_slice", request_id, auto_next_anon=True)

    # Inspect this thread's slice stack for the request.
    native_id = threading.get_native_id()
    slice_stack = trace.reqs_context[request_id].threads_context[native_id].cur_slice_stack
    assert len(slice_stack) == 1
    open_slice = slice_stack[0]
    assert open_slice.anonymous is True
    assert open_slice.slice_name == ""

    trace.trace_req_finish(request_id)
|
|
|
|
def test_trace_slice_thread_finish_flag(self):
    """Ending a slice with thread_finish_flag=True drops this thread's context."""
    trace.process_tracing_init()
    trace.trace_set_thread_info("test_thread")

    request_id = "test_thread_finish"
    trace.trace_req_start(request_id, "")

    native_id = threading.get_native_id()

    trace.trace_slice_start("test_slice", request_id)
    trace.trace_slice_end("test_slice", request_id, thread_finish_flag=True)

    # The request stays registered, but without an entry for this thread.
    remaining_threads = trace.reqs_context[request_id].threads_context
    assert native_id not in remaining_threads

    trace.trace_req_finish(request_id)
|
|
|
|
def test_trace_slice_alias(self):
    """trace_slice compares equal to trace_slice_end, i.e. it is an alias."""
    alias, target = trace.trace_slice, trace.trace_slice_end
    assert alias == target
|
|
|
|
def test_trace_event_and_add_attr_functionality(self):
    """trace_event and trace_slice_add_attr run without error inside an open slice."""
    trace.process_tracing_init()
    trace.trace_set_thread_info("test_thread")

    request_id = "test_events_attrs"
    trace.trace_req_start(request_id, "")
    trace.trace_slice_start("test_slice", request_id)

    # Event carrying attributes
    trace.trace_event("test_event", request_id, attrs={"event_attr": "event_value"})

    # Attributes attached to the open slice
    trace.trace_slice_add_attr(request_id, {"slice_attr": "slice_value"})

    trace.trace_slice_end("test_slice", request_id)
    trace.trace_req_finish(request_id)
|
|
|
|
def test_trace_span_decorator_sync(self):
    """A sync function wrapped by trace_span still returns its own result."""
    trace.process_tracing_init()
    trace.trace_set_thread_info("test_thread")

    def test_function():
        return "test_result"

    # Explicit application; same effect as the @decorator form.
    test_function = trace.trace_span("test_sync_function")(test_function)

    outcome = test_function()
    assert outcome == "test_result"
|
|
|
|
def test_trace_span_decorator_async(self):
    """An async function wrapped by trace_span still yields its own result."""
    import asyncio

    trace.process_tracing_init()
    trace.trace_set_thread_info("test_thread")

    async def test_async_function():
        return "test_async_result"

    # Explicit application; same effect as the @decorator form.
    test_async_function = trace.trace_span("test_async_function")(test_async_function)

    async def run_test():
        # Await inside a coroutine so asyncio.run always receives one.
        awaited_value = await test_async_function()
        return awaited_value

    outcome = asyncio.run(run_test())
    assert outcome == "test_async_result"
|
|
|
|
def test_trace_span_decorator_disabled(self):
    """With tracing_enabled False, the wrapped function still returns its result."""
    trace.tracing_enabled = False

    def test_function():
        return "test_result_disabled"

    # Explicit application; same effect as the @decorator form.
    test_function = trace.trace_span("test_disabled_function")(test_function)

    outcome = test_function()
    assert outcome == "test_result_disabled"
|
|
|
|
def test_trace_span_decorator_no_thread_info(self):
    """Calling a traced function without thread info registers the thread."""
    trace.process_tracing_init()
    trace.threads_info.clear()  # start with no registered threads

    def test_function():
        return "test_result_no_thread"

    # Explicit application; same effect as the @decorator form.
    test_function = trace.trace_span("test_no_thread_info")(test_function)

    outcome = test_function()
    assert outcome == "test_result_no_thread"

    # The call itself is expected to have created the thread entry.
    native_id = threading.get_native_id()
    assert native_id in trace.threads_info
|
|
|
|
def test_get_otlp_span_exporter_grpc(self):
    """Verify get_otlp_span_exporter yields the gRPC exporter for this endpoint when no headers are passed."""
    from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
        OTLPSpanExporter as GRPCSpanExporter,
    )

    grpc_exporter = trace.get_otlp_span_exporter("http://localhost:4317", None)

    assert isinstance(grpc_exporter, GRPCSpanExporter)
|
|
|
|
def test_get_otlp_span_exporter_http(self):
    """Verify get_otlp_span_exporter returns the HTTP exporter for the ``http/protobuf`` protocol.

    The protocol is selected through the ``OTEL_EXPORTER_OTLP_TRACES_PROTOCOL``
    environment variable. It is set with ``patch.dict`` so the previous
    environment is restored when the block exits instead of leaking into
    tests that run later in the same process.
    """
    from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
        OTLPSpanExporter as HTTPSpanExporter,
    )

    headers = {"Authorization": "Bearer token"}

    with patch.dict(os.environ, {"OTEL_EXPORTER_OTLP_TRACES_PROTOCOL": "http/protobuf"}):
        exporter = trace.get_otlp_span_exporter("http://localhost:4318", headers)

    assert isinstance(exporter, HTTPSpanExporter)
|
|
|
|
def test_get_otlp_span_exporter_unsupported_protocol(self):
    """Verify get_otlp_span_exporter raises ValueError for an unsupported protocol value.

    The protocol is supplied through the ``OTEL_EXPORTER_OTLP_TRACES_PROTOCOL``
    environment variable. ``patch.dict`` restores the previous environment on
    exit, so the bogus value cannot affect other tests.
    """
    with patch.dict(os.environ, {"OTEL_EXPORTER_OTLP_TRACES_PROTOCOL": "unsupported"}):
        with pytest.raises(ValueError, match="Unsupported OTLP protocol"):
            trace.get_otlp_span_exporter("http://localhost:4317", None)
|
|
|
|
def test_process_tracing_init_without_opentelemetry(self):
    """Verify process_tracing_init leaves tracing disabled when opentelemetry is flagged as not imported."""
    # patch.object puts the original flag value back on exit, even if the assertion fails.
    with patch.object(trace, "opentelemetry_imported", False):
        trace.process_tracing_init()
        assert trace.tracing_enabled is False
|
|
|
|
def test_trace_set_thread_info_when_tracing_disabled(self):
    """Verify trace_set_thread_info neither raises nor registers the thread while tracing is disabled.

    ``trace.tracing_enabled`` is patched to ``False`` only for the duration of
    the test; the previous value is restored on exit so the disabled state
    cannot leak into other tests.
    """
    with patch.object(trace, "tracing_enabled", False):
        # Should not raise any exception.
        trace.trace_set_thread_info("test_thread")

        # Should not add the current thread to threads_info.
        assert threading.get_native_id() not in trace.threads_info
|
|
|
|
def test_trace_set_thread_info_existing_thread(self):
    """Verify a second trace_set_thread_info call on the same thread keeps the first registration."""
    trace.process_tracing_init()
    native_id = threading.get_native_id()

    # First registration for this thread.
    trace.trace_set_thread_info("test_thread")
    first_registration = trace.threads_info[native_id]

    # Registering again under another name must leave the entry as it was.
    trace.trace_set_thread_info("different_thread")

    assert trace.threads_info[native_id] == first_registration
|
|
|
|
def test_trace_req_start_without_thread_info(self):
    """Verify trace_req_start creates no request context when the thread has no registered info."""
    trace.process_tracing_init()
    trace.threads_info.clear()  # make sure the current thread is unregistered

    request_id = "test_no_thread_info_req"
    trace.trace_req_start(request_id, "")

    # Without thread info, no context must have been stored for the request.
    assert request_id not in trace.reqs_context
|
|
|
|
def test_trace_req_start_existing_request(self):
    """Verify that starting an already-started request does not replace its context.

    A membership check alone would also pass if the second call overwrote the
    entry, so the context object stored by the first call is captured and
    compared by identity after the second call.
    """
    trace.process_tracing_init()
    trace.trace_set_thread_info("test_thread")

    rid = "test_existing_req"
    trace.trace_req_start(rid, "")
    assert rid in trace.reqs_context
    original_context = trace.reqs_context[rid]

    # Try to start the same request again - should return early.
    trace.trace_req_start(rid, "")

    # The existing request context must be the very same object (not overwritten).
    assert rid in trace.reqs_context
    assert trace.reqs_context[rid] is original_context
|
|
|
|
def test_trace_req_finish_nonexistent_request(self):
    """Verify trace_req_finish tolerates a request id that was never started."""
    trace.process_tracing_init()

    unknown_rid = "nonexistent_rid"
    # The test passes as long as this call completes without raising.
    trace.trace_req_finish(unknown_rid)
|
|
|
|
def test_trace_slice_operations_without_request(self):
    """Verify slice, event and attribute calls tolerate a request id that was never started."""
    trace.process_tracing_init()
    trace.trace_set_thread_info("test_thread")

    missing_rid = "nonexistent_request"

    # None of these calls is expected to raise for the unknown id.
    trace.trace_slice_start("test", missing_rid)
    trace.trace_slice_end("test", missing_rid)
    trace.trace_event("test", missing_rid)
    trace.trace_slice_add_attr(missing_rid, {"test": "value"})
|
|
|
|
def test_trace_get_proc_propagate_context_without_request(self):
    """Verify trace_get_proc_propagate_context returns None for a request id that was never started."""
    trace.process_tracing_init()

    assert trace.trace_get_proc_propagate_context("nonexistent_rid") is None
|
|
|
|
def test_trace_set_proc_propagate_context_without_request(self):
    """Verify trace_set_proc_propagate_context tolerates a request id that was never started."""
    trace.process_tracing_init()
    trace.trace_set_thread_info("test_thread")

    # The test passes as long as this call completes without raising.
    trace.trace_set_proc_propagate_context("nonexistent_rid", {"test": "context"})
|
|
|
|
def test_trace_set_proc_propagate_context_existing_thread(self):
    """Verify trace_set_proc_propagate_context leaves an existing thread context untouched.

    A request context that already holds an entry for the current thread is
    installed by hand; after setting the propagate context the per-thread
    mapping must compare equal to a snapshot taken beforehand.
    """
    trace.process_tracing_init()
    trace.trace_set_thread_info("test_thread")

    rid = "test_existing_thread"
    context_dict = {"test": "context"}

    # Create the request context first. time.time_ns() yields an exact integer
    # nanosecond timestamp, avoiding the float rounding of time.time() * 1e9.
    trace.reqs_context[rid] = trace.TraceReqContext(
        rid=rid,
        start_time_ns=time.time_ns(),
        threads_context={threading.get_native_id(): MagicMock()},
        is_copy=True,
    )

    # Try to set the propagate context - should not create a new thread context.
    original_threads_context = trace.reqs_context[rid].threads_context.copy()
    trace.trace_set_proc_propagate_context(rid, context_dict)

    # threads_context should not have changed.
    assert trace.reqs_context[rid].threads_context == original_threads_context
|
|
|
|
def test_trace_report_span_without_request(self):
    """Verify trace_report_span tolerates a request id that was never started."""
    trace.process_tracing_init()

    span_start, span_end = 0, 1000000
    # The test passes as long as this call completes without raising.
    trace.trace_report_span("test", "nonexistent_rid", span_start, span_end)
|
|
|
|
def test_all_functions_when_tracing_disabled(self):
    """Verify that each trace call exercised below is safe to make while tracing is disabled.

    ``trace.tracing_enabled`` is patched to ``False`` only for the duration of
    the test; the previous value is restored on exit so the disabled state
    cannot leak into other tests.
    """
    rid = "test_disabled"

    with patch.object(trace, "tracing_enabled", False):
        # All of these should complete without raising.
        trace.trace_req_start(rid, "")
        trace.trace_req_finish(rid)
        trace.trace_slice_start("test", rid)
        trace.trace_slice_end("test", rid)
        trace.trace_event("test", rid)
        trace.trace_slice_add_attr(rid, {"test": "value"})
        trace.trace_get_proc_propagate_context(rid)
        trace.trace_set_proc_propagate_context(rid, {})
        trace.trace_report_span("test", rid, 0, 1000000)
|
|
|
|
def test_trace_req_start_with_role(self):
    """Verify trace_req_start stores a request context when an explicit role is passed."""
    trace.process_tracing_init()
    trace.trace_set_thread_info("test_thread")

    request_id = "test_with_role"
    trace.trace_req_start(request_id, "", role="test_role")

    # A context must have been registered for the request.
    assert request_id in trace.reqs_context
|
|
|
|
def test_trace_req_start_with_null_role(self):
    """Verify trace_req_start stores a request context when the role is the string "null"."""
    trace.process_tracing_init()
    trace.trace_set_thread_info("test_thread")

    request_id = "test_null_role"
    trace.trace_req_start(request_id, "", role="null")

    # A context must have been registered for the request.
    assert request_id in trace.reqs_context
|
|
|
|
def test_trace_span_decorator_with_custom_name(self):
    """Verify a function wrapped by trace_span with an explicit span name still returns its own value."""
    trace.process_tracing_init()
    trace.trace_set_thread_info("test_thread")

    def test_function():
        return "test_result"

    # Apply the decorator explicitly; this is equivalent to the ``@`` form.
    traced_function = trace.trace_span("custom_span_name")(test_function)

    assert traced_function() == "test_result"
|
|
|
|
def test_trace_span_decorator_without_name(self):
    """Verify a function wrapped by trace_span() with no arguments still returns its own value."""
    trace.process_tracing_init()
    trace.trace_set_thread_info("test_thread")

    def test_function():
        return "test_result"

    # Apply the decorator explicitly; this is equivalent to the ``@`` form.
    traced_function = trace.trace_span()(test_function)

    assert traced_function() == "test_result"
|
|
|
|
def test_trace_span_decorator_with_none_name(self):
    """Verify a function wrapped by trace_span(None) still returns its own value."""
    trace.process_tracing_init()
    trace.trace_set_thread_info("test_thread")

    def test_function():
        return "test_result"

    # Apply the decorator explicitly; this is equivalent to the ``@`` form.
    traced_function = trace.trace_span(None)(test_function)

    assert traced_function() == "test_result"
|
|
|
|
def test_trace_slice_start_with_timestamp(self):
    """Verify trace_slice_start accepts an explicit start timestamp (``ts``) in nanoseconds.

    The test passes when the whole start/end/finish sequence runs without
    raising.
    """
    trace.process_tracing_init()
    trace.trace_set_thread_info("test_thread")

    rid = "test_timestamp"
    # time.time_ns() gives an exact integer nanosecond clock reading, avoiding
    # the float rounding of int(time.time() * 1e9).
    ts = time.time_ns() - 1_000_000  # 1ms ago

    trace.trace_req_start(rid, "")
    trace.trace_slice_start("test_slice", rid, ts=ts)
    trace.trace_slice_end("test_slice", rid)
    trace.trace_req_finish(rid)
|
|
|
|
def test_trace_slice_end_with_timestamp(self):
    """Verify trace_slice_end accepts an explicit end timestamp (``ts``) in nanoseconds.

    The test passes when the whole start/end/finish sequence runs without
    raising.
    """
    trace.process_tracing_init()
    trace.trace_set_thread_info("test_thread")

    rid = "test_end_timestamp"
    # time.time_ns() gives an exact integer nanosecond clock reading, avoiding
    # the float rounding of int(time.time() * 1e9).
    ts = time.time_ns() + 1_000_000  # 1ms in the future

    trace.trace_req_start(rid, "")
    trace.trace_slice_start("test_slice", rid)
    trace.trace_slice_end("test_slice", rid, ts=ts)
    trace.trace_req_finish(rid)
|
|
|
|
def test_trace_slice_end_with_attributes(self):
    """Verify trace_slice_end accepts an ``attrs`` mapping holding string and integer values."""
    trace.process_tracing_init()
    trace.trace_set_thread_info("test_thread")

    request_id = "test_attrs"
    slice_name = "test_slice"

    trace.trace_req_start(request_id, "")
    trace.trace_slice_start(slice_name, request_id)
    # The test passes when ending the slice with attributes does not raise.
    trace.trace_slice_end(
        slice_name,
        request_id,
        attrs={"test_attr": "test_value", "number_attr": 123},
    )
    trace.trace_req_finish(request_id)
|
|
|
|
def test_trace_event_with_timestamp(self):
    """Verify trace_event accepts an explicit timestamp (``ts``) in nanoseconds inside an open slice.

    The test passes when the whole sequence runs without raising.
    """
    trace.process_tracing_init()
    trace.trace_set_thread_info("test_thread")

    rid = "test_event_timestamp"
    # time.time_ns() gives an exact integer nanosecond clock reading, avoiding
    # the float rounding of int(time.time() * 1e9).
    ts = time.time_ns() - 500_000  # 0.5ms ago

    trace.trace_req_start(rid, "")
    trace.trace_slice_start("test_slice", rid)
    trace.trace_event("test_event", rid, ts=ts)
    trace.trace_slice_end("test_slice", rid)
    trace.trace_req_finish(rid)
|
|
|
|
def test_trace_event_without_attributes(self):
    """Verify trace_event can be called inside an open slice without passing any attributes."""
    trace.process_tracing_init()
    trace.trace_set_thread_info("test_thread")

    request_id = "test_event_no_attrs"
    slice_name = "test_slice"

    trace.trace_req_start(request_id, "")
    trace.trace_slice_start(slice_name, request_id)
    # The test passes when emitting the bare event does not raise.
    trace.trace_event("test_event", request_id)
    trace.trace_slice_end(slice_name, request_id)
    trace.trace_req_finish(request_id)
|
|
|
|
def test_trace_report_span_with_thread_finish(self):
    """Verify trace_report_span accepts ``thread_finish_flag=True`` for a started request.

    The test passes when the report and the subsequent trace_req_finish run
    without raising.
    """
    trace.process_tracing_init()
    trace.trace_set_thread_info("test_thread")

    rid = "test_report_thread_finish"
    # time.time_ns() gives an exact integer nanosecond clock reading, avoiding
    # the float rounding of int(time.time() * 1e9).
    start_time = time.time_ns()
    end_time = start_time + 1_000_000  # 1ms later

    trace.trace_req_start(rid, "")
    trace.trace_report_span("test_span", rid, start_time, end_time, thread_finish_flag=True)
    trace.trace_req_finish(rid)
|
|
|
|
def test_multiple_nested_slices(self):
    """Verify three slices can be opened in order and closed innermost-first without raising."""
    trace.process_tracing_init()
    trace.trace_set_thread_info("test_thread")

    request_id = "test_nested"
    nesting_order = ("level1", "level2", "level3")

    trace.trace_req_start(request_id, "")

    # Open from the outermost slice inwards ...
    for slice_name in nesting_order:
        trace.trace_slice_start(slice_name, request_id)

    # ... and close in the opposite order, innermost first.
    for slice_name in reversed(nesting_order):
        trace.trace_slice_end(slice_name, request_id)

    trace.trace_req_finish(request_id)
|
|
|
|
def test_concurrent_slice_operations(self):
    """Verify slice operations on one request id from the main thread and a worker thread.

    The main thread opens a slice, a worker thread runs a complete
    start/slice/finish sequence for the same request id, and the main thread
    then closes its slice and finishes the request.

    An exception raised inside a ``threading.Thread`` target does not
    propagate to the thread that joins it, so the worker records any
    exception and the test asserts that none occurred.
    """
    trace.process_tracing_init()

    rid = "test_concurrent_slices"
    worker_errors = []

    def worker_slices():
        try:
            trace.trace_set_thread_info("worker_thread")
            trace.trace_req_start(rid, "")
            trace.trace_slice_start("worker_slice", rid)
            time.sleep(0.001)
            trace.trace_slice_end("worker_slice", rid)
            trace.trace_req_finish(rid)
        except Exception as exc:  # recorded here, asserted in the main thread
            worker_errors.append(exc)

    # Main thread
    trace.trace_set_thread_info("main_thread")
    trace.trace_req_start(rid, "")
    trace.trace_slice_start("main_slice", rid)

    # Run the worker to completion while the main slice is still open.
    thread = threading.Thread(target=worker_slices)
    thread.start()
    thread.join()

    trace.trace_slice_end("main_slice", rid)
    trace.trace_req_finish(rid)

    assert not worker_errors, f"worker thread raised: {worker_errors!r}"
|
|
|
|
|
|
if __name__ == "__main__":
    # Propagate pytest's exit status so that running this file as a script
    # reports test failures through the process exit code.
    raise SystemExit(pytest.main([__file__, "-v"]))
|