Best Cerberus-source code snippet using org.cerberus.util.StringUtil.getExceptionCauseFromString
...152 LOG.debug("Produced Kafka message - topic : " + topic + " key : " + key + " partition : " + partition + " offset : " + offset);153 message = new MessageEvent(MessageEventEnum.ACTION_SUCCESS_CALLSERVICE_PRODUCEKAFKA);154 } catch (Exception ex) {155 message = new MessageEvent(MessageEventEnum.ACTION_FAILED_CALLSERVICE_PRODUCEKAFKA);156 message.setDescription(message.getDescription().replace("%EX%", ex.toString() + " " + StringUtil.getExceptionCauseFromString(ex)));157 LOG.debug(ex, ex);158 } finally {159 if (producer != null) {160 producer.flush();161 if (producer != null) {162 try {163 producer.close();164 } catch (Exception e) {165 LOG.error(e, e);166 }167 }168"Closed Producer : " + getKafkaConsumerKey(topic, bootstrapServers));169 } else {170"Producer not opened : " + getKafkaConsumerKey(topic, bootstrapServers));171 }172 }173 serviceREST.setKafkaResponseOffset(offset);174 serviceREST.setKafkaResponsePartition(partition);175 serviceREST.setResponseHTTPBodyContentType(appServiceService.guessContentType(serviceREST, AppService.RESPONSEHTTPBODYCONTENTTYPE_JSON));176 result.setItem(serviceREST);177 message.setDescription(message.getDescription().replace("%SERVICEMETHOD%", AppService.METHOD_KAFKAPRODUCE));178 message.setDescription(message.getDescription().replace("%TOPIC%", topic));179 message.setDescription(message.getDescription().replace("%PART%", String.valueOf(partition)));180 message.setDescription(message.getDescription().replace("%OFFSET%", String.valueOf(offset)));181 result.setResultMessage(message);182 return result;183 }184 @SuppressWarnings("unchecked")185 @Override186 public AnswerItem<Map<TopicPartition, Long>> seekEvent(String topic, String bootstrapServers,187 List<AppServiceContent> serviceContent, int timeoutMs) {188 MessageEvent message = new MessageEvent(MessageEventEnum.ACTION_SUCCESS_CALLSERVICE_SEARCHKAFKA);189 AnswerItem<Map<TopicPartition, Long>> result = new AnswerItem<>();190 KafkaConsumer consumer = null;191 try {192 Properties 
props = new Properties();193 serviceContent.add(factoryAppServiceContent.create(null, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers, "Y", 0, "", "", null, "", null));194 serviceContent.add(factoryAppServiceContent.create(null, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false", "Y", 0, "", "", null, "", null));195 serviceContent.add(factoryAppServiceContent.create(null, ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10", "Y", 0, "", "", null, "", null));196 serviceContent.add(factoryAppServiceContent.create(null, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer", "Y", 0, "", "", null, "", null));197 serviceContent.add(factoryAppServiceContent.create(null, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer", "Y", 0, "", "", null, "", null));198 // Setting timeout although does not seem to work fine as result on aiven is always 60000 ms.199 serviceContent.add(factoryAppServiceContent.create(null, ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(timeoutMs), "Y", 0, "", "", null, "", null));200 serviceContent.add(factoryAppServiceContent.create(null, ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, String.valueOf(timeoutMs), "Y", 0, "", "", null, "", null));201 serviceContent.add(factoryAppServiceContent.create(null, ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, String.valueOf(timeoutMs), "Y", 0, "", "", null, "", null));202 for (AppServiceContent object : serviceContent) {203 if (StringUtil.parseBoolean(object.getActive())) {204 props.put(object.getKey(), object.getValue());205 }206 }207"Open Consumer : " + getKafkaConsumerKey(topic, bootstrapServers));208 consumer = new KafkaConsumer<>(props);209 //Get a list of the topics' partitions210 List<PartitionInfo> partitionList = consumer.partitionsFor(topic);211 if (partitionList == null) {212 message = new MessageEvent(MessageEventEnum.ACTION_FAILED_CALLSERVICE_SEEKKAFKA);213 
message.setDescription(message.getDescription().replace("%EX%", "Maybe Topic does not exist.").replace("%TOPIC%", topic).replace("%HOSTS%", bootstrapServers));214 } else {215 List<TopicPartition> topicPartitionList = -> new TopicPartition(topic, info.partition())).collect(Collectors.toList());216 //Assign all the partitions to this consumer217 consumer.assign(topicPartitionList);218 consumer.seekToEnd(topicPartitionList); //default to latest offset for all partitions219 HashMap<TopicPartition, Long> valueResult = new HashMap<>();220 Map<TopicPartition, Long> partitionOffset = consumer.endOffsets(topicPartitionList);221 result.setItem(partitionOffset);222 }223 } catch (Exception ex) {224 message = new MessageEvent(MessageEventEnum.ACTION_FAILED_CALLSERVICE_SEEKKAFKA);225 message.setDescription(message.getDescription().replace("%EX%", ex.toString() + " " + StringUtil.getExceptionCauseFromString(ex)).replace("%TOPIC%", topic).replace("%HOSTS%", bootstrapServers));226 LOG.debug(ex, ex);227 } finally {228 if (consumer != null) {229 consumer.close();230"Closed Consumer : " + getKafkaConsumerKey(topic, bootstrapServers));231 } else {232"Consumer not opened : " + getKafkaConsumerKey(topic, bootstrapServers));233 }234 }235 result.setResultMessage(message);236 return result;237 }238 @SuppressWarnings("unchecked")239 @Override240 public AnswerItem<String> searchEvent(Map<TopicPartition, Long> mapOffsetPosition, String topic, String bootstrapServers,241 List<AppServiceHeader> serviceHeader, List<AppServiceContent> serviceContent, String filterPath, String filterValue, int targetNbEventsInt, int targetNbSecInt) {242 MessageEvent message = new MessageEvent(MessageEventEnum.ACTION_FAILED_CALLSERVICE_SEARCHKAFKA);243 AnswerItem<String> result = new AnswerItem<>();244 Instant date1 =;245 JSONArray resultJSON = new JSONArray();246 KafkaConsumer consumer = null;247 int nbFound = 0;248 int nbEvents = 0;249 try {250 Properties props = new Properties();251 
serviceContent.add(factoryAppServiceContent.create(null, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers, "Y", 0, "", "", null, "", null));252 serviceContent.add(factoryAppServiceContent.create(null, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false", "Y", 0, "", "", null, "", null));253 serviceContent.add(factoryAppServiceContent.create(null, ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10", "Y", 0, "", "", null, "", null));254 serviceContent.add(factoryAppServiceContent.create(null, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer", "Y", 0, "", "", null, "", null));255 serviceContent.add(factoryAppServiceContent.create(null, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer", "Y", 0, "", "", null, "", null));256 for (AppServiceContent object : serviceContent) {257 if (StringUtil.parseBoolean(object.getActive())) {258 props.put(object.getKey(), object.getValue());259 }260 }261"Open Consumer : " + getKafkaConsumerKey(topic, bootstrapServers));262 consumer = new KafkaConsumer<>(props);263 //Get a list of the topics' partitions264 List<PartitionInfo> partitionList = consumer.partitionsFor(topic);265 if (partitionList == null) {266 message = new MessageEvent(MessageEventEnum.ACTION_FAILED_CALLSERVICE_SEARCHKAFKA);267 message.setDescription(message.getDescription().replace("%EX%", "Maybe Topic does not exist.").replace("%TOPIC%", topic).replace("%HOSTS%", bootstrapServers));268 } else {269 List<TopicPartition> topicPartitionList = -> new TopicPartition(topic, info.partition())).collect(Collectors.toList());270 //Assign all the partitions to this consumer271 consumer.assign(topicPartitionList);272 // Setting each partition to correct Offset.273 for (Map.Entry<TopicPartition, Long> entry : mapOffsetPosition.entrySet()) {274, entry.getValue());275 LOG.debug("Partition : " + entry.getKey().partition() + " set to offset : " + entry.getValue());276 }277 boolean 
consume = true;278 long timeoutTime =; //default to 30 seconds279 int pollDurationSec = 5;280 if (targetNbSecInt < pollDurationSec) {281 pollDurationSec = targetNbSecInt;282 }283 while (consume) {284 LOG.debug("Start Poll.");285 @SuppressWarnings("unchecked")286 ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(pollDurationSec));287 LOG.debug("End Poll.");288 if ( > timeoutTime) {289 LOG.debug("Timed out searching for record");290 consumer.wakeup(); //exit291 }292 //Now for each record in the batch of records we got from Kafka293 for (ConsumerRecord<String, String> record : records) {294 try {295 LOG.debug("New record " + record.topic() + " " + record.partition() + " " + record.offset());296 LOG.debug(" " + record.key() + " | " + record.value());297 298 // Parsing message.299 JSONObject recordJSON = new JSONObject(record.value());300 301 // Parsing header.302 JSONObject headerJSON = new JSONObject();303 for (Header header : record.headers()) {304 String headerKey = header.key();305 String headerValue = new String(header.value());306 headerJSON.put(headerKey, headerValue);307 }308 nbEvents++;309 boolean match = true;310 if (!StringUtil.isNullOrEmpty(filterPath)) {311 String recordJSONfiltered = "";312 try {313 recordJSONfiltered = jsonService.getStringFromJson(record.value(), filterPath);314 } catch (PathNotFoundException ex) {315 //Catch any exceptions thrown from message processing/testing as they should have already been reported/dealt with316 //but we don't want to trigger the catch block for Kafka consumption317 match = false;318 LOG.debug("Record discarded - Path not found.");319 } catch (Exception ex) {320 LOG.error(ex, ex);321 }322 LOG.debug("Filtered value : " + recordJSONfiltered);323 if (!recordJSONfiltered.equals(filterValue)) {324 match = false;325 LOG.debug("Record discarded - Value different.");326 }327 }328 if (match) {329 JSONObject messageJSON = new JSONObject();330 messageJSON.put("key", record.key());331 
messageJSON.put("value", recordJSON);332 messageJSON.put("offset", record.offset());333 messageJSON.put("partition", record.partition());334 messageJSON.put("header", headerJSON);335 resultJSON.put(messageJSON);336 nbFound++;337 if (nbFound >= targetNbEventsInt) {338 consume = false; //exit the consume loop339 consumer.wakeup(); //takes effect on the next poll loop so need to break.340 break; //if we've found a match, stop looping through the current record batch341 }342 }343 } catch (Exception ex) {344 //Catch any exceptions thrown from message processing/testing as they should have already been reported/dealt with345 //but we don't want to trigger the catch block for Kafka consumption346 LOG.error(ex, ex);347 }348 }349 }350 result.setItem(resultJSON.toString());351 Instant date2 =;352 Duration duration = Duration.between(date1, date2);353 message = new MessageEvent(MessageEventEnum.ACTION_SUCCESS_CALLSERVICE_SEARCHKAFKA)354 .resolveDescription("NBEVENT", String.valueOf(nbFound))355 .resolveDescription("NBTOT", String.valueOf(nbEvents))356 .resolveDescription("NBSEC", String.valueOf(duration.getSeconds()));357 }358 } catch (WakeupException e) {359 result.setItem(resultJSON.toString());360 Instant date2 =;361 Duration duration = Duration.between(date1, date2);362 message = new MessageEvent(MessageEventEnum.ACTION_SUCCESS_CALLSERVICE_SEARCHKAFKAPARTIALRESULT)363 .resolveDescription("NBEVENT", String.valueOf(nbFound))364 .resolveDescription("NBTOT", String.valueOf(nbEvents))365 .resolveDescription("NBSEC", String.valueOf(duration.getSeconds()));366 //Ignore367 } catch (NullPointerException ex) {368 message = new MessageEvent(MessageEventEnum.ACTION_FAILED_CALLSERVICE_SEARCHKAFKA);369 message.setDescription(message.getDescription().replace("%EX%", ex.toString()).replace("%TOPIC%", topic).replace("%HOSTS%", bootstrapServers));370 LOG.error(ex, ex);371 } catch (Exception ex) {372 message = new MessageEvent(MessageEventEnum.ACTION_FAILED_CALLSERVICE_SEARCHKAFKA);373 
message.setDescription(message.getDescription().replace("%EX%", ex.toString() + " " + StringUtil.getExceptionCauseFromString(ex)).replace("%TOPIC%", topic).replace("%HOSTS%", bootstrapServers));374 LOG.debug(ex, ex);375 } finally {376 if (consumer != null) {377"Closed Consumer : " + getKafkaConsumerKey(topic, bootstrapServers));378 consumer.close();379 } else {380"Consumer not opened : " + getKafkaConsumerKey(topic, bootstrapServers));381 }382 }383 result.setItem(resultJSON.toString());384 message.setDescription(message.getDescription().replace("%SERVICEMETHOD%", AppService.METHOD_KAFKASEARCH).replace("%TOPIC%", topic));385 result.setResultMessage(message);386 return result;387 }...
import org.cerberus.util.StringUtil

