Skip to content
Snippets Groups Projects
Commit 6fc1faf3 authored by Christoph Alt's avatar Christoph Alt
Browse files

refactored upload test

parent b649c02d
Branches
Tags
No related merge requests found
Pipeline #45301 passed with stages
in 1 minute and 43 seconds
import os import os
import time import time
from unittest.mock import MagicMock
import pytest
from cbutil.data_points import DataPoint from cbutil.data_points import DataPoint
from cbutil.upload import DBConfig, Uploader, load_config_from_env from cbutil.upload import DBConfig, Uploader, load_config_from_env
DUMMY_CONF = DBConfig("host", 1234, "user_name", "database", "write_user_pw")
@pytest.fixture
def dummy_conf():
return DBConfig("host", 1234, "user_name", "database", "write_user_pw")
@pytest.fixture
def mock_upload(dummy_conf):
up = Uploader(dummy_conf)
up.client = MagicMock(return_value=True)
return up
@pytest.fixture
def dummy_dp():
return DataPoint(measurement="Test", time=int(time.time()), fields=dict(), tags=dict())
def setup_env():
def setup_env():
os.environ["INFLUXDB_HOST"] = "INFLUXDB_HOST" os.environ["INFLUXDB_HOST"] = "INFLUXDB_HOST"
os.environ["INFLUXDB_PORT"] = "1234" os.environ["INFLUXDB_PORT"] = "1234"
os.environ["INFLUXDB_USER_NAME"] = "INFLUXDB_USER_NAME" os.environ["INFLUXDB_USER_NAME"] = "INFLUXDB_USER_NAME"
...@@ -16,8 +33,8 @@ def setup_env(): ...@@ -16,8 +33,8 @@ def setup_env():
os.environ["INFLUXDB_WRITE_USER_PASSWORD"] = "INFLUXDB_WRITE_USER_PASSWORD" os.environ["INFLUXDB_WRITE_USER_PASSWORD"] = "INFLUXDB_WRITE_USER_PASSWORD"
def test_init_with_conf(): def test_init_with_conf(dummy_conf):
Uploader(DUMMY_CONF) Uploader(dummy_conf)
def test_load_conf(): def test_load_conf():
...@@ -43,8 +60,17 @@ def test_init_from_env(): ...@@ -43,8 +60,17 @@ def test_init_from_env():
assert up.config.write_user_pw == os.environ["INFLUXDB_WRITE_USER_PASSWORD"] assert up.config.write_user_pw == os.environ["INFLUXDB_WRITE_USER_PASSWORD"]
def test_upload(): def test_dry_run(mock_upload):
up = Uploader(DUMMY_CONF)
dp = DataPoint(measurement="Test", time=int(time.time()), fields=dict(), tags=dict()) dp = DataPoint(measurement="Test", time=int(time.time()), fields=dict(), tags=dict())
assert up.upload([dp], dry_run=True) assert mock_upload.upload([dp], dry_run=True, time_precision='s')
assert up.upload([dp.asdict()], dry_run=True) mock_upload.client.write_points.assert_not_called()
def test_upload_as_dp(mock_upload, dummy_dp):
assert mock_upload.upload([dummy_dp], time_precision='s')
mock_upload.client.write_points.assert_called_with([dummy_dp.asdict()], time_precision='s')
def test_upload_as_dict(mock_upload, dummy_dp):
assert mock_upload.upload([dummy_dp.asdict()], time_precision='s')
mock_upload.client.write_points.assert_called_with([dummy_dp.asdict()], time_precision='s')
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment