Skip to content
Snippets Groups Projects
Commit 4b34adca authored by Christoph Alt's avatar Christoph Alt
Browse files

upload works now also with row data points

parent 2a75ddba
No related merge requests found
Pipeline #43045 passed with stages
in 2 minutes and 30 seconds
......@@ -2,10 +2,15 @@ import logging
import os
import pprint
from dataclasses import dataclass
from typing import Dict, List, Union
import dotenv
from influxdb import InfluxDBClient
from .data_points import DataPoint
Points = Union[List[Dict], List[DataPoint]]
logger = logging.getLogger(__file__)
MISSING_DB_PW = """
......@@ -53,16 +58,25 @@ class Uploader:
database=config.database,
)
def upload(self, points, dry_run=False, *, time_precision='s', **kwargs):
logger.info(f"Uploading: {pprint.pformat(points)}")
def upload(self, points: Points, dry_run=False, *, time_precision='s', **kwargs):
to_upload = points
if isinstance(points[0], DataPoint):
assert all(isinstance(dp, DataPoint) for dp in points)
to_upload = [dp.asdict() for dp in points]
else:
assert all(isinstance(dp, Dict) for dp in points)
logger.info(f"Uploading: {pprint.pformat(to_upload)}")
success = True
if (common_tags := kwargs.get("tags")):
logger.info(f"with common tags: {pprint.pformat(common_tags)}")
if not dry_run:
success = self.client.write_points(points,
success = self.client.write_points(to_upload,
time_precision=time_precision,
**kwargs)
if success:
logger.info(f"Uploaded {len(points)} items")
else:
raise ValueError("Uploading to influxdb went wrong!")
return success
from cbutil.upload import Uploader, DBConfig, load_config_from_env
import os
import time
from cbutil.data_points import DataPoint
from cbutil.upload import DBConfig, Uploader, load_config_from_env
DUMMY_CONF = DBConfig("host", 1234, "user_name", "database", "write_user_pw")
def setup_env():
......@@ -12,8 +17,7 @@ def setup_env():
def test_init_with_conf():
conf = DBConfig("host", 1234, "user_name", "database", "write_user_pw")
Uploader(conf)
Uploader(DUMMY_CONF)
def test_load_conf():
......@@ -37,3 +41,10 @@ def test_init_from_env():
assert up.config.user_name == os.environ["INFLUXDB_USER_NAME"]
assert up.config.database == os.environ["INFLUXDB_DATABASE"]
assert up.config.write_user_pw == os.environ["INFLUXDB_WRITE_USER_PASSWORD"]
def test_upload():
up = Uploader(DUMMY_CONF)
dp = DataPoint(measurement="Test", time=int(time.time()), fields=dict(), tags=dict())
assert up.upload([dp], dry_run=True)
assert up.upload([dp.asdict()], dry_run=True)
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment