@############################################### @# @# EmPy template for generating microRTPS_client.cpp file @# @############################################### @# Start of Template @# @# Context: @# - msgs (List) list of all msg files @# - multi_topics (List) list of all multi-topic names @############################################### @{ import genmsg.msgs import gencpp from px_generate_uorb_topic_helper import * # this is in Tools/ from px_generate_uorb_topic_files import MsgScope # this is in Tools/ from message_id import * # this is in Tools/ topic_names = [single_spec.short_name for single_spec in spec] send_topics = [s.short_name for idx, s in enumerate(spec) if scope[idx] == MsgScope.SEND] recv_topics = [s.short_name for idx, s in enumerate(spec) if scope[idx] == MsgScope.RECEIVE] }@ /**************************************************************************** * * Copyright 2017 Proyectos y Sistemas de Mantenimiento SL (eProsima). * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, this * list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * 3. Neither the name of the copyright holder nor the names of its contributors * may be used to endorse or promote products derived from this software without * specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. * ****************************************************************************/ #include #include #include #include #include #include #include #include #include #include #include #include #include "microRTPS_transport.h" @[for topic in list(set(topic_names))]@ #include @[end for]@ #define BUFFER_SIZE 1024 #define UPDATE_TIME_MS 0 #define LOOPS 10000 #define SLEEP_MS 1 #define BAUDRATE 460800 #define DEVICE "/dev/ttyACM0" #define POLL_MS 1 #define DEFAULT_RECV_PORT 2019 #define DEFAULT_SEND_PORT 2020 extern "C" __EXPORT int micrortps_client_main(int argc, char *argv[]); void* send(void *data); struct options { enum class eTransports { UART, UDP }; eTransports transport = options::eTransports::UART; char device[64] = DEVICE; int update_time_ms = UPDATE_TIME_MS; int loops = LOOPS; int sleep_ms = SLEEP_MS; uint32_t baudrate = BAUDRATE; int poll_ms = POLL_MS; uint16_t recv_port = DEFAULT_RECV_PORT; uint16_t send_port = DEFAULT_SEND_PORT; } _options; static int _rtps_task = -1; static bool _should_exit_task = false; Transport_node *transport_node = nullptr; static void usage(const char *name) { PX4_INFO("usage: %s start|stop [options]\n\n" " -t [UART|UDP] Default UART\n" " -d UART device. Default /dev/ttyACM0\n" " -u Time in ms for uORB subscribed topics update. Default 0\n" " -l How many iterations will this program have. -1 for infinite. Default 10000\n" " -w Time in ms for which each iteration sleep. Default 1ms\n" " -b UART device baudrate. Default 460800\n" " -p Time in ms to poll over UART. Default 1ms\n" " -r UDP port for receiving. Default 2019\n" " -s UDP port for sending. Default 2020\n", name); } static int parse_options(int argc, char *argv[]) { int ch; int myoptind = 1; const char *myoptarg = nullptr; while ((ch = px4_getopt(argc, argv, "t:d:u:l:w:b:p:r:s:", &myoptind, &myoptarg)) != EOF) { switch (ch) { case 't': _options.transport = strcmp(myoptarg, "UDP") == 0? options::eTransports::UDP :options::eTransports::UART; break; case 'd': if (nullptr != myoptarg) strcpy(_options.device, myoptarg); break; case 'u': _options.update_time_ms = strtol(myoptarg, nullptr, 10); break; case 'l': _options.loops = strtol(myoptarg, nullptr, 10); break; case 'w': _options.sleep_ms = strtol(myoptarg, nullptr, 10); break; case 'b': _options.baudrate = strtoul(optarg, nullptr, 10); break; case 'p': _options.poll_ms = strtol(optarg, nullptr, 10); break; case 'r': _options.recv_port = strtoul(optarg, nullptr, 10); break; case 's': _options.send_port = strtoul(optarg, nullptr, 10); break; default: usage(argv[1]); return -1; } } return 0; } @[if send_topics]@ void* send(void*) { char data_buffer[BUFFER_SIZE] = {}; uint64_t sent = 0, total_sent = 0; int loop = 0, read = 0; uint32_t length = 0; /* subscribe to topics */ int fds[@(len(send_topics))] = {}; // orb_set_interval statblish an update interval period in milliseconds. @[for idx, topic in enumerate(send_topics)]@ fds[@(idx)] = orb_subscribe(ORB_ID(@(topic))); orb_set_interval(fds[@(idx)], _options.update_time_ms); @[end for]@ // microBuffer to serialized using the user defined buffer struct microBuffer microBufferWriter; initStaticAlignedBuffer(data_buffer, BUFFER_SIZE, µBufferWriter); // microCDR structs for managing the microBuffer struct microCDR microCDRWriter; initMicroCDR(µCDRWriter, µBufferWriter); struct timespec begin; clock_gettime(0, &begin); while (!_should_exit_task) { bool updated; @[for idx, topic in enumerate(send_topics)]@ orb_check(fds[@(idx)], &updated); if (updated) { // obtained data for the file descriptor struct @(topic)_s data = {}; // copy raw data into local buffer orb_copy(ORB_ID(@(topic)), fds[@(idx)], &data); serialize_@(topic)(&data, data_buffer, &length, µCDRWriter); if (0 < (read = transport_node->write((char)@(message_id(topic)), data_buffer, length))) { total_sent += read; ++sent; } } @[end for]@ usleep(_options.sleep_ms*1000); ++loop; } struct timespec end; clock_gettime(0, &end); double elapsed_secs = double(end.tv_sec - begin.tv_sec) + double(end.tv_nsec - begin.tv_nsec)/double(1000000000); printf("\nSENT: %lu messages in %d LOOPS, %lu bytes in %.03f seconds - %.02fKB/s\n", sent, loop, total_sent, elapsed_secs, (double)total_sent/(1000*elapsed_secs)); return nullptr; } static int launch_send_thread(pthread_t &sender_thread) { pthread_attr_t sender_thread_attr; pthread_attr_init(&sender_thread_attr); pthread_attr_setstacksize(&sender_thread_attr, PX4_STACK_ADJUSTED(4000)); struct sched_param param; (void)pthread_attr_getschedparam(&sender_thread_attr, ¶m); param.sched_priority = SCHED_PRIORITY_DEFAULT; (void)pthread_attr_setschedparam(&sender_thread_attr, ¶m); pthread_create(&sender_thread, &sender_thread_attr, send, nullptr); pthread_attr_destroy(&sender_thread_attr); return 0; } @[end if]@ static int micrortps_start(int argc, char *argv[]) { if (0 > parse_options(argc, argv)) { printf("EXITING...\n"); _rtps_task = -1; return -1; } switch (_options.transport) { case options::eTransports::UART: { transport_node = new UART_node(_options.device, _options.baudrate, _options.poll_ms); printf("\nUART transport: device: %s; baudrate: %d; sleep: %dms; poll: %dms\n\n", _options.device, _options.baudrate, _options.sleep_ms, _options.poll_ms); } break; case options::eTransports::UDP: { transport_node = new UDP_node(_options.recv_port, _options.send_port); printf("\nUDP transport: recv port: %u; send port: %u; sleep: %dms\n\n", _options.recv_port, _options.send_port, _options.sleep_ms); } break; default: _rtps_task = -1; printf("EXITING...\n"); return -1; } if (0 > transport_node->init()) { printf("EXITING...\n"); _rtps_task = -1; return -1; } sleep(1); @[if recv_topics]@ char data_buffer[BUFFER_SIZE] = {}; int read = 0; uint8_t topic_ID = 255; // Declare received topics @[for topic in recv_topics]@ struct @(topic)_s @(topic)_data; orb_advert_t @(topic)_pub = nullptr; @[end for]@ // microBuffer to deserialized using the user defined buffer struct microBuffer microBufferReader; initDeserializedAlignedBuffer(data_buffer, BUFFER_SIZE, µBufferReader); // microCDR structs for managing the microBuffer struct microCDR microCDRReader; initMicroCDR(µCDRReader, µBufferReader); @[end if]@ int total_read = 0; uint32_t received = 0; struct timespec begin; clock_gettime(0, &begin); int loop = 0; _should_exit_task = false; @[if send_topics]@ // create a thread for sending data to the simulator pthread_t sender_thread; launch_send_thread(sender_thread); @[end if]@ while (!_should_exit_task) { @[if recv_topics]@ while (0 < (read = transport_node->read(&topic_ID, data_buffer, BUFFER_SIZE))) { total_read += read; switch (topic_ID) { @[for topic in recv_topics]@ case @(message_id(topic)): { deserialize_@(topic)(&@(topic)_data, data_buffer, µCDRReader); if (!@(topic)_pub) @(topic)_pub = orb_advertise(ORB_ID(@(topic)), &@(topic)_data); else orb_publish(ORB_ID(@(topic)), @(topic)_pub, &@(topic)_data); ++received; } break; @[end for]@ default: printf("Unexpected topic ID\n"); break; } } @[end if]@ // loop forever if informed loop number is negative if (_options.loops > 0 && loop >= _options.loops) break; usleep(_options.sleep_ms*1000); ++loop; } @[if send_topics]@ _should_exit_task = true; pthread_join(sender_thread, 0); @[end if]@ struct timespec end; clock_gettime(0, &end); double elapsed_secs = double(end.tv_sec - begin.tv_sec) + double(end.tv_nsec - begin.tv_nsec)/double(1000000000); printf("RECEIVED: %lu messages in %d LOOPS, %d bytes in %.03f seconds - %.02fKB/s\n\n", (unsigned long)received, loop, total_read, elapsed_secs, (double)total_read/(1000*elapsed_secs)); delete transport_node; transport_node = nullptr; PX4_INFO("exiting"); fflush(stdout); _rtps_task = -1; return 0; } int micrortps_client_main(int argc, char *argv[]) { if (argc < 2) { usage(argv[0]); return -1; } if (!strcmp(argv[1], "start")) { if (_rtps_task != -1) { PX4_INFO("Already running"); return -1; } _rtps_task = px4_task_spawn_cmd("rtps", SCHED_DEFAULT, SCHED_PRIORITY_DEFAULT, 4096, (px4_main_t) micrortps_start, (char *const *)argv); if (_rtps_task < 0) { PX4_WARN("Could not start task"); _rtps_task = -1; return -1; } return 0; } if (!strcmp(argv[1], "stop")) { if (_rtps_task == -1) { PX4_INFO("Not running"); return -1; } _should_exit_task = true; if (nullptr != transport_node) transport_node->close(); return 0; } usage(argv[0]); return -1; }