Content differences:
--- /etc/rsyslog.d/30-output-kafka.conf.orig
+++ /etc/rsyslog.d/30-output-kafka.conf
@@ -13,6 +13,19 @@
# define a template to be used by omkafka dynatopic
template(name="kafka_topic" type="string" string="rsyslog-%syslogseverity-text%")
+
+# Event Platform support (T291645): messages that carry a 'meta.stream' field are
+# Event Platform events (e.g. ECS formatted logs). Produce them to the
+# '<datacenter>.<meta.stream>' topic so they can be ingested into the Data Lake.
+# The datacenter prefix is supplied by puppet; the stream suffix is read from the
+# parsed json message via the '%!meta!stream%' property.
+template(name="event_platform_topic" type="string" string="esams.%!meta!stream%")
+
+# Emit the parsed json message verbatim (with $schema, meta and dt intact) so the
+# event remains valid for Event Platform / Data Lake ingestion.
+template(name="event_platform_json" type="list") {
+ property(name="$!all-json")
+}
# send to kafka if lookup table contains "kafka" for relevant programname
# $.log_outputs defined by lookup table in lookup_output.conf
@@ -42,6 +55,31 @@
# unfortunately rsyslog doesn't allow variables to be used as template
# names, so the kafka action is duplicated here.
if $parsesuccess == "OK" then {
+ # Event Platform events carry a 'meta.stream' field. Produce these only to
+ # the '<datacenter>.<meta.stream>' topic (T291645). Logstash consumes this
+ # topic via an explicit kafka input rather than the 'rsyslog-*' pattern.
+ if ($!meta!stream != "") then {
+ action(type="omkafka"
+ name="omkafka_event_platform"
+ broker=["kafka-logging1001.eqiad.wmnet:9093","kafka-logging1002.eqiad.wmnet:9093","kafka-logging1003.eqiad.wmnet:9093","kafka-logging1004.eqiad.wmnet:9093","kafka-logging1005.eqiad.wmnet:9093"]
+ topic="event_platform_topic"
+ dynatopic="on"
+ dynatopic.cachesize="1000"
+ partitions.auto="on"
+ template="event_platform_json"
+ queue.type="LinkedList" queue.size="10000" queue.filename="output_kafka_event_platform"
+ queue.highWatermark="7000" queue.lowWatermark="6000"
+ queue.checkpointInterval="5"
+ queue.maxDiskSpace="40960000"
+ confParam=[ "security.protocol=ssl",
+ "ssl.ca.location=/etc/ssl/certs/wmf-ca-certificates.crt",
+ "compression.codec=snappy",
+ "socket.timeout.ms=10000",
+ "socket.keepalive.enable=true",
+ "queue.buffering.max.ms=50",
+ "batch.num.messages=1000" ]
+ )
+ } else {
action(type="omkafka"
name="omkafka_syslog_cee"
broker=["kafka-logging1001.eqiad.wmnet:9093","kafka-logging1002.eqiad.wmnet:9093","kafka-logging1003.eqiad.wmnet:9093","kafka-logging1004.eqiad.wmnet:9093","kafka-logging1005.eqiad.wmnet:9093"]
@@ -62,6 +100,7 @@
"queue.buffering.max.ms=50",
"batch.num.messages=1000" ]
)
+ }
} else {
# if ecs_170 in log_outputs, use that template to format
# non-json-formatted syslog events into an ecs-compatible form