Best Cerberus-source code snippet using org.cerberus.crud.factory.impl.FactoryAppServiceContent.create
Source:KafkaService.java
...107 String bootstrapServers,108 List<AppServiceHeader> serviceHeader, List<AppServiceContent> serviceContent, String token, int timeoutMs) {109 MessageEvent message = new MessageEvent(MessageEventEnum.ACTION_FAILED_CALLSERVICE_PRODUCEKAFKA);;110 AnswerItem<AppService> result = new AnswerItem<>();111 AppService serviceREST = factoryAppService.create("", AppService.TYPE_KAFKA, AppService.METHOD_KAFKAPRODUCE, "", "", "", "", "", "", "", "", "", "", "",112 "", null, "", null, null);113 // If token is defined, we add 'cerberus-token' on the http header.114 if (!StringUtil.isNullOrEmpty(token)) {115 serviceHeader.add(factoryAppServiceHeader.create(null, "cerberus-token", token, "Y", 0, "", "", null, "", null));116 }117 Properties props = new Properties();118 serviceContent.add(factoryAppServiceContent.create(null, ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers, "Y", 0, "", "", null, "", null));119 serviceContent.add(factoryAppServiceContent.create(null, ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true", "Y", 0, "", "", null, "", null));120 serviceContent.add(factoryAppServiceContent.create(null, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer", "Y", 0, "", "", null, "", null));121 serviceContent.add(factoryAppServiceContent.create(null, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer", "Y", 0, "", "", null, "", null));122 // Setting timeout although does not seem to work fine as result on aiven is always 60000 ms.123 serviceContent.add(factoryAppServiceContent.create(null, ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(timeoutMs), "Y", 0, "", "", null, "", null));124 serviceContent.add(factoryAppServiceContent.create(null, ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, String.valueOf(timeoutMs), "Y", 0, "", "", null, "", null));125 for (AppServiceContent object : serviceContent) {126 if (StringUtil.parseBoolean(object.getActive())) {127 props.put(object.getKey(), object.getValue());128 }129 }130 serviceREST.setServicePath(bootstrapServers);131 serviceREST.setKafkaTopic(topic);132 serviceREST.setKafkaKey(key);133 serviceREST.setServiceRequest(eventMessage);134 serviceREST.setHeaderList(serviceHeader);135 serviceREST.setContentList(serviceContent);136 int partition = -1;137 long offset = -1;138 KafkaProducer<String, String> producer = null;139 try {140 LOG.info("Open Producer : " + getKafkaConsumerKey(topic, bootstrapServers));141 producer = new KafkaProducer<>(props);142 ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, eventMessage);143 for (AppServiceHeader object : serviceHeader) {144 if (StringUtil.parseBoolean(object.getActive())) {145 record.headers().add(new RecordHeader(object.getKey(), object.getValue().getBytes()));146 }147 }148 LOG.debug("Producing Kafka message - topic : " + topic + " key : " + key + " message : " + eventMessage);149 RecordMetadata metadata = producer.send(record).get(); //Wait for a responses150 partition = metadata.partition();151 offset = metadata.offset();152 LOG.debug("Produced Kafka message - topic : " + topic + " key : " + key + " partition : " + partition + " offset : " + offset);153 message = new MessageEvent(MessageEventEnum.ACTION_SUCCESS_CALLSERVICE_PRODUCEKAFKA);154 } catch (Exception ex) {155 message = new MessageEvent(MessageEventEnum.ACTION_FAILED_CALLSERVICE_PRODUCEKAFKA);156 message.setDescription(message.getDescription().replace("%EX%", ex.toString() + " " + StringUtil.getExceptionCauseFromString(ex)));157 LOG.debug(ex, ex);158 } finally {159 if (producer != null) {160 producer.flush();161 if (producer != null) {162 try {163 producer.close();164 } catch (Exception e) {165 LOG.error(e, e);166 }167 }168 LOG.info("Closed Producer : " + getKafkaConsumerKey(topic, bootstrapServers));169 } else {170 LOG.info("Producer not opened : " + getKafkaConsumerKey(topic, bootstrapServers));171 }172 }173 serviceREST.setKafkaResponseOffset(offset);174 serviceREST.setKafkaResponsePartition(partition);175 serviceREST.setResponseHTTPBodyContentType(appServiceService.guessContentType(serviceREST, AppService.RESPONSEHTTPBODYCONTENTTYPE_JSON));176 result.setItem(serviceREST);177 message.setDescription(message.getDescription().replace("%SERVICEMETHOD%", AppService.METHOD_KAFKAPRODUCE));178 message.setDescription(message.getDescription().replace("%TOPIC%", topic));179 message.setDescription(message.getDescription().replace("%PART%", String.valueOf(partition)));180 message.setDescription(message.getDescription().replace("%OFFSET%", String.valueOf(offset)));181 result.setResultMessage(message);182 return result;183 }184 @SuppressWarnings("unchecked")185 @Override186 public AnswerItem<Map<TopicPartition, Long>> seekEvent(String topic, String bootstrapServers,187 List<AppServiceContent> serviceContent, int timeoutMs) {188 MessageEvent message = new MessageEvent(MessageEventEnum.ACTION_SUCCESS_CALLSERVICE_SEARCHKAFKA);189 AnswerItem<Map<TopicPartition, Long>> result = new AnswerItem<>();190 KafkaConsumer consumer = null;191 try {192 Properties props = new Properties();193 serviceContent.add(factoryAppServiceContent.create(null, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers, "Y", 0, "", "", null, "", null));194 serviceContent.add(factoryAppServiceContent.create(null, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false", "Y", 0, "", "", null, "", null));195 serviceContent.add(factoryAppServiceContent.create(null, ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10", "Y", 0, "", "", null, "", null));196 serviceContent.add(factoryAppServiceContent.create(null, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer", "Y", 0, "", "", null, "", null));197 serviceContent.add(factoryAppServiceContent.create(null, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer", "Y", 0, "", "", null, "", null));198 // Setting timeout although does not seem to work fine as result on aiven is always 60000 ms.199 serviceContent.add(factoryAppServiceContent.create(null, ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(timeoutMs), "Y", 0, "", "", null, "", null));200 serviceContent.add(factoryAppServiceContent.create(null, ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, String.valueOf(timeoutMs), "Y", 0, "", "", null, "", null));201 serviceContent.add(factoryAppServiceContent.create(null, ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, String.valueOf(timeoutMs), "Y", 0, "", "", null, "", null));202 for (AppServiceContent object : serviceContent) {203 if (StringUtil.parseBoolean(object.getActive())) {204 props.put(object.getKey(), object.getValue());205 }206 }207 LOG.info("Open Consumer : " + getKafkaConsumerKey(topic, bootstrapServers));208 consumer = new KafkaConsumer<>(props);209 //Get a list of the topics' partitions210 List<PartitionInfo> partitionList = consumer.partitionsFor(topic);211 if (partitionList == null) {212 message = new MessageEvent(MessageEventEnum.ACTION_FAILED_CALLSERVICE_SEEKKAFKA);213 message.setDescription(message.getDescription().replace("%EX%", "Maybe Topic does not exist.").replace("%TOPIC%", topic).replace("%HOSTS%", bootstrapServers));214 } else {215 List<TopicPartition> topicPartitionList = partitionList.stream().map(info -> new TopicPartition(topic, info.partition())).collect(Collectors.toList());216 //Assign all the partitions to this consumer217 consumer.assign(topicPartitionList);218 consumer.seekToEnd(topicPartitionList); //default to latest offset for all partitions219 HashMap<TopicPartition, Long> valueResult = new HashMap<>();220 Map<TopicPartition, Long> partitionOffset = consumer.endOffsets(topicPartitionList);221 result.setItem(partitionOffset);222 }223 } catch (Exception ex) {224 message = new MessageEvent(MessageEventEnum.ACTION_FAILED_CALLSERVICE_SEEKKAFKA);225 message.setDescription(message.getDescription().replace("%EX%", ex.toString() + " " + StringUtil.getExceptionCauseFromString(ex)).replace("%TOPIC%", topic).replace("%HOSTS%", bootstrapServers));226 LOG.debug(ex, ex);227 } finally {228 if (consumer != null) {229 consumer.close();230 LOG.info("Closed Consumer : " + getKafkaConsumerKey(topic, bootstrapServers));231 } else {232 LOG.info("Consumer not opened : " + getKafkaConsumerKey(topic, bootstrapServers));233 }234 }235 result.setResultMessage(message);236 return result;237 }238 @SuppressWarnings("unchecked")239 @Override240 public AnswerItem<String> searchEvent(Map<TopicPartition, Long> mapOffsetPosition, String topic, String bootstrapServers,241 List<AppServiceHeader> serviceHeader, List<AppServiceContent> serviceContent, String filterPath, String filterValue, int targetNbEventsInt, int targetNbSecInt) {242 MessageEvent message = new MessageEvent(MessageEventEnum.ACTION_FAILED_CALLSERVICE_SEARCHKAFKA);243 AnswerItem<String> result = new AnswerItem<>();244 AppService serviceREST = factoryAppService.create("", AppService.TYPE_KAFKA, AppService.METHOD_KAFKASEARCH, "", "", "", "", "", "", "", "", "", "", "", "", null, "", null, null);245 Instant date1 = Instant.now();246 JSONArray resultJSON = new JSONArray();247 KafkaConsumer consumer = null;248 int nbFound = 0;249 int nbEvents = 0;250 try {251 Properties props = new Properties();252 serviceContent.add(factoryAppServiceContent.create(null, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers, "Y", 0, "", "", null, "", null));253 serviceContent.add(factoryAppServiceContent.create(null, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false", "Y", 0, "", "", null, "", null));254 serviceContent.add(factoryAppServiceContent.create(null, ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10", "Y", 0, "", "", null, "", null));255 serviceContent.add(factoryAppServiceContent.create(null, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer", "Y", 0, "", "", null, "", null));256 serviceContent.add(factoryAppServiceContent.create(null, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer", "Y", 0, "", "", null, "", null));257 for (AppServiceContent object : serviceContent) {258 if (StringUtil.parseBoolean(object.getActive())) {259 props.put(object.getKey(), object.getValue());260 }261 }262 LOG.info("Open Consumer : " + getKafkaConsumerKey(topic, bootstrapServers));263 consumer = new KafkaConsumer<>(props);264 //Get a list of the topics' partitions265 List<PartitionInfo> partitionList = consumer.partitionsFor(topic);266 if (partitionList == null) {267 message = new MessageEvent(MessageEventEnum.ACTION_FAILED_CALLSERVICE_SEARCHKAFKA);268 message.setDescription(message.getDescription().replace("%EX%", "Maybe Topic does not exist.").replace("%TOPIC%", topic).replace("%HOSTS%", bootstrapServers));269 } else {270 List<TopicPartition> topicPartitionList = partitionList.stream().map(info -> new TopicPartition(topic, info.partition())).collect(Collectors.toList());...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!