From 97d20a115565988243449d2d872fe8fc810a7ec4 Mon Sep 17 00:00:00 2001 From: Grant Limberg Date: Tue, 30 Sep 2025 10:32:21 -0700 Subject: [PATCH] more logging for errors on pubsub listener --- nonfree/controller/PubSubListener.cpp | 12 +- nonfree/controller/PubSubWriter.cpp | 194 ++++++++++++++------------ 2 files changed, 111 insertions(+), 95 deletions(-) diff --git a/nonfree/controller/PubSubListener.cpp b/nonfree/controller/PubSubListener.cpp index 623b3ba97..7ec59b5ed 100644 --- a/nonfree/controller/PubSubListener.cpp +++ b/nonfree/controller/PubSubListener.cpp @@ -56,7 +56,7 @@ PubSubListener::PubSubListener(std::string controller_id, std::string project, s _subscriberThread = std::thread(&PubSubListener::subscribe, this); } -PubSubListener::~PubSubListener() +PubSubListener::~PubSubListener()` { _run = false; if (_subscriberThread.joinable()) { @@ -174,13 +174,14 @@ void PubSubNetworkListener::onNotification(const std::string& payload) } } catch (const nlohmann::json::parse_error& e) { - fprintf(stderr, "JSON parse error: %s\n", e.what()); + fprintf(stderr, "PubSubNetworkListener JSON parse error: %s\n", e.what()); span->SetAttribute("error", e.what()); span->SetStatus(opentelemetry::trace::StatusCode::kError, e.what()); + fprintf(stderr, "payload: %s\n", payload.c_str()); return; } catch (const std::exception& e) { - fprintf(stderr, "Exception in PubSubNetworkListener: %s\n", e.what()); + fprintf(stderr, "PubSubNetworkListener Exception in PubSubNetworkListener: %s\n", e.what()); span->SetAttribute("error", e.what()); span->SetStatus(opentelemetry::trace::StatusCode::kError, e.what()); return; @@ -260,13 +261,14 @@ void PubSubMemberListener::onNotification(const std::string& payload) } } catch (const nlohmann::json::parse_error& e) { - fprintf(stderr, "JSON parse error: %s\n", e.what()); + fprintf(stderr, "PubSubMemberListener JSON parse error: %s\n", e.what()); span->SetAttribute("error", e.what()); span->SetStatus(opentelemetry::trace::StatusCode::kError, e.what()); + fprintf(stderr, "payload: %s\n", payload.c_str()); return; } catch (const std::exception& e) { - fprintf(stderr, "Exception in PubSubMemberListener: %s\n", e.what()); + fprintf(stderr, "PubSubMemberListener Exception in PubSubMemberListener: %s\n", e.what()); span->SetAttribute("error", e.what()); span->SetStatus(opentelemetry::trace::StatusCode::kError, e.what()); return; diff --git a/nonfree/controller/PubSubWriter.cpp b/nonfree/controller/PubSubWriter.cpp index fcba2c2fa..ee9ba8bd9 100644 --- a/nonfree/controller/PubSubWriter.cpp +++ b/nonfree/controller/PubSubWriter.cpp @@ -144,75 +144,82 @@ pbmessages::NetworkChange_Network* networkFromJson(const nlohmann::json& j) } pbmessages::NetworkChange_Network* n = new pbmessages::NetworkChange_Network(); - n->set_network_id(j.value("id", "")); - n->set_name(j.value("name", "")); - n->set_capabilities(OSUtils::jsonDump(j.value("capabilities", "[]"), -1)); - n->set_creation_time(j.value("creationTime", 0)); - n->set_enable_broadcast(j.value("enableBroadcast", false)); + try { + n->set_network_id(j.value("id", "")); + n->set_name(j.value("name", "")); + n->set_capabilities(OSUtils::jsonDump(j.value("capabilities", "[]"), -1)); + n->set_creation_time(j.value("creationTime", 0)); + n->set_enable_broadcast(j.value("enableBroadcast", false)); - for (const auto& p : j["ipAssignmentPools"]) { - if (p.is_object()) { - auto pool = n->add_assignment_pools(); - pool->set_start_ip(p.value("ipRangeStart", "")); - pool->set_end_ip(p.value("ipRangeEnd", "")); - } - } - - n->set_mtu(j.value("mtu", 2800)); - n->set_multicast_limit(j.value("multicastLimit", 32)); - n->set_is_private(j.value("private", true)); - n->set_remote_trace_level(j.value("remoteTraceLevel", 0)); - n->set_remote_trace_target(j.value("remoteTraceTarget", "")); - n->set_revision(j.value("revision", 0)); - - for (const auto& p : j["routes"]) { - if (p.is_object()) { - auto r = n->add_routes(); - r->set_target(p.value("target", "")); - r->set_via(p.value("via", "")); - } - } - - n->set_rules(""); - n->set_tags(OSUtils::jsonDump(j.value("tags", "[]"), -1)); - - pbmessages::NetworkChange_IPV4AssignMode* v4am = new pbmessages::NetworkChange_IPV4AssignMode(); - if (j["v4AssignMode"].is_object()) { - v4am->set_zt(j["v4AssignMode"].value("zt", false)); - } - n->set_allocated_ipv4_assign_mode(v4am); - - pbmessages::NetworkChange_IPV6AssignMode* v6am = new pbmessages::NetworkChange_IPV6AssignMode(); - if (j["v6AssignMode"].is_object()) { - v6am->set_zt(j["v6AssignMode"].value("zt", false)); - v6am->set_six_plane(j["v6AssignMode"].value("6plane", false)); - v6am->set_rfc4193(j["v6AssignMode"].value("rfc4193", false)); - } - n->set_allocated_ipv6_assign_mode(v6am); - - nlohmann::json jdns = j.value("dns", nullptr); - if (jdns.is_object()) { - pbmessages::NetworkChange_DNS* dns = new pbmessages::NetworkChange_DNS(); - dns->set_domain(jdns.value("domain", "")); - for (const auto& s : jdns["servers"]) { - if (s.is_string()) { - auto server = dns->add_nameservers(); - *server = s; + for (const auto& p : j["ipAssignmentPools"]) { + if (p.is_object()) { + auto pool = n->add_assignment_pools(); + pool->set_start_ip(p.value("ipRangeStart", "")); + pool->set_end_ip(p.value("ipRangeEnd", "")); } } - n->set_allocated_dns(dns); - } - n->set_sso_enabled(j.value("ssoEnabled", false)); - if (j.value("ssoEnabled", false)) { - n->set_sso_provider(j.value("provider", "")); - n->set_sso_client_id(j.value("clientId", "")); - n->set_sso_authorization_endpoint(j.value("authorizationEndpoint", "")); - n->set_sso_issuer(j.value("issuer", "")); - n->set_sso_provider(j.value("provider", "")); - } + n->set_mtu(j.value("mtu", 2800)); + n->set_multicast_limit(j.value("multicastLimit", 32)); + n->set_is_private(j.value("private", true)); + n->set_remote_trace_level(j.value("remoteTraceLevel", 0)); + n->set_remote_trace_target(j.value("remoteTraceTarget", "")); + n->set_revision(j.value("revision", 0)); - n->set_rules_source(""); + for (const auto& p : j["routes"]) { + if (p.is_object()) { + auto r = n->add_routes(); + r->set_target(p.value("target", "")); + r->set_via(p.value("via", "")); + } + } + + n->set_rules(""); + n->set_tags(OSUtils::jsonDump(j.value("tags", "[]"), -1)); + + pbmessages::NetworkChange_IPV4AssignMode* v4am = new pbmessages::NetworkChange_IPV4AssignMode(); + if (j["v4AssignMode"].is_object()) { + v4am->set_zt(j["v4AssignMode"].value("zt", false)); + } + n->set_allocated_ipv4_assign_mode(v4am); + + pbmessages::NetworkChange_IPV6AssignMode* v6am = new pbmessages::NetworkChange_IPV6AssignMode(); + if (j["v6AssignMode"].is_object()) { + v6am->set_zt(j["v6AssignMode"].value("zt", false)); + v6am->set_six_plane(j["v6AssignMode"].value("6plane", false)); + v6am->set_rfc4193(j["v6AssignMode"].value("rfc4193", false)); + } + n->set_allocated_ipv6_assign_mode(v6am); + + nlohmann::json jdns = j.value("dns", nullptr); + if (jdns.is_object()) { + pbmessages::NetworkChange_DNS* dns = new pbmessages::NetworkChange_DNS(); + dns->set_domain(jdns.value("domain", "")); + for (const auto& s : jdns["servers"]) { + if (s.is_string()) { + auto server = dns->add_nameservers(); + *server = s; + } + } + n->set_allocated_dns(dns); + } + + n->set_sso_enabled(j.value("ssoEnabled", false)); + if (j.value("ssoEnabled", false)) { + n->set_sso_provider(j.value("provider", "")); + n->set_sso_client_id(j.value("clientId", "")); + n->set_sso_authorization_endpoint(j.value("authorizationEndpoint", "")); + n->set_sso_issuer(j.value("issuer", "")); + n->set_sso_provider(j.value("provider", "")); + } + + n->set_rules_source(j.value("rulesSource", "")); + } + catch (const std::exception& e) { + fprintf(stderr, "Exception parsing network JSON: %s\n", e.what()); + delete n; + return nullptr; + } return n; } @@ -240,34 +247,41 @@ pbmessages::MemberChange_Member* memberFromJson(const nlohmann::json& j) } pbmessages::MemberChange_Member* m = new pbmessages::MemberChange_Member(); - m->set_network_id(j.value("networkId", "")); - m->set_device_id(j.value("id", "")); - m->set_identity(j.value("identity", "")); - m->set_authorized(j.value("authorized", false)); - for (const auto& addr : j.value("ipAssignments", nlohmann::json::array())) { - if (addr.is_string()) { - auto a = m->add_ip_assignments(); - *a = addr; + try { + m->set_network_id(j.value("networkId", "")); + m->set_device_id(j.value("id", "")); + m->set_identity(j.value("identity", "")); + m->set_authorized(j.value("authorized", false)); + for (const auto& addr : j.value("ipAssignments", nlohmann::json::array())) { + if (addr.is_string()) { + auto a = m->add_ip_assignments(); + *a = addr; + } } + m->set_active_bridge(j.value("activeBridge", false)); + m->set_tags(OSUtils::jsonDump(j.value("tags", "[]"), -1)); + m->set_capabilities(OSUtils::jsonDump(j.value("capabilities", "[]"), -1)); + m->set_creation_time(j.value("creationTime", 0)); + m->set_no_auto_assign_ips(j.value("noAutoAssignIps", false)); + m->set_revision(j.value("revision", 0)); + m->set_last_authorized_time(j.value("lastAuthorizedTime", 0)); + m->set_last_deauthorized_time(j.value("lastDeauthorizedTime", 0)); + m->set_last_authorized_credential_type(j.value("lastAuthorizedCredentialType", nullptr)); + m->set_last_authorized_credential(j.value("lastAuthorizedCredential", nullptr)); + m->set_version_major(j.value("versionMajor", 0)); + m->set_version_minor(j.value("versionMinor", 0)); + m->set_version_rev(j.value("versionRev", 0)); + m->set_version_protocol(j.value("versionProtocol", 0)); + m->set_remote_trace_level(j.value("remoteTraceLevel", 0)); + m->set_remote_trace_target(j.value("remoteTraceTarget", "")); + m->set_sso_exempt(j.value("ssoExempt", false)); + m->set_auth_expiry_time(j.value("authExpiryTime", 0)); + } + catch (const std::exception& e) { + fprintf(stderr, "Exception parsing member JSON: %s\n", e.what()); + delete m; + return nullptr; } - m->set_active_bridge(j.value("activeBridge", false)); - m->set_tags(OSUtils::jsonDump(j.value("tags", "[]"), -1)); - m->set_capabilities(OSUtils::jsonDump(j.value("capabilities", "[]"), -1)); - m->set_creation_time(j.value("creationTime", 0)); - m->set_no_auto_assign_ips(j.value("noAutoAssignIps", false)); - m->set_revision(j.value("revision", 0)); - m->set_last_authorized_time(j.value("lastAuthorizedTime", 0)); - m->set_last_deauthorized_time(j.value("lastDeauthorizedTime", 0)); - m->set_last_authorized_credential_type(j.value("lastAuthorizedCredentialType", nullptr)); - m->set_last_authorized_credential(j.value("lastAuthorizedCredential", nullptr)); - m->set_version_major(j.value("versionMajor", 0)); - m->set_version_minor(j.value("versionMinor", 0)); - m->set_version_rev(j.value("versionRev", 0)); - m->set_version_protocol(j.value("versionProtocol", 0)); - m->set_remote_trace_level(j.value("remoteTraceLevel", 0)); - m->set_remote_trace_target(j.value("remoteTraceTarget", "")); - m->set_sso_exempt(j.value("ssoExempt", false)); - m->set_auth_expiry_time(j.value("authExpiryTime", 0)); return m; }