// Usage example: obtain the text produced by StringUtil.getExceptionCauseFromString
// for an exception.
// NOTE(review): `exception` is not defined in this snippet; it is assumed to be
// bound by the surrounding script (e.g. a caught exception) -- confirm.
def cause = StringUtil.getExceptionCauseFromString(exception)
/**
 * Holder for the static helper {@link #getExceptionCauseFromString(Throwable)}.
 */
public class StringUtil {

    /**
     * Returns a displayable description of the given throwable.
     *
     * <p>Despite the method name, the cause chain is not inspected: the result is
     * {@code t.getMessage()} when that is non-null, otherwise {@code t.toString()}.
     *
     * @param t the throwable to describe; may be {@code null}
     * @return the throwable's message, or its {@code toString()} when it carries no
     *         message, or an empty string when {@code t} is {@code null}
     */
    public static String getExceptionCauseFromString(Throwable t) {
        // Callers build error descriptions from this value; a missing throwable
        // should not turn the error report itself into a NullPointerException.
        if (t == null) {
            return "";
        }
        String message = t.getMessage();
        return message != null ? message : t.toString();
    }
}
1 at org.cerberus.servlet.admin.UpdateSystem.doPost( at javax.servlet.http.HttpServlet.service( at javax.servlet.http.HttpServlet.service( at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter( at org.apache.catalina.core.ApplicationFilterChain.doFilter( at org.apache.catalina.core.ApplicationDispatcher.invoke( at org.apache.catalina.core.ApplicationDispatcher.processRequest( at org.apache.catalina.core.ApplicationDispatcher.doForward( at org.apache.catalina.core.ApplicationDispatcher.forward( at org.apache.catalina.core.StandardHostValve.custom( at org.apache.catalina.core.StandardHostValve.status( at org.apache.catalina.core.StandardHostValve.throwable( at org.apache.catalina.core.StandardHostValve.invoke( at org.apache.catalina.valves.ErrorReportValve.invoke( at org.apache.catalina.core.StandardEngineValve.invoke( at org.apache.catalina.connector.CoyoteAdapter.service( at org.apache.coyote.http11.AbstractHttp11Processor.process( at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process( at$ at java.util.concurrent.ThreadPoolExecutor.runWorker( at java.util.concurrent.ThreadPoolExecutor$ at org.apache.tomcat.util.threads.TaskThread$ at at org.cerberus.servlet.admin.UpdateSystem.doPost( ... 20 more";